]> projects.mako.cc - twitter-api-cdsw/blob - oauth/example/server.py
Changed gitignore to not grab auth
[twitter-api-cdsw] / oauth / example / server.py
1 """
2 The MIT License
3
4 Copyright (c) 2007 Leah Culver
5
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12
13 The above copyright notice and this permission notice shall be included in
14 all copies or substantial portions of the Software.
15
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 THE SOFTWARE.
23 """
24
25 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
26 import urllib
27
28 import oauth.oauth as oauth
29
30 # fake urls for the test server
31 REQUEST_TOKEN_URL = 'https://photos.example.net/request_token'
32 ACCESS_TOKEN_URL = 'https://photos.example.net/access_token'
33 AUTHORIZATION_URL = 'https://photos.example.net/authorize'
34 CALLBACK_URL = 'http://printer.example.com/request_token_ready'
35 RESOURCE_URL = 'http://photos.example.net/photos'
36 REALM = 'http://photos.example.net/'
37 VERIFIER = 'verifier'
38
39 # example store for one of each thing
40 class MockOAuthDataStore(oauth.OAuthDataStore):
41
42     def __init__(self):
43         self.consumer = oauth.OAuthConsumer('key', 'secret')
44         self.request_token = oauth.OAuthToken('requestkey', 'requestsecret')
45         self.access_token = oauth.OAuthToken('accesskey', 'accesssecret')
46         self.nonce = 'nonce'
47         self.verifier = VERIFIER
48
49     def lookup_consumer(self, key):
50         if key == self.consumer.key:
51             return self.consumer
52         return None
53
54     def lookup_token(self, token_type, token):
55         token_attrib = getattr(self, '%s_token' % token_type)
56         if token == token_attrib.key:
57             ## HACK
58             token_attrib.set_callback(CALLBACK_URL)
59             return token_attrib
60         return None
61
62     def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
63         if oauth_token and oauth_consumer.key == self.consumer.key and (oauth_token.key == self.request_token.key or oauth_token.key == self.access_token.key) and nonce == self.nonce:
64             return self.nonce
65         return None
66
67     def fetch_request_token(self, oauth_consumer, oauth_callback):
68         if oauth_consumer.key == self.consumer.key:
69             if oauth_callback:
70                 # want to check here if callback is sensible
71                 # for mock store, we assume it is
72                 self.request_token.set_callback(oauth_callback)
73             return self.request_token
74         return None
75
76     def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
77         if oauth_consumer.key == self.consumer.key and oauth_token.key == self.request_token.key and oauth_verifier == self.verifier:
78             # want to check here if token is authorized
79             # for mock store, we assume it is
80             return self.access_token
81         return None
82
83     def authorize_request_token(self, oauth_token, user):
84         if oauth_token.key == self.request_token.key:
85             # authorize the request token in the store
86             # for mock store, do nothing
87             return self.request_token
88         return None
89
90 class RequestHandler(BaseHTTPRequestHandler):
91
92     def __init__(self, *args, **kwargs):
93         self.oauth_server = oauth.OAuthServer(MockOAuthDataStore())
94         self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_PLAINTEXT())
95         self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1())
96         BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
97
98     # example way to send an oauth error
99     def send_oauth_error(self, err=None):
100         # send a 401 error
101         self.send_error(401, str(err.message))
102         # return the authenticate header
103         header = oauth.build_authenticate_header(realm=REALM)
104         for k, v in header.iteritems():
105             self.send_header(k, v) 
106
107     def do_GET(self):
108
109         # debug info
110         #print self.command, self.path, self.headers
111         
112         # get the post data (if any)
113         postdata = None
114         if self.command == 'POST':
115             try:
116                 length = int(self.headers.getheader('content-length'))
117                 postdata = self.rfile.read(length)
118             except:
119                 pass
120
121         # construct the oauth request from the request parameters
122         oauth_request = oauth.OAuthRequest.from_request(self.command, self.path, headers=self.headers, query_string=postdata)
123
124         # request token
125         if self.path.startswith(REQUEST_TOKEN_URL):
126             try:
127                 # create a request token
128                 token = self.oauth_server.fetch_request_token(oauth_request)
129                 # send okay response
130                 self.send_response(200, 'OK')
131                 self.end_headers()
132                 # return the token
133                 self.wfile.write(token.to_string())
134             except oauth.OAuthError, err:
135                 self.send_oauth_error(err)
136             return
137
138         # user authorization
139         if self.path.startswith(AUTHORIZATION_URL):
140             try:
141                 # get the request token
142                 token = self.oauth_server.fetch_request_token(oauth_request)
143                 # authorize the token (kind of does nothing for now)
144                 token = self.oauth_server.authorize_token(token, None)
145                 token.set_verifier(VERIFIER)
146                 # send okay response
147                 self.send_response(200, 'OK')
148                 self.end_headers()
149                 # return the callback url (to show server has it)
150                 self.wfile.write(token.get_callback_url())
151             except oauth.OAuthError, err:
152                 self.send_oauth_error(err)
153             return
154
155         # access token
156         if self.path.startswith(ACCESS_TOKEN_URL):
157             try:
158                 # create an access token
159                 token = self.oauth_server.fetch_access_token(oauth_request)
160                 # send okay response
161                 self.send_response(200, 'OK')
162                 self.end_headers()
163                 # return the token
164                 self.wfile.write(token.to_string())
165             except oauth.OAuthError, err:
166                 self.send_oauth_error(err)
167             return
168
169         # protected resources
170         if self.path.startswith(RESOURCE_URL):
171             try:
172                 # verify the request has been oauth authorized
173                 consumer, token, params = self.oauth_server.verify_request(oauth_request)
174                 # send okay response
175                 self.send_response(200, 'OK')
176                 self.end_headers()
177                 # return the extra parameters - just for something to return
178                 self.wfile.write(str(params))
179             except oauth.OAuthError, err:
180                 self.send_oauth_error(err)
181             return
182
183     def do_POST(self):
184         return self.do_GET()
185
186 def main():
187     try:
188         server = HTTPServer(('', 8080), RequestHandler)
189         print 'Test server running...'
190         server.serve_forever()
191     except KeyboardInterrupt:
192         server.socket.close()
193
194 if __name__ == '__main__':
195     main()

Benjamin Mako Hill || Want to submit a patch?