Source code for wheezy.security.crypto.ticket

""" ``crypto`` module.
"""

from base64 import b64decode
from base64 import b64encode
from binascii import Error as BinError
from hmac import new as hmac_new
from os import urandom
from struct import pack
from struct import unpack
from time import time
from warnings import warn

from wheezy.security.crypto.comp import aes128
from wheezy.security.crypto.comp import b
from wheezy.security.crypto.comp import block_size
from wheezy.security.crypto.comp import btos
from wheezy.security.crypto.comp import decrypt
from wheezy.security.crypto.comp import digest_size
from wheezy.security.crypto.comp import encrypt
from wheezy.security.crypto.comp import sha1
from wheezy.security.crypto.padding import pad
from wheezy.security.crypto.padding import unpad

BASE64_ALTCHARS = b('-~')
EMPTY = b('')
EPOCH = 1317212745


def ensure_strong_key(key, digestmod):
[docs] """ Translates a given key to a computed strong key of length 3 * digestmode.digest_size suitable for encryption, e.g. with digestmod set to ``sha1`` returns 480 bit (60 bytes) key. """ hmac = hmac_new(key, key, digestmod) k1 = hmac.digest() hmac.update(k1) k2 = hmac.digest() hmac.update(k2) return k1 + k2 + hmac.digest() def timestamp():
return int(time()) - EPOCH class Ticket(object):
[docs] """ Protects sensitive information (e.g. user id). Default policy applies verification and encryption. Verification is provided by ``hmac`` initialized with ``sha1`` digestmod. Encryption is provided if available, by default it attempts to use AES cypher. """ __slots__ = ('cypher', 'max_age', 'hmac', 'digest_size', 'block_size') def __init__(self, max_age=900, salt='', digestmod=None, cypher=aes128, options=None): self.max_age = max_age if not digestmod: warn('Ticket: digestmod is not specified, fallback to sha1', stacklevel=2) digestmod = sha1 options = options or {} key = b(salt + options.get('CRYPTO_VALIDATION_KEY', '')) key = ensure_strong_key(key, digestmod) self.hmac = hmac_new(key, digestmod=digestmod) self.digest_size = digest_size(digestmod) if cypher: key = b(salt + options.get('CRYPTO_ENCRYPTION_KEY', '')) key = ensure_strong_key(key, digestmod) self.cypher = cypher(key) self.block_size = block_size(self.cypher()) else: self.cypher = None warn('Ticket: cypher not available', stacklevel=2) def encode(self, value, encoding='UTF-8'):
[docs] """ Encode ``value`` according to ticket policy. """ value = b(value, encoding) expires = pack('<i', timestamp() + self.max_age) noise = urandom(12) value = EMPTY.join(( noise[:4], expires, noise[4:8], value, noise[8:] )) if self.cypher: value = encrypt(self.cypher(), pad(value, self.block_size)) return btos(b64encode(self.sign(value) + value, BASE64_ALTCHARS), 'latin1') def decode(self, value, encoding='UTF-8'):
[docs] """ Decode ``value`` according to ticket policy. """ if len(value) < 48: return (None, None) try: value = b64decode(b(value), BASE64_ALTCHARS) except (TypeError, BinError): return (None, None) signature = value[:self.digest_size] value = value[self.digest_size:] if signature != self.sign(value): return (None, None) if self.cypher: if len(value) % self.block_size != 0: return (None, None) value = unpad(decrypt(self.cypher(), value), self.block_size) if value is None: return (None, None) if len(value) < 16: # pragma: nocover return (None, None) expires, value = value[4:8], value[12:-4] time_left = unpack('<i', expires)[0] - timestamp() if time_left < 0 or time_left > self.max_age: return (None, None) try: return (btos(value, encoding), time_left) except UnicodeDecodeError: return (None, None) def sign(self, value):
[docs] """ Compute hmac digest. """ h = self.hmac.copy() h.update(value) return h.digest()