]> projects.mako.cc - twitter-api-cdsw/blobdiff - tweepy/auth.py
Switched to stdout codec, and switched ignore to replace. This could be a teaching...
[twitter-api-cdsw] / tweepy / auth.py
index 86c443091afc5e35c77caa801c8854288a72693d..b15434bcdbbb65b0962d103336209efd0927bdc4 100644 (file)
@@ -1,13 +1,18 @@
-# Tweepy
-# Copyright 2009-2010 Joshua Roesslein
-# See LICENSE for details.
+from __future__ import print_function
 
-from urllib2 import Request, urlopen
-import base64
+import six
+import logging
 
-from tweepy import oauth
 from tweepy.error import TweepError
 from tweepy.api import API
+import requests
+from requests_oauthlib import OAuth1Session, OAuth1
+from requests.auth import AuthBase
+from six.moves.urllib.parse import parse_qs
+
+WARNING_MESSAGE = """Warning! Due to a Twitter API bug, signin_with_twitter
+and access_type don't always play nice together. Details
+https://dev.twitter.com/discussions/21281"""
 
 
 class AuthHandler(object):
@@ -23,76 +28,64 @@ class AuthHandler(object):
 
 class OAuthHandler(AuthHandler):
     """OAuth authentication handler"""
-
     OAUTH_HOST = 'api.twitter.com'
     OAUTH_ROOT = '/oauth/'
 
-    def __init__(self, consumer_key, consumer_secret, callback=None, secure=True):
-        if type(consumer_key) == unicode:
-            consumer_key = bytes(consumer_key)
+    def __init__(self, consumer_key, consumer_secret, callback=None):
+        if type(consumer_key) == six.text_type:
+            consumer_key = consumer_key.encode('ascii')
 
-        if type(consumer_secret) == unicode:
-            consumer_secret = bytes(consumer_secret)
+        if type(consumer_secret) == six.text_type:
+            consumer_secret = consumer_secret.encode('ascii')
 
-        self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
-        self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
-        self.request_token = None
+        self.consumer_key = consumer_key
+        self.consumer_secret = consumer_secret
         self.access_token = None
+        self.access_token_secret = None
         self.callback = callback
         self.username = None
-        self.secure = secure
+        self.oauth = OAuth1Session(consumer_key,
+                                   client_secret=consumer_secret,
+                                   callback_uri=self.callback)
 
-    def _get_oauth_url(self, endpoint, secure=True):
-        if self.secure or secure:
-            prefix = 'https://'
-        else:
-            prefix = 'http://'
+    def _get_oauth_url(self, endpoint):
+        return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
 
-        return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
+    def apply_auth(self):
+        return OAuth1(self.consumer_key,
+                      client_secret=self.consumer_secret,
+                      resource_owner_key=self.access_token,
+                      resource_owner_secret=self.access_token_secret,
+                      decoding=None)
 
-    def apply_auth(self, url, method, headers, parameters):
-        request = oauth.OAuthRequest.from_consumer_and_token(
-            self._consumer, http_url=url, http_method=method,
-            token=self.access_token, parameters=parameters
-        )
-        request.sign_request(self._sigmethod, self._consumer, self.access_token)
-        headers.update(request.to_header())
-
-    def _get_request_token(self):
+    def _get_request_token(self, access_type=None):
         try:
             url = self._get_oauth_url('request_token')
-            request = oauth.OAuthRequest.from_consumer_and_token(
-                self._consumer, http_url=url, callback=self.callback
-            )
-            request.sign_request(self._sigmethod, self._consumer, None)
-            resp = urlopen(Request(url, headers=request.to_header()))
-            return oauth.OAuthToken.from_string(resp.read())
+            if access_type:
+                url += '?x_auth_access_type=%s' % access_type
+            return self.oauth.fetch_request_token(url)
         except Exception as e:
             raise TweepError(e)
 
-    def set_request_token(self, key, secret):
-        self.request_token = oauth.OAuthToken(key, secret)
-
     def set_access_token(self, key, secret):
-        self.access_token = oauth.OAuthToken(key, secret)
+        self.access_token = key
+        self.access_token_secret = secret
 
-    def get_authorization_url(self, signin_with_twitter=False):
+    def get_authorization_url(self,
+                              signin_with_twitter=False,
+                              access_type=None):
         """Get the authorization URL to redirect the user"""
         try:
-            # get the request token
-            self.request_token = self._get_request_token()
-
-            # build auth request and return as url
             if signin_with_twitter:
                 url = self._get_oauth_url('authenticate')
+                if access_type:
+                    logging.warning(WARNING_MESSAGE)
             else:
                 url = self._get_oauth_url('authorize')
-            request = oauth.OAuthRequest.from_token_and_callback(
-                token=self.request_token, http_url=url
-            )
-
-            return request.to_url()
+            self.request_token = self._get_request_token(access_type=access_type)
+            return self.oauth.authorization_url(url)
         except Exception as e:
+            raise
             raise TweepError(e)
 
     def get_access_token(self, verifier=None):
@@ -102,19 +95,15 @@ class OAuthHandler(AuthHandler):
         """
         try:
             url = self._get_oauth_url('access_token')
-
-            # build request
-            request = oauth.OAuthRequest.from_consumer_and_token(
-                self._consumer,
-                token=self.request_token, http_url=url,
-                verifier=str(verifier)
-            )
-            request.sign_request(self._sigmethod, self._consumer, self.request_token)
-
-            # send request
-            resp = urlopen(Request(url, headers=request.to_header()))
-            self.access_token = oauth.OAuthToken.from_string(resp.read())
-            return self.access_token
+            self.oauth = OAuth1Session(self.consumer_key,
+                                       client_secret=self.consumer_secret,
+                                       resource_owner_key=self.request_token['oauth_token'],
+                                       resource_owner_secret=self.request_token['oauth_token_secret'],
+                                       verifier=verifier, callback_uri=self.callback)
+            resp = self.oauth.fetch_access_token(url)
+            self.access_token = resp['oauth_token']
+            self.access_token_secret = resp['oauth_token_secret']
+            return self.access_token, self.access_token_secret
         except Exception as e:
             raise TweepError(e)
 
@@ -126,21 +115,18 @@ class OAuthHandler(AuthHandler):
         and request activation of xAuth for it.
         """
         try:
-            url = self._get_oauth_url('access_token', secure=True) # must use HTTPS
-            request = oauth.OAuthRequest.from_consumer_and_token(
-                oauth_consumer=self._consumer,
-                http_method='POST', http_url=url,
-                parameters = {
-                    'x_auth_mode': 'client_auth',
-                    'x_auth_username': username,
-                    'x_auth_password': password
-                }
-            )
-            request.sign_request(self._sigmethod, self._consumer, None)
-
-            resp = urlopen(Request(url, data=request.to_postdata()))
-            self.access_token = oauth.OAuthToken.from_string(resp.read())
-            return self.access_token
+            url = self._get_oauth_url('access_token')
+            oauth = OAuth1(self.consumer_key,
+                           client_secret=self.consumer_secret)
+            r = requests.post(url=url,
+                              auth=oauth,
+                              headers={'x_auth_mode': 'client_auth',
+                                       'x_auth_username': username,
+                                       'x_auth_password': password})
+
+            print(r.content)
+            credentials = parse_qs(r.content)
+            return credentials.get('oauth_token')[0], credentials.get('oauth_token_secret')[0]
         except Exception as e:
             raise TweepError(e)
 
@@ -151,6 +137,44 @@ class OAuthHandler(AuthHandler):
             if user:
                 self.username = user.screen_name
             else:
-                raise TweepError("Unable to get username, invalid oauth token!")
+                raise TweepError('Unable to get username,'
+                                 ' invalid oauth token!')
         return self.username
 
+
+class OAuth2Bearer(AuthBase):
+    def __init__(self, bearer_token):
+        self.bearer_token = bearer_token
+
+    def __call__(self, request):
+        request.headers['Authorization'] = 'Bearer ' + self.bearer_token
+        return request
+
+
+class AppAuthHandler(AuthHandler):
+    """Application-only authentication handler"""
+
+    OAUTH_HOST = 'api.twitter.com'
+    OAUTH_ROOT = '/oauth2/'
+
+    def __init__(self, consumer_key, consumer_secret):
+        self.consumer_key = consumer_key
+        self.consumer_secret = consumer_secret
+        self._bearer_token = ''
+
+        resp = requests.post(self._get_oauth_url('token'),
+                             auth=(self.consumer_key,
+                                   self.consumer_secret),
+                             data={'grant_type': 'client_credentials'})
+        data = resp.json()
+        if data.get('token_type') != 'bearer':
+            raise TweepError('Expected token_type to equal "bearer", '
+                             'but got %s instead' % data.get('token_type'))
+
+        self._bearer_token = data['access_token']
+
+    def _get_oauth_url(self, endpoint):
+        return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
+
+    def apply_auth(self):
+        return OAuth2Bearer(self._bearer_token)

Benjamin Mako Hill || Want to submit a patch?