Handle content-type header charset value for streaming API
[twitter-api-cdsw] / tweepy / streaming.py
1 # Tweepy
2 # Copyright 2009-2010 Joshua Roesslein
3 # See LICENSE for details.
4
5 # Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
6
7 from __future__ import absolute_import, print_function
8
9 import logging
10 import re
11 import requests
12 from requests.exceptions import Timeout
13 from threading import Thread
14 from time import sleep
15
16 import six
17
18 import ssl
19
20 from tweepy.models import Status
21 from tweepy.api import API
22 from tweepy.error import TweepError
23
24 from tweepy.utils import import_simplejson
25 json = import_simplejson()
26
27 STREAM_VERSION = '1.1'
28
29
class StreamListener(object):
    """Base class for consumers of streaming messages.

    Subclass it and override the ``on_*`` hooks of interest. ``on_data``
    inspects each decoded message and forwards it to the matching hook; a
    hook that returns ``False`` makes ``on_data`` return ``False`` too.
    """

    def __init__(self, api=None):
        """Store ``api``, or a default ``API()`` when none (or a falsy one) is given."""
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        :param raw_data: JSON text of a single stream message
        :return: ``False`` when the dispatched hook returned ``False``,
            otherwise ``None``
        """
        data = json.loads(raw_data)

        # The first matching key decides the message type, so the order of
        # these checks matters.
        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
        else:
            # Let logging do the formatting: str() on a unicode payload that
            # contains non-ASCII characters raises UnicodeEncodeError on
            # Python 2.
            logging.error("Unknown message type: %s", raw_data)

    def keep_alive(self):
        """Called when a keep-alive arrived"""
        return

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        The default returns ``False``.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a disconnection warning message arrives"""
        return
137
138
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size, encoding='utf-8'):
        """
        :param stream: object exposing ``read(n)`` (returning bytes) and a
            ``closed`` attribute
        :param chunk_size: minimum number of bytes requested per read
        :param encoding: codec used to decode the bytes handed back
        """
        self._stream = stream
        self._buffer = b''
        self._chunk_size = chunk_size
        self._encoding = encoding

    def read_len(self, length):
        """Return the next ``length`` bytes of the stream, decoded.

        :param length: number of bytes (not characters) to consume
        :return: the decoded text, or an empty string when the stream ends
            before ``length`` bytes are available (buffered bytes are kept)
        """
        while True:
            # Serve from the buffer first so data that arrived together with
            # the close of the stream is not thrown away.
            if len(self._buffer) >= length:
                return self._pop(length)
            if self._stream.closed:
                break
            wanted = max(self._chunk_size, length - len(self._buffer))
            chunk = self._stream.read(wanted)
            if not chunk:
                # Empty read: nothing more will come, so stop instead of
                # spinning on a stream that never reports ``closed``.
                break
            self._buffer += chunk
        return self._pop(0)

    def read_line(self, sep=b'\n'):
        """Read the data stream until a given separator is found (default \\n)

        :param sep: Separator to read until. Must be of the bytes type (str in python 2,
            bytes in python 3)
        :return: The str of the data read until sep (separator included), or
            an empty string when the stream ends before sep is seen
        """
        start = 0
        while True:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            if self._stream.closed:
                break
            # Resume just before the end of what was already scanned so a
            # multi-byte separator split across two reads is still found.
            start = max(0, len(self._buffer) - len(sep) + 1)
            chunk = self._stream.read(self._chunk_size)
            if not chunk:
                break
            self._buffer += chunk
        return self._pop(0)

    def _pop(self, length):
        """Remove the first ``length`` bytes from the buffer and return them decoded."""
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r.decode(self._encoding)
185
186
class Stream(object):
    """Connection to a length-delimited streaming endpoint.

    One of the endpoint methods (``userstream``, ``firehose``, ``retweet``,
    ``sample``, ``filter``, ``sitestream``) configures the request and then
    starts the read loop, either in the calling thread or in a new
    ``Thread``. Messages and connection events are reported to ``listener``.
    """

    host = 'stream.twitter.com'

    def __init__(self, auth, listener, **options):
        """
        :param auth: object whose ``apply_auth()`` result is passed to the
            session as ``auth``
        :param listener: object receiving the ``on_*`` / ``keep_alive``
            callbacks
        :param options: optional settings: ``timeout``, ``retry_count``,
            ``retry_time``, ``retry_420``, ``retry_time_cap``,
            ``snooze_time``, ``snooze_time_cap``, ``chunk_size``, ``verify``,
            ``headers``
        """
        self.auth = auth
        self.listener = listener
        self.running = False
        self.timeout = options.get("timeout", 300.0)
        self.retry_count = options.get("retry_count")
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size", 512)

        self.verify = options.get("verify", True)

        self.api = API()
        self.headers = options.get("headers") or {}
        self.new_session()
        self.body = None
        self.retry_time = self.retry_time_start
        self.snooze_time = self.snooze_time_step

    @staticmethod
    def _legacy_async(is_async, extra):
        """Resolve the run mode, honouring the legacy ``async`` keyword.

        ``async`` became a reserved word in Python 3.7, so the parameter is
        now called ``is_async``. Callers that still pass ``async=...`` (on
        older interpreters, or through ``**{'async': ...}``) end up in
        ``extra`` and take precedence.

        :raises TypeError: when ``extra`` holds any other keyword
        """
        if 'async' in extra:
            is_async = extra.pop('async')
        if extra:
            raise TypeError('unexpected keyword argument(s): %s'
                            % ', '.join(sorted(extra)))
        return is_async

    def new_session(self):
        """Replace ``self.session`` with a fresh session using ``self.headers``."""
        self.session = requests.Session()
        self.session.headers = self.headers
        self.session.params = None

    def _run(self):
        """Connect, read until stopped, and reconnect on recoverable errors.

        Non-200 responses and timeouts are retried with growing delays until
        the listener returns ``False`` or ``retry_count`` is exceeded. Any
        other exception ends the loop, is reported through
        ``listener.on_exception`` and is re-raised after cleanup.
        """
        url = "https://%s%s" % (self.host, self.url)

        # Connect and process the stream
        error_counter = 0
        resp = None
        exception = None
        while self.running:
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
            try:
                auth = self.auth.apply_auth()
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth,
                                            verify=self.verify)
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
                        break
                    error_counter += 1
                    if resp.status_code == 420:
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
                    # This response is abandoned; close it before waiting so
                    # retries do not accumulate open streaming connections.
                    resp.close()
                    sleep(self.retry_time)
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
                else:
                    error_counter = 0
                    self.retry_time = self.retry_time_start
                    self.snooze_time = self.snooze_time_step
                    self.listener.on_connect()
                    self._read_loop(resp)
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exception = exc
                        break
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
            except Exception as exc:
                exception = exc
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        # Compare against None: a requests Response is falsy for 4xx/5xx
        # status codes, so a plain truth test would leave it open.
        if resp is not None:
            resp.close()

        self.new_session()

        if exception:
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
            raise exception

    def _data(self, data):
        """Pass one message to the listener; stop running if it returns False."""
        if self.listener.on_data(data) is False:
            self.running = False

    def _read_loop(self, resp):
        """Read length-delimited messages from ``resp`` until stopped or closed.

        Each message is preceded by a line holding its length in bytes;
        blank lines in between are keep-alives.

        :raises TweepError: when a non-blank line is not a length
        """
        content_type = resp.headers.get('content-type', default='')
        # Accept quoted values and stop at a following ';' parameter.
        enc_search = re.search(r'charset="?(?P<enc>[^\s;"]+)', content_type)
        if enc_search is not None:
            encoding = enc_search.group('enc')
        else:
            encoding = 'utf-8'

        buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)

        while self.running and not resp.raw.closed:
            length = None
            while not resp.raw.closed:
                line = buf.read_line()
                if not line:
                    # The stream ended before a full line arrived.
                    break
                line = line.strip()
                if not line:
                    self.listener.keep_alive()  # keep-alive new lines are expected
                elif line.isdigit():
                    length = int(line)
                    break
                else:
                    raise TweepError('Expecting length, unexpected value found')

            if length is None:
                # No length header was read, so there is no message to fetch.
                break

            next_status_obj = buf.read_len(length)
            # An empty result means the stream ended mid-message (or the
            # announced length was zero); there is nothing to dispatch.
            if self.running and next_status_obj:
                self._data(next_status_obj)

        if resp.raw.closed:
            self.on_closed(resp)

    def _start(self, is_async):
        """Mark the stream as running and run it, in a new thread if ``is_async``."""
        self.running = True
        if is_async:
            self._thread = Thread(target=self._run)
            self._thread.start()
        else:
            self._run()

    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   is_async=False,
                   encoding='utf8',
                   **kwargs):
        """Start the user stream (``/user.json`` on userstream.twitter.com).

        :param locations: flat sequence of coordinates, four per bounding box
        :param is_async: run in a background thread instead of blocking
            (the legacy ``async`` keyword is still accepted)
        :raises TweepError: when already running, or when the number of
            ``locations`` values is not a multiple of 4
        """
        is_async = self._legacy_async(is_async, kwargs)
        # Check first so a rejected call does not disturb the running stream.
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/user.json' % STREAM_VERSION
        self.host = 'userstream.twitter.com'
        if stall_warnings:
            self.session.params['stall_warnings'] = stall_warnings
        if _with:
            self.session.params['with'] = _with
        if replies:
            self.session.params['replies'] = replies
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
        if track:
            self.session.params['track'] = u','.join(track).encode(encoding)

        self._start(is_async)

    def firehose(self, count=None, is_async=False, **kwargs):
        """Start the firehose stream (``/statuses/firehose.json``).

        :param count: optional value sent as the ``count`` query parameter
        :param is_async: run in a background thread instead of blocking
            (the legacy ``async`` keyword is still accepted)
        :raises TweepError: when already running
        """
        is_async = self._legacy_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
        if count:
            # Send count as a real query parameter; appending '&count=' to a
            # path that has no query string produced a malformed URL.
            self.session.params['count'] = count
        self._start(is_async)

    def retweet(self, is_async=False, **kwargs):
        """Start the retweet stream (``/statuses/retweet.json``).

        :param is_async: run in a background thread instead of blocking
            (the legacy ``async`` keyword is still accepted)
        :raises TweepError: when already running
        """
        is_async = self._legacy_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
        self._start(is_async)

    def sample(self, is_async=False, languages=None, **kwargs):
        """Start the sample stream (``/statuses/sample.json``).

        :param is_async: run in a background thread instead of blocking
            (the legacy ``async`` keyword is still accepted)
        :param languages: optional iterable joined into the ``language``
            query parameter
        :raises TweepError: when already running
        """
        is_async = self._legacy_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
        self._start(is_async)

    def filter(self, follow=None, track=None, is_async=False, locations=None,
               stall_warnings=False, languages=None, encoding='utf8',
               filter_level=None, **kwargs):
        """Start the filter stream (``/statuses/filter.json``).

        The filter terms are sent in the form-encoded request body.

        :param follow: iterable of text values joined into ``follow``
        :param track: iterable of text values joined into ``track``
        :param is_async: run in a background thread instead of blocking
            (the legacy ``async`` keyword is still accepted)
        :param locations: flat sequence of coordinates, four per bounding box
        :param encoding: codec for the ``follow`` / ``track`` values and for
            decoding a bytes ``filter_level``
        :raises TweepError: when already running, or when the number of
            ``locations`` values is not a multiple of 4
        """
        is_async = self._legacy_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.body = {}
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
        if follow:
            self.body['follow'] = u','.join(follow).encode(encoding)
        if track:
            self.body['track'] = u','.join(track).encode(encoding)
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.body['locations'] = u','.join(['%.4f' % l for l in locations])
        if stall_warnings:
            self.body['stall_warnings'] = stall_warnings
        if languages:
            self.body['language'] = u','.join(map(str, languages))
        if filter_level:
            # ``unicode`` does not exist on Python 3; decode only when the
            # value really is bytes and pass text through unchanged.
            if isinstance(filter_level, bytes):
                filter_level = filter_level.decode(encoding)
            self.body['filter_level'] = filter_level
        self.session.params = {'delimited': 'length'}
        self.host = 'stream.twitter.com'
        self._start(is_async)

    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, is_async=False, **kwargs):
        """Start the site stream (``/site.json``).

        :param follow: iterable of ids, converted to text and joined into
            the ``follow`` body field
        :param is_async: run in a background thread instead of blocking
            (the legacy ``async`` keyword is still accepted)
        :raises TweepError: when already running
        """
        is_async = self._legacy_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.body = {}
        self.url = '/%s/site.json' % STREAM_VERSION
        self.body['follow'] = u','.join(map(six.text_type, follow))
        self.body['delimited'] = 'length'
        if stall_warnings:
            self.body['stall_warnings'] = stall_warnings
        if with_:
            self.body['with'] = with_
        if replies:
            self.body['replies'] = replies
        self._start(is_async)

    def disconnect(self):
        """Ask the read loop to stop by clearing ``running``."""
        if self.running is False:
            return
        self.running = False

Benjamin Mako Hill || Want to submit a patch?