Source code for biryani.jwtconv

# -*- coding: utf-8 -*-


# Biryani -- A conversion and validation toolbox
# By: Emmanuel Raviart <emmanuel@raviart.com>
#
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Emmanuel Raviart
# http://packages.python.org/Biryani/
#
# This file is part of Biryani.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Converters for JSON Web Tokens (JWT)

cf http://tools.ietf.org/html/draft-jones-json-web-token
"""


import calendar
import datetime
from struct import pack
import zlib

from Crypto import Random
from Crypto.Cipher import AES as Cipher_AES, PKCS1_v1_5 as Cipher_PKCS1_v1_5, PKCS1_OAEP as Cipher_PKCS1_OAEP
from Crypto.Signature import PKCS1_v1_5 as Signature_PKCS1_v1_5
from Crypto.Hash import HMAC, SHA256, SHA384, SHA512
from Crypto.PublicKey import RSA
from Crypto.Util import number

from . import gcm, states
from .base64conv import base64_to_bytes, make_base64url_to_bytes, make_bytes_to_base64url
from .baseconv import (check, cleanup_line, get, make_input_to_url, N_, noop, not_none, pipe, struct, test,
    test_greater_or_equal, test_in, test_isinstance, test_less_or_equal, uniform_sequence)
from .jsonconv import make_json_to_str, make_input_to_json
from .jwkconv import json_to_json_web_key


__all__ = [
    'decode_json_web_token',
    'decode_json_web_token_claims',
    'decoded_json_web_token_to_json',
    'decrypt_json_web_token',
    'derive_key',
    'encrypt_json_web_token',
    'input_to_json_web_token',
    'make_json_to_json_web_token',
    'make_payload_to_json_web_token',
    'sign_json_web_token',
    'verify_decoded_json_web_token_signature',
    'verify_decoded_json_web_token_time',
    ]

digest_constructor_by_size = {
    256: SHA256,
    384: SHA384,
    512: SHA512,
    }
valid_encryption_algorithms = (
    u'A128KW',
    u'A256KW',
    # u'ECDH-ES',
    u'RSA1_5',
    u'RSA-OAEP',
    )
valid_encryption_methods = (
    u'A128CBC',
    u'A256CBC',
    u'A128GCM',
    u'A256GCM',
    )
valid_integrity_algorithms = (
    u'HS256',
    u'HS384',
    u'HS512',
    )
valid_key_derivation_functions = (
    u'CS256',
    u'CS384',
    u'CS512',
    )
valid_signature_algorithms = (
    # u'ES256',
    u'HS256',
    u'HS384',
    u'HS512',
    u'RS256',
    u'RS384',
    u'RS512',
    )


[docs]def decode_json_web_token(token, state = None): """Decode a JSON Web Token, without converting payload to JSON claims, nor verifying its content.""" if token is None: return None, None if state is None: state = states.default_state errors = {} decoded_token = dict(token = token) try: decoded_token['secured_input'], decoded_token['encoded_signature'] = str(token).rsplit('.', 1) decoded_token['encoded_header'], decoded_token['encoded_payload'] = decoded_token['secured_input'].split('.', 1) except: return decoded_token, dict(token = state._(u'Invalid format')) errors = {} header, error = pipe( make_base64url_to_bytes(add_padding = True), make_input_to_json(), )(decoded_token['encoded_header'], state = state) if error is None: decoded_token['header'] = header else: errors['encoded_header'] = state._(u'Invalid format') payload, error = pipe( make_base64url_to_bytes(add_padding = True), not_none, )(decoded_token['encoded_payload'], state = state) if error is None: decoded_token['payload'] = payload else: payload = None errors['encoded_payload'] = state._(u'Invalid format') signature, error = make_base64url_to_bytes(add_padding = True)(decoded_token['encoded_signature'], state = state) if error is None: decoded_token['signature'] = signature else: errors['encoded_signature'] = state._(u'Invalid format') if decoded_token['header'].get('typ', u'JWT') not in (u'JWT', u'urn:ietf:params:oauth:token-type:jwt'): return decoded_token, dict(header = dict(typ = state._(u'Not a JSON Web Token'))) return decoded_token, errors or None
[docs]def decode_json_web_token_claims(decoded_token, state = None): if decoded_token is None: return None, None if state is None: state = states.default_state claims, errors = pipe( make_input_to_json(), test_isinstance(dict), struct( dict( aud = pipe( test_isinstance(basestring), cleanup_line, ), exp = pipe( test_isinstance((int, long)), test_greater_or_equal(0), ), iat = pipe( test_isinstance((int, long)), test_greater_or_equal(0), ), iss = pipe( test_isinstance(basestring), cleanup_line, ), jti = pipe( test_isinstance(basestring), cleanup_line, ), nbf = pipe( test_isinstance((int, long)), test_greater_or_equal(0), ), prn = pipe( test_isinstance(basestring), cleanup_line, ), typ = pipe( test_isinstance(basestring), cleanup_line, ), ), default = noop, ), )(decoded_token.get('payload'), state = state) if errors is not None: return decoded_token, dict(claims = errors) decoded_token['claims'] = claims return decoded_token, None
decoded_json_web_token_to_json = get('claims')
[docs]def decrypt_json_web_token(private_key = None, require_encrypted_token = False, shared_secret = None): """Return a converter that decrypts a JSON Web Token and returns a non crypted JSON Web Token. >>> from Crypto.PublicKey import RSA >>> from Crypto.Util import number >>> # Mike Jones Test 1 >>> plaintext_bytes_list = [78, 111, 119, 32, 105, 115, 32, 116, 104, 101, 32, 116, 105, 109, 101, 32, ... 102, 111, 114, 32, 97, 108, 108, 32, 103, 111, 111, 100, 32, 109, 101, 110, ... 32, 116, 111, 32, 99, 111, 109, 101, 32, 116, 111, 32, 116, 104, 101, 32, ... 97, 105, 100, 32, 111, 102, 32, 116, 104, 101, 105, 114, 32, 99, 111, 117, ... 110, 116, 114, 121, 46] >>> plaintext = ''.join(chr(byte) for byte in plaintext_bytes_list) >>> jwt = check(make_payload_to_json_web_token())(plaintext) >>> jwt 'eyJhbGciOiJub25lIn0.Tm93IGlzIHRoZSB0aW1lIGZvciBhbGwgZ29vZCBtZW4gdG8gY29tZSB0byB0aGUgYWlkIG9mIHRoZWlyIGNvdW50cnku.' >>> cmk_bytes_list = [4, 211, 31, 197, 84, 157, 252, 254, 11, 100, 157, 250, 63, 170, 106, 206, ... 107, 124, 212, 45, 111, 107, 9, 219, 200, 177, 0, 240, 143, 156, 44, 207] >>> cmk = ''.join(chr(byte) for byte in cmk_bytes_list) >>> iv_bytes_list = [3, 22, 60, 12, 43, 67, 104, 105, 108, 108, 105, 99, 111, 116, 104, 101] >>> iv = ''.join(chr(byte) for byte in iv_bytes_list) >>> key_modulus_bytes_list = [177, 119, 33, 13, 164, 30, 108, 121, 207, 136, 107, 242, 12, 224, 19, 226, ... 198, 134, 17, 71, 173, 75, 42, 61, 48, 162, 206, 161, 97, 108, 185, 234, ... 226, 219, 118, 206, 118, 5, 169, 224, 60, 181, 90, 85, 51, 123, 6, 224, ... 4, 122, 29, 230, 151, 12, 244, 127, 121, 25, 4, 85, 220, 144, 215, 110, ... 130, 17, 68, 228, 129, 138, 7, 130, 231, 40, 212, 214, 17, 179, 28, 124, ... 151, 178, 207, 20, 14, 154, 222, 113, 176, 24, 198, 73, 211, 113, 9, 33, ... 178, 80, 13, 25, 21, 25, 153, 212, 206, 67, 154, 147, 70, 194, 192, 183, ... 160, 83, 98, 236, 175, 85, 23, 97, 75, 199, 177, 73, 145, 50, 253, 206, ... 32, 179, 254, 236, 190, 82, 73, 67, 129, 253, 252, 220, 108, 136, 138, 11, ... 192, 1, 36, 239, 228, 55, 81, 113, 17, 25, 140, 63, 239, 146, 3, 172, ... 96, 60, 227, 233, 64, 255, 224, 173, 225, 228, 229, 92, 112, 72, 99, 97, ... 26, 87, 187, 123, 46, 50, 90, 202, 117, 73, 10, 153, 47, 224, 178, 163, ... 77, 48, 46, 154, 33, 148, 34, 228, 33, 172, 216, 89, 46, 225, 127, 68, ... 146, 234, 30, 147, 54, 146, 5, 133, 45, 78, 254, 85, 55, 75, 213, 86, ... 194, 218, 215, 163, 189, 194, 54, 6, 83, 36, 18, 153, 53, 7, 48, 89, ... 35, 66, 144, 7, 65, 154, 13, 97, 75, 55, 230, 132, 3, 13, 239, 71] >>> key_modulus = ''.join(chr(byte) for byte in key_modulus_bytes_list) >>> key_public_exponent_bytes_list = [1, 0, 1] >>> key_public_exponent = ''.join(chr(byte) for byte in key_public_exponent_bytes_list) >>> public_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent))) >>> public_key_as_encoded_str = public_key.exportKey() >>> print public_key_as_encoded_str -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsXchDaQebHnPiGvyDOAT 4saGEUetSyo9MKLOoWFsueri23bOdgWp4Dy1WlUzewbgBHod5pcM9H95GQRV3JDX boIRROSBigeC5yjU1hGzHHyXss8UDprecbAYxknTcQkhslANGRUZmdTOQ5qTRsLA t6BTYuyvVRdhS8exSZEy/c4gs/7svlJJQ4H9/NxsiIoLwAEk7+Q3UXERGYw/75ID rGA84+lA/+Ct4eTlXHBIY2EaV7t7LjJaynVJCpkv4LKjTTAumiGUIuQhrNhZLuF/ RJLqHpM2kgWFLU7+VTdL1VbC2tejvcI2BlMkEpk1BzBZI0KQB0GaDWFLN+aEAw3v RwIDAQAB -----END PUBLIC KEY----- >>> encrypted_key_bytes_list = [32, 242, 63, 207, 94, 246, 133, 37, 135, 48, 88, 4, 15, 193, 6, 244, ... 51, 58, 132, 133, 212, 255, 163, 90, 59, 80, 200, 152, 41, 244, 188, 215, ... 174, 160, 26, 188, 227, 180, 165, 234, 172, 63, 24, 116, 152, 28, 149, 16, ... 94, 213, 201, 171, 180, 191, 11, 21, 149, 172, 143, 54, 194, 58, 206, 201, ... 164, 28, 107, 155, 75, 101, 22, 92, 227, 144, 95, 40, 119, 170, 7, 36, ... 225, 40, 141, 186, 213, 7, 175, 16, 174, 122, 75, 32, 48, 193, 119, 202, ... 41, 152, 210, 190, 68, 57, 119, 4, 197, 74, 7, 242, 239, 170, 204, 73, ... 75, 213, 202, 113, 216, 18, 23, 66, 106, 208, 69, 244, 117, 147, 2, 37, ... 207, 199, 184, 96, 102, 44, 70, 212, 87, 143, 253, 0, 166, 59, 41, 115, ... 217, 80, 165, 87, 38, 5, 9, 184, 202, 68, 67, 176, 4, 87, 254, 166, ... 227, 88, 124, 238, 249, 75, 114, 205, 148, 149, 45, 78, 193, 134, 64, 189, ... 168, 76, 170, 76, 176, 72, 148, 77, 215, 159, 146, 55, 189, 213, 85, 253, ... 135, 200, 59, 247, 79, 37, 22, 200, 32, 110, 53, 123, 54, 39, 9, 178, ... 231, 238, 95, 25, 211, 143, 87, 220, 88, 138, 209, 13, 227, 72, 58, 102, ... 164, 136, 241, 14, 14, 45, 32, 77, 44, 244, 162, 239, 150, 248, 181, 138, ... 251, 116, 245, 205, 137, 78, 34, 34, 10, 6, 59, 4, 197, 2, 153, 251] >>> encrypted_key = ''.join(chr(byte) for byte in encrypted_key_bytes_list) >>> encryptor = encrypt_json_web_token(algorithm = 'RSA1_5', content_master_key = cmk, ... encrypted_key = encrypted_key, initialization_vector = iv, integrity = 'HS256', method = 'A128CBC', ... public_key_as_encoded_str = public_key_as_encoded_str) >>> jwe = check(encryptor)(jwt) >>> jwe 'eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDIiwiaW50IjoiSFMyNTYiLCJpdiI6IkF4WThEQ3REYUdsc2JHbGpiM1JvWlEifQ.IPI_z172h\ SWHMFgED8EG9DM6hIXU_6NaO1DImCn0vNeuoBq847Sl6qw_GHSYHJUQXtXJq7S_CxWVrI82wjrOyaQca5tLZRZc45BfKHeqByThKI261QevEK56SyAwwX\ fKKZjSvkQ5dwTFSgfy76rMSUvVynHYEhdCatBF9HWTAiXPx7hgZixG1FeP_QCmOylz2VClVyYFCbjKREOwBFf-puNYfO75S3LNlJUtTsGGQL2oTKpMsEi\ UTdefkje91VX9h8g7908lFsggbjV7NicJsufuXxnTj1fcWIrRDeNIOmakiPEODi0gTSz0ou-W-LWK-3T1zYlOIiIKBjsExQKZ-w._Z_djlIoC4MDSCKir\ eWS2beti4Q6iSG2UjFujQvdz-_PQdUcFNkOulegD6BgjgdFLjeB4HHOO7UHvP8PEDu0a0sA2a_-CI0w2YQQ2QQe35M.c41k4T4eAgCCt63m8ZNmiOinMc\ iFFypOFpvid7i6D0k' >>> key_private_exponent_bytes_list = [84, 80, 150, 58, 165, 235, 242, 123, 217, 55, 38, 154, 36, 181, 221, 156, ... 211, 215, 100, 164, 90, 88, 40, 228, 83, 148, 54, 122, 4, 16, 165, 48, ... 76, 194, 26, 107, 51, 53, 179, 165, 31, 18, 198, 173, 78, 61, 56, 97, ... 252, 158, 140, 80, 63, 25, 223, 156, 36, 203, 214, 252, 120, 67, 180, 167, ... 3, 82, 243, 25, 97, 214, 83, 133, 69, 16, 104, 54, 160, 200, 41, 83, ... 164, 187, 70, 153, 111, 234, 242, 158, 175, 28, 198, 48, 211, 45, 148, 58, ... 23, 62, 227, 74, 52, 117, 42, 90, 41, 249, 130, 154, 80, 119, 61, 26, ... 193, 40, 125, 10, 152, 174, 227, 225, 205, 32, 62, 66, 6, 163, 100, 99, ... 219, 19, 253, 25, 105, 80, 201, 29, 252, 157, 237, 69, 1, 80, 171, 167, ... 20, 196, 156, 109, 249, 88, 0, 3, 152, 38, 165, 72, 87, 6, 152, 71, ... 156, 214, 16, 71, 30, 82, 51, 103, 76, 218, 63, 9, 84, 163, 249, 91, ... 215, 44, 238, 85, 101, 240, 148, 1, 82, 224, 91, 135, 105, 127, 84, 171, ... 181, 152, 210, 183, 126, 24, 46, 196, 90, 173, 38, 245, 219, 186, 222, 27, ... 240, 212, 194, 15, 66, 135, 226, 178, 190, 52, 245, 74, 65, 224, 81, 100, ... 85, 25, 204, 165, 203, 187, 175, 84, 100, 82, 15, 11, 23, 202, 151, 107, ... 54, 41, 207, 3, 136, 229, 134, 131, 93, 139, 50, 182, 204, 93, 130, 89] >>> key_private_exponent = ''.join(chr(byte) for byte in key_private_exponent_bytes_list) >>> private_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent), number.bytes_to_long(key_private_exponent))) >>> private_key_as_encoded_str = private_key.exportKey() >>> print private_key_as_encoded_str -----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAsXchDaQebHnPiGvyDOAT4saGEUetSyo9MKLOoWFsueri23bO dgWp4Dy1WlUzewbgBHod5pcM9H95GQRV3JDXboIRROSBigeC5yjU1hGzHHyXss8U DprecbAYxknTcQkhslANGRUZmdTOQ5qTRsLAt6BTYuyvVRdhS8exSZEy/c4gs/7s vlJJQ4H9/NxsiIoLwAEk7+Q3UXERGYw/75IDrGA84+lA/+Ct4eTlXHBIY2EaV7t7 LjJaynVJCpkv4LKjTTAumiGUIuQhrNhZLuF/RJLqHpM2kgWFLU7+VTdL1VbC2tej vcI2BlMkEpk1BzBZI0KQB0GaDWFLN+aEAw3vRwIDAQABAoIBAFRQljql6/J72Tcm miS13ZzT12SkWlgo5FOUNnoEEKUwTMIaazM1s6UfEsatTj04YfyejFA/Gd+cJMvW /HhDtKcDUvMZYdZThUUQaDagyClTpLtGmW/q8p6vHMYw0y2UOhc+40o0dSpaKfmC mlB3PRrBKH0KmK7j4c0gPkIGo2Rj2xP9GWlQyR38ne1FAVCrpxTEnG35WAADmCal SFcGmEec1hBHHlIzZ0zaPwlUo/lb1yzuVWXwlAFS4FuHaX9Uq7WY0rd+GC7EWq0m 9du63hvw1MIPQofisr409UpB4FFkVRnMpcu7r1RkUg8LF8qXazYpzwOI5YaDXYsy tsxdglkCgYEAuKlCKvKv/ZJMVcdIs5vVSU/6cPtYI1ljWytExV/skstvRSNi9r66 jdd9+yBhVfuG4shsp2j7rGnIio901RBeHo6TPKWVVykPu1iYhQXw1jIABfw+MVsN +3bQ76WLdt2SDxsHs7q7zPyUyHXmps7ycZ5c72wGkUwNOjYelmkiNS0CgYEA9gY2 w6I6S6L0juEKsbeDAwpd9WMfgqFoeA9vEyEUuk4kLwBKcoe1x4HG68ik918hdDSE 9vDQSccA3xXHOAFOPJ8R9EeIAbTi1VwBYnbTp87X+xcPWlEPkrdoUKW60tgs1aNd /Nnc9LEVVPMS390zbFxt8TN/biaBgelNgbC95sMCgYEAo/8V14SezckO6CNLKs/b tPdFiO9/kC1DsuUTd2LAfIIVeMZ7jn1Gus/Ff7B7IVx3p5KuBGOVF8L+qifLb6nQ nLysgHDh132NDioZkhH7mI7hPG+PYE/odApKdnqECHWw0J+F0JWnUd6D2B/1TvF9 mXA2Qx+iGYn8OVV1Bsmp6qUCgYEAw0kZbV63cVRvVX6yk3C8cMxo2qCM4Y8nsq1l mMSYhG4EcL6FWbX5h9yuvngs4iLEFk6eALoUS4vIWEwcL4txw9LsWH/zKI+hwoRe oP77cOdSL4AVcraHawlkpyd2TWjE5evgbhWtOxnZee3cXJBkAi64Ik6jZxbvk+RR 3pEhnCsCgYBd9Pl1dGQ/4PlGika1bezRFWlnNnPxIHPWrnWRxBCKYr+lycWOUPk1 clEAVQXzLHVIigANNKdVH6h4PfI/mgcrzkQnYCaBv68CjX2Rv9r42T6P2DALQfT+ 2vQxZSMnRNS2mCV8et5ZnWcjHV9kN1aEjVU/o8fDuia+fAaDbe190g== -----END RSA PRIVATE KEY----- >>> decryptor = decrypt_json_web_token(private_key = private_key_as_encoded_str) >>> decrypted_jwt = check(decryptor)(jwe) >>> decrypted_jwt 'eyJhbGciOiJub25lIn0.Tm93IGlzIHRoZSB0aW1lIGZvciBhbGwgZ29vZCBtZW4gdG8gY29tZSB0byB0aGUgYWlkIG9mIHRoZWlyIGNvdW50cnku.' >>> decrypted_jwt == jwt True >>> decoded_jwt = check(decode_json_web_token)(decrypted_jwt) >>> decoded_jwt['payload'] 'Now is the time for all good men to come to the aid of their country.' >>> decoded_jwt['payload'] == plaintext True >>> # Same test with random keys. >>> encryptor = encrypt_json_web_token(algorithm = 'RSA1_5', integrity = 'HS256', method = 'A128CBC', ... public_key_as_encoded_str = public_key_as_encoded_str) >>> jwe = check(encryptor)(jwt) >>> decryptor = decrypt_json_web_token(private_key = private_key_as_encoded_str) >>> decrypted_jwt = check(decryptor)(jwe) >>> decrypted_jwt 'eyJhbGciOiJub25lIn0.Tm93IGlzIHRoZSB0aW1lIGZvciBhbGwgZ29vZCBtZW4gdG8gY29tZSB0byB0aGUgYWlkIG9mIHRoZWlyIGNvdW50cnku.' >>> decrypted_jwt == jwt True >>> # Mike Jones Test 2 >>> plaintext_bytes_list = [76, 105, 118, 101, 32, 108, 111, 110, 103, 32, 97, 110, 100, 32, 112, 114, ... 111, 115, 112, 101, 114, 46] >>> plaintext = ''.join(chr(byte) for byte in plaintext_bytes_list) >>> jwt = check(make_payload_to_json_web_token())(plaintext) >>> cmk_bytes_list = [177, 161, 244, 128, 84, 143, 225, 115, 63, 180, 3, 255, 107, 154, 212, 246, ... 138, 7, 110, 91, 112, 46, 34, 105, 47, 130, 203, 46, 122, 234, 64, 252] >>> cmk = ''.join(chr(byte) for byte in cmk_bytes_list) >>> iv_bytes_list = [227, 197, 117, 252, 2, 219, 233, 68, 180, 225, 77, 219] >>> iv = ''.join(chr(byte) for byte in iv_bytes_list) >>> key_modulus_bytes_list = [161, 168, 84, 34, 133, 176, 208, 173, 46, 176, 163, 110, 57, 30, 135, 227, ... 9, 31, 226, 128, 84, 92, 116, 241, 70, 248, 27, 227, 193, 62, 5, 91, ... 241, 145, 224, 205, 141, 176, 184, 133, 239, 43, 81, 103, 9, 161, 153, 157, ... 179, 104, 123, 51, 189, 34, 152, 69, 97, 69, 78, 93, 140, 131, 87, 182, ... 169, 101, 92, 142, 3, 22, 167, 8, 212, 56, 35, 79, 210, 222, 192, 208, ... 252, 49, 109, 138, 173, 253, 210, 166, 201, 63, 102, 74, 5, 158, 41, 90, ... 144, 108, 160, 79, 10, 89, 222, 231, 172, 31, 227, 197, 0, 19, 72, 81, ... 138, 78, 136, 221, 121, 118, 196, 17, 146, 10, 244, 188, 72, 113, 55, 221, ... 162, 217, 171, 27, 57, 233, 210, 101, 236, 154, 199, 56, 138, 239, 101, 48, ... 198, 186, 202, 160, 76, 111, 234, 71, 57, 183, 5, 211, 171, 136, 126, 64, ... 40, 75, 58, 89, 244, 254, 107, 84, 103, 7, 236, 69, 163, 18, 180, 251, ... 58, 153, 46, 151, 174, 12, 103, 197, 181, 161, 162, 55, 250, 235, 123, 110, ... 17, 11, 158, 24, 47, 133, 8, 199, 235, 107, 126, 130, 246, 73, 195, 20, ... 108, 202, 176, 214, 187, 45, 146, 182, 118, 54, 32, 200, 61, 201, 71, 243, ... 1, 255, 131, 84, 37, 111, 211, 168, 228, 45, 192, 118, 27, 197, 235, 232, ... 36, 10, 230, 248, 190, 82, 182, 140, 35, 204, 108, 190, 253, 186, 186, 27] >>> key_modulus = ''.join(chr(byte) for byte in key_modulus_bytes_list) >>> key_public_exponent_bytes_list = [1, 0, 1] >>> key_public_exponent = ''.join(chr(byte) for byte in key_public_exponent_bytes_list) >>> public_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent))) >>> public_key_as_encoded_str = public_key.exportKey() >>> encrypted_key_bytes_list = [142, 252, 40, 202, 21, 177, 56, 198, 232, 7, 151, 49, 95, 169, 220, 2, ... 46, 214, 167, 116, 57, 20, 164, 109, 150, 98, 49, 223, 154, 95, 71, 209, ... 233, 17, 174, 142, 203, 232, 132, 167, 17, 42, 51, 125, 22, 221, 135, 17, ... 67, 197, 148, 246, 139, 145, 160, 238, 99, 119, 171, 95, 117, 202, 87, 251, ... 101, 254, 58, 215, 135, 195, 135, 103, 49, 119, 76, 46, 49, 198, 27, 31, ... 58, 44, 192, 222, 21, 16, 13, 216, 161, 179, 236, 65, 143, 38, 43, 218, ... 195, 76, 140, 243, 71, 243, 79, 124, 216, 208, 242, 171, 34, 245, 57, 154, ... 93, 76, 230, 204, 234, 82, 117, 248, 39, 13, 62, 60, 215, 8, 51, 248, ... 254, 47, 150, 36, 46, 27, 247, 98, 77, 56, 92, 44, 19, 39, 12, 77, ... 54, 101, 194, 126, 86, 0, 64, 239, 95, 211, 64, 26, 219, 93, 211, 36, ... 154, 250, 117, 177, 213, 232, 142, 184, 216, 92, 20, 248, 69, 175, 180, 71, ... 205, 221, 235, 224, 95, 113, 5, 33, 86, 18, 157, 61, 199, 8, 121, 0, ... 0, 135, 65, 67, 220, 164, 15, 230, 155, 71, 53, 64, 253, 209, 169, 255, ... 34, 64, 101, 7, 43, 102, 227, 83, 171, 52, 225, 119, 253, 182, 96, 195, ... 225, 34, 156, 211, 202, 7, 194, 255, 137, 59, 170, 172, 72, 234, 222, 203, ... 123, 249, 121, 254, 143, 173, 105, 65, 187, 189, 163, 64, 151, 145, 99, 17] >>> encrypted_key = ''.join(chr(byte) for byte in encrypted_key_bytes_list) >>> encryptor = encrypt_json_web_token(algorithm = 'RSA-OAEP', content_master_key = cmk, ... encrypted_key = encrypted_key, initialization_vector = iv, method = 'A256GCM', ... public_key_as_encoded_str = public_key_as_encoded_str) >>> jwe = check(encryptor)(jwt) >>> jwe 'eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZHQ00iLCJpdiI6IjQ4VjFfQUxiNlVTMDRVM2IifQ.jvwoyhWxOMboB5cxX6ncAi7Wp3Q5FKRtl\ mIx35pfR9HpEa6Oy-iEpxEqM30W3YcRQ8WU9ouRoO5jd6tfdcpX-2X-OteHw4dnMXdMLjHGGx86LMDeFRAN2KGz7EGPJivaw0yM80fzT3zY0PKrIvU5ml\ 1M5szqUnX4Jw0-PNcIM_j-L5YkLhv3Yk04XCwTJwxNNmXCflYAQO9f00Aa213TJJr6dbHV6I642FwU-EWvtEfN3evgX3EFIVYSnT3HCHkAAIdBQ9ykD-a\ bRzVA_dGp_yJAZQcrZuNTqzThd_22YMPhIpzTygfC_4k7qqxI6t7Le_l5_o-taUG7vaNAl5FjEQ._e21tGGhac_peEFkLXr2dMPUZiUkrw.YbZSeHCNDZ\ BqAdzpROlyiw' >>> key_private_exponent_bytes_list = [144, 183, 109, 34, 62, 134, 108, 57, 44, 252, 10, 66, 73, 54, 16, 181, ... 233, 92, 54, 219, 101, 42, 35, 178, 63, 51, 43, 92, 119, 136, 251, 41, ... 53, 23, 191, 164, 164, 60, 88, 227, 229, 152, 228, 213, 149, 228, 169, 237, ... 104, 71, 151, 75, 88, 252, 216, 77, 251, 231, 28, 97, 88, 193, 215, 202, ... 248, 216, 121, 195, 211, 245, 250, 112, 71, 243, 61, 129, 95, 39, 244, 122, ... 225, 217, 169, 211, 165, 48, 253, 220, 59, 122, 219, 42, 86, 223, 32, 236, ... 39, 48, 103, 78, 122, 216, 187, 88, 176, 89, 24, 1, 42, 177, 24, 99, ... 142, 170, 1, 146, 43, 3, 108, 64, 194, 121, 182, 95, 187, 134, 71, 88, ... 96, 134, 74, 131, 167, 69, 106, 143, 121, 27, 72, 44, 245, 95, 39, 194, ... 179, 175, 203, 122, 16, 112, 183, 17, 200, 202, 31, 17, 138, 156, 184, 210, ... 157, 184, 154, 131, 128, 110, 12, 85, 195, 122, 241, 79, 251, 229, 183, 117, ... 21, 123, 133, 142, 220, 153, 9, 59, 57, 105, 81, 255, 138, 77, 82, 54, ... 62, 216, 38, 249, 208, 17, 197, 49, 45, 19, 232, 157, 251, 131, 137, 175, ... 72, 126, 43, 229, 69, 179, 117, 82, 157, 213, 83, 35, 57, 210, 197, 252, ... 171, 143, 194, 11, 47, 163, 6, 253, 75, 252, 96, 11, 187, 84, 130, 210, ... 7, 121, 78, 91, 79, 57, 251, 138, 132, 220, 60, 224, 173, 56, 224, 201] >>> key_private_exponent = ''.join(chr(byte) for byte in key_private_exponent_bytes_list) >>> private_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent), number.bytes_to_long(key_private_exponent))) >>> private_key_as_encoded_str = private_key.exportKey() >>> decryptor = decrypt_json_web_token(private_key = private_key_as_encoded_str) >>> decrypted_jwt = check(decryptor)(jwe) >>> decrypted_jwt == jwt True >>> decoded_jwt = check(decode_json_web_token)(decrypted_jwt) >>> decoded_jwt['payload'] 'Live long and prosper.' >>> decoded_jwt['payload'] == plaintext True >>> # Mike Jones Test 3: RSA-OAEP/AES-GSM >>> jwe = 'eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZHQ00iLCJpdiI6Il9fNzlfUHY2LWZqMzl2WDAifQ.LX4rQictZUJSYrVH3-TmCeH\ 08qlpMrbFpKhEBVyAFX6h4V3Xt6omuTyMmkeiMIR6gbkn9ww4A0xDRnRu2GuM-v30Ri6OHx04fP1l1osYRngALuzMplcreyFCErm5asXTghweg_oI2y6f\ fhgR8P0ONWSlv0Bg-y58nAiNMi9Z51P42sx1f2JknAhG_oJ2u4Dy-6xY3tWCS4hXolMt5NMZm1dUN56RKvGlp-nspa9mYONfV-CITnjA3zMVWGdpTF4SX\ Nq0fK5gWM3CgxJEtuW1gk69q9k_zOAEqCph_urnx0T7_6ZaBa90nV3TvX0cC5bq75alEqGWkqJLwTMsEnAg-A.ykqmBHvu8Yhqf5jafmUKcgDY8kZ0vw.\ _3cYrFHM7X640lLd_QoUKw' >>> key_modulus_bytes_list = [152, 66, 27, 221, 242, 12, 205, 109, 3, 3, 73, 191, 7, 43, 214, 144, ... 254, 253, 173, 139, 211, 33, 139, 34, 95, 176, 106, 246, 5, 176, 94, 78, ... 150, 102, 87, 240, 113, 118, 40, 137, 1, 245, 201, 181, 133, 9, 175, 161, ... 119, 134, 19, 81, 38, 21, 100, 25, 214, 68, 37, 66, 189, 149, 75, 143, ... 148, 24, 249, 69, 38, 236, 119, 81, 118, 149, 244, 115, 242, 43, 3, 90, ... 107, 238, 42, 3, 9, 90, 173, 192, 72, 175, 165, 17, 77, 92, 175, 8, ... 22, 252, 201, 168, 30, 109, 167, 12, 23, 56, 114, 122, 217, 30, 241, 127, ... 233, 130, 119, 100, 190, 121, 77, 95, 106, 107, 109, 19, 5, 103, 110, 0, ... 208, 248, 166, 68, 213, 22, 203, 25, 50, 35, 207, 165, 188, 185, 62, 103, ... 164, 45, 172, 183, 49, 132, 72, 72, 159, 223, 180, 22, 157, 253, 197, 185, ... 77, 67, 236, 72, 99, 14, 155, 255, 100, 159, 208, 199, 50, 4, 232, 132, ... 147, 61, 84, 150, 56, 13, 109, 17, 6, 247, 76, 172, 122, 185, 142, 120, ... 207, 117, 90, 94, 96, 161, 216, 2, 221, 17, 89, 107, 229, 214, 154, 97, ... 2, 17, 14, 222, 116, 61, 249, 198, 194, 55, 187, 13, 243, 34, 151, 65, ... 197, 17, 145, 225, 124, 238, 155, 235, 84, 192, 107, 107, 118, 185, 67, 196, ... 4, 75, 15, 89, 140, 30, 169, 51, 94, 160, 20, 98, 153, 156, 216, 51] >>> key_modulus = ''.join(chr(byte) for byte in key_modulus_bytes_list) >>> key_public_exponent_bytes_list = [1, 0, 1] >>> key_public_exponent = ''.join(chr(byte) for byte in key_public_exponent_bytes_list) >>> key_private_exponent_bytes_list = [107, 210, 84, 253, 165, 77, 95, 164, 21, 0, 29, 23, 68, 50, 205, 45, ... 189, 5, 84, 2, 178, 175, 12, 98, 121, 52, 235, 105, 241, 185, 101, 239, ... 109, 30, 104, 164, 3, 21, 83, 187, 66, 66, 22, 103, 143, 32, 190, 217, ... 47, 85, 41, 20, 204, 77, 85, 167, 222, 78, 63, 188, 181, 152, 165, 251, ... 181, 58, 194, 59, 48, 71, 64, 111, 213, 244, 119, 58, 44, 130, 61, 75, ... 169, 38, 237, 101, 93, 24, 115, 246, 185, 2, 121, 120, 121, 58, 107, 80, ... 229, 70, 122, 95, 173, 188, 165, 17, 48, 216, 110, 105, 132, 156, 31, 21, ... 31, 253, 158, 35, 31, 167, 179, 29, 32, 181, 150, 118, 99, 219, 76, 207, ... 251, 137, 174, 83, 77, 177, 19, 244, 49, 154, 248, 255, 112, 107, 231, 233, ... 96, 24, 96, 218, 77, 28, 47, 208, 75, 221, 69, 210, 226, 175, 61, 65, ... 106, 13, 8, 127, 96, 188, 205, 210, 251, 101, 176, 46, 22, 245, 249, 13, ... 174, 22, 109, 117, 255, 141, 126, 39, 90, 231, 44, 51, 49, 54, 95, 99, ... 149, 61, 238, 4, 17, 48, 239, 76, 198, 16, 193, 252, 160, 213, 155, 98, ... 51, 21, 155, 203, 163, 238, 112, 23, 29, 231, 76, 141, 93, 115, 91, 83, ... 103, 66, 110, 227, 188, 231, 105, 78, 23, 172, 126, 196, 130, 181, 226, 214, ... 178, 46, 56, 1, 181, 180, 154, 182, 80, 186, 154, 158, 79, 15, 177, 65] >>> key_private_exponent = ''.join(chr(byte) for byte in key_private_exponent_bytes_list) >>> private_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent), number.bytes_to_long(key_private_exponent))) >>> private_key_as_encoded_str = private_key.exportKey() >>> decryptor = decrypt_json_web_token(private_key = private_key_as_encoded_str) >>> decrypted_jwt = check(decryptor)(jwe) >>> decoded_jwt = check(decode_json_web_token)(decrypted_jwt) >>> decoded_jwt['payload'] 'What hath God wrought?' """ if shared_secret is not None: assert isinstance(shared_secret, str) # Shared secret must not be unicode. def decrypt_json_web_token_converter(token, state = None): if token is None: return None, None if state is None: state = states.default_state if token.count('.') != 3: if require_encrypted_token: return token, state._(u'Invalid crypted JSON web token') return token, None encoded_header, encoded_encrypted_key, encoded_cyphertext, encoded_integrity_value = token.split('.') header, error = pipe( make_base64url_to_bytes(add_padding = True), make_input_to_json(), test_isinstance(dict), struct( dict( alg = pipe( test_isinstance(basestring), test_in(valid_encryption_algorithms), not_none, ), cty = pipe( test_isinstance(basestring), test_in([ u'JWT', # u'urn:ietf:params:oauth:token-type:jwt', ]), ), enc = pipe( test_isinstance(basestring), test_in(valid_encryption_methods), not_none, ), # epk = TODO to support ECDH-ES int = pipe( test_isinstance(basestring), test_in(valid_integrity_algorithms), ), iv = pipe( test_isinstance(basestring), make_base64url_to_bytes(add_padding = True), ), jku = pipe( test_isinstance(basestring), make_input_to_url(add_prefix = None, error_if_fragment = True, full = True, schemes = ['https']), ), jwk = pipe( test_isinstance(basestring), make_input_to_json(), json_to_json_web_key, ), kdf = pipe( test_isinstance(basestring), test_in(valid_key_derivation_functions), ), kid = test_isinstance(basestring), typ = pipe( test_isinstance(basestring), test_in([ u'JWE', ]), ), x5c = pipe( test_isinstance(list), uniform_sequence(pipe( test_isinstance(basestring), base64_to_bytes, # TODO )), ), x5t = pipe( test_isinstance(basestring), make_base64url_to_bytes(add_padding = True), # TODO ), x5u = pipe( test_isinstance(basestring), make_input_to_url(add_prefix = None, error_if_fragment = True, full = True, schemes = ['https']), ), zip = pipe( test_isinstance(basestring), test_in([ u'DEF', u'none', ]), ), ), # default = None, # For security reasons a header can only contain known attributes. ), not_none, )(encoded_header, state = state) if error is not None: return token, state._(u'Invalid header: {0}').format(error) encrypted_key, error = make_base64url_to_bytes(add_padding = True)(encoded_encrypted_key, state = state) if error is not None: return token, state._(u'Invalid encrypted key: {0}').format(error) cyphertext, error = make_base64url_to_bytes(add_padding = True)(encoded_cyphertext, state = state) if error is not None: return token, state._(u'Invalid cyphertext: {0}').format(error) integrity_value, error = make_base64url_to_bytes(add_padding = True)(encoded_integrity_value, state = state) if error is not None: return token, state._(u'Invalid integrity value: {0}').format(error) # TODO: Verify that the JWE Header references a key known to the recipient. algorithm = header['alg'] if algorithm == u'RSA1_5': assert private_key is not None rsa_private_key = RSA.importKey(private_key) cipher = Cipher_PKCS1_v1_5.new(rsa_private_key) # Build a sentinel that has the same size of the plaintext (ie the content master key). sentinel = Random.get_random_bytes(256 >> 3) try: content_master_key = cipher.decrypt(encrypted_key, sentinel) except: return token, state._(u'Invalid content master key') elif algorithm == u'RSA-OAEP': assert private_key is not None rsa_private_key = RSA.importKey(private_key) cipher = Cipher_PKCS1_OAEP.new(rsa_private_key) try: content_master_key = cipher.decrypt(encrypted_key) except: return token, state._(u'Invalid content master key') method = header['enc'] if method.endswith('GCM'): # Algorithm is an AEAD algorithm. if header['int'] is not None: return token, state._( u'Unexpected "int" header forbidden by AEAD algorithm {0}').format(algorithm) content_encryption_key = content_master_key else: # Algorithm is not an AEAD algorithm. if header['int'] is None: return token, state._( u'Missing "int" header, required by non AEAD algorithm {0}').format(algorithm) method_size = int(method[1:4]) key_derivation_digest_size = int((header['kdf'] or u'CS256')[2:]) content_encryption_key = derive_key(content_master_key, 'Encryption', digest_size = key_derivation_digest_size, key_size = method_size) integrity = header['int'] integrity_size = int(integrity[2:]) content_integrity_key = derive_key(content_master_key, 'Integrity', digest_size = key_derivation_digest_size, key_size = integrity_size) secured_input = token.rsplit('.', 1)[0] digest_constructor = digest_constructor_by_size[integrity_size] signature = HMAC.new(content_integrity_key, msg = secured_input, digestmod = digest_constructor).digest() encoded_signature = check(make_bytes_to_base64url(remove_padding = True))(signature, state = state) if encoded_integrity_value != encoded_signature: return token, state._(u'Non authentic signature') if method.startswith(u'A') and method.endswith(u'CBC'): if header['iv'] is None: return token, state._( u'Invalid header: "iv" required for {0} encryption method').format(method) cipher = Cipher_AES.new(content_encryption_key, mode = Cipher_AES.MODE_CBC, IV = header['iv']) try: compressed_plaintext = cipher.decrypt(cyphertext) except: return token, state._(u'Invalid cyphertext') # Remove PKCS #5 padding. padding_number = ord(compressed_plaintext[-1]) compressed_plaintext = compressed_plaintext[:-padding_number] elif method.startswith(u'A') and method.endswith(u'GCM'): if header['iv'] is None: return token, state._( u'Invalid header: "iv" required for {0} encryption method').format(method) additional_authenticated_data = '{0}.{1}'.format(encoded_header, encoded_encrypted_key) try: compressed_plaintext = gcm.gcm_decrypt(content_encryption_key, header['iv'], cyphertext, additional_authenticated_data, integrity_value) except: return token, state._(u'Invalid cyphertext') else: raise 'TODO' compression = header['zip'] if compression == u'DEF': try: plaintext = zlib.decompress(compressed_plaintext) except zlib.error: return token, state._(u'Invalid compressed plaintext') else: assert compression in (None, u'none'), compression plaintext = compressed_plaintext if header['cty'] == u'JWT': # Token was a nested token and plaintext is also a token. return plaintext, None # Create a new (unencrypted and unsigned) token containing plaintext. header = dict( alg = u'none', ) encoded_header = check(pipe( make_json_to_str(encoding = 'utf-8', ensure_ascii = False, separators = (',', ':'), sort_keys = True), make_bytes_to_base64url(remove_padding = True), ))(header) encoded_payload = check(make_bytes_to_base64url(remove_padding = True))(plaintext, state = state) return '{0}.{1}.'.format(encoded_header, encoded_payload), None return decrypt_json_web_token_converter
[docs]def derive_key(master_key, label, digest_size = None, key_size = None): """Concatenation Key Derivation Function This is a simplified version of the algorithm described in section "5.8.1 Concatenation Key Derivation Function (Approved Alternative 1)" of "Recommendation for Pair-Wise Key Establishment Schemes Using Discrete Logarithm Cryptography" (NIST SP 800-56 A). http://csrc.nist.gov/publications/nistpubs/800-56A/SP800-56A_Revision1_Mar08-2007.pdf .. note:: ``key_size`` & ``digest_size`` are the length in bits (not bytes). >>> # Mike Jones Tests >>> cmk1_bytes_list = [4, 211, 31, 197, 84, 157, 252, 254, 11, 100, 157, 250, 63, 170, 106, 206, ... 107, 124, 212, 45, 111, 107, 9, 219, 200, 177, 0, 240, 143, 156, 44, 207] >>> cmk1 = ''.join(chr(byte) for byte in cmk1_bytes_list) >>> cek1 = derive_key(cmk1, 'Encryption', key_size = 256) >>> cek1_bytes_list = [ord(c) for c in cek1] >>> cek1_bytes_list [249, 255, 87, 218, 224, 223, 221, 53, 204, 121, 166, 130, 195, 184, 50, 69, \ 11, 237, 202, 71, 10, 96, 59, 199, 140, 88, 126, 147, 146, 113, 222, 41] >>> cik1 = derive_key(cmk1, 'Integrity', key_size = 256) >>> cik1_bytes_list = [ord(c) for c in cik1] >>> cik1_bytes_list [218, 209, 130, 50, 169, 45, 70, 214, 29, 187, 123, 20, 3, 158, 111, 122, \ 182, 94, 57, 133, 245, 76, 97, 44, 193, 80, 81, 246, 115, 177, 225, 159] >>> cmk2_bytes_list = [148, 116, 199, 126, 2, 117, 233, 76, 150, 149, 89, 193, 61, 34, 239, 226, ... 109, 71, 59, 160, 192, 140, 150, 235, 106, 204, 49, 176, 68, 119, 13, 34, ... 49, 19, 41, 69, 5, 20, 252, 145, 104, 129, 137, 138, 67, 23, 153, 83, ... 81, 234, 82, 247, 48, 211, 41, 130, 35, 124, 45, 156, 249, 7, 225, 168] >>> cmk2 = ''.join(chr(byte) for byte in cmk2_bytes_list) >>> cek2 = derive_key(cmk2, 'Encryption', key_size = 128) >>> cek2_bytes_list = [ord(c) for c in cek2] >>> cek2_bytes_list [137, 5, 92, 9, 17, 47, 17, 86, 253, 235, 34, 247, 121, 78, 11, 144] >>> cik2 = derive_key(cmk2, 'Integrity', key_size = 512) >>> cik2_bytes_list = [ord(c) for c in cik2] >>> cik2_bytes_list [11, 179, 132, 177, 171, 24, 126, 19, 113, 1, 200, 102, 100, 74, 88, 149, \ 31, 41, 71, 57, 51, 179, 106, 242, 113, 211, 56, 56, 37, 198, 57, 17, \ 149, 209, 221, 113, 40, 191, 95, 252, 142, 254, 141, 230, 39, 113, 139, 84, \ 44, 156, 247, 47, 223, 101, 229, 180, 82, 231, 38, 96, 170, 119, 236, 81] """ assert isinstance(master_key, str) assert isinstance(label, str) if digest_size is None: digest_size = 256 digest_constructor = digest_constructor_by_size[digest_size] if key_size is None: key_size = 256 hashes = [] block_count, remaining_length = divmod(key_size >> 3, digest_size >> 3) for index in range(block_count): hash_object = digest_constructor.new(pack('>I', index + 1)) hash_object.update(master_key) hash_object.update(label) hashes.append(hash_object.digest()) if remaining_length != 0: # Generated key length is not a multiple of digest length. hash_object = digest_constructor.new(pack('>I', len(hashes) + 1)) hash_object.update(master_key) hash_object.update(label) hashes.append(hash_object.digest()[:remaining_length]) return ''.join(hashes)
[docs]def encrypt_json_web_token(algorithm = None, compression = None, content_master_key = None, encrypted_key = None, integrity = None, initialization_vector = None, json_web_key_url = None, key_derivation_function = None, key_id = None, method = None, public_key_as_encoded_str = None, public_key_as_json_web_key = None, shared_secret = None): """Return a converter that encrypts a JSON Web Token. .. note:: ``content_master_key``, ``encrypted_key`` & ``initialization_vector`` parameters should be kept to ``None``, except for testing. >>> # Mike Jones Test 1 >>> from Crypto.PublicKey import RSA >>> from Crypto.Util import number >>> plaintext_bytes_list = [78, 111, 119, 32, 105, 115, 32, 116, 104, 101, 32, 116, 105, 109, 101, 32, ... 102, 111, 114, 32, 97, 108, 108, 32, 103, 111, 111, 100, 32, 109, 101, 110, ... 32, 116, 111, 32, 99, 111, 109, 101, 32, 116, 111, 32, 116, 104, 101, 32, ... 97, 105, 100, 32, 111, 102, 32, 116, 104, 101, 105, 114, 32, 99, 111, 117, ... 110, 116, 114, 121, 46] >>> plaintext = ''.join(chr(byte) for byte in plaintext_bytes_list) >>> jwt = check(make_payload_to_json_web_token())(plaintext) >>> jwt 'eyJhbGciOiJub25lIn0.Tm93IGlzIHRoZSB0aW1lIGZvciBhbGwgZ29vZCBtZW4gdG8gY29tZSB0byB0aGUgYWlkIG9mIHRoZWlyIGNvdW50cnku.' >>> cmk_bytes_list = [4, 211, 31, 197, 84, 157, 252, 254, 11, 100, 157, 250, 63, 170, 106, 206, ... 107, 124, 212, 45, 111, 107, 9, 219, 200, 177, 0, 240, 143, 156, 44, 207] >>> cmk = ''.join(chr(byte) for byte in cmk_bytes_list) >>> iv_bytes_list = [3, 22, 60, 12, 43, 67, 104, 105, 108, 108, 105, 99, 111, 116, 104, 101] >>> iv = ''.join(chr(byte) for byte in iv_bytes_list) >>> key_modulus_bytes_list = [177, 119, 33, 13, 164, 30, 108, 121, 207, 136, 107, 242, 12, 224, 19, 226, ... 198, 134, 17, 71, 173, 75, 42, 61, 48, 162, 206, 161, 97, 108, 185, 234, ... 226, 219, 118, 206, 118, 5, 169, 224, 60, 181, 90, 85, 51, 123, 6, 224, ... 4, 122, 29, 230, 151, 12, 244, 127, 121, 25, 4, 85, 220, 144, 215, 110, ... 130, 17, 68, 228, 129, 138, 7, 130, 231, 40, 212, 214, 17, 179, 28, 124, ... 151, 178, 207, 20, 14, 154, 222, 113, 176, 24, 198, 73, 211, 113, 9, 33, ... 178, 80, 13, 25, 21, 25, 153, 212, 206, 67, 154, 147, 70, 194, 192, 183, ... 160, 83, 98, 236, 175, 85, 23, 97, 75, 199, 177, 73, 145, 50, 253, 206, ... 32, 179, 254, 236, 190, 82, 73, 67, 129, 253, 252, 220, 108, 136, 138, 11, ... 192, 1, 36, 239, 228, 55, 81, 113, 17, 25, 140, 63, 239, 146, 3, 172, ... 96, 60, 227, 233, 64, 255, 224, 173, 225, 228, 229, 92, 112, 72, 99, 97, ... 26, 87, 187, 123, 46, 50, 90, 202, 117, 73, 10, 153, 47, 224, 178, 163, ... 77, 48, 46, 154, 33, 148, 34, 228, 33, 172, 216, 89, 46, 225, 127, 68, ... 146, 234, 30, 147, 54, 146, 5, 133, 45, 78, 254, 85, 55, 75, 213, 86, ... 194, 218, 215, 163, 189, 194, 54, 6, 83, 36, 18, 153, 53, 7, 48, 89, ... 35, 66, 144, 7, 65, 154, 13, 97, 75, 55, 230, 132, 3, 13, 239, 71] >>> key_modulus = ''.join(chr(byte) for byte in key_modulus_bytes_list) >>> key_public_exponent_bytes_list = [1, 0, 1] >>> key_public_exponent = ''.join(chr(byte) for byte in key_public_exponent_bytes_list) >>> public_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent))) >>> public_key_as_encoded_str = public_key.exportKey() >>> print public_key_as_encoded_str -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsXchDaQebHnPiGvyDOAT 4saGEUetSyo9MKLOoWFsueri23bOdgWp4Dy1WlUzewbgBHod5pcM9H95GQRV3JDX boIRROSBigeC5yjU1hGzHHyXss8UDprecbAYxknTcQkhslANGRUZmdTOQ5qTRsLA t6BTYuyvVRdhS8exSZEy/c4gs/7svlJJQ4H9/NxsiIoLwAEk7+Q3UXERGYw/75ID rGA84+lA/+Ct4eTlXHBIY2EaV7t7LjJaynVJCpkv4LKjTTAumiGUIuQhrNhZLuF/ RJLqHpM2kgWFLU7+VTdL1VbC2tejvcI2BlMkEpk1BzBZI0KQB0GaDWFLN+aEAw3v RwIDAQAB -----END PUBLIC KEY----- >>> encrypted_key_bytes_list = [32, 242, 63, 207, 94, 246, 133, 37, 135, 48, 88, 4, 15, 193, 6, 244, ... 51, 58, 132, 133, 212, 255, 163, 90, 59, 80, 200, 152, 41, 244, 188, 215, ... 174, 160, 26, 188, 227, 180, 165, 234, 172, 63, 24, 116, 152, 28, 149, 16, ... 94, 213, 201, 171, 180, 191, 11, 21, 149, 172, 143, 54, 194, 58, 206, 201, ... 164, 28, 107, 155, 75, 101, 22, 92, 227, 144, 95, 40, 119, 170, 7, 36, ... 225, 40, 141, 186, 213, 7, 175, 16, 174, 122, 75, 32, 48, 193, 119, 202, ... 41, 152, 210, 190, 68, 57, 119, 4, 197, 74, 7, 242, 239, 170, 204, 73, ... 75, 213, 202, 113, 216, 18, 23, 66, 106, 208, 69, 244, 117, 147, 2, 37, ... 207, 199, 184, 96, 102, 44, 70, 212, 87, 143, 253, 0, 166, 59, 41, 115, ... 217, 80, 165, 87, 38, 5, 9, 184, 202, 68, 67, 176, 4, 87, 254, 166, ... 227, 88, 124, 238, 249, 75, 114, 205, 148, 149, 45, 78, 193, 134, 64, 189, ... 168, 76, 170, 76, 176, 72, 148, 77, 215, 159, 146, 55, 189, 213, 85, 253, ... 135, 200, 59, 247, 79, 37, 22, 200, 32, 110, 53, 123, 54, 39, 9, 178, ... 231, 238, 95, 25, 211, 143, 87, 220, 88, 138, 209, 13, 227, 72, 58, 102, ... 164, 136, 241, 14, 14, 45, 32, 77, 44, 244, 162, 239, 150, 248, 181, 138, ... 251, 116, 245, 205, 137, 78, 34, 34, 10, 6, 59, 4, 197, 2, 153, 251] >>> encrypted_key = ''.join(chr(byte) for byte in encrypted_key_bytes_list) >>> encryptor = encrypt_json_web_token(algorithm = 'RSA1_5', content_master_key = cmk, ... encrypted_key = encrypted_key, initialization_vector = iv, integrity = 'HS256', method = 'A128CBC', ... public_key_as_encoded_str = public_key_as_encoded_str) >>> check(encryptor)(jwt) 'eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDIiwiaW50IjoiSFMyNTYiLCJpdiI6IkF4WThEQ3REYUdsc2JHbGpiM1JvWlEifQ.IPI_z172h\ SWHMFgED8EG9DM6hIXU_6NaO1DImCn0vNeuoBq847Sl6qw_GHSYHJUQXtXJq7S_CxWVrI82wjrOyaQca5tLZRZc45BfKHeqByThKI261QevEK56SyAwwX\ fKKZjSvkQ5dwTFSgfy76rMSUvVynHYEhdCatBF9HWTAiXPx7hgZixG1FeP_QCmOylz2VClVyYFCbjKREOwBFf-puNYfO75S3LNlJUtTsGGQL2oTKpMsEi\ UTdefkje91VX9h8g7908lFsggbjV7NicJsufuXxnTj1fcWIrRDeNIOmakiPEODi0gTSz0ou-W-LWK-3T1zYlOIiIKBjsExQKZ-w._Z_djlIoC4MDSCKir\ eWS2beti4Q6iSG2UjFujQvdz-_PQdUcFNkOulegD6BgjgdFLjeB4HHOO7UHvP8PEDu0a0sA2a_-CI0w2YQQ2QQe35M.c41k4T4eAgCCt63m8ZNmiOinMc\ iFFypOFpvid7i6D0k' >>> # Mike Jones Test 2 >>> plaintext_bytes_list = [76, 105, 118, 101, 32, 108, 111, 110, 103, 32, 97, 110, 100, 32, 112, 114, ... 111, 115, 112, 101, 114, 46] >>> plaintext = ''.join(chr(byte) for byte in plaintext_bytes_list) >>> jwt = check(make_payload_to_json_web_token())(plaintext) >>> cmk_bytes_list = [177, 161, 244, 128, 84, 143, 225, 115, 63, 180, 3, 255, 107, 154, 212, 246, ... 138, 7, 110, 91, 112, 46, 34, 105, 47, 130, 203, 46, 122, 234, 64, 252] >>> cmk = ''.join(chr(byte) for byte in cmk_bytes_list) >>> iv_bytes_list = [227, 197, 117, 252, 2, 219, 233, 68, 180, 225, 77, 219] >>> iv = ''.join(chr(byte) for byte in iv_bytes_list) >>> key_modulus_bytes_list = [161, 168, 84, 34, 133, 176, 208, 173, 46, 176, 163, 110, 57, 30, 135, 227, ... 9, 31, 226, 128, 84, 92, 116, 241, 70, 248, 27, 227, 193, 62, 5, 91, ... 241, 145, 224, 205, 141, 176, 184, 133, 239, 43, 81, 103, 9, 161, 153, 157, ... 179, 104, 123, 51, 189, 34, 152, 69, 97, 69, 78, 93, 140, 131, 87, 182, ... 169, 101, 92, 142, 3, 22, 167, 8, 212, 56, 35, 79, 210, 222, 192, 208, ... 252, 49, 109, 138, 173, 253, 210, 166, 201, 63, 102, 74, 5, 158, 41, 90, ... 144, 108, 160, 79, 10, 89, 222, 231, 172, 31, 227, 197, 0, 19, 72, 81, ... 138, 78, 136, 221, 121, 118, 196, 17, 146, 10, 244, 188, 72, 113, 55, 221, ... 162, 217, 171, 27, 57, 233, 210, 101, 236, 154, 199, 56, 138, 239, 101, 48, ... 198, 186, 202, 160, 76, 111, 234, 71, 57, 183, 5, 211, 171, 136, 126, 64, ... 40, 75, 58, 89, 244, 254, 107, 84, 103, 7, 236, 69, 163, 18, 180, 251, ... 58, 153, 46, 151, 174, 12, 103, 197, 181, 161, 162, 55, 250, 235, 123, 110, ... 17, 11, 158, 24, 47, 133, 8, 199, 235, 107, 126, 130, 246, 73, 195, 20, ... 108, 202, 176, 214, 187, 45, 146, 182, 118, 54, 32, 200, 61, 201, 71, 243, ... 1, 255, 131, 84, 37, 111, 211, 168, 228, 45, 192, 118, 27, 197, 235, 232, ... 36, 10, 230, 248, 190, 82, 182, 140, 35, 204, 108, 190, 253, 186, 186, 27] >>> key_modulus = ''.join(chr(byte) for byte in key_modulus_bytes_list) >>> key_public_exponent_bytes_list = [1, 0, 1] >>> key_public_exponent = ''.join(chr(byte) for byte in key_public_exponent_bytes_list) >>> public_key = RSA.construct((number.bytes_to_long(key_modulus), ... number.bytes_to_long(key_public_exponent))) >>> public_key_as_encoded_str = public_key.exportKey() >>> encrypted_key_bytes_list = [142, 252, 40, 202, 21, 177, 56, 198, 232, 7, 151, 49, 95, 169, 220, 2, ... 46, 214, 167, 116, 57, 20, 164, 109, 150, 98, 49, 223, 154, 95, 71, 209, ... 233, 17, 174, 142, 203, 232, 132, 167, 17, 42, 51, 125, 22, 221, 135, 17, ... 67, 197, 148, 246, 139, 145, 160, 238, 99, 119, 171, 95, 117, 202, 87, 251, ... 101, 254, 58, 215, 135, 195, 135, 103, 49, 119, 76, 46, 49, 198, 27, 31, ... 58, 44, 192, 222, 21, 16, 13, 216, 161, 179, 236, 65, 143, 38, 43, 218, ... 195, 76, 140, 243, 71, 243, 79, 124, 216, 208, 242, 171, 34, 245, 57, 154, ... 93, 76, 230, 204, 234, 82, 117, 248, 39, 13, 62, 60, 215, 8, 51, 248, ... 254, 47, 150, 36, 46, 27, 247, 98, 77, 56, 92, 44, 19, 39, 12, 77, ... 54, 101, 194, 126, 86, 0, 64, 239, 95, 211, 64, 26, 219, 93, 211, 36, ... 154, 250, 117, 177, 213, 232, 142, 184, 216, 92, 20, 248, 69, 175, 180, 71, ... 205, 221, 235, 224, 95, 113, 5, 33, 86, 18, 157, 61, 199, 8, 121, 0, ... 0, 135, 65, 67, 220, 164, 15, 230, 155, 71, 53, 64, 253, 209, 169, 255, ... 34, 64, 101, 7, 43, 102, 227, 83, 171, 52, 225, 119, 253, 182, 96, 195, ... 225, 34, 156, 211, 202, 7, 194, 255, 137, 59, 170, 172, 72, 234, 222, 203, ... 123, 249, 121, 254, 143, 173, 105, 65, 187, 189, 163, 64, 151, 145, 99, 17] >>> encrypted_key = ''.join(chr(byte) for byte in encrypted_key_bytes_list) >>> encryptor = encrypt_json_web_token(algorithm = 'RSA-OAEP', content_master_key = cmk, ... encrypted_key = encrypted_key, initialization_vector = iv, method = 'A256GCM', ... public_key_as_encoded_str = public_key_as_encoded_str) >>> check(encryptor)(jwt) 'eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZHQ00iLCJpdiI6IjQ4VjFfQUxiNlVTMDRVM2IifQ.jvwoyhWxOMboB5cxX6ncAi7Wp3Q5FKRtl\ mIx35pfR9HpEa6Oy-iEpxEqM30W3YcRQ8WU9ouRoO5jd6tfdcpX-2X-OteHw4dnMXdMLjHGGx86LMDeFRAN2KGz7EGPJivaw0yM80fzT3zY0PKrIvU5ml\ 1M5szqUnX4Jw0-PNcIM_j-L5YkLhv3Yk04XCwTJwxNNmXCflYAQO9f00Aa213TJJr6dbHV6I642FwU-EWvtEfN3evgX3EFIVYSnT3HCHkAAIdBQ9ykD-a\ bRzVA_dGp_yJAZQcrZuNTqzThd_22YMPhIpzTygfC_4k7qqxI6t7Le_l5_o-taUG7vaNAl5FjEQ._e21tGGhac_peEFkLXr2dMPUZiUkrw.YbZSeHCNDZ\ BqAdzpROlyiw' """ assert algorithm is None or algorithm in valid_encryption_algorithms, algorithm assert integrity is None or integrity in valid_integrity_algorithms, integrity assert key_derivation_function is None or key_derivation_function in valid_key_derivation_functions, \ key_derivation_function assert method is None or method in valid_encryption_methods, method if algorithm is not None: assert method is not None method_size = int(method[1:4]) encryption_key_length = method_size >> 3 # method_size is in bits, but length is in bytes. if method.endswith('GCM'): # AEAD encryption doesn't use a separate integrity algorithm integrity = None integrity_key_length = 0 else: assert integrity is not None, 'Encryption algorithm requires a separate integrity algorithm' integrity_size = int(integrity[2:]) integrity_key_length = integrity_size >> 3 # The content master key must be at least as long as the encryption & integrity keys. # TODO: Don't create a content master key, when key agreement is employed. if content_master_key is None: content_master_key = Random.get_random_bytes(max(encryption_key_length, integrity_key_length)) else: assert len(content_master_key) >= max(encryption_key_length, integrity_key_length) if encrypted_key is None: # Note: ``encrypted_key`` should be ``None`` except for testing. if algorithm.startswith(u'RSA'): if public_key_as_encoded_str is None: assert public_key_as_json_web_key is not None public_key_dict = public_key_as_json_web_key['jwk'][-1] # TODO assert public_key_dict['alg'] == u'RSA', public_key_as_json_web_key # TODO rsa_public_key = RSA.construct(( number.bytes_to_long(check(make_base64url_to_bytes(add_padding = True))( public_key_dict['mod'])), number.bytes_to_long(check(make_base64url_to_bytes(add_padding = True))( public_key_dict['exp'])), )) else: rsa_public_key = RSA.importKey(public_key_as_encoded_str) if algorithm == u'RSA1_5': cipher = Cipher_PKCS1_v1_5.new(rsa_public_key) else: assert algorithm == u'RSA-OAEP', algorithm cipher = Cipher_PKCS1_OAEP.new(rsa_public_key) encrypted_key = cipher.encrypt(content_master_key) else: raise 'TODO' encoded_encrypted_key = check(make_bytes_to_base64url(remove_padding = True))(encrypted_key) # Generate a random Initialization Vector (IV) (if required for the algorithm). if method in (u'A128CBC', u'A256CBC'): # All AES CBC ciphers use 128 bits (= 16 bytes) blocks if initialization_vector is None: initialization_vector = Random.get_random_bytes(16) else: assert len(initialization_vector) == 16 elif method in (u'A128GCM', u'A256GCM'): # All AES GCM ciphers use 96 bits (= 12 bytes) blocks if initialization_vector is None: initialization_vector = Random.get_random_bytes(12) else: assert len(initialization_vector) == 12 else: initialization_vector = None if method.endswith('GCM'): # Algorithm is an AEAD algorithm. content_encryption_key = content_master_key content_integrity_key = None assert key_derivation_function is None else: key_derivation_digest_size = int((key_derivation_function or u'CS256')[2:]) content_encryption_key = derive_key(content_master_key, 'Encryption', digest_size = key_derivation_digest_size, key_size = method_size) content_integrity_key = derive_key(content_master_key, 'Integrity', digest_size = key_derivation_digest_size, key_size = integrity_size) def encrypt_json_web_token_converter(token, state = None): if token is None: return None, None if algorithm is None: return token, None if state is None: state = states.default_state if '.' not in token: return token, state._(u'Missing header') encoded_header, token_without_header = token.split('.', 1) header, error = pipe( make_base64url_to_bytes(add_padding = True), make_input_to_json(), )(encoded_header, state = state) if error is not None: return token, state._(u'Invalid header: {0}').format(error) if header['alg'] == u'none': if '.' not in token_without_header: return token, state._(u'Missing signature') encoded_payload, encoded_integrity_value = token_without_header.split('.', 1) if encoded_integrity_value: return token, state._(u'Unexpected signature in plaintext token') plaintext, error = make_base64url_to_bytes(add_padding = True)(encoded_payload, state = state) if error is not None: return token, state._(u'Invalid encoded payload: {0}').format(error) else: # Token is already signed. Use nested encryption. header = dict( cty = u'JWT', typ = u'JWE', # optional ) plaintext = token if compression == u'DEF': compressed_plaintext = zlib.compress(plaintext, 9) else: assert compression in (None, u'none'), compression compressed_plaintext = plaintext header['alg'] = algorithm header['enc'] = method if integrity is not None: header['int'] = integrity # TODO ephemeral_public_key # header['epk'] = ephemeral_public_key if initialization_vector is not None: header['iv'] = check(make_bytes_to_base64url(remove_padding = True))(initialization_vector, state = state) # TODO header['jku'] # TODO header['jwk'] # TODO header['kid'] # TODO header['x5c'] # TODO header['x5t'] # TODO header['x5u'] if compression not in (None, 'none'): header['zip'] = compression encoded_header = check(pipe( make_json_to_str(encoding = 'utf-8', ensure_ascii = False, separators = (',', ':'), sort_keys = True), make_bytes_to_base64url(remove_padding = True), ))(header, state = state) if method.startswith(u'A') and method.endswith(u'CBC'): # Add PKCS #5 padding. padding_number = 16 - len(compressed_plaintext) % 16 compressed_plaintext += chr(padding_number) * padding_number cipher = Cipher_AES.new(content_encryption_key, mode = Cipher_AES.MODE_CBC, IV = initialization_vector) cyphertext = cipher.encrypt(compressed_plaintext) integrity_value = None elif method.startswith(u'A') and method.endswith(u'GCM'): additional_authenticated_data = '{0}.{1}'.format(encoded_header, encoded_encrypted_key) cyphertext, integrity_value = gcm.gcm_encrypt(content_encryption_key, initialization_vector, compressed_plaintext, additional_authenticated_data) else: raise 'TODO' encoded_cyphertext = check(make_bytes_to_base64url(remove_padding = True))(cyphertext, state = state) secured_input = '{0}.{1}.{2}'.format(encoded_header, encoded_encrypted_key, encoded_cyphertext) if integrity is None: assert integrity_value is not None else: assert integrity_value is None digest_constructor = digest_constructor_by_size[integrity_size] integrity_value = HMAC.new(content_integrity_key, msg = secured_input, digestmod = digest_constructor).digest() encoded_integrity_value = check(make_bytes_to_base64url(remove_padding = True))(integrity_value, state = state) token = '{0}.{1}'.format(secured_input, encoded_integrity_value) return token, None return encrypt_json_web_token_converter
input_to_json_web_token = cleanup_line
[docs]def make_json_to_json_web_token(typ = None): """Return a converter that wraps JSON data into an unsigned and unencrypted JSON web token.""" return pipe( make_json_to_str(encoding = 'utf-8', ensure_ascii = False, separators = (',', ':'), sort_keys = True), make_payload_to_json_web_token(typ = typ), )
[docs]def make_payload_to_json_web_token(typ = None): """Return a converter that wraps binary data into an unsigned and unencrypted JSON web token.""" header = dict( alg = u'none', ) if typ is not None: header['typ'] = typ encoded_header = check(pipe( make_json_to_str(encoding = 'utf-8', ensure_ascii = False, separators = (',', ':'), sort_keys = True), make_bytes_to_base64url(remove_padding = True), ))(header) def payload_to_json_web_token(payload, state = None): if payload is None: return None, None if state is None: state = states.default_state encoded_payload, error = make_bytes_to_base64url(remove_padding = True)(payload, state = state) if error is not None: return encoded_payload, error secured_input = '{0}.{1}'.format(encoded_header, encoded_payload) encoded_signature = '' token = '{0}.{1}'.format(secured_input, encoded_signature) return token, None return payload_to_json_web_token
[docs]def sign_json_web_token(algorithm = None, json_web_key_url = None, key_id = None, private_key = None, shared_secret = None): if algorithm is None: algorithm = u'none' assert algorithm == u'none' or algorithm in valid_signature_algorithms if algorithm != u'none': algorithm_prefix = algorithm[:2] algorithm_size = int(algorithm[2:]) digest_constructor = digest_constructor_by_size[algorithm_size] # if algorithm_prefix == u'ES': # TODO # elif algorithm_prefix == u'HS': if algorithm_prefix == u'HS': assert shared_secret is not None assert isinstance(shared_secret, str) else: assert algorithm_prefix == u'RS' assert private_key is not None rsa_private_key = RSA.importKey(private_key) signer = Signature_PKCS1_v1_5.new(rsa_private_key) def sign_json_web_token_converter(token, state = None): if token is None: return None, None if algorithm == u'none': return token, None if state is None: state = states.default_state if '.' not in token: return token, state._(u'Missing header') encoded_header, token_without_header = token.split('.', 1) header, error = pipe( make_base64url_to_bytes(add_padding = True), make_input_to_json(), )(encoded_header, state = state) if error is not None: return token, state._(u'Invalid header: {0}').format(error) if header['alg'] == u'none': if '.' not in token_without_header: return token, state._(u'Missing signature') encoded_payload, encoded_signature = token_without_header.split('.', 1) if encoded_signature: return token, state._(u'Unexpected signature in plaintext token') else: # Token is already signed or encrypted. Use nested signing. header = dict( cty = u'JWT', typ = u'JWS', # optional ) encoded_payload = check(make_bytes_to_base64url(remove_padding = True))(token, state = state) header['alg'] = algorithm if algorithm_prefix == u'RS': if json_web_key_url is not None: header['jku'] = json_web_key_url if key_id is not None: header['kid'] = key_id encoded_header = check(pipe( make_json_to_str(encoding = 'utf-8', ensure_ascii = False, separators = (',', ':'), sort_keys = True), make_bytes_to_base64url(remove_padding = True), ))(header) secured_input = '{0}.{1}'.format(encoded_header, encoded_payload) # if algorithm_prefix == u'ES': # TODO # elif algorithm_prefix == u'HS': if algorithm_prefix == u'HS': signature = HMAC.new(shared_secret, msg = secured_input, digestmod = digest_constructor).digest() else: assert algorithm_prefix == u'RS' digest = digest_constructor.new(secured_input) signature = signer.sign(digest) encoded_signature = check(make_bytes_to_base64url(remove_padding = True))(signature, state = state) token = '{0}.{1}'.format(secured_input, encoded_signature) return token, None return sign_json_web_token_converter
[docs]def verify_decoded_json_web_token_signature(allowed_algorithms = None, public_key_as_encoded_str = None, public_key_as_json_web_key = None, shared_secret = None): if shared_secret is not None: assert isinstance(shared_secret, str) # Shared secret must not be unicode. def verify_decoded_json_web_token_signature_converter(value, state = None): if value is None: return None, None if state is None: state = states.default_state errors = {} algorithm = value['header'].get('alg') if allowed_algorithms is not None and algorithm not in allowed_algorithms: errors['header'] = dict(alg = state._( u'Unauthorized digital signature algorithm')) elif algorithm in valid_signature_algorithms: algorithm_prefix = algorithm[:2] algorithm_size = int(algorithm[2:]) digest_constructor = digest_constructor_by_size[algorithm_size] # if algorithm_prefix == u'ES': # TODO # elif algorithm_prefix == u'HS': if algorithm_prefix == u'HS': if shared_secret is None: errors['signature'] = state._( u'Unable to check signature: Missing shared secret') else: verified = HMAC.new(shared_secret, msg = value['secured_input'], digestmod = digest_constructor).digest() == value['signature'] else: assert algorithm_prefix == u'RS' if public_key_as_encoded_str is None: assert public_key_as_json_web_key is not None public_key_dict = public_key_as_json_web_key['jwk'][-1] # TODO assert public_key_dict['alg'] == u'RSA', public_key_as_json_web_key # TODO rsa_public_key = RSA.construct(( number.bytes_to_long(check(make_base64url_to_bytes(add_padding = True))( public_key_dict['mod'], state = state)), number.bytes_to_long(check(make_base64url_to_bytes(add_padding = True))( public_key_dict['exp'], state = state)), )) else: rsa_public_key = RSA.importKey(public_key_as_encoded_str) verifier = Signature_PKCS1_v1_5.new(rsa_public_key) try: digest = digest_constructor.new(value['secured_input']) verified = verifier.verify(digest, value['signature']) except: errors['signature'] = state._(u'Invalid signature') if 'signature' not in errors and not verified: errors['signature'] = state._(u'Non authentic signature') elif algorithm != u'none': errors['header'] = dict(alg = state._( u'Unimplemented digital signature algorithm')) return value, errors or None return verify_decoded_json_web_token_signature_converter
[docs]def verify_decoded_json_web_token_time(): now_timestamp = calendar.timegm(datetime.datetime.utcnow().timetuple()) return struct( dict( claims = struct( dict( exp = test( lambda timestamp: now_timestamp - 300 < timestamp, # Allow 5 minutes drift. error = N_(u'Expired JSON web token'), ), iat = test_less_or_equal(now_timestamp + 300, # Allow 5 minutes drift. error = N_(u'JSON web token issued in the future'), ), nbf = test( lambda timestamp: now_timestamp + 300 >= timestamp, # Allow 5 minutes drift. error = N_(u'JSON web token not yet valid'), ), ), default = noop, ), ), default = noop, )