"""
The MIT License

Copyright (c) 2007 Leah Culver

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

import oauth.oauth as oauth

# fake urls for the test server
REQUEST_TOKEN_URL = 'https://photos.example.net/request_token'
ACCESS_TOKEN_URL = 'https://photos.example.net/access_token'
AUTHORIZATION_URL = 'https://photos.example.net/authorize'
CALLBACK_URL = 'http://printer.example.com/request_token_ready'
RESOURCE_URL = 'http://photos.example.net/photos'
REALM = 'http://photos.example.net/'
# NOTE(review): VERIFIER is read by the mock data store and by the
# authorization handler, but no definition survived in the listing this file
# was recovered from.  The value below is assumed -- confirm it.
VERIFIER = 'verifier'

# example store for one of each thing
class MockOAuthDataStore(oauth.OAuthDataStore):
    """In-memory store holding one consumer, one request token, one access token.

    Every lookup compares the incoming key against the single fixed object
    of that kind and returns it on a match, or ``None`` otherwise.

    NOTE(review): the listing this class was recovered from had lost its
    indentation and its shortest lines (the ``__init__`` header, the
    ``nonce`` attribute, the ``return`` statements and the callback guard).
    They are restored here from context -- confirm before relying on them.
    """

    def __init__(self):
        self.consumer = oauth.OAuthConsumer('key', 'secret')
        self.request_token = oauth.OAuthToken('requestkey', 'requestsecret')
        self.access_token = oauth.OAuthToken('accesskey', 'accesssecret')
        # lookup_nonce() compares incoming nonces against this attribute.
        # NOTE(review): the assignment was missing; the value is assumed.
        self.nonce = 'nonce'
        self.verifier = VERIFIER

    def lookup_consumer(self, key):
        """Return the stored consumer if *key* is its key, else ``None``."""
        if key == self.consumer.key:
            return self.consumer
        return None

    def lookup_token(self, token_type, token):
        """Return the stored ``<token_type>_token`` if *token* is its key.

        *token_type* selects the attribute by name, so only ``'request'``
        and ``'access'`` resolve; anything else raises ``AttributeError``.
        """
        token_attrib = getattr(self, '%s_token' % token_type)
        if token == token_attrib.key:
            # HACK: the fixed callback URL is stamped on whichever token is
            # looked up, so the token handed back always carries a callback.
            token_attrib.set_callback(CALLBACK_URL)
            return token_attrib
        return None

    def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
        """Return the stored nonce when consumer, token and nonce all match.

        Returns ``None`` otherwise; presumably the server treats a non-None
        result as "nonce already used" -- verify against the oauth library.
        """
        known_token_keys = (self.request_token.key, self.access_token.key)
        if (oauth_token
                and oauth_consumer.key == self.consumer.key
                and oauth_token.key in known_token_keys
                and nonce == self.nonce):
            return self.nonce
        return None

    def fetch_request_token(self, oauth_consumer, oauth_callback):
        """Return the request token for the known consumer, else ``None``."""
        if oauth_consumer.key == self.consumer.key:
            # Only record a callback when one was supplied, so an empty
            # value does not overwrite a callback already on the token.
            if oauth_callback:
                # want to check here if callback is sensible
                # for mock store, we assume it is
                self.request_token.set_callback(oauth_callback)
            return self.request_token
        return None

    def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
        """Return the access token when consumer, request token and verifier match."""
        if (oauth_consumer.key == self.consumer.key
                and oauth_token.key == self.request_token.key
                and oauth_verifier == self.verifier):
            # want to check here if token is authorized
            # for mock store, we assume it is
            return self.access_token
        return None

    def authorize_request_token(self, oauth_token, user):
        """Return the request token if *oauth_token* is it; *user* is unused."""
        if oauth_token.key == self.request_token.key:
            # authorize the request token in the store
            # for mock store, do nothing
            return self.request_token
        return None

class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the OAuth endpoints of the test server.

    Dispatches on ``self.path`` to the request-token, authorization,
    access-token and protected-resource endpoints.  Any ``oauth.OAuthError``
    raised while serving one of them is answered with a 401.

    NOTE(review): the listing this class was recovered from had lost its
    indentation and its shortest lines (``do_GET``/``do_POST`` headers,
    ``try:``, ``end_headers()``, ``return``).  They are restored here from
    context -- confirm before relying on them.
    """

    def __init__(self, *args, **kwargs):
        # The OAuth server has to exist before the base initializer runs:
        # the base class processes the request from inside its __init__.
        self.oauth_server = oauth.OAuthServer(MockOAuthDataStore())
        self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_PLAINTEXT())
        self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1())
        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    # example way to send an oauth error
    def send_oauth_error(self, err=None):
        """Answer with 401 plus the OAuth authenticate header.

        *err* is expected to expose ``.message``; with the default ``None``
        a generic message is used instead of failing on the attribute.
        """
        if err is not None:
            message = str(err.message)
        else:
            message = 'Unauthorized'
        # send_error() would finish the headers and write a body itself, so
        # any header sent afterwards would land after the body.  The response
        # is therefore assembled by hand and the authenticate header is
        # emitted before end_headers().
        self.send_response(401, message)
        header = oauth.build_authenticate_header(realm=REALM)
        for k, v in header.items():
            self.send_header(k, v)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Connection', 'close')
        self.end_headers()
        self.wfile.write(message)

    def _send_ok(self, body):
        # Emit a bare 200 response whose body is the given string.
        self.send_response(200, 'OK')
        self.end_headers()
        self.wfile.write(body)

    def _read_post_data(self):
        # Return the request body for a POST, or None when there is none.
        if self.command != 'POST':
            return None
        try:
            length = int(self.headers.getheader('content-length'))
        except (TypeError, ValueError):
            # header absent (None) or not a number: treat as "no body"
            return None
        if length <= 0:
            return None
        return self.rfile.read(length)

    def do_GET(self):
        """Serve one request; also used for POST (see ``do_POST``)."""
        # debug info
        #print self.command, self.path, self.headers

        # get the post data (if any)
        postdata = self._read_post_data()

        # construct the oauth request from the request parameters
        oauth_request = oauth.OAuthRequest.from_request(
            self.command, self.path, headers=self.headers, query_string=postdata)

        # Every endpoint reports OAuth failures the same way, so a single
        # handler wraps the dispatch; the first matching prefix wins.
        try:
            if self.path.startswith(REQUEST_TOKEN_URL):
                # create a request token and return it
                token = self.oauth_server.fetch_request_token(oauth_request)
                self._send_ok(token.to_string())
            elif self.path.startswith(AUTHORIZATION_URL):
                # get the request token
                token = self.oauth_server.fetch_request_token(oauth_request)
                # authorize the token (kind of does nothing for now)
                token = self.oauth_server.authorize_token(token, None)
                token.set_verifier(VERIFIER)
                # return the callback url (to show server has it)
                self._send_ok(token.get_callback_url())
            elif self.path.startswith(ACCESS_TOKEN_URL):
                # create an access token and return it
                token = self.oauth_server.fetch_access_token(oauth_request)
                self._send_ok(token.to_string())
            elif self.path.startswith(RESOURCE_URL):
                # protected resources
                # verify the request has been oauth authorized
                consumer, token, params = self.oauth_server.verify_request(oauth_request)
                # return the extra parameters - just for something to return
                self._send_ok(str(params))
        except oauth.OAuthError as err:
            self.send_oauth_error(err)

    def do_POST(self):
        """Route POST through ``do_GET``, which checks ``self.command`` itself."""
        return self.do_GET()

def main():
    """Run the test server on port 8080 until interrupted with Ctrl-C.

    NOTE(review): the ``def`` line and the call under the entry guard were
    missing from the listing; the name ``main`` is assumed.
    """
    # Built outside the try block so ``server`` is always bound by the time
    # the cleanup below runs.
    server = HTTPServer(('', 8080), RequestHandler)
    print('Test server running...')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the test server.
        pass
    finally:
        # Release the listening socket however the loop ended.
        server.server_close()


if __name__ == '__main__':
    main()