From a7bfae445d560938e2a06f7ef8dcb56614709ee3 Mon Sep 17 00:00:00 2001 From: vincent porte Date: Wed, 13 Nov 2024 15:30:08 +0100 Subject: [PATCH] add clean_next_url method to reduce mitigate url redirection from remote source risks --- .../tests/__snapshots__/tests_view.ambr | 12 +++++++++ lacommunaute/forum_member/tests/tests_view.py | 27 ++++++++++++------- lacommunaute/forum_member/views.py | 8 +++--- lacommunaute/utils/tests/tests_urls.py | 9 +++++++ lacommunaute/utils/urls.py | 6 +++++ 5 files changed, 49 insertions(+), 13 deletions(-) diff --git a/lacommunaute/forum_member/tests/__snapshots__/tests_view.ambr b/lacommunaute/forum_member/tests/__snapshots__/tests_view.ambr index 58fac4c9d..d52f93c1f 100644 --- a/lacommunaute/forum_member/tests/__snapshots__/tests_view.ambr +++ b/lacommunaute/forum_member/tests/__snapshots__/tests_view.ambr @@ -336,6 +336,18 @@ ''' # --- +# name: TestSendMagicLink.test_payload_when_sending_magic_link[/-/][send_magic_link_payload] + '{"sender": {"name": "La Communaut\\u00e9", "email": "noreply@inclusion.beta.gouv.fr"}, "to": [{"email": "samuel@jackson.com"}], "params": {"display_name": "Samuel J.", "login_link": "LOGIN_LINK"}, "templateId": 31}' +# --- +# name: TestSendMagicLink.test_payload_when_sending_magic_link[/topics/-/][send_magic_link_payload] + '{"sender": {"name": "La Communaut\\u00e9", "email": "noreply@inclusion.beta.gouv.fr"}, "to": [{"email": "samuel@jackson.com"}], "params": {"display_name": "Samuel J.", "login_link": "LOGIN_LINK"}, "templateId": 31}' +# --- +# name: TestSendMagicLink.test_payload_when_sending_magic_link[None-/][send_magic_link_payload] + '{"sender": {"name": "La Communaut\\u00e9", "email": "noreply@inclusion.beta.gouv.fr"}, "to": [{"email": "samuel@jackson.com"}], "params": {"display_name": "Samuel J.", "login_link": "LOGIN_LINK"}, "templateId": 31}' +# --- +# name: TestSendMagicLink.test_payload_when_sending_magic_link[http://www.unallowed_host.com-/][send_magic_link_payload] + '{"sender": {"name": "La Communaut\\u00e9", "email": "noreply@inclusion.beta.gouv.fr"}, "to": [{"email": "samuel@jackson.com"}], "params": {"display_name": "Samuel J.", "login_link": "LOGIN_LINK"}, "templateId": 31}' +# --- # name: TestSendMagicLink.test_payload_when_sending_magic_link[send_magic_link_payload] '{"sender": {"name": "La Communaut\\u00e9", "email": "noreply@inclusion.beta.gouv.fr"}, "to": [{"email": "samuel@jackson.com"}], "params": {"display_name": "Samuel J.", "login_link": "LOGIN_LINK"}, "templateId": 31}' # --- diff --git a/lacommunaute/forum_member/tests/tests_view.py b/lacommunaute/forum_member/tests/tests_view.py index 266280c08..b0784c880 100644 --- a/lacommunaute/forum_member/tests/tests_view.py +++ b/lacommunaute/forum_member/tests/tests_view.py @@ -20,6 +20,8 @@ from lacommunaute.forum_member.views import send_magic_link from django.conf import settings +from lacommunaute.utils.urls import clean_next_url + PermissionHandler = get_class("forum_permission.handler", "PermissionHandler") assign_perm = get_class("forum_permission.shortcuts", "assign_perm") @@ -90,13 +92,15 @@ def test_show_linkedin_link(self, client, db): class TestSendMagicLink: - def test_payload_when_sending_magic_link(self, client, db, snapshot, mock_respx_post_to_sib_smtp_url): - user = UserFactory(first_name="Samuel", last_name="Jackson", email="samuel@jackson.com") - next_url = "/topics/" - uidb64 = urlsafe_base64_encode(force_bytes(user.pk)) - token = default_token_generator.make_token(user) + @pytest.mark.parametrize( + "next_url,expected", [(None, "/"), ("/", "/"), ("/topics/", "/"), ("http://www.unallowed_host.com", "/")] + ) + def test_payload_when_sending_magic_link( + self, client, db, user_token, next_url, expected, snapshot, mock_respx_post_to_sib_smtp_url + ): + user, uidb64, token = user_token url = reverse("members:login_with_link", kwargs={"uidb64": uidb64, "token": token}) - query_params = urlencode({"next": next_url}) + query_params = urlencode({"next": clean_next_url(next_url)}) login_link = f"{settings.COMMU_PROTOCOL}://{settings.COMMU_FQDN}{url}?{query_params}" send_magic_link(user, next_url) @@ -120,22 +124,25 @@ def test_content(self, client, db, next_url, snapshot): def test_post(self, client, db, next_url, snapshot, mock_respx_post_to_sib_smtp_url): user = UserFactory(email="test@server.com") response = client.post( - reverse("members:login") + f"?next={next_url}" if next_url else reverse("members:login"), + reverse("members:login") + f"?next={next_url}", data={"email": user.email}, ) assert response.status_code == 200 content = parse_response_to_soup(response, selector="main") assert str(content) == snapshot(name="login_view_content") - @pytest.mark.parametrize("next_url", [None, "/", "/topics/"]) - def test_redirection_when_email_is_unknown(self, client, db, next_url, mock_respx_post_to_sib_smtp_url): + @pytest.mark.parametrize( + "next_url,expected", + [(None, "/"), ("/", "/"), ("/topics/", "/topics/"), ("http://www.unallowed_host.com", "/")], + ) + def test_redirection_when_email_is_unknown(self, client, db, next_url, expected, mock_respx_post_to_sib_smtp_url): response = client.post( reverse("members:login") + f"?next={next_url}" if next_url else reverse("members:login"), data={"email": "john@travolta.es"}, ) assert response.status_code == 302 assert response.url == reverse("members:create") + "?" + urlencode( - {"email": "john@travolta.es", "next": "/" if next_url is None else next_url} + {"email": "john@travolta.es", "next": expected} ) diff --git a/lacommunaute/forum_member/views.py b/lacommunaute/forum_member/views.py index 031c695ca..39eba9730 100644 --- a/lacommunaute/forum_member/views.py +++ b/lacommunaute/forum_member/views.py @@ -23,6 +23,7 @@ from lacommunaute.notification.enums import EmailSentTrackKind from lacommunaute.users.enums import IdentityProvider from lacommunaute.users.models import User +from lacommunaute.utils.urls import clean_next_url logger = logging.getLogger(__name__) @@ -62,7 +63,7 @@ def send_magic_link(user, next_url): token = default_token_generator.make_token(user) uidb64 = urlsafe_base64_encode(force_bytes(user.pk)) url = reverse("members:login_with_link", kwargs={"uidb64": uidb64, "token": token}) - query_params = urlencode({"next": next_url}) + query_params = urlencode({"next": clean_next_url(next_url)}) login_link = f"{settings.COMMU_PROTOCOL}://{settings.COMMU_FQDN}{url}?{query_params}" send_email( @@ -96,7 +97,7 @@ def form_valid(self, form): user = User.objects.get(email=email) except User.DoesNotExist: base_url = reverse("members:create") - query_params = {"email": email, "next": next_url} + query_params = {"email": email, "next": clean_next_url(next_url)} return HttpResponseRedirect(f"{base_url}?{urlencode(query_params)}") send_magic_link(user, next_url) @@ -145,7 +146,8 @@ def login_with_link(request, uidb64, token): if user and default_token_generator.check_token(user, token): login(request, user) request.session[IdentityProvider.MAGIC_LINK.name] = 1 - return HttpResponseRedirect(request.GET.get("next", "/")) + next_url = clean_next_url(request.GET.get("next", "/")) + return HttpResponseRedirect(next_url) return HttpResponseRedirect(reverse("members:login")) diff --git a/lacommunaute/utils/tests/tests_urls.py b/lacommunaute/utils/tests/tests_urls.py index 4506a2b7c..51e7af78f 100644 --- a/lacommunaute/utils/tests/tests_urls.py +++ b/lacommunaute/utils/tests/tests_urls.py @@ -3,6 +3,8 @@ import pytest from django.urls import NoReverseMatch, clear_url_caches, reverse +from lacommunaute.utils.urls import clean_next_url + @pytest.fixture(autouse=True) def _clear_url_caches(): @@ -22,3 +24,10 @@ def test_django_urls_prod(settings): reverse("login") with pytest.raises(NoReverseMatch): reverse("djdt:render_panel") + + +@pytest.mark.parametrize( + "url, expected", [(None, "/"), ("http://www.unallowed.com", "/"), ("/", "/"), ("/topics/", "/topics/")] +) +def test_clean_next_url(url, expected): + assert clean_next_url(url) == expected diff --git a/lacommunaute/utils/urls.py b/lacommunaute/utils/urls.py index d17d97697..1f462b7aa 100644 --- a/lacommunaute/utils/urls.py +++ b/lacommunaute/utils/urls.py @@ -62,3 +62,9 @@ def get_safe_url(request, param_name=None, fallback_url=None, url=None): return url return fallback_url + + +def clean_next_url(url): + if not url_has_allowed_host_and_scheme(url, allowed_hosts=settings.ALLOWED_HOSTS): + return "/" + return url