12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871 |
- """
- The MIT License
- Copyright (c) 2007-2010 Leah Culver, Joe Stump, Mark Paschal, Vic Fryzel
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
- """
- import base64
- import urllib
- import time
- import random
- import urlparse
- import hmac
- import binascii
- import httplib2
- try:
- from urlparse import parse_qs
- parse_qs # placate pyflakes
- except ImportError:
- # fall back for Python 2.5
- from cgi import parse_qs
- try:
- from hashlib import sha1
- sha = sha1
- except ImportError:
- # hashlib was added in Python 2.5
- import sha
- import _version
- __version__ = _version.__version__  # package version, re-exported from the _version module
- OAUTH_VERSION = '1.0' # Hi Blaine!
- HTTP_METHOD = 'GET'  # default HTTP method for Request and its factory classmethods
- SIGNATURE_METHOD = 'PLAINTEXT'  # fallback used by Server when a request names no oauth_signature_method
class Error(RuntimeError):
    """Base class for the exceptions raised by this module."""

    def __init__(self, message='OAuth error occurred.'):
        # Kept on a private attribute and re-exposed through the ``message``
        # property below.
        self._message = message

    def __str__(self):
        return self._message

    @property
    def message(self):
        """Return the text given at construction.

        Exposed as a property to get around the deprecation errors that
        Python 2.6 emits for ``BaseException.message``.
        """
        return self._message
class MissingSignature(Error):
    """Raised when a request carries no ``oauth_signature`` parameter."""
def build_authenticate_header(realm=''):
    """Return the optional ``WWW-Authenticate`` header for a 401 response.

    The result is a one-item dict whose value advertises *realm* as the
    OAuth realm.
    """
    challenge = 'OAuth realm="%s"' % realm
    return {'WWW-Authenticate': challenge}
def build_xoauth_string(url, consumer, token=None):
    """Build an XOAUTH string for use in SMTP/IMAP authentication.

    A GET request for *url* is created from *consumer* and *token* and
    signed with HMAC-SHA1. Every request parameter whose value is not None
    is rendered as ``key="escaped value"``; the pairs are sorted,
    comma-joined and appended to ``GET <url> ``.
    """
    method = "GET"
    request = Request.from_consumer_and_token(consumer, token, method, url)
    request.sign_request(SignatureMethod_HMAC_SHA1(), consumer, token)

    pairs = ['%s="%s"' % (name, escape(value))
             for name, value in sorted(request.iteritems())
             if value is not None]
    return "%s %s %s" % (method, url, ','.join(pairs))
def to_unicode(s):
    """Return *s* as a unicode object.

    A unicode argument is returned untouched and a byte string is decoded
    as UTF-8. TypeError, with an instructive message, is raised for any
    other type and for a byte string that is not valid UTF-8.
    """
    if isinstance(s, unicode):
        return s
    if not isinstance(s, str):
        raise TypeError('You are required to pass either unicode or string here, not: %r (%s)' % (type(s), s))
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError as le:
        raise TypeError('You are required to pass either a unicode object or a utf-8 string here. You passed a Python string object which contained non-utf-8: %r. The UnicodeDecodeError that resulted from attempting to interpret it as utf-8 was: %s' % (s, le,))
def to_utf8(s):
    """Return *s* as a UTF-8 byte string, validating it with to_unicode."""
    text = to_unicode(s)
    return text.encode('utf-8')
def to_unicode_if_string(s):
    """Convert *s* with to_unicode when it is a string; else return it as is."""
    if not isinstance(s, basestring):
        return s
    return to_unicode(s)
def to_utf8_if_string(s):
    """Convert *s* with to_utf8 when it is a string; else return it as is."""
    if not isinstance(s, basestring):
        return s
    return to_utf8(s)
def to_unicode_optional_iterator(x):
    """Convert a string, or every element of an iterable, to unicode.

    A string is passed through to_unicode. Any other iterable is returned
    as a list with to_unicode applied to each element. A non-iterable
    value is returned unchanged.

    Raise TypeError if x is a str containing non-utf8 bytes or if x is
    an iterable which contains such a str.
    """
    if isinstance(x, basestring):
        return to_unicode(x)

    try:
        elements = list(x)
    except TypeError as err:
        # Only "not iterable" is expected here; anything else is a bug.
        assert 'is not iterable' in str(err)
        return x
    return [to_unicode(element) for element in elements]
def to_utf8_optional_iterator(x):
    """Convert a string, or the string elements of an iterable, to UTF-8.

    A string is passed through to_utf8. Any other iterable is returned as
    a list in which string elements are UTF-8 encoded and all other
    elements are kept as they are. A non-iterable value is returned
    unchanged.

    TypeError propagates from to_utf8 when a byte string is not valid
    UTF-8.
    """
    if isinstance(x, basestring):
        return to_utf8(x)

    try:
        elements = list(x)
    except TypeError as err:
        # Only "not iterable" is expected here; anything else is a bug.
        assert 'is not iterable' in str(err)
        return x
    return [to_utf8_if_string(element) for element in elements]
def escape(s):
    """Percent-encode *s* for use in a URL, including any "/".

    The value is UTF-8 encoded first; "~" is the only character passed to
    ``urllib.quote`` as safe.
    """
    encoded = s.encode('utf-8')
    return urllib.quote(encoded, safe='~')
def generate_timestamp():
    """Return the current time as whole seconds since the epoch (UTC)."""
    now = time.time()
    return int(now)
def generate_nonce(length=8):
    """Return a string of *length* pseudorandom decimal digits."""
    digits = []
    for _ in range(length):
        digits.append(str(random.randint(0, 9)))
    return ''.join(digits)
def generate_verifier(length=8):
    """Return a string of *length* random decimal digits.

    The result is used as an OAuth verifier (see ``Token.set_verifier``),
    which must not be guessable. The digits therefore come from
    ``random.SystemRandom`` (operating-system entropy) rather than the
    module-level Mersenne Twister, whose output can be predicted or
    reproduced by seeding.
    """
    rng = random.SystemRandom()
    return ''.join([str(rng.randint(0, 9)) for _ in range(length)])
class Consumer(object):
    """A consumer of OAuth-protected services.

    The consumer is the "third-party" service (the OAuth client) that wants
    to reach protected resources at a service provider on behalf of an end
    user.

    A consumer is normally registered with the service provider by its
    developer and receives a *key* and a *secret* in return. The key is
    sent with each request to identify the consumer; the secret is used
    only when signing requests, to prove that the request comes from that
    registered consumer.

    With these credentials the consumer can ask the service provider for a
    request token, which starts the OAuth authorization process.
    """

    key = None
    secret = None

    def __init__(self, key, secret):
        """Store the credentials; raise ValueError if either one is None."""
        self.key = key
        self.secret = secret

        missing = self.key is None or self.secret is None
        if missing:
            raise ValueError("Key and secret must be set.")

    def __str__(self):
        """Return the key and secret as a URL-encoded query string."""
        return urllib.urlencode({
            'oauth_consumer_key': self.key,
            'oauth_consumer_secret': self.secret,
        })
class Token(object):
    """An OAuth credential used to request authorization or a protected
    resource.

    A token is made of a *key* and a *secret*. The key travels with each
    request to identify the token; the secret is used only in the
    signature, to prove that the requester is the party the server issued
    the token to.

    During authorization the consumer first obtains a *request token*,
    which the live user authorizes with the service provider, and then
    exchanges it for an *access token* that grants access to protected
    resources.
    """

    key = None
    secret = None
    callback = None
    callback_confirmed = None
    verifier = None

    def __init__(self, key, secret):
        """Store the credentials; raise ValueError if either one is None."""
        self.key = key
        self.secret = secret

        missing = self.key is None or self.secret is None
        if missing:
            raise ValueError("Key and secret must be set.")

    def __str__(self):
        return self.to_string()

    def set_callback(self, callback):
        """Record *callback* and flag it as confirmed."""
        self.callback = callback
        self.callback_confirmed = 'true'

    def set_verifier(self, verifier=None):
        """Store *verifier*, generating a fresh one when None is given."""
        if verifier is None:
            verifier = generate_verifier()
        self.verifier = verifier

    def get_callback_url(self):
        """Return the callback URL, with ``oauth_verifier`` appended to its
        query string when both a callback and a verifier are set."""
        if not (self.callback and self.verifier):
            return self.callback

        pieces = urlparse.urlparse(self.callback)
        scheme, netloc, path, params, query, fragment = pieces[:6]
        # Extend an existing query string, or start a new one.
        if not query:
            query = 'oauth_verifier=%s' % self.verifier
        else:
            query = '%s&oauth_verifier=%s' % (query, self.verifier)
        rebuilt = (scheme, netloc, path, params, query, fragment)
        return urlparse.urlunparse(rebuilt)

    def to_string(self):
        """Returns this token as a plain string, suitable for storage.

        The string contains the token's secret, so it must never be sent
        or stored where a third party can read it.
        """
        data = {
            'oauth_token': self.key,
            'oauth_token_secret': self.secret,
        }
        confirmed = self.callback_confirmed
        if confirmed is not None:
            data['oauth_callback_confirmed'] = confirmed
        return urllib.urlencode(data)

    @staticmethod
    def from_string(s):
        """Deserializes a token from a string like one returned by
        `to_string()`.

        Raises ValueError when the string is empty, parses to nothing, or
        lacks ``oauth_token`` or ``oauth_token_secret``.
        """
        if len(s) == 0:
            raise ValueError("Invalid parameter string.")

        params = parse_qs(s, keep_blank_values=False)
        if len(params) == 0:
            raise ValueError("Invalid parameter string.")

        try:
            key = params['oauth_token'][0]
        except Exception:
            raise ValueError("'oauth_token' not found in OAuth request.")

        try:
            secret = params['oauth_token_secret'][0]
        except Exception:
            raise ValueError("'oauth_token_secret' not found in "
                             "OAuth request.")

        token = Token(key, secret)
        # OAuth 1.0 responses carry no oauth_callback_confirmed value.
        if 'oauth_callback_confirmed' in params:
            token.callback_confirmed = params['oauth_callback_confirmed'][0]
        return token
def setter(attr):
    """Decorator that turns the setter function *attr* into a property.

    The property is named after *attr*. Reading it returns the value kept
    under that name in the instance ``__dict__`` (AttributeError if
    absent), writing it calls *attr*, and deleting it removes the
    ``__dict__`` entry.
    """
    name = attr.__name__

    def _read(self):
        stored = self.__dict__
        if name not in stored:
            raise AttributeError(name)
        return stored[name]

    def _remove(self):
        del self.__dict__[name]

    return property(fget=_read, fset=attr, fdel=_remove)
class Request(dict):
    """The parameters and information for an HTTP request, suitable for
    authorizing with OAuth credentials.

    When a consumer wants to access a service's protected resources, it does
    so using a signed HTTP request identifying itself (the consumer) with its
    key, and providing an access token authorized by the end user to access
    those resources.

    The dict items are the request parameters. ``method``, ``url``,
    ``normalized_url``, ``body`` and ``is_form_encoded`` are instance
    attributes.
    """

    version = OAUTH_VERSION

    def __init__(self, method=HTTP_METHOD, url=None, parameters=None,
                 body='', is_form_encoded=False):
        """Create a request.

        *url* (when not None) and the keys and values of *parameters* are
        converted to unicode; a TypeError from that conversion propagates.
        A ValueError is raised for URL schemes other than http/https.
        """
        if url is not None:
            self.url = to_unicode(url)
        self.method = method
        if parameters is not None:
            for k, v in parameters.iteritems():
                k = to_unicode(k)
                v = to_unicode_optional_iterator(v)
                self[k] = v
        self.body = body
        self.is_form_encoded = is_form_encoded

    @setter
    def url(self, value):
        """Store *value* and derive ``normalized_url`` from it.

        The normalized URL keeps only scheme, host and path, with default
        ports (:80 for http, :443 for https) removed. Raises ValueError for
        any scheme other than http or https.
        """
        self.__dict__['url'] = value
        if value is None:
            self.normalized_url = None
            return

        scheme, netloc, path, params, query, fragment = urlparse.urlparse(value)
        # Exclude default port numbers.
        if scheme == 'http' and netloc[-3:] == ':80':
            netloc = netloc[:-3]
        elif scheme == 'https' and netloc[-4:] == ':443':
            netloc = netloc[:-4]
        if scheme not in ('http', 'https'):
            raise ValueError("Unsupported URL %s (%s)." % (value, scheme))
        # Normalized URL excludes params, query, and fragment.
        self.normalized_url = urlparse.urlunparse(
            (scheme, netloc, path, None, None, None))

    @setter
    def method(self, value):
        """Store the HTTP method upper-cased."""
        self.__dict__['method'] = value.upper()

    def _get_timestamp_nonce(self):
        """Return ``(oauth_timestamp, oauth_nonce)``; KeyError if missing."""
        return self['oauth_timestamp'], self['oauth_nonce']

    def get_nonoauth_parameters(self):
        """Get any non-OAuth parameters (keys not starting with oauth_)."""
        return dict([(k, v) for k, v in self.iteritems()
                     if not k.startswith('oauth_')])

    def to_header(self, realm=''):
        """Serialize as a header for an HTTPAuth request.

        Returns a one-item dict holding the ``Authorization`` header built
        from *realm* and every ``oauth_`` parameter.
        """
        oauth_params = ((k, v) for k, v in self.items()
                        if k.startswith('oauth_'))
        stringy_params = ((k, escape(str(v))) for k, v in oauth_params)
        header_params = ('%s="%s"' % (k, v) for k, v in stringy_params)
        params_header = ', '.join(header_params)

        auth_header = 'OAuth realm="%s"' % realm
        if params_header:
            auth_header = "%s, %s" % (auth_header, params_header)

        return {'Authorization': auth_header}

    def to_postdata(self):
        """Serialize as post data for a POST request."""
        d = {}
        for k, v in self.iteritems():
            d[k.encode('utf-8')] = to_utf8_optional_iterator(v)

        # tell urlencode to deal with sequence values and map them correctly
        # to resulting querystring. for example self["k"] = ["v1", "v2"] will
        # result in 'k=v1&k=v2' and not k=%5B%27v1%27%2C+%27v2%27%5D
        return urllib.urlencode(d, True).replace('+', '%20')

    def to_url(self):
        """Serialize as a URL for a GET request.

        The request parameters are appended to whatever query string the
        stored URL already has.
        """
        # The parse result unpacks as a plain 6-tuple on every Python
        # version, so no attribute/index fallback is needed.
        scheme, netloc, path, params, query, fragment = \
            urlparse.urlparse(self.url)

        query = parse_qs(query)
        for k, v in self.items():
            query.setdefault(k, []).append(v)

        url = (scheme, netloc, path, params,
               urllib.urlencode(query, True), fragment)
        return urlparse.urlunparse(url)

    def get_parameter(self, parameter):
        """Return the value of *parameter*; raise Error if it is absent
        (or stored as None)."""
        ret = self.get(parameter)
        if ret is None:
            raise Error('Parameter not found: %s' % parameter)
        return ret

    def get_normalized_parameters(self):
        """Return a string that contains the parameters that must be signed."""
        items = []
        for key, value in self.iteritems():
            if key == 'oauth_signature':
                continue
            # 1.0a/9.1.1 states that kvp must be sorted by key, then by value,
            # so we unpack sequence values into multiple items for sorting.
            if isinstance(value, basestring):
                items.append((to_utf8_if_string(key), to_utf8(value)))
            else:
                try:
                    value = list(value)
                except TypeError as e:
                    assert 'is not iterable' in str(e)
                    items.append((to_utf8_if_string(key),
                                  to_utf8_if_string(value)))
                else:
                    items.extend((to_utf8_if_string(key),
                                  to_utf8_if_string(item))
                                 for item in value)

        # Include any query string parameters from the provided URL
        query = urlparse.urlparse(self.url)[4]

        url_items = self._split_url_string(query).items()
        url_items = [(to_utf8(k), to_utf8(v)) for k, v in url_items
                     if k != 'oauth_signature']
        items.extend(url_items)

        items.sort()
        encoded_str = urllib.urlencode(items)
        # Encode signature parameters per Oauth Core 1.0 protocol
        # spec draft 7, section 3.6
        # (http://tools.ietf.org/html/draft-hammer-oauth-07#section-3.6)
        # Spaces must be encoded with "%20" instead of "+"
        return encoded_str.replace('+', '%20').replace('%7E', '~')

    def sign_request(self, signature_method, consumer, token):
        """Set the signature parameter to the result of sign.

        Also fills in ``oauth_consumer_key`` and ``oauth_token`` when they
        are not present yet, and ``oauth_signature_method``.
        """
        if not self.is_form_encoded:
            # according to
            # http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html
            # section 4.1.1 "OAuth Consumers MUST NOT include an
            # oauth_body_hash parameter on requests with form-encoded
            # request bodies."
            self['oauth_body_hash'] = base64.b64encode(sha(self.body).digest())

        if 'oauth_consumer_key' not in self:
            self['oauth_consumer_key'] = consumer.key

        if token and 'oauth_token' not in self:
            self['oauth_token'] = token.key

        self['oauth_signature_method'] = signature_method.name
        self['oauth_signature'] = signature_method.sign(self, consumer, token)

    @classmethod
    def make_timestamp(cls):
        """Get seconds since epoch (UTC), as a string."""
        return str(int(time.time()))

    @classmethod
    def make_nonce(cls):
        """Generate a pseudorandom number, as a string."""
        return str(random.randint(0, 100000000))

    @classmethod
    def from_request(cls, http_method, http_url, headers=None, parameters=None,
                     query_string=None):
        """Combines multiple parameter sources.

        Parameters are merged in this order, later sources overriding
        earlier ones: *parameters*, the OAuth ``Authorization`` header,
        *query_string*, and the query part of *http_url*. Returns a new
        request, or None when no parameter was found at all. Raises Error
        if the Authorization header cannot be parsed.
        """
        # Work on a private copy so the caller's dict is never modified.
        if parameters is None:
            parameters = {}
        else:
            parameters = dict(parameters)

        # Headers
        if headers and 'Authorization' in headers:
            auth_header = headers['Authorization']
            # Check that the authorization header is OAuth.
            if auth_header[:6] == 'OAuth ':
                auth_header = auth_header[6:]
                try:
                    # Get the parameters from the header.
                    header_params = cls._split_header(auth_header)
                    parameters.update(header_params)
                except Exception:
                    raise Error('Unable to parse OAuth parameters from '
                                'Authorization header.')

        # GET or POST query string.
        if query_string:
            query_params = cls._split_url_string(query_string)
            parameters.update(query_params)

        # URL parameters.
        param_str = urlparse.urlparse(http_url)[4]  # query
        url_params = cls._split_url_string(param_str)
        parameters.update(url_params)

        if parameters:
            return cls(http_method, http_url, parameters)

        return None

    @classmethod
    def from_consumer_and_token(cls, consumer, token=None,
                                http_method=HTTP_METHOD, http_url=None,
                                parameters=None, body='',
                                is_form_encoded=False):
        """Build a request pre-filled with the standard OAuth parameters.

        ``oauth_consumer_key``, ``oauth_timestamp``, ``oauth_nonce`` and
        ``oauth_version`` get default values that *parameters* may
        override. When *token* is given, ``oauth_token`` (and
        ``oauth_verifier`` if the token has one) are set from it.
        """
        if not parameters:
            parameters = {}

        defaults = {
            'oauth_consumer_key': consumer.key,
            'oauth_timestamp': cls.make_timestamp(),
            'oauth_nonce': cls.make_nonce(),
            'oauth_version': cls.version,
        }

        defaults.update(parameters)
        parameters = defaults

        if token:
            parameters['oauth_token'] = token.key
            if token.verifier:
                parameters['oauth_verifier'] = token.verifier

        # Instantiate through cls so subclasses get instances of themselves.
        return cls(http_method, http_url, parameters, body=body,
                   is_form_encoded=is_form_encoded)

    @classmethod
    def from_token_and_callback(cls, token, callback=None,
                                http_method=HTTP_METHOD, http_url=None,
                                parameters=None):
        """Build a request carrying ``oauth_token`` and, when *callback* is
        given, ``oauth_callback``."""
        # Copy so the caller's dict is never modified.
        if parameters:
            parameters = dict(parameters)
        else:
            parameters = {}

        parameters['oauth_token'] = token.key

        if callback:
            parameters['oauth_callback'] = callback

        return cls(http_method, http_url, parameters)

    @staticmethod
    def _split_header(header):
        """Turn Authorization: header into parameters.

        *header* is the header value with the leading ``OAuth `` removed.
        The ``realm`` entry is ignored; every other value has its quotes
        stripped and is URL-unquoted.
        """
        params = {}
        parts = header.split(',')
        for param in parts:
            # Remove whitespace.
            param = param.strip()
            # Split key-value.
            param_parts = param.split('=', 1)
            # Ignore the realm parameter. Only the key is compared, so a
            # value that merely contains "realm" is not dropped.
            if param_parts[0] == 'realm':
                continue
            # Remove quotes and unescape the value.
            params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"'))
        return params

    @staticmethod
    def _split_url_string(param_str):
        """Turn URL string into parameters.

        Only the first value of a repeated key is kept.
        """
        parameters = parse_qs(param_str.encode('utf-8'), keep_blank_values=True)
        for k, v in parameters.iteritems():
            parameters[k] = urllib.unquote(v[0])
        return parameters
class Client(httplib2.Http):
    """OAuthClient is a worker to attempt to execute a request.

    It is an ``httplib2.Http`` whose ``request`` method signs every request
    with the configured consumer, token and signature method (HMAC-SHA1 by
    default).
    """

    def __init__(self, consumer, token=None, cache=None, timeout=None,
                 proxy_info=None):
        """Raise ValueError unless *consumer* is a Consumer (or None) and
        *token* is a Token (or None)."""
        if consumer is not None and not isinstance(consumer, Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, Token):
            raise ValueError("Invalid token.")

        self.consumer = consumer
        self.token = token
        self.method = SignatureMethod_HMAC_SHA1()

        httplib2.Http.__init__(self, cache=cache, timeout=timeout,
                               proxy_info=proxy_info)

    def set_signature_method(self, method):
        """Use *method* for signing; ValueError if not a SignatureMethod."""
        if not isinstance(method, SignatureMethod):
            raise ValueError("Invalid signature method.")

        self.method = method

    def request(self, uri, method="GET", body='', headers=None,
                redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                connection_type=None):
        """Sign the request and send it through ``httplib2.Http.request``.

        The OAuth parameters are placed in the body for form-encoded
        requests, in the URL for other GET requests, and in an
        ``Authorization`` header otherwise. A POST without a Content-Type
        is treated as form-encoded.

        Returns whatever ``httplib2.Http.request`` returns. Transport
        exceptions are re-raised as ``Error``.
        """
        DEFAULT_POST_CONTENT_TYPE = 'application/x-www-form-urlencoded'

        if not isinstance(headers, dict):
            headers = {}

        if method == "POST":
            headers['Content-Type'] = headers.get('Content-Type',
                                                  DEFAULT_POST_CONTENT_TYPE)

        is_form_encoded = \
            headers.get('Content-Type') == 'application/x-www-form-urlencoded'

        if is_form_encoded and body:
            parameters = parse_qs(body)
        else:
            parameters = None

        req = Request.from_consumer_and_token(
            self.consumer, token=self.token, http_method=method, http_url=uri,
            parameters=parameters, body=body, is_form_encoded=is_form_encoded)

        req.sign_request(self.method, self.consumer, self.token)

        # The realm is the scheme plus host part of the URI.
        schema, rest = urllib.splittype(uri)
        if rest.startswith('//'):
            hierpart = '//'
        else:
            hierpart = ''
        host, rest = urllib.splithost(rest)

        realm = schema + ':' + hierpart + host

        if is_form_encoded:
            body = req.to_postdata()
        elif method == "GET":
            uri = req.to_url()
        else:
            headers.update(req.to_header(realm=realm))

        try:
            return httplib2.Http.request(self, uri, method=method, body=body,
                                         headers=headers,
                                         redirections=redirections,
                                         connection_type=connection_type)
        except Exception as e:
            # An exception may carry no arguments at all; indexing it would
            # raise IndexError and hide the real failure.
            detail = e.args[0] if e.args else None
            cert_mismatch = (
                isinstance(detail, basestring) and
                detail[0:53] ==
                "Server presented certificate that does not match host")
            if cert_mismatch:
                # NOTE(security): this retry turns certificate validation
                # off for the httplib2.Http class as a whole, which removes
                # protection against man-in-the-middle attacks for later
                # requests too. Kept for backward compatibility; review
                # whether it can be dropped.
                httplib2.Http.disable_ssl_certificate_validation = True
                return httplib2.Http.request(self, uri, method=method,
                                             body=body, headers=headers,
                                             redirections=redirections,
                                             connection_type=connection_type)
            # Error.__str__ must return a string, so fall back to the
            # exception's own text when its first argument is not one
            # (e.g. an errno integer) or is missing.
            if isinstance(detail, basestring):
                raise Error(detail)
            raise Error(str(e))
class Server(object):
    """A skeletal implementation of a service provider, providing protected
    resources to requests from authorized consumers.

    This class implements the logic to check requests for authorization. You
    can use it with your web server or web framework to protect certain
    resources with OAuth.
    """

    timestamp_threshold = 300  # In seconds, five minutes.
    version = OAUTH_VERSION
    signature_methods = None

    def __init__(self, signature_methods=None):
        """*signature_methods* maps method names to SignatureMethod objects;
        an empty dict is used when none is given."""
        self.signature_methods = signature_methods or {}

    def add_signature_method(self, signature_method):
        """Register *signature_method* under its ``name`` and return the
        mapping of all registered methods."""
        self.signature_methods[signature_method.name] = signature_method
        return self.signature_methods

    def verify_request(self, request, consumer, token):
        """Verifies an api call and checks all the parameters.

        Returns the request's non-OAuth parameters. Raises Error (or its
        subclass MissingSignature) when the version, timestamp, signature
        method or signature is not acceptable.
        """
        self._check_version(request)
        self._check_signature(request, consumer, token)
        parameters = request.get_nonoauth_parameters()
        return parameters

    def build_authenticate_header(self, realm=''):
        """Optional support for the authenticate header."""
        return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}

    def _check_version(self, request):
        """Verify the correct version of the request for this server."""
        version = self._get_version(request)
        if version and version != self.version:
            raise Error('OAuth version %s not supported.' % str(version))

    def _get_version(self, request):
        """Return the version of the request for this server.

        Falls back to OAUTH_VERSION when the request does not state one.
        """
        try:
            version = request.get_parameter('oauth_version')
        except Error:
            # get_parameter raises Error for an absent parameter.
            version = OAUTH_VERSION

        return version

    def _get_signature_method(self, request):
        """Figure out the signature with some defaults.

        Returns the registered SignatureMethod named by the request, or by
        SIGNATURE_METHOD when the request names none. Raises Error when no
        such method is registered.
        """
        try:
            signature_method = request.get_parameter('oauth_signature_method')
        except Error:
            signature_method = SIGNATURE_METHOD

        try:
            # Get the signature method object.
            signature_method = self.signature_methods[signature_method]
        except (KeyError, TypeError):
            # KeyError: unknown name. TypeError: the parameter arrived as an
            # unhashable value such as a list.
            signature_method_names = ', '.join(self.signature_methods.keys())
            raise Error('Signature method %s not supported try one of the following: %s' % (signature_method, signature_method_names))

        return signature_method

    def _get_verifier(self, request):
        """Return the request's ``oauth_verifier``; Error if absent."""
        return request.get_parameter('oauth_verifier')

    def _check_signature(self, request, consumer, token):
        """Check the timestamp and the signature of *request*.

        Raises MissingSignature when there is no ``oauth_signature`` and
        Error when the timestamp is expired or the signature is invalid.
        """
        timestamp, nonce = request._get_timestamp_nonce()
        self._check_timestamp(timestamp)
        signature_method = self._get_signature_method(request)

        try:
            signature = request.get_parameter('oauth_signature')
        except Error:
            raise MissingSignature('Missing oauth_signature.')

        # Validate the signature.
        valid = signature_method.check(request, consumer, token, signature)

        if not valid:
            key, base = signature_method.signing_base(request, consumer, token)

            raise Error('Invalid signature. Expected signature base '
                        'string: %s' % base)

    def _check_timestamp(self, timestamp):
        """Verify that timestamp is recentish.

        Raises Error when *timestamp* lies more than ``timestamp_threshold``
        seconds in the past.
        """
        timestamp = int(timestamp)
        now = int(time.time())
        lapsed = now - timestamp
        if lapsed > self.timestamp_threshold:
            raise Error('Expired timestamp: given %d and now %s has a '
                        'greater difference than threshold %d' % (
                            timestamp, now, self.timestamp_threshold))
class SignatureMethod(object):
    """Interface for a way of signing requests.

    OAuth lets consumers and service providers choose how requests are
    signed. This class lists the methods the rest of the module expects
    from a signing scheme; subclass it and implement them to add a new
    one.
    """

    def signing_base(self, request, consumer, token):
        """Calculate the string that needs to be signed.

        Implementations return a 2-tuple: the starting key for the signing
        and the message to be signed. The message may be shown in error
        messages to help clients debug their software.
        """
        raise NotImplementedError

    def sign(self, request, consumer, token):
        """Return the signature for *request*, based on the given consumer
        and token.

        Implementations should build the message with their own
        `signing_base()`; otherwise it may be less useful for debugging.
        """
        raise NotImplementedError

    def check(self, request, consumer, token, signature):
        """Return whether *signature* equals the signature this method
        computes for the given request, consumer and token."""
        return self.sign(request, consumer, token) == signature
class SignatureMethod_HMAC_SHA1(SignatureMethod):
    """The HMAC-SHA1 signature method."""

    name = 'HMAC-SHA1'

    def signing_base(self, request, consumer, token):
        """Return ``(key, message)`` for signing *request*.

        The message is the escaped method, normalized URL and normalized
        parameters joined by "&". The key is the escaped consumer secret,
        "&", and the escaped token secret when a token is given. Raises
        ValueError when the request has no normalized URL.
        """
        url_is_set = (hasattr(request, 'normalized_url') and
                      request.normalized_url is not None)
        if not url_is_set:
            raise ValueError("Base URL for request is not set.")

        pieces = [
            escape(request.method),
            escape(request.normalized_url),
            escape(request.get_normalized_parameters()),
        ]

        key = '%s&' % escape(consumer.secret)
        if token:
            key = key + escape(token.secret)
        return key, '&'.join(pieces)

    def sign(self, request, consumer, token):
        """Return the base64 HMAC-SHA1 digest of the signature base string."""
        key, raw = self.signing_base(request, consumer, token)
        digest = hmac.new(key, raw, sha).digest()
        # b2a_base64 appends a newline; drop it.
        encoded = binascii.b2a_base64(digest)
        return encoded[:-1]
class SignatureMethod_PLAINTEXT(SignatureMethod):
    """The PLAINTEXT signature method."""

    name = 'PLAINTEXT'

    def signing_base(self, request, consumer, token):
        """Concatenate the escaped consumer secret, "&", and the escaped
        token secret (when a token is given).

        The same string is returned as both the key and the message.
        """
        parts = ['%s&' % escape(consumer.secret)]
        if token:
            parts.append(escape(token.secret))
        combined = ''.join(parts)
        return combined, combined

    def sign(self, request, consumer, token):
        """Return the signing base itself as the signature."""
        return self.signing_base(request, consumer, token)[1]
|