From 150490dba4012a7c7e26438f4ee5129ae6c09210 Mon Sep 17 00:00:00 2001 From: L2501 Date: Sun, 26 Nov 2023 12:06:08 +0000 Subject: [PATCH] [script.module.pyjwt] 2.8.0 --- script.module.pyjwt/AUTHORS | 29 - script.module.pyjwt/CHANGELOG.md | 262 ------ script.module.pyjwt/LICENSE | 21 - script.module.pyjwt/LICENSE.txt | 2 +- script.module.pyjwt/README.rst | 81 -- script.module.pyjwt/addon.xml | 31 +- script.module.pyjwt/lib/pyjwt/__init__.py | 93 +- script.module.pyjwt/lib/pyjwt/__main__.py | 168 ---- script.module.pyjwt/lib/pyjwt/algorithms.py | 825 ++++++++++++++---- script.module.pyjwt/lib/pyjwt/api_jwk.py | 132 +++ script.module.pyjwt/lib/pyjwt/api_jws.py | 340 +++++--- script.module.pyjwt/lib/pyjwt/api_jwt.py | 436 ++++++--- script.module.pyjwt/lib/pyjwt/compat.py | 68 -- .../lib/pyjwt/contrib/__init__.py | 0 .../lib/pyjwt/contrib/algorithms/__init__.py | 0 .../lib/pyjwt/contrib/algorithms/py_ecdsa.py | 60 -- .../lib/pyjwt/contrib/algorithms/pycrypto.py | 46 - script.module.pyjwt/lib/pyjwt/exceptions.py | 25 +- script.module.pyjwt/lib/pyjwt/help.py | 43 +- .../lib/pyjwt/jwk_set_cache.py | 31 + script.module.pyjwt/lib/pyjwt/jwks_client.py | 124 +++ script.module.pyjwt/lib/pyjwt/types.py | 5 + script.module.pyjwt/lib/pyjwt/utils.py | 157 ++-- script.module.pyjwt/lib/pyjwt/warnings.py | 2 + script.module.pyjwt/{ => resources}/icon.png | Bin 25 files changed, 1666 insertions(+), 1315 deletions(-) delete mode 100644 script.module.pyjwt/AUTHORS delete mode 100644 script.module.pyjwt/CHANGELOG.md delete mode 100644 script.module.pyjwt/LICENSE delete mode 100644 script.module.pyjwt/README.rst delete mode 100644 script.module.pyjwt/lib/pyjwt/__main__.py create mode 100644 script.module.pyjwt/lib/pyjwt/api_jwk.py delete mode 100644 script.module.pyjwt/lib/pyjwt/compat.py delete mode 100644 script.module.pyjwt/lib/pyjwt/contrib/__init__.py delete mode 100644 script.module.pyjwt/lib/pyjwt/contrib/algorithms/__init__.py delete mode 100644 script.module.pyjwt/lib/pyjwt/contrib/algorithms/py_ecdsa.py delete mode 100644 script.module.pyjwt/lib/pyjwt/contrib/algorithms/pycrypto.py create mode 100644 script.module.pyjwt/lib/pyjwt/jwk_set_cache.py create mode 100644 script.module.pyjwt/lib/pyjwt/jwks_client.py create mode 100644 script.module.pyjwt/lib/pyjwt/types.py create mode 100644 script.module.pyjwt/lib/pyjwt/warnings.py rename script.module.pyjwt/{ => resources}/icon.png (100%) diff --git a/script.module.pyjwt/AUTHORS b/script.module.pyjwt/AUTHORS deleted file mode 100644 index 90c7fa4d8..000000000 --- a/script.module.pyjwt/AUTHORS +++ /dev/null @@ -1,29 +0,0 @@ -PyJWT lead developer ---------------------- - - - jpadilla - - -Original author ------------------- - -- progrium - - -Patches and Suggestions ------------------------ - - - Boris Feld - - - Åsmund Ødegård - Adding support for RSA-SHA256 privat/public signature. - - - Mark Adams - - - Wouter Bolsterlee - - - Michael Davis - - - Vinod Gupta - - - Derek Weitzel diff --git a/script.module.pyjwt/CHANGELOG.md b/script.module.pyjwt/CHANGELOG.md deleted file mode 100644 index 747267b0b..000000000 --- a/script.module.pyjwt/CHANGELOG.md +++ /dev/null @@ -1,262 +0,0 @@ -Change Log -========================================================================= - -All notable changes to this project will be documented in this file. -This project adheres to [Semantic Versioning](http://semver.org/). - -[Unreleased][unreleased] -------------------------------------------------------------------------- -### Changed - -### Fixed - -### Added - -[v1.6.4][1.6.4] -------------------------------------------------------------------------- -### Fixed - -- Reverse an unintentional breaking API change to .decode() [#352][352] - -[v1.6.3][1.6.3] -------------------------------------------------------------------------- -### Changed - -- All exceptions inherit from PyJWTError [#340][340] - -### Added - -- Add type hints [#344][344] -- Add help module [7ca41e][7ca41e] - -### Docs - -- Added section to usage docs for jwt.get_unverified_header() [#350][350] -- Update legacy instructions for using pycrypto [#337][337] - -[v1.6.1][1.6.1] -------------------------------------------------------------------------- -### Fixed - -- Audience parameter throws `InvalidAudienceError` when application does not specify an audience, but the token does. [#336][336] - -[v1.6.0][1.6.0] -------------------------------------------------------------------------- -### Changed - -- Dropped support for python 2.6 and 3.3 [#301][301] -- An invalid signature now raises an `InvalidSignatureError` instead of `DecodeError` [#316][316] - -### Fixed - -- Fix over-eager fallback to stdin [#304][304] - -### Added - -- Audience parameter now supports iterables [#306][306] - -[v1.5.3][1.5.3] -------------------------------------------------------------------------- -### Changed - -- Increase required version of the cryptography package to >=1.4.0. - -### Fixed - -- Remove uses of deprecated functions from the cryptography package. -- Warn about missing `algorithms` param to `decode()` only when `verify` param is `True` [#281][281] - -[v1.5.2][1.5.2] -------------------------------------------------------------------------- -### Fixed - -- Ensure correct arguments order in decode super call [7c1e61d][7c1e61d] - -[v1.5.1][1.5.1] -------------------------------------------------------------------------- -### Changed - -- Change optparse for argparse. [#238][238] - -### Fixed - -- Guard against PKCS1 PEM encododed public keys [#277][277] -- Add deprecation warning when decoding without specifying `algorithms` [#277][277] -- Improve deprecation messages [#270][270] -- PyJWT.decode: move verify param into options [#271][271] - -### Added - -- Support for Python 3.6 [#262][262] -- Expose jwt.InvalidAlgorithmError [#264][264] - -[v1.5.0][1.5.0] -------------------------------------------------------------------------- -### Changed -- Add support for ECDSA public keys in RFC 4253 (OpenSSH) format [#244][244] -- Renamed commandline script `jwt` to `jwt-cli` to avoid issues with the script clobbering the `jwt` module in some circumstances. [#187][187] -- Better error messages when using an algorithm that requires the cryptography package, but it isn't available [#230][230] -- Tokens with future 'iat' values are no longer rejected [#190][190] -- Non-numeric 'iat' values now raise InvalidIssuedAtError instead of DecodeError -- Remove rejection of future 'iat' claims [#252][252] - -### Fixed -- Add back 'ES512' for backward compatibility (for now) [#225][225] -- Fix incorrectly named ECDSA algorithm [#219][219] -- Fix rpm build [#196][196] - -### Added -- Add JWK support for HMAC and RSA keys [#202][202] - -[v1.4.2][1.4.2] -------------------------------------------------------------------------- -### Fixed -- A PEM-formatted key encoded as bytes could cause a `TypeError` to be raised [#213][213] - -[v1.4.1][1.4.1] -------------------------------------------------------------------------- -### Fixed -- Newer versions of Pytest could not detect warnings properly [#182][182] -- Non-string 'kid' value now raises `InvalidTokenError` [#174][174] -- `jwt.decode(None)` now gracefully fails with `InvalidTokenError` [#183][183] - -[v1.4][1.4.0] -------------------------------------------------------------------------- -### Fixed -- Exclude Python cache files from PyPI releases. - -### Added -- Added new options to require certain claims - (require_nbf, require_iat, require_exp) and raise `MissingRequiredClaimError` - if they are not present. -- If `audience=` or `issuer=` is specified but the claim is not present, - `MissingRequiredClaimError` is now raised instead of `InvalidAudienceError` - and `InvalidIssuerError` - -[v1.3][1.3.0] -------------------------------------------------------------------------- -### Fixed -- ECDSA (ES256, ES384, ES512) signatures are now being properly serialized [#158][158] -- RSA-PSS (PS256, PS384, PS512) signatures now use the proper salt length for PSS padding. [#163][163] - -### Added -- Added a new `jwt.get_unverified_header()` to parse and return the header portion of a token prior to signature verification. - -### Removed -- Python 3.2 is no longer a supported platform. This version of Python is -rarely used. Users affected by this should upgrade to 3.3+. - -[v1.2.0][1.2.0] -------------------------------------------------------------------------- -### Fixed -- Added back `verify_expiration=` argument to `jwt.decode()` that was erroneously removed in [v1.1.0][1.1.0]. - - -### Changed -- Refactored JWS-specific logic out of PyJWT and into PyJWS superclass. [#141][141] - -### Deprecated -- `verify_expiration=` argument to `jwt.decode()` is now deprecated and will be removed in a future version. Use the `option=` argument instead. - -[v1.1.0][1.1.0] -------------------------------------------------------------------------- -### Added -- Added support for PS256, PS384, and PS512 algorithms. [#132][132] -- Added flexible and complete verification options during decode. [#131][131] -- Added this CHANGELOG.md file. - - -### Deprecated -- Deprecated usage of the .decode(..., verify=False) parameter. - - -### Fixed -- Fixed command line encoding. [#128][128] - -[v1.0.1][1.0.1] -------------------------------------------------------------------------- -### Fixed -- Include jwt/contrib' and jwt/contrib/algorithms` in setup.py so that they will - actually be included when installing. [882524d][882524d] -- Fix bin/jwt after removing jwt.header(). [bd57b02][bd57b02] - -[v1.0.0][1.0.0] -------------------------------------------------------------------------- -### Changed -- Moved `jwt.api.header` out of the public API. [#85][85] -- Added README details how to extract public / private keys from an x509 certificate. [#100][100] -- Refactor api.py functions into an object (`PyJWT`). [#101][101] -- Added support for PyCrypto and ecdsa when cryptography isn't available. [#101][103] - -### Fixed -- Fixed a security vulnerability where `alg=None` header could bypass signature verification. [#109][109] -- Fixed a security vulnerability by adding support for a whitelist of allowed `alg` values `jwt.decode(algorithms=[])`. [#110][110] - - -[unreleased]: https://github.com/jpadilla/pyjwt/compare/1.4.2...HEAD -[1.0.0]: https://github.com/jpadilla/pyjwt/compare/0.4.3...1.0.0 -[1.0.1]: https://github.com/jpadilla/pyjwt/compare/1.0.0...1.0.1 -[1.0.1]: https://github.com/jpadilla/pyjwt/compare/1.0.0...1.0.1 -[1.0.1]: https://github.com/jpadilla/pyjwt/compare/1.0.0...1.0.1 -[1.1.0]: https://github.com/jpadilla/pyjwt/compare/1.0.1...1.1.0 -[1.2.0]: https://github.com/jpadilla/pyjwt/compare/1.1.0...1.2.0 -[1.3.0]: https://github.com/jpadilla/pyjwt/compare/1.2.0...1.3.0 -[1.4.0]: https://github.com/jpadilla/pyjwt/compare/1.3.0...1.4.0 -[1.4.1]: https://github.com/jpadilla/pyjwt/compare/1.4.0...1.4.1 -[1.4.2]: https://github.com/jpadilla/pyjwt/compare/1.4.1...1.4.2 -[1.5.0]: https://github.com/jpadilla/pyjwt/compare/1.4.2...1.5.0 -[1.5.1]: https://github.com/jpadilla/pyjwt/compare/1.5.0...1.5.1 -[1.5.2]: https://github.com/jpadilla/pyjwt/compare/1.5.1...1.5.2 -[1.5.3]: https://github.com/jpadilla/pyjwt/compare/1.5.2...1.5.3 -[1.6.0]: https://github.com/jpadilla/pyjwt/compare/1.5.3...1.6.0 -[1.6.1]: https://github.com/jpadilla/pyjwt/compare/1.6.0...1.6.1 -[1.6.3]: https://github.com/jpadilla/pyjwt/compare/1.6.1...1.6.3 -[1.6.4]: https://github.com/jpadilla/pyjwt/compare/1.6.3...1.6.4 - -[109]: https://github.com/jpadilla/pyjwt/pull/109 -[110]: https://github.com/jpadilla/pyjwt/pull/110 -[100]: https://github.com/jpadilla/pyjwt/pull/100 -[101]: https://github.com/jpadilla/pyjwt/pull/101 -[103]: https://github.com/jpadilla/pyjwt/pull/103 -[85]: https://github.com/jpadilla/pyjwt/pull/85 -[882524d]: https://github.com/jpadilla/pyjwt/commit/882524d -[bd57b02]: https://github.com/jpadilla/pyjwt/commit/bd57b02 -[131]: https://github.com/jpadilla/pyjwt/pull/131 -[132]: https://github.com/jpadilla/pyjwt/pull/132 -[128]: https://github.com/jpadilla/pyjwt/pull/128 -[141]: https://github.com/jpadilla/pyjwt/pull/141 -[158]: https://github.com/jpadilla/pyjwt/pull/158 -[163]: https://github.com/jpadilla/pyjwt/pull/163 -[174]: https://github.com/jpadilla/pyjwt/pull/174 -[182]: https://github.com/jpadilla/pyjwt/pull/182 -[183]: https://github.com/jpadilla/pyjwt/pull/183 -[190]: https://github.com/jpadilla/pyjwt/pull/190 -[213]: https://github.com/jpadilla/pyjwt/pull/214 -[244]: https://github.com/jpadilla/pyjwt/pull/244 -[202]: https://github.com/jpadilla/pyjwt/pull/202 -[252]: https://github.com/jpadilla/pyjwt/pull/252 -[225]: https://github.com/jpadilla/pyjwt/pull/225 -[219]: https://github.com/jpadilla/pyjwt/pull/219 -[196]: https://github.com/jpadilla/pyjwt/pull/196 -[187]: https://github.com/jpadilla/pyjwt/pull/187 -[230]: https://github.com/jpadilla/pyjwt/pull/230 -[238]: https://github.com/jpadilla/pyjwt/pull/238 -[262]: https://github.com/jpadilla/pyjwt/pull/262 -[264]: https://github.com/jpadilla/pyjwt/pull/264 -[270]: https://github.com/jpadilla/pyjwt/pull/270 -[271]: https://github.com/jpadilla/pyjwt/pull/271 -[277]: https://github.com/jpadilla/pyjwt/pull/277 -[281]: https://github.com/jpadilla/pyjwt/pull/281 -[301]: https://github.com/jpadilla/pyjwt/pull/301 -[304]: https://github.com/jpadilla/pyjwt/pull/304 -[306]: https://github.com/jpadilla/pyjwt/pull/306 -[315]: https://github.com/jpadilla/pyjwt/pull/315 -[316]: https://github.com/jpadilla/pyjwt/pull/316 -[336]: https://github.com/jpadilla/pyjwt/pull/336 -[337]: https://github.com/jpadilla/pyjwt/pull/337 -[340]: https://github.com/jpadilla/pyjwt/pull/340 -[344]: https://github.com/jpadilla/pyjwt/pull/344 -[350]: https://github.com/jpadilla/pyjwt/pull/350 -[352]: https://github.com/jpadilla/pyjwt/pull/352 -[7c1e61d]: https://github.com/jpadilla/pyjwt/commit/7c1e61dde27bafe16e7d1bb6e35199e778962742 -[7ca41e]: https://github.com/jpadilla/pyjwt/commit/7ca41e53b3d7d9f5cd31bdd8a2b832d192006239 diff --git a/script.module.pyjwt/LICENSE b/script.module.pyjwt/LICENSE deleted file mode 100644 index bdc7819ea..000000000 --- a/script.module.pyjwt/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2015 José Padilla - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/script.module.pyjwt/LICENSE.txt b/script.module.pyjwt/LICENSE.txt index bdc7819ea..fd0ecbc88 100644 --- a/script.module.pyjwt/LICENSE.txt +++ b/script.module.pyjwt/LICENSE.txt @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2015 José Padilla +Copyright (c) 2015-2022 José Padilla Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/script.module.pyjwt/README.rst b/script.module.pyjwt/README.rst deleted file mode 100644 index 07f6fead0..000000000 --- a/script.module.pyjwt/README.rst +++ /dev/null @@ -1,81 +0,0 @@ -PyJWT -===== - -.. image:: https://travis-ci.com/jpadilla/pyjwt.svg?branch=master - :target: http://travis-ci.com/jpadilla/pyjwt?branch=master - -.. image:: https://ci.appveyor.com/api/projects/status/h8nt70aqtwhht39t?svg=true - :target: https://ci.appveyor.com/project/jpadilla/pyjwt - -.. image:: https://img.shields.io/pypi/v/pyjwt.svg - :target: https://pypi.python.org/pypi/pyjwt - -.. image:: https://coveralls.io/repos/jpadilla/pyjwt/badge.svg?branch=master - :target: https://coveralls.io/r/jpadilla/pyjwt?branch=master - -.. image:: https://readthedocs.org/projects/pyjwt/badge/?version=latest - :target: https://pyjwt.readthedocs.io - -A Python implementation of `RFC 7519 `_. Original implementation was written by `@progrium `_. - -Sponsor -------- - -+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| |auth0-logo| | If you want to quickly add secure token-based authentication to Python projects, feel free to check Auth0's Python SDK and free plan at `auth0.com/overview `_. | -+--------------+-----------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - -.. |auth0-logo| image:: https://user-images.githubusercontent.com/83319/31722733-de95bbde-b3ea-11e7-96bf-4f4e8f915588.png - -Installing ----------- - -Install with **pip**: - -.. code-block:: sh - - $ pip install PyJWT - - -Usage ------ - -.. code:: python - - >>> import jwt - >>> encoded = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256') - 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg' - - >>> jwt.decode(encoded, 'secret', algorithms=['HS256']) - {'some': 'payload'} - - -Command line ------------- - -Usage:: - - pyjwt [options] INPUT - -Decoding examples:: - - pyjwt --key=secret decode TOKEN - pyjwt decode --no-verify TOKEN - -See more options executing ``pyjwt --help``. - - -Documentation -------------- - -View the full docs online at https://pyjwt.readthedocs.io/en/latest/ - - -Tests ------ - -You can run tests from the project root after cloning with: - -.. code-block:: sh - - $ python setup.py test diff --git a/script.module.pyjwt/addon.xml b/script.module.pyjwt/addon.xml index 472536dd7..605243a4c 100644 --- a/script.module.pyjwt/addon.xml +++ b/script.module.pyjwt/addon.xml @@ -1,22 +1,19 @@ - - + + + - + - pyjwt module - pyjwt module - MIT - all - hello@jpadilla.com - https://pyjwt.readthedocs.io/en/latest/ - https://github.com/jpadilla/pyjwt - - icon.png - - + JSON Web Token implementation in Python + JSON Web Token implementation in Python + MIT + all + https://github.com/jpadilla/pyjwt + https://github.com/jpadilla/pyjwt + + resources/icon.png + + diff --git a/script.module.pyjwt/lib/pyjwt/__init__.py b/script.module.pyjwt/lib/pyjwt/__init__.py index 946983f02..68d09c1c4 100644 --- a/script.module.pyjwt/lib/pyjwt/__init__.py +++ b/script.module.pyjwt/lib/pyjwt/__init__.py @@ -1,31 +1,74 @@ -# -*- coding: utf-8 -*- -# flake8: noqa +from .api_jwk import PyJWK, PyJWKSet +from .api_jws import ( + PyJWS, + get_algorithm_by_name, + get_unverified_header, + register_algorithm, + unregister_algorithm, +) +from .api_jwt import PyJWT, decode, encode +from .exceptions import ( + DecodeError, + ExpiredSignatureError, + ImmatureSignatureError, + InvalidAlgorithmError, + InvalidAudienceError, + InvalidIssuedAtError, + InvalidIssuerError, + InvalidKeyError, + InvalidSignatureError, + InvalidTokenError, + MissingRequiredClaimError, + PyJWKClientConnectionError, + PyJWKClientError, + PyJWKError, + PyJWKSetError, + PyJWTError, +) +from .jwks_client import PyJWKClient -""" -JSON Web Token implementation +__version__ = "2.8.0" -Minimum implementation based on this spec: -http://self-issued.info/docs/draft-jones-json-web-token-01.html -""" +__title__ = "PyJWT" +__description__ = "JSON Web Token implementation in Python" +__url__ = "https://pyjwt.readthedocs.io" +__uri__ = __url__ +__doc__ = f"{__description__} <{__uri__}>" +__author__ = "José Padilla" +__email__ = "hello@jpadilla.com" -__title__ = 'pyjwt' -__version__ = '1.7.1' -__author__ = 'José Padilla' -__license__ = 'MIT' -__copyright__ = 'Copyright 2015-2018 José Padilla' +__license__ = "MIT" +__copyright__ = "Copyright 2015-2022 José Padilla" -from .api_jwt import ( - encode, decode, register_algorithm, unregister_algorithm, - get_unverified_header, PyJWT -) -from .api_jws import PyJWS -from .exceptions import ( - InvalidTokenError, DecodeError, InvalidAlgorithmError, - InvalidAudienceError, ExpiredSignatureError, ImmatureSignatureError, - InvalidIssuedAtError, InvalidIssuerError, ExpiredSignature, - InvalidAudience, InvalidIssuer, MissingRequiredClaimError, - InvalidSignatureError, - PyJWTError, -) +__all__ = [ + "PyJWS", + "PyJWT", + "PyJWKClient", + "PyJWK", + "PyJWKSet", + "decode", + "encode", + "get_unverified_header", + "register_algorithm", + "unregister_algorithm", + "get_algorithm_by_name", + # Exceptions + "DecodeError", + "ExpiredSignatureError", + "ImmatureSignatureError", + "InvalidAlgorithmError", + "InvalidAudienceError", + "InvalidIssuedAtError", + "InvalidIssuerError", + "InvalidKeyError", + "InvalidSignatureError", + "InvalidTokenError", + "MissingRequiredClaimError", + "PyJWKClientConnectionError", + "PyJWKClientError", + "PyJWKError", + "PyJWKSetError", + "PyJWTError", +] diff --git a/script.module.pyjwt/lib/pyjwt/__main__.py b/script.module.pyjwt/lib/pyjwt/__main__.py deleted file mode 100644 index bf50aabf4..000000000 --- a/script.module.pyjwt/lib/pyjwt/__main__.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python - -from __future__ import absolute_import, print_function - -import argparse -import json -import sys -import time - -from . import DecodeError, __version__, decode, encode - - -def encode_payload(args): - # Try to encode - if args.key is None: - raise ValueError('Key is required when encoding. See --help for usage.') - - # Build payload object to encode - payload = {} - - for arg in args.payload: - k, v = arg.split('=', 1) - - # exp +offset special case? - if k == 'exp' and v[0] == '+' and len(v) > 1: - v = str(int(time.time()+int(v[1:]))) - - # Cast to integer? - if v.isdigit(): - v = int(v) - else: - # Cast to float? - try: - v = float(v) - except ValueError: - pass - - # Cast to true, false, or null? - constants = {'true': True, 'false': False, 'null': None} - - if v in constants: - v = constants[v] - - payload[k] = v - - token = encode( - payload, - key=args.key, - algorithm=args.algorithm - ) - - return token.decode('utf-8') - - -def decode_payload(args): - try: - if args.token: - token = args.token - else: - if sys.stdin.isatty(): - token = sys.stdin.readline().strip() - else: - raise IOError('Cannot read from stdin: terminal not a TTY') - - token = token.encode('utf-8') - data = decode(token, key=args.key, verify=args.verify) - - return json.dumps(data) - - except DecodeError as e: - raise DecodeError('There was an error decoding the token: %s' % e) - - -def build_argparser(): - - usage = ''' - Encodes or decodes JSON Web Tokens based on input. - - %(prog)s [options] [options] input - - Decoding examples: - - %(prog)s --key=secret decode json.web.token - %(prog)s decode --no-verify json.web.token - - Encoding requires the key option and takes space separated key/value pairs - separated by equals (=) as input. Examples: - - %(prog)s --key=secret encode iss=me exp=1302049071 - %(prog)s --key=secret encode foo=bar exp=+10 - - The exp key is special and can take an offset to current Unix time. - ''' - - arg_parser = argparse.ArgumentParser( - prog='pyjwt', - usage=usage - ) - - arg_parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s ' + __version__ - ) - - arg_parser.add_argument( - '--key', - dest='key', - metavar='KEY', - default=None, - help='set the secret key to sign with' - ) - - arg_parser.add_argument( - '--alg', - dest='algorithm', - metavar='ALG', - default='HS256', - help='set crypto algorithm to sign with. default=HS256' - ) - - subparsers = arg_parser.add_subparsers( - title='PyJWT subcommands', - description='valid subcommands', - help='additional help' - ) - - # Encode subcommand - encode_parser = subparsers.add_parser('encode', help='use to encode a supplied payload') - - payload_help = """Payload to encode. Must be a space separated list of key/value - pairs separated by equals (=) sign.""" - - encode_parser.add_argument('payload', nargs='+', help=payload_help) - encode_parser.set_defaults(func=encode_payload) - - # Decode subcommand - decode_parser = subparsers.add_parser('decode', help='use to decode a supplied JSON web token') - decode_parser.add_argument( - 'token', - help='JSON web token to decode.', - nargs='?') - - decode_parser.add_argument( - '-n', '--no-verify', - action='store_false', - dest='verify', - default=True, - help='ignore signature and claims verification on decode' - ) - - decode_parser.set_defaults(func=decode_payload) - - return arg_parser - - -def main(): - arg_parser = build_argparser() - - try: - arguments = arg_parser.parse_args(sys.argv[1:]) - - output = arguments.func(arguments) - - print(output) - except Exception as e: - print('There was an unforseen error: ', e) - arg_parser.print_help() diff --git a/script.module.pyjwt/lib/pyjwt/algorithms.py b/script.module.pyjwt/lib/pyjwt/algorithms.py index 134368834..ed1871529 100644 --- a/script.module.pyjwt/lib/pyjwt/algorithms.py +++ b/script.module.pyjwt/lib/pyjwt/algorithms.py @@ -1,106 +1,221 @@ +from __future__ import annotations + import hashlib import hmac import json +import sys +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, ClassVar, NoReturn, Union, cast, overload - -from .compat import constant_time_compare, string_types from .exceptions import InvalidKeyError +from .types import HashlibHash, JWKDict from .utils import ( - base64url_decode, base64url_encode, der_to_raw_signature, - force_bytes, force_unicode, from_base64url_uint, raw_to_der_signature, - to_base64url_uint + base64url_decode, + base64url_encode, + der_to_raw_signature, + force_bytes, + from_base64url_uint, + is_pem_format, + is_ssh_key, + raw_to_der_signature, + to_base64url_uint, ) +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal + + try: + from cryptography.exceptions import InvalidSignature + from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.primitives.serialization import ( - load_pem_private_key, load_pem_public_key, load_ssh_public_key + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives.asymmetric.ec import ( + ECDSA, + SECP256K1, + SECP256R1, + SECP384R1, + SECP521R1, + EllipticCurve, + EllipticCurvePrivateKey, + EllipticCurvePrivateNumbers, + EllipticCurvePublicKey, + EllipticCurvePublicNumbers, + ) + from cryptography.hazmat.primitives.asymmetric.ed448 import ( + Ed448PrivateKey, + Ed448PublicKey, + ) + from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, ) from cryptography.hazmat.primitives.asymmetric.rsa import ( - RSAPrivateKey, RSAPublicKey, RSAPrivateNumbers, RSAPublicNumbers, - rsa_recover_prime_factors, rsa_crt_dmp1, rsa_crt_dmq1, rsa_crt_iqmp + RSAPrivateKey, + RSAPrivateNumbers, + RSAPublicKey, + RSAPublicNumbers, + rsa_crt_dmp1, + rsa_crt_dmq1, + rsa_crt_iqmp, + rsa_recover_prime_factors, ) - from cryptography.hazmat.primitives.asymmetric.ec import ( - EllipticCurvePrivateKey, EllipticCurvePublicKey + from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, + load_pem_private_key, + load_pem_public_key, + load_ssh_public_key, ) - from cryptography.hazmat.primitives.asymmetric import ec, padding - from cryptography.hazmat.backends import default_backend - from cryptography.exceptions import InvalidSignature has_crypto = True -except ImportError: +except ModuleNotFoundError: has_crypto = False -requires_cryptography = set(['RS256', 'RS384', 'RS512', 'ES256', 'ES384', - 'ES521', 'ES512', 'PS256', 'PS384', 'PS512']) +if TYPE_CHECKING: + # Type aliases for convenience in algorithms method signatures + AllowedRSAKeys = RSAPrivateKey | RSAPublicKey + AllowedECKeys = EllipticCurvePrivateKey | EllipticCurvePublicKey + AllowedOKPKeys = ( + Ed25519PrivateKey | Ed25519PublicKey | Ed448PrivateKey | Ed448PublicKey + ) + AllowedKeys = AllowedRSAKeys | AllowedECKeys | AllowedOKPKeys + AllowedPrivateKeys = ( + RSAPrivateKey | EllipticCurvePrivateKey | Ed25519PrivateKey | Ed448PrivateKey + ) + AllowedPublicKeys = ( + RSAPublicKey | EllipticCurvePublicKey | Ed25519PublicKey | Ed448PublicKey + ) + + +requires_cryptography = { + "RS256", + "RS384", + "RS512", + "ES256", + "ES256K", + "ES384", + "ES521", + "ES512", + "PS256", + "PS384", + "PS512", + "EdDSA", +} -def get_default_algorithms(): + +def get_default_algorithms() -> dict[str, Algorithm]: """ Returns the algorithms that are implemented by the library. """ default_algorithms = { - 'none': NoneAlgorithm(), - 'HS256': HMACAlgorithm(HMACAlgorithm.SHA256), - 'HS384': HMACAlgorithm(HMACAlgorithm.SHA384), - 'HS512': HMACAlgorithm(HMACAlgorithm.SHA512) + "none": NoneAlgorithm(), + "HS256": HMACAlgorithm(HMACAlgorithm.SHA256), + "HS384": HMACAlgorithm(HMACAlgorithm.SHA384), + "HS512": HMACAlgorithm(HMACAlgorithm.SHA512), } if has_crypto: - default_algorithms.update({ - 'RS256': RSAAlgorithm(RSAAlgorithm.SHA256), - 'RS384': RSAAlgorithm(RSAAlgorithm.SHA384), - 'RS512': RSAAlgorithm(RSAAlgorithm.SHA512), - 'ES256': ECAlgorithm(ECAlgorithm.SHA256), - 'ES384': ECAlgorithm(ECAlgorithm.SHA384), - 'ES521': ECAlgorithm(ECAlgorithm.SHA512), - 'ES512': ECAlgorithm(ECAlgorithm.SHA512), # Backward compat for #219 fix - 'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256), - 'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384), - 'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512) - }) + default_algorithms.update( + { + "RS256": RSAAlgorithm(RSAAlgorithm.SHA256), + "RS384": RSAAlgorithm(RSAAlgorithm.SHA384), + "RS512": RSAAlgorithm(RSAAlgorithm.SHA512), + "ES256": ECAlgorithm(ECAlgorithm.SHA256), + "ES256K": ECAlgorithm(ECAlgorithm.SHA256), + "ES384": ECAlgorithm(ECAlgorithm.SHA384), + "ES521": ECAlgorithm(ECAlgorithm.SHA512), + "ES512": ECAlgorithm( + ECAlgorithm.SHA512 + ), # Backward compat for #219 fix + "PS256": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256), + "PS384": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384), + "PS512": RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512), + "EdDSA": OKPAlgorithm(), + } + ) return default_algorithms -class Algorithm(object): +class Algorithm(ABC): """ The interface for an algorithm used to sign and verify tokens. """ - def prepare_key(self, key): + + def compute_hash_digest(self, bytestr: bytes) -> bytes: + """ + Compute a hash digest using the specified algorithm's hash algorithm. + + If there is no hash algorithm, raises a NotImplementedError. + """ + # lookup self.hash_alg if defined in a way that mypy can understand + hash_alg = getattr(self, "hash_alg", None) + if hash_alg is None: + raise NotImplementedError + + if ( + has_crypto + and isinstance(hash_alg, type) + and issubclass(hash_alg, hashes.HashAlgorithm) + ): + digest = hashes.Hash(hash_alg(), backend=default_backend()) + digest.update(bytestr) + return bytes(digest.finalize()) + else: + return bytes(hash_alg(bytestr).digest()) + + @abstractmethod + def prepare_key(self, key: Any) -> Any: """ Performs necessary validation and conversions on the key and returns the key value in the proper format for sign() and verify(). """ - raise NotImplementedError - def sign(self, msg, key): + @abstractmethod + def sign(self, msg: bytes, key: Any) -> bytes: """ Returns a digital signature for the specified message using the specified key value. """ - raise NotImplementedError - def verify(self, msg, key, sig): + @abstractmethod + def verify(self, msg: bytes, key: Any, sig: bytes) -> bool: """ Verifies that the specified digital signature is valid for the specified message and key values. """ - raise NotImplementedError + + @overload + @staticmethod + @abstractmethod + def to_jwk(key_obj, as_dict: Literal[True]) -> JWKDict: + ... # pragma: no cover + + @overload + @staticmethod + @abstractmethod + def to_jwk(key_obj, as_dict: Literal[False] = False) -> str: + ... # pragma: no cover @staticmethod - def to_jwk(key_obj): + @abstractmethod + def to_jwk(key_obj, as_dict: bool = False) -> Union[JWKDict, str]: """ - Serializes a given RSA key into a JWK + Serializes a given key into a JWK """ - raise NotImplementedError @staticmethod - def from_jwk(jwk): + @abstractmethod + def from_jwk(jwk: str | JWKDict) -> Any: """ - Deserializes a given RSA key from JWK back into a PublicKey or PrivateKey object + Deserializes a given key from JWK back into a key object """ - raise NotImplementedError class NoneAlgorithm(Algorithm): @@ -108,8 +223,9 @@ class NoneAlgorithm(Algorithm): Placeholder for use when no signing or verification operations are required. """ - def prepare_key(self, key): - if key == '': + + def prepare_key(self, key: str | None) -> None: + if key == "": key = None if key is not None: @@ -117,63 +233,89 @@ def prepare_key(self, key): return key - def sign(self, msg, key): - return b'' + def sign(self, msg: bytes, key: None) -> bytes: + return b"" - def verify(self, msg, key, sig): + def verify(self, msg: bytes, key: None, sig: bytes) -> bool: return False + @staticmethod + def to_jwk(key_obj: Any, as_dict: bool = False) -> NoReturn: + raise NotImplementedError() + + @staticmethod + def from_jwk(jwk: str | JWKDict) -> NoReturn: + raise NotImplementedError() + class HMACAlgorithm(Algorithm): """ Performs signing and verification operations using HMAC and the specified hash function. """ - SHA256 = hashlib.sha256 - SHA384 = hashlib.sha384 - SHA512 = hashlib.sha512 - def __init__(self, hash_alg): - self.hash_alg = hash_alg + SHA256: ClassVar[HashlibHash] = hashlib.sha256 + SHA384: ClassVar[HashlibHash] = hashlib.sha384 + SHA512: ClassVar[HashlibHash] = hashlib.sha512 - def prepare_key(self, key): - key = force_bytes(key) + def __init__(self, hash_alg: HashlibHash) -> None: + self.hash_alg = hash_alg - invalid_strings = [ - b'-----BEGIN PUBLIC KEY-----', - b'-----BEGIN CERTIFICATE-----', - b'-----BEGIN RSA PUBLIC KEY-----', - b'ssh-rsa' - ] + def prepare_key(self, key: str | bytes) -> bytes: + key_bytes = force_bytes(key) - if any([string_value in key for string_value in invalid_strings]): + if is_pem_format(key_bytes) or is_ssh_key(key_bytes): raise InvalidKeyError( - 'The specified key is an asymmetric key or x509 certificate and' - ' should not be used as an HMAC secret.') + "The specified key is an asymmetric key or x509 certificate and" + " should not be used as an HMAC secret." + ) - return key + return key_bytes + @overload @staticmethod - def to_jwk(key_obj): - return json.dumps({ - 'k': force_unicode(base64url_encode(force_bytes(key_obj))), - 'kty': 'oct' - }) + def to_jwk(key_obj: str | bytes, as_dict: Literal[True]) -> JWKDict: + ... # pragma: no cover + @overload @staticmethod - def from_jwk(jwk): - obj = json.loads(jwk) + def to_jwk(key_obj: str | bytes, as_dict: Literal[False] = False) -> str: + ... # pragma: no cover - if obj.get('kty') != 'oct': - raise InvalidKeyError('Not an HMAC key') + @staticmethod + def to_jwk(key_obj: str | bytes, as_dict: bool = False) -> Union[JWKDict, str]: + jwk = { + "k": base64url_encode(force_bytes(key_obj)).decode(), + "kty": "oct", + } + + if as_dict: + return jwk + else: + return json.dumps(jwk) + + @staticmethod + def from_jwk(jwk: str | JWKDict) -> bytes: + try: + if isinstance(jwk, str): + obj: JWKDict = json.loads(jwk) + elif isinstance(jwk, dict): + obj = jwk + else: + raise ValueError + except ValueError: + raise InvalidKeyError("Key is not valid JSON") - return base64url_decode(obj['k']) + if obj.get("kty") != "oct": + raise InvalidKeyError("Not an HMAC key") - def sign(self, msg, key): + return base64url_decode(obj["k"]) + + def sign(self, msg: bytes, key: bytes) -> bytes: return hmac.new(key, msg, self.hash_alg).digest() - def verify(self, msg, key, sig): - return constant_time_compare(sig, self.sign(msg, key)) + def verify(self, msg: bytes, key: bytes, sig: bytes) -> bool: + return hmac.compare_digest(sig, self.sign(msg, key)) if has_crypto: @@ -183,107 +325,132 @@ class RSAAlgorithm(Algorithm): Performs signing and verification operations using RSASSA-PKCS-v1_5 and the specified hash function. """ - SHA256 = hashes.SHA256 - SHA384 = hashes.SHA384 - SHA512 = hashes.SHA512 - def __init__(self, hash_alg): + SHA256: ClassVar[type[hashes.HashAlgorithm]] = hashes.SHA256 + SHA384: ClassVar[type[hashes.HashAlgorithm]] = hashes.SHA384 + SHA512: ClassVar[type[hashes.HashAlgorithm]] = hashes.SHA512 + + def __init__(self, hash_alg: type[hashes.HashAlgorithm]) -> None: self.hash_alg = hash_alg - def prepare_key(self, key): - if isinstance(key, RSAPrivateKey) or \ - isinstance(key, RSAPublicKey): + def prepare_key(self, key: AllowedRSAKeys | str | bytes) -> AllowedRSAKeys: + if isinstance(key, (RSAPrivateKey, RSAPublicKey)): return key - if isinstance(key, string_types): - key = force_bytes(key) + if not isinstance(key, (bytes, str)): + raise TypeError("Expecting a PEM-formatted key.") - try: - if key.startswith(b'ssh-rsa'): - key = load_ssh_public_key(key, backend=default_backend()) - else: - key = load_pem_private_key(key, password=None, backend=default_backend()) - except ValueError: - key = load_pem_public_key(key, backend=default_backend()) - else: - raise TypeError('Expecting a PEM-formatted key.') + key_bytes = force_bytes(key) - return key + try: + if key_bytes.startswith(b"ssh-rsa"): + return cast(RSAPublicKey, load_ssh_public_key(key_bytes)) + else: + return cast( + RSAPrivateKey, load_pem_private_key(key_bytes, password=None) + ) + except ValueError: + return cast(RSAPublicKey, load_pem_public_key(key_bytes)) + @overload @staticmethod - def to_jwk(key_obj): - obj = None + def to_jwk(key_obj: AllowedRSAKeys, as_dict: Literal[True]) -> JWKDict: + ... # pragma: no cover - if getattr(key_obj, 'private_numbers', None): + @overload + @staticmethod + def to_jwk(key_obj: AllowedRSAKeys, as_dict: Literal[False] = False) -> str: + ... # pragma: no cover + + @staticmethod + def to_jwk( + key_obj: AllowedRSAKeys, as_dict: bool = False + ) -> Union[JWKDict, str]: + obj: dict[str, Any] | None = None + + if hasattr(key_obj, "private_numbers"): # Private key numbers = key_obj.private_numbers() obj = { - 'kty': 'RSA', - 'key_ops': ['sign'], - 'n': force_unicode(to_base64url_uint(numbers.public_numbers.n)), - 'e': force_unicode(to_base64url_uint(numbers.public_numbers.e)), - 'd': force_unicode(to_base64url_uint(numbers.d)), - 'p': force_unicode(to_base64url_uint(numbers.p)), - 'q': force_unicode(to_base64url_uint(numbers.q)), - 'dp': force_unicode(to_base64url_uint(numbers.dmp1)), - 'dq': force_unicode(to_base64url_uint(numbers.dmq1)), - 'qi': force_unicode(to_base64url_uint(numbers.iqmp)) + "kty": "RSA", + "key_ops": ["sign"], + "n": to_base64url_uint(numbers.public_numbers.n).decode(), + "e": to_base64url_uint(numbers.public_numbers.e).decode(), + "d": to_base64url_uint(numbers.d).decode(), + "p": to_base64url_uint(numbers.p).decode(), + "q": to_base64url_uint(numbers.q).decode(), + "dp": to_base64url_uint(numbers.dmp1).decode(), + "dq": to_base64url_uint(numbers.dmq1).decode(), + "qi": to_base64url_uint(numbers.iqmp).decode(), } - elif getattr(key_obj, 'verify', None): + elif hasattr(key_obj, "verify"): # Public key numbers = key_obj.public_numbers() obj = { - 'kty': 'RSA', - 'key_ops': ['verify'], - 'n': force_unicode(to_base64url_uint(numbers.n)), - 'e': force_unicode(to_base64url_uint(numbers.e)) + "kty": "RSA", + "key_ops": ["verify"], + "n": to_base64url_uint(numbers.n).decode(), + "e": to_base64url_uint(numbers.e).decode(), } else: - raise InvalidKeyError('Not a public or private key') + raise InvalidKeyError("Not a public or private key") - return json.dumps(obj) + if as_dict: + return obj + else: + return json.dumps(obj) @staticmethod - def from_jwk(jwk): + def from_jwk(jwk: str | JWKDict) -> AllowedRSAKeys: try: - obj = json.loads(jwk) + if isinstance(jwk, str): + obj = json.loads(jwk) + elif isinstance(jwk, dict): + obj = jwk + else: + raise ValueError except ValueError: - raise InvalidKeyError('Key is not valid JSON') + raise InvalidKeyError("Key is not valid JSON") - if obj.get('kty') != 'RSA': - raise InvalidKeyError('Not an RSA key') + if obj.get("kty") != "RSA": + raise InvalidKeyError("Not an RSA key") - if 'd' in obj and 'e' in obj and 'n' in obj: + if "d" in obj and "e" in obj and "n" in obj: # Private key - if 'oth' in obj: - raise InvalidKeyError('Unsupported RSA private key: > 2 primes not supported') + if "oth" in obj: + raise InvalidKeyError( + "Unsupported RSA private key: > 2 primes not supported" + ) - other_props = ['p', 'q', 'dp', 'dq', 'qi'] + other_props = ["p", "q", "dp", "dq", "qi"] props_found = [prop in obj for prop in other_props] any_props_found = any(props_found) if any_props_found and not all(props_found): - raise InvalidKeyError('RSA key must include all parameters if any are present besides d') + raise InvalidKeyError( + "RSA key must include all parameters if any are present besides d" + ) public_numbers = RSAPublicNumbers( - from_base64url_uint(obj['e']), from_base64url_uint(obj['n']) + from_base64url_uint(obj["e"]), + from_base64url_uint(obj["n"]), ) if any_props_found: numbers = RSAPrivateNumbers( - d=from_base64url_uint(obj['d']), - p=from_base64url_uint(obj['p']), - q=from_base64url_uint(obj['q']), - dmp1=from_base64url_uint(obj['dp']), - dmq1=from_base64url_uint(obj['dq']), - iqmp=from_base64url_uint(obj['qi']), - public_numbers=public_numbers + d=from_base64url_uint(obj["d"]), + p=from_base64url_uint(obj["p"]), + q=from_base64url_uint(obj["q"]), + dmp1=from_base64url_uint(obj["dp"]), + dmq1=from_base64url_uint(obj["dq"]), + iqmp=from_base64url_uint(obj["qi"]), + public_numbers=public_numbers, ) else: - d = from_base64url_uint(obj['d']) + d = from_base64url_uint(obj["d"]) p, q = rsa_recover_prime_factors( public_numbers.n, d, public_numbers.e ) @@ -295,24 +462,23 @@ def from_jwk(jwk): dmp1=rsa_crt_dmp1(d, p), dmq1=rsa_crt_dmq1(d, q), iqmp=rsa_crt_iqmp(p, q), - public_numbers=public_numbers + public_numbers=public_numbers, ) - return numbers.private_key(default_backend()) - elif 'n' in obj and 'e' in obj: + return numbers.private_key() + elif "n" in obj and "e" in obj: # Public key - numbers = RSAPublicNumbers( - from_base64url_uint(obj['e']), from_base64url_uint(obj['n']) - ) - - return numbers.public_key(default_backend()) + return RSAPublicNumbers( + from_base64url_uint(obj["e"]), + from_base64url_uint(obj["n"]), + ).public_key() else: - raise InvalidKeyError('Not a public or private key') + raise InvalidKeyError("Not a public or private key") - def sign(self, msg, key): + def sign(self, msg: bytes, key: RSAPrivateKey) -> bytes: return key.sign(msg, padding.PKCS1v15(), self.hash_alg()) - def verify(self, msg, key, sig): + def verify(self, msg: bytes, key: RSAPublicKey, sig: bytes) -> bool: try: key.verify(sig, msg, padding.PKCS1v15(), self.hash_alg()) return True @@ -324,80 +490,373 @@ class ECAlgorithm(Algorithm): Performs signing and verification operations using ECDSA and the specified hash function """ - SHA256 = hashes.SHA256 - SHA384 = hashes.SHA384 - SHA512 = hashes.SHA512 - def __init__(self, hash_alg): + SHA256: ClassVar[type[hashes.HashAlgorithm]] = hashes.SHA256 + SHA384: ClassVar[type[hashes.HashAlgorithm]] = hashes.SHA384 + SHA512: ClassVar[type[hashes.HashAlgorithm]] = hashes.SHA512 + + def __init__(self, hash_alg: type[hashes.HashAlgorithm]) -> None: self.hash_alg = hash_alg - def prepare_key(self, key): - if isinstance(key, EllipticCurvePrivateKey) or \ - isinstance(key, EllipticCurvePublicKey): + def prepare_key(self, key: AllowedECKeys | str | bytes) -> AllowedECKeys: + if isinstance(key, (EllipticCurvePrivateKey, EllipticCurvePublicKey)): return key - if isinstance(key, string_types): - key = force_bytes(key) + if not isinstance(key, (bytes, str)): + raise TypeError("Expecting a PEM-formatted key.") - # Attempt to load key. We don't know if it's - # a Signing Key or a Verifying Key, so we try - # the Verifying Key first. - try: - if key.startswith(b'ecdsa-sha2-'): - key = load_ssh_public_key(key, backend=default_backend()) - else: - key = load_pem_public_key(key, backend=default_backend()) - except ValueError: - key = load_pem_private_key(key, password=None, backend=default_backend()) + key_bytes = force_bytes(key) - else: - raise TypeError('Expecting a PEM-formatted key.') + # Attempt to load key. We don't know if it's + # a Signing Key or a Verifying Key, so we try + # the Verifying Key first. + try: + if key_bytes.startswith(b"ecdsa-sha2-"): + crypto_key = load_ssh_public_key(key_bytes) + else: + crypto_key = load_pem_public_key(key_bytes) # type: ignore[assignment] + except ValueError: + crypto_key = load_pem_private_key(key_bytes, password=None) # type: ignore[assignment] + + # Explicit check the key to prevent confusing errors from cryptography + if not isinstance( + crypto_key, (EllipticCurvePrivateKey, EllipticCurvePublicKey) + ): + raise InvalidKeyError( + "Expecting a EllipticCurvePrivateKey/EllipticCurvePublicKey. Wrong key provided for ECDSA algorithms" + ) - return key + return crypto_key - def sign(self, msg, key): - der_sig = key.sign(msg, ec.ECDSA(self.hash_alg())) + def sign(self, msg: bytes, key: EllipticCurvePrivateKey) -> bytes: + der_sig = key.sign(msg, ECDSA(self.hash_alg())) return der_to_raw_signature(der_sig, key.curve) - def verify(self, msg, key, sig): + def verify(self, msg: bytes, key: "AllowedECKeys", sig: bytes) -> bool: try: der_sig = raw_to_der_signature(sig, key.curve) except ValueError: return False try: - key.verify(der_sig, msg, ec.ECDSA(self.hash_alg())) + public_key = ( + key.public_key() + if isinstance(key, EllipticCurvePrivateKey) + else key + ) + public_key.verify(der_sig, msg, ECDSA(self.hash_alg())) return True except InvalidSignature: return False + @overload + @staticmethod + def to_jwk(key_obj: AllowedECKeys, as_dict: Literal[True]) -> JWKDict: + ... # pragma: no cover + + @overload + @staticmethod + def to_jwk(key_obj: AllowedECKeys, as_dict: Literal[False] = False) -> str: + ... # pragma: no cover + + @staticmethod + def to_jwk( + key_obj: AllowedECKeys, as_dict: bool = False + ) -> Union[JWKDict, str]: + if isinstance(key_obj, EllipticCurvePrivateKey): + public_numbers = key_obj.public_key().public_numbers() + elif isinstance(key_obj, EllipticCurvePublicKey): + public_numbers = key_obj.public_numbers() + else: + raise InvalidKeyError("Not a public or private key") + + if isinstance(key_obj.curve, SECP256R1): + crv = "P-256" + elif isinstance(key_obj.curve, SECP384R1): + crv = "P-384" + elif isinstance(key_obj.curve, SECP521R1): + crv = "P-521" + elif isinstance(key_obj.curve, SECP256K1): + crv = "secp256k1" + else: + raise InvalidKeyError(f"Invalid curve: {key_obj.curve}") + + obj: dict[str, Any] = { + "kty": "EC", + "crv": crv, + "x": to_base64url_uint(public_numbers.x).decode(), + "y": to_base64url_uint(public_numbers.y).decode(), + } + + if isinstance(key_obj, EllipticCurvePrivateKey): + obj["d"] = to_base64url_uint( + key_obj.private_numbers().private_value + ).decode() + + if as_dict: + return obj + else: + return json.dumps(obj) + + @staticmethod + def from_jwk(jwk: str | JWKDict) -> AllowedECKeys: + try: + if isinstance(jwk, str): + obj = json.loads(jwk) + elif isinstance(jwk, dict): + obj = jwk + else: + raise ValueError + except ValueError: + raise InvalidKeyError("Key is not valid JSON") + + if obj.get("kty") != "EC": + raise InvalidKeyError("Not an Elliptic curve key") + + if "x" not in obj or "y" not in obj: + raise InvalidKeyError("Not an Elliptic curve key") + + x = base64url_decode(obj.get("x")) + y = base64url_decode(obj.get("y")) + + curve = obj.get("crv") + curve_obj: EllipticCurve + + if curve == "P-256": + if len(x) == len(y) == 32: + curve_obj = SECP256R1() + else: + raise InvalidKeyError("Coords should be 32 bytes for curve P-256") + elif curve == "P-384": + if len(x) == len(y) == 48: + curve_obj = SECP384R1() + else: + raise InvalidKeyError("Coords should be 48 bytes for curve P-384") + elif curve == "P-521": + if len(x) == len(y) == 66: + curve_obj = SECP521R1() + else: + raise InvalidKeyError("Coords should be 66 bytes for curve P-521") + elif curve == "secp256k1": + if len(x) == len(y) == 32: + curve_obj = SECP256K1() + else: + raise InvalidKeyError( + "Coords should be 32 bytes for curve secp256k1" + ) + else: + raise InvalidKeyError(f"Invalid curve: {curve}") + + public_numbers = EllipticCurvePublicNumbers( + x=int.from_bytes(x, byteorder="big"), + y=int.from_bytes(y, byteorder="big"), + curve=curve_obj, + ) + + if "d" not in obj: + return public_numbers.public_key() + + d = base64url_decode(obj.get("d")) + if len(d) != len(x): + raise InvalidKeyError( + "D should be {} bytes for curve {}", len(x), curve + ) + + return EllipticCurvePrivateNumbers( + int.from_bytes(d, byteorder="big"), public_numbers + ).private_key() + class RSAPSSAlgorithm(RSAAlgorithm): """ Performs a signature using RSASSA-PSS with MGF1 """ - def sign(self, msg, key): + def sign(self, msg: bytes, key: RSAPrivateKey) -> bytes: return key.sign( msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), - salt_length=self.hash_alg.digest_size + salt_length=self.hash_alg().digest_size, ), - self.hash_alg() + self.hash_alg(), ) - def verify(self, msg, key, sig): + def verify(self, msg: bytes, key: RSAPublicKey, sig: bytes) -> bool: try: key.verify( sig, msg, padding.PSS( mgf=padding.MGF1(self.hash_alg()), - salt_length=self.hash_alg.digest_size + salt_length=self.hash_alg().digest_size, ), - self.hash_alg() + self.hash_alg(), ) return True except InvalidSignature: return False + + class OKPAlgorithm(Algorithm): + """ + Performs signing and verification operations using EdDSA + + This class requires ``cryptography>=2.6`` to be installed. + """ + + def __init__(self, **kwargs: Any) -> None: + pass + + def prepare_key(self, key: AllowedOKPKeys | str | bytes) -> AllowedOKPKeys: + if isinstance(key, (bytes, str)): + key_str = key.decode("utf-8") if isinstance(key, bytes) else key + key_bytes = key.encode("utf-8") if isinstance(key, str) else key + + if "-----BEGIN PUBLIC" in key_str: + key = load_pem_public_key(key_bytes) # type: ignore[assignment] + elif "-----BEGIN PRIVATE" in key_str: + key = load_pem_private_key(key_bytes, password=None) # type: ignore[assignment] + elif key_str[0:4] == "ssh-": + key = load_ssh_public_key(key_bytes) # type: ignore[assignment] + + # Explicit check the key to prevent confusing errors from cryptography + if not isinstance( + key, + (Ed25519PrivateKey, Ed25519PublicKey, Ed448PrivateKey, Ed448PublicKey), + ): + raise InvalidKeyError( + "Expecting a EllipticCurvePrivateKey/EllipticCurvePublicKey. Wrong key provided for EdDSA algorithms" + ) + + return key + + def sign( + self, msg: str | bytes, key: Ed25519PrivateKey | Ed448PrivateKey + ) -> bytes: + """ + Sign a message ``msg`` using the EdDSA private key ``key`` + :param str|bytes msg: Message to sign + :param Ed25519PrivateKey}Ed448PrivateKey key: A :class:`.Ed25519PrivateKey` + or :class:`.Ed448PrivateKey` isinstance + :return bytes signature: The signature, as bytes + """ + msg_bytes = msg.encode("utf-8") if isinstance(msg, str) else msg + return key.sign(msg_bytes) + + def verify( + self, msg: str | bytes, key: AllowedOKPKeys, sig: str | bytes + ) -> bool: + """ + Verify a given ``msg`` against a signature ``sig`` using the EdDSA key ``key`` + + :param str|bytes sig: EdDSA signature to check ``msg`` against + :param str|bytes msg: Message to sign + :param Ed25519PrivateKey|Ed25519PublicKey|Ed448PrivateKey|Ed448PublicKey key: + A private or public EdDSA key instance + :return bool verified: True if signature is valid, False if not. + """ + try: + msg_bytes = msg.encode("utf-8") if isinstance(msg, str) else msg + sig_bytes = sig.encode("utf-8") if isinstance(sig, str) else sig + + public_key = ( + key.public_key() + if isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)) + else key + ) + public_key.verify(sig_bytes, msg_bytes) + return True # If no exception was raised, the signature is valid. + except InvalidSignature: + return False + + @overload + @staticmethod + def to_jwk(key: AllowedOKPKeys, as_dict: Literal[True]) -> JWKDict: + ... # pragma: no cover + + @overload + @staticmethod + def to_jwk(key: AllowedOKPKeys, as_dict: Literal[False] = False) -> str: + ... # pragma: no cover + + @staticmethod + def to_jwk(key: AllowedOKPKeys, as_dict: bool = False) -> Union[JWKDict, str]: + if isinstance(key, (Ed25519PublicKey, Ed448PublicKey)): + x = key.public_bytes( + encoding=Encoding.Raw, + format=PublicFormat.Raw, + ) + crv = "Ed25519" if isinstance(key, Ed25519PublicKey) else "Ed448" + + obj = { + "x": base64url_encode(force_bytes(x)).decode(), + "kty": "OKP", + "crv": crv, + } + + if as_dict: + return obj + else: + return json.dumps(obj) + + if isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)): + d = key.private_bytes( + encoding=Encoding.Raw, + format=PrivateFormat.Raw, + encryption_algorithm=NoEncryption(), + ) + + x = key.public_key().public_bytes( + encoding=Encoding.Raw, + format=PublicFormat.Raw, + ) + + crv = "Ed25519" if isinstance(key, Ed25519PrivateKey) else "Ed448" + obj = { + "x": base64url_encode(force_bytes(x)).decode(), + "d": base64url_encode(force_bytes(d)).decode(), + "kty": "OKP", + "crv": crv, + } + + if as_dict: + return obj + else: + return json.dumps(obj) + + raise InvalidKeyError("Not a public or private key") + + @staticmethod + def from_jwk(jwk: str | JWKDict) -> AllowedOKPKeys: + try: + if isinstance(jwk, str): + obj = json.loads(jwk) + elif isinstance(jwk, dict): + obj = jwk + else: + raise ValueError + except ValueError: + raise InvalidKeyError("Key is not valid JSON") + + if obj.get("kty") != "OKP": + raise InvalidKeyError("Not an Octet Key Pair") + + curve = obj.get("crv") + if curve != "Ed25519" and curve != "Ed448": + raise InvalidKeyError(f"Invalid curve: {curve}") + + if "x" not in obj: + raise InvalidKeyError('OKP should have "x" parameter') + x = base64url_decode(obj.get("x")) + + try: + if "d" not in obj: + if curve == "Ed25519": + return Ed25519PublicKey.from_public_bytes(x) + return Ed448PublicKey.from_public_bytes(x) + d = base64url_decode(obj.get("d")) + if curve == "Ed25519": + return Ed25519PrivateKey.from_private_bytes(d) + return Ed448PrivateKey.from_private_bytes(d) + except ValueError as err: + raise InvalidKeyError("Invalid key parameter") from err diff --git a/script.module.pyjwt/lib/pyjwt/api_jwk.py b/script.module.pyjwt/lib/pyjwt/api_jwk.py new file mode 100644 index 000000000..456c7f4d8 --- /dev/null +++ b/script.module.pyjwt/lib/pyjwt/api_jwk.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import json +import time +from typing import Any + +from .algorithms import get_default_algorithms, has_crypto, requires_cryptography +from .exceptions import InvalidKeyError, PyJWKError, PyJWKSetError, PyJWTError +from .types import JWKDict + + +class PyJWK: + def __init__(self, jwk_data: JWKDict, algorithm: str | None = None) -> None: + self._algorithms = get_default_algorithms() + self._jwk_data = jwk_data + + kty = self._jwk_data.get("kty", None) + if not kty: + raise InvalidKeyError(f"kty is not found: {self._jwk_data}") + + if not algorithm and isinstance(self._jwk_data, dict): + algorithm = self._jwk_data.get("alg", None) + + if not algorithm: + # Determine alg with kty (and crv). + crv = self._jwk_data.get("crv", None) + if kty == "EC": + if crv == "P-256" or not crv: + algorithm = "ES256" + elif crv == "P-384": + algorithm = "ES384" + elif crv == "P-521": + algorithm = "ES512" + elif crv == "secp256k1": + algorithm = "ES256K" + else: + raise InvalidKeyError(f"Unsupported crv: {crv}") + elif kty == "RSA": + algorithm = "RS256" + elif kty == "oct": + algorithm = "HS256" + elif kty == "OKP": + if not crv: + raise InvalidKeyError(f"crv is not found: {self._jwk_data}") + if crv == "Ed25519": + algorithm = "EdDSA" + else: + raise InvalidKeyError(f"Unsupported crv: {crv}") + else: + raise InvalidKeyError(f"Unsupported kty: {kty}") + + if not has_crypto and algorithm in requires_cryptography: + raise PyJWKError(f"{algorithm} requires 'cryptography' to be installed.") + + self.Algorithm = self._algorithms.get(algorithm) + + if not self.Algorithm: + raise PyJWKError(f"Unable to find an algorithm for key: {self._jwk_data}") + + self.key = self.Algorithm.from_jwk(self._jwk_data) + + @staticmethod + def from_dict(obj: JWKDict, algorithm: str | None = None) -> "PyJWK": + return PyJWK(obj, algorithm) + + @staticmethod + def from_json(data: str, algorithm: None = None) -> "PyJWK": + obj = json.loads(data) + return PyJWK.from_dict(obj, algorithm) + + @property + def key_type(self) -> str | None: + return self._jwk_data.get("kty", None) + + @property + def key_id(self) -> str | None: + return self._jwk_data.get("kid", None) + + @property + def public_key_use(self) -> str | None: + return self._jwk_data.get("use", None) + + +class PyJWKSet: + def __init__(self, keys: list[JWKDict]) -> None: + self.keys = [] + + if not keys: + raise PyJWKSetError("The JWK Set did not contain any keys") + + if not isinstance(keys, list): + raise PyJWKSetError("Invalid JWK Set value") + + for key in keys: + try: + self.keys.append(PyJWK(key)) + except PyJWTError: + # skip unusable keys + continue + + if len(self.keys) == 0: + raise PyJWKSetError( + "The JWK Set did not contain any usable keys. Perhaps 'cryptography' is not installed?" + ) + + @staticmethod + def from_dict(obj: dict[str, Any]) -> "PyJWKSet": + keys = obj.get("keys", []) + return PyJWKSet(keys) + + @staticmethod + def from_json(data: str) -> "PyJWKSet": + obj = json.loads(data) + return PyJWKSet.from_dict(obj) + + def __getitem__(self, kid: str) -> "PyJWK": + for key in self.keys: + if key.key_id == kid: + return key + raise KeyError(f"keyset has no key for kid: {kid}") + + +class PyJWTSetWithTimestamp: + def __init__(self, jwk_set: PyJWKSet): + self.jwk_set = jwk_set + self.timestamp = time.monotonic() + + def get_jwk_set(self) -> PyJWKSet: + return self.jwk_set + + def get_timestamp(self) -> float: + return self.timestamp diff --git a/script.module.pyjwt/lib/pyjwt/api_jws.py b/script.module.pyjwt/lib/pyjwt/api_jws.py index a9354adb0..fa6708ccc 100644 --- a/script.module.pyjwt/lib/pyjwt/api_jws.py +++ b/script.module.pyjwt/lib/pyjwt/api_jws.py @@ -1,163 +1,241 @@ +from __future__ import annotations + import binascii import json import warnings -try: - # import required by mypy to perform type checking, not used for normal execution - from typing import Callable, Dict, List, Optional, Union # NOQA -except ImportError: - pass +from typing import TYPE_CHECKING, Any from .algorithms import ( - Algorithm, get_default_algorithms, has_crypto, requires_cryptography # NOQA + Algorithm, + get_default_algorithms, + has_crypto, + requires_cryptography, ) -from .compat import Mapping, binary_type, string_types, text_type from .exceptions import ( - DecodeError, InvalidAlgorithmError, InvalidSignatureError, - InvalidTokenError + DecodeError, + InvalidAlgorithmError, + InvalidSignatureError, + InvalidTokenError, ) -from .utils import base64url_decode, base64url_encode, force_bytes, merge_dict +from .utils import base64url_decode, base64url_encode +from .warnings import RemovedInPyjwt3Warning + +if TYPE_CHECKING: + from .algorithms import AllowedPrivateKeys, AllowedPublicKeys -class PyJWS(object): - header_typ = 'JWT' +class PyJWS: + header_typ = "JWT" - def __init__(self, algorithms=None, options=None): + def __init__( + self, + algorithms: list[str] | None = None, + options: dict[str, Any] | None = None, + ) -> None: self._algorithms = get_default_algorithms() - self._valid_algs = (set(algorithms) if algorithms is not None - else set(self._algorithms)) + self._valid_algs = ( + set(algorithms) if algorithms is not None else set(self._algorithms) + ) # Remove algorithms that aren't on the whitelist for key in list(self._algorithms.keys()): if key not in self._valid_algs: del self._algorithms[key] - if not options: + if options is None: options = {} - - self.options = merge_dict(self._get_default_options(), options) + self.options = {**self._get_default_options(), **options} @staticmethod - def _get_default_options(): - return { - 'verify_signature': True - } + def _get_default_options() -> dict[str, bool]: + return {"verify_signature": True} - def register_algorithm(self, alg_id, alg_obj): + def register_algorithm(self, alg_id: str, alg_obj: Algorithm) -> None: """ Registers a new Algorithm for use when creating and verifying tokens. """ if alg_id in self._algorithms: - raise ValueError('Algorithm already has a handler.') + raise ValueError("Algorithm already has a handler.") if not isinstance(alg_obj, Algorithm): - raise TypeError('Object is not of type `Algorithm`') + raise TypeError("Object is not of type `Algorithm`") self._algorithms[alg_id] = alg_obj self._valid_algs.add(alg_id) - def unregister_algorithm(self, alg_id): + def unregister_algorithm(self, alg_id: str) -> None: """ Unregisters an Algorithm for use when creating and verifying tokens Throws KeyError if algorithm is not registered. """ if alg_id not in self._algorithms: - raise KeyError('The specified algorithm could not be removed' - ' because it is not registered.') + raise KeyError( + "The specified algorithm could not be removed" + " because it is not registered." + ) del self._algorithms[alg_id] self._valid_algs.remove(alg_id) - def get_algorithms(self): + def get_algorithms(self) -> list[str]: """ Returns a list of supported values for the 'alg' parameter. """ return list(self._valid_algs) - def encode(self, - payload, # type: Union[Dict, bytes] - key, # type: str - algorithm='HS256', # type: str - headers=None, # type: Optional[Dict] - json_encoder=None # type: Optional[Callable] - ): + def get_algorithm_by_name(self, alg_name: str) -> Algorithm: + """ + For a given string name, return the matching Algorithm object. + + Example usage: + + >>> jws_obj.get_algorithm_by_name("RS256") + """ + try: + return self._algorithms[alg_name] + except KeyError as e: + if not has_crypto and alg_name in requires_cryptography: + raise NotImplementedError( + f"Algorithm '{alg_name}' could not be found. Do you have cryptography installed?" + ) from e + raise NotImplementedError("Algorithm not supported") from e + + def encode( + self, + payload: bytes, + key: AllowedPrivateKeys | str | bytes, + algorithm: str | None = "HS256", + headers: dict[str, Any] | None = None, + json_encoder: type[json.JSONEncoder] | None = None, + is_payload_detached: bool = False, + sort_headers: bool = True, + ) -> str: segments = [] - if algorithm is None: - algorithm = 'none' + # declare a new var to narrow the type for type checkers + algorithm_: str = algorithm if algorithm is not None else "none" + + # Prefer headers values if present to function parameters. + if headers: + headers_alg = headers.get("alg") + if headers_alg: + algorithm_ = headers["alg"] - if algorithm not in self._valid_algs: - pass + headers_b64 = headers.get("b64") + if headers_b64 is False: + is_payload_detached = True # Header - header = {'typ': self.header_typ, 'alg': algorithm} + header: dict[str, Any] = {"typ": self.header_typ, "alg": algorithm_} if headers: self._validate_headers(headers) header.update(headers) - json_header = force_bytes( - json.dumps( - header, - separators=(',', ':'), - cls=json_encoder - ) - ) + if not header["typ"]: + del header["typ"] + + if is_payload_detached: + header["b64"] = False + elif "b64" in header: + # True is the standard value for b64, so no need for it + del header["b64"] + + json_header = json.dumps( + header, separators=(",", ":"), cls=json_encoder, sort_keys=sort_headers + ).encode() segments.append(base64url_encode(json_header)) - segments.append(base64url_encode(payload)) + + if is_payload_detached: + msg_payload = payload + else: + msg_payload = base64url_encode(payload) + segments.append(msg_payload) # Segments - signing_input = b'.'.join(segments) - try: - alg_obj = self._algorithms[algorithm] - key = alg_obj.prepare_key(key) - signature = alg_obj.sign(signing_input, key) + signing_input = b".".join(segments) - except KeyError: - if not has_crypto and algorithm in requires_cryptography: - raise NotImplementedError( - "Algorithm '%s' could not be found. Do you have cryptography " - "installed?" % algorithm - ) - else: - raise NotImplementedError('Algorithm not supported') + alg_obj = self.get_algorithm_by_name(algorithm_) + key = alg_obj.prepare_key(key) + signature = alg_obj.sign(signing_input, key) segments.append(base64url_encode(signature)) - return b'.'.join(segments) - - def decode(self, - jwt, # type: str - key='', # type: str - verify=True, # type: bool - algorithms=None, # type: List[str] - options=None, # type: Dict - **kwargs): - - merged_options = merge_dict(self.options, options) - verify_signature = merged_options['verify_signature'] + # Don't put the payload content inside the encoded token when detached + if is_payload_detached: + segments[1] = b"" + encoded_string = b".".join(segments) + + return encoded_string.decode("utf-8") + + def decode_complete( + self, + jwt: str | bytes, + key: AllowedPublicKeys | str | bytes = "", + algorithms: list[str] | None = None, + options: dict[str, Any] | None = None, + detached_payload: bytes | None = None, + **kwargs, + ) -> dict[str, Any]: + if kwargs: + warnings.warn( + "passing additional kwargs to decode_complete() is deprecated " + "and will be removed in pyjwt version 3. " + f"Unsupported kwargs: {tuple(kwargs.keys())}", + RemovedInPyjwt3Warning, + ) + if options is None: + options = {} + merged_options = {**self.options, **options} + verify_signature = merged_options["verify_signature"] if verify_signature and not algorithms: - warnings.warn( - 'It is strongly recommended that you pass in a ' + - 'value for the "algorithms" argument when calling decode(). ' + - 'This argument will be mandatory in a future version.', - DeprecationWarning + raise DecodeError( + 'It is required that you pass in a value for the "algorithms" argument when calling decode().' ) payload, signing_input, header, signature = self._load(jwt) - if not verify: - warnings.warn('The verify parameter is deprecated. ' - 'Please use verify_signature in options instead.', - DeprecationWarning, stacklevel=2) - elif verify_signature: - self._verify_signature(payload, signing_input, header, signature, - key, algorithms) + if header.get("b64", True) is False: + if detached_payload is None: + raise DecodeError( + 'It is required that you pass in a value for the "detached_payload" argument to decode a message having the b64 header set to false.' + ) + payload = detached_payload + signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload]) - return payload + if verify_signature: + self._verify_signature(signing_input, header, signature, key, algorithms) - def get_unverified_header(self, jwt): + return { + "payload": payload, + "header": header, + "signature": signature, + } + + def decode( + self, + jwt: str | bytes, + key: AllowedPublicKeys | str | bytes = "", + algorithms: list[str] | None = None, + options: dict[str, Any] | None = None, + detached_payload: bytes | None = None, + **kwargs, + ) -> Any: + if kwargs: + warnings.warn( + "passing additional kwargs to decode() is deprecated " + "and will be removed in pyjwt version 3. " + f"Unsupported kwargs: {tuple(kwargs.keys())}", + RemovedInPyjwt3Warning, + ) + decoded = self.decode_complete( + jwt, key, algorithms, options, detached_payload=detached_payload + ) + return decoded["payload"] + + def get_unverified_header(self, jwt: str | bytes) -> dict[str, Any]: """Returns back the JWT header parameters as a dict() Note: The signature is not verified so the header parameters @@ -168,75 +246,83 @@ def get_unverified_header(self, jwt): return headers - def _load(self, jwt): - if isinstance(jwt, text_type): - jwt = jwt.encode('utf-8') + def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict[str, Any], bytes]: + if isinstance(jwt, str): + jwt = jwt.encode("utf-8") - if not issubclass(type(jwt), binary_type): - raise DecodeError("Invalid token type. Token must be a {0}".format( - binary_type)) + if not isinstance(jwt, bytes): + raise DecodeError(f"Invalid token type. Token must be a {bytes}") try: - signing_input, crypto_segment = jwt.rsplit(b'.', 1) - header_segment, payload_segment = signing_input.split(b'.', 1) - except ValueError: - raise DecodeError('Not enough segments') + signing_input, crypto_segment = jwt.rsplit(b".", 1) + header_segment, payload_segment = signing_input.split(b".", 1) + except ValueError as err: + raise DecodeError("Not enough segments") from err try: header_data = base64url_decode(header_segment) - except (TypeError, binascii.Error): - raise DecodeError('Invalid header padding') + except (TypeError, binascii.Error) as err: + raise DecodeError("Invalid header padding") from err try: - header = json.loads(header_data.decode('utf-8')) + header = json.loads(header_data) except ValueError as e: - raise DecodeError('Invalid header string: %s' % e) + raise DecodeError(f"Invalid header string: {e}") from e - if not isinstance(header, Mapping): - raise DecodeError('Invalid header string: must be a json object') + if not isinstance(header, dict): + raise DecodeError("Invalid header string: must be a json object") try: payload = base64url_decode(payload_segment) - except (TypeError, binascii.Error): - raise DecodeError('Invalid payload padding') + except (TypeError, binascii.Error) as err: + raise DecodeError("Invalid payload padding") from err try: signature = base64url_decode(crypto_segment) - except (TypeError, binascii.Error): - raise DecodeError('Invalid crypto padding') + except (TypeError, binascii.Error) as err: + raise DecodeError("Invalid crypto padding") from err return (payload, signing_input, header, signature) - def _verify_signature(self, payload, signing_input, header, signature, - key='', algorithms=None): - - alg = header.get('alg') + def _verify_signature( + self, + signing_input: bytes, + header: dict[str, Any], + signature: bytes, + key: AllowedPublicKeys | str | bytes = "", + algorithms: list[str] | None = None, + ) -> None: + try: + alg = header["alg"] + except KeyError: + raise InvalidAlgorithmError("Algorithm not specified") - if algorithms is not None and alg not in algorithms: - raise InvalidAlgorithmError('The specified alg value is not allowed') + if not alg or (algorithms is not None and alg not in algorithms): + raise InvalidAlgorithmError("The specified alg value is not allowed") try: - alg_obj = self._algorithms[alg] - key = alg_obj.prepare_key(key) + alg_obj = self.get_algorithm_by_name(alg) + except NotImplementedError as e: + raise InvalidAlgorithmError("Algorithm not supported") from e + prepared_key = alg_obj.prepare_key(key) - if not alg_obj.verify(signing_input, key, signature): - raise InvalidSignatureError('Signature verification failed') - - except KeyError: - raise InvalidAlgorithmError('Algorithm not supported') + if not alg_obj.verify(signing_input, prepared_key, signature): + raise InvalidSignatureError("Signature verification failed") - def _validate_headers(self, headers): - if 'kid' in headers: - self._validate_kid(headers['kid']) + def _validate_headers(self, headers: dict[str, Any]) -> None: + if "kid" in headers: + self._validate_kid(headers["kid"]) - def _validate_kid(self, kid): - if not isinstance(kid, string_types): - raise InvalidTokenError('Key ID header parameter must be a string') + def _validate_kid(self, kid: Any) -> None: + if not isinstance(kid, str): + raise InvalidTokenError("Key ID header parameter must be a string") _jws_global_obj = PyJWS() encode = _jws_global_obj.encode +decode_complete = _jws_global_obj.decode_complete decode = _jws_global_obj.decode register_algorithm = _jws_global_obj.register_algorithm unregister_algorithm = _jws_global_obj.unregister_algorithm +get_algorithm_by_name = _jws_global_obj.get_algorithm_by_name get_unverified_header = _jws_global_obj.get_unverified_header diff --git a/script.module.pyjwt/lib/pyjwt/api_jwt.py b/script.module.pyjwt/lib/pyjwt/api_jwt.py index 85504acf9..48d739ad6 100644 --- a/script.module.pyjwt/lib/pyjwt/api_jwt.py +++ b/script.module.pyjwt/lib/pyjwt/api_jwt.py @@ -1,222 +1,372 @@ +from __future__ import annotations + import json import warnings from calendar import timegm -from datetime import datetime, timedelta -try: - # import required by mypy to perform type checking, not used for normal execution - from typing import Callable, Dict, List, Optional, Union # NOQA -except ImportError: - pass - -from .api_jws import PyJWS -from .algorithms import Algorithm, get_default_algorithms # NOQA -from .compat import Iterable, Mapping, string_types +from collections.abc import Iterable +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any + +from . import api_jws from .exceptions import ( - DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidAudienceError, InvalidIssuedAtError, - InvalidIssuerError, MissingRequiredClaimError + DecodeError, + ExpiredSignatureError, + ImmatureSignatureError, + InvalidAudienceError, + InvalidIssuedAtError, + InvalidIssuerError, + MissingRequiredClaimError, ) -from .utils import merge_dict +from .warnings import RemovedInPyjwt3Warning +if TYPE_CHECKING: + from .algorithms import AllowedPrivateKeys, AllowedPublicKeys -class PyJWT(PyJWS): - header_type = 'JWT' + +class PyJWT: + def __init__(self, options: dict[str, Any] | None = None) -> None: + if options is None: + options = {} + self.options: dict[str, Any] = {**self._get_default_options(), **options} @staticmethod - def _get_default_options(): - # type: () -> Dict[str, bool] + def _get_default_options() -> dict[str, bool | list[str]]: return { - 'verify_signature': True, - 'verify_exp': True, - 'verify_nbf': True, - 'verify_iat': True, - 'verify_aud': True, - 'verify_iss': True, - 'require_exp': False, - 'require_iat': False, - 'require_nbf': False + "verify_signature": True, + "verify_exp": True, + "verify_nbf": True, + "verify_iat": True, + "verify_aud": True, + "verify_iss": True, + "require": [], } - def encode(self, - payload, # type: Union[Dict, bytes] - key, # type: str - algorithm='HS256', # type: str - headers=None, # type: Optional[Dict] - json_encoder=None # type: Optional[Callable] - ): - # Check that we get a mapping - if not isinstance(payload, Mapping): - raise TypeError('Expecting a mapping object, as JWT only supports ' - 'JSON objects as payloads.') + def encode( + self, + payload: dict[str, Any], + key: AllowedPrivateKeys | str | bytes, + algorithm: str | None = "HS256", + headers: dict[str, Any] | None = None, + json_encoder: type[json.JSONEncoder] | None = None, + sort_headers: bool = True, + ) -> str: + # Check that we get a dict + if not isinstance(payload, dict): + raise TypeError( + "Expecting a dict object, as JWT only supports " + "JSON objects as payloads." + ) # Payload - for time_claim in ['exp', 'iat', 'nbf']: + payload = payload.copy() + for time_claim in ["exp", "iat", "nbf"]: # Convert datetime to a intDate value in known time-format claims if isinstance(payload.get(time_claim), datetime): - payload[time_claim] = timegm(payload[time_claim].utctimetuple()) # type: ignore + payload[time_claim] = timegm(payload[time_claim].utctimetuple()) - json_payload = json.dumps( + json_payload = self._encode_payload( payload, - separators=(',', ':'), - cls=json_encoder - ).encode('utf-8') + headers=headers, + json_encoder=json_encoder, + ) - return super(PyJWT, self).encode( - json_payload, key, algorithm, headers, json_encoder + return api_jws.encode( + json_payload, + key, + algorithm, + headers, + json_encoder, + sort_headers=sort_headers, ) - def decode(self, - jwt, # type: str - key='', # type: str - verify=True, # type: bool - algorithms=None, # type: List[str] - options=None, # type: Dict - **kwargs): + def _encode_payload( + self, + payload: dict[str, Any], + headers: dict[str, Any] | None = None, + json_encoder: type[json.JSONEncoder] | None = None, + ) -> bytes: + """ + Encode a given payload to the bytes to be signed. + + This method is intended to be overridden by subclasses that need to + encode the payload in a different way, e.g. compress the payload. + """ + return json.dumps( + payload, + separators=(",", ":"), + cls=json_encoder, + ).encode("utf-8") + + def decode_complete( + self, + jwt: str | bytes, + key: AllowedPublicKeys | str | bytes = "", + algorithms: list[str] | None = None, + options: dict[str, Any] | None = None, + # deprecated arg, remove in pyjwt3 + verify: bool | None = None, + # could be used as passthrough to api_jws, consider removal in pyjwt3 + detached_payload: bytes | None = None, + # passthrough arguments to _validate_claims + # consider putting in options + audience: str | Iterable[str] | None = None, + issuer: str | None = None, + leeway: float | timedelta = 0, + # kwargs + **kwargs: Any, + ) -> dict[str, Any]: + if kwargs: + warnings.warn( + "passing additional kwargs to decode_complete() is deprecated " + "and will be removed in pyjwt version 3. " + f"Unsupported kwargs: {tuple(kwargs.keys())}", + RemovedInPyjwt3Warning, + ) + options = dict(options or {}) # shallow-copy or initialize an empty dict + options.setdefault("verify_signature", True) - if verify and not algorithms: + # If the user has set the legacy `verify` argument, and it doesn't match + # what the relevant `options` entry for the argument is, inform the user + # that they're likely making a mistake. + if verify is not None and verify != options["verify_signature"]: warnings.warn( - 'It is strongly recommended that you pass in a ' + - 'value for the "algorithms" argument when calling decode(). ' + - 'This argument will be mandatory in a future version.', - DeprecationWarning + "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. " + "The equivalent is setting `verify_signature` to False in the `options` dictionary. " + "This invocation has a mismatch between the kwarg and the option entry.", + category=DeprecationWarning, ) - payload, _, _, _ = self._load(jwt) + if not options["verify_signature"]: + options.setdefault("verify_exp", False) + options.setdefault("verify_nbf", False) + options.setdefault("verify_iat", False) + options.setdefault("verify_aud", False) + options.setdefault("verify_iss", False) - if options is None: - options = {'verify_signature': verify} - else: - options.setdefault('verify_signature', verify) + if options["verify_signature"] and not algorithms: + raise DecodeError( + 'It is required that you pass in a value for the "algorithms" argument when calling decode().' + ) - decoded = super(PyJWT, self).decode( - jwt, key=key, algorithms=algorithms, options=options, **kwargs + decoded = api_jws.decode_complete( + jwt, + key=key, + algorithms=algorithms, + options=options, + detached_payload=detached_payload, ) - try: - payload = json.loads(decoded.decode('utf-8')) - except ValueError as e: - raise DecodeError('Invalid payload string: %s' % e) - if not isinstance(payload, Mapping): - raise DecodeError('Invalid payload string: must be a json object') + payload = self._decode_payload(decoded) - if verify: - merged_options = merge_dict(self.options, options) - self._validate_claims(payload, merged_options, **kwargs) + merged_options = {**self.options, **options} + self._validate_claims( + payload, merged_options, audience=audience, issuer=issuer, leeway=leeway + ) - return payload + decoded["payload"] = payload + return decoded - def _validate_claims(self, payload, options, audience=None, issuer=None, - leeway=0, **kwargs): + def _decode_payload(self, decoded: dict[str, Any]) -> Any: + """ + Decode the payload from a JWS dictionary (payload, signature, header). - if 'verify_expiration' in kwargs: - options['verify_exp'] = kwargs.get('verify_expiration', True) - warnings.warn('The verify_expiration parameter is deprecated. ' - 'Please use verify_exp in options instead.', - DeprecationWarning) + This method is intended to be overridden by subclasses that need to + decode the payload in a different way, e.g. decompress compressed + payloads. + """ + try: + payload = json.loads(decoded["payload"]) + except ValueError as e: + raise DecodeError(f"Invalid payload string: {e}") + if not isinstance(payload, dict): + raise DecodeError("Invalid payload string: must be a json object") + return payload + def decode( + self, + jwt: str | bytes, + key: AllowedPublicKeys | str | bytes = "", + algorithms: list[str] | None = None, + options: dict[str, Any] | None = None, + # deprecated arg, remove in pyjwt3 + verify: bool | None = None, + # could be used as passthrough to api_jws, consider removal in pyjwt3 + detached_payload: bytes | None = None, + # passthrough arguments to _validate_claims + # consider putting in options + audience: str | Iterable[str] | None = None, + issuer: str | None = None, + leeway: float | timedelta = 0, + # kwargs + **kwargs: Any, + ) -> Any: + if kwargs: + warnings.warn( + "passing additional kwargs to decode() is deprecated " + "and will be removed in pyjwt version 3. " + f"Unsupported kwargs: {tuple(kwargs.keys())}", + RemovedInPyjwt3Warning, + ) + decoded = self.decode_complete( + jwt, + key, + algorithms, + options, + verify=verify, + detached_payload=detached_payload, + audience=audience, + issuer=issuer, + leeway=leeway, + ) + return decoded["payload"] + + def _validate_claims( + self, + payload: dict[str, Any], + options: dict[str, Any], + audience=None, + issuer=None, + leeway: float | timedelta = 0, + ) -> None: if isinstance(leeway, timedelta): leeway = leeway.total_seconds() - if not isinstance(audience, (string_types, type(None), Iterable)): - raise TypeError('audience must be a string, iterable, or None') + if audience is not None and not isinstance(audience, (str, Iterable)): + raise TypeError("audience must be a string, iterable or None") self._validate_required_claims(payload, options) - now = timegm(datetime.utcnow().utctimetuple()) + now = datetime.now(tz=timezone.utc).timestamp() - if 'iat' in payload and options.get('verify_iat'): + if "iat" in payload and options["verify_iat"]: self._validate_iat(payload, now, leeway) - if 'nbf' in payload and options.get('verify_nbf'): + if "nbf" in payload and options["verify_nbf"]: self._validate_nbf(payload, now, leeway) - if 'exp' in payload and options.get('verify_exp'): + if "exp" in payload and options["verify_exp"]: self._validate_exp(payload, now, leeway) - if options.get('verify_iss'): + if options["verify_iss"]: self._validate_iss(payload, issuer) - if options.get('verify_aud'): - self._validate_aud(payload, audience) - - def _validate_required_claims(self, payload, options): - if options.get('require_exp') and payload.get('exp') is None: - raise MissingRequiredClaimError('exp') - - if options.get('require_iat') and payload.get('iat') is None: - raise MissingRequiredClaimError('iat') - - if options.get('require_nbf') and payload.get('nbf') is None: - raise MissingRequiredClaimError('nbf') + if options["verify_aud"]: + self._validate_aud( + payload, audience, strict=options.get("strict_aud", False) + ) - def _validate_iat(self, payload, now, leeway): + def _validate_required_claims( + self, + payload: dict[str, Any], + options: dict[str, Any], + ) -> None: + for claim in options["require"]: + if payload.get(claim) is None: + raise MissingRequiredClaimError(claim) + + def _validate_iat( + self, + payload: dict[str, Any], + now: float, + leeway: float, + ) -> None: try: - int(payload['iat']) + iat = int(payload["iat"]) except ValueError: - raise InvalidIssuedAtError('Issued At claim (iat) must be an integer.') - - def _validate_nbf(self, payload, now, leeway): + raise InvalidIssuedAtError("Issued At claim (iat) must be an integer.") + if iat > (now + leeway): + raise ImmatureSignatureError("The token is not yet valid (iat)") + + def _validate_nbf( + self, + payload: dict[str, Any], + now: float, + leeway: float, + ) -> None: try: - nbf = int(payload['nbf']) + nbf = int(payload["nbf"]) except ValueError: - raise DecodeError('Not Before claim (nbf) must be an integer.') + raise DecodeError("Not Before claim (nbf) must be an integer.") if nbf > (now + leeway): - raise ImmatureSignatureError('The token is not yet valid (nbf)') - - def _validate_exp(self, payload, now, leeway): + raise ImmatureSignatureError("The token is not yet valid (nbf)") + + def _validate_exp( + self, + payload: dict[str, Any], + now: float, + leeway: float, + ) -> None: try: - exp = int(payload['exp']) + exp = int(payload["exp"]) except ValueError: - raise DecodeError('Expiration Time claim (exp) must be an' - ' integer.') - - if exp < (now - leeway): - raise ExpiredSignatureError('Signature has expired') - - def _validate_aud(self, payload, audience): - if audience is None and 'aud' not in payload: - return + raise DecodeError("Expiration Time claim (exp) must be an" " integer.") + + if exp <= (now - leeway): + raise ExpiredSignatureError("Signature has expired") + + def _validate_aud( + self, + payload: dict[str, Any], + audience: str | Iterable[str] | None, + *, + strict: bool = False, + ) -> None: + if audience is None: + if "aud" not in payload or not payload["aud"]: + return + # Application did not specify an audience, but + # the token has the 'aud' claim + raise InvalidAudienceError("Invalid audience") - if audience is not None and 'aud' not in payload: + if "aud" not in payload or not payload["aud"]: # Application specified an audience, but it could not be # verified since the token does not contain a claim. - raise MissingRequiredClaimError('aud') + raise MissingRequiredClaimError("aud") - if audience is None and 'aud' in payload: - # Application did not specify an audience, but - # the token has the 'aud' claim - raise InvalidAudienceError('Invalid audience') + audience_claims = payload["aud"] - audience_claims = payload['aud'] + # In strict mode, we forbid list matching: the supplied audience + # must be a string, and it must exactly match the audience claim. + if strict: + # Only a single audience is allowed in strict mode. + if not isinstance(audience, str): + raise InvalidAudienceError("Invalid audience (strict)") + + # Only a single audience claim is allowed in strict mode. + if not isinstance(audience_claims, str): + raise InvalidAudienceError("Invalid claim format in token (strict)") + + if audience != audience_claims: + raise InvalidAudienceError("Audience doesn't match (strict)") + + return - if isinstance(audience_claims, string_types): + if isinstance(audience_claims, str): audience_claims = [audience_claims] if not isinstance(audience_claims, list): - raise InvalidAudienceError('Invalid claim format in token') - if any(not isinstance(c, string_types) for c in audience_claims): - raise InvalidAudienceError('Invalid claim format in token') + raise InvalidAudienceError("Invalid claim format in token") + if any(not isinstance(c, str) for c in audience_claims): + raise InvalidAudienceError("Invalid claim format in token") - if isinstance(audience, string_types): + if isinstance(audience, str): audience = [audience] - if not any(aud in audience_claims for aud in audience): - raise InvalidAudienceError('Invalid audience') + if all(aud not in audience_claims for aud in audience): + raise InvalidAudienceError("Audience doesn't match") - def _validate_iss(self, payload, issuer): + def _validate_iss(self, payload: dict[str, Any], issuer: Any) -> None: if issuer is None: return - if 'iss' not in payload: - raise MissingRequiredClaimError('iss') + if "iss" not in payload: + raise MissingRequiredClaimError("iss") - if payload['iss'] != issuer: - raise InvalidIssuerError('Invalid issuer') + if payload["iss"] != issuer: + raise InvalidIssuerError("Invalid issuer") _jwt_global_obj = PyJWT() encode = _jwt_global_obj.encode +decode_complete = _jwt_global_obj.decode_complete decode = _jwt_global_obj.decode -register_algorithm = _jwt_global_obj.register_algorithm -unregister_algorithm = _jwt_global_obj.unregister_algorithm -get_unverified_header = _jwt_global_obj.get_unverified_header diff --git a/script.module.pyjwt/lib/pyjwt/compat.py b/script.module.pyjwt/lib/pyjwt/compat.py deleted file mode 100644 index e79e258e5..000000000 --- a/script.module.pyjwt/lib/pyjwt/compat.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -The `compat` module provides support for backwards compatibility with older -versions of python, and compatibility wrappers around optional packages. -""" -# flake8: noqa -import hmac -import struct -import sys - - -PY3 = sys.version_info[0] == 3 - - -if PY3: - text_type = str - binary_type = bytes -else: - text_type = unicode - binary_type = str - -string_types = (text_type, binary_type) - -try: - # Importing ABCs from collections will be removed in PY3.8 - from collections.abc import Iterable, Mapping -except ImportError: - from collections import Iterable, Mapping - -try: - constant_time_compare = hmac.compare_digest -except AttributeError: - # Fallback for Python < 2.7 - def constant_time_compare(val1, val2): - """ - Returns True if the two strings are equal, False otherwise. - - The time taken is independent of the number of characters that match. - """ - if len(val1) != len(val2): - return False - - result = 0 - - for x, y in zip(val1, val2): - result |= ord(x) ^ ord(y) - - return result == 0 - -# Use int.to_bytes if it exists (Python 3) -if getattr(int, 'to_bytes', None): - def bytes_from_int(val): - remaining = val - byte_length = 0 - - while remaining != 0: - remaining = remaining >> 8 - byte_length += 1 - - return val.to_bytes(byte_length, 'big', signed=False) -else: - def bytes_from_int(val): - buf = [] - while val: - val, remainder = divmod(val, 256) - buf.append(remainder) - - buf.reverse() - return struct.pack('%sB' % len(buf), *buf) diff --git a/script.module.pyjwt/lib/pyjwt/contrib/__init__.py b/script.module.pyjwt/lib/pyjwt/contrib/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/script.module.pyjwt/lib/pyjwt/contrib/algorithms/__init__.py b/script.module.pyjwt/lib/pyjwt/contrib/algorithms/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/script.module.pyjwt/lib/pyjwt/contrib/algorithms/py_ecdsa.py b/script.module.pyjwt/lib/pyjwt/contrib/algorithms/py_ecdsa.py deleted file mode 100644 index bf0dea5ae..000000000 --- a/script.module.pyjwt/lib/pyjwt/contrib/algorithms/py_ecdsa.py +++ /dev/null @@ -1,60 +0,0 @@ -# Note: This file is named py_ecdsa.py because import behavior in Python 2 -# would cause ecdsa.py to squash the ecdsa library that it depends upon. - -import hashlib - -import ecdsa - -from jwt.algorithms import Algorithm -from jwt.compat import string_types, text_type - - -class ECAlgorithm(Algorithm): - """ - Performs signing and verification operations using - ECDSA and the specified hash function - - This class requires the ecdsa package to be installed. - - This is based off of the implementation in PyJWT 0.3.2 - """ - SHA256 = hashlib.sha256 - SHA384 = hashlib.sha384 - SHA512 = hashlib.sha512 - - def __init__(self, hash_alg): - self.hash_alg = hash_alg - - def prepare_key(self, key): - - if isinstance(key, ecdsa.SigningKey) or \ - isinstance(key, ecdsa.VerifyingKey): - return key - - if isinstance(key, string_types): - if isinstance(key, text_type): - key = key.encode('utf-8') - - # Attempt to load key. We don't know if it's - # a Signing Key or a Verifying Key, so we try - # the Verifying Key first. - try: - key = ecdsa.VerifyingKey.from_pem(key) - except ecdsa.der.UnexpectedDER: - key = ecdsa.SigningKey.from_pem(key) - - else: - raise TypeError('Expecting a PEM-formatted key.') - - return key - - def sign(self, msg, key): - return key.sign(msg, hashfunc=self.hash_alg, - sigencode=ecdsa.util.sigencode_string) - - def verify(self, msg, key, sig): - try: - return key.verify(sig, msg, hashfunc=self.hash_alg, - sigdecode=ecdsa.util.sigdecode_string) - except AssertionError: - return False diff --git a/script.module.pyjwt/lib/pyjwt/contrib/algorithms/pycrypto.py b/script.module.pyjwt/lib/pyjwt/contrib/algorithms/pycrypto.py deleted file mode 100644 index e49cdbfe4..000000000 --- a/script.module.pyjwt/lib/pyjwt/contrib/algorithms/pycrypto.py +++ /dev/null @@ -1,46 +0,0 @@ -import Crypto.Hash.SHA256 -import Crypto.Hash.SHA384 -import Crypto.Hash.SHA512 -from Crypto.PublicKey import RSA -from Crypto.Signature import PKCS1_v1_5 - -from jwt.algorithms import Algorithm -from jwt.compat import string_types, text_type - - -class RSAAlgorithm(Algorithm): - """ - Performs signing and verification operations using - RSASSA-PKCS-v1_5 and the specified hash function. - - This class requires PyCrypto package to be installed. - - This is based off of the implementation in PyJWT 0.3.2 - """ - SHA256 = Crypto.Hash.SHA256 - SHA384 = Crypto.Hash.SHA384 - SHA512 = Crypto.Hash.SHA512 - - def __init__(self, hash_alg): - self.hash_alg = hash_alg - - def prepare_key(self, key): - - if isinstance(key, RSA._RSAobj): - return key - - if isinstance(key, string_types): - if isinstance(key, text_type): - key = key.encode('utf-8') - - key = RSA.importKey(key) - else: - raise TypeError('Expecting a PEM- or RSA-formatted key.') - - return key - - def sign(self, msg, key): - return PKCS1_v1_5.new(key).sign(self.hash_alg.new(msg)) - - def verify(self, msg, key, sig): - return PKCS1_v1_5.new(key).verify(self.hash_alg.new(msg), sig) diff --git a/script.module.pyjwt/lib/pyjwt/exceptions.py b/script.module.pyjwt/lib/pyjwt/exceptions.py index 2a6aa596b..8ac6ecf74 100644 --- a/script.module.pyjwt/lib/pyjwt/exceptions.py +++ b/script.module.pyjwt/lib/pyjwt/exceptions.py @@ -2,6 +2,7 @@ class PyJWTError(Exception): """ Base class for all exceptions """ + pass @@ -46,14 +47,24 @@ class InvalidAlgorithmError(InvalidTokenError): class MissingRequiredClaimError(InvalidTokenError): - def __init__(self, claim): + def __init__(self, claim: str) -> None: self.claim = claim - def __str__(self): - return 'Token is missing the "%s" claim' % self.claim + def __str__(self) -> str: + return f'Token is missing the "{self.claim}" claim' + + +class PyJWKError(PyJWTError): + pass + +class PyJWKSetError(PyJWTError): + pass -# Compatibility aliases (deprecated) -ExpiredSignature = ExpiredSignatureError -InvalidAudience = InvalidAudienceError -InvalidIssuer = InvalidIssuerError + +class PyJWKClientError(PyJWTError): + pass + + +class PyJWKClientConnectionError(PyJWKClientError): + pass diff --git a/script.module.pyjwt/lib/pyjwt/help.py b/script.module.pyjwt/lib/pyjwt/help.py index 55e39ebb2..80b0ca56e 100644 --- a/script.module.pyjwt/lib/pyjwt/help.py +++ b/script.module.pyjwt/lib/pyjwt/help.py @@ -1,30 +1,29 @@ -from __future__ import print_function - import json import platform import sys +from typing import Dict from . import __version__ as pyjwt_version try: import cryptography -except ImportError: - cryptography = None -try: - import ecdsa -except ImportError: - ecdsa = None + cryptography_version = cryptography.__version__ +except ModuleNotFoundError: + cryptography_version = "" -def info(): +def info() -> Dict[str, Dict[str, str]]: """ Generate information for a bug report. Based on the requests package help utility module. """ try: - platform_info = {"system": platform.system(), "release": platform.release()} - except IOError: + platform_info = { + "system": platform.system(), + "release": platform.release(), + } + except OSError: platform_info = {"system": "Unknown", "release": "Unknown"} implementation = platform.python_implementation() @@ -32,27 +31,31 @@ def info(): if implementation == "CPython": implementation_version = platform.python_version() elif implementation == "PyPy": - implementation_version = "%s.%s.%s" % ( - sys.pypy_version_info.major, - sys.pypy_version_info.minor, - sys.pypy_version_info.micro, + pypy_version_info = sys.pypy_version_info # type: ignore[attr-defined] + implementation_version = ( + f"{pypy_version_info.major}." + f"{pypy_version_info.minor}." + f"{pypy_version_info.micro}" ) - if sys.pypy_version_info.releaselevel != "final": + if pypy_version_info.releaselevel != "final": implementation_version = "".join( - [implementation_version, sys.pypy_version_info.releaselevel] + [implementation_version, pypy_version_info.releaselevel] ) else: implementation_version = "Unknown" return { "platform": platform_info, - "implementation": {"name": implementation, "version": implementation_version}, - "cryptography": {"version": getattr(cryptography, "__version__", "")}, + "implementation": { + "name": implementation, + "version": implementation_version, + }, + "cryptography": {"version": cryptography_version}, "pyjwt": {"version": pyjwt_version}, } -def main(): +def main() -> None: """Pretty-print the bug information as JSON.""" print(json.dumps(info(), sort_keys=True, indent=2)) diff --git a/script.module.pyjwt/lib/pyjwt/jwk_set_cache.py b/script.module.pyjwt/lib/pyjwt/jwk_set_cache.py new file mode 100644 index 000000000..243256305 --- /dev/null +++ b/script.module.pyjwt/lib/pyjwt/jwk_set_cache.py @@ -0,0 +1,31 @@ +import time +from typing import Optional + +from .api_jwk import PyJWKSet, PyJWTSetWithTimestamp + + +class JWKSetCache: + def __init__(self, lifespan: int) -> None: + self.jwk_set_with_timestamp: Optional[PyJWTSetWithTimestamp] = None + self.lifespan = lifespan + + def put(self, jwk_set: PyJWKSet) -> None: + if jwk_set is not None: + self.jwk_set_with_timestamp = PyJWTSetWithTimestamp(jwk_set) + else: + # clear cache + self.jwk_set_with_timestamp = None + + def get(self) -> Optional[PyJWKSet]: + if self.jwk_set_with_timestamp is None or self.is_expired(): + return None + + return self.jwk_set_with_timestamp.get_jwk_set() + + def is_expired(self) -> bool: + return ( + self.jwk_set_with_timestamp is not None + and self.lifespan > -1 + and time.monotonic() + > self.jwk_set_with_timestamp.get_timestamp() + self.lifespan + ) diff --git a/script.module.pyjwt/lib/pyjwt/jwks_client.py b/script.module.pyjwt/lib/pyjwt/jwks_client.py new file mode 100644 index 000000000..f19b10acb --- /dev/null +++ b/script.module.pyjwt/lib/pyjwt/jwks_client.py @@ -0,0 +1,124 @@ +import json +import urllib.request +from functools import lru_cache +from ssl import SSLContext +from typing import Any, Dict, List, Optional +from urllib.error import URLError + +from .api_jwk import PyJWK, PyJWKSet +from .api_jwt import decode_complete as decode_token +from .exceptions import PyJWKClientConnectionError, PyJWKClientError +from .jwk_set_cache import JWKSetCache + + +class PyJWKClient: + def __init__( + self, + uri: str, + cache_keys: bool = False, + max_cached_keys: int = 16, + cache_jwk_set: bool = True, + lifespan: int = 300, + headers: Optional[Dict[str, Any]] = None, + timeout: int = 30, + ssl_context: Optional[SSLContext] = None, + ): + if headers is None: + headers = {} + self.uri = uri + self.jwk_set_cache: Optional[JWKSetCache] = None + self.headers = headers + self.timeout = timeout + self.ssl_context = ssl_context + + if cache_jwk_set: + # Init jwt set cache with default or given lifespan. + # Default lifespan is 300 seconds (5 minutes). + if lifespan <= 0: + raise PyJWKClientError( + f'Lifespan must be greater than 0, the input is "{lifespan}"' + ) + self.jwk_set_cache = JWKSetCache(lifespan) + else: + self.jwk_set_cache = None + + if cache_keys: + # Cache signing keys + # Ignore mypy (https://github.com/python/mypy/issues/2427) + self.get_signing_key = lru_cache(maxsize=max_cached_keys)(self.get_signing_key) # type: ignore + + def fetch_data(self) -> Any: + jwk_set: Any = None + try: + r = urllib.request.Request(url=self.uri, headers=self.headers) + with urllib.request.urlopen( + r, timeout=self.timeout, context=self.ssl_context + ) as response: + jwk_set = json.load(response) + except (URLError, TimeoutError) as e: + raise PyJWKClientConnectionError( + f'Fail to fetch data from the url, err: "{e}"' + ) + else: + return jwk_set + finally: + if self.jwk_set_cache is not None: + self.jwk_set_cache.put(jwk_set) + + def get_jwk_set(self, refresh: bool = False) -> PyJWKSet: + data = None + if self.jwk_set_cache is not None and not refresh: + data = self.jwk_set_cache.get() + + if data is None: + data = self.fetch_data() + + if not isinstance(data, dict): + raise PyJWKClientError("The JWKS endpoint did not return a JSON object") + + return PyJWKSet.from_dict(data) + + def get_signing_keys(self, refresh: bool = False) -> List[PyJWK]: + jwk_set = self.get_jwk_set(refresh) + signing_keys = [ + jwk_set_key + for jwk_set_key in jwk_set.keys + if jwk_set_key.public_key_use in ["sig", None] and jwk_set_key.key_id + ] + + if not signing_keys: + raise PyJWKClientError("The JWKS endpoint did not contain any signing keys") + + return signing_keys + + def get_signing_key(self, kid: str) -> PyJWK: + signing_keys = self.get_signing_keys() + signing_key = self.match_kid(signing_keys, kid) + + if not signing_key: + # If no matching signing key from the jwk set, refresh the jwk set and try again. + signing_keys = self.get_signing_keys(refresh=True) + signing_key = self.match_kid(signing_keys, kid) + + if not signing_key: + raise PyJWKClientError( + f'Unable to find a signing key that matches: "{kid}"' + ) + + return signing_key + + def get_signing_key_from_jwt(self, token: str) -> PyJWK: + unverified = decode_token(token, options={"verify_signature": False}) + header = unverified["header"] + return self.get_signing_key(header.get("kid")) + + @staticmethod + def match_kid(signing_keys: List[PyJWK], kid: str) -> Optional[PyJWK]: + signing_key = None + + for key in signing_keys: + if key.key_id == kid: + signing_key = key + break + + return signing_key diff --git a/script.module.pyjwt/lib/pyjwt/types.py b/script.module.pyjwt/lib/pyjwt/types.py new file mode 100644 index 000000000..7d9935205 --- /dev/null +++ b/script.module.pyjwt/lib/pyjwt/types.py @@ -0,0 +1,5 @@ +from typing import Any, Callable, Dict + +JWKDict = Dict[str, Any] + +HashlibHash = Callable[..., Any] diff --git a/script.module.pyjwt/lib/pyjwt/utils.py b/script.module.pyjwt/lib/pyjwt/utils.py index b33c7a2d4..81c5ee41a 100644 --- a/script.module.pyjwt/lib/pyjwt/utils.py +++ b/script.module.pyjwt/lib/pyjwt/utils.py @@ -1,97 +1,80 @@ import base64 import binascii -import struct - -from .compat import binary_type, bytes_from_int, text_type +import re +from typing import Union try: + from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve from cryptography.hazmat.primitives.asymmetric.utils import ( - decode_dss_signature, encode_dss_signature + decode_dss_signature, + encode_dss_signature, ) -except ImportError: +except ModuleNotFoundError: pass -def force_unicode(value): - if isinstance(value, binary_type): - return value.decode('utf-8') - elif isinstance(value, text_type): - return value - else: - raise TypeError('Expected a string value') - - -def force_bytes(value): - if isinstance(value, text_type): - return value.encode('utf-8') - elif isinstance(value, binary_type): +def force_bytes(value: Union[bytes, str]) -> bytes: + if isinstance(value, str): + return value.encode("utf-8") + elif isinstance(value, bytes): return value else: - raise TypeError('Expected a string value') + raise TypeError("Expected a string value") -def base64url_decode(input): - if isinstance(input, text_type): - input = input.encode('ascii') +def base64url_decode(input: Union[bytes, str]) -> bytes: + input_bytes = force_bytes(input) - rem = len(input) % 4 + rem = len(input_bytes) % 4 if rem > 0: - input += b'=' * (4 - rem) + input_bytes += b"=" * (4 - rem) - return base64.urlsafe_b64decode(input) + return base64.urlsafe_b64decode(input_bytes) -def base64url_encode(input): - return base64.urlsafe_b64encode(input).replace(b'=', b'') +def base64url_encode(input: bytes) -> bytes: + return base64.urlsafe_b64encode(input).replace(b"=", b"") -def to_base64url_uint(val): +def to_base64url_uint(val: int) -> bytes: if val < 0: - raise ValueError('Must be a positive integer') + raise ValueError("Must be a positive integer") int_bytes = bytes_from_int(val) if len(int_bytes) == 0: - int_bytes = b'\x00' + int_bytes = b"\x00" return base64url_encode(int_bytes) -def from_base64url_uint(val): - if isinstance(val, text_type): - val = val.encode('ascii') +def from_base64url_uint(val: Union[bytes, str]) -> int: + data = base64url_decode(force_bytes(val)) + return int.from_bytes(data, byteorder="big") - data = base64url_decode(val) - buf = struct.unpack('%sB' % len(data), data) - return int(''.join(["%02x" % byte for byte in buf]), 16) +def number_to_bytes(num: int, num_bytes: int) -> bytes: + padded_hex = "%0*x" % (2 * num_bytes, num) + return binascii.a2b_hex(padded_hex.encode("ascii")) -def merge_dict(original, updates): - if not updates: - return original - - try: - merged_options = original.copy() - merged_options.update(updates) - except (AttributeError, ValueError) as e: - raise TypeError('original and updates must be a dictionary: %s' % e) - - return merged_options +def bytes_to_number(string: bytes) -> int: + return int(binascii.b2a_hex(string), 16) -def number_to_bytes(num, num_bytes): - padded_hex = '%0*x' % (2 * num_bytes, num) - big_endian = binascii.a2b_hex(padded_hex.encode('ascii')) - return big_endian +def bytes_from_int(val: int) -> bytes: + remaining = val + byte_length = 0 + while remaining != 0: + remaining >>= 8 + byte_length += 1 -def bytes_to_number(string): - return int(binascii.b2a_hex(string), 16) + return val.to_bytes(byte_length, "big", signed=False) -def der_to_raw_signature(der_sig, curve): +def der_to_raw_signature(der_sig: bytes, curve: "EllipticCurve") -> bytes: num_bits = curve.key_size num_bytes = (num_bits + 7) // 8 @@ -100,14 +83,74 @@ def der_to_raw_signature(der_sig, curve): return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes) -def raw_to_der_signature(raw_sig, curve): +def raw_to_der_signature(raw_sig: bytes, curve: "EllipticCurve") -> bytes: num_bits = curve.key_size num_bytes = (num_bits + 7) // 8 if len(raw_sig) != 2 * num_bytes: - raise ValueError('Invalid signature') + raise ValueError("Invalid signature") r = bytes_to_number(raw_sig[:num_bytes]) s = bytes_to_number(raw_sig[num_bytes:]) - return encode_dss_signature(r, s) + return bytes(encode_dss_signature(r, s)) + + +# Based on https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252 +_PEMS = { + b"CERTIFICATE", + b"TRUSTED CERTIFICATE", + b"PRIVATE KEY", + b"PUBLIC KEY", + b"ENCRYPTED PRIVATE KEY", + b"OPENSSH PRIVATE KEY", + b"DSA PRIVATE KEY", + b"RSA PRIVATE KEY", + b"RSA PUBLIC KEY", + b"EC PRIVATE KEY", + b"DH PARAMETERS", + b"NEW CERTIFICATE REQUEST", + b"CERTIFICATE REQUEST", + b"SSH2 PUBLIC KEY", + b"SSH2 ENCRYPTED PRIVATE KEY", + b"X509 CRL", +} + +_PEM_RE = re.compile( + b"----[- ]BEGIN (" + + b"|".join(_PEMS) + + b""")[- ]----\r? +.+?\r? +----[- ]END \\1[- ]----\r?\n?""", + re.DOTALL, +) + + +def is_pem_format(key: bytes) -> bool: + return bool(_PEM_RE.search(key)) + + +# Based on https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b/src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46 +_CERT_SUFFIX = b"-cert-v01@openssh.com" +_SSH_PUBKEY_RC = re.compile(rb"\A(\S+)[ \t]+(\S+)") +_SSH_KEY_FORMATS = [ + b"ssh-ed25519", + b"ssh-rsa", + b"ssh-dss", + b"ecdsa-sha2-nistp256", + b"ecdsa-sha2-nistp384", + b"ecdsa-sha2-nistp521", +] + + +def is_ssh_key(key: bytes) -> bool: + if any(string_value in key for string_value in _SSH_KEY_FORMATS): + return True + + ssh_pubkey_match = _SSH_PUBKEY_RC.match(key) + if ssh_pubkey_match: + key_type = ssh_pubkey_match.group(1) + if _CERT_SUFFIX == key_type[-len(_CERT_SUFFIX) :]: + return True + + return False diff --git a/script.module.pyjwt/lib/pyjwt/warnings.py b/script.module.pyjwt/lib/pyjwt/warnings.py new file mode 100644 index 000000000..8762a8cbb --- /dev/null +++ b/script.module.pyjwt/lib/pyjwt/warnings.py @@ -0,0 +1,2 @@ +class RemovedInPyjwt3Warning(DeprecationWarning): + pass diff --git a/script.module.pyjwt/icon.png b/script.module.pyjwt/resources/icon.png similarity index 100% rename from script.module.pyjwt/icon.png rename to script.module.pyjwt/resources/icon.png