From f3455bf39d45a8adf221f49c58437f3b8426d4f0 Mon Sep 17 00:00:00 2001 From: Hamza Waleed Date: Tue, 13 Aug 2024 16:11:58 +0500 Subject: [PATCH] feat: update tests + define PA role and permissions (#693) --- license_manager/apps/api/permissions.py | 15 -- .../apps/api/v1/tests/constants.py | 1 - .../apps/api/v1/tests/test_views.py | 214 +++++++++--------- license_manager/apps/api/v1/views.py | 18 +- .../apps/subscriptions/constants.py | 9 + license_manager/apps/subscriptions/rules.py | 44 ++++ license_manager/settings/base.py | 7 + 7 files changed, 177 insertions(+), 131 deletions(-) diff --git a/license_manager/apps/api/permissions.py b/license_manager/apps/api/permissions.py index 72858da9..68971efa 100644 --- a/license_manager/apps/api/permissions.py +++ b/license_manager/apps/api/permissions.py @@ -13,18 +13,3 @@ class CanRetireUser(permissions.BasePermission): def has_permission(self, request, view): return request.user.username == settings.RETIREMENT_SERVICE_WORKER_USERNAME or request.user.is_superuser - - -class IsInProvisioningAdminGroup(permissions.BasePermission): - """ - Grant access to those users only who are part of the license provisiioning django group - """ - ALLOWED_API_GROUPS = ['provisioning_admins_group'] - message = 'Access denied: You do not have the necessary permissions to access this.' - - def has_permission(self, request, view): - return ( - super().has_permission(request, view) and ( - request.user.groups.filter(name__in=self.ALLOWED_API_GROUPS).exists() - ) - ) diff --git a/license_manager/apps/api/v1/tests/constants.py b/license_manager/apps/api/v1/tests/constants.py index b8fd9984..b1f2ba03 100644 --- a/license_manager/apps/api/v1/tests/constants.py +++ b/license_manager/apps/api/v1/tests/constants.py @@ -3,7 +3,6 @@ # Constants for subscriptions API tests SUBSCRIPTION_RENEWAL_DAYS_OFFSET = 500 -PROVISIONING_ADMINS_GROUP = "provisioning_admins_group" ADMIN_ROLES = { 'system_role': constants.SYSTEM_ENTERPRISE_ADMIN_ROLE, diff --git a/license_manager/apps/api/v1/tests/test_views.py b/license_manager/apps/api/v1/tests/test_views.py index b0f0b213..1601f97b 100644 --- a/license_manager/apps/api/v1/tests/test_views.py +++ b/license_manager/apps/api/v1/tests/test_views.py @@ -13,7 +13,6 @@ import pytest from django.conf import settings from django.contrib.auth import get_user_model -from django.contrib.auth.models import Group from django.core.exceptions import ObjectDoesNotExist from django.db import DatabaseError from django.http import QueryDict @@ -36,7 +35,6 @@ from license_manager.apps.api.v1.tests.constants import ( ADMIN_ROLES, LEARNER_ROLES, - PROVISIONING_ADMINS_GROUP, SUBSCRIPTION_RENEWAL_DAYS_OFFSET, ) from license_manager.apps.api.v1.views import ( @@ -563,20 +561,21 @@ def test_customer_agreement_detail_non_staff_user_200(api_client, non_staff_user @pytest.mark.django_db -def test_customer_agreement_detail_provisioning_admin_user_200(api_client, staff_user): +def test_customer_agreement_detail_non_staff_user_provisioning_admins_200(api_client, non_staff_user): """ Verify that provisioning admins can access the customer agreement detail endpoint. """ enterprise_customer_uuid = uuid4() _, _, customer_agreement = _create_subscription_plans(enterprise_customer_uuid) - _assign_role_via_jwt_or_db(api_client, staff_user, enterprise_customer_uuid, assign_via_jwt=True) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + _assign_role_via_jwt_or_db( + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _customer_agreement_detail_request( api_client, - staff_user, + non_staff_user, customer_agreement.uuid, viewname='provisioning-admins-customer-agreement-detail', ) @@ -622,17 +621,17 @@ def test_customer_agreement_list_superuser_200(api_client, superuser): @pytest.mark.django_db -def test_customer_agreement_create_provisioning_admin_201(api_client, staff_user): +def test_customer_agreement_create_non_staff_user_201(api_client, non_staff_user): """ Verify that provisioning admins can create a new customer agreement. """ enterprise_customer_uuid = uuid4() - _assign_role_via_jwt_or_db(api_client, staff_user, enterprise_customer_uuid, assign_via_jwt=True) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) - - response = _customer_agreement_create_request(api_client, staff_user, enterprise_customer_uuid) + _assign_role_via_jwt_or_db( + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) + response = _customer_agreement_create_request(api_client, non_staff_user, enterprise_customer_uuid) customer_agreement = CustomerAgreementFactory.create(enterprise_customer_uuid=enterprise_customer_uuid) assert status.HTTP_201_CREATED == response.status_code @@ -641,18 +640,19 @@ def test_customer_agreement_create_provisioning_admin_201(api_client, staff_user @pytest.mark.django_db -def test_customer_agreement_partial_update_provisioning_admin_200(api_client, staff_user): +def test_customer_agreement_partial_update_provisioning_admin_non_staff_user_200(api_client, non_staff_user): """ Verify that provisioning admins can partially update a customer agreement. """ enterprise_customer_uuid = uuid4() - _assign_role_via_jwt_or_db(api_client, staff_user, enterprise_customer_uuid, assign_via_jwt=True) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + _assign_role_via_jwt_or_db( + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) customer_agreement = CustomerAgreementFactory.create(enterprise_customer_uuid=enterprise_customer_uuid) - response = _customer_agreement_partial_update_request(api_client, staff_user, customer_agreement.uuid) + response = _customer_agreement_partial_update_request(api_client, non_staff_user, customer_agreement.uuid) assert status.HTTP_200_OK == response.status_code _assert_customer_agreement_response_correct(response.data, customer_agreement) @@ -897,7 +897,7 @@ def test_subscription_plan_list_bad_enterprise_uuid_400(api_client, superuser): @pytest.mark.django_db -def test_subscription_plan_create_staff_user_200(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_200(api_client, non_staff_user): """ Verify that the subcription POST endpoint creates new record and response includes all expected fields """ @@ -907,11 +907,11 @@ def test_subscription_plan_create_staff_user_200(api_client, staff_user, boolean ProductFactory.create_batch(1) params = _prepare_subscription_plan_payload(customer_agreement) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_201_CREATED == response.status_code expected_fields = { @@ -977,27 +977,26 @@ def test_subscription_plan_list_staff_user_403(api_client, staff_user, boolean_t @pytest.mark.django_db -def test_subscription_plan_provisioning_admins_list_staff_user_200(api_client, staff_user, boolean_toggle): +def test_subscription_plan_provisioning_admins_list_non_staff_user_200(api_client, non_staff_user): """ - Verify that the subcription POST endpoint is accessible to authorised users only - and returns valid data + Verify that the subcription POST endpoint is accessible to Provisioning Admins(non-staff) onlye """ enterprise_customer_uuid = uuid4() customer_agreement = CustomerAgreementFactory.create( enterprise_customer_uuid=enterprise_customer_uuid) ProductFactory.create_batch(1) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) params = _prepare_subscription_plan_payload(customer_agreement) _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) response = _provision_license_list_request( - api_client, staff_user) + api_client, non_staff_user) expected_keys = {'count', 'next', 'previous', 'results'} assert expected_keys.issubset(response.json().keys()) @@ -1015,7 +1014,7 @@ def test_subscription_plan_provisioning_admins_list_staff_user_200(api_client, s @pytest.mark.django_db -def test_subscription_plan_pa_list_staff_user_customer_uuid_200(api_client, staff_user, boolean_toggle): +def test_subscription_plan_pa_list_non_staff_user_200(api_client, non_staff_user): """ Verify that the subcription POST endpoint applies enterprise_customer_uuid if passed in query params @@ -1032,29 +1031,26 @@ def test_subscription_plan_pa_list_staff_user_customer_uuid_200(api_client, staf customer_agreement_two = CustomerAgreementFactory.create( enterprise_customer_uuid=enterprise_customer_uuid_two) ProductFactory.create_batch(1) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid_one, assign_via_jwt=boolean_toggle) - + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE + ) params_one = _prepare_subscription_plan_payload(customer_agreement_one) params_two = _prepare_subscription_plan_payload(customer_agreement_two) _provision_license_create_request( - api_client, staff_user, params=params_one) + api_client, non_staff_user, params=params_one) response_one = _provision_license_list_request( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid_one) + api_client, non_staff_user, enterprise_customer_uuid=enterprise_customer_uuid_one) assert status.HTTP_200_OK == response_one.status_code assert response_one.json()['results'][0]['enterprise_customer_uuid'] == str( enterprise_customer_uuid_one) - _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid_two, assign_via_jwt=boolean_toggle) _provision_license_create_request( - api_client, staff_user, params=params_two) + api_client, non_staff_user, params=params_two) response_two = _provision_license_list_request( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid_two) + api_client, non_staff_user, enterprise_customer_uuid=enterprise_customer_uuid_two) assert status.HTTP_200_OK == response_two.status_code assert response_two.json()['results'][0]['enterprise_customer_uuid'] == str( @@ -1082,7 +1078,7 @@ def test_subscription_plan_update_staff_user_403(api_client, staff_user, boolean @pytest.mark.django_db -def test_subscription_plan_create_staff_user_201(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_201(api_client, non_staff_user): """ Verify that the subcription update endpoint is accessible to authorised users only """ @@ -1091,18 +1087,18 @@ def test_subscription_plan_create_staff_user_201(api_client, staff_user, boolean enterprise_customer_uuid=enterprise_customer_uuid) ProductFactory.create_batch(1) params = _prepare_subscription_plan_payload(customer_agreement) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_201_CREATED == response.status_code @pytest.mark.django_db -def test_subscription_plan_create_staff_user_customer_agreement_400(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_customer_agreement_400(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid customer_agreement is given """ @@ -1115,17 +1111,17 @@ def test_subscription_plan_create_staff_user_customer_agreement_400(api_client, params = _prepare_subscription_plan_payload(customer_agreement) params['customer_agreement'] = invalid_customer_agreement_uuid _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_400_BAD_REQUEST == response.status_code assert response.json() == {'error': "An error occurred: Provided customer_agreement doesn't exist."} @pytest.mark.django_db -def test_subscription_plan_create_staff_user_product_400(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_product_400(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid Product is given """ @@ -1137,11 +1133,11 @@ def test_subscription_plan_create_staff_user_product_400(api_client, staff_user, params = _prepare_subscription_plan_payload(customer_agreement) params['product'] = 2 # passing an invalid PK _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_400_BAD_REQUEST == response.status_code assert response.json() == {'error': {'product': [ @@ -1149,7 +1145,7 @@ def test_subscription_plan_create_staff_user_product_400(api_client, staff_user, @pytest.mark.django_db -def test_subscription_plan_create_staff_user_salesforce_lineitem_400(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_salesforce_lineitem_400(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid salesforce_lineitem is given """ @@ -1162,11 +1158,11 @@ def test_subscription_plan_create_staff_user_salesforce_lineitem_400(api_client, # passing a invalid string params['salesforce_opportunity_line_item'] = 'foo' _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_400_BAD_REQUEST == response.status_code assert response.json() == \ @@ -1174,7 +1170,7 @@ def test_subscription_plan_create_staff_user_salesforce_lineitem_400(api_client, @pytest.mark.django_db -def test_subscription_plan_update_staff_user_200(api_client, staff_user, boolean_toggle): +def test_subscription_plan_update_non_staff_user_200(api_client, non_staff_user): """ Verify that the subscription create endpoint returns 200 on patch request """ @@ -1185,14 +1181,14 @@ def test_subscription_plan_update_staff_user_200(api_client, staff_user, boolean params = _prepare_subscription_plan_payload(customer_agreement) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) params['title'] = 'bar' patch_response = _provision_license_patch_request( - api_client, staff_user, params=params, subscription_uuid=create_response.json()['uuid']) + api_client, non_staff_user, params=params, subscription_uuid=create_response.json()['uuid']) assert status.HTTP_201_CREATED == create_response.status_code assert status.HTTP_200_OK == patch_response.status_code expected_fields = { @@ -1225,7 +1221,7 @@ def test_subscription_plan_update_staff_user_200(api_client, staff_user, boolean @pytest.mark.django_db -def test_subscription_plan_update_staff_user_invalid_product_id(api_client, staff_user, boolean_toggle): +def test_subscription_plan_update_non_staff_user_invalid_product_id(api_client, non_staff_user): """ Verify that the subscription create endpoint returns 200 on patch request """ @@ -1236,20 +1232,20 @@ def test_subscription_plan_update_staff_user_invalid_product_id(api_client, staf params = _prepare_subscription_plan_payload(customer_agreement) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) params['product'] = 10 # set invalid ID patch_response = _provision_license_patch_request( - api_client, staff_user, params=params, subscription_uuid=create_response.json()['uuid']) + api_client, non_staff_user, params=params, subscription_uuid=create_response.json()['uuid']) assert status.HTTP_201_CREATED == create_response.status_code assert status.HTTP_400_BAD_REQUEST == patch_response.status_code @pytest.mark.django_db -def test_subscription_plan_update_staff_user_invalid_payload(api_client, staff_user, boolean_toggle): +def test_subscription_plan_update_non_staff_user_invalid_payload(api_client, non_staff_user): """ Verify that the subscription create endpoint returns 200 on patch request """ @@ -1260,21 +1256,21 @@ def test_subscription_plan_update_staff_user_invalid_payload(api_client, staff_u params = _prepare_subscription_plan_payload(customer_agreement) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) params['salesforce_opportunity_line_item'] = 'foo' # set invalid ID patch_response = _provision_license_patch_request( - api_client, staff_user, params=params, subscription_uuid=create_response.json()['uuid']) + api_client, non_staff_user, params=params, subscription_uuid=create_response.json()['uuid']) assert status.HTTP_201_CREATED == create_response.status_code assert status.HTTP_400_BAD_REQUEST == patch_response.status_code @pytest.mark.django_db -def test_subscription_plan_create_staff_user_cataog_uuid_missing(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_cataog_uuid_missing(api_client, non_staff_user): """ Verify that the subscription create endpoint handles request gracefully if enterprise_catalog_uuid is missing """ @@ -1286,11 +1282,11 @@ def test_subscription_plan_create_staff_user_cataog_uuid_missing(api_client, sta params = _prepare_subscription_plan_payload(customer_agreement) params['enterprise_catalog_uuid'] = None _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_201_CREATED == create_response.status_code @@ -1299,7 +1295,7 @@ def test_subscription_plan_create_staff_user_cataog_uuid_missing(api_client, sta @pytest.mark.django_db -def test_subscription_plan_create_staff_user_invalid_product_id(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_invalid_product_id(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid product id is provided """ @@ -1311,11 +1307,11 @@ def test_subscription_plan_create_staff_user_invalid_product_id(api_client, staf params = _prepare_subscription_plan_payload(customer_agreement) params['product'] = 12 _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid='*', assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_400_BAD_REQUEST == create_response.status_code assert create_response.json( @@ -1323,7 +1319,7 @@ def test_subscription_plan_create_staff_user_invalid_product_id(api_client, staf @pytest.mark.django_db -def test_subscription_plan_create_staff_user_db_integrity_error(api_client, staff_user, boolean_toggle): +def test_subscription_plan_create_non_staff_user_db_integrity_error(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid product id is provided """ @@ -1334,14 +1330,15 @@ def test_subscription_plan_create_staff_user_db_integrity_error(api_client, staf params = _prepare_subscription_plan_payload(customer_agreement) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid=None, + assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) first_create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) second_create_response = _provision_license_create_request( - api_client, staff_user, params=params) + api_client, non_staff_user, params=params) assert status.HTTP_201_CREATED == first_create_response.status_code assert status.HTTP_400_BAD_REQUEST == second_create_response.status_code @@ -1351,7 +1348,7 @@ def test_subscription_plan_create_staff_user_db_integrity_error(api_client, staf @pytest.mark.django_db -def test_subscription_plan_get_staff_user_success(api_client, staff_user, boolean_toggle): +def test_subscription_plan_get_non_staff_user_success(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid product id is provided """ @@ -1362,34 +1359,35 @@ def test_subscription_plan_get_staff_user_success(api_client, staff_user, boolea params = _prepare_subscription_plan_payload(customer_agreement) _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid=None, + assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) create_response = _provision_license_create_request( - api_client, staff_user, params) + api_client, non_staff_user, params) created_uuid = str(create_response.json()['uuid']) retrieved_subscription = _subscription_get_request( - api_client, staff_user, created_uuid) + api_client, non_staff_user, created_uuid) assert retrieved_subscription.json()['uuid'] == created_uuid assert status.HTTP_201_CREATED == create_response.status_code @pytest.mark.django_db -def test_subscription_plan_get_staff_user_failure(api_client, staff_user, boolean_toggle): +def test_subscription_plan_get_non_staff_user_failure(api_client, non_staff_user): """ Verify that the subscription create endpoint returns error if invalid product id is provided """ - enterprise_customer_uuid = uuid4() invalid_subscription_id = uuid4() _assign_role_via_jwt_or_db( - api_client, staff_user, enterprise_customer_uuid=enterprise_customer_uuid, assign_via_jwt=boolean_toggle) - allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) - staff_user.groups.add(allowed_group) + api_client, non_staff_user, enterprise_customer_uuid=None, + assign_via_jwt=True, + system_role=constants.SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, + ) response = _subscription_get_request( - api_client, staff_user, invalid_subscription_id) + api_client, non_staff_user, invalid_subscription_id) assert status.HTTP_404_NOT_FOUND == response.status_code diff --git a/license_manager/apps/api/v1/views.py b/license_manager/apps/api/v1/views.py index c45d8fcd..d169a279 100644 --- a/license_manager/apps/api/v1/views.py +++ b/license_manager/apps/api/v1/views.py @@ -13,7 +13,10 @@ from django_filters.rest_framework import DjangoFilterBackend from drf_spectacular.utils import extend_schema, extend_schema_view from edx_rbac.decorators import permission_required -from edx_rbac.mixins import PermissionRequiredForListingMixin +from edx_rbac.mixins import ( + PermissionRequiredForListingMixin, + PermissionRequiredMixin, +) from edx_rest_framework_extensions.auth.jwt.authentication import ( JwtAuthentication, ) @@ -30,10 +33,7 @@ from license_manager.apps.api.filters import LicenseFilter from license_manager.apps.api.mixins import UserDetailsFromJwtMixin from license_manager.apps.api.models import BulkEnrollmentJob -from license_manager.apps.api.permissions import ( - CanRetireUser, - IsInProvisioningAdminGroup, -) +from license_manager.apps.api.permissions import CanRetireUser from license_manager.apps.api.tasks import ( create_braze_aliases_task, execute_post_revocation_tasks, @@ -311,13 +311,15 @@ def _check_subscription_licenses(self, customer_agreement, plan, user_email): ), ) class CustomerAgreementProvisioningAdminViewset( + PermissionRequiredMixin, viewsets.GenericViewSet, mixins.CreateModelMixin, mixins.RetrieveModelMixin ): """ Viewset for Provisioning Admins write operations.""" authentication_classes = [JwtAuthentication, SessionAuthentication] - permission_classes = [permissions.IsAuthenticated, IsInProvisioningAdminGroup] + permission_classes = [permissions.IsAuthenticated] + permission_required = constants.SUBSCRIPTIONS_CUSTOMER_AGREEMENT_PROVISIONING_ADMIN_ACCESS_PERMISSION lookup_field = 'uuid' lookup_url_kwarg = 'customer_agreement_uuid' serializer_class = serializers.CustomerAgreementSerializer @@ -460,13 +462,15 @@ def list(self, request, *args, **kwargs): ), ) class SubscriptionPlanProvisioningAdminViewset( + PermissionRequiredMixin, mixins.CreateModelMixin, mixins.ListModelMixin, viewsets.GenericViewSet ): """ Viewset for Provisioning Admins write operations.""" authentication_classes = [JwtAuthentication, SessionAuthentication] - permission_classes = [permissions.IsAuthenticated, IsInProvisioningAdminGroup] + permission_classes = [permissions.IsAuthenticated] + permission_required = constants.SUBSCRIPTIONS_PROVISIONING_ADMIN_ACCESS_PERMISSION lookup_field = 'uuid' lookup_url_kwarg = 'subscription_uuid' # URL keyword for the lookup field diff --git a/license_manager/apps/subscriptions/constants.py b/license_manager/apps/subscriptions/constants.py index 70ffdef2..492e3501 100644 --- a/license_manager/apps/subscriptions/constants.py +++ b/license_manager/apps/subscriptions/constants.py @@ -77,14 +77,23 @@ class SegmentEvents: # Role-based access control SUBSCRIPTIONS_ADMIN_ROLE = 'enterprise_subscriptions_admin' SUBSCRIPTIONS_LEARNER_ROLE = 'enterprise_subscriptions_learner' +# Role-based access control - Provisioning admins +PROVISIONING_SUBSCRIPTION_ADMIN_ROLE = 'provisioning_subscription_admin' +PROVISIONING_CUSTOMER_AGREEMENT_ADMIN_ROLE = 'provisioning_customer_agreement_admin' SYSTEM_ENTERPRISE_ADMIN_ROLE = 'enterprise_admin' SYSTEM_ENTERPRISE_LEARNER_ROLE = 'enterprise_learner' SYSTEM_ENTERPRISE_OPERATOR_ROLE = 'enterprise_openedx_operator' +SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE = 'enterprise_provisioning_admin' SUBSCRIPTIONS_ADMIN_ACCESS_PERMISSION = 'subscriptions.has_admin_access' SUBSCRIPTIONS_ADMIN_LEARNER_ACCESS_PERMISSION = 'subscriptions.has_learner_or_admin_access' +# Provisioning admins permissions +SUBSCRIPTIONS_PROVISIONING_ADMIN_ACCESS_PERMISSION = 'subscriptions.has_provisioning_admin_access' +SUBSCRIPTIONS_CUSTOMER_AGREEMENT_PROVISIONING_ADMIN_ACCESS_PERMISSION = \ + 'agreements.has_provisioning_admin_access' + # Subsidy constants PERCENTAGE_DISCOUNT_TYPE = 'percentage' LICENSE_DISCOUNT_VALUE = 100 # Represents a 100% off value diff --git a/license_manager/apps/subscriptions/rules.py b/license_manager/apps/subscriptions/rules.py index 0823d5c2..4a2be11b 100644 --- a/license_manager/apps/subscriptions/rules.py +++ b/license_manager/apps/subscriptions/rules.py @@ -103,3 +103,47 @@ def has_explicit_access_to_subscriptions_learner(user, enterprise_customer_uuid) constants.SUBSCRIPTIONS_ADMIN_LEARNER_ACCESS_PERMISSION, has_admin_access | has_learner_access, ) + + +@rules.predicate +def has_implicit_access_to_subscription_plan_provisioning(user, enterprise_customer_uuid): # pylint: disable=unused-argument + """ + Check that if request user has implicit access as PROVISIONING_SUBSCRIPTION_ADMIN_ROLE. + + Returns: + boolean: whether the request user has access. + """ + return request_user_has_implicit_access_via_jwt( + get_decoded_jwt(crum.get_current_request()), + constants.PROVISIONING_SUBSCRIPTION_ADMIN_ROLE, + str(enterprise_customer_uuid), + ) + + +# Grants access permission if the user is a provisioning admin +rules.add_perm( + constants.SUBSCRIPTIONS_PROVISIONING_ADMIN_ACCESS_PERMISSION, + has_implicit_access_to_subscription_plan_provisioning, +) + + +@rules.predicate +def has_implicit_access_to_customer_agreement_provisioning(user, enterprise_customer_uuid): # pylint: disable=unused-argument + """ + Check that if request user has implicit access as PROVISIONING_SUBSCRIPTION_ADMIN_ROLE. + + Returns: + boolean: whether the request user has access. + """ + return request_user_has_implicit_access_via_jwt( + get_decoded_jwt(crum.get_current_request()), + constants.PROVISIONING_CUSTOMER_AGREEMENT_ADMIN_ROLE, + str(enterprise_customer_uuid), + ) + + +# Grants access permission if the user is a provisioning admin +rules.add_perm( + constants.SUBSCRIPTIONS_CUSTOMER_AGREEMENT_PROVISIONING_ADMIN_ACCESS_PERMISSION, + has_implicit_access_to_customer_agreement_provisioning, +) diff --git a/license_manager/settings/base.py b/license_manager/settings/base.py index 4386d9b3..808d78c4 100644 --- a/license_manager/settings/base.py +++ b/license_manager/settings/base.py @@ -4,11 +4,14 @@ from corsheaders.defaults import default_headers as corsheaders_default_headers from license_manager.apps.subscriptions.constants import ( + PROVISIONING_SUBSCRIPTION_ADMIN_ROLE, + PROVISIONING_CUSTOMER_AGREEMENT_ADMIN_ROLE, SUBSCRIPTIONS_ADMIN_ROLE, SUBSCRIPTIONS_LEARNER_ROLE, SYSTEM_ENTERPRISE_ADMIN_ROLE, SYSTEM_ENTERPRISE_LEARNER_ROLE, SYSTEM_ENTERPRISE_OPERATOR_ROLE, + SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE, ) from license_manager.settings.utils import get_logger_config @@ -405,6 +408,10 @@ SYSTEM_ENTERPRISE_OPERATOR_ROLE: [SUBSCRIPTIONS_ADMIN_ROLE], SYSTEM_ENTERPRISE_ADMIN_ROLE: [SUBSCRIPTIONS_ADMIN_ROLE], SYSTEM_ENTERPRISE_LEARNER_ROLE: [SUBSCRIPTIONS_LEARNER_ROLE], + SYSTEM_ENTERPRISE_PROVISIONING_ADMIN_ROLE: [ + PROVISIONING_SUBSCRIPTION_ADMIN_ROLE, + PROVISIONING_CUSTOMER_AGREEMENT_ADMIN_ROLE, + ] } SOCIAL_MEDIA_FOOTER_URLS = os.environ.get('SOCIAL_MEDIA_FOOTER_URLS', '')