Merge branch 'master' of github.com:guyrt/twitter-data-examples
[twitter-api-cdsw] / tweepy / auth.py
1 from __future__ import print_function
2
3 import six
4 import logging
5
6 from tweepy.error import TweepError
7 from tweepy.api import API
8 import requests
9 from requests_oauthlib import OAuth1Session, OAuth1
10 from requests.auth import AuthBase
11 from six.moves.urllib.parse import parse_qs
12
# Logged by OAuthHandler.get_authorization_url when a caller asks for both
# signin_with_twitter and an access_type in the same request.
WARNING_MESSAGE = """Warning! Due to a Twitter API bug, signin_with_twitter
and access_type don't always play nice together. Details
https://dev.twitter.com/discussions/21281"""
16
17
class AuthHandler(object):
    """Abstract interface that concrete authentication handlers implement."""

    def apply_auth(self, url, method, headers, parameters):
        """Attach authentication information to an outgoing request.

        Subclasses must override this; the base version always raises
        NotImplementedError.
        """
        raise NotImplementedError()

    def get_username(self):
        """Give back the username of the authenticated user.

        Subclasses must override this; the base version always raises
        NotImplementedError.
        """
        raise NotImplementedError()
27
28
class OAuthHandler(AuthHandler):
    """OAuth authentication handler.

    Holds the consumer credentials, drives the request-token /
    authorization / access-token exchange through ``OAuth1Session``, and
    produces an ``OAuth1`` object for signing requests once an access
    token has been set.
    """
    OAUTH_HOST = 'api.twitter.com'
    OAUTH_ROOT = '/oauth/'

    def __init__(self, consumer_key, consumer_secret, callback=None):
        """Store the consumer credentials and create the OAuth session.

        Text (unicode) keys and secrets are encoded to ASCII bytes before
        being stored. ``callback`` is passed to the session as its
        ``callback_uri``.
        """
        # isinstance rather than an exact type comparison, so subclasses
        # of the text type are encoded too.
        if isinstance(consumer_key, six.text_type):
            consumer_key = consumer_key.encode('ascii')

        if isinstance(consumer_secret, six.text_type):
            consumer_secret = consumer_secret.encode('ascii')

        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = None
        self.access_token_secret = None
        self.callback = callback
        self.username = None
        # Filled in by get_authorization_url(); initialised here so the
        # attribute always exists before get_access_token() reads it.
        self.request_token = {}
        self.oauth = OAuth1Session(consumer_key,
                                   client_secret=consumer_secret,
                                   callback_uri=self.callback)

    def _get_oauth_url(self, endpoint):
        """Return the full https URL for an OAuth ``endpoint`` name."""
        return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint

    def apply_auth(self):
        """Return an ``OAuth1`` object built from the stored credentials."""
        return OAuth1(self.consumer_key,
                      client_secret=self.consumer_secret,
                      resource_owner_key=self.access_token,
                      resource_owner_secret=self.access_token_secret,
                      decoding=None)

    def _get_request_token(self, access_type=None):
        """Fetch a request token, optionally asking for ``access_type``.

        Raises:
            TweepError: wrapping any exception raised during the fetch.
        """
        try:
            url = self._get_oauth_url('request_token')
            if access_type:
                url += '?x_auth_access_type=%s' % access_type
            return self.oauth.fetch_request_token(url)
        except Exception as e:
            raise TweepError(e)

    def set_access_token(self, key, secret):
        """Store an already-obtained access token and its secret."""
        self.access_token = key
        self.access_token_secret = secret

    def get_authorization_url(self,
                              signin_with_twitter=False,
                              access_type=None):
        """Get the authorization URL to redirect the user.

        Uses the ``authenticate`` endpoint when ``signin_with_twitter`` is
        true and the ``authorize`` endpoint otherwise. The fetched request
        token is kept on ``self.request_token`` for get_access_token().

        Raises:
            TweepError: wrapping any exception raised along the way.
        """
        try:
            if signin_with_twitter:
                url = self._get_oauth_url('authenticate')
                if access_type:
                    logging.warning(WARNING_MESSAGE)
            else:
                url = self._get_oauth_url('authorize')
            self.request_token = self._get_request_token(
                access_type=access_type)
            return self.oauth.authorization_url(url)
        except TweepError:
            # Already wrapped by _get_request_token(); don't wrap twice.
            raise
        except Exception as e:
            # A leftover bare ``raise`` used to sit here, making this
            # wrapping unreachable and leaking raw exceptions, unlike the
            # other methods of this class.
            raise TweepError(e)

    def get_access_token(self, verifier=None):
        """
        After user has authorized the request token, get access token
        with user supplied verifier.

        Replaces ``self.oauth`` with a session carrying the request token,
        stores the resulting access token and secret on the handler, and
        returns them as an ``(access_token, access_token_secret)`` tuple.

        Raises:
            TweepError: wrapping any exception raised along the way,
                including a missing request token.
        """
        try:
            url = self._get_oauth_url('access_token')
            self.oauth = OAuth1Session(
                self.consumer_key,
                client_secret=self.consumer_secret,
                resource_owner_key=self.request_token['oauth_token'],
                resource_owner_secret=self.request_token[
                    'oauth_token_secret'],
                verifier=verifier, callback_uri=self.callback)
            resp = self.oauth.fetch_access_token(url)
            self.access_token = resp['oauth_token']
            self.access_token_secret = resp['oauth_token_secret']
            return self.access_token, self.access_token_secret
        except Exception as e:
            raise TweepError(e)

    def get_xauth_access_token(self, username, password):
        """
        Get an access token from an username and password combination.
        In order to get this working you need to create an app at
        http://twitter.com/apps, after that send a mail to api@twitter.com
        and request activation of xAuth for it.

        Returns an ``(oauth_token, oauth_token_secret)`` tuple.

        Raises:
            TweepError: wrapping any exception raised along the way,
                including a response that lacks the token fields.
        """
        try:
            url = self._get_oauth_url('access_token')
            oauth = OAuth1(self.consumer_key,
                           client_secret=self.consumer_secret)
            r = requests.post(url=url,
                              auth=oauth,
                              headers={'x_auth_mode': 'client_auth',
                                       'x_auth_username': username,
                                       'x_auth_password': password})

            # The response body holds the token and secret, so it must not
            # be echoed to stdout (a debugging print used to do that).
            body = r.content
            # parse_qs() on bytes yields bytes keys under Python 3, which
            # the str lookups below would never match. Under Python 2,
            # bytes is str and the body is left untouched.
            if not isinstance(body, str):
                body = body.decode('utf-8')
            credentials = parse_qs(body)
            return (credentials.get('oauth_token')[0],
                    credentials.get('oauth_token_secret')[0])
        except Exception as e:
            raise TweepError(e)

    def get_username(self):
        """Return the authenticated user's screen name.

        The name is looked up through ``API.verify_credentials()`` on first
        use and cached on ``self.username`` afterwards.

        Raises:
            TweepError: if verify_credentials() returns a falsy result.
        """
        if self.username is None:
            api = API(self)
            user = api.verify_credentials()
            if user:
                self.username = user.screen_name
            else:
                raise TweepError('Unable to get username,'
                                 ' invalid oauth token!')
        return self.username
143
144
class OAuth2Bearer(AuthBase):
    """Auth callable that stamps a bearer token onto a request."""

    def __init__(self, bearer_token):
        """Remember the bearer token to send with each request."""
        self.bearer_token = bearer_token

    def __call__(self, request):
        """Set the ``Authorization`` header on ``request`` and return it."""
        header_value = 'Bearer ' + self.bearer_token
        request.headers['Authorization'] = header_value
        return request
152
153
class AppAuthHandler(AuthHandler):
    """Application-only authentication handler.

    Exchanges the consumer key and secret for a bearer token when the
    handler is constructed, and hands that token out via apply_auth().
    """

    OAUTH_HOST = 'api.twitter.com'
    OAUTH_ROOT = '/oauth2/'

    def __init__(self, consumer_key, consumer_secret):
        """Request a bearer token using the client-credentials grant.

        Raises:
            TweepError: if the response's ``token_type`` is not "bearer".
        """
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self._bearer_token = ''

        token_url = self._get_oauth_url('token')
        basic_credentials = (self.consumer_key, self.consumer_secret)
        response = requests.post(token_url,
                                 auth=basic_credentials,
                                 data={'grant_type': 'client_credentials'})
        payload = response.json()

        token_type = payload.get('token_type')
        if token_type != 'bearer':
            raise TweepError('Expected token_type to equal "bearer", '
                             'but got %s instead' % token_type)

        self._bearer_token = payload['access_token']

    def _get_oauth_url(self, endpoint):
        """Build the full https URL for the given OAuth 2 endpoint name."""
        base = 'https://' + self.OAUTH_HOST
        return base + self.OAUTH_ROOT + endpoint

    def apply_auth(self):
        """Return an ``OAuth2Bearer`` wrapping the stored bearer token."""
        return OAuth2Bearer(self._bearer_token)

Benjamin Mako Hill || Want to submit a patch?