2 # Copyright 2009-2010 Joshua Roesslein
3 # See LICENSE for details.
5 # Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
7 from __future__ import absolute_import, print_function
11 from requests.exceptions import Timeout
12 from threading import Thread
13 from time import sleep
19 from tweepy.models import Status
20 from tweepy.api import API
21 from tweepy.error import TweepError
23 from tweepy.utils import import_simplejson
24 json = import_simplejson()
26 STREAM_VERSION = '1.1'
29 class StreamListener(object):
31 def __init__(self, api=None):
32 self.api = api or API()
35 """Called once connected to streaming server.
37 This will be invoked once a successful response
38 is received from the server. Allows the listener
39 to perform some work prior to entering the read loop.
43 def on_data(self, raw_data):
44 """Called when raw data is received from connection.
46 Override this method if you wish to manually handle
47 the stream data. Return False to stop stream and close connection.
49 data = json.loads(raw_data)
51 if 'in_reply_to_status_id' in data:
52 status = Status.parse(self.api, data)
53 if self.on_status(status) is False:
55 elif 'delete' in data:
56 delete = data['delete']['status']
57 if self.on_delete(delete['id'], delete['user_id']) is False:
60 status = Status.parse(self.api, data)
61 if self.on_event(status) is False:
63 elif 'direct_message' in data:
64 status = Status.parse(self.api, data)
65 if self.on_direct_message(status) is False:
67 elif 'friends' in data:
68 if self.on_friends(data['friends']) is False:
71 if self.on_limit(data['limit']['track']) is False:
73 elif 'disconnect' in data:
74 if self.on_disconnect(data['disconnect']) is False:
76 elif 'warning' in data:
77 if self.on_warning(data['warning']) is False:
80 logging.error("Unknown message type: " + str(raw_data))
83 """Called when a keep-alive arrived"""
86 def on_status(self, status):
87 """Called when a new status arrives"""
90 def on_exception(self, exception):
91 """Called when an unhandled exception occurs."""
94 def on_delete(self, status_id, user_id):
95 """Called when a delete notice arrives for a status"""
98 def on_event(self, status):
99 """Called when a new event arrives"""
102 def on_direct_message(self, status):
103 """Called when a new direct message arrives"""
106 def on_friends(self, friends):
107 """Called when a friends list arrives.
109 friends is a list that contains user_id
113 def on_limit(self, track):
114 """Called when a limitation notice arrives"""
117 def on_error(self, status_code):
118 """Called when a non-200 status code is returned"""
121 def on_timeout(self):
122 """Called when stream connection times out"""
125 def on_disconnect(self, notice):
126 """Called when twitter sends a disconnect notice
128 Disconnect codes are listed here:
129 https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
133 def on_warning(self, notice):
134 """Called when a disconnection warning message arrives"""
138 class ReadBuffer(object):
139 """Buffer data from the response in a smarter way than httplib/requests can.
141 Tweets are roughly in the 2-12kb range, averaging around 3kb.
142 Requests/urllib3/httplib/socket all use socket.read, which blocks
143 until enough data is returned. On some systems (eg google appengine), socket
144 reads are quite slow. To combat this latency we can read big chunks,
145 but the blocking part means we won't get results until enough tweets
146 have arrived. That may not be a big deal for high throughput systems.
147 For low throughput systems we don't want to sacrafice latency, so we
148 use small chunks so it can read the length and the tweet in 2 read calls.
151 def __init__(self, stream, chunk_size):
152 self._stream = stream
154 self._chunk_size = chunk_size
156 def read_len(self, length):
157 while not self._stream.closed:
158 if len(self._buffer) >= length:
159 return self._pop(length)
160 read_len = max(self._chunk_size, length - len(self._buffer))
161 self._buffer += self._stream.read(read_len)
163 def read_line(self, sep='\n'):
165 while not self._stream.closed:
166 loc = self._buffer.find(sep, start)
168 return self._pop(loc + len(sep))
170 start = len(self._buffer)
171 self._buffer += self._stream.read(self._chunk_size)
173 def _pop(self, length):
174 r = self._buffer[:length]
175 self._buffer = self._buffer[length:]
179 class Stream(object):
181 host = 'stream.twitter.com'
183 def __init__(self, auth, listener, **options):
185 self.listener = listener
187 self.timeout = options.get("timeout", 300.0)
188 self.retry_count = options.get("retry_count")
189 # values according to
190 # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
191 self.retry_time_start = options.get("retry_time", 5.0)
192 self.retry_420_start = options.get("retry_420", 60.0)
193 self.retry_time_cap = options.get("retry_time_cap", 320.0)
194 self.snooze_time_step = options.get("snooze_time", 0.25)
195 self.snooze_time_cap = options.get("snooze_time_cap", 16)
197 # The default socket.read size. Default to less than half the size of
198 # a tweet so that it reads tweets with the minimal latency of 2 reads
199 # per tweet. Values higher than ~1kb will increase latency by waiting
200 # for more data to arrive but may also increase throughput by doing
201 # fewer socket read calls.
202 self.chunk_size = options.get("chunk_size", 512)
204 self.verify = options.get("verify", True)
207 self.headers = options.get("headers") or {}
210 self.retry_time = self.retry_time_start
211 self.snooze_time = self.snooze_time_step
213 def new_session(self):
214 self.session = requests.Session()
215 self.session.headers = self.headers
216 self.session.params = None
220 url = "https://%s%s" % (self.host, self.url)
222 # Connect and process the stream
227 if self.retry_count is not None:
228 if error_counter > self.retry_count:
229 # quit if error count greater than retry count
232 auth = self.auth.apply_auth()
233 resp = self.session.request('POST',
236 timeout=self.timeout,
240 if resp.status_code != 200:
241 if self.listener.on_error(resp.status_code) is False:
244 if resp.status_code == 420:
245 self.retry_time = max(self.retry_420_start,
247 sleep(self.retry_time)
248 self.retry_time = min(self.retry_time * 2,
252 self.retry_time = self.retry_time_start
253 self.snooze_time = self.snooze_time_step
254 self.listener.on_connect()
255 self._read_loop(resp)
256 except (Timeout, ssl.SSLError) as exc:
257 # This is still necessary, as a SSLError can actually be
258 # thrown when using Requests
259 # If it's not time out treat it like any other exception
260 if isinstance(exc, ssl.SSLError):
261 if not (exc.args and 'timed out' in str(exc.args[0])):
264 if self.listener.on_timeout() is False:
266 if self.running is False:
268 sleep(self.snooze_time)
269 self.snooze_time = min(self.snooze_time + self.snooze_time_step,
270 self.snooze_time_cap)
271 except Exception as exc:
273 # any other exception is fatal, so kill loop
284 # call a handler first so that the exception can be logged.
285 self.listener.on_exception(exception)
288 def _data(self, data):
289 if self.listener.on_data(data) is False:
292 def _read_loop(self, resp):
293 buf = ReadBuffer(resp.raw, self.chunk_size)
295 while self.running and not resp.raw.closed:
297 while not resp.raw.closed:
298 line = buf.read_line().strip()
300 self.listener.keep_alive() # keep-alive new lines are expected
305 raise TweepError('Expecting length, unexpected value found')
307 next_status_obj = buf.read_len(length)
309 self._data(next_status_obj)
311 # # Note: keep-alive newlines might be inserted before each length value.
312 # # read until we get a digit...
314 # for c in resp.iter_content(decode_unicode=True):
319 # delimited_string = c
321 # # read rest of delimiter length..
323 # for d in resp.iter_content(decode_unicode=True):
325 # delimited_string += d
329 # # read the next twitter status object
330 # if delimited_string.decode('utf-8').strip().isdigit():
331 # status_id = int(delimited_string)
332 # next_status_obj = resp.raw.read(status_id)
334 # self._data(next_status_obj.decode('utf-8'))
340 def _start(self, async):
343 self._thread = Thread(target=self._run)
348 def on_closed(self, resp):
349 """ Called when the response has been closed by Twitter """
353 stall_warnings=False,
360 self.session.params = {'delimited': 'length'}
362 raise TweepError('Stream object already connected!')
363 self.url = '/%s/user.json' % STREAM_VERSION
364 self.host = 'userstream.twitter.com'
366 self.session.params['stall_warnings'] = stall_warnings
368 self.session.params['with'] = _with
370 self.session.params['replies'] = replies
371 if locations and len(locations) > 0:
372 if len(locations) % 4 != 0:
373 raise TweepError("Wrong number of locations points, "
374 "it has to be a multiple of 4")
375 self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
377 self.session.params['track'] = u','.join(track).encode(encoding)
381 def firehose(self, count=None, async=False):
382 self.session.params = {'delimited': 'length'}
384 raise TweepError('Stream object already connected!')
385 self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
387 self.url += '&count=%s' % count
390 def retweet(self, async=False):
391 self.session.params = {'delimited': 'length'}
393 raise TweepError('Stream object already connected!')
394 self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
397 def sample(self, async=False, languages=None):
398 self.session.params = {'delimited': 'length'}
400 raise TweepError('Stream object already connected!')
401 self.url = '/%s/statuses/sample.json' % STREAM_VERSION
403 self.session.params['language'] = ','.join(map(str, languages))
406 def filter(self, follow=None, track=None, async=False, locations=None,
407 stall_warnings=False, languages=None, encoding='utf8', filter_level=None):
409 self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
411 raise TweepError('Stream object already connected!')
412 self.url = '/%s/statuses/filter.json' % STREAM_VERSION
414 self.body['follow'] = u','.join(follow).encode(encoding)
416 self.body['track'] = u','.join(track).encode(encoding)
417 if locations and len(locations) > 0:
418 if len(locations) % 4 != 0:
419 raise TweepError("Wrong number of locations points, "
420 "it has to be a multiple of 4")
421 self.body['locations'] = u','.join(['%.4f' % l for l in locations])
423 self.body['stall_warnings'] = stall_warnings
425 self.body['language'] = u','.join(map(str, languages))
427 self.body['filter_level'] = unicode(filter_level, encoding)
428 self.session.params = {'delimited': 'length'}
429 self.host = 'stream.twitter.com'
432 def sitestream(self, follow, stall_warnings=False,
433 with_='user', replies=False, async=False):
436 raise TweepError('Stream object already connected!')
437 self.url = '/%s/site.json' % STREAM_VERSION
438 self.body['follow'] = u','.join(map(six.text_type, follow))
439 self.body['delimited'] = 'length'
441 self.body['stall_warnings'] = stall_warnings
443 self.body['with'] = with_
445 self.body['replies'] = replies
448 def disconnect(self):
449 if self.running is False: