# Reconstructed from the gitweb deletion diff of oauth/example/server.py
# X-Git-Url: https://projects.mako.cc/source/twitter-api-cdsw-solutions/blobdiff_plain/935dc47766b0e4ee54b4dcfbddf4ac549280574e..043b63c67557591f1e85b53d17fc8a1a797f48ca:/oauth/example/server.py
"""
The MIT License

Copyright (c) 2007 Leah Culver

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import urllib

import oauth.oauth as oauth

# fake urls for the test server
REQUEST_TOKEN_URL = 'https://photos.example.net/request_token'
ACCESS_TOKEN_URL = 'https://photos.example.net/access_token'
AUTHORIZATION_URL = 'https://photos.example.net/authorize'
CALLBACK_URL = 'http://printer.example.com/request_token_ready'
RESOURCE_URL = 'http://photos.example.net/photos'
REALM = 'http://photos.example.net/'
VERIFIER = 'verifier'


# example store for one of each thing
class MockOAuthDataStore(oauth.OAuthDataStore):
    """In-memory data store holding exactly one consumer, one request token,
    one access token, one nonce and one verifier, all with fixed values."""

    def __init__(self):
        self.consumer = oauth.OAuthConsumer('key', 'secret')
        self.request_token = oauth.OAuthToken('requestkey', 'requestsecret')
        self.access_token = oauth.OAuthToken('accesskey', 'accesssecret')
        self.nonce = 'nonce'
        self.verifier = VERIFIER

    def lookup_consumer(self, key):
        """Return the stored consumer if `key` matches its key, else None."""
        if key == self.consumer.key:
            return self.consumer
        return None

    def lookup_token(self, token_type, token):
        """Return the stored `<token_type>_token` if `token` matches its key.

        `token_type` selects the attribute by name (`request_token` or
        `access_token` are the only ones this store defines). Returns None
        when the key does not match.
        """
        token_attrib = getattr(self, '%s_token' % token_type)
        if token == token_attrib.key:
            ## HACK
            # the callback is forced onto whichever token is being returned
            token_attrib.set_callback(CALLBACK_URL)
            return token_attrib
        return None

    def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
        """Return the stored nonce if consumer, token and nonce all match the
        stored values, else None."""
        known_token_keys = (self.request_token.key, self.access_token.key)
        if (oauth_token
                and oauth_consumer.key == self.consumer.key
                and oauth_token.key in known_token_keys
                and nonce == self.nonce):
            return self.nonce
        return None

    def fetch_request_token(self, oauth_consumer, oauth_callback):
        """Return the stored request token for the known consumer, else None.

        When `oauth_callback` is truthy it is recorded on the token first.
        """
        if oauth_consumer.key == self.consumer.key:
            if oauth_callback:
                # want to check here if callback is sensible
                # for mock store, we assume it is
                self.request_token.set_callback(oauth_callback)
            return self.request_token
        return None

    def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
        """Return the stored access token when consumer key, request-token key
        and verifier all match the stored values, else None."""
        if (oauth_consumer.key == self.consumer.key
                and oauth_token.key == self.request_token.key
                and oauth_verifier == self.verifier):
            # want to check here if token is authorized
            # for mock store, we assume it is
            return self.access_token
        return None

    def authorize_request_token(self, oauth_token, user):
        """Return the stored request token if `oauth_token` matches its key,
        else None. `user` is ignored by this mock."""
        if oauth_token.key == self.request_token.key:
            # authorize the request token in the store
            # for mock store, do nothing
            return self.request_token
        return None


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the request-token, authorization, access-token
    and protected-resource endpoints of the example OAuth server."""

    def __init__(self, *args, **kwargs):
        # The OAuth server has to exist before the base constructor runs:
        # the base class processes the request from inside its __init__.
        self.oauth_server = oauth.OAuthServer(MockOAuthDataStore())
        self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_PLAINTEXT())
        self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1())
        BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    # example way to send an oauth error
    def send_oauth_error(self, err=None):
        """Reply with 401 and the OAuth authenticate header(s).

        `err` is the caught OAuth error; its `message` becomes the status
        reason phrase and the plain-text body. With no `err`, the default
        reason phrase for 401 is used and the body is empty.
        """
        message = None if err is None else str(err.message)
        # send_error() would emit a complete reply (headers + body) at once,
        # so any header added afterwards would end up after the body. Build
        # the reply by hand so the authenticate header is a real header.
        self.send_response(401, message)
        for k, v in oauth.build_authenticate_header(realm=REALM).items():
            self.send_header(k, v)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Connection', 'close')
        self.end_headers()
        if message:
            self.wfile.write(message)

    def _read_post_body(self):
        """Return the raw POST body, or None if there is none to read."""
        if self.command != 'POST':
            return None
        try:
            length = int(self.headers.getheader('content-length'))
        except (TypeError, ValueError):
            # header missing (None) or not a number: treat as no body
            return None
        if length <= 0:
            # never hand a negative size to read(): it would block until EOF
            return None
        try:
            return self.rfile.read(length)
        except IOError:
            # best effort, as before: a failed read means no post data
            return None

    def _send_ok(self, body):
        """Send a 200 response with no extra headers and `body` as payload."""
        self.send_response(200, 'OK')
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Dispatch on the request path to one of the four OAuth endpoints.

        OAuth failures are answered through send_oauth_error(); a path that
        matches no endpoint gets a 404.
        """
        # debug info
        #print self.command, self.path, self.headers

        # get the post data (if any)
        postdata = self._read_post_body()

        # construct the oauth request from the request parameters
        oauth_request = oauth.OAuthRequest.from_request(
            self.command, self.path, headers=self.headers, query_string=postdata)

        # request token
        if self.path.startswith(REQUEST_TOKEN_URL):
            try:
                # create a request token
                token = self.oauth_server.fetch_request_token(oauth_request)
                # return the token
                self._send_ok(token.to_string())
            except oauth.OAuthError as err:
                self.send_oauth_error(err)
            return

        # user authorization
        if self.path.startswith(AUTHORIZATION_URL):
            try:
                # get the request token
                token = self.oauth_server.fetch_request_token(oauth_request)
                # authorize the token (kind of does nothing for now)
                token = self.oauth_server.authorize_token(token, None)
                token.set_verifier(VERIFIER)
                # return the callback url (to show server has it)
                self._send_ok(token.get_callback_url())
            except oauth.OAuthError as err:
                self.send_oauth_error(err)
            return

        # access token
        if self.path.startswith(ACCESS_TOKEN_URL):
            try:
                # create an access token
                token = self.oauth_server.fetch_access_token(oauth_request)
                # return the token
                self._send_ok(token.to_string())
            except oauth.OAuthError as err:
                self.send_oauth_error(err)
            return

        # protected resources
        if self.path.startswith(RESOURCE_URL):
            try:
                # verify the request has been oauth authorized
                consumer, token, params = self.oauth_server.verify_request(oauth_request)
                # return the extra parameters - just for something to return
                self._send_ok(str(params))
            except oauth.OAuthError as err:
                self.send_oauth_error(err)
            return

        # unknown endpoint: answer explicitly instead of sending nothing
        self.send_error(404)

    def do_POST(self):
        """Handle POST exactly like GET; do_GET reads the body itself."""
        return self.do_GET()


def main():
    """Run the test server on port 8080 (all interfaces) until Ctrl-C."""
    # built outside the try block so the cleanup below never sees an
    # unbound name
    server = HTTPServer(('', 8080), RequestHandler)
    print('Test server running...')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # release the listening socket however the loop ended
        server.server_close()


if __name__ == '__main__':
    main()