]> projects.mako.cc - twitter-api-cdsw-solutions/blob - tweepy/auth.py
86c443091afc5e35c77caa801c8854288a72693d
[twitter-api-cdsw-solutions] / tweepy / auth.py
1 # Tweepy
2 # Copyright 2009-2010 Joshua Roesslein
3 # See LICENSE for details.
4
5 from urllib2 import Request, urlopen
6 import base64
7
8 from tweepy import oauth
9 from tweepy.error import TweepError
10 from tweepy.api import API
11
12
13 class AuthHandler(object):
14
15     def apply_auth(self, url, method, headers, parameters):
16         """Apply authentication headers to request"""
17         raise NotImplementedError
18
19     def get_username(self):
20         """Return the username of the authenticated user"""
21         raise NotImplementedError
22
23
24 class OAuthHandler(AuthHandler):
25     """OAuth authentication handler"""
26
27     OAUTH_HOST = 'api.twitter.com'
28     OAUTH_ROOT = '/oauth/'
29
30     def __init__(self, consumer_key, consumer_secret, callback=None, secure=True):
31         if type(consumer_key) == unicode:
32             consumer_key = bytes(consumer_key)
33
34         if type(consumer_secret) == unicode:
35             consumer_secret = bytes(consumer_secret)
36
37         self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
38         self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
39         self.request_token = None
40         self.access_token = None
41         self.callback = callback
42         self.username = None
43         self.secure = secure
44
45     def _get_oauth_url(self, endpoint, secure=True):
46         if self.secure or secure:
47             prefix = 'https://'
48         else:
49             prefix = 'http://'
50
51         return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
52
53     def apply_auth(self, url, method, headers, parameters):
54         request = oauth.OAuthRequest.from_consumer_and_token(
55             self._consumer, http_url=url, http_method=method,
56             token=self.access_token, parameters=parameters
57         )
58         request.sign_request(self._sigmethod, self._consumer, self.access_token)
59         headers.update(request.to_header())
60
61     def _get_request_token(self):
62         try:
63             url = self._get_oauth_url('request_token')
64             request = oauth.OAuthRequest.from_consumer_and_token(
65                 self._consumer, http_url=url, callback=self.callback
66             )
67             request.sign_request(self._sigmethod, self._consumer, None)
68             resp = urlopen(Request(url, headers=request.to_header()))
69             return oauth.OAuthToken.from_string(resp.read())
70         except Exception as e:
71             raise TweepError(e)
72
73     def set_request_token(self, key, secret):
74         self.request_token = oauth.OAuthToken(key, secret)
75
76     def set_access_token(self, key, secret):
77         self.access_token = oauth.OAuthToken(key, secret)
78
79     def get_authorization_url(self, signin_with_twitter=False):
80         """Get the authorization URL to redirect the user"""
81         try:
82             # get the request token
83             self.request_token = self._get_request_token()
84
85             # build auth request and return as url
86             if signin_with_twitter:
87                 url = self._get_oauth_url('authenticate')
88             else:
89                 url = self._get_oauth_url('authorize')
90             request = oauth.OAuthRequest.from_token_and_callback(
91                 token=self.request_token, http_url=url
92             )
93
94             return request.to_url()
95         except Exception as e:
96             raise TweepError(e)
97
98     def get_access_token(self, verifier=None):
99         """
100         After user has authorized the request token, get access token
101         with user supplied verifier.
102         """
103         try:
104             url = self._get_oauth_url('access_token')
105
106             # build request
107             request = oauth.OAuthRequest.from_consumer_and_token(
108                 self._consumer,
109                 token=self.request_token, http_url=url,
110                 verifier=str(verifier)
111             )
112             request.sign_request(self._sigmethod, self._consumer, self.request_token)
113
114             # send request
115             resp = urlopen(Request(url, headers=request.to_header()))
116             self.access_token = oauth.OAuthToken.from_string(resp.read())
117             return self.access_token
118         except Exception as e:
119             raise TweepError(e)
120
121     def get_xauth_access_token(self, username, password):
122         """
123         Get an access token from an username and password combination.
124         In order to get this working you need to create an app at
125         http://twitter.com/apps, after that send a mail to api@twitter.com
126         and request activation of xAuth for it.
127         """
128         try:
129             url = self._get_oauth_url('access_token', secure=True) # must use HTTPS
130             request = oauth.OAuthRequest.from_consumer_and_token(
131                 oauth_consumer=self._consumer,
132                 http_method='POST', http_url=url,
133                 parameters = {
134                     'x_auth_mode': 'client_auth',
135                     'x_auth_username': username,
136                     'x_auth_password': password
137                 }
138             )
139             request.sign_request(self._sigmethod, self._consumer, None)
140
141             resp = urlopen(Request(url, data=request.to_postdata()))
142             self.access_token = oauth.OAuthToken.from_string(resp.read())
143             return self.access_token
144         except Exception as e:
145             raise TweepError(e)
146
147     def get_username(self):
148         if self.username is None:
149             api = API(self)
150             user = api.verify_credentials()
151             if user:
152                 self.username = user.screen_name
153             else:
154                 raise TweepError("Unable to get username, invalid oauth token!")
155         return self.username
156

Benjamin Mako Hill || Want to submit a patch?