diff --git a/openedx_learning/__init__.py b/openedx_learning/__init__.py index 5f1cea0b..75d55cb5 100644 --- a/openedx_learning/__init__.py +++ b/openedx_learning/__init__.py @@ -1,4 +1,4 @@ """ Open edX Learning ("Learning Core"). """ -__version__ = "0.1.5" +__version__ = "0.1.6" diff --git a/openedx_tagging/core/tagging/models/base.py b/openedx_tagging/core/tagging/models/base.py index a8e0ea71..20f610e4 100644 --- a/openedx_tagging/core/tagging/models/base.py +++ b/openedx_tagging/core/tagging/models/base.py @@ -385,11 +385,30 @@ def tag_object( """ Replaces the existing ObjectTag entries for the current taxonomy + object_id with the given list of tags. If self.allows_free_text, then the list should be a list of tag values. - Otherwise, it should be a list of existing Tag IDs. + Otherwise, it should be either a list of existing Tag Values or IDs. Raised ValueError if the proposed tags are invalid for this taxonomy. Preserves existing (valid) tags, adds new (valid) tags, and removes omitted (or invalid) tags. """ + def _find_object_tag_index(tag_ref, object_tags) -> int: + """ + Search for Tag in the given list of ObjectTags by tag_ref or value, + returning its index or -1 if not found. + """ + return next( + ( + i + for i, object_tag in enumerate(object_tags) + if object_tag.tag_ref == tag_ref or object_tag.value == tag_ref + ), + -1, + ) + + if not isinstance(tags, list): + raise ValueError(_(f"Tags must be a list, not {type(tags).__name__}.")) + + tags = list(dict.fromkeys(tags)) # Remove duplicates preserving order + if not self.allow_multiple and len(tags) > 1: raise ValueError(_(f"Taxonomy ({self.id}) only allows one tag per object.")) @@ -399,17 +418,17 @@ def tag_object( ) ObjectTagClass = self.object_tag_class - current_tags = { - tag.tag_ref: tag - for tag in ObjectTagClass.objects.filter( + current_tags = list( + ObjectTagClass.objects.filter( taxonomy=self, object_id=object_id, ) - } + ) updated_tags = [] for tag_ref in tags: - if tag_ref in current_tags: - object_tag = current_tags.pop(tag_ref) + object_tag_index = _find_object_tag_index(tag_ref, current_tags) + if object_tag_index >= 0: + object_tag = current_tags.pop(object_tag_index) else: object_tag = ObjectTagClass( taxonomy=self, @@ -429,7 +448,7 @@ def tag_object( object_tag.save() # ...and delete any omitted existing tags - for old_tag in current_tags.values(): + for old_tag in current_tags: old_tag.delete() return updated_tags diff --git a/openedx_tagging/core/tagging/rest_api/v1/serializers.py b/openedx_tagging/core/tagging/rest_api/v1/serializers.py index e278b84f..787d3427 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/serializers.py +++ b/openedx_tagging/core/tagging/rest_api/v1/serializers.py @@ -55,3 +55,19 @@ class Meta: "tag_ref", "is_valid", ] + + +class ObjectTagUpdateBodySerializer(serializers.Serializer): + """ + Serializer of the body for the ObjectTag UPDATE view + """ + + tags = serializers.ListField(child=serializers.CharField(), required=True) + + +class ObjectTagUpdateQueryParamsSerializer(serializers.Serializer): + """ + Serializer of the query params for the ObjectTag UPDATE view + """ + + taxonomy = serializers.PrimaryKeyRelatedField(queryset=Taxonomy.objects.all(), required=True) diff --git a/openedx_tagging/core/tagging/rest_api/v1/views.py b/openedx_tagging/core/tagging/rest_api/v1/views.py index 3e8ae18c..008859f5 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/views.py +++ b/openedx_tagging/core/tagging/rest_api/v1/views.py @@ -3,17 +3,19 @@ """ from django.db import models from django.http import Http404 -from django.shortcuts import get_object_or_404 -from rest_framework import status -from rest_framework.response import Response -from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet +from rest_framework import mixins +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied, ValidationError +from rest_framework.viewsets import GenericViewSet, ModelViewSet -from ...api import create_taxonomy, get_object_tags, get_taxonomies, get_taxonomy +from ...api import create_taxonomy, get_object_tags, get_taxonomies, get_taxonomy, tag_object from ...models import Taxonomy +from ...rules import ChangeObjectTagPermissionItem from .permissions import ObjectTagObjectPermissions, TaxonomyObjectPermissions from .serializers import ( ObjectTagListQueryParamsSerializer, ObjectTagSerializer, + ObjectTagUpdateBodySerializer, + ObjectTagUpdateQueryParamsSerializer, TaxonomyListQueryParamsSerializer, TaxonomySerializer, ) @@ -167,10 +169,9 @@ def perform_create(self, serializer) -> None: serializer.instance = create_taxonomy(**serializer.validated_data) -class ObjectTagView(ReadOnlyModelViewSet): +class ObjectTagView(mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.ListModelMixin, GenericViewSet): """ - View to retrieve paginated ObjectTags for an Object, given its Object ID. - (What tags does this object have?) + View to retrieve paginated ObjectTags for a provided Object ID (object_id). **Retrieve Parameters** * object_id (required): - The Object ID to retrieve ObjectTags for. @@ -195,7 +196,14 @@ class ObjectTagView(ReadOnlyModelViewSet): * 403 - Permission denied * 405 - Method not allowed + **Update Parameters** + * object_id (required): - The Object ID to add ObjectTags for. + + **Update Request Body** + * tags: List of tags to be applied to a object id. Must be a list of Tag ids or Tag values. + **Update Query Returns** + * 200 - Success * 403 - Permission denied * 405 - Method not allowed @@ -224,8 +232,8 @@ def get_queryset(self) -> models.QuerySet: def retrieve(self, request, object_id=None): """ - Retrieve ObjectTags that belong to a given Object given its - object_id and return paginated results. + Retrieve ObjectTags that belong to a given object_id and + return paginated results. Note: We override `retrieve` here instead of `list` because we are passing in the Object ID (object_id) in the path (as opposed to passing @@ -238,3 +246,58 @@ def retrieve(self, request, object_id=None): paginated_object_tags = self.paginate_queryset(object_tags) serializer = ObjectTagSerializer(paginated_object_tags, many=True) return self.get_paginated_response(serializer.data) + + def update(self, request, object_id, partial=False): + """ + Update ObjectTags that belong to a given object_id and + return the list of these ObjecTags paginated. + + Pass a list of Tag ids or Tag values to be applied to an object id in the + body `tag` parameter. Passing an empty list will remove all tags from + the object id. + + **Example Body Requests** + + PUT api/tagging/v1/object_tags/:object_id + + **Example Body Requests** + ```json + { + "tags": [1, 2, 3] + }, + { + "tags": ["Tag 1", "Tag 2"] + }, + { + "tags": [] + } + """ + + if partial: + raise MethodNotAllowed("PATCH", detail="PATCH not allowed") + + query_params = ObjectTagUpdateQueryParamsSerializer(data=request.query_params.dict()) + query_params.is_valid(raise_exception=True) + taxonomy = query_params.validated_data.get("taxonomy", None) + taxonomy = taxonomy.cast() + + perm = f"{taxonomy._meta.app_label}.change_objecttag" + + perm_obj = ChangeObjectTagPermissionItem( + taxonomy=taxonomy, + object_id=object_id, + ) + + if not request.user.has_perm(perm, perm_obj): + raise PermissionDenied("You do not have permission to change object tags for this taxonomy or object_id.") + + body = ObjectTagUpdateBodySerializer(data=request.data) + body.is_valid(raise_exception=True) + + tags = body.data.get("tags", []) + try: + tag_object(taxonomy, tags, object_id) + except ValueError as e: + raise ValidationError(e) + + return self.retrieve(request, object_id) diff --git a/openedx_tagging/core/tagging/rules.py b/openedx_tagging/core/tagging/rules.py index 8f0852a1..880fe19c 100644 --- a/openedx_tagging/core/tagging/rules.py +++ b/openedx_tagging/core/tagging/rules.py @@ -8,8 +8,9 @@ import django.contrib.auth.models # typing support in rules depends on https://github.com/dfunckt/django-rules/pull/177 import rules # type: ignore[import] +from attrs import define -from .models import ObjectTag, Tag, Taxonomy +from .models import Tag, Taxonomy UserType = Union[django.contrib.auth.models.User, django.contrib.auth.models.AnonymousUser] @@ -19,6 +20,16 @@ is_taxonomy_admin: Callable[[UserType], bool] = rules.is_staff +@define +class ChangeObjectTagPermissionItem: + """ + Pair of taxonomy and object_id used for permission checking. + """ + + taxonomy: Taxonomy + object_id: str + + @rules.predicate def can_view_taxonomy(user: UserType, taxonomy: Taxonomy | None = None) -> bool: """ @@ -53,18 +64,39 @@ def can_change_tag(user: UserType, tag: Tag | None = None) -> bool: @rules.predicate -def can_change_object_tag(user: UserType, object_tag: ObjectTag | None = None) -> bool: +def can_change_object_tag_objectid(_user: UserType, _object_id: str) -> bool: """ - Taxonomy admins can create or modify object tags on enabled taxonomies. + Nobody can create or modify object tags without checking the permission for the tagged object. + + This rule should be defined in other apps for proper permission checking. """ - taxonomy = ( - object_tag.taxonomy.cast() if (object_tag and object_tag.taxonomy) else None - ) - object_tag = taxonomy.object_tag_class.cast(object_tag) if taxonomy else object_tag - return is_taxonomy_admin(user) and ( - not object_tag or not taxonomy or (taxonomy and taxonomy.enabled) + return False + + +@rules.predicate +def can_change_object_tag(user: UserType, perm_obj: ChangeObjectTagPermissionItem | None = None) -> bool: + """ + Checks if the user has permissions to create or modify tags on the given taxonomy and object_id. + """ + + # The following code allows METHOD permission (PUT) in the viewset for everyone + if perm_obj is None: + return True + + # Checks the permission for the taxonomy + taxonomy_perm = user.has_perm("oel_tagging.change_objecttag_taxonomy", perm_obj.taxonomy) + if not taxonomy_perm: + return False + + # Checks the permission for the object_id + objectid_perm = user.has_perm( + "oel_tagging.change_objecttag_objectid", + # The obj arg expects an object, but we are passing a string + perm_obj.object_id, # type: ignore[arg-type] ) + return objectid_perm + # Taxonomy rules.add_perm("oel_tagging.add_taxonomy", can_change_taxonomy) @@ -81,5 +113,9 @@ def can_change_object_tag(user: UserType, object_tag: ObjectTag | None = None) - # ObjectTag rules.add_perm("oel_tagging.add_objecttag", can_change_object_tag) rules.add_perm("oel_tagging.change_objecttag", can_change_object_tag) -rules.add_perm("oel_tagging.delete_objecttag", is_taxonomy_admin) +rules.add_perm("oel_tagging.delete_objecttag", can_change_object_tag) rules.add_perm("oel_tagging.view_objecttag", rules.always_allow) + +# Users can tag objects using tags from any taxonomy that they have permission to view +rules.add_perm("oel_tagging.change_objecttag_taxonomy", can_view_taxonomy) +rules.add_perm("oel_tagging.change_objecttag_objectid", can_change_object_tag_objectid) diff --git a/tests/openedx_tagging/core/tagging/test_api.py b/tests/openedx_tagging/core/tagging/test_api.py index 0a19e276..46e8c190 100644 --- a/tests/openedx_tagging/core/tagging/test_api.py +++ b/tests/openedx_tagging/core/tagging/test_api.py @@ -27,6 +27,7 @@ class TestApiTagging(TestTagTaxonomyMixin, TestCase): """ Test the Tagging API methods. """ + def test_create_taxonomy(self) -> None: # Note: we must specify '-> None' to opt in to type checking params: dict[str, Any] = { "name": "Difficulty", @@ -42,11 +43,11 @@ def test_create_taxonomy(self) -> None: # Note: we must specify '-> None' to op assert not taxonomy.system_defined assert taxonomy.visible_to_authors - def test_bad_taxonomy_class(self): + def test_bad_taxonomy_class(self) -> None: with self.assertRaises(ValueError) as exc: tagging_api.create_taxonomy( name="Bad class", - taxonomy_class=str, + taxonomy_class=str, # type: ignore[arg-type] ) assert " must be a subclass of Taxonomy" in str(exc.exception) @@ -114,7 +115,7 @@ def check_object_tag( tag: Tag | None, name: str, value: str, - ): + ) -> None: """ Verifies that the properties of the given object_tag (once refreshed from the database) match those given. """ @@ -344,9 +345,106 @@ def test_tag_object_invalid_tag(self) -> None: ["Eukaryota Xenomorph"], "biology101", ) - assert "Invalid object tag for taxonomy (1): Eukaryota Xenomorph" in str( - exc.exception + assert "Invalid object tag for taxonomy (1): Eukaryota Xenomorph" in str(exc.exception) + + def test_tag_object_string(self) -> None: + with self.assertRaises(ValueError) as exc: + tagging_api.tag_object( + self.taxonomy, + 'string', # type: ignore[arg-type] + "biology101", + ) + assert "Tags must be a list, not str." in str(exc.exception) + + def test_tag_object_integer(self) -> None: + with self.assertRaises(ValueError) as exc: + tagging_api.tag_object( + self.taxonomy, + 1, # type: ignore[arg-type] + "biology101", + ) + assert "Tags must be a list, not int." in str(exc.exception) + + def test_tag_object_same_id(self) -> None: + # Tag the object with the same id twice + tagging_api.tag_object( + self.taxonomy, + [self.eubacteria.id], + "biology101", + ) + object_tags = tagging_api.tag_object( + self.taxonomy, + [self.eubacteria.id], + "biology101", + ) + assert len(object_tags) == 1 + assert str(object_tags[0]) == " biology101: Life on Earth=Eubacteria" + + def test_tag_object_same_value(self) -> None: + # Tag the object with the same value twice + tagging_api.tag_object( + self.taxonomy, + ["Eubacteria"], + "biology101", + ) + object_tags = tagging_api.tag_object( + self.taxonomy, + ["Eubacteria"], + "biology101", + ) + + assert len(object_tags) == 1 + assert str(object_tags[0]) == " biology101: Life on Earth=Eubacteria" + + def test_tag_object_same_mixed(self) -> None: + # Tag the object with the same id/value twice + tagging_api.tag_object( + self.taxonomy, + [self.eubacteria.id], + "biology101", + ) + object_tags = tagging_api.tag_object( + self.taxonomy, + ["Eubacteria"], + "biology101", + ) + + assert len(object_tags) == 1 + assert str(object_tags[0]) == " biology101: Life on Earth=Eubacteria" + + def test_tag_object_same_id_multiple(self) -> None: + self.taxonomy.allow_multiple = True + self.taxonomy.save() + # Tag the object with the same value twice + object_tags = tagging_api.tag_object( + self.taxonomy, + [self.eubacteria.id, self.eubacteria.id], + "biology101", + ) + assert len(object_tags) == 1 + + def test_tag_object_same_value_multiple(self) -> None: + self.taxonomy.allow_multiple = True + self.taxonomy.save() + # Tag the object with the same value twice + object_tags = tagging_api.tag_object( + self.taxonomy, + ["Eubacteria", "Eubacteria"], + "biology101", ) + assert len(object_tags) == 1 + + def test_tag_object_same_value_multiple_free(self) -> None: + self.taxonomy.allow_multiple = True + self.taxonomy.allow_free_text = True + self.taxonomy.save() + # Tag the object with the same value twice + object_tags = tagging_api.tag_object( + self.taxonomy, + ["tag1", "tag1"], + "biology101", + ) + assert len(object_tags) == 1 @override_settings(LANGUAGES=test_languages) def test_tag_object_language_taxonomy(self) -> None: @@ -493,7 +591,7 @@ def test_get_object_tags(self) -> None: ), ) @ddt.unpack - def test_autocomplete_tags(self, search: str, expected_values: list[str], expected_ids: list[int | None]): + def test_autocomplete_tags(self, search: str, expected_values: list[str], expected_ids: list[int | None]) -> None: tags = [ 'Archaea', 'Archaebacteria', diff --git a/tests/openedx_tagging/core/tagging/test_rules.py b/tests/openedx_tagging/core/tagging/test_rules.py index 76300f85..d49f8b99 100644 --- a/tests/openedx_tagging/core/tagging/test_rules.py +++ b/tests/openedx_tagging/core/tagging/test_rules.py @@ -2,10 +2,12 @@ Tests tagging rules-based permissions """ import ddt # type: ignore[import] +import rules # type: ignore[import] from django.contrib.auth import get_user_model from django.test.testcases import TestCase -from openedx_tagging.core.tagging.models import ObjectTag, Tag +from openedx_tagging.core.tagging.models import ObjectTag +from openedx_tagging.core.tagging.rules import ChangeObjectTagPermissionItem from .test_models import TestTagTaxonomyMixin @@ -19,7 +21,15 @@ class TestRulesTagging(TestTagTaxonomyMixin, TestCase): """ def setUp(self): + + def _object_permission(_user, object_id: str) -> bool: + """ + Everyone have object permission on object_id "abc" + """ + return object_id == "abc" + super().setUp() + self.superuser = User.objects.create( username="superuser", email="superuser@example.com", @@ -37,9 +47,13 @@ def setUp(self): self.object_tag = ObjectTag.objects.create( taxonomy=self.taxonomy, tag=self.bacteria, + object_id="abc", ) self.object_tag.save() + # Override the object permission for the test + rules.set_perm("oel_tagging.change_objecttag_objectid", _object_permission) + # Taxonomy @ddt.data( @@ -135,20 +149,6 @@ def test_tag_free_text_taxonomy(self, perm): assert not self.staff.has_perm(perm, self.bacteria) assert not self.learner.has_perm(perm, self.bacteria) - @ddt.data( - "oel_tagging.add_tag", - "oel_tagging.change_tag", - "oel_tagging.delete_tag", - ) - def test_tag_no_taxonomy(self, perm): - """ - Taxonomy administrators can modify any Tag, even those with no Taxonnmy. - """ - tag = Tag() - assert self.superuser.has_perm(perm, tag) - assert self.staff.has_perm(perm, tag) - assert not self.learner.has_perm(perm, tag) - @ddt.data( True, False, @@ -182,64 +182,60 @@ def test_view_tag(self): @ddt.data( "oel_tagging.add_objecttag", "oel_tagging.change_objecttag", + "oel_tagging.delete_objecttag", ) def test_add_change_object_tag(self, perm): """ - Taxonomy administrators can create/edit an ObjectTag with an enabled Taxonomy + Everyone can create/edit an ObjectTag with an enabled Taxonomy """ + obj_perm = ChangeObjectTagPermissionItem( + taxonomy=self.object_tag.taxonomy, + object_id=self.object_tag.object_id, + ) assert self.superuser.has_perm(perm) - assert self.superuser.has_perm(perm, self.object_tag) + assert self.superuser.has_perm(perm, obj_perm) assert self.staff.has_perm(perm) - assert self.staff.has_perm(perm, self.object_tag) - assert not self.learner.has_perm(perm) - assert not self.learner.has_perm(perm, self.object_tag) + assert self.staff.has_perm(perm, obj_perm) + assert self.learner.has_perm(perm) + assert self.learner.has_perm(perm, obj_perm) @ddt.data( "oel_tagging.add_objecttag", "oel_tagging.change_objecttag", + "oel_tagging.delete_objecttag", ) def test_object_tag_disabled_taxonomy(self, perm): """ - Taxonomy administrators cannot create/edit an ObjectTag with a disabled Taxonomy + Only Taxonomy administrators can create/edit an ObjectTag with a disabled Taxonomy """ self.taxonomy.enabled = False self.taxonomy.save() - assert self.superuser.has_perm(perm, self.object_tag) - assert not self.staff.has_perm(perm, self.object_tag) - assert not self.learner.has_perm(perm, self.object_tag) - - @ddt.data( - True, - False, - ) - def test_delete_objecttag(self, enabled): - """ - Taxonomy administrators can delete any ObjectTag, even those associated with a disabled Taxonomy. - """ - self.taxonomy.enabled = enabled - self.taxonomy.save() - assert self.superuser.has_perm("oel_tagging.delete_objecttag") - assert self.superuser.has_perm("oel_tagging.delete_objecttag", self.object_tag) - assert self.staff.has_perm("oel_tagging.delete_objecttag") - assert self.staff.has_perm("oel_tagging.delete_objecttag", self.object_tag) - assert not self.learner.has_perm("oel_tagging.delete_objecttag") - assert not self.learner.has_perm( - "oel_tagging.delete_objecttag", self.object_tag + obj_perm = ChangeObjectTagPermissionItem( + taxonomy=self.object_tag.taxonomy, + object_id=self.object_tag.object_id, ) + assert self.superuser.has_perm(perm, obj_perm) + assert self.staff.has_perm(perm, obj_perm) + assert not self.learner.has_perm(perm, obj_perm) @ddt.data( "oel_tagging.add_objecttag", "oel_tagging.change_objecttag", "oel_tagging.delete_objecttag", ) - def test_object_tag_no_taxonomy(self, perm): + def test_object_tag_without_object_permission(self, perm): """ - Taxonomy administrators can modify an ObjectTag with no Taxonomy + Only superusers can create/edit an ObjectTag without object permission """ - object_tag = ObjectTag() - assert self.superuser.has_perm(perm, object_tag) - assert self.staff.has_perm(perm, object_tag) - assert not self.learner.has_perm(perm, object_tag) + self.taxonomy.enabled = False + self.taxonomy.save() + obj_perm = ChangeObjectTagPermissionItem( + taxonomy=self.object_tag.taxonomy, + object_id="not abc", + ) + assert self.superuser.has_perm(perm, obj_perm) + assert not self.staff.has_perm(perm, obj_perm) + assert not self.learner.has_perm(perm, obj_perm) def test_view_object_tag(self): """ diff --git a/tests/openedx_tagging/core/tagging/test_views.py b/tests/openedx_tagging/core/tagging/test_views.py index 52762519..a50ac3e6 100644 --- a/tests/openedx_tagging/core/tagging/test_views.py +++ b/tests/openedx_tagging/core/tagging/test_views.py @@ -6,6 +6,8 @@ from urllib.parse import parse_qs, urlparse import ddt # type: ignore[import] +# typing support in rules depends on https://github.com/dfunckt/django-rules/pull/177 +import rules # type: ignore[import] from django.contrib.auth import get_user_model from rest_framework import status from rest_framework.test import APITestCase @@ -19,7 +21,10 @@ TAXONOMY_DETAIL_URL = "/tagging/rest_api/v1/taxonomies/{pk}/" -OBJECT_TAGS_RETRIEVE_URL = '/tagging/rest_api/v1/object_tags/{object_id}/' +OBJECT_TAGS_RETRIEVE_URL = "/tagging/rest_api/v1/object_tags/{object_id}/" +OBJECT_TAGS_UPDATE_URL = "/tagging/rest_api/v1/object_tags/{object_id}/?taxonomy={taxonomy_id}" + +LANGUAGE_TAXONOMY_ID = -1 def check_taxonomy( @@ -396,6 +401,13 @@ class TestObjectTagViewSet(APITestCase): """ def setUp(self): + + def _object_permission(_user, object_id: str) -> bool: + """ + Everyone have object permission on object_id "abc" + """ + return object_id == "abc" + super().setUp() self.user = User.objects.create( @@ -410,70 +422,55 @@ def setUp(self): ) # System-defined language taxonomy with valid ObjectTag - self.system_taxonomy = SystemDefinedTaxonomy.objects.create( - name="System Taxonomy" - ) - self.tag1 = Tag.objects.create( - taxonomy=self.system_taxonomy, value="Tag 1" - ) - ObjectTag.objects.create( - object_id="abc", - taxonomy=self.system_taxonomy, - tag=self.tag1 - ) + self.system_taxonomy = SystemDefinedTaxonomy.objects.create(name="System Taxonomy") + self.tag1 = Tag.objects.create(taxonomy=self.system_taxonomy, value="Tag 1") + ObjectTag.objects.create(object_id="abc", taxonomy=self.system_taxonomy, tag=self.tag1) + + # Language system-defined language taxonomy + self.language_taxonomy = Taxonomy.objects.get(pk=LANGUAGE_TAXONOMY_ID) # Closed Taxonomies created by taxonomy admins, each with 20 ObjectTags self.enabled_taxonomy = Taxonomy.objects.create(name="Enabled Taxonomy") + self.disabled_taxonomy = Taxonomy.objects.create(name="Disabled Taxonomy", enabled=False) + self.multiple_taxonomy = Taxonomy.objects.create(name="Multiple Taxonomy", allow_multiple=True) for i in range(20): # Valid ObjectTags - tag = Tag.objects.create( - taxonomy=self.enabled_taxonomy, value=f"Tag {i}" + tag_enabled = Tag.objects.create(taxonomy=self.enabled_taxonomy, value=f"Tag {i}") + tag_disabled = Tag.objects.create(taxonomy=self.disabled_taxonomy, value=f"Tag {i}") + tag_multiple = Tag.objects.create(taxonomy=self.multiple_taxonomy, value=f"Tag {i}") + ObjectTag.objects.create( + object_id="abc", taxonomy=self.enabled_taxonomy, tag=tag_enabled, _value=tag_enabled.value ) ObjectTag.objects.create( - object_id="abc", taxonomy=self.enabled_taxonomy, - tag=tag, _value=tag.value + object_id="abc", taxonomy=self.disabled_taxonomy, tag=tag_disabled, _value=tag_disabled.value + ) + ObjectTag.objects.create( + object_id="abc", taxonomy=self.multiple_taxonomy, tag=tag_multiple, _value=tag_multiple.value ) - - # Taxonomy with invalid ObjectTag - self.taxonomy_with_invalid_object_tag = Taxonomy.objects.create() - self.to_be_deleted_tag = Tag.objects.create( - taxonomy=self.enabled_taxonomy, value="Deleted Tag" - ) - ObjectTag.objects.create( - object_id="abc", taxonomy=self.taxonomy_with_invalid_object_tag, - tag=self.to_be_deleted_tag, _value=self.to_be_deleted_tag.value - ) - self.to_be_deleted_tag.delete() # Delete tag so ObjectTag is invalid # Free-Text Taxonomies created by taxonomy admins, each linked # to 200 ObjectTags - self.open_taxonomy_enabled = Taxonomy.objects.create( - name="Enabled Free-Text Taxonomy", allow_free_text=True - ) + self.open_taxonomy_enabled = Taxonomy.objects.create(name="Enabled Free-Text Taxonomy", allow_free_text=True) self.open_taxonomy_disabled = Taxonomy.objects.create( - name="Disabled Free-Text Taxonomy", enabled=False, allow_free_text=True + name="Disabled Free-Text Taxonomy", allow_free_text=True, enabled=False ) for i in range(200): - ObjectTag.objects.create( - object_id="abc", taxonomy=self.open_taxonomy_enabled, _value=f"Free Text {i}" - ) - ObjectTag.objects.create( - object_id="abc", taxonomy=self.open_taxonomy_disabled, _value=f"Free Text {i}" - ) + ObjectTag.objects.create(object_id="abc", taxonomy=self.open_taxonomy_enabled, _value=f"Free Text {i}") + ObjectTag.objects.create(object_id="abc", taxonomy=self.open_taxonomy_disabled, _value=f"Free Text {i}") + + # Override the object permission for the test + rules.set_perm("oel_tagging.change_objecttag_objectid", _object_permission) @ddt.data( (None, "abc", status.HTTP_403_FORBIDDEN, None, None), - ("user", "abc", status.HTTP_200_OK, 422, 10), - ("staff", "abc", status.HTTP_200_OK, 422, 10), + ("user", "abc", status.HTTP_200_OK, 461, 10), + ("staff", "abc", status.HTTP_200_OK, 461, 10), (None, "non-existing-id", status.HTTP_403_FORBIDDEN, None, None), ("user", "non-existing-id", status.HTTP_200_OK, 0, 0), ("staff", "non-existing-id", status.HTTP_200_OK, 0, 0), ) @ddt.unpack - def test_retrieve_object_tags( - self, user_attr, object_id, expected_status, expected_count, expected_results - - ): + def test_retrieve_object_tags(self, user_attr, object_id, expected_status, expected_count, expected_results): """ Test retrieving object tags """ @@ -492,15 +489,13 @@ def test_retrieve_object_tags( assert len(response.data.get("results")) == expected_results @ddt.data( - (None, "abc", status.HTTP_403_FORBIDDEN, None, None, None, None), - ("user", "abc", status.HTTP_200_OK, 20, 10, 1, 1), - ("staff", "abc", status.HTTP_200_OK, 20, 10, 1, 1), + (None, "abc", status.HTTP_403_FORBIDDEN, None, None), + ("user", "abc", status.HTTP_200_OK, 20, 10), + ("staff", "abc", status.HTTP_200_OK, 20, 10), ) @ddt.unpack def test_retrieve_object_tags_taxonomy_queryparam( - self, user_attr, object_id, expected_status, - expected_count, expected_results, - expected_invalid_count, expected_invalid_results + self, user_attr, object_id, expected_status, expected_count, expected_results ): """ Test retrieving object tags for specific taxonomies provided @@ -511,7 +506,6 @@ def test_retrieve_object_tags_taxonomy_queryparam( user = getattr(self, user_attr) self.client.force_authenticate(user=user) - # Check valid object tags response = self.client.get(url, {"taxonomy": self.enabled_taxonomy.pk}) assert response.status_code == expected_status if status.is_success(expected_status): @@ -523,30 +517,13 @@ def test_retrieve_object_tags_taxonomy_queryparam( assert object_tag.get("is_valid") is True assert object_tag.get("taxonomy_id") == self.enabled_taxonomy.pk - # Check invalid object tags - response = self.client.get( - url, {"taxonomy": self.taxonomy_with_invalid_object_tag.pk} - ) - assert response.status_code == expected_status - if status.is_success(expected_status): - assert response.data.get("count") == expected_invalid_count - assert response.data.get("results") is not None - assert len(response.data.get("results")) == expected_invalid_results - object_tags = response.data.get("results") - for object_tag in object_tags: - assert object_tag.get("is_valid") is False - assert object_tag.get("taxonomy_id") == \ - self.taxonomy_with_invalid_object_tag.pk - @ddt.data( (None, "abc", status.HTTP_403_FORBIDDEN), ("user", "abc", status.HTTP_400_BAD_REQUEST), ("staff", "abc", status.HTTP_400_BAD_REQUEST), ) @ddt.unpack - def test_retrieve_object_tags_invalid_taxonomy_queryparam( - self, user_attr, object_id, expected_status - ): + def test_retrieve_object_tags_invalid_taxonomy_queryparam(self, user_attr, object_id, expected_status): """ Test retrieving object tags for invalid taxonomy """ @@ -591,10 +568,7 @@ def test_retrieve_object_tags_pagination( user = getattr(self, user_attr) self.client.force_authenticate(user=user) - query_params = { - "taxonomy": self.open_taxonomy_enabled.pk, - "page": page - } + query_params = {"taxonomy": self.open_taxonomy_enabled.pk, "page": page} if page_size: query_params["page_size"] = page_size @@ -610,25 +584,24 @@ def test_retrieve_object_tags_pagination( @ddt.data( (None, "POST", status.HTTP_403_FORBIDDEN), - (None, "PUT", status.HTTP_403_FORBIDDEN), (None, "PATCH", status.HTTP_403_FORBIDDEN), (None, "DELETE", status.HTTP_403_FORBIDDEN), - ("user", "POST", status.HTTP_403_FORBIDDEN), - ("user", "PUT", status.HTTP_403_FORBIDDEN), - ("user", "PATCH", status.HTTP_403_FORBIDDEN), - ("user", "DELETE", status.HTTP_403_FORBIDDEN), + ("user", "POST", status.HTTP_405_METHOD_NOT_ALLOWED), + ("user", "PATCH", status.HTTP_405_METHOD_NOT_ALLOWED), + ("user", "DELETE", status.HTTP_405_METHOD_NOT_ALLOWED), ("staff", "POST", status.HTTP_405_METHOD_NOT_ALLOWED), - ("staff", "PUT", status.HTTP_405_METHOD_NOT_ALLOWED), ("staff", "PATCH", status.HTTP_405_METHOD_NOT_ALLOWED), ("staff", "DELETE", status.HTTP_405_METHOD_NOT_ALLOWED), ) @ddt.unpack def test_object_tags_remaining_http_methods( - self, user_attr, http_method, expected_status, - + self, + user_attr, + http_method, + expected_status, ): """ - Test POST/PUT/PATCH/DELETE method for ObjectTagView + Test POST/PATCH/DELETE method for ObjectTagView Only staff users should have permissions to perform the actions, however the methods are currently not allowed. @@ -640,18 +613,172 @@ def test_object_tags_remaining_http_methods( self.client.force_authenticate(user=user) if http_method == "POST": - response = self.client.post( - url, {"test": "payload"}, format="json" - ) - elif http_method == "PUT": - response = self.client.put( - url, {"test": "payload"}, format="json" - ) + response = self.client.post(url, {"test": "payload"}, format="json") elif http_method == "PATCH": - response = self.client.patch( - url, {"test": "payload"}, format="json" - ) + response = self.client.patch(url, {"test": "payload"}, format="json") elif http_method == "DELETE": response = self.client.delete(url) assert response.status_code == expected_status + + @ddt.data( + # Users and staff can add tags to a taxonomy + (None, "language_taxonomy", ["Portuguese"], status.HTTP_403_FORBIDDEN), + ("user", "language_taxonomy", ["Portuguese"], status.HTTP_200_OK), + ("staff", "language_taxonomy", ["Portuguese"], status.HTTP_200_OK), + # Users and staff can clear add tags to a taxonomy + (None, "enabled_taxonomy", ["Tag 1"], status.HTTP_403_FORBIDDEN), + ("user", "enabled_taxonomy", ["Tag 1"], status.HTTP_200_OK), + ("staff", "enabled_taxonomy", ["Tag 1"], status.HTTP_200_OK), + # Only staff can add tag to a disabled taxonomy + (None, "disabled_taxonomy", ["Tag 1"], status.HTTP_403_FORBIDDEN), + ("user", "disabled_taxonomy", ["Tag 1"], status.HTTP_403_FORBIDDEN), + ("staff", "disabled_taxonomy", ["Tag 1"], status.HTTP_200_OK), + # Users and staff can add a single tag to a allow_multiple=True taxonomy + (None, "multiple_taxonomy", ["Tag 1"], status.HTTP_403_FORBIDDEN), + ("user", "multiple_taxonomy", ["Tag 1"], status.HTTP_200_OK), + ("staff", "multiple_taxonomy", ["Tag 1"], status.HTTP_200_OK), + # Users and staff can add tags to an open taxonomy + (None, "open_taxonomy_enabled", ["tag1"], status.HTTP_403_FORBIDDEN), + ("user", "open_taxonomy_enabled", ["tag1"], status.HTTP_200_OK), + ("staff", "open_taxonomy_enabled", ["tag1"], status.HTTP_200_OK), + # Only staff can add tags to a disabled open taxonomy + (None, "open_taxonomy_disabled", ["tag1"], status.HTTP_403_FORBIDDEN), + ("user", "open_taxonomy_disabled", ["tag1"], status.HTTP_403_FORBIDDEN), + ("staff", "open_taxonomy_disabled", ["tag1"], status.HTTP_200_OK), + ) + @ddt.unpack + def test_tag_object(self, user_attr, taxonomy_attr, tag_values, expected_status): + if user_attr: + user = getattr(self, user_attr) + self.client.force_authenticate(user=user) + + taxonomy = getattr(self, taxonomy_attr) + + url = OBJECT_TAGS_UPDATE_URL.format(object_id="abc", taxonomy_id=taxonomy.pk) + + response = self.client.put(url, {"tags": tag_values}, format="json") + assert response.status_code == expected_status + if status.is_success(expected_status): + assert len(response.data.get("results")) == len(tag_values) + assert set(t["value"] for t in response.data["results"]) == set(tag_values) + + @ddt.data( + # Can't add invalid tags to a closed taxonomy + (None, "language_taxonomy", ["Invalid"], status.HTTP_403_FORBIDDEN), + ("user", "language_taxonomy", ["Invalid"], status.HTTP_400_BAD_REQUEST), + ("staff", "language_taxonomy", ["Invalid"], status.HTTP_400_BAD_REQUEST), + (None, "enabled_taxonomy", ["invalid"], status.HTTP_403_FORBIDDEN), + ("user", "enabled_taxonomy", ["invalid"], status.HTTP_400_BAD_REQUEST), + ("staff", "enabled_taxonomy", ["invalid"], status.HTTP_400_BAD_REQUEST), + (None, "multiple_taxonomy", ["invalid"], status.HTTP_403_FORBIDDEN), + ("user", "multiple_taxonomy", ["invalid"], status.HTTP_400_BAD_REQUEST), + ("staff", "multiple_taxonomy", ["invalid"], status.HTTP_400_BAD_REQUEST), + # Users can't edit tags from a disabled taxonomy. Staff can't add invalid tags to a closed taxonomy + (None, "disabled_taxonomy", ["invalid"], status.HTTP_403_FORBIDDEN), + ("user", "disabled_taxonomy", ["invalid"], status.HTTP_403_FORBIDDEN), + ("staff", "disabled_taxonomy", ["invalid"], status.HTTP_400_BAD_REQUEST), + ) + @ddt.unpack + def test_tag_object_invalid(self, user_attr, taxonomy_attr, tag_values, expected_status): + if user_attr: + user = getattr(self, user_attr) + self.client.force_authenticate(user=user) + + taxonomy = getattr(self, taxonomy_attr) + + url = OBJECT_TAGS_UPDATE_URL.format(object_id="abc", taxonomy_id=taxonomy.pk) + + response = self.client.put(url, {"tags": tag_values}, format="json") + assert response.status_code == expected_status + assert not status.is_success(expected_status) # No success cases here + + @ddt.data( + # Users and staff can clear tags from a taxonomy + (None, "enabled_taxonomy", [], status.HTTP_403_FORBIDDEN), + ("user", "enabled_taxonomy", [], status.HTTP_200_OK), + ("staff", "enabled_taxonomy", [], status.HTTP_200_OK), + # Users and staff can clear tags from a allow_multiple=True taxonomy + (None, "multiple_taxonomy", [], status.HTTP_403_FORBIDDEN), + ("user", "multiple_taxonomy", [], status.HTTP_200_OK), + ("staff", "multiple_taxonomy", [], status.HTTP_200_OK), + # Only staff can clear tags from a disabled taxonomy + (None, "disabled_taxonomy", [], status.HTTP_403_FORBIDDEN), + ("user", "disabled_taxonomy", [], status.HTTP_403_FORBIDDEN), + ("staff", "disabled_taxonomy", [], status.HTTP_200_OK), + (None, "open_taxonomy_disabled", [], status.HTTP_403_FORBIDDEN), + ("user", "open_taxonomy_disabled", [], status.HTTP_403_FORBIDDEN), + ("staff", "open_taxonomy_disabled", [], status.HTTP_200_OK), + # Users and staff can't clear a taxonomy with required=True + (None, "language_taxonomy", [], status.HTTP_403_FORBIDDEN), + ("user", "language_taxonomy", [], status.HTTP_400_BAD_REQUEST), + ("staff", "language_taxonomy", [], status.HTTP_400_BAD_REQUEST), + ) + @ddt.unpack + def test_tag_object_clear(self, user_attr, taxonomy_attr, tag_values, expected_status): + if user_attr: + user = getattr(self, user_attr) + self.client.force_authenticate(user=user) + + taxonomy = getattr(self, taxonomy_attr) + + url = OBJECT_TAGS_UPDATE_URL.format(object_id="abc", taxonomy_id=taxonomy.pk) + + response = self.client.put(url, {"tags": tag_values}, format="json") + assert response.status_code == expected_status + if status.is_success(expected_status): + assert len(response.data.get("results")) == len(tag_values) + assert set(t["value"] for t in response.data["results"]) == set(tag_values) + + @ddt.data( + # Users and staff can add multiple tags to a allow_multiple=True taxonomy + (None, "multiple_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_403_FORBIDDEN), + ("user", "multiple_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_200_OK), + ("staff", "multiple_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_200_OK), + (None, "open_taxonomy_enabled", ["tag1", "tag2"], status.HTTP_403_FORBIDDEN), + ("user", "open_taxonomy_enabled", ["tag1", "tag2"], status.HTTP_400_BAD_REQUEST), + ("staff", "open_taxonomy_enabled", ["tag1", "tag2"], status.HTTP_400_BAD_REQUEST), + # Users and staff can't add multple tags to a allow_multiple=False taxonomy + (None, "enabled_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_403_FORBIDDEN), + ("user", "enabled_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_400_BAD_REQUEST), + ("staff", "enabled_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_400_BAD_REQUEST), + (None, "language_taxonomy", ["Portuguese", "English"], status.HTTP_403_FORBIDDEN), + ("user", "language_taxonomy", ["Portuguese", "English"], status.HTTP_400_BAD_REQUEST), + ("staff", "language_taxonomy", ["Portuguese", "English"], status.HTTP_400_BAD_REQUEST), + # Users can't edit tags from a disabled taxonomy. Staff can't add multiple tags to + # a taxonomy with allow_multiple=False + (None, "disabled_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_403_FORBIDDEN), + ("user", "disabled_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_403_FORBIDDEN), + ("staff", "disabled_taxonomy", ["Tag 1", "Tag 2"], status.HTTP_400_BAD_REQUEST), + ) + @ddt.unpack + def test_tag_object_multiple(self, user_attr, taxonomy_attr, tag_values, expected_status): + if user_attr: + user = getattr(self, user_attr) + self.client.force_authenticate(user=user) + + taxonomy = getattr(self, taxonomy_attr) + + url = OBJECT_TAGS_UPDATE_URL.format(object_id="abc", taxonomy_id=taxonomy.pk) + + response = self.client.put(url, {"tags": tag_values}, format="json") + assert response.status_code == expected_status + if status.is_success(expected_status): + assert len(response.data.get("results")) == len(tag_values) + assert set(t["value"] for t in response.data["results"]) == set(tag_values) + + @ddt.data( + (None, status.HTTP_403_FORBIDDEN), + ("user", status.HTTP_403_FORBIDDEN), + ("staff", status.HTTP_403_FORBIDDEN), + ) + @ddt.unpack + def test_tag_object_without_permission(self, user_attr, expected_status): + if user_attr: + user = getattr(self, user_attr) + self.client.force_authenticate(user=user) + + url = OBJECT_TAGS_UPDATE_URL.format(object_id="not abc", taxonomy_id=self.enabled_taxonomy.pk) + + response = self.client.put(url, {"tags": ["Tag 1"]}, format="json") + assert response.status_code == expected_status