From be5a5ff4a0b120cf2af459c289946f1936501b31 Mon Sep 17 00:00:00 2001 From: chahatsagarmain Date: Wed, 6 Nov 2024 08:37:24 +0530 Subject: [PATCH] [BUG] uuid pattern change and pattern checking openwisp#682 Changed regex for uuid pattern and uuid pattern check in admin.py Fixes openwisp#682 Signed-off-by: chahatsagarmain --- openwisp_controller/config/admin.py | 10 ++++-- openwisp_controller/config/tests/pytest.py | 31 ++++++++++--------- openwisp_controller/config/utils.py | 21 +++++++------ .../connection/channels/routing.py | 3 +- 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/openwisp_controller/config/admin.py b/openwisp_controller/config/admin.py index f51194af5..7977a4437 100644 --- a/openwisp_controller/config/admin.py +++ b/openwisp_controller/config/admin.py @@ -108,6 +108,11 @@ def get_extra_context(self, pk=None): if not issubclass(self.model, AbstractVpn): ctx['CONFIG_BACKEND_FIELD_SHOWN'] = app_settings.CONFIG_BACKEND_FIELD_SHOWN if pk: + UUID_PATTERN = re.compile( + '^[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}$' + ) + if not UUID_PATTERN.match(str(pk)): + raise Http404() ctx['download_url'] = reverse('{0}_download'.format(prefix), args=[pk]) try: has_config = True @@ -137,9 +142,10 @@ def change_view(self, request, object_id, form_url='', extra_context=None): def get_urls(self): options = getattr(self.model, '_meta') url_prefix = '{0}_{1}'.format(options.app_label, options.model_name) + UUID_PATTERN = r'([a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12})' return [ re_path( - r'^download/(?P[^/]+)/$', + r'^download/{0}/$'.format(UUID_PATTERN), self.admin_site.admin_view(self.download_view), name='{0}_download'.format(url_prefix), ), @@ -149,7 +155,7 @@ def get_urls(self): name='{0}_preview'.format(url_prefix), ), re_path( - r'^(?P[^/]+)/context\.json$', + r'^{0}/context\.json$'.format(UUID_PATTERN), self.admin_site.admin_view(self.context_view), name='{0}_context'.format(url_prefix), ), diff --git a/openwisp_controller/config/tests/pytest.py b/openwisp_controller/config/tests/pytest.py index 4d0987e7d..d8b234dcd 100644 --- a/openwisp_controller/config/tests/pytest.py +++ b/openwisp_controller/config/tests/pytest.py @@ -11,21 +11,22 @@ from ..base.channels_consumer import BaseDeviceConsumer from .utils import CreateDeviceMixin -Device = load_model('config', 'Device') +Device = load_model("config", "Device") @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) class TestDeviceConsumer(CreateDeviceMixin): model = Device + UUID_PATTERN = "[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}" application = ProtocolTypeRouter( { - 'websocket': AllowedHostsOriginValidator( + "websocket": AllowedHostsOriginValidator( AuthMiddlewareStack( URLRouter( [ re_path( - r'^ws/controller/device/(?P[^/]+)/$', + f"^ws/controller/device/(?P{UUID_PATTERN})/$", BaseDeviceConsumer.as_asgi(), ) ] @@ -36,14 +37,14 @@ class TestDeviceConsumer(CreateDeviceMixin): ) async def _get_communicator(self, admin_client, device_id): - session_id = admin_client.cookies['sessionid'].value + session_id = admin_client.cookies["sessionid"].value communicator = WebsocketCommunicator( self.application, - path=f'ws/controller/device/{device_id}/', + path=f"ws/controller/device/{device_id}/", headers=[ ( - b'cookie', - f'sessionid={session_id}'.encode('ascii'), + b"cookie", + f"sessionid={session_id}".encode("ascii"), ) ], ) @@ -55,15 +56,15 @@ async def _get_communicator(self, admin_client, device_id): def _add_model_permissions(self, user, add=True, change=True, delete=True): permissions = [] if add: - permissions.append(f'add_{self.model._meta.model_name}') + permissions.append(f"add_{self.model._meta.model_name}") if change: - permissions.append(f'change_{self.model._meta.model_name}') + permissions.append(f"change_{self.model._meta.model_name}") if delete: - permissions.append(f'delete_{self.model._meta.model_name}') + permissions.append(f"delete_{self.model._meta.model_name}") user.user_permissions.set(Permission.objects.filter(codename__in=permissions)) async def test_unauthenticated_user(self, client): - client.cookies['sessionid'] = 'random' + client.cookies["sessionid"] = "random" device = await database_sync_to_async(self._create_device)() with pytest.raises(AssertionError): await self._get_communicator(client, device.id) @@ -91,14 +92,14 @@ async def test_user_authorization(self, client, django_user_model): async def test_silent_disconnection(self, admin_user, admin_client): device = await database_sync_to_async(self._create_device)() - session_id = admin_client.cookies['sessionid'].value + session_id = admin_client.cookies["sessionid"].value communicator = WebsocketCommunicator( self.application, - path=f'ws/controller/device/{device.pk}/', + path=f"ws/controller/device/{device.pk}/", headers=[ ( - b'cookie', - f'sessionid={session_id}'.encode('ascii'), + b"cookie", + f"sessionid={session_id}".encode("ascii"), ) ], ) diff --git a/openwisp_controller/config/utils.py b/openwisp_controller/config/utils.py index 78989fb1b..245b82165 100644 --- a/openwisp_controller/config/utils.py +++ b/openwisp_controller/config/utils.py @@ -114,24 +114,25 @@ def get_controller_urls(views_module): """ used by third party apps to reduce boilerplate """ + UUID_PATTERN = '[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}' urls = [ re_path( - 'controller/device/checksum/(?P[^/]+)/$', + f'controller/device/checksum/(?P{UUID_PATTERN})/$', views_module.device_checksum, name='device_checksum', ), re_path( - 'controller/device/download-config/(?P[^/]+)/$', + f'controller/device/download-config/(?P{UUID_PATTERN})/$', views_module.device_download_config, name='device_download_config', ), re_path( - 'controller/device/update-info/(?P[^/]+)/$', + f'controller/device/update-info/(?P{UUID_PATTERN})/$', views_module.device_update_info, name='device_update_info', ), re_path( - 'controller/device/report-status/(?P[^/]+)/$', + f'controller/device/report-status/(?P{UUID_PATTERN})/$', views_module.device_report_status, name='device_report_status', ), @@ -141,33 +142,33 @@ def get_controller_urls(views_module): name='device_register', ), re_path( - 'controller/vpn/checksum/(?P[^/]+)/$', + f'controller/vpn/checksum/(?P{UUID_PATTERN})/$', views_module.vpn_checksum, name='vpn_checksum', ), re_path( - 'controller/vpn/download-config/(?P[^/]+)/$', + f'controller/vpn/download-config/(?P{UUID_PATTERN})/$', views_module.vpn_download_config, name='vpn_download_config', ), # legacy URLs re_path( - 'controller/checksum/(?P[^/]+)/$', + f'controller/checksum/(?P{UUID_PATTERN})/$', views_module.device_checksum, name='checksum_legacy', ), re_path( - 'controller/download-config/(?P[^/]+)/$', + f'controller/download-config/(?P{UUID_PATTERN})/$', views_module.device_download_config, name='download_config_legacy', ), re_path( - 'controller/update-info/(?P[^/]+)/$', + f'controller/update-info/(?P{UUID_PATTERN})/$', views_module.device_update_info, name='update_info_legacy', ), re_path( - 'controller/report-status/(?P[^/]+)/$', + f'controller/report-status/(?P{UUID_PATTERN})/$', views_module.device_report_status, name='report_status_legacy', ), diff --git a/openwisp_controller/connection/channels/routing.py b/openwisp_controller/connection/channels/routing.py index 70c426332..4236d1597 100644 --- a/openwisp_controller/connection/channels/routing.py +++ b/openwisp_controller/connection/channels/routing.py @@ -4,9 +4,10 @@ def get_routes(consumer=ow_consumer): + UUID_PATTERN = '[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}' return [ re_path( - r'^ws/controller/device/(?P[^/]+)/command$', + f'^ws/controller/device/(?P{UUID_PATTERN})/command$', consumer.CommandConsumer.as_asgi(), ) ]