Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MFA Session cookie and MFA resource evations prevention #20

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ build
dist
__pycache__
htmlcov
*.egg-info
.coverage
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pip install django-mfa3

## Usage

1. Add `'mfa'` to `INSTALLED_APPS`
1. Add `'mfa'` to `INSTALLED_APPS` and add `mfa.middleware.MfaSessionMiddleware` in your `settings.MIDDLEWARE` configuration.
2. Use `mfa.views.LoginView` instead of the regular login view. (Be sure to
remove any other login routes, otherwise the multi factor authentication
can be circumvented. The admin login will automatically be patched to
Expand Down
75 changes: 75 additions & 0 deletions mfa/middleware.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import time

from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.shortcuts import redirect
from django.utils.deprecation import MiddlewareMixin
from django.utils.cache import patch_vary_headers
from django.utils.http import http_date

from django.contrib.sessions.backends.base import UpdateError
from django.contrib.sessions.middleware import SessionMiddleware


class MFAEnforceMiddleware(MiddlewareMixin):
Expand All @@ -10,3 +19,69 @@ def process_view(self, request, view_func, view_args, view_kwargs):
and not request.user.mfakey_set.exists()
):
return redirect('mfa:list')


class MfaSessionMiddleware(SessionMiddleware):
cookie_name = getattr(settings, "MFA_COOKIE_NAME", "mfa_session")

def process_request(self, request):
session_key = request.COOKIES.get(self.cookie_name, None)
request.mfa_session = self.SessionStore(session_key)

def process_response(self, request, response):
"""
If request.mfa_session was modified, or if the configuration is to save the
session every time, save the changes and set a session cookie or delete
the session cookie if the session has been emptied.
"""
try:
accessed = request.mfa_session.accessed
modified = request.mfa_session.modified
empty = request.mfa_session.is_empty()
except AttributeError:
return response
# First check if we need to delete this cookie.
# The session should be deleted only if the session is entirely empty.
if self.cookie_name in request.COOKIES and empty:
response.delete_cookie(
self.cookie_name,
path=settings.SESSION_COOKIE_PATH,
domain=settings.SESSION_COOKIE_DOMAIN,
samesite=settings.SESSION_COOKIE_SAMESITE,
)
patch_vary_headers(response, ("Cookie",))
else:
if accessed:
patch_vary_headers(response, ("Cookie",))
# relies and the global one
if (modified or settings.SESSION_SAVE_EVERY_REQUEST) and not empty:
if request.mfa_session.get_expire_at_browser_close():
max_age = None
expires = None
else:
max_age = request.mfa_session.get_expiry_age()
expires_time = time.time() + max_age
expires = http_date(expires_time)
# Save the session data and refresh the client cookie.
# Skip session save for 500 responses, refs #3881.
if response.status_code != 500:
try:
request.mfa_session.save()
except UpdateError:
raise SuspiciousOperation(
"The request's session was deleted before the "
"request completed. The user may have logged "
"out in a concurrent request, for example."
)
response.set_cookie(
self.cookie_name,
request.mfa_session.session_key,
max_age=max_age,
expires=expires,
domain=settings.SESSION_COOKIE_DOMAIN,
path=settings.SESSION_COOKIE_PATH,
secure=settings.SESSION_COOKIE_SECURE or None,
httponly=settings.SESSION_COOKIE_HTTPONLY or None,
samesite=settings.SESSION_COOKIE_SAMESITE,
)
return response
20 changes: 16 additions & 4 deletions mfa/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,18 @@
from .methods import totp


class MFAFormView(FormView):
class DummyMixin:
pass


class MFASessionDispatcher:

def _get_mfa_session(self):
self.mfa_session = getattr(self.request, "mfa_session", self.request.session)
return self.mfa_session


class MFAFormView(FormView, MFASessionDispatcher):
@property
def method(self):
if self.kwargs['method'] in settings.METHODS:
Expand All @@ -25,7 +36,7 @@ def method(self):
@property
def challenge(self):
try:
return self.request.session['mfa_challenge']
return self.mfa_session['mfa_challenge']
except KeyError as e:
raise Http404 from e

Expand All @@ -38,13 +49,14 @@ def complete(self, code):
@method_decorator(sensitive_post_parameters())
@method_decorator(never_cache)
def dispatch(self, *args, **kwargs):
self._get_mfa_session()
return super().dispatch(*args, **kwargs)

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if 'mfa_data' not in context:
data, state = self.begin()
self.request.session['mfa_challenge'] = (data, state)
self.mfa_session['mfa_challenge'] = (data, state)
context['mfa_data'] = data
return context

Expand All @@ -60,5 +72,5 @@ def form_invalid(self, form):
))

def form_valid(self, form):
del self.request.session['mfa_challenge']
del self.mfa_session['mfa_challenge']
return super().form_valid(form)
57 changes: 44 additions & 13 deletions mfa/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .forms import MFAAuthForm
from .forms import MFACreateForm
from .mail import send_mail
from .mixins import MFAFormView
from .mixins import MFAFormView, MFASessionDispatcher
from .models import MFAKey


Expand All @@ -33,24 +33,40 @@ def form_valid(self, form):
if not user.mfakey_set.exists():
return self.no_key_exists(form)

self.request.session['mfa_user'] = {
mfa_session = getattr(self.request, "mfa_session", self.request.session)
mfa_session['mfa_user'] = {
'pk': user.pk,
'backend': user.backend,
}
self.request.session['mfa_success_url'] = self.get_success_url()
mfa_session['mfa_success_url'] = self.get_success_url()
for method in settings.METHODS:
if user.mfakey_set.filter(method=method).exists():
return redirect('mfa:auth', method)


class MFAListView(LoginRequiredMixin, ListView):
class MFARequiredIfExistsMixin(LoginRequiredMixin, MFASessionDispatcher):
def dispatch(self, request, *args, **kwargs):
self._get_mfa_session()
if not request.user.is_authenticated:
return self.handle_no_permission()
elif not self.mfa_session.get("mfa_authenticated") and request.user.mfakey_set.exists():
# the user must be authenticated with a 2fa to access to this resource OR
# he didn't have configured yet a new 2fa
#
# this prevents evasion attacks where an adversary being logged in without 2fa
# is able to create a 2fa and/or delete a preexisting 2fa
return self.handle_no_permission()
return super().dispatch(request, *args, **kwargs)


class MFAListView(MFARequiredIfExistsMixin, ListView):
model = MFAKey

def get_queryset(self):
return super().get_queryset().filter(user=self.request.user)


class MFADeleteView(LoginRequiredMixin, DeleteView):
class MFADeleteView(MFARequiredIfExistsMixin, DeleteView):
model = MFAKey

def get_queryset(self):
Expand All @@ -62,18 +78,26 @@ def get_success_url(self):

class MFACreateView(LoginRequiredMixin, MFAFormView):
form_class = MFACreateForm

def get_template_names(self):
return f'mfa/create_{self.method.name}.html'

def get_success_url(self):
return reverse('mfa:list')

def begin(self):
return self.method.register_begin(self.request.user)

self._get_mfa_session()
# the user can create new 2fa only if he doesn't have already configured a 2fa or if it is authenticated with 2fa
if not self.request.user.mfakey_set.exists() or self.mfa_session.get("mfa_authenticated"):
return self.method.register_begin(self.request.user)
else:
return self.handle_no_permission()

def complete(self, code):
return self.method.register_complete(self.challenge[1], code)
mfa_completed = self.method.register_complete(self.challenge[1], code)
if mfa_completed:
self.mfa_session["mfa_authenticated"] = True
return mfa_completed

def form_valid(self, form):
MFAKey.objects.create(
Expand All @@ -91,11 +115,15 @@ def form_valid(self, form):
class MFAAuthView(MFAFormView):
form_class = MFAAuthForm

def dispatch(self, request, *args, **kwargs):
self._get_mfa_session()
return super().dispatch(request, *args, **kwargs)

def get_template_names(self):
return f'mfa/auth_{self.method.name}.html'

def get_success_url(self):
success_url = self.request.session.pop('mfa_success_url')
success_url = self.mfa_session.pop('mfa_success_url')
if self.method.name == 'recovery':
return reverse('mfa:list')
else:
Expand All @@ -104,7 +132,7 @@ def get_success_url(self):
@cached_property
def user(self):
try:
user_data = self.request.session['mfa_user']
user_data = self.mfa_session['mfa_user']
except KeyError as e:
raise Http404 from e
User = get_user_model()
Expand All @@ -116,20 +144,23 @@ def begin(self):
return self.method.authenticate_begin(self.user)

def complete(self, code):
return self.method.authenticate_complete(
mfa_auth = self.method.authenticate_complete(
self.challenge[1], self.user, code,
)
self.mfa_session["mfa_authenticated"] = True
return mfa_auth

def form_invalid(self, form):
user_login_failed.send(
sender=__name__,
credentials={'username': self.user.get_username()},
request=self.request,
)
self.mfa_session["mfa_authenticated"] = False
send_mail(self.user, self.method)
return super().form_invalid(form)

def form_valid(self, form):
login(self.request, self.user)
del self.request.session['mfa_user']
del self.mfa_session['mfa_user']
return super().form_valid(form)
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ include = ["mfa*"]
[tool.setuptools.package-data]
mfa = [
"templates/**/*.html",
"templates/**/*.txt",
"static/**/*.js",
"locale/**/*.po",
"locale/**/*.mo",
Expand Down
36 changes: 35 additions & 1 deletion tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,29 +314,63 @@ def test_login(self):

class ListViewTest(MFATestCase):
def test_list_view(self):
self.client.force_login(self.user)
MFAKey.objects.create(
user=self.user,
method='FIDO2',
name='test',
secret='mock',
)
MFAKey.objects.create(
user=self.user,
method='recovery',
name='recovery',
secret=make_password('123456'),
)

res = self.login()
self.assertEqual(res.status_code, 302)

res = self.client.get('/mfa/auth/recovery/')
self.assertEqual(res.status_code, 200)

# user must be logged using MFA to list 2fas
login_res = self.client.post('/mfa/auth/recovery/', {'code': '123456'})
self.assertEqual(login_res.status_code, 302)

res = self.client.get('/mfa/')
self.assertEqual(res.status_code, 200)

self.assertEqual(res.content.count(b'<li>'), 1)


class DeleteViewTest(MFATestCase):
def test_delete_view(self):
self.client.force_login(self.user)
key = MFAKey.objects.create(
user=self.user,
method='FIDO2',
name='test',
secret='mock',
)
r_key = MFAKey.objects.create(
user=self.user,
method='recovery',
name='recovery',
secret=make_password('123456'),
)
res = self.login()
self.assertEqual(res.status_code, 302)

res = self.client.get('/mfa/auth/recovery/')
self.assertEqual(res.status_code, 200)

# user must be logged using MFA to delete 2fa
login_res = self.client.post('/mfa/auth/recovery/', {'code': '123456'})
self.assertEqual(login_res.status_code, 302)

res = self.client.post(f'/mfa/{key.pk}/delete/')
self.assertEqual(res.status_code, 302)

self.assertEqual(res.url, '/mfa/')
self.assertEqual(MFAKey.objects.filter(pk=key.pk).count(), 0)

Expand Down