From 8d3a225ed5a294b1aaa535ed75260485dd68b6cb Mon Sep 17 00:00:00 2001 From: Chris Wagner Date: Mon, 19 Feb 2024 16:41:43 -0800 Subject: [PATCH] Change auth_tokens to be versionable, extendable, expirable. Older auth_token formats still are accepted. Each auth_token is now versioned and has an optional 'exp' timestamp which will be checked during validity. A new callback SECURITY_TOKEN_EXPIRE_TIMESTAMP can be set to compute whatever the app needs (and it can be per user). Use freezegun for testing. --- CHANGES.rst | 3 ++ docs/configuration.rst | 48 ++++++++++----------- docs/features.rst | 4 ++ flask_security/core.py | 84 +++++++++++++++++-------------------- flask_security/utils.py | 40 +++++++++++++++++- pyproject.toml | 1 + requirements/tests.txt | 1 + tests/conftest.py | 7 ++++ tests/test_basic.py | 92 ++++++++++++++++++++++++++++++----------- tests/test_utils.py | 6 +++ 10 files changed, 189 insertions(+), 97 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index b7d1c2a1..93aa8afe 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -23,6 +23,8 @@ Features & Improvements - (:pr:`881`) No longer rely on Flask-Login.unauthorized callback. See below for implications. - (:pr:`899`) Improve (and simplify) Two-Factor setup. See below for backwards compatability issues and new functionality. - (:issue:`904`) Changes to default unauthorized handler - remove use of referrer header (see below). +- (:pr:`xxx`) The authentication_token format has changed - adding per-token expiry time and future session ID. + Old tokens are still accepted. Docs and Chores @@ -47,6 +49,7 @@ Fixes return an HTML page even if the request was JSON. - (:pr:`914`) It was possible that if :data:`SECURITY_EMAIL_VALIDATOR_ARGS` were set that deliverability would be checked even for login. +- (:issue:`925`) Register with JSON and authentication token failed CSRF. (lilz-egoto) Notes ++++++ diff --git a/docs/configuration.rst b/docs/configuration.rst index 9860b6a0..870dc790 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -222,6 +222,25 @@ These configuration keys are used globally across all features. Default: ``None``, meaning the token never expires. +.. py:data:: SECURITY_TOKEN_EXPIRE_TIMESTAMP + + A callable that returns a unix timestamp in the future when this specific + authentication token should expire. Returning 0 means no expiration. + It is passed the currently authenticated User so any fields can be used + to customize an expiration time. Of course it is called in a request + context so any information about the current request can also be used. + + If BOTH this and :data:`SECURITY_TOKEN_MAX_AGE` are set - the shorter is used. + + .. note:: + These 2 expiry options work differently - with this one, the actual expire + timestamp is in the auth_token. The signed token (using itsdangerous) + has the timestamp the token was generated. On validation, that is checked + against ``SECURITY_TOKEN_MAX_AGE``. So for MAX_AGE, at the time of + validation, the token hasn't yet been associated with a User. + + Default: ``lambda user: 0`` + .. py:data:: SECURITY_EMAIL_VALIDATOR_ARGS Email address are validated and normalized via the ``mail_util_cls`` which @@ -295,32 +314,6 @@ These configuration keys are used globally across all features. .. versionadded:: 4.0.0 -.. py:data:: SECURITY_REDIRECT_VALIDATE_MODE - - Defines how Flask-Security will attempt to mitigate an open redirect - vulnerability w.r.t. client supplied `next` parameters. - Please see :ref:`redirect_topic` for a complete discussion. - - Current options include `"absolute"` and `"regex"`. A list is allowed. - - - Default: ``["absolute"]`` - - .. versionadded:: 4.0.2 - - .. versionchanged:: 5.4.0 - Default is now `"absolute"` and now takes a list. - -.. py:data:: SECURITY_REDIRECT_VALIDATE_RE - - This regex handles known patterns that can be exploited. Basically, - don't allow control characters or white-space followed by slashes (or - back slashes). - - Default: ``r"^/{4,}|\\{3,}|[\s\000-\037][/\\]{2,}(?![/\\])|[/\\]([^/\\]|/[^/\\])*[/\\].*"`` - - .. versionadded:: 4.0.2 - .. py:data:: SECURITY_CSRF_PROTECT_MECHANISMS Authentication mechanisms that require CSRF protection. @@ -466,6 +459,8 @@ These configuration keys are used globally across all features. If set to ``True`` Flask-Security will return generic responses to endpoints that could be used to enumerate users. Please see :ref:`generic_responses`. + Default: ``False`` + .. versionadded:: 5.0.0 Core - Multi-factor @@ -567,6 +562,7 @@ These are used by the Two-Factor and Unified Signin features. - :py:data:`SECURITY_US_SETUP_URL` - :py:data:`SECURITY_TWO_FACTOR_SETUP_URL` - :py:data:`SECURITY_WAN_REGISTER_URL` + - :py:data:`SECURITY_MULTI_FACTOR_RECOVERY_CODES` N.B. To avoid strange behavior, be sure to set the grace period less than the freshness period. diff --git a/docs/features.rst b/docs/features.rst index ec89c38c..d224a552 100644 --- a/docs/features.rst +++ b/docs/features.rst @@ -90,6 +90,10 @@ isolating password changes from authentication tokens. That attribute can be cha Unlike ``fs_uniquifier``, it can be set to ``nullable`` - it will automatically be generated at first use if null. +Authentication tokens have 2 options for specifying expiry time :data:`SECURITY_TOKEN_MAX_AGE` +is applied to ALL authentication tokens. Each authentication token can itself have an embedded +expiry value (settable via the :data:`SECURITY_TOKEN_EXPIRE_TIMESTAMP` callable). + .. _two-factor: Two-factor Authentication diff --git a/flask_security/core.py b/flask_security/core.py index b3acec27..c15de9e5 100644 --- a/flask_security/core.py +++ b/flask_security/core.py @@ -97,6 +97,7 @@ get_request_attr, is_user_authenticated, naive_utcnow, + parse_auth_token, set_request_attr, uia_email_mapper, uia_username_mapper, @@ -258,6 +259,7 @@ "TOKEN_AUTHENTICATION_KEY": "auth_token", "TOKEN_AUTHENTICATION_HEADER": "Authentication-Token", "TOKEN_MAX_AGE": None, + "TOKEN_EXPIRE_TIMESTAMP": lambda user: 0, "CONFIRM_SALT": "confirm-salt", "RESET_SALT": "reset-salt", "LOGIN_SALT": "login-salt", @@ -667,31 +669,15 @@ def _request_loader(request): token = data.get(args_key, token) try: - data = _security.remember_token_serializer.loads( - token, max_age=cv("TOKEN_MAX_AGE") - ) - - # Version 3.x generated tokens that map to data with 3 elements, - # and fs_uniquifier was on last element. - # Version 4.0.0 generates tokens that map to data with only 1 element, - # which maps to fs_uniquifier. - # Here we compute uniquifier_index so that we can pick up correct index for - # matching fs_uniquifier in version 4.0.0 even if token was created with - # version 3.x - uniquifier_index = 0 if len(data) == 1 else 2 - + tdata = parse_auth_token(token) if hasattr(_security.datastore.user_model, "fs_token_uniquifier"): - user = _security.datastore.find_user( - fs_token_uniquifier=data[uniquifier_index] - ) + user = _security.datastore.find_user(fs_token_uniquifier=tdata["uid"]) else: - user = _security.datastore.find_user(fs_uniquifier=data[uniquifier_index]) - if not user.active: - user = None + user = _security.datastore.find_user(fs_uniquifier=tdata["uid"]) except Exception: - user = None + return None - if user and user.verify_auth_token(data): + if user and user.active and user.verify_auth_token(tdata): set_request_attr("fs_authn_via", "token") return user @@ -838,49 +824,55 @@ def get_auth_token(self) -> str | bytes: Optionally use a separate uniquifier so that changing password doesn't invalidate auth tokens. - This data MUST be securely signed using the ``remember_token_serializer`` + The returned value is securely signed using the ``remember_token_serializer`` .. versionchanged:: 4.0.0 If user model has ``fs_token_uniquifier`` - use that (raise ValueError if not set). Otherwise, fallback to using ``fs_uniquifier``. + .. versionchanged:: 5.4.0 + New format - a dict with a version string. Add a token-based expiry + option as well as a session id. """ + tdata: dict[str, t.Any] = dict(ver=str(5)) if hasattr(self, "fs_token_uniquifier"): if not self.fs_token_uniquifier: raise ValueError() - data = [str(self.fs_token_uniquifier)] + tdata["uid"] = str(self.fs_token_uniquifier) else: - data = [str(self.fs_uniquifier)] - return _security.remember_token_serializer.dumps(data) + tdata["uid"] = str(self.fs_uniquifier) + tdata["sid"] = 0 # session id + tdata["exp"] = int(cv("TOKEN_EXPIRE_TIMESTAMP")(self)) # if >0 then shorter of + # :data:SECURITY_MAX_AGE and this. - def verify_auth_token(self, data: str | bytes) -> bool: - """ - Perform additional verification of contents of auth token. - Prior to this being called the token has been validated (via signing) - and has not expired. + # Let application add things + self.augment_auth_token(tdata) - :param data: the data as formulated by :meth:`get_auth_token` + # Serialize and sign + return _security.remember_token_serializer.dumps(tdata) - .. versionadded:: 3.3.0 + def augment_auth_token(self, tdata: dict[str, t.Any]) -> None: + """Override this to add/modify parts of the auth token. + Additions to the dict can be made and verified in verify_auth_token() - .. versionchanged:: 4.0.0 - If user model has ``fs_token_uniquifier`` - use that otherwise - use ``fs_uniquifier``. + .. versionadded:: 5.4.0 """ + return - # Version 3.x generated tokens that map to data with 3 elements, - # and fs_uniquifier was on last element. - # Version 4.0.0 generates tokens that map to data with only 1 element, - # which maps to fs_uniquifier. - # Here we compute uniquifier_index so that we can pick up correct index for - # matching fs_uniquifier in version 4.0.0 even if token was created with - # version 3.x - uniquifier_index = 0 if len(data) == 1 else 2 + def verify_auth_token(self, tdata: dict[str, t.Any]) -> bool: + """ + Override this to perform additional verification of contents of auth token. + Prior to this being called the token has been validated (via signing) + and has not expired (either with MAX_AGE or specific 'exp' value). - if hasattr(self, "fs_token_uniquifier"): - return data[uniquifier_index] == self.fs_token_uniquifier + :param tdata: a dictionary just as in augment_auth_token() - return data[uniquifier_index] == self.fs_uniquifier + .. versionadded:: 3.3.0 + + .. versionchanged:: 5.4.0 + Now receives a dictionary. + """ + return True def has_role(self, role: str | Role) -> bool: """Returns `True` if the user identifies with the specified role. diff --git a/flask_security/utils.py b/flask_security/utils.py index 3ca6f2c6..9b0f56f0 100644 --- a/flask_security/utils.py +++ b/flask_security/utils.py @@ -138,7 +138,7 @@ def find_csrf_field_name(): return None -def is_user_authenticated(user: User) -> bool: +def is_user_authenticated(user: User | None) -> bool: """ return True is user is authenticated. @@ -467,6 +467,44 @@ def do_flash(message: str, category: str) -> None: flash(message, category) +def parse_auth_token(auth_token: str) -> dict[str, t.Any]: + """Parse an authentication token. + This will raise an exception if not properly signed or expired + """ + tdata = dict() + + # This can raise BadSignature or SignatureExpired exceptions from itsdangerous + raw_data = _security.remember_token_serializer.loads( + auth_token, max_age=config_value("TOKEN_MAX_AGE") + ) + + # Version 3.x generated tokens that map to data with 3 elements, + # and fs_uniquifier was on last element. + # Version 4.0.0 generates tokens that map to data with only 1 element, + # which maps to fs_uniquifier. + # Version 5 and up are already a dict (with a version #) + if isinstance(raw_data, dict): + # new format - starting at ver=5 + if not all(k in raw_data for k in ["ver", "uid", "exp", "sid"]): + raise ValueError("Token missing keys") + tdata = raw_data + if ts := tdata.get("exp"): + if ts < int(time.time()): + raise SignatureExpired("token[exp] value expired") + else: + # old tokens that were lists + if len(raw_data) == 1: + # version 4 + tdata["ver"] = "4" + tdata["uid"] = raw_data[0] + else: + # version 3 + tdata["ver"] = "3" + tdata["uid"] = raw_data[2] + + return tdata + + def get_url(endpoint_or_url: str, qparams: dict[str, str] | None = None) -> str: """Returns a URL if a valid endpoint is found. Otherwise, returns the provided value. diff --git a/pyproject.toml b/pyproject.toml index 0acb329c..87d78166 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ low = [ "babel==2.12.1", "bcrypt==4.0.1", "bleach==6.0.0", + "freezegun", "jinja2==3.1.2", "itsdangerous==2.1.2", "markupsafe==2.1.2", diff --git a/requirements/tests.txt b/requirements/tests.txt index 2e86b129..603e175c 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -13,6 +13,7 @@ check-manifest coverage cryptography djlint +freezegun mongoengine mongomock msgcheck diff --git a/tests/conftest.py b/tests/conftest.py index 3b6e2093..db51f4b7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -472,6 +472,13 @@ def get_security_payload(self): # which handles datetime return {"email": str(self.email), "last_update": self.update_datetime} + def augment_auth_token(self, tdata): + # for testing - if TESTING_AUGMENT_AUTH_TOKEN is set - call that + from flask import current_app + + if cb := current_app.config.get("TESTING_AUGMENT_AUTH_TOKEN"): + cb(tdata) + with app.app_context(): db.create_all() diff --git a/tests/test_basic.py b/tests/test_basic.py index 43f4163d..a6d6a8c0 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -9,6 +9,7 @@ """ import base64 +from datetime import datetime, timedelta, timezone import json import re import pytest @@ -18,6 +19,7 @@ from flask_security import uia_email_mapper from flask_security.decorators import auth_required from flask_principal import identity_loaded +from freezegun import freeze_time from tests.test_utils import ( authenticate, @@ -628,6 +630,46 @@ def test_token_auth_invalid_for_session_auth(client): assert response.status_code == 401 +def test_per_user_expired_token(app, client_nc): + # Test expiry in auth_token using callable + with freeze_time("2024-01-01"): + + def exp(user): + assert user.email == "matt@lp.com" + return int((datetime.now(timezone.utc) + timedelta(days=1)).timestamp()) + + app.config["SECURITY_TOKEN_EXPIRE_TIMESTAMP"] = exp + + response = json_authenticate(client_nc) + token = response.json["response"]["user"]["authentication_token"] + + verify_token(client_nc, token, status=401) + + +def test_per_user_not_expired_token(app, client_nc): + # Test expiry in auth_token using callable + def exp(user): + assert user.email == "matt@lp.com" + return int((datetime.now(timezone.utc) + timedelta(days=1)).timestamp()) + + app.config["SECURITY_TOKEN_EXPIRE_TIMESTAMP"] = exp + + response = json_authenticate(client_nc) + token = response.json["response"]["user"]["authentication_token"] + verify_token(client_nc, token) + + +def test_garbled_auth_token(app, client_nc): + # garble token + def augment_auth_token(tdata): + del tdata["exp"] + + app.config["TESTING_AUGMENT_AUTH_TOKEN"] = augment_auth_token + response = json_authenticate(client_nc) + token = response.json["response"]["user"]["authentication_token"] + verify_token(client_nc, token, status=401) + + @pytest.mark.csrf(ignore_unauth=True, csrfprotect=True) def test_token_auth_csrf(client): response = json_authenticate(client) @@ -998,25 +1040,32 @@ def test_change_uniquifier(app, client_nc): verify_token(client_nc, token) -def test_verifying_token_from_version_3x(in_app_context): +def test_verifying_token_from_version_3x(app, client): """ Check token generated with flask security 3.x, which has different form than token from version 4.0.0, can be verified """ + from .test_utils import get_auth_token_version_3x - myapp = in_app_context - populate_data(myapp) + with app.test_request_context("/"): + user = app.security.datastore.find_user(email="matt@lp.com") + token = get_auth_token_version_3x(app, user) + + headers = {"Authentication-Token": token, "Accept": "application/json"} + response = client.get("/profile", headers=headers) + assert response.status_code == 200 - with myapp.test_request_context("/"): - user = myapp.security.datastore.find_user(email="matt@lp.com") - token = get_auth_token_version_3x(myapp, user) +def test_verifying_token_from_version_4x(app, client): + from .test_utils import get_auth_token_version_4x - data = myapp.security.remember_token_serializer.loads( - token, max_age=myapp.config["SECURITY_TOKEN_MAX_AGE"] - ) + with app.test_request_context("/"): + user = app.security.datastore.find_user(email="matt@lp.com") + token = get_auth_token_version_4x(app, user) - assert user.verify_auth_token(data) is True + headers = {"Authentication-Token": token, "Accept": "application/json"} + response = client.get("/profile", headers=headers) + assert response.status_code == 200 def test_change_token_uniquifier(app): @@ -1181,25 +1230,20 @@ def test_no_get_auth_token(app, client): assert "authentication_token" not in response.json["response"]["user"] -def test_auth_token_decorator(in_app_context): +def test_auth_token_decorator(app, client_nc): """ Test accessing endpoint decorated with auth_token_required when using token generated by flask security 3.x algorithm """ + with app.test_request_context("/"): + user = app.security.datastore.find_user(email="matt@lp.com") + token = get_auth_token_version_3x(app, user) - myapp = in_app_context - populate_data(myapp) - myclient_nc = myapp.test_client(use_cookies=False) - - with myapp.test_request_context("/"): - user = myapp.security.datastore.find_user(email="matt@lp.com") - token = get_auth_token_version_3x(myapp, user) - - response = myclient_nc.get( - "/token", - headers={"Content-Type": "application/json", "Authentication-Token": token}, - ) - assert response.status_code == 200 + response = client_nc.get( + "/token", + headers={"Content-Type": "application/json", "Authentication-Token": token}, + ) + assert response.status_code == 200 @pytest.mark.filterwarnings("ignore:.*BACKWARDS_COMPAT_UNAUTHN:DeprecationWarning") diff --git a/tests/test_utils.py b/tests/test_utils.py index ec2d18e9..2976e1ad 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -419,6 +419,12 @@ def get_auth_token_version_3x(app, user): return app.security.remember_token_serializer.dumps(data) +def get_auth_token_version_4x(app, user): + """Copy of algorithm that generated user token in version 4.x- 5.4""" + data = [str(user.fs_uniquifier)] + return app.security.remember_token_serializer.dumps(data) + + class FakeSerializer: def __init__(self, age=None, invalid=False): self.age = age