]> projects.mako.cc - twitter-api-cdsw/blob - tweepy/streaming.py
be233ffe0ef158bf7fa732976b2abfe6c18285f9
[twitter-api-cdsw] / tweepy / streaming.py
1 # Tweepy
2 # Copyright 2009-2010 Joshua Roesslein
3 # See LICENSE for details.
4
5 import logging
6 import httplib
7 from socket import timeout
8 from threading import Thread
9 from time import sleep
10 import ssl
11
12 from tweepy.models import Status
13 from tweepy.api import API
14 from tweepy.error import TweepError
15
16 from tweepy.utils import import_simplejson, urlencode_noplus
# JSON module chosen by tweepy's helper; presumably simplejson when it is
# installed, otherwise the stdlib json -- confirm in tweepy.utils.
json = import_simplejson()

# Streaming API version, interpolated into every endpoint path built by Stream.
STREAM_VERSION = '1.1'
20
21
class StreamListener(object):
    """Callback receiver for a ``Stream``.

    Subclass this and override the ``on_*`` hooks you care about. Hooks
    dispatched from ``on_data`` may return ``False`` to ask the stream to
    stop.
    """

    def __init__(self, api=None):
        """Create a listener.

        :param api: API object passed to ``Status.parse`` when decoding
            messages; a new ``API()`` is created when omitted.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        ``raw_data`` is decoded as JSON and routed to the ``on_*`` hook that
        matches the keys present in the message. Override this method if you
        wish to manually handle the stream data. Return False to stop stream
        and close connection; a hook returning False is propagated as False.
        Messages of an unrecognized type are logged and otherwise ignored.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            # Warning messages (e.g. stall warnings requested through the
            # ``stall_warnings`` stream option) used to fall through to the
            # "Unknown message type" error log below.
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: " + str(raw_data))

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a ``warning`` message arrives, e.g. a stall warning.

        :param notice: the decoded value of the message's ``warning`` key.
        Return False to stop the stream.
        """
        return
108
109
110 class Stream(object):
111
112     host = 'stream.twitter.com'
113
114     def __init__(self, auth, listener, **options):
115         self.auth = auth
116         self.listener = listener
117         self.running = False
118         self.timeout = options.get("timeout", 300.0)
119         self.retry_count = options.get("retry_count")
120         # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
121         self.retry_time_start = options.get("retry_time", 5.0)
122         self.retry_420_start = options.get("retry_420", 60.0)
123         self.retry_time_cap = options.get("retry_time_cap", 320.0)
124         self.snooze_time_step = options.get("snooze_time", 0.25)
125         self.snooze_time_cap = options.get("snooze_time_cap", 16)
126         self.buffer_size = options.get("buffer_size",  1500)
127         if options.get("secure", True):
128             self.scheme = "https"
129         else:
130             self.scheme = "http"
131
132         self.api = API()
133         self.headers = options.get("headers") or {}
134         self.parameters = None
135         self.body = None
136         self.retry_time = self.retry_time_start
137         self.snooze_time = self.snooze_time_step
138
139     def _run(self):
140         # Authenticate
141         url = "%s://%s%s" % (self.scheme, self.host, self.url)
142
143         # Connect and process the stream
144         error_counter = 0
145         conn = None
146         exception = None
147         while self.running:
148             if self.retry_count is not None and error_counter > self.retry_count:
149                 # quit if error count greater than retry count
150                 break
151             try:
152                 if self.scheme == "http":
153                     conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
154                 else:
155                     conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
156                 self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
157                 conn.connect()
158                 conn.request('POST', self.url, self.body, headers=self.headers)
159                 resp = conn.getresponse()
160                 if resp.status != 200:
161                     if self.listener.on_error(resp.status) is False:
162                         break
163                     error_counter += 1
164                     if resp.status == 420:
165                         self.retry_time = max(self.retry_420_start, self.retry_time)
166                     sleep(self.retry_time)
167                     self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
168                 else:
169                     error_counter = 0
170                     self.retry_time = self.retry_time_start
171                     self.snooze_time = self.snooze_time_step
172                     self.listener.on_connect()
173                     self._read_loop(resp)
174             except (timeout, ssl.SSLError) as exc:
175                 # If it's not time out treat it like any other exception
176                 if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
177                     exception = exc
178                     break
179
180                 if self.listener.on_timeout() == False:
181                     break
182                 if self.running is False:
183                     break
184                 conn.close()
185                 sleep(self.snooze_time)
186                 self.snooze_time = min(self.snooze_time + self.snooze_time_step,
187                                        self.snooze_time_cap)
188             except Exception as exception:
189                 # any other exception is fatal, so kill loop
190                 break
191
192         # cleanup
193         self.running = False
194         if conn:
195             conn.close()
196
197         if exception:
198             # call a handler first so that the exception can be logged.
199             self.listener.on_exception(exception)
200             raise
201
202     def _data(self, data):
203         if self.listener.on_data(data) is False:
204             self.running = False
205
206     def _read_loop(self, resp):
207
208         while self.running and not resp.isclosed():
209
210             # Note: keep-alive newlines might be inserted before each length value.
211             # read until we get a digit...
212             c = '\n'
213             while c == '\n' and self.running and not resp.isclosed():
214                 c = resp.read(1)
215             delimited_string = c
216
217             # read rest of delimiter length..
218             d = ''
219             while d != '\n' and self.running and not resp.isclosed():
220                 d = resp.read(1)
221                 delimited_string += d
222
223             # read the next twitter status object
224             if delimited_string.strip().isdigit():
225                 next_status_obj = resp.read( int(delimited_string) )
226                 self._data(next_status_obj)
227
228         if resp.isclosed():
229             self.on_closed(resp)
230
231     def _start(self, async):
232         self.running = True
233         if async:
234             Thread(target=self._run).start()
235         else:
236             self._run()
237
238     def on_closed(self, resp):
239         """ Called when the response has been closed by Twitter """
240         pass
241
242     def userstream(self, stall_warnings=False, _with=None, replies=None,
243             track=None, locations=None, async=False, encoding='utf8'):
244         self.parameters = {'delimited': 'length'}
245         if self.running:
246             raise TweepError('Stream object already connected!')
247         self.url = '/%s/user.json?delimited=length' % STREAM_VERSION
248         self.host='userstream.twitter.com'
249         if stall_warnings:
250             self.parameters['stall_warnings'] = stall_warnings
251         if _with:
252             self.parameters['with'] = _with
253         if replies:
254             self.parameters['replies'] = replies
255         if locations and len(locations) > 0:
256             assert len(locations) % 4 == 0
257             self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
258         if track:
259             encoded_track = [s.encode(encoding) for s in track]
260             self.parameters['track'] = ','.join(encoded_track)
261         self.body = urlencode_noplus(self.parameters)
262         self._start(async)
263
264     def firehose(self, count=None, async=False):
265         self.parameters = {'delimited': 'length'}
266         if self.running:
267             raise TweepError('Stream object already connected!')
268         self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
269         if count:
270             self.url += '&count=%s' % count
271         self._start(async)
272
273     def retweet(self, async=False):
274         self.parameters = {'delimited': 'length'}
275         if self.running:
276             raise TweepError('Stream object already connected!')
277         self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
278         self._start(async)
279
280     def sample(self, count=None, async=False):
281         self.parameters = {'delimited': 'length'}
282         if self.running:
283             raise TweepError('Stream object already connected!')
284         self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
285         if count:
286             self.url += '&count=%s' % count
287         self._start(async)
288
289     def filter(self, follow=None, track=None, async=False, locations=None,
290                count=None, stall_warnings=False, languages=None, encoding='utf8'):
291         self.parameters = {}
292         self.headers['Content-type'] = "application/x-www-form-urlencoded"
293         if self.running:
294             raise TweepError('Stream object already connected!')
295         self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
296         if follow:
297             encoded_follow = [s.encode(encoding) for s in follow]
298             self.parameters['follow'] = ','.join(encoded_follow)
299         if track:
300             encoded_track = [s.encode(encoding) for s in track]
301             self.parameters['track'] = ','.join(encoded_track)
302         if locations and len(locations) > 0:
303             assert len(locations) % 4 == 0
304             self.parameters['locations'] = ','.join(['%.4f' % l for l in locations])
305         if count:
306             self.parameters['count'] = count
307         if stall_warnings:
308             self.parameters['stall_warnings'] = stall_warnings
309         if languages:
310             self.parameters['language'] = ','.join(map(str, languages))
311         self.body = urlencode_noplus(self.parameters)
312         self.parameters['delimited'] = 'length'
313         self._start(async)
314
315     def disconnect(self):
316         if self.running is False:
317             return
318         self.running = False
319

Benjamin Mako Hill || Want to submit a patch?