From 9ca3f37c23cf2fdbff9f1c1f4bcf9a702728d63a Mon Sep 17 00:00:00 2001 From: Chris Wagner Date: Thu, 11 Jul 2024 15:11:48 -0700 Subject: [PATCH] Try a different approach to typing datastores. Rather than define our own User model (just for typing) - change the annotations to use UserMixin and as part of typing define each possible member and type. Add annotations to view_scaffold.py - as a test for the new datastore annotations. add pyright config to pyproject.toml - it throws tons of errors - but view_scaffold not passes. closes #1001 --- docs/models.rst | 44 ++++++ docs/quickstart.rst | 225 +++++++++++++++-------------- docs/two_factor_configurations.rst | 6 +- flask_security/changeable.py | 8 +- flask_security/core.py | 62 +++++++- flask_security/datastore.py | 186 +++++++++--------------- flask_security/forms.py | 20 +-- flask_security/models/fsqla.py | 3 + flask_security/models/fsqla_v2.py | 3 + flask_security/models/fsqla_v3.py | 3 + flask_security/models/sqla.py | 3 + flask_security/proxies.py | 7 +- flask_security/recovery_codes.py | 12 +- flask_security/tf_plugin.py | 17 +-- flask_security/totp.py | 16 +- flask_security/twofactor.py | 9 +- flask_security/unified_signin.py | 6 +- flask_security/utils.py | 10 +- flask_security/webauthn.py | 25 ++-- flask_security/webauthn_util.py | 10 +- mypy.ini | 9 +- pyproject.toml | 5 + tests/test_changeable.py | 6 +- tests/test_registerable.py | 4 +- tests/view_scaffold.py | 25 ++-- tox.ini | 1 - 26 files changed, 398 insertions(+), 327 deletions(-) diff --git a/docs/models.rst b/docs/models.rst index b4aea404..fd0c8187 100644 --- a/docs/models.rst +++ b/docs/models.rst @@ -12,6 +12,20 @@ and `Role` data model. The fields on your models must follow a particular conven depending on the functionality your app requires. Aside from this, you're free to add any additional fields to your model(s) if you want. +.. note:: + The User, Role, and WebAuthn models MUST subclass their respective mixin (along + with any other mixin the datastore requires). The pre-packaged models described + below do this for you. + + .. code-block:: python + + from flask_security import UserMixin, RoleMixin, WebAuthMixin + + class User(, UserMixin): + ... define columns here + + ... same for Role and WebAuthn + Packaged Models ---------------- As more features are added to Flask-Security, the list of required fields and tables grow. @@ -38,6 +52,18 @@ Flask-SQLAlchemy Your application code should import just the required version e.g.:: from flask_security.models import fsqla_v3 as fsqla + from flask_sqlalchemy import SQLAlchemy + + db = SQLAlchemy(app) + + # Define models + fsqla.FsModels.set_db_info(db) + + class Role(db.Model, fsqla.FsRoleMixin): + pass + + class User(db.Model, fsqla.FsUserMixin): + pass A single method ``fsqla.FsModels.set_db_info`` is provided to glue the supplied models to your @@ -48,6 +74,24 @@ Flask-SQLAlchemy-Lite Your application code should import just the required version e.g.:: from flask_security.models import sqla as sqla + from flask_sqlalchemy_lite import SQLAlchemy + + db = SQLAlchemy(app) + + # Define models + class Model(DeclarativeBase): + pass + + # NOTE: call this PRIOR to declaring models + sqla.FsModels.set_db_info(base_model=Model) + + class Role(Model, sqla.FsRoleMixin): + __tablename__ = "Role" + pass + + class User(Model, sqla.FsUserMixin): + __tablename__ = "User" + pass A single method ``sqla.FsModels.set_db_info`` is provided to glue the supplied mixins to your models. This is only needed if you use the packaged models. diff --git a/docs/quickstart.rst b/docs/quickstart.rst index d09dbd5b..3c902169 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -55,9 +55,9 @@ Flask-SQLAlchemy Application ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The following code sample illustrates how to get started as quickly as -possible using Flask-SQLAlchemy and the built-in model mixins: +possible using Flask-SQLAlchemy and the built-in model mixins. -:: +.. code-block:: python import os @@ -104,7 +104,7 @@ possible using Flask-SQLAlchemy and the built-in model mixins: # Setup Flask-Security user_datastore = SQLAlchemyUserDatastore(db, User, Role) - app.security = Security(app, user_datastore) + security = Security(app, user_datastore) # Views @app.route("/") @@ -116,8 +116,8 @@ possible using Flask-SQLAlchemy and the built-in model mixins: with app.app_context(): # Create User to test with db.create_all() - if not app.security.datastore.find_user(email="test@me.com"): - app.security.datastore.create_user(email="test@me.com", password=hash_password("password")) + if not security.datastore.find_user(email="test@me.com"): + security.datastore.create_user(email="test@me.com", password=hash_password("password")) db.session.commit() if __name__ == '__main__': @@ -154,7 +154,7 @@ Note that Flask-SQLAlchemy-Lite is a very thin wrapper above sqlalchemy.orm and just provides session and engine initialization. Everything else is pure sqlalchemy (unlike Flask-SQLAlchemy). -:: +.. code-block:: python import os @@ -203,7 +203,7 @@ pure sqlalchemy (unlike Flask-SQLAlchemy). # Setup Flask-Security user_datastore = FSQLALiteUserDatastore(db, User, Role) - app.security = Security(app, user_datastore) + security = Security(app, user_datastore) # Views @app.route("/") @@ -215,8 +215,8 @@ pure sqlalchemy (unlike Flask-SQLAlchemy). with app.app_context(): # Create User to test with Model.metadata.create_all(db.engine) - if not app.security.datastore.find_user(email="test@me.com"): - app.security.datastore.create_user(email="test@me.com", password=hash_password("password")) + if not security.datastore.find_user(email="test@me.com"): + security.datastore.create_user(email="test@me.com", password=hash_password("password")) db.session.commit() if __name__ == '__main__': @@ -254,97 +254,100 @@ possible using `SQLAlchemy in a declarative way This example shows how to split your application into 3 files: app.py, database.py and models.py. -- app.py :: - - import os - - from flask import Flask, render_template_string - from flask_security import Security, current_user, auth_required, hash_password, \ - SQLAlchemySessionUserDatastore, permissions_accepted - from database import db_session, init_db - from models import User, Role - - # Create app - app = Flask(__name__) - app.config['DEBUG'] = True - - # Generate a nice key using secrets.token_urlsafe() - app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY", 'pf9Wkove4IKEAXvy-cQkeDPhv9Cb3Ag-wyJILbq_dFw') - # Generate a good salt for password hashing using: secrets.SystemRandom().getrandbits(128) - app.config['SECURITY_PASSWORD_SALT'] = os.environ.get("SECURITY_PASSWORD_SALT", '146585145368132386173505678016728509634') - # Don't worry if email has findable domain - app.config["SECURITY_EMAIL_VALIDATOR_ARGS"] = {"check_deliverability": False} - - # manage sessions per request - make sure connections are closed and returned - app.teardown_appcontext(lambda exc: db_session.close()) - - # Setup Flask-Security - user_datastore = SQLAlchemySessionUserDatastore(db_session, User, Role) - app.security = Security(app, user_datastore) - - # Views - @app.route("/") - @auth_required() - def home(): - return render_template_string('Hello {{current_user.email}}!') - - @app.route("/user") - @auth_required() - @permissions_accepted("user-read") - def user_home(): - return render_template_string("Hello {{ current_user.email }} you are a user!") - - # one time setup - with app.app_context(): - init_db() - # Create a user and role to test with - app.security.datastore.find_or_create_role( - name="user", permissions={"user-read", "user-write"} - ) - db_session.commit() - if not app.security.datastore.find_user(email="test@me.com"): - app.security.datastore.create_user(email="test@me.com", - password=hash_password("password"), roles=["user"]) - db_session.commit() - - if __name__ == '__main__': - # run application (can also use flask run) - app.run() - -- database.py :: - - from sqlalchemy import create_engine - from sqlalchemy.orm import scoped_session, sessionmaker - from sqlalchemy.ext.declarative import declarative_base - from flask_security.models import sqla - - engine = create_engine('sqlite:////tmp/test.db') - db_session = scoped_session(sessionmaker(autocommit=False, - autoflush=False, - bind=engine)) - Base = declarative_base() - # This creates the RolesUser table and is where - # you would pass in non-standard tables names. - sqla.FsModels.set_db_info(base_model=Base) - - - def init_db(): - # import all modules here that might define models so that - # they will be registered properly on the metadata. Otherwise - # you will have to import them first before calling init_db() - import models - Base.metadata.create_all(bind=engine) - -- models.py :: - - from database import Base - from flask_security.models import sqla as sqla - - class Role(Base, sqla.FsRoleMixin): - __tablename__ = 'role' - - class User(Base, sqla.FsUserMixin): - __tablename__ = 'user' +- app.py + .. code-block:: python + + import os + + from flask import Flask, render_template_string + from flask_security import Security, current_user, auth_required, hash_password, \ + SQLAlchemySessionUserDatastore, permissions_accepted + from database import db_session, init_db + from models import User, Role + + # Create app + app = Flask(__name__) + app.config['DEBUG'] = True + + # Generate a nice key using secrets.token_urlsafe() + app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY", 'pf9Wkove4IKEAXvy-cQkeDPhv9Cb3Ag-wyJILbq_dFw') + # Generate a good salt for password hashing using: secrets.SystemRandom().getrandbits(128) + app.config['SECURITY_PASSWORD_SALT'] = os.environ.get("SECURITY_PASSWORD_SALT", '146585145368132386173505678016728509634') + # Don't worry if email has findable domain + app.config["SECURITY_EMAIL_VALIDATOR_ARGS"] = {"check_deliverability": False} + + # manage sessions per request - make sure connections are closed and returned + app.teardown_appcontext(lambda exc: db_session.close()) + + # Setup Flask-Security + user_datastore = SQLAlchemySessionUserDatastore(db_session, User, Role) + security = Security(app, user_datastore) + + # Views + @app.route("/") + @auth_required() + def home(): + return render_template_string('Hello {{current_user.email}}!') + + @app.route("/user") + @auth_required() + @permissions_accepted("user-read") + def user_home(): + return render_template_string("Hello {{ current_user.email }} you are a user!") + + # one time setup + with app.app_context(): + init_db() + # Create a user and role to test with + security.datastore.find_or_create_role( + name="user", permissions={"user-read", "user-write"} + ) + db_session.commit() + if not security.datastore.find_user(email="test@me.com"): + security.datastore.create_user(email="test@me.com", + password=hash_password("password"), roles=["user"]) + db_session.commit() + + if __name__ == '__main__': + # run application (can also use flask run) + app.run() + +- database.py + .. code-block:: python + + from sqlalchemy import create_engine + from sqlalchemy.orm import scoped_session, sessionmaker + from sqlalchemy.ext.declarative import declarative_base + from flask_security.models import sqla + + engine = create_engine('sqlite:////tmp/test.db') + db_session = scoped_session(sessionmaker(autocommit=False, + autoflush=False, + bind=engine)) + Base = declarative_base() + # This creates the RolesUser table and is where + # you would pass in non-standard tables names. + sqla.FsModels.set_db_info(base_model=Base) + + + def init_db(): + # import all modules here that might define models so that + # they will be registered properly on the metadata. Otherwise + # you will have to import them first before calling init_db() + import models + Base.metadata.create_all(bind=engine) + +- models.py + .. code-block:: python + + from database import Base + from flask_security.models import sqla as sqla + + class Role(Base, sqla.FsRoleMixin): + __tablename__ = 'role' + + class User(Base, sqla.FsUserMixin): + __tablename__ = 'user' You can run this either with:: @@ -373,9 +376,9 @@ MongoEngine Application The following code sample illustrates how to get started as quickly as possible using MongoEngine (of course you have to install and start up a -local MongoDB instance): +local MongoDB instance). -:: +.. code-block:: python import os @@ -425,7 +428,7 @@ local MongoDB instance): # Setup Flask-Security user_datastore = MongoEngineUserDatastore(db, User, Role) - app.security = Security(app, user_datastore) + security = Security(app, user_datastore) # Views @app.route("/") @@ -442,11 +445,11 @@ local MongoDB instance): # one time setup with app.app_context(): # Create a user and role to test with - app.security.datastore.find_or_create_role( + security.datastore.find_or_create_role( name="user", permissions={"user-read", "user-write"} ) - if not app.security.datastore.find_user(email="test@me.com"): - app.security.datastore.create_user(email="test@me.com", + if not security.datastore.find_user(email="test@me.com"): + security.datastore.create_user(email="test@me.com", password=hash_password("password"), roles=["user"]) if __name__ == '__main__': @@ -472,9 +475,9 @@ Peewee Application ~~~~~~~~~~~~~~~~~~ The following code sample illustrates how to get started as quickly as -possible using Peewee: +possible using Peewee. -:: +.. code-block:: python import os @@ -529,7 +532,7 @@ possible using Peewee: # Setup Flask-Security user_datastore = PeeweeUserDatastore(db, User, Role, UserRoles) - app.security = Security(app, user_datastore) + security = Security(app, user_datastore) # Views @app.route('/') @@ -543,8 +546,8 @@ possible using Peewee: for Model in (Role, User, UserRoles): Model.drop_table(fail_silently=True) Model.create_table(fail_silently=True) - if not app.security.datastore.find_user(email="test@me.com"): - app.security.datastore.create_user(email="test@me.com", password=hash_password("password")) + if not security.datastore.find_user(email="test@me.com"): + security.datastore.create_user(email="test@me.com", password=hash_password("password")) if __name__ == '__main__': app.run() diff --git a/docs/two_factor_configurations.rst b/docs/two_factor_configurations.rst index f42f9de9..4e08782c 100644 --- a/docs/two_factor_configurations.rst +++ b/docs/two_factor_configurations.rst @@ -97,7 +97,7 @@ possible using SQLAlchemy: # Setup Flask-Security user_datastore = SQLAlchemyUserDatastore(db, User, Role) - app.security = Security(app, user_datastore) + security = Security(app, user_datastore) mail = Mail(app) @@ -111,8 +111,8 @@ possible using SQLAlchemy: with app.app_context(): # Create a user to test with db.create_all() - if not app.security.datastore.find_user(email='test@me.com'): - app.security.datastore.create_user(email='test@me.com', password='password') + if not security.datastore.find_user(email='test@me.com'): + security.datastore.create_user(email='test@me.com', password='password') db.session.commit() if __name__ == '__main__': diff --git a/flask_security/changeable.py b/flask_security/changeable.py index dfd05544..145254f7 100644 --- a/flask_security/changeable.py +++ b/flask_security/changeable.py @@ -22,7 +22,7 @@ from .utils import config_value as cv, hash_password, login_user, send_mail if t.TYPE_CHECKING: # pragma: no cover - from .datastore import User + from flask_security import UserMixin def send_password_changed_notice(user): @@ -36,7 +36,7 @@ def send_password_changed_notice(user): def change_user_password( - user: User, password: str | None, notify: bool = True, autologin: bool = True + user: UserMixin, password: str | None, notify: bool = True, autologin: bool = True ) -> None: """Change the specified user's password @@ -73,7 +73,9 @@ def change_user_password( ) -def admin_change_password(user: User, new_passwd: str, notify: bool = True) -> None: +def admin_change_password( + user: UserMixin, new_passwd: str, notify: bool = True +) -> None: """ Administratively change a user's password. Note that this will immediately render the user's existing sessions (and possibly diff --git a/flask_security/core.py b/flask_security/core.py index b0481907..ba11c10e 100644 --- a/flask_security/core.py +++ b/flask_security/core.py @@ -114,7 +114,7 @@ from flask.typing import ResponseValue import flask_login.mixins from authlib.integrations.flask_client import OAuth - from .datastore import Role, User, UserDatastore + from .datastore import UserDatastore # List of authentication mechanisms supported. @@ -815,9 +815,13 @@ class RoleMixin: """Mixin for `Role` model definitions""" if t.TYPE_CHECKING: # pragma: no cover + id: int + name: str + description: str | None + permissions: list[str] | None + update_datetime: datetime - def __init__(self) -> None: - self.permissions: list[str] | None + def __init__(self, **kwargs): ... def __eq__(self, other): return self.name == other or self.name == getattr(other, "name", None) @@ -842,6 +846,35 @@ def get_permissions(self) -> set: class UserMixin(BaseUserMixin): """Mixin for `User` model definitions""" + if t.TYPE_CHECKING: # pragma: no cover + # These are defined in the applications Model files. + id: int + email: str + username: str | None + password: str | None + active: bool + fs_uniquifier: str + fs_token_uniquifier: str + fs_webauthn_user_handle: str + confirmed_at: datetime | None + last_login_at: datetime + current_login_at: datetime + last_login_ip: str | None + current_login_ip: str | None + login_count: int + tf_primary_method: str | None + tf_totp_secret: str | None + tf_phone_number: str | None + mf_recovery_codes: list[str] | None + us_phone_number: str | None + us_totp_secrets: str | bytes | None + create_datetime: datetime + update_datetime: datetime + roles: list[RoleMixin] + webauthn: list[WebAuthnMixin] + + def __init__(self, **kwargs): ... + def get_id(self) -> str: """Returns the user identification attribute. 'Alternative-token' for Flask-Login. This is always ``fs_uniquifier``. @@ -924,7 +957,7 @@ def verify_auth_token(self, tdata: dict[str, t.Any]) -> bool: """ return True - def has_role(self, role: str | Role) -> bool: + def has_role(self, role: str | RoleMixin) -> bool: """Returns `True` if the user identifies with the specified role. :param role: A role name or `Role` instance""" @@ -1049,6 +1082,22 @@ def tf_send_security_token(self, method: str, **kwargs: t.Any) -> str | None: class WebAuthnMixin: + if t.TYPE_CHECKING: # pragma: no cover + # These are defined in the applications Model files. + id: int + name: str + credential_id: bytes + public_key: bytes + sign_count: int + transports: list[str] | None + backup_state: bool + device_type: str + extensions: str | None + lastuse_datetime: datetime + usage: str + + def __init__(self, **kwargs): ... + def get_user_mapping(self) -> dict[str, t.Any]: """ Return the filter needed by find_user() to get the user @@ -1290,7 +1339,7 @@ def __init__( default_unauthz_handler ) self._render_json: t.Callable[ - [dict[str, t.Any], int, dict[str, str] | None, User | None], + [dict[str, t.Any], int, dict[str, str] | None, UserMixin | None], ResponseValue, ] = default_render_json self._want_json: t.Callable[[Request], bool] = default_want_json @@ -1324,6 +1373,7 @@ def __init__( self._password_util: PasswordUtil self._totp_factory: Totp self._username_util: UsernameUtil + self._webauthn_util: WebauthnUtil self._mf_recovery_codes_util: MfRecoveryCodesUtil # Add necessary attributes here to keep mypy happy @@ -1788,7 +1838,7 @@ def set_form_info(self, name: str, form_info: FormInfo) -> None: def render_json( self, cb: t.Callable[ - [dict[str, t.Any], int, dict[str, str] | None, User | None], + [dict[str, t.Any], int, dict[str, str] | None, UserMixin | None], ResponseValue, ], ) -> None: diff --git a/flask_security/datastore.py b/flask_security/datastore.py index da1039f7..225f5138 100644 --- a/flask_security/datastore.py +++ b/flask_security/datastore.py @@ -11,12 +11,12 @@ from __future__ import annotations -from datetime import datetime import json import typing as t import uuid from copy import copy +from .core import UserMixin, RoleMixin, WebAuthnMixin from .utils import config_value as cv if t.TYPE_CHECKING: # pragma: no cover @@ -187,9 +187,9 @@ class UserDatastore: def __init__( self, - user_model: t.Type[User], - role_model: t.Type[Role], - webauthn_model: t.Type[WebAuthn] | None = None, + user_model: t.Type[UserMixin], + role_model: t.Type[RoleMixin], + webauthn_model: t.Type[WebAuthnMixin] | None = None, ): self.user_model = user_model self.role_model = role_model @@ -203,7 +203,10 @@ def delete(self, model): def put(self, model): pass - def _prepare_role_modify_args(self, role: str | Role) -> Role | None: + def commit(self): + pass + + def _prepare_role_modify_args(self, role: str | RoleMixin) -> RoleMixin | None: if isinstance(role, str): return self.find_role(role) return role @@ -224,7 +227,9 @@ def _prepare_create_user_args(self, **kwargs): return kwargs - def find_user(self, case_insensitive: bool = False, **kwargs: t.Any) -> User | None: + def find_user( + self, case_insensitive: bool = False, **kwargs: t.Any + ) -> UserMixin | None: """Returns a user matching the provided parameters. A single kwarg will be poped and used to filter results. This should be a unique/primary key in the User model since only a single result is @@ -232,11 +237,11 @@ def find_user(self, case_insensitive: bool = False, **kwargs: t.Any) -> User | N """ raise NotImplementedError - def find_role(self, role: str) -> Role | None: + def find_role(self, role: str) -> RoleMixin | None: """Returns a role matching the provided name.""" raise NotImplementedError - def add_role_to_user(self, user: User, role: Role | str) -> bool: + def add_role_to_user(self, user: UserMixin, role: RoleMixin | str) -> bool: """Adds a role to a user. :param user: The user to manipulate. @@ -252,7 +257,7 @@ def add_role_to_user(self, user: User, role: Role | str) -> bool: return True return False - def remove_role_from_user(self, user: User, role: Role | str) -> bool: + def remove_role_from_user(self, user: UserMixin, role: RoleMixin | str) -> bool: """Removes a role from a user. :param user: The user to manipulate. Can be an User object or email @@ -270,7 +275,7 @@ def remove_role_from_user(self, user: User, role: Role | str) -> bool: return rv def add_permissions_to_role( - self, role: Role | str, permissions: set | list | tuple | str + self, role: RoleMixin | str, permissions: set | list | tuple | str ) -> bool: """Add one or more permissions to role. @@ -298,7 +303,7 @@ def add_permissions_to_role( return rv def remove_permissions_from_role( - self, role: Role | str, permissions: set | list | tuple | str + self, role: RoleMixin | str, permissions: set | list | tuple | str ) -> bool: """Remove one or more permissions from a role. @@ -324,13 +329,13 @@ def remove_permissions_from_role( self.put(role_obj) return rv - def toggle_active(self, user: User) -> bool: + def toggle_active(self, user: UserMixin) -> bool: """Toggles a user's active status. Always returns True.""" user.active = not user.active self.put(user) return True - def deactivate_user(self, user: User) -> bool: + def deactivate_user(self, user: UserMixin) -> bool: """Deactivates a specified user. Returns `True` if a change was made. This will immediately disallow access to all endpoints that require @@ -345,7 +350,7 @@ def deactivate_user(self, user: User) -> bool: return True return False - def activate_user(self, user: User) -> bool: + def activate_user(self, user: UserMixin) -> bool: """Activates a specified user. Returns `True` if a change was made. :param user: The user to activate @@ -356,7 +361,7 @@ def activate_user(self, user: User) -> bool: return True return False - def set_uniquifier(self, user: User, uniquifier: str | None = None) -> None: + def set_uniquifier(self, user: UserMixin, uniquifier: str | None = None) -> None: """Set user's Flask-Security identity key. This will immediately render outstanding auth tokens, session cookies and remember cookies invalid. @@ -371,7 +376,9 @@ def set_uniquifier(self, user: User, uniquifier: str | None = None) -> None: user.fs_uniquifier = uniquifier self.put(user) - def set_token_uniquifier(self, user: User, uniquifier: str | None = None) -> None: + def set_token_uniquifier( + self, user: UserMixin, uniquifier: str | None = None + ) -> None: """Set user's auth token identity key. This will immediately render outstanding auth tokens invalid. @@ -389,7 +396,7 @@ def set_token_uniquifier(self, user: User, uniquifier: str | None = None) -> Non user.fs_token_uniquifier = uniquifier self.put(user) - def create_role(self, **kwargs: t.Any) -> Role: + def create_role(self, **kwargs: t.Any) -> RoleMixin: """ Creates and returns a new role from the given parameters. Supported params (depending on RoleModel): @@ -416,13 +423,13 @@ def create_role(self, **kwargs: t.Any) -> Role: role = self.role_model(**kwargs) return self.put(role) - def find_or_create_role(self, name: str, **kwargs: t.Any) -> Role: + def find_or_create_role(self, name: str, **kwargs: t.Any) -> RoleMixin: """Returns a role matching the given name or creates it with any additionally provided parameters. """ return self.find_role(name) or self.create_role(name=name, **kwargs) - def create_user(self, **kwargs: t.Any) -> User: + def create_user(self, **kwargs: t.Any) -> UserMixin: """Creates and returns a new user from the given parameters. :kwparam email: required. @@ -464,14 +471,14 @@ def create_user(self, **kwargs: t.Any) -> User: user = self.user_model(**kwargs) return self.put(user) - def delete_user(self, user: User) -> None: + def delete_user(self, user: UserMixin) -> None: """Deletes the specified user. :param user: The user to delete """ self.delete(user) # type: ignore - def reset_user_access(self, user: User) -> None: + def reset_user_access(self, user: UserMixin) -> None: """ Use this method to reset user authentication methods in the case of compromise. This will: @@ -512,7 +519,7 @@ def reset_user_access(self, user: User) -> None: def tf_set( self, - user: User, + user: UserMixin, primary_method: str, totp_secret: str | None = None, phone: str | None = None, @@ -544,7 +551,7 @@ def tf_set( if changed: self.put(user) - def tf_reset(self, user: User) -> None: + def tf_reset(self, user: UserMixin) -> None: """Disable two-factor auth for user. .. versionadded: 3.4.1 @@ -554,7 +561,7 @@ def tf_reset(self, user: User) -> None: user.tf_phone_number = None self.put(user) - def mf_set_recovery_codes(self, user: User, rcs: list[str] | None) -> None: + def mf_set_recovery_codes(self, user: UserMixin, rcs: list[str] | None) -> None: """Set MF recovery codes into user record. Any existing codes will be erased. @@ -563,11 +570,11 @@ def mf_set_recovery_codes(self, user: User, rcs: list[str] | None) -> None: user.mf_recovery_codes = rcs self.put(user) - def mf_get_recovery_codes(self, user: User) -> list[str]: + def mf_get_recovery_codes(self, user: UserMixin) -> list[str]: codes = getattr(user, "mf_recovery_codes", []) return codes if codes else [] - def mf_delete_recovery_code(self, user: User, idx: int) -> bool: + def mf_delete_recovery_code(self, user: UserMixin, idx: int) -> bool: """Delete a single recovery code. Recovery codes are single-use - so delete after using! @@ -584,7 +591,7 @@ def mf_delete_recovery_code(self, user: User, idx: int) -> bool: except IndexError: return False - def us_get_totp_secrets(self, user: User) -> dict[str, str]: + def us_get_totp_secrets(self, user: UserMixin) -> dict[str, str]: """Return totp secrets. These are json encoded in the DB. @@ -596,7 +603,9 @@ def us_get_totp_secrets(self, user: User) -> dict[str, str]: return {} return json.loads(user.us_totp_secrets) - def us_put_totp_secrets(self, user: User, secrets: dict[str, str] | None) -> None: + def us_put_totp_secrets( + self, user: UserMixin, secrets: dict[str, str] | None + ) -> None: """Save secrets. Assume to be a dict (or None) with keys as methods, and values as (encrypted) secrets. @@ -607,7 +616,7 @@ def us_put_totp_secrets(self, user: User, secrets: dict[str, str] | None) -> Non def us_set( self, - user: User, + user: UserMixin, method: str, totp_secret: str | None = None, phone: str | None = None, @@ -633,7 +642,7 @@ def us_set( user.us_phone_number = phone self.put(user) - def us_reset(self, user: User, method: str | None = None) -> None: + def us_reset(self, user: UserMixin, method: str | None = None) -> None: """Disable unified sign in for user. This will disable authenticator app and SMS, and email. N.B. if user has no password they may not be able to authenticate at all. @@ -657,7 +666,7 @@ def us_reset(self, user: User, method: str | None = None) -> None: user.us_phone_number = None self.put(user) - def us_setup_email(self, user: User) -> bool: + def us_setup_email(self, user: UserMixin) -> bool: # setup email (if allowed) for user for unified sign in. from .proxies import _security @@ -669,7 +678,7 @@ def us_setup_email(self, user: User) -> bool: return True def set_webauthn_user_handle( - self, user: User, user_handle: str | None = None + self, user: UserMixin, user_handle: str | None = None ) -> None: """Set the value for the Relaying Party's (that's us) UserHandle (user.id) @@ -682,7 +691,7 @@ def set_webauthn_user_handle( def create_webauthn( self, - user: User, + user: UserMixin, credential_id: bytes, public_key: bytes, name: str, @@ -703,20 +712,20 @@ def create_webauthn( """ raise NotImplementedError - def delete_webauthn(self, webauthn: WebAuthn) -> None: + def delete_webauthn(self, webauthn: WebAuthnMixin) -> None: """ .. versionadded: 5.0.0 """ self.delete(webauthn) - def find_webauthn(self, credential_id: bytes) -> WebAuthn | None: + def find_webauthn(self, credential_id: bytes) -> WebAuthnMixin | None: """Returns a credential matching the id. .. versionadded: 5.0.0 """ raise NotImplementedError - def find_user_from_webauthn(self, webauthn: WebAuthn) -> User | None: + def find_user_from_webauthn(self, webauthn: WebAuthnMixin) -> UserMixin | None: """Returns user associated with this webauthn credential .. versionadded: 5.0.0 @@ -726,7 +735,7 @@ def find_user_from_webauthn(self, webauthn: WebAuthn) -> User | None: user_filter = webauthn.get_user_mapping() return self.find_user(**user_filter) - def webauthn_reset(self, user: User) -> None: + def webauthn_reset(self, user: UserMixin) -> None: """Reset access via webauthn credentials. This will DELETE all registered credentials. There doesn't appear to be any reason to change the user's @@ -754,14 +763,16 @@ class SQLAlchemyUserDatastore(SQLAlchemyDatastore, UserDatastore): def __init__( self, db: flask_sqlalchemy.SQLAlchemy, - user_model: t.Type[User], - role_model: t.Type[Role], - webauthn_model: t.Type[WebAuthn] | None = None, + user_model: t.Type[UserMixin], + role_model: t.Type[RoleMixin], + webauthn_model: t.Type[WebAuthnMixin] | None = None, ): SQLAlchemyDatastore.__init__(self, db) UserDatastore.__init__(self, user_model, role_model, webauthn_model) - def find_user(self, case_insensitive: bool = False, **kwargs: t.Any) -> User | None: + def find_user( + self, case_insensitive: bool = False, **kwargs: t.Any + ) -> UserMixin | None: from sqlalchemy import func, select from sqlalchemy.orm import joinedload @@ -779,14 +790,14 @@ def find_user(self, case_insensitive: bool = False, **kwargs: t.Any) -> User | N stmt = stmt.where(val == value) # type: ignore[arg-type] return self.db.session.scalar(stmt) - def find_role(self, role: str) -> Role | None: + def find_role(self, role: str) -> RoleMixin | None: from sqlalchemy import select return self.db.session.scalar( select(self.role_model).where(self.role_model.name == role) # type: ignore ) - def find_webauthn(self, credential_id: bytes) -> WebAuthn | None: + def find_webauthn(self, credential_id: bytes) -> WebAuthnMixin | None: from sqlalchemy import select if not self.webauthn_model: # pragma: no cover @@ -800,7 +811,7 @@ def find_webauthn(self, credential_id: bytes) -> WebAuthn | None: def create_webauthn( self, - user: User, + user: UserMixin, credential_id: bytes, public_key: bytes, name: str, @@ -849,9 +860,9 @@ class SQLAlchemySessionUserDatastore(SQLAlchemyUserDatastore, SQLAlchemyDatastor def __init__( self, session: sqlalchemy.orm.scoping.scoped_session, - user_model: t.Type[User], - role_model: t.Type[Role], - webauthn_model: t.Type[WebAuthn] | None = None, + user_model: t.Type[UserMixin], + role_model: t.Type[RoleMixin], + webauthn_model: t.Type[WebAuthnMixin] | None = None, ): class PretendFlaskSQLAlchemyDb: """This is a pretend db object, so we can just pass in a session.""" @@ -885,9 +896,9 @@ class FSQLALiteUserDatastore(SQLAlchemyUserDatastore, UserDatastore): def __init__( self, db: flask_sqlalchemy_lite.SQLAlchemy, - user_model: t.Type[User], - role_model: t.Type[Role], - webauthn_model: t.Type[WebAuthn] | None = None, + user_model: t.Type[UserMixin], + role_model: t.Type[RoleMixin], + webauthn_model: t.Type[WebAuthnMixin] | None = None, ): SQLAlchemyUserDatastore.__init__( self, @@ -913,9 +924,9 @@ class MongoEngineUserDatastore(MongoEngineDatastore, UserDatastore): def __init__( self, db: mongoengine.connection, - user_model: t.Type[User], - role_model: t.Type[Role], - webauthn_model: t.Type[WebAuthn] | None = None, + user_model: t.Type[UserMixin], + role_model: t.Type[RoleMixin], + webauthn_model: t.Type[WebAuthnMixin] | None = None, ): MongoEngineDatastore.__init__(self, db) UserDatastore.__init__(self, user_model, role_model, webauthn_model) @@ -945,7 +956,7 @@ def find_user(self, case_insensitive=False, **kwargs): def find_role(self, role): return self.role_model.objects(name=role).first() - def find_webauthn(self, credential_id: bytes) -> WebAuthn | None: + def find_webauthn(self, credential_id: bytes) -> WebAuthnMixin | None: if not self.webauthn_model: raise NotImplementedError @@ -956,7 +967,7 @@ def find_webauthn(self, credential_id: bytes) -> WebAuthn | None: def create_webauthn( self, - user: User, + user: UserMixin, credential_id: bytes, public_key: bytes, name: str, @@ -1093,7 +1104,7 @@ def find_webauthn(self, credential_id): def create_webauthn( self, - user: User, + user: UserMixin, credential_id: bytes, public_key: bytes, name: str, @@ -1172,66 +1183,3 @@ def create_user(self, **kwargs): @with_pony_session def create_role(self, **kwargs): return super().create_role(**kwargs) - - -if t.TYPE_CHECKING: # pragma: no cover - # Normally - the application creates the Models and glues them together - # For typing we do that here since we don't know which DB interface they - # will pick. - from .core import UserMixin, RoleMixin, WebAuthnMixin - - class CanonicalUserDatastore(Datastore, UserDatastore): - pass - - class User(UserMixin): - id: int - email: str - username: str | None - password: str | None - active: bool - fs_uniquifier: str - fs_token_uniquifier: str - fs_webauthn_user_handle: str - confirmed_at: datetime | None - last_login_at: datetime - current_login_at: datetime - last_login_ip: str | None - current_login_ip: str | None - login_count: int - tf_primary_method: str | None - tf_totp_secret: str | None - tf_phone_number: str | None - mf_recovery_codes: list[str] | None - us_phone_number: str | None - us_totp_secrets: str | bytes | None - create_datetime: datetime - update_datetime: datetime - roles: list[Role] - webauthn: list[WebAuthn] - - def __init__(self, **kwargs): ... - - class Role(RoleMixin): - id: int - name: str - description: str | None - permissions: list[str] | None - update_datetime: datetime - - def __init__(self, **kwargs): ... - - class WebAuthn(WebAuthnMixin): - id: int - name: str - credential_id: bytes - public_key: bytes - sign_count: int - transports: list[str] | None - backup_state: bool - device_type: str - extensions: str | None - lastuse_datetime: datetime - user_id: int - usage: str - - def __init__(self, **kwargs): ... diff --git a/flask_security/forms.py b/flask_security/forms.py index d837dfc2..5d4897d8 100644 --- a/flask_security/forms.py +++ b/flask_security/forms.py @@ -56,7 +56,7 @@ ) if t.TYPE_CHECKING: # pragma: no cover - from .datastore import User + from flask_security import UserMixin _default_field_labels = { "email": _("Email Address"), @@ -436,7 +436,7 @@ class SendConfirmationForm(Form, UserEmailFormMixin): def __init__(self, *args: t.Any, **kwargs: t.Any): super().__init__(*args, **kwargs) - self.user: User | None = None # set by valid_user_email + self.user: UserMixin | None = None # set by valid_user_email if request and request.method == "GET": self.email.data = request.args.get("email", None) @@ -458,7 +458,7 @@ class ForgotPasswordForm(Form, UserEmailFormMixin): def __init__(self, *args: t.Any, **kwargs: t.Any): super().__init__(*args, **kwargs) self.requires_confirmation: bool = False - self.user: User | None = None # set by valid_user_email + self.user: UserMixin | None = None # set by valid_user_email def validate(self, **kwargs: t.Any) -> bool: if not super().validate(**kwargs): @@ -487,7 +487,7 @@ class PasswordlessLoginForm(Form): def __init__(self, *args: t.Any, **kwargs: t.Any): super().__init__(*args, **kwargs) - self.user: User | None = None # set by valid_user_email + self.user: UserMixin | None = None # set by valid_user_email def validate(self, **kwargs: t.Any) -> bool: if not super().validate(**kwargs): @@ -527,7 +527,7 @@ def __init__(self, *args: t.Any, **kwargs: t.Any): ) self.password.description = html self.requires_confirmation: bool = False - self.user: User | None = None + self.user: UserMixin | None = None # ifield can be set by subclasses to skip identity checks. self.ifield: Field | None = None # If True then user has authenticated so we can show detailed errors @@ -602,9 +602,9 @@ class VerifyForm(Form, PasswordFormMixin): submit = SubmitField(get_form_field_label("verify_password")) - def __init__(self, *args: t.Any, user: User, **kwargs: t.Any): + def __init__(self, *args: t.Any, user: UserMixin, **kwargs: t.Any): super().__init__(*args, **kwargs) - self.user: User = user + self.user: UserMixin = user def validate(self, **kwargs: t.Any) -> bool: if not super().validate(**kwargs): # pragma: no cover @@ -705,7 +705,7 @@ class ResetPasswordForm(Form, NewPasswordFormMixin, PasswordConfirmFormMixin): """The default reset password form""" # filled in by caller - user: User + user: UserMixin submit = SubmitField(get_form_field_label("reset_password")) @@ -827,7 +827,7 @@ def __init__(self, *args: t.Any, **kwargs: t.Any): self.window: int = 0 self.primary_method: str = "" self.tf_totp_secret: str = "" - self.user: User | None = None # set by view + self.user: UserMixin | None = None # set by view def validate(self, **kwargs: t.Any) -> bool: if not super().validate(**kwargs): # pragma: no cover @@ -876,7 +876,7 @@ class DummyForm(Form): def __init__(self, *args: t.Any, **kwargs: t.Any): super().__init__(*args, **kwargs) - self.user: User | None = kwargs.get("user", None) + self.user: UserMixin | None = kwargs.get("user", None) def build_form_from_request(form_name: str, **kwargs: dict[str, t.Any]) -> Form: diff --git a/flask_security/models/fsqla.py b/flask_security/models/fsqla.py index 5a6e4839..8b445f77 100644 --- a/flask_security/models/fsqla.py +++ b/flask_security/models/fsqla.py @@ -11,6 +11,9 @@ a new version needs to be created. """ +# mypy: disable-error-code="assignment" +# pyright: reportAssignmentType = false, reportIncompatibleVariableOverride=false + from typing import cast from sqlalchemy import ( Boolean, diff --git a/flask_security/models/fsqla_v2.py b/flask_security/models/fsqla_v2.py index 91814aa4..18f1eebb 100644 --- a/flask_security/models/fsqla_v2.py +++ b/flask_security/models/fsqla_v2.py @@ -13,6 +13,9 @@ - Make username unique (but not required). """ +# mypy: disable-error-code="assignment" +# pyright: reportAssignmentType = false, reportIncompatibleVariableOverride=false + from sqlalchemy import Column, String, Text from sqlalchemy.ext.declarative import declared_attr diff --git a/flask_security/models/fsqla_v3.py b/flask_security/models/fsqla_v3.py index 8b3a938b..d2834b6f 100644 --- a/flask_security/models/fsqla_v3.py +++ b/flask_security/models/fsqla_v3.py @@ -16,6 +16,9 @@ - Add support for list types. """ +# mypy: disable-error-code="assignment" +# pyright: reportAssignmentType = false, reportIncompatibleVariableOverride=false + from sqlalchemy import ( Boolean, Column, diff --git a/flask_security/models/sqla.py b/flask_security/models/sqla.py index 250f349d..3b8163ff 100644 --- a/flask_security/models/sqla.py +++ b/flask_security/models/sqla.py @@ -13,6 +13,9 @@ a new version needs to be created. """ +# mypy: disable-error-code="assignment" +# pyright: reportAssignmentType = false, reportIncompatibleVariableOverride=false + from __future__ import annotations import datetime diff --git a/flask_security/proxies.py b/flask_security/proxies.py index c1a761af..d1ec3c4b 100644 --- a/flask_security/proxies.py +++ b/flask_security/proxies.py @@ -1,4 +1,4 @@ -# Copyright 2021 by J. Christopher Wagner (jwag). All rights reserved. +# Copyright 2021-2024 by J. Christopher Wagner (jwag). All rights reserved. import typing as t @@ -7,15 +7,14 @@ if t.TYPE_CHECKING: # pragma: no cover from passlib.context import CryptContext - from .core import Security - from .datastore import CanonicalUserDatastore + from .core import Security, UserDatastore # Convenient references _security: "Security" = LocalProxy( # type: ignore lambda: current_app.extensions["security"] ) -_datastore: "CanonicalUserDatastore" = LocalProxy( # type:ignore +_datastore: "UserDatastore" = LocalProxy( # type:ignore lambda: _security.datastore ) diff --git a/flask_security/recovery_codes.py b/flask_security/recovery_codes.py index 4193b529..7f4d4693 100644 --- a/flask_security/recovery_codes.py +++ b/flask_security/recovery_codes.py @@ -41,7 +41,7 @@ from cryptography.fernet import MultiFernet import flask from flask.typing import ResponseValue - from .datastore import User + from flask_security import UserMixin class MfRecoveryCodesUtil: @@ -71,7 +71,7 @@ def setup_cryptor(self, keys: list[bytes]) -> None: cryptors.append(Fernet(key)) self.cryptor = MultiFernet(cryptors) - def create_recovery_codes(self, user: User) -> list[str]: + def create_recovery_codes(self, user: UserMixin) -> list[str]: """Create :data:`SECURITY_MULTI_FACTOR_RECOVERY_CODES_N` new recovery codes and store in user record. If configured (:data:`SECURITY_MULTI_FACTOR_RECOVERY_CODES_KEYS`), @@ -83,18 +83,18 @@ def create_recovery_codes(self, user: User) -> list[str]: _datastore.mf_set_recovery_codes(user, self._encrypt_codes(new_codes)) return new_codes - def get_recovery_codes(self, user: User) -> list[str]: + def get_recovery_codes(self, user: UserMixin) -> list[str]: """Return list of (unencrypted) recovery codes""" ecodes = _datastore.mf_get_recovery_codes(user) return self._decrypt_codes(ecodes) - def check_recovery_code(self, user: User, code: str) -> bool: + def check_recovery_code(self, user: UserMixin, code: str) -> bool: """Verify code is valid""" codes = _datastore.mf_get_recovery_codes(user) dcodes = self._decrypt_codes(codes) return code in dcodes - def delete_recovery_code(self, user: User, code: str) -> bool: + def delete_recovery_code(self, user: UserMixin, code: str) -> bool: """codes are single use - so delete after use. encrypting code gives different answer due to time stamp. we don't want to re-encrypt other codes. @@ -165,7 +165,7 @@ class MfRecoveryForm(Form): def __init__(self, *args: t.Any, **kwargs: t.Any): super().__init__(*args, **kwargs) # filled by view - self.user: User | None = None + self.user: UserMixin | None = None def validate(self, **kwargs: t.Any) -> bool: if not super().validate(**kwargs): # pragma: no cover diff --git a/flask_security/tf_plugin.py b/flask_security/tf_plugin.py index ed500f1b..793f2c4e 100644 --- a/flask_security/tf_plugin.py +++ b/flask_security/tf_plugin.py @@ -45,8 +45,7 @@ import flask from flask.typing import ResponseValue from flask import Response - from .core import Security - from .datastore import User + from flask_security import Security, UserMixin class TwoFactorSelectForm(Form): @@ -116,14 +115,14 @@ def create_blueprint( ) -> None: raise NotImplementedError - def get_setup_methods(self, user: User) -> list[str]: + def get_setup_methods(self, user: UserMixin) -> list[str]: """ Return a list of methods that ``user`` has setup for this second factor """ raise NotImplementedError def tf_login( - self, user: User, json_payload: dict[str, t.Any], next_loc: str | None + self, user: UserMixin, json_payload: dict[str, t.Any], next_loc: str | None ) -> ResponseValue: """ Called from first/primary authenticated views if the user successfully @@ -169,7 +168,7 @@ def create_blueprint( endpoint="tf_select", )(tf_select) - def method_to_impl(self, user: User, method: str) -> TfPluginBase | None: + def method_to_impl(self, user: UserMixin, method: str) -> TfPluginBase | None: # reverse map a method to the implementation. # N.B. again - requires that methods be unique across all implementations. # There is a small window that a previously setup method was removed. @@ -179,7 +178,7 @@ def method_to_impl(self, user: User, method: str) -> TfPluginBase | None: return impl return None # pragma no cover - def get_setup_tf_methods(self, user: User) -> list[str]: + def get_setup_tf_methods(self, user: UserMixin) -> list[str]: # Return list of methods that user has setup methods = [] for impl in self._tf_impls.values(): @@ -188,7 +187,7 @@ def get_setup_tf_methods(self, user: User) -> list[str]: def tf_enter( self, - user: User, + user: UserMixin, remember_me: bool, primary_authn_via: str, next_loc: str | None, @@ -246,7 +245,7 @@ def tf_enter( return simple_render_json(json_payload) return None - def tf_complete(self, user: User, dologin: bool) -> str | None: + def tf_complete(self, user: UserMixin, dologin: bool) -> str | None: remember = session.pop("tf_remember_login", None) if dologin: @@ -314,7 +313,7 @@ def tf_set_validity_token_cookie(response: Response, token: str) -> Response: return response -def tf_check_state(allowed_states: list[str]) -> User | None: +def tf_check_state(allowed_states: list[str]) -> UserMixin | None: if ( not all(k in session for k in ["tf_user_id", "tf_state"]) or session["tf_state"] not in allowed_states diff --git a/flask_security/totp.py b/flask_security/totp.py index c9af7761..84f4d9db 100644 --- a/flask_security/totp.py +++ b/flask_security/totp.py @@ -18,7 +18,7 @@ from passlib.pwd import genword if t.TYPE_CHECKING: # pragma: no cover - from .datastore import User + from flask_security import UserMixin class Totp: @@ -35,7 +35,7 @@ class Totp: """ - def __init__(self, secrets: dict[str | int, str], issuer: str): + def __init__(self, secrets: dict[str, str] | dict[int, str], issuer: str): """Initialize a totp factory. secrets are used to encrypt the per-user totp_secret on disk. """ @@ -48,7 +48,8 @@ def generate_totp_password(self, totp_secret: str) -> str: """Get time-based one-time password on the basis of given secret and time :param totp_secret: the unique shared secret of the user """ - return self._totp.from_source(totp_secret).generate().token + tt = self._totp.from_source(totp_secret).generate() + return tt.token # type: ignore[return-value] def generate_totp_secret(self) -> str: """Create new user-unique totp_secret. @@ -60,7 +61,7 @@ def generate_totp_secret(self) -> str: return self._totp.new().to_json(encrypt=True) def verify_totp( - self, token: str, totp_secret: str, user: User, window: int = 0 + self, token: str, totp_secret: str, user: UserMixin, window: int = 0 ) -> bool: """Verifies token for specific user. @@ -110,7 +111,7 @@ def get_totp_pretty_key(self, totp_secret: str) -> str: tp = self._totp.from_source(totp_secret) return tp.pretty_key() - def fetch_setup_values(self, totp: str, user: User) -> dict[str, str]: + def fetch_setup_values(self, totp: str, user: UserMixin) -> dict[str, str]: """Generate various values user needs to setup authenticator app. Returns dict with keys: 'key': totp key @@ -126,6 +127,7 @@ def fetch_setup_values(self, totp: str, user: User) -> dict[str, str]: # By convention, the URI should have the username that the user # logs in with. username = user.calc_username() or "Unknown" + assert self._totp.issuer r["username"] = username r["key"] = self.get_totp_pretty_key(totp) r["issuer"] = self._totp.issuer @@ -176,7 +178,7 @@ def generate_recovery_codes(self, number: int) -> list[str]: ) return spwds - def get_last_counter(self, user: User) -> TotpMatch | None: + def get_last_counter(self, user: UserMixin) -> int | None: """Implement this to fetch stored last_counter from cache. :param user: User model @@ -184,7 +186,7 @@ def get_last_counter(self, user: User) -> TotpMatch | None: """ return None - def set_last_counter(self, user: User, tmatch: TotpMatch) -> None: + def set_last_counter(self, user: UserMixin, tmatch: TotpMatch) -> None: """Implement this to cache last_counter. :param user: User model diff --git a/flask_security/twofactor.py b/flask_security/twofactor.py index 8df7df18..416148ba 100644 --- a/flask_security/twofactor.py +++ b/flask_security/twofactor.py @@ -40,8 +40,7 @@ if t.TYPE_CHECKING: # pragma: no cover import flask - from .core import Security - from .datastore import User + from flask_security import Security, UserMixin from flask.typing import ResponseValue @@ -124,7 +123,7 @@ def complete_two_factor_process(user, primary_method, totp_secret, is_changing): return completion_message, token -def set_rescue_options(form: TwoFactorRescueForm, user: User) -> dict[str, str]: +def set_rescue_options(form: TwoFactorRescueForm, user: UserMixin) -> dict[str, str]: # Based on config - set up options for rescue. # Note that this modifies the passed in Form as well as returns # a dict that can be returned as part of a JSON response. @@ -176,14 +175,14 @@ def create_blueprint( ) -> None: pass - def get_setup_methods(self, user: User) -> list[str]: + def get_setup_methods(self, user: UserMixin) -> list[str]: if is_tf_setup(user): assert user.tf_primary_method is not None return [user.tf_primary_method] return [] def tf_login( - self, user: User, json_payload: dict[str, t.Any], next_loc: str | None + self, user: UserMixin, json_payload: dict[str, t.Any], next_loc: str | None ) -> ResponseValue: """Helper for two-factor authentication login diff --git a/flask_security/unified_signin.py b/flask_security/unified_signin.py index afbff1d1..fdd39b32 100644 --- a/flask_security/unified_signin.py +++ b/flask_security/unified_signin.py @@ -94,7 +94,7 @@ if t.TYPE_CHECKING: # pragma: no cover from flask.typing import ResponseValue - from .datastore import User + from flask_security import UserMixin if get_quart_status(): # pragma: no cover from quart import redirect @@ -147,7 +147,7 @@ class _UnifiedPassCodeForm(Form): """Common form for signin and verify/reauthenticate.""" # filled in by caller - user: User + user: UserMixin # Filled in here authn_via: str @@ -364,7 +364,7 @@ class UnifiedSigninSetupValidateForm(Form): """The unified sign in setup validation form""" # These 2 filled in by view - user: User + user: UserMixin totp_secret: str passcode = StringField( diff --git a/flask_security/utils.py b/flask_security/utils.py index e6c7b7db..11554f08 100644 --- a/flask_security/utils.py +++ b/flask_security/utils.py @@ -54,7 +54,7 @@ if t.TYPE_CHECKING: # pragma: no cover from flask import Flask from flask.typing import ResponseValue - from .datastore import User + from flask_security import UserMixin localize_callback = LocalProxy(lambda: _security.i18n_domain.gettext) @@ -140,7 +140,7 @@ def find_csrf_field_name(): return None -def is_user_authenticated(user: User | None) -> bool: +def is_user_authenticated(user: UserMixin | None) -> bool: """ return True if user is authenticated. @@ -160,7 +160,7 @@ def is_user_authenticated(user: User | None) -> bool: def login_user( - user: User, + user: UserMixin, remember: bool | None = None, authn_via: list[str] | None = None, ) -> bool: @@ -362,7 +362,7 @@ def verify_password(password: str | bytes, password_hash: str | bytes) -> bool: return _pwd_context.verify(password, password_hash) -def verify_and_update_password(password: str | bytes, user: User) -> bool: +def verify_and_update_password(password: str | bytes, user: UserMixin) -> bool: """Returns ``True`` if the password is valid for the specified user. Additionally, the hashed password in the database is updated if the @@ -387,7 +387,7 @@ def verify_and_update_password(password: str | bytes, user: User) -> bool: # Try with original password. verified = _pwd_context.verify(password, user.password) - if verified and _pwd_context.needs_update(user.password): + if verified and (user.password is None or _pwd_context.needs_update(user.password)): user.password = hash_password(password) _datastore.put(user) return verified diff --git a/flask_security/webauthn.py b/flask_security/webauthn.py index d68fb9c6..f8baf5dd 100644 --- a/flask_security/webauthn.py +++ b/flask_security/webauthn.py @@ -109,8 +109,7 @@ if t.TYPE_CHECKING: # pragma: no cover import flask from flask.typing import ResponseValue - from .core import Security - from .datastore import User, WebAuthn + from flask_security import Security, UserMixin, WebAuthnMixin if get_quart_status(): # pragma: no cover from quart import redirect @@ -218,7 +217,7 @@ class WebAuthnSigninForm(Form, NextFormMixin): remember = BooleanField(get_form_field_label("remember_me")) submit = SubmitField(label=get_form_field_xlate(_("Start")), id="wan_signin") - user: User | None = None + user: UserMixin | None = None # set by caller - is this a second factor authentication? is_secondary: bool @@ -260,8 +259,8 @@ class WebAuthnSigninResponseForm(Form, NextFormMixin): is_verify: bool # returned to caller authentication_verification: VerifiedAuthentication - user: User | None = None - cred: WebAuthn | None = None + user: UserMixin | None = None + cred: WebAuthnMixin | None = None # Set to True if this authentication qualifies as 'multi-factor' mf_check: bool = False @@ -373,7 +372,7 @@ def validate(self, **kwargs: t.Any) -> bool: class WebAuthnVerifyForm(Form): submit = SubmitField(label=get_form_field_label("submit"), id="wan_verify") - user: User + user: UserMixin def validate(self, **kwargs: t.Any) -> bool: if not super().validate(**kwargs): @@ -464,7 +463,7 @@ def webauthn_register() -> ResponseValue: ) current_creds = [] - cred: WebAuthn + cred: WebAuthnMixin for cred in current_user.webauthn: cl = { "name": cred.name, @@ -568,7 +567,7 @@ def webauthn_register_response(token: str) -> ResponseValue: return redirect(url_for_security("wan_register")) -def _signin_common(user: User | None, usage: list[str]) -> tuple[t.Any, str]: +def _signin_common(user: UserMixin | None, usage: list[str]) -> tuple[t.Any, str]: """ Common code between signin and verify - once form has been verified. """ @@ -875,7 +874,7 @@ def webauthn_verify_response(token: str) -> ResponseValue: return redirect(url_for_security("wan_verify")) -def is_cred_usable(cred: WebAuthn, usage: str | list[str]) -> bool: +def is_cred_usable(cred: WebAuthnMixin, usage: str | list[str]) -> bool: # Return True is cred can be used for the requested usage/verify if not isinstance(usage, list): usage = [usage] @@ -883,7 +882,7 @@ def is_cred_usable(cred: WebAuthn, usage: str | list[str]) -> bool: return cred.usage in usage -def has_webauthn(user: User, usage: str | list[str]) -> bool: +def has_webauthn(user: UserMixin, usage: str | list[str]) -> bool: # Return True if ``user`` has one or more keys with requested usage. # Usage: either "first" or "secondary" if not isinstance(usage, list): @@ -896,7 +895,7 @@ def has_webauthn(user: User, usage: str | list[str]) -> bool: def create_credential_list( - user: User, usage: list[str] + user: UserMixin, usage: list[str] ) -> list[PublicKeyCredentialDescriptor]: # Return a list of registered credentials - filtered by whether they apply to our # authentication state (first or secondary) @@ -930,13 +929,13 @@ def create_blueprint( """ pass - def get_setup_methods(self, user: User) -> list[str]: + def get_setup_methods(self, user: UserMixin) -> list[str]: if has_webauthn(user, "secondary"): return [_("webauthn")] return [] def tf_login( - self, user: User, json_payload: dict[str, t.Any], next_loc: str | None + self, user: UserMixin, json_payload: dict[str, t.Any], next_loc: str | None ) -> ResponseValue: session["tf_state"] = "ready" if not _security._want_json(request): diff --git a/flask_security/webauthn_util.py b/flask_security/webauthn_util.py index 120ed3b2..cf6d9f6d 100644 --- a/flask_security/webauthn_util.py +++ b/flask_security/webauthn_util.py @@ -30,7 +30,7 @@ if t.TYPE_CHECKING: # pragma: no cover import flask - from .datastore import User + from flask_security import UserMixin class WebauthnUtil: @@ -60,7 +60,7 @@ def origin(self) -> str: return request.host_url.rstrip("/") def registration_options( - self, user: User, usage: str, existing_options: dict[str, t.Any] + self, user: UserMixin, usage: str, existing_options: dict[str, t.Any] ) -> dict[str, t.Any]: """ :param user: User object - could be used to configure on a per-user basis. @@ -76,7 +76,7 @@ def registration_options( return existing_options def authenticator_selection( - self, user: User, usage: str + self, user: UserMixin, usage: str ) -> AuthenticatorSelectionCriteria: """ :param user: User object - could be used to configure on a per-user basis. @@ -120,7 +120,7 @@ def authenticator_selection( def authentication_options( self, - user: User | None, + user: UserMixin | None, usage: list[str], existing_options: dict[str, t.Any], ) -> dict[str, t.Any]: @@ -138,7 +138,7 @@ def authentication_options( return existing_options def user_verification( - self, user: User | None, usage: list[str] + self, user: UserMixin | None, usage: list[str] ) -> UserVerificationRequirement: """ As part of signin - do we want/need user verification. diff --git a/mypy.ini b/mypy.ini index 5da98c80..4c7e401a 100644 --- a/mypy.ini +++ b/mypy.ini @@ -4,7 +4,7 @@ pretty = True show_error_codes = True # this sucks - but most of our packages don't yet have types. -ignore_missing_imports = True +disable_error_code = import-untyped no_implicit_optional = True disallow_incomplete_defs = True @@ -22,3 +22,10 @@ warn_unused_configs = True [mypy-flask_security.cli] # Due to click 8.1.4 ignore_errors = True + +[mypy-quart.*] +ignore_missing_imports = True +[mypy-flask_mail.*] +ignore_missing_imports = True +[mypy-twilio.*] +ignore_missing_imports = True diff --git a/pyproject.toml b/pyproject.toml index 1dc22299..b2df5a1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,3 +96,8 @@ version = {attr = "flask_security.__version__"} [tool.djlint] ignore="H005,H006" # lang, img height/width + +[tool.pyright] + include=["flask_security", "tests/view_scaffold.py"] + analyzeUnannotatedFunctions = "none" + reportMissingImports = false diff --git a/tests/test_changeable.py b/tests/test_changeable.py index 06c3144a..f10205e3 100644 --- a/tests/test_changeable.py +++ b/tests/test_changeable.py @@ -4,7 +4,7 @@ Changeable tests - :copyright: (c) 2019-2023 by J. Christopher Wagner (jwag). + :copyright: (c) 2019-2024 by J. Christopher Wagner (jwag). :license: MIT, see LICENSE for more details. """ @@ -14,10 +14,8 @@ from flask import Flask import markupsafe -from flask_security.core import UserMixin +from flask_security import PasswordUtil, UserMixin, password_changed, user_authenticated from flask_security.forms import _default_field_labels -from flask_security.password_util import PasswordUtil -from flask_security.signals import password_changed, user_authenticated from flask_security.utils import localize_callback from tests.test_utils import ( authenticate, diff --git a/tests/test_registerable.py b/tests/test_registerable.py index 70fb65e0..22f7f81f 100644 --- a/tests/test_registerable.py +++ b/tests/test_registerable.py @@ -18,15 +18,13 @@ logout, ) -from flask_security import Security -from flask_security.core import UserMixin +from flask_security import Security, UserMixin, user_registered, user_not_registered from flask_security.forms import ( ConfirmRegisterForm, RegisterForm, StringField, _default_field_labels, ) -from flask_security.signals import user_registered, user_not_registered from flask_security.utils import localize_callback pytestmark = pytest.mark.registerable() diff --git a/tests/view_scaffold.py b/tests/view_scaffold.py index be322c09..83d02507 100644 --- a/tests/view_scaffold.py +++ b/tests/view_scaffold.py @@ -32,6 +32,8 @@ from flask_security import ( MailUtil, Security, + UserDatastore, + UserMixin, WebauthnUtil, auth_required, current_user, @@ -105,7 +107,7 @@ class WebAuthn(db.Model, fsqla.FsWebAuthnMixin): return SQLAlchemyUserDatastore(db, User, Role, WebAuthn) -def fsqla_lite_datastore(app): +def fsqla_lite_datastore(app: Flask) -> FSQLALiteUserDatastore: from sqlalchemy.orm import DeclarativeBase from flask_sqlalchemy_lite import SQLAlchemy from flask_security.models import sqla as sqla @@ -148,7 +150,7 @@ class WebAuthn(Model, sqla.FsWebAuthnMixin): return FSQLALiteUserDatastore(db, User, Role, WebAuthn) -def create_app(): +def create_app() -> Flask: # Use real templates - not test templates... app = Flask("view_scaffold", template_folder="../") app.config["DEBUG"] = True @@ -234,7 +236,7 @@ def origin(self) -> str: # user_datastore = fsqla_datastore(app) user_datastore = fsqla_lite_datastore(app) - app.security = Security( + security = Security( app, user_datastore, webauthn_util_cls=TestWebauthnUtil, @@ -267,7 +269,9 @@ def get_locale(): pass @user_registered.connect_via(app) - def on_user_registered(myapp, user, confirm_token, **extra): + def on_user_registered( + myapp: Flask, user: UserMixin, confirm_token: str, **extra: dict[str, t.Any] + ) -> None: flash(f"To confirm {user.email} - go to /confirm/{confirm_token}") @user_not_registered.connect_via(app) @@ -313,7 +317,7 @@ def home(): {% include "security/_menu.html" %} """, email=current_user.email, - security=app.security, + security=security, ) @app.route("/basicauth") @@ -329,9 +333,11 @@ def protected(): return app -def add_user(ds, email, password, roles): +def add_user( + ds: UserDatastore, email: str, password: str, role_names: list[str] +) -> None: pw = hash_password(password) - roles = [ds.find_or_create_role(rn) for rn in roles] + roles = [ds.find_or_create_role(rn) for rn in role_names] ds.commit() user = ds.create_user( email=email, password=pw, active=True, confirmed_at=naive_utcnow() @@ -344,11 +350,12 @@ def add_user(ds, email, password, roles): if __name__ == "__main__": myapp = create_app() + security: Security = myapp.extensions["security"] with myapp.app_context(): test_acct = "test@test.com" - if not myapp.security.datastore.find_user(email=test_acct): - add_user(myapp.security.datastore, test_acct, "password", ["admin"]) + if not security.datastore.find_user(email=test_acct): + add_user(security.datastore, test_acct, "password", ["admin"]) print("Created User: {} with password {}".format(test_acct, "password")) myapp.run(port=5001) diff --git a/tox.ini b/tox.ini index bf037f8f..a7872f5c 100644 --- a/tox.ini +++ b/tox.ini @@ -137,7 +137,6 @@ deps = -r requirements/tests.txt types-setuptools mypy - sqlalchemy[mypy] commands = mypy --install-types --non-interactive flask_security tests