X-Git-Url: https://projects.mako.cc/source/twitter-api-cdsw-solutions/blobdiff_plain/4c43c9724c25c3d919cb535973559fab1f1c0a7b..2ada46e15dd4a24ed4565acc7961417798d27491:/tweepy/auth.py diff --git a/tweepy/auth.py b/tweepy/auth.py index 86c4430..b15434b 100644 --- a/tweepy/auth.py +++ b/tweepy/auth.py @@ -1,13 +1,18 @@ -# Tweepy -# Copyright 2009-2010 Joshua Roesslein -# See LICENSE for details. +from __future__ import print_function -from urllib2 import Request, urlopen -import base64 +import six +import logging -from tweepy import oauth from tweepy.error import TweepError from tweepy.api import API +import requests +from requests_oauthlib import OAuth1Session, OAuth1 +from requests.auth import AuthBase +from six.moves.urllib.parse import parse_qs + +WARNING_MESSAGE = """Warning! Due to a Twitter API bug, signin_with_twitter +and access_type don't always play nice together. Details +https://dev.twitter.com/discussions/21281""" class AuthHandler(object): @@ -23,76 +28,64 @@ class AuthHandler(object): class OAuthHandler(AuthHandler): """OAuth authentication handler""" - OAUTH_HOST = 'api.twitter.com' OAUTH_ROOT = '/oauth/' - def __init__(self, consumer_key, consumer_secret, callback=None, secure=True): - if type(consumer_key) == unicode: - consumer_key = bytes(consumer_key) + def __init__(self, consumer_key, consumer_secret, callback=None): + if type(consumer_key) == six.text_type: + consumer_key = consumer_key.encode('ascii') - if type(consumer_secret) == unicode: - consumer_secret = bytes(consumer_secret) + if type(consumer_secret) == six.text_type: + consumer_secret = consumer_secret.encode('ascii') - self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret) - self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1() - self.request_token = None + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret self.access_token = None + self.access_token_secret = None self.callback = callback self.username = None - self.secure = secure + self.oauth = OAuth1Session(consumer_key, + client_secret=consumer_secret, + callback_uri=self.callback) - def _get_oauth_url(self, endpoint, secure=True): - if self.secure or secure: - prefix = 'https://' - else: - prefix = 'http://' + def _get_oauth_url(self, endpoint): + return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint - return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint + def apply_auth(self): + return OAuth1(self.consumer_key, + client_secret=self.consumer_secret, + resource_owner_key=self.access_token, + resource_owner_secret=self.access_token_secret, + decoding=None) - def apply_auth(self, url, method, headers, parameters): - request = oauth.OAuthRequest.from_consumer_and_token( - self._consumer, http_url=url, http_method=method, - token=self.access_token, parameters=parameters - ) - request.sign_request(self._sigmethod, self._consumer, self.access_token) - headers.update(request.to_header()) - - def _get_request_token(self): + def _get_request_token(self, access_type=None): try: url = self._get_oauth_url('request_token') - request = oauth.OAuthRequest.from_consumer_and_token( - self._consumer, http_url=url, callback=self.callback - ) - request.sign_request(self._sigmethod, self._consumer, None) - resp = urlopen(Request(url, headers=request.to_header())) - return oauth.OAuthToken.from_string(resp.read()) + if access_type: + url += '?x_auth_access_type=%s' % access_type + return self.oauth.fetch_request_token(url) except Exception as e: raise TweepError(e) - def set_request_token(self, key, secret): - self.request_token = oauth.OAuthToken(key, secret) - def set_access_token(self, key, secret): - self.access_token = oauth.OAuthToken(key, secret) + self.access_token = key + self.access_token_secret = secret - def get_authorization_url(self, signin_with_twitter=False): + def get_authorization_url(self, + signin_with_twitter=False, + access_type=None): """Get the authorization URL to redirect the user""" try: - # get the request token - self.request_token = self._get_request_token() - - # build auth request and return as url if signin_with_twitter: url = self._get_oauth_url('authenticate') + if access_type: + logging.warning(WARNING_MESSAGE) else: url = self._get_oauth_url('authorize') - request = oauth.OAuthRequest.from_token_and_callback( - token=self.request_token, http_url=url - ) - - return request.to_url() + self.request_token = self._get_request_token(access_type=access_type) + return self.oauth.authorization_url(url) except Exception as e: + raise raise TweepError(e) def get_access_token(self, verifier=None): @@ -102,19 +95,15 @@ class OAuthHandler(AuthHandler): """ try: url = self._get_oauth_url('access_token') - - # build request - request = oauth.OAuthRequest.from_consumer_and_token( - self._consumer, - token=self.request_token, http_url=url, - verifier=str(verifier) - ) - request.sign_request(self._sigmethod, self._consumer, self.request_token) - - # send request - resp = urlopen(Request(url, headers=request.to_header())) - self.access_token = oauth.OAuthToken.from_string(resp.read()) - return self.access_token + self.oauth = OAuth1Session(self.consumer_key, + client_secret=self.consumer_secret, + resource_owner_key=self.request_token['oauth_token'], + resource_owner_secret=self.request_token['oauth_token_secret'], + verifier=verifier, callback_uri=self.callback) + resp = self.oauth.fetch_access_token(url) + self.access_token = resp['oauth_token'] + self.access_token_secret = resp['oauth_token_secret'] + return self.access_token, self.access_token_secret except Exception as e: raise TweepError(e) @@ -126,21 +115,18 @@ class OAuthHandler(AuthHandler): and request activation of xAuth for it. """ try: - url = self._get_oauth_url('access_token', secure=True) # must use HTTPS - request = oauth.OAuthRequest.from_consumer_and_token( - oauth_consumer=self._consumer, - http_method='POST', http_url=url, - parameters = { - 'x_auth_mode': 'client_auth', - 'x_auth_username': username, - 'x_auth_password': password - } - ) - request.sign_request(self._sigmethod, self._consumer, None) - - resp = urlopen(Request(url, data=request.to_postdata())) - self.access_token = oauth.OAuthToken.from_string(resp.read()) - return self.access_token + url = self._get_oauth_url('access_token') + oauth = OAuth1(self.consumer_key, + client_secret=self.consumer_secret) + r = requests.post(url=url, + auth=oauth, + headers={'x_auth_mode': 'client_auth', + 'x_auth_username': username, + 'x_auth_password': password}) + + print(r.content) + credentials = parse_qs(r.content) + return credentials.get('oauth_token')[0], credentials.get('oauth_token_secret')[0] except Exception as e: raise TweepError(e) @@ -151,6 +137,44 @@ class OAuthHandler(AuthHandler): if user: self.username = user.screen_name else: - raise TweepError("Unable to get username, invalid oauth token!") + raise TweepError('Unable to get username,' + ' invalid oauth token!') return self.username + +class OAuth2Bearer(AuthBase): + def __init__(self, bearer_token): + self.bearer_token = bearer_token + + def __call__(self, request): + request.headers['Authorization'] = 'Bearer ' + self.bearer_token + return request + + +class AppAuthHandler(AuthHandler): + """Application-only authentication handler""" + + OAUTH_HOST = 'api.twitter.com' + OAUTH_ROOT = '/oauth2/' + + def __init__(self, consumer_key, consumer_secret): + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret + self._bearer_token = '' + + resp = requests.post(self._get_oauth_url('token'), + auth=(self.consumer_key, + self.consumer_secret), + data={'grant_type': 'client_credentials'}) + data = resp.json() + if data.get('token_type') != 'bearer': + raise TweepError('Expected token_type to equal "bearer", ' + 'but got %s instead' % data.get('token_type')) + + self._bearer_token = data['access_token'] + + def _get_oauth_url(self, endpoint): + return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint + + def apply_auth(self): + return OAuth2Bearer(self._bearer_token)