initial version of several example programs using the tweepy twitter API
[twitter-api-cdsw-solutions] / tweepy / cache.py
1 # Tweepy
2 # Copyright 2009-2010 Joshua Roesslein
3 # See LICENSE for details.
4
5 import time
6 import datetime
7 import threading
8 import os
9
10 try:
11     import cPickle as pickle
12 except ImportError:
13     import pickle
14
15 try:
16     import hashlib
17 except ImportError:
18     # python 2.4
19     import md5 as hashlib
20
21 try:
22     import fcntl
23 except ImportError:
24     # Probably on a windows system
25     # TODO: use win32file
26     pass
27
28
29 class Cache(object):
30     """Cache interface"""
31
32     def __init__(self, timeout=60):
33         """Initialize the cache
34             timeout: number of seconds to keep a cached entry
35         """
36         self.timeout = timeout
37
38     def store(self, key, value):
39         """Add new record to cache
40             key: entry key
41             value: data of entry
42         """
43         raise NotImplementedError
44
45     def get(self, key, timeout=None):
46         """Get cached entry if exists and not expired
47             key: which entry to get
48             timeout: override timeout with this value [optional]
49         """
50         raise NotImplementedError
51
52     def count(self):
53         """Get count of entries currently stored in cache"""
54         raise NotImplementedError
55
56     def cleanup(self):
57         """Delete any expired entries in cache."""
58         raise NotImplementedError
59
60     def flush(self):
61         """Delete all cached entries"""
62         raise NotImplementedError
63
64
65 class MemoryCache(Cache):
66     """In-memory cache"""
67
68     def __init__(self, timeout=60):
69         Cache.__init__(self, timeout)
70         self._entries = {}
71         self.lock = threading.Lock()
72
73     def __getstate__(self):
74         # pickle
75         return {'entries': self._entries, 'timeout': self.timeout}
76
77     def __setstate__(self, state):
78         # unpickle
79         self.lock = threading.Lock()
80         self._entries = state['entries']
81         self.timeout = state['timeout']
82
83     def _is_expired(self, entry, timeout):
84         return timeout > 0 and (time.time() - entry[0]) >= timeout
85
86     def store(self, key, value):
87         self.lock.acquire()
88         self._entries[key] = (time.time(), value)
89         self.lock.release()
90
91     def get(self, key, timeout=None):
92         self.lock.acquire()
93         try:
94             # check to see if we have this key
95             entry = self._entries.get(key)
96             if not entry:
97                 # no hit, return nothing
98                 return None
99
100             # use provided timeout in arguments if provided
101             # otherwise use the one provided during init.
102             if timeout is None:
103                 timeout = self.timeout
104
105             # make sure entry is not expired
106             if self._is_expired(entry, timeout):
107                 # entry expired, delete and return nothing
108                 del self._entries[key]
109                 return None
110
111             # entry found and not expired, return it
112             return entry[1]
113         finally:
114             self.lock.release()
115
116     def count(self):
117         return len(self._entries)
118
119     def cleanup(self):
120         self.lock.acquire()
121         try:
122             for k, v in self._entries.items():
123                 if self._is_expired(v, self.timeout):
124                     del self._entries[k]
125         finally:
126             self.lock.release()
127
128     def flush(self):
129         self.lock.acquire()
130         self._entries.clear()
131         self.lock.release()
132
133
134 class FileCache(Cache):
135     """File-based cache"""
136
137     # locks used to make cache thread-safe
138     cache_locks = {}
139
140     def __init__(self, cache_dir, timeout=60):
141         Cache.__init__(self, timeout)
142         if os.path.exists(cache_dir) is False:
143             os.mkdir(cache_dir)
144         self.cache_dir = cache_dir
145         if cache_dir in FileCache.cache_locks:
146             self.lock = FileCache.cache_locks[cache_dir]
147         else:
148             self.lock = threading.Lock()
149             FileCache.cache_locks[cache_dir] = self.lock
150
151         if os.name == 'posix':
152             self._lock_file = self._lock_file_posix
153             self._unlock_file = self._unlock_file_posix
154         elif os.name == 'nt':
155             self._lock_file = self._lock_file_win32
156             self._unlock_file = self._unlock_file_win32
157         else:
158             print('Warning! FileCache locking not supported on this system!')
159             self._lock_file = self._lock_file_dummy
160             self._unlock_file = self._unlock_file_dummy
161
162     def _get_path(self, key):
163         md5 = hashlib.md5()
164         md5.update(key)
165         return os.path.join(self.cache_dir, md5.hexdigest())
166
167     def _lock_file_dummy(self, path, exclusive=True):
168         return None
169
170     def _unlock_file_dummy(self, lock):
171         return
172
173     def _lock_file_posix(self, path, exclusive=True):
174         lock_path = path + '.lock'
175         if exclusive is True:
176             f_lock = open(lock_path, 'w')
177             fcntl.lockf(f_lock, fcntl.LOCK_EX)
178         else:
179             f_lock = open(lock_path, 'r')
180             fcntl.lockf(f_lock, fcntl.LOCK_SH)
181         if os.path.exists(lock_path) is False:
182             f_lock.close()
183             return None
184         return f_lock
185
186     def _unlock_file_posix(self, lock):
187         lock.close()
188
189     def _lock_file_win32(self, path, exclusive=True):
190         # TODO: implement
191         return None
192
193     def _unlock_file_win32(self, lock):
194         # TODO: implement
195         return
196
197     def _delete_file(self, path):
198         os.remove(path)
199         if os.path.exists(path + '.lock'):
200             os.remove(path + '.lock')
201
202     def store(self, key, value):
203         path = self._get_path(key)
204         self.lock.acquire()
205         try:
206             # acquire lock and open file
207             f_lock = self._lock_file(path)
208             datafile = open(path, 'wb')
209
210             # write data
211             pickle.dump((time.time(), value), datafile)
212
213             # close and unlock file
214             datafile.close()
215             self._unlock_file(f_lock)
216         finally:
217             self.lock.release()
218
219     def get(self, key, timeout=None):
220         return self._get(self._get_path(key), timeout)
221
222     def _get(self, path, timeout):
223         if os.path.exists(path) is False:
224             # no record
225             return None
226         self.lock.acquire()
227         try:
228             # acquire lock and open
229             f_lock = self._lock_file(path, False)
230             datafile = open(path, 'rb')
231
232             # read pickled object
233             created_time, value = pickle.load(datafile)
234             datafile.close()
235
236             # check if value is expired
237             if timeout is None:
238                 timeout = self.timeout
239             if timeout > 0 and (time.time() - created_time) >= timeout:
240                 # expired! delete from cache
241                 value = None
242                 self._delete_file(path)
243
244             # unlock and return result
245             self._unlock_file(f_lock)
246             return value
247         finally:
248             self.lock.release()
249
250     def count(self):
251         c = 0
252         for entry in os.listdir(self.cache_dir):
253             if entry.endswith('.lock'):
254                 continue
255             c += 1
256         return c
257
258     def cleanup(self):
259         for entry in os.listdir(self.cache_dir):
260             if entry.endswith('.lock'):
261                 continue
262             self._get(os.path.join(self.cache_dir, entry), None)
263
264     def flush(self):
265         for entry in os.listdir(self.cache_dir):
266             if entry.endswith('.lock'):
267                 continue
268             self._delete_file(os.path.join(self.cache_dir, entry))
269
270 class MemCacheCache(Cache):
271     """Cache interface"""
272
273     def __init__(self, client, timeout=60):
274         """Initialize the cache
275             client: The memcache client
276             timeout: number of seconds to keep a cached entry
277         """
278         self.client = client
279         self.timeout = timeout
280
281     def store(self, key, value):
282         """Add new record to cache
283             key: entry key
284             value: data of entry
285         """
286         self.client.set(key, value, time=self.timeout)
287
288     def get(self, key, timeout=None):
289         """Get cached entry if exists and not expired
290             key: which entry to get
291             timeout: override timeout with this value [optional]. DOES NOT WORK HERE
292         """
293         return self.client.get(key)
294
295     def count(self):
296         """Get count of entries currently stored in cache. RETURN 0"""
297         raise NotImplementedError
298
299     def cleanup(self):
300         """Delete any expired entries in cache. NO-OP"""
301         raise NotImplementedError
302
303     def flush(self):
304         """Delete all cached entries. NO-OP"""
305         raise NotImplementedError
306
307 class RedisCache(Cache):
308     '''Cache running in a redis server'''
309
310     def __init__(self, client, timeout=60, keys_container = 'tweepy:keys', pre_identifier = 'tweepy:'):
311         Cache.__init__(self, timeout)
312         self.client = client
313         self.keys_container = keys_container
314         self.pre_identifier = pre_identifier
315
316     def _is_expired(self, entry, timeout):
317         # Returns true if the entry has expired
318         return timeout > 0 and (time.time() - entry[0]) >= timeout
319
320     def store(self, key, value):
321         '''Store the key, value pair in our redis server'''
322         # Prepend tweepy to our key, this makes it easier to identify tweepy keys in our redis server
323         key = self.pre_identifier + key
324         # Get a pipe (to execute several redis commands in one step)
325         pipe = self.client.pipeline()
326         # Set our values in a redis hash (similar to python dict)
327         pipe.set(key, pickle.dumps((time.time(), value)))
328         # Set the expiration
329         pipe.expire(key, self.timeout)
330         # Add the key to a set containing all the keys
331         pipe.sadd(self.keys_container, key)
332         # Execute the instructions in the redis server
333         pipe.execute()
334
335     def get(self, key, timeout=None):
336         '''Given a key, returns an element from the redis table'''
337         key = self.pre_identifier + key
338         # Check to see if we have this key
339         unpickled_entry = self.client.get(key)
340         if not unpickled_entry:
341             # No hit, return nothing
342             return None
343
344         entry = pickle.loads(unpickled_entry)
345         # Use provided timeout in arguments if provided
346         # otherwise use the one provided during init.
347         if timeout is None:
348             timeout = self.timeout
349
350         # Make sure entry is not expired
351         if self._is_expired(entry, timeout):
352             # entry expired, delete and return nothing
353             self.delete_entry(key)
354             return None
355         # entry found and not expired, return it
356         return entry[1]
357
358     def count(self):
359         '''Note: This is not very efficient, since it retreives all the keys from the redis
360         server to know how many keys we have'''
361         return len(self.client.smembers(self.keys_container))
362
363     def delete_entry(self, key):
364         '''Delete an object from the redis table'''
365         pipe = self.client.pipeline()
366         pipe.srem(self.keys_container, key)
367         pipe.delete(key)
368         pipe.execute()
369
370     def cleanup(self):
371         '''Cleanup all the expired keys'''
372         keys = self.client.smembers(self.keys_container)
373         for key in keys:
374             entry = self.client.get(key)
375             if entry:
376                 entry = pickle.loads(entry)
377                 if self._is_expired(entry, self.timeout):
378                     self.delete_entry(key)
379
380     def flush(self):
381         '''Delete all entries from the cache'''
382         keys = self.client.smembers(self.keys_container)
383         for key in keys:
384             self.delete_entry(key)
385
386
387 class MongodbCache(Cache):
388     """A simple pickle-based MongoDB cache sytem."""
389
390     def __init__(self, db, timeout=3600, collection='tweepy_cache'):
391         """Should receive a "database" cursor from pymongo."""
392         Cache.__init__(self, timeout)
393         self.timeout = timeout
394         self.col = db[collection]
395         self.col.create_index('created', expireAfterSeconds=timeout)
396
397     def store(self, key, value):
398         from bson.binary import Binary
399
400         now = datetime.datetime.utcnow()
401         blob = Binary(pickle.dumps(value))
402
403         self.col.insert({'created': now, '_id': key, 'value': blob})
404
405     def get(self, key, timeout=None):
406         if timeout:
407             raise NotImplementedError
408         obj = self.col.find_one({'_id': key})
409         if obj:
410             return pickle.loads(obj['value'])
411
412     def count(self):
413         return self.col.find({}).count()
414
415     def delete_entry(self, key):
416         return self.col.remove({'_id': key})
417
418     def cleanup(self):
419         """MongoDB will automatically clear expired keys."""
420         pass
421
422     def flush(self):
423         self.col.drop()
424         self.col.create_index('created', expireAfterSeconds=self.timeout)

Benjamin Mako Hill || Want to submit a patch?