# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
import httplib
import logging
import ssl
from socket import timeout
from threading import Thread
from time import sleep

from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError

from tweepy.utils import import_simplejson, urlencode_noplus
# JSON module as resolved by tweepy.utils.import_simplejson().
json = import_simplejson()

# Version segment interpolated into every streaming endpoint path below.
STREAM_VERSION = '1.1'
class StreamListener(object):
    """Callback interface for messages arriving on a streaming connection.

    Subclass and override the ``on_*`` hooks of interest. ``on_data`` parses
    each raw message and routes it to the matching hook; a hook that returns
    ``False`` makes ``on_data`` return ``False`` as well, which asks the
    stream to stop.

    NOTE(review): rebuilt from an extract that had dropped lines. The
    ``on_connect`` and ``on_timeout`` signatures, the ``'event'`` and
    ``'limit'`` branch tests, the ``return False`` statements in ``on_data``
    and the default return value of each hook (notably ``on_error``
    returning ``False``) were inferred -- confirm against the canonical file.
    """

    def __init__(self, api=None):
        """Keep the given API object, or build a default ``API()``."""
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        data = json.loads(raw_data)

        # The message type is recognised by the presence of a marker key.
        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        else:
            logging.error("Unknown message type: " + str(raw_data))

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        # NOTE(review): default believed to be False (stop on HTTP errors).
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return
110 class Stream(object):
112 host = 'stream.twitter.com'
def __init__(self, auth, listener, **options):
    """Store the auth handler, the listener and the connection options.

    Recognised ``options`` keys (default in parentheses): ``timeout``
    (300.0), ``retry_count`` (None), ``retry_time`` (5.0), ``retry_420``
    (60.0), ``retry_time_cap`` (320.0), ``snooze_time`` (0.25),
    ``snooze_time_cap`` (16), ``buffer_size`` (1500), ``secure`` (True)
    and ``headers`` (a new empty dict).

    NOTE(review): rebuilt from an extract that had dropped lines. The
    assignments to ``self.auth``, ``self.running``, ``self.api`` and
    ``self.body`` and the ``"http"`` branch were inferred from how the
    other methods use those attributes -- confirm against the canonical
    file.
    """
    self.auth = auth
    self.listener = listener
    self.running = False
    self.timeout = options.get("timeout", 300.0)
    self.retry_count = options.get("retry_count")
    # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
    self.retry_time_start = options.get("retry_time", 5.0)
    self.retry_420_start = options.get("retry_420", 60.0)
    self.retry_time_cap = options.get("retry_time_cap", 320.0)
    self.snooze_time_step = options.get("snooze_time", 0.25)
    self.snooze_time_cap = options.get("snooze_time_cap", 16)
    self.buffer_size = options.get("buffer_size", 1500)
    if options.get("secure", True):
        self.scheme = "https"
    else:
        self.scheme = "http"

    self.api = API()
    self.headers = options.get("headers") or {}
    self.parameters = None
    self.body = None
    # Current back-off delays; they grow while reconnecting and are reset
    # to these starting values after a successful connection.
    self.retry_time = self.retry_time_start
    self.snooze_time = self.snooze_time_step
141 url = "%s://%s%s" % (self.scheme, self.host, self.url)
143 # Connect and process the stream
148 if self.retry_count is not None and error_counter > self.retry_count:
149 # quit if error count greater than retry count
152 if self.scheme == "http":
153 conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
155 conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
156 self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
158 conn.request('POST', self.url, self.body, headers=self.headers)
159 resp = conn.getresponse()
160 if resp.status != 200:
161 if self.listener.on_error(resp.status) is False:
164 if resp.status == 420:
165 self.retry_time = max(self.retry_420_start, self.retry_time)
166 sleep(self.retry_time)
167 self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
170 self.retry_time = self.retry_time_start
171 self.snooze_time = self.snooze_time_step
172 self.listener.on_connect()
173 self._read_loop(resp)
174 except (timeout, ssl.SSLError) as exc:
175 # If it's not time out treat it like any other exception
176 if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
180 if self.listener.on_timeout() == False:
182 if self.running is False:
185 sleep(self.snooze_time)
186 self.snooze_time = min(self.snooze_time + self.snooze_time_step,
187 self.snooze_time_cap)
188 except Exception as exception:
189 # any other exception is fatal, so kill loop
198 # call a handler first so that the exception can be logged.
199 self.listener.on_exception(exception)
202 def _data(self, data):
203 if self.listener.on_data(data) is False:
206 def _read_loop(self, resp):
208 while self.running and not resp.isclosed():
210 # Note: keep-alive newlines might be inserted before each length value.
211 # read until we get a digit...
213 while c == '\n' and self.running and not resp.isclosed():
217 # read rest of delimiter length..
219 while d != '\n' and self.running and not resp.isclosed():
221 delimited_string += d
223 # read the next twitter status object
224 if delimited_string.strip().isdigit():
225 next_status_obj = resp.read( int(delimited_string) )
226 self._data(next_status_obj)
231 def _start(self, async):
234 Thread(target=self._run).start()
def on_closed(self, resp):
    """ Called when the response has been closed by Twitter """
    # Deliberate no-op hook; subclasses may override it.
    pass
242 def userstream(self, stall_warnings=False, _with=None, replies=None,
243 track=None, locations=None, async=False, encoding='utf8'):
244 self.parameters = {'delimited': 'length'}
246 raise TweepError('Stream object already connected!')
247 self.url = '/%s/user.json?delimited=length' % STREAM_VERSION
248 self.host='userstream.twitter.com'
250 self.parameters['stall_warnings'] = stall_warnings
252 self.parameters['with'] = _with
254 self.parameters['replies'] = replies
255 if locations and len(locations) > 0:
256 assert len(locations) % 4 == 0
257 self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
259 encoded_track = [s.encode(encoding) for s in track]
260 self.parameters['track'] = ','.join(encoded_track)
261 self.body = urlencode_noplus(self.parameters)
264 def firehose(self, count=None, async=False):
265 self.parameters = {'delimited': 'length'}
267 raise TweepError('Stream object already connected!')
268 self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
270 self.url += '&count=%s' % count
273 def retweet(self, async=False):
274 self.parameters = {'delimited': 'length'}
276 raise TweepError('Stream object already connected!')
277 self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
280 def sample(self, count=None, async=False):
281 self.parameters = {'delimited': 'length'}
283 raise TweepError('Stream object already connected!')
284 self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
286 self.url += '&count=%s' % count
289 def filter(self, follow=None, track=None, async=False, locations=None,
290 count=None, stall_warnings=False, languages=None, encoding='utf8'):
292 self.headers['Content-type'] = "application/x-www-form-urlencoded"
294 raise TweepError('Stream object already connected!')
295 self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
297 encoded_follow = [s.encode(encoding) for s in follow]
298 self.parameters['follow'] = ','.join(encoded_follow)
300 encoded_track = [s.encode(encoding) for s in track]
301 self.parameters['track'] = ','.join(encoded_track)
302 if locations and len(locations) > 0:
303 assert len(locations) % 4 == 0
304 self.parameters['locations'] = ','.join(['%.4f' % l for l in locations])
306 self.parameters['count'] = count
308 self.parameters['stall_warnings'] = stall_warnings
310 self.parameters['language'] = ','.join(map(str, languages))
311 self.body = urlencode_noplus(self.parameters)
312 self.parameters['delimited'] = 'length'
315 def disconnect(self):
316 if self.running is False: