Skip to content

Commit

Permalink
Fix form.next for newer authentication mechanisms - including two-fac…
Browse files Browse the repository at this point in the history
…tor. (#854)

Propagating 'next' when part of the request query string was working - but FS also supported 'next' as a hidden field in some endpoints
such as /login and /register. form.next wasn't being propagated with 2FA. Also - newer authentication endpoints - /us-signin /wan-signin didn't have the
'next' hidden field.

Also - there were NO tests for form.next.

Fixed all that, added tests, combined all 'next' calculations in a single utility.

closes #853
  • Loading branch information
jwag956 authored Oct 14, 2023
1 parent 8dba976 commit 509aac7
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Fixes
- (:issue:`826`) Fix error in quickstart (codycollier)
- (:pr:`835`) Update Armenian translations (amkrtchyan-tmp)
- (:pr:`831`) Update German translations. (sr-verde)
- (:issue:`853`) Fix 'next' propagation when passed as form.next (thanks cariaso)

Version 5.3.0
-------------
Expand Down
2 changes: 1 addition & 1 deletion flask_security/oauth_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def oauthresponse(name: str) -> "ResponseValue":
if user:
after_this_request(view_commit)
response = _security.two_factor_plugins.tf_enter(
user, False, "oauth", next_loc=propagate_next(request.url)
user, False, "oauth", next_loc=propagate_next(request.url, None)
)
if response:
return response
Expand Down
8 changes: 5 additions & 3 deletions flask_security/tf_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Flask-Security Two-Factor Plugin Module
:copyright: (c) 2022-2022 by J. Christopher Wagner (jwag).
:copyright: (c) 2022-2023 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
TODO:
Expand Down Expand Up @@ -58,7 +58,9 @@ def __init__(self, *args, **kwargs):
def tf_select() -> "ResponseValue":
# Ask user which MFA method they want to use.
# This is used when a user has setup more than one type of 2FA.
form = build_form_from_request("two_factor_select_form")
form = t.cast(
TwoFactorSelectForm, build_form_from_request("two_factor_select_form")
)

# This endpoint is unauthenticated - make sure we're in a valid state
if not all(k in session for k in ["tf_user_id", "tf_select"]):
Expand All @@ -81,7 +83,7 @@ def tf_select() -> "ResponseValue":
if tf_impl:
json_payload = {"tf_required": True}
response = tf_impl.tf_login(
user, json_payload, next_loc=propagate_next(request.url)
user, json_payload, next_loc=propagate_next(request.url, None)
)
if not response: # pragma no cover
# This really can't happen unless between the time the started logging in
Expand Down
20 changes: 10 additions & 10 deletions flask_security/unified_signin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Flask-Security Unified Signin module
:copyright: (c) 2019-2022 by J. Christopher Wagner (jwag).
:copyright: (c) 2019-2023 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
This implements a unified sign in endpoint - allowing
Expand All @@ -24,7 +24,6 @@
fact allow the user to add a password. Is that sufficient?
- This also means that there is no way to REMOVE your password once it is setup,
although user can register without one.
- Any reason to support 'next' in form? xx?next=yyy works fine.
- separate code validation times for SMS, email, authenticator?
- token versus code versus passcode? Confusing terminology.
Expand Down Expand Up @@ -52,6 +51,7 @@
from .decorators import anonymous_user_required, auth_required, unauth_csrf
from .forms import (
Form,
NextFormMixin,
Required,
build_form_from_request,
build_form,
Expand Down Expand Up @@ -231,7 +231,7 @@ def validate2(self) -> bool:
return False # pragma: no cover


class UnifiedSigninForm(_UnifiedPassCodeForm):
class UnifiedSigninForm(_UnifiedPassCodeForm, NextFormMixin):
"""A unified login form
For either identity/password or request and enter code.
"""
Expand Down Expand Up @@ -414,7 +414,7 @@ def us_signin_send_code() -> "ResponseValue":
This takes an identity (as configured in USER_IDENTITY_ATTRIBUTES)
and a method request to send a code.
"""
form: UnifiedSigninForm = build_form_from_request("us_signin_form")
form = t.cast(UnifiedSigninForm, build_form_from_request("us_signin_form"))
form.submit_send_code.data = True
form.submit.data = False

Expand Down Expand Up @@ -485,7 +485,7 @@ def us_verify_send_code() -> "ResponseValue":
"""
Send code during verify. POST only.
"""
form: UnifiedVerifyForm = build_form_from_request("us_verify_form")
form = t.cast(UnifiedVerifyForm, build_form_from_request("us_verify_form"))
form.submit_send_code.data = True
form.submit.data = False

Expand Down Expand Up @@ -557,7 +557,7 @@ def us_signin() -> "ResponseValue":
else:
return redirect(get_post_login_redirect())

form: UnifiedSigninForm = build_form_from_request("us_signin_form")
form = t.cast(UnifiedSigninForm, build_form_from_request("us_signin_form"))
form.submit.data = True
form.submit_send_code.data = False

Expand All @@ -570,7 +570,7 @@ def us_signin() -> "ResponseValue":
form.user,
remember_me,
form.authn_via,
next_loc=propagate_next(request.url),
next_loc=propagate_next(request.url, form),
)
if response:
return response
Expand Down Expand Up @@ -632,7 +632,7 @@ def us_verify() -> "ResponseValue":
will have filled in ?next=xxx - which we want to carefully not lose as we
go through these steps.
"""
form: UnifiedVerifyForm = build_form_from_request("us_verify_form")
form = t.cast(UnifiedVerifyForm, build_form_from_request("us_verify_form"))
form.submit.data = True
form.submit_send_code.data = False

Expand Down Expand Up @@ -736,7 +736,7 @@ def us_verify_link() -> "ResponseValue":
)
)
response = _security.two_factor_plugins.tf_enter(
user, False, "email", next_loc=propagate_next(request.url)
user, False, "email", next_loc=propagate_next(request.url, None)
)
if response:
return response
Expand Down Expand Up @@ -769,7 +769,7 @@ def us_setup() -> "ResponseValue":
use a timed signed token to pass along state.
GET - retrieve current info (json) or form.
"""
form: UnifiedSigninSetupForm = build_form_from_request("us_setup_form")
form = t.cast(UnifiedSigninSetupForm, build_form_from_request("us_setup_form"))

setup_methods = _compute_setup_methods()
active_methods = _compute_active_methods(current_user)
Expand Down
84 changes: 43 additions & 41 deletions flask_security/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import hmac
import time
import typing as t
from urllib.parse import parse_qsl, parse_qs, urlsplit, urlunsplit, urlencode
from urllib.parse import parse_qsl, urlsplit, urlunsplit, urlencode
import urllib.request
import urllib.error
import warnings
Expand All @@ -36,10 +36,11 @@
from flask_login import current_user
from flask_login import COOKIE_NAME as REMEMBER_COOKIE_NAME
from flask_principal import AnonymousIdentity, Identity, identity_changed, Need
from flask_wtf import csrf
from flask_wtf import csrf, FlaskForm
from wtforms import ValidationError
from itsdangerous import BadSignature, SignatureExpired
from werkzeug.local import LocalProxy
from werkzeug.datastructures import MultiDict

from .quart_compat import best, get_quart_status
from .proxies import _security, _datastore, _pwd_context, _hashing_context
Expand All @@ -48,7 +49,6 @@
if t.TYPE_CHECKING: # pragma: no cover
from flask import Flask, Response
from flask.typing import ResponseValue
from flask_wtf import FlaskForm
from .datastore import User

SB = t.Union[str, bytes]
Expand Down Expand Up @@ -547,46 +547,27 @@ def validate_redirect_url(url: str) -> bool:
return True


def get_post_action_redirect(config_key: str, declared: t.Optional[str] = None) -> str:
# All this nonsense due to mypy
arg_next_url = None
arg_next = request.args.get("next", None)
if arg_next:
arg_next_url = get_url(arg_next)
form_next_url = None
form_next = request.form.get("next", None)
if form_next:
form_next_url = get_url(form_next)
urls = [
arg_next_url,
form_next_url,
find_redirect(config_key),
]
if declared:
urls.insert(0, declared)
for url in urls:
if url and validate_redirect_url(url):
return url
raise ValueError("No valid redirect URL found - configuration error")
def get_post_action_redirect(config_key: str) -> str:
return propagate_next(find_redirect(config_key), request.form)


def get_post_login_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_LOGIN_VIEW", declared)
def get_post_login_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_LOGIN_VIEW")


def get_post_register_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_REGISTER_VIEW", declared)
def get_post_register_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_REGISTER_VIEW")


def get_post_logout_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_LOGOUT_VIEW", declared)
def get_post_logout_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_LOGOUT_VIEW")


def get_post_verify_redirect(declared: t.Optional[str] = None) -> str:
return get_post_action_redirect("SECURITY_POST_VERIFY_VIEW", declared)
def get_post_verify_redirect() -> str:
return get_post_action_redirect("SECURITY_POST_VERIFY_VIEW")


def find_redirect(key: str) -> t.Optional[str]:
def find_redirect(key: str) -> str:
"""Returns the URL to redirect to after a user logs in successfully.
:param key: The session or application configuration key to search for
Expand All @@ -599,17 +580,38 @@ def find_redirect(key: str) -> t.Optional[str]:
app_url = None
if app_value:
app_url = get_url(app_value)
rv = session_url or app_url or current_app.config.get("APPLICATION_ROOT", "/")
rv = session_url or app_url or str(current_app.config.get("APPLICATION_ROOT", "/"))
return rv


def propagate_next(url: str) -> str:
# return either URL or, if URL already has a ?next=xx, return that.
url_next = urlsplit(url)
qparams = parse_qs(url_next.query)
if "next" in qparams:
return qparams["next"][0]
return url
def propagate_next(
fallback_url: str, form: t.Optional[t.Union[FlaskForm, MultiDict]]
) -> str:
"""Compute appropriate redirect URL
The application can add a 'next' query parameter or have 'next' as a form field.
If either exist, make sure they are valid (not pointing to external location)
If neither, return the fallback_url
Can be passed either request.form (which is really a MultiDict OR a real form.
"""
form_next = None
if form and isinstance(form, FlaskForm):
if hasattr(form, "next") and form.next.data:
form_next = get_url(form.next.data)
elif form and form.get("next", None):
form_next = get_url(str(form.get("next")))
arg_next = None
if request.args.get("next", None):
arg_next = get_url(request.args.get("next")) # type: ignore[arg-type]
urls = [
arg_next,
form_next,
fallback_url,
]
for url in urls:
if url and validate_redirect_url(url):
return url
raise ValueError("No valid redirect URL found - configuration error")


def get_config(app: "Flask") -> t.Dict[str, t.Any]:
Expand Down
11 changes: 7 additions & 4 deletions flask_security/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ def login() -> "ResponseValue":
assert form.user is not None
remember_me = form.remember.data if "remember" in form else None
response = _security.two_factor_plugins.tf_enter(
form.user, remember_me, "password", next_loc=propagate_next(request.url)
form.user,
remember_me,
"password",
next_loc=propagate_next(request.url, form),
)
if response:
return response
Expand Down Expand Up @@ -305,7 +308,7 @@ def register() -> "ResponseValue":
# signin - we adhere to historic behavior.
if not _security.confirmable or cv("LOGIN_WITHOUT_CONFIRMATION"):
response = _security.two_factor_plugins.tf_enter(
form.user, False, "register", next_loc=propagate_next(request.url)
form.user, False, "register", next_loc=propagate_next(request.url, form)
)
if response:
return response
Expand Down Expand Up @@ -490,7 +493,7 @@ def confirm_email(token):
# get the email.
# Note also this goes against OWASP recommendations.
response = _security.two_factor_plugins.tf_enter(
user, False, "confirm", next_loc=propagate_next(request.url)
user, False, "confirm", next_loc=propagate_next(request.url, None)
)
if response:
do_flash(m, c)
Expand Down Expand Up @@ -639,7 +642,7 @@ def reset_password(token):
if cv("AUTO_LOGIN_AFTER_RESET"):
# backwards compat - really shouldn't do this according to OWASP
response = _security.two_factor_plugins.tf_enter(
form.user, False, "reset", next_loc=propagate_next(request.url)
form.user, False, "reset", next_loc=propagate_next(request.url, None)
)
if response:
return response
Expand Down
11 changes: 7 additions & 4 deletions flask_security/webauthn.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from flask import current_app as app
from flask_login import current_user
from wtforms import BooleanField, HiddenField, RadioField, StringField, SubmitField
from .forms import NextFormMixin

try:
import webauthn
Expand Down Expand Up @@ -205,7 +206,7 @@ def validate(self, **kwargs: t.Any) -> bool:
return True


class WebAuthnSigninForm(Form):
class WebAuthnSigninForm(Form, NextFormMixin):
identity = StringField(get_form_field_label("identity"))
remember = BooleanField(get_form_field_label("remember_me"))
submit = SubmitField(label=get_form_field_xlate(_("Start")), id="wan_signin")
Expand Down Expand Up @@ -236,7 +237,7 @@ def validate(self, **kwargs: t.Any) -> bool:
return True


class WebAuthnSigninResponseForm(Form):
class WebAuthnSigninResponseForm(Form, NextFormMixin):
"""
This form is used both for signin (primary/first or secondary) and verify.
"""
Expand Down Expand Up @@ -621,7 +622,9 @@ def webauthn_signin() -> "ResponseValue":
cv("WAN_SIGNIN_TEMPLATE"),
wan_signin_form=form,
wan_signin_response_form=build_form(
"wan_signin_response_form", remember=form.remember.data
"wan_signin_response_form",
remember=form.remember.data,
next=form.next.data,
),
wan_state=state_token,
credential_options=json.dumps(o_json),
Expand Down Expand Up @@ -705,7 +708,7 @@ def webauthn_signin_response(token: str) -> "ResponseValue":
form.user,
remember_me,
"webauthn",
next_loc=propagate_next(request.url),
next_loc=propagate_next(request.url, form),
)
if response:
return response
Expand Down
12 changes: 12 additions & 0 deletions tests/test_two_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,18 @@ def test_propagate_next(app, client):
verify_url, data=dict(code=codes[0]["login_token"]), follow_redirects=False
)
assert "/im-in" in response.location
logout(client)

# do it with next in the form
data = dict(email="[email protected]", password="password", next="/im-in")
response = client.post("/login", data=data, follow_redirects=True)
assert "?next=/im-in" in response.request.url
# grab URL from form to show that our template propagates ?next
verify_url = get_form_action(response)
response = client.post(
verify_url, data=dict(code=codes[1]["login_token"]), follow_redirects=False
)
assert "/im-in" in response.location


@pytest.mark.settings(freshness=timedelta(minutes=0))
Expand Down
Loading

0 comments on commit 509aac7

Please sign in to comment.