X-Git-Url: https://projects.mako.cc/source/twitter-api-cdsw-solutions/blobdiff_plain/4c43c9724c25c3d919cb535973559fab1f1c0a7b..HEAD:/tweepy/binder.py diff --git a/tweepy/binder.py b/tweepy/binder.py index 1f8c619..2ac6146 100644 --- a/tweepy/binder.py +++ b/tweepy/binder.py @@ -2,24 +2,30 @@ # Copyright 2009-2010 Joshua Roesslein # See LICENSE for details. -import httplib -import urllib +from __future__ import print_function + import time import re -from StringIO import StringIO -import gzip + +from six.moves.urllib.parse import quote +import requests + +import logging from tweepy.error import TweepError from tweepy.utils import convert_to_utf8_str from tweepy.models import Model + re_path_template = re.compile('{\w+}') +log = logging.getLogger('tweepy.binder') def bind_api(**config): class APIMethod(object): + api = config['api'] path = config['path'] payload_type = config.get('payload_type', None) payload_list = config.get('payload_list', False) @@ -27,38 +33,47 @@ def bind_api(**config): method = config.get('method', 'GET') require_auth = config.get('require_auth', False) search_api = config.get('search_api', False) + upload_api = config.get('upload_api', False) use_cache = config.get('use_cache', True) + session = requests.Session() - def __init__(self, api, args, kargs): + def __init__(self, args, kwargs): + api = self.api # If authentication is required and no credentials # are provided, throw an error. if self.require_auth and not api.auth: raise TweepError('Authentication required!') - self.api = api - self.post_data = kargs.pop('post_data', None) - self.retry_count = kargs.pop('retry_count', api.retry_count) - self.retry_delay = kargs.pop('retry_delay', api.retry_delay) - self.retry_errors = kargs.pop('retry_errors', api.retry_errors) - self.headers = kargs.pop('headers', {}) - self.build_parameters(args, kargs) + self.post_data = kwargs.pop('post_data', None) + self.retry_count = kwargs.pop('retry_count', + api.retry_count) + self.retry_delay = kwargs.pop('retry_delay', + api.retry_delay) + self.retry_errors = kwargs.pop('retry_errors', + api.retry_errors) + self.wait_on_rate_limit = kwargs.pop('wait_on_rate_limit', + api.wait_on_rate_limit) + self.wait_on_rate_limit_notify = kwargs.pop('wait_on_rate_limit_notify', + api.wait_on_rate_limit_notify) + self.parser = kwargs.pop('parser', api.parser) + self.session.headers = kwargs.pop('headers', {}) + self.build_parameters(args, kwargs) # Pick correct URL root to use if self.search_api: self.api_root = api.search_root + elif self.upload_api: + self.api_root = api.upload_root else: self.api_root = api.api_root # Perform any path variable substitution self.build_path() - if api.secure: - self.scheme = 'https://' - else: - self.scheme = 'http://' - if self.search_api: self.host = api.search_host + elif self.upload_api: + self.host = api.upload_host else: self.host = api.host @@ -66,40 +81,44 @@ def bind_api(**config): # or older where Host is set including the 443 port. # This causes Twitter to issue 301 redirect. # See Issue https://github.com/tweepy/tweepy/issues/12 - self.headers['Host'] = self.host + self.session.headers['Host'] = self.host + # Monitoring rate limits + self._remaining_calls = None + self._reset_time = None - def build_parameters(self, args, kargs): - self.parameters = {} + def build_parameters(self, args, kwargs): + self.session.params = {} for idx, arg in enumerate(args): if arg is None: continue - try: - self.parameters[self.allowed_param[idx]] = convert_to_utf8_str(arg) + self.session.params[self.allowed_param[idx]] = convert_to_utf8_str(arg) except IndexError: raise TweepError('Too many parameters supplied!') - for k, arg in kargs.items(): + for k, arg in kwargs.items(): if arg is None: continue - if k in self.parameters: + if k in self.session.params: raise TweepError('Multiple values for parameter %s supplied!' % k) - self.parameters[k] = convert_to_utf8_str(arg) + self.session.params[k] = convert_to_utf8_str(arg) + + log.info("PARAMS: %r", self.session.params) def build_path(self): for variable in re_path_template.findall(self.path): name = variable.strip('{}') - if name == 'user' and 'user' not in self.parameters and self.api.auth: + if name == 'user' and 'user' not in self.session.params and self.api.auth: # No 'user' parameter provided, fetch it from Auth instead. value = self.api.auth.get_username() else: try: - value = urllib.quote(self.parameters[name]) + value = quote(self.session.params[name]) except KeyError: raise TweepError('No parameter value found for path variable: %s' % name) - del self.parameters[name] + del self.session.params[name] self.path = self.path.replace(variable, value) @@ -108,8 +127,7 @@ def bind_api(**config): # Build the request URL url = self.api_root + self.path - if len(self.parameters): - url = '%s?%s' % (url, urllib.urlencode(self.parameters)) + full_url = 'https://' + self.host + url # Query the cache if one is available # and this request uses a GET method. @@ -132,60 +150,80 @@ def bind_api(**config): # or maximum number of retries is reached. retries_performed = 0 while retries_performed < self.retry_count + 1: - # Open connection - if self.api.secure: - conn = httplib.HTTPSConnection(self.host, timeout=self.api.timeout) - else: - conn = httplib.HTTPConnection(self.host, timeout=self.api.timeout) + # handle running out of api calls + if self.wait_on_rate_limit: + if self._reset_time is not None: + if self._remaining_calls is not None: + if self._remaining_calls < 1: + sleep_time = self._reset_time - int(time.time()) + if sleep_time > 0: + if self.wait_on_rate_limit_notify: + print("Rate limit reached. Sleeping for:", sleep_time) + time.sleep(sleep_time + 5) # sleep for few extra sec + + # if self.wait_on_rate_limit and self._reset_time is not None and \ + # self._remaining_calls is not None and self._remaining_calls < 1: + # sleep_time = self._reset_time - int(time.time()) + # if sleep_time > 0: + # if self.wait_on_rate_limit_notify: + # print("Rate limit reached. Sleeping for: " + str(sleep_time)) + # time.sleep(sleep_time + 5) # sleep for few extra sec # Apply authentication if self.api.auth: - self.api.auth.apply_auth( - self.scheme + self.host + url, - self.method, self.headers, self.parameters - ) + auth = self.api.auth.apply_auth() # Request compression if configured if self.api.compression: - self.headers['Accept-encoding'] = 'gzip' + self.session.headers['Accept-encoding'] = 'gzip' # Execute request try: - conn.request(self.method, url, headers=self.headers, body=self.post_data) - resp = conn.getresponse() + resp = self.session.request(self.method, + full_url, + data=self.post_data, + timeout=self.api.timeout, + auth=auth, + proxies=self.api.proxy) except Exception as e: raise TweepError('Failed to send request: %s' % e) - + rem_calls = resp.headers.get('x-rate-limit-remaining') + if rem_calls is not None: + self._remaining_calls = int(rem_calls) + elif isinstance(self._remaining_calls, int): + self._remaining_calls -= 1 + reset_time = resp.headers.get('x-rate-limit-reset') + if reset_time is not None: + self._reset_time = int(reset_time) + if self.wait_on_rate_limit and self._remaining_calls == 0 and ( + # if ran out of calls before waiting switching retry last call + resp.status_code == 429 or resp.status_code == 420): + continue + retry_delay = self.retry_delay # Exit request loop if non-retry error code - if self.retry_errors: - if resp.status not in self.retry_errors: break - else: - if resp.status == 200: break + if resp.status_code == 200: + break + elif (resp.status_code == 429 or resp.status_code == 420) and self.wait_on_rate_limit: + if 'retry-after' in resp.headers: + retry_delay = float(resp.headers['retry-after']) + elif self.retry_errors and resp.status_code not in self.retry_errors: + break # Sleep before retrying request again - time.sleep(self.retry_delay) + time.sleep(retry_delay) retries_performed += 1 # If an error was returned, throw an exception self.api.last_response = resp - if resp.status and not 200 <= resp.status < 300: + if resp.status_code and not 200 <= resp.status_code < 300: try: - error_msg = self.api.parser.parse_error(resp.read()) + error_msg = self.parser.parse_error(resp.text) except Exception: - error_msg = "Twitter error response: status code = %s" % resp.status + error_msg = "Twitter error response: status code = %s" % resp.status_code raise TweepError(error_msg, resp) # Parse the response payload - body = resp.read() - if resp.getheader('Content-Encoding', '') == 'gzip': - try: - zipper = gzip.GzipFile(fileobj=StringIO(body)) - body = zipper.read() - except Exception as e: - raise TweepError('Failed to decompress data: %s' % e) - result = self.api.parser.parse(self, body) - - conn.close() + result = self.parser.parse(self, resp.text) # Store result into cache if one is available. if self.use_cache and self.api.cache and self.method == 'GET' and result: @@ -193,21 +231,20 @@ def bind_api(**config): return result - - def _call(api, *args, **kargs): - - method = APIMethod(api, args, kargs) - return method.execute() - + def _call(*args, **kwargs): + method = APIMethod(args, kwargs) + if kwargs.get('create'): + return method + else: + return method.execute() # Set pagination mode if 'cursor' in APIMethod.allowed_param: _call.pagination_mode = 'cursor' - elif 'max_id' in APIMethod.allowed_param and \ - 'since_id' in APIMethod.allowed_param: - _call.pagination_mode = 'id' + elif 'max_id' in APIMethod.allowed_param: + if 'since_id' in APIMethod.allowed_param: + _call.pagination_mode = 'id' elif 'page' in APIMethod.allowed_param: _call.pagination_mode = 'page' return _call -