]> projects.mako.cc - twitter-api-cdsw/blob - tweepy/cache.py
Fixed key error in some tweets
[twitter-api-cdsw] / tweepy / cache.py
1 # Tweepy
2 # Copyright 2009-2010 Joshua Roesslein
3 # See LICENSE for details.
4
5 from __future__ import print_function
6
7 import time
8 import datetime
9 import threading
10 import os
11
12 try:
13     import cPickle as pickle
14 except ImportError:
15     import pickle
16
17 try:
18     import hashlib
19 except ImportError:
20     # python 2.4
21     import md5 as hashlib
22
23 try:
24     import fcntl
25 except ImportError:
26     # Probably on a windows system
27     # TODO: use win32file
28     pass
29
30
31 class Cache(object):
32     """Cache interface"""
33
34     def __init__(self, timeout=60):
35         """Initialize the cache
36             timeout: number of seconds to keep a cached entry
37         """
38         self.timeout = timeout
39
40     def store(self, key, value):
41         """Add new record to cache
42             key: entry key
43             value: data of entry
44         """
45         raise NotImplementedError
46
47     def get(self, key, timeout=None):
48         """Get cached entry if exists and not expired
49             key: which entry to get
50             timeout: override timeout with this value [optional]
51         """
52         raise NotImplementedError
53
54     def count(self):
55         """Get count of entries currently stored in cache"""
56         raise NotImplementedError
57
58     def cleanup(self):
59         """Delete any expired entries in cache."""
60         raise NotImplementedError
61
62     def flush(self):
63         """Delete all cached entries"""
64         raise NotImplementedError
65
66
67 class MemoryCache(Cache):
68     """In-memory cache"""
69
70     def __init__(self, timeout=60):
71         Cache.__init__(self, timeout)
72         self._entries = {}
73         self.lock = threading.Lock()
74
75     def __getstate__(self):
76         # pickle
77         return {'entries': self._entries, 'timeout': self.timeout}
78
79     def __setstate__(self, state):
80         # unpickle
81         self.lock = threading.Lock()
82         self._entries = state['entries']
83         self.timeout = state['timeout']
84
85     def _is_expired(self, entry, timeout):
86         return timeout > 0 and (time.time() - entry[0]) >= timeout
87
88     def store(self, key, value):
89         self.lock.acquire()
90         self._entries[key] = (time.time(), value)
91         self.lock.release()
92
93     def get(self, key, timeout=None):
94         self.lock.acquire()
95         try:
96             # check to see if we have this key
97             entry = self._entries.get(key)
98             if not entry:
99                 # no hit, return nothing
100                 return None
101
102             # use provided timeout in arguments if provided
103             # otherwise use the one provided during init.
104             if timeout is None:
105                 timeout = self.timeout
106
107             # make sure entry is not expired
108             if self._is_expired(entry, timeout):
109                 # entry expired, delete and return nothing
110                 del self._entries[key]
111                 return None
112
113             # entry found and not expired, return it
114             return entry[1]
115         finally:
116             self.lock.release()
117
118     def count(self):
119         return len(self._entries)
120
121     def cleanup(self):
122         self.lock.acquire()
123         try:
124             for k, v in dict(self._entries).items():
125                 if self._is_expired(v, self.timeout):
126                     del self._entries[k]
127         finally:
128             self.lock.release()
129
130     def flush(self):
131         self.lock.acquire()
132         self._entries.clear()
133         self.lock.release()
134
135
136 class FileCache(Cache):
137     """File-based cache"""
138
139     # locks used to make cache thread-safe
140     cache_locks = {}
141
142     def __init__(self, cache_dir, timeout=60):
143         Cache.__init__(self, timeout)
144         if os.path.exists(cache_dir) is False:
145             os.mkdir(cache_dir)
146         self.cache_dir = cache_dir
147         if cache_dir in FileCache.cache_locks:
148             self.lock = FileCache.cache_locks[cache_dir]
149         else:
150             self.lock = threading.Lock()
151             FileCache.cache_locks[cache_dir] = self.lock
152
153         if os.name == 'posix':
154             self._lock_file = self._lock_file_posix
155             self._unlock_file = self._unlock_file_posix
156         elif os.name == 'nt':
157             self._lock_file = self._lock_file_win32
158             self._unlock_file = self._unlock_file_win32
159         else:
160             print('Warning! FileCache locking not supported on this system!')
161             self._lock_file = self._lock_file_dummy
162             self._unlock_file = self._unlock_file_dummy
163
164     def _get_path(self, key):
165         md5 = hashlib.md5()
166         md5.update(key.encode('utf-8'))
167         return os.path.join(self.cache_dir, md5.hexdigest())
168
169     def _lock_file_dummy(self, path, exclusive=True):
170         return None
171
172     def _unlock_file_dummy(self, lock):
173         return
174
175     def _lock_file_posix(self, path, exclusive=True):
176         lock_path = path + '.lock'
177         if exclusive is True:
178             f_lock = open(lock_path, 'w')
179             fcntl.lockf(f_lock, fcntl.LOCK_EX)
180         else:
181             f_lock = open(lock_path, 'r')
182             fcntl.lockf(f_lock, fcntl.LOCK_SH)
183         if os.path.exists(lock_path) is False:
184             f_lock.close()
185             return None
186         return f_lock
187
188     def _unlock_file_posix(self, lock):
189         lock.close()
190
191     def _lock_file_win32(self, path, exclusive=True):
192         # TODO: implement
193         return None
194
195     def _unlock_file_win32(self, lock):
196         # TODO: implement
197         return
198
199     def _delete_file(self, path):
200         os.remove(path)
201         if os.path.exists(path + '.lock'):
202             os.remove(path + '.lock')
203
204     def store(self, key, value):
205         path = self._get_path(key)
206         self.lock.acquire()
207         try:
208             # acquire lock and open file
209             f_lock = self._lock_file(path)
210             datafile = open(path, 'wb')
211
212             # write data
213             pickle.dump((time.time(), value), datafile)
214
215             # close and unlock file
216             datafile.close()
217             self._unlock_file(f_lock)
218         finally:
219             self.lock.release()
220
221     def get(self, key, timeout=None):
222         return self._get(self._get_path(key), timeout)
223
224     def _get(self, path, timeout):
225         if os.path.exists(path) is False:
226             # no record
227             return None
228         self.lock.acquire()
229         try:
230             # acquire lock and open
231             f_lock = self._lock_file(path, False)
232             datafile = open(path, 'rb')
233
234             # read pickled object
235             created_time, value = pickle.load(datafile)
236             datafile.close()
237
238             # check if value is expired
239             if timeout is None:
240                 timeout = self.timeout
241             if timeout > 0:
242                 if (time.time() - created_time) >= timeout:
243                     # expired! delete from cache
244                     value = None
245                     self._delete_file(path)
246
247             # unlock and return result
248             self._unlock_file(f_lock)
249             return value
250         finally:
251             self.lock.release()
252
253     def count(self):
254         c = 0
255         for entry in os.listdir(self.cache_dir):
256             if entry.endswith('.lock'):
257                 continue
258             c += 1
259         return c
260
261     def cleanup(self):
262         for entry in os.listdir(self.cache_dir):
263             if entry.endswith('.lock'):
264                 continue
265             self._get(os.path.join(self.cache_dir, entry), None)
266
267     def flush(self):
268         for entry in os.listdir(self.cache_dir):
269             if entry.endswith('.lock'):
270                 continue
271             self._delete_file(os.path.join(self.cache_dir, entry))
272
273
274 class MemCacheCache(Cache):
275     """Cache interface"""
276
277     def __init__(self, client, timeout=60):
278         """Initialize the cache
279             client: The memcache client
280             timeout: number of seconds to keep a cached entry
281         """
282         self.client = client
283         self.timeout = timeout
284
285     def store(self, key, value):
286         """Add new record to cache
287             key: entry key
288             value: data of entry
289         """
290         self.client.set(key, value, time=self.timeout)
291
292     def get(self, key, timeout=None):
293         """Get cached entry if exists and not expired
294             key: which entry to get
295             timeout: override timeout with this value [optional].
296             DOES NOT WORK HERE
297         """
298         return self.client.get(key)
299
300     def count(self):
301         """Get count of entries currently stored in cache. RETURN 0"""
302         raise NotImplementedError
303
304     def cleanup(self):
305         """Delete any expired entries in cache. NO-OP"""
306         raise NotImplementedError
307
308     def flush(self):
309         """Delete all cached entries. NO-OP"""
310         raise NotImplementedError
311
312
313 class RedisCache(Cache):
314     """Cache running in a redis server"""
315
316     def __init__(self, client,
317                  timeout=60,
318                  keys_container='tweepy:keys',
319                  pre_identifier='tweepy:'):
320         Cache.__init__(self, timeout)
321         self.client = client
322         self.keys_container = keys_container
323         self.pre_identifier = pre_identifier
324
325     def _is_expired(self, entry, timeout):
326         # Returns true if the entry has expired
327         return timeout > 0 and (time.time() - entry[0]) >= timeout
328
329     def store(self, key, value):
330         """Store the key, value pair in our redis server"""
331         # Prepend tweepy to our key,
332         # this makes it easier to identify tweepy keys in our redis server
333         key = self.pre_identifier + key
334         # Get a pipe (to execute several redis commands in one step)
335         pipe = self.client.pipeline()
336         # Set our values in a redis hash (similar to python dict)
337         pipe.set(key, pickle.dumps((time.time(), value)))
338         # Set the expiration
339         pipe.expire(key, self.timeout)
340         # Add the key to a set containing all the keys
341         pipe.sadd(self.keys_container, key)
342         # Execute the instructions in the redis server
343         pipe.execute()
344
345     def get(self, key, timeout=None):
346         """Given a key, returns an element from the redis table"""
347         key = self.pre_identifier + key
348         # Check to see if we have this key
349         unpickled_entry = self.client.get(key)
350         if not unpickled_entry:
351             # No hit, return nothing
352             return None
353
354         entry = pickle.loads(unpickled_entry)
355         # Use provided timeout in arguments if provided
356         # otherwise use the one provided during init.
357         if timeout is None:
358             timeout = self.timeout
359
360         # Make sure entry is not expired
361         if self._is_expired(entry, timeout):
362             # entry expired, delete and return nothing
363             self.delete_entry(key)
364             return None
365         # entry found and not expired, return it
366         return entry[1]
367
368     def count(self):
369         """Note: This is not very efficient,
370         since it retreives all the keys from the redis
371         server to know how many keys we have"""
372         return len(self.client.smembers(self.keys_container))
373
374     def delete_entry(self, key):
375         """Delete an object from the redis table"""
376         pipe = self.client.pipeline()
377         pipe.srem(self.keys_container, key)
378         pipe.delete(key)
379         pipe.execute()
380
381     def cleanup(self):
382         """Cleanup all the expired keys"""
383         keys = self.client.smembers(self.keys_container)
384         for key in keys:
385             entry = self.client.get(key)
386             if entry:
387                 entry = pickle.loads(entry)
388                 if self._is_expired(entry, self.timeout):
389                     self.delete_entry(key)
390
391     def flush(self):
392         """Delete all entries from the cache"""
393         keys = self.client.smembers(self.keys_container)
394         for key in keys:
395             self.delete_entry(key)
396
397
398 class MongodbCache(Cache):
399     """A simple pickle-based MongoDB cache sytem."""
400
401     def __init__(self, db, timeout=3600, collection='tweepy_cache'):
402         """Should receive a "database" cursor from pymongo."""
403         Cache.__init__(self, timeout)
404         self.timeout = timeout
405         self.col = db[collection]
406         self.col.create_index('created', expireAfterSeconds=timeout)
407
408     def store(self, key, value):
409         from bson.binary import Binary
410
411         now = datetime.datetime.utcnow()
412         blob = Binary(pickle.dumps(value))
413
414         self.col.insert({'created': now, '_id': key, 'value': blob})
415
416     def get(self, key, timeout=None):
417         if timeout:
418             raise NotImplementedError
419         obj = self.col.find_one({'_id': key})
420         if obj:
421             return pickle.loads(obj['value'])
422
423     def count(self):
424         return self.col.find({}).count()
425
426     def delete_entry(self, key):
427         return self.col.remove({'_id': key})
428
429     def cleanup(self):
430         """MongoDB will automatically clear expired keys."""
431         pass
432
433     def flush(self):
434         self.col.drop()
435         self.col.create_index('created', expireAfterSeconds=self.timeout)

Benjamin Mako Hill || Want to submit a patch?