# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
from __future__ import absolute_import, print_function

import logging
import re
import ssl
from threading import Thread
from time import sleep

import requests
from requests.exceptions import Timeout
import six

from tweepy.api import API
from tweepy.error import TweepError
from tweepy.models import Status
from tweepy.utils import import_simplejson

json = import_simplejson()
# Twitter Streaming API version used to build endpoint URLs.
STREAM_VERSION = '1.1'
class StreamListener(object):
    """Receives callbacks for messages arriving on a Twitter stream.

    Subclass this and override the ``on_*`` hooks. Returning False from
    any hook stops the stream and closes the connection.
    """

    def __init__(self, api=None):
        # Fall back to a default API instance for parsing models.
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        data = json.loads(raw_data)

        # Dispatch on the message type; each handler may return False
        # to signal that the stream should be stopped.
        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: " + str(raw_data))

    def keep_alive(self):
        """Called when a keep-alive arrived"""
        return

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a disconnection warning message arrives"""
        return
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size, encoding='utf-8'):
        self._stream = stream
        # Native bytes literals work on py2.6+ and py3; no need for six.b().
        self._buffer = b''
        self._chunk_size = chunk_size
        self._encoding = encoding

    def read_len(self, length):
        """Read exactly ``length`` bytes and return them decoded.

        Returns None if the stream closes before enough bytes arrive.
        """
        while not self._stream.closed:
            if len(self._buffer) >= length:
                return self._pop(length)
            # Read at least one chunk, or everything still missing if that
            # is larger, so a big tweet is fetched in a single call.
            read_len = max(self._chunk_size, length - len(self._buffer))
            self._buffer += self._stream.read(read_len)

    def read_line(self, sep=b'\n'):
        """Read the data stream until a given separator is found (default \\n)

        :param sep: Separator to read until. Must be of the bytes type
            (str in python 2, bytes in python 3)
        :return: The str of the data read until sep
        """
        start = 0
        while not self._stream.closed:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            else:
                # Separator not in the buffer yet; resume the search where
                # it left off so scanned bytes are not re-scanned.
                start = len(self._buffer)
            self._buffer += self._stream.read(self._chunk_size)

    def _pop(self, length):
        # Split off the first `length` bytes and return them decoded.
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r.decode(self._encoding)
class Stream(object):
    """Connects to a Twitter streaming endpoint and feeds a StreamListener."""

    # Default streaming host; endpoint methods may override per-connection.
    host = 'stream.twitter.com'
191 def __init__(self, auth, listener, **options):
193 self.listener = listener
195 self.timeout = options.get("timeout", 300.0)
196 self.retry_count = options.get("retry_count")
197 # values according to
198 # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
199 self.retry_time_start = options.get("retry_time", 5.0)
200 self.retry_420_start = options.get("retry_420", 60.0)
201 self.retry_time_cap = options.get("retry_time_cap", 320.0)
202 self.snooze_time_step = options.get("snooze_time", 0.25)
203 self.snooze_time_cap = options.get("snooze_time_cap", 16)
205 # The default socket.read size. Default to less than half the size of
206 # a tweet so that it reads tweets with the minimal latency of 2 reads
207 # per tweet. Values higher than ~1kb will increase latency by waiting
208 # for more data to arrive but may also increase throughput by doing
209 # fewer socket read calls.
210 self.chunk_size = options.get("chunk_size", 512)
212 self.verify = options.get("verify", True)
215 self.headers = options.get("headers") or {}
218 self.retry_time = self.retry_time_start
219 self.snooze_time = self.snooze_time_step
221 def new_session(self):
222 self.session = requests.Session()
223 self.session.headers = self.headers
224 self.session.params = None
228 url = "https://%s%s" % (self.host, self.url)
230 # Connect and process the stream
235 if self.retry_count is not None:
236 if error_counter > self.retry_count:
237 # quit if error count greater than retry count
240 auth = self.auth.apply_auth()
241 resp = self.session.request('POST',
244 timeout=self.timeout,
248 if resp.status_code != 200:
249 if self.listener.on_error(resp.status_code) is False:
252 if resp.status_code == 420:
253 self.retry_time = max(self.retry_420_start,
255 sleep(self.retry_time)
256 self.retry_time = min(self.retry_time * 2,
260 self.retry_time = self.retry_time_start
261 self.snooze_time = self.snooze_time_step
262 self.listener.on_connect()
263 self._read_loop(resp)
264 except (Timeout, ssl.SSLError) as exc:
265 # This is still necessary, as a SSLError can actually be
266 # thrown when using Requests
267 # If it's not time out treat it like any other exception
268 if isinstance(exc, ssl.SSLError):
269 if not (exc.args and 'timed out' in str(exc.args[0])):
272 if self.listener.on_timeout() is False:
274 if self.running is False:
276 sleep(self.snooze_time)
277 self.snooze_time = min(self.snooze_time + self.snooze_time_step,
278 self.snooze_time_cap)
279 except Exception as exc:
281 # any other exception is fatal, so kill loop
292 # call a handler first so that the exception can be logged.
293 self.listener.on_exception(exception)
296 def _data(self, data):
297 if self.listener.on_data(data) is False:
300 def _read_loop(self, resp):
301 charset = resp.headers.get('content-type', default='')
302 enc_search = re.search('charset=(?P<enc>\S*)', charset)
303 if enc_search is not None:
304 encoding = enc_search.group('enc')
308 buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)
310 while self.running and not resp.raw.closed:
312 while not resp.raw.closed:
313 line = buf.read_line().strip()
315 self.listener.keep_alive() # keep-alive new lines are expected
320 raise TweepError('Expecting length, unexpected value found')
322 next_status_obj = buf.read_len(length)
324 self._data(next_status_obj)
326 # # Note: keep-alive newlines might be inserted before each length value.
327 # # read until we get a digit...
329 # for c in resp.iter_content(decode_unicode=True):
334 # delimited_string = c
336 # # read rest of delimiter length..
338 # for d in resp.iter_content(decode_unicode=True):
340 # delimited_string += d
344 # # read the next twitter status object
345 # if delimited_string.decode('utf-8').strip().isdigit():
346 # status_id = int(delimited_string)
347 # next_status_obj = resp.raw.read(status_id)
349 # self._data(next_status_obj.decode('utf-8'))
355 def _start(self, async):
358 self._thread = Thread(target=self._run)
363 def on_closed(self, resp):
364 """ Called when the response has been closed by Twitter """
368 stall_warnings=False,
375 self.session.params = {'delimited': 'length'}
377 raise TweepError('Stream object already connected!')
378 self.url = '/%s/user.json' % STREAM_VERSION
379 self.host = 'userstream.twitter.com'
381 self.session.params['stall_warnings'] = stall_warnings
383 self.session.params['with'] = _with
385 self.session.params['replies'] = replies
386 if locations and len(locations) > 0:
387 if len(locations) % 4 != 0:
388 raise TweepError("Wrong number of locations points, "
389 "it has to be a multiple of 4")
390 self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
392 self.session.params['track'] = u','.join(track).encode(encoding)
396 def firehose(self, count=None, async=False):
397 self.session.params = {'delimited': 'length'}
399 raise TweepError('Stream object already connected!')
400 self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
402 self.url += '&count=%s' % count
405 def retweet(self, async=False):
406 self.session.params = {'delimited': 'length'}
408 raise TweepError('Stream object already connected!')
409 self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
412 def sample(self, async=False, languages=None):
413 self.session.params = {'delimited': 'length'}
415 raise TweepError('Stream object already connected!')
416 self.url = '/%s/statuses/sample.json' % STREAM_VERSION
418 self.session.params['language'] = ','.join(map(str, languages))
421 def filter(self, follow=None, track=None, async=False, locations=None,
422 stall_warnings=False, languages=None, encoding='utf8', filter_level=None):
424 self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
426 raise TweepError('Stream object already connected!')
427 self.url = '/%s/statuses/filter.json' % STREAM_VERSION
429 self.body['follow'] = u','.join(follow).encode(encoding)
431 self.body['track'] = u','.join(track).encode(encoding)
432 if locations and len(locations) > 0:
433 if len(locations) % 4 != 0:
434 raise TweepError("Wrong number of locations points, "
435 "it has to be a multiple of 4")
436 self.body['locations'] = u','.join(['%.4f' % l for l in locations])
438 self.body['stall_warnings'] = stall_warnings
440 self.body['language'] = u','.join(map(str, languages))
442 self.body['filter_level'] = unicode(filter_level, encoding)
443 self.session.params = {'delimited': 'length'}
444 self.host = 'stream.twitter.com'
447 def sitestream(self, follow, stall_warnings=False,
448 with_='user', replies=False, async=False):
451 raise TweepError('Stream object already connected!')
452 self.url = '/%s/site.json' % STREAM_VERSION
453 self.body['follow'] = u','.join(map(six.text_type, follow))
454 self.body['delimited'] = 'length'
456 self.body['stall_warnings'] = stall_warnings
458 self.body['with'] = with_
460 self.body['replies'] = replies
463 def disconnect(self):
464 if self.running is False: