From 7ac8180b6e634ff2dfe5003f196506d0f02f7184 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Mon, 16 Oct 2023 21:45:32 -0600 Subject: [PATCH] fix: Updated policy as a workaround for #75 A workaround for issue #75, the `parent_resource` was converted to the **Link** type, which is manually populated using create_link() method. Since Beanie cannot/struggles to fetch links of multiple types, two new class methods were added: get_parent_resource() uses link reference to find and return parent document(Workspace/Group/Poll), and get_policy_holder() finds and returns document of policy holder(Account/Group). --- src/unipoll_api/actions/policy.py | 50 ++++++++++++---------------- src/unipoll_api/documents.py | 40 +++++++++++++--------- src/unipoll_api/utils/permissions.py | 8 +++++ 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/src/unipoll_api/actions/policy.py b/src/unipoll_api/actions/policy.py index 312e685..6d8e505 100644 --- a/src/unipoll_api/actions/policy.py +++ b/src/unipoll_api/actions/policy.py @@ -1,7 +1,7 @@ from unipoll_api import AccountManager from unipoll_api.documents import Account, Workspace, Group, Policy, Resource from unipoll_api.schemas import MemberSchemas, PolicySchemas -from unipoll_api.exceptions import PolicyExceptions, ResourceExceptions +from unipoll_api.exceptions import ResourceExceptions from unipoll_api.utils import Permissions from unipoll_api.utils.permissions import check_permissions @@ -51,21 +51,20 @@ async def get_policies(policy_holder: Account | Group | None = None, return PolicySchemas.PolicyList(policies=policy_list) -# @check_permissions(resource=policy.parent_resource, required_permissions="get_policy", permission_check=True) async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySchemas.PolicyShort: - await policy.parent_resource.fetch_all_links() # type: ignore - await check_permissions(policy.parent_resource, "get_policies", permission_check) - - # Convert policy_holder link to Member object - ph_type = policy.policy_holder_type - ph_ref = policy.policy_holder.ref.id - policy_holder = await Account.get(ph_ref) if ph_type == "account" else await Group.get(ph_ref) - if not policy_holder: - raise PolicyExceptions.PolicyHolderNotFound(ph_ref) - policy_holder = MemberSchemas.Member(**policy_holder.model_dump()) # type: ignore - resource_type: str = policy.parent_resource.resource_type # type: ignore - PermissionType = eval("Permissions." + resource_type.capitalize() + "Permissions") - permissions = PermissionType(policy.permissions).name.split('|') # type: ignore + # Get the parent resource of the policy + parent_resource = await policy.get_parent_recource(fetch_links=True) + await check_permissions(parent_resource, "get_policies", permission_check) + + # Get the policy holder + policy_holder = await policy.get_policy_holder() + policy_holder = MemberSchemas.Member(**policy_holder.model_dump()) + + # Get the permissions based on the resource type and convert it to a list of strings + permission_type = Permissions.PermissionTypes[parent_resource.resource_type] + permissions = permission_type(policy.permissions).name.split('|') + + # Return the policy return PolicySchemas.PolicyShort(id=policy.id, policy_holder_type=policy.policy_holder_type, policy_holder=policy_holder.model_dump(exclude_unset=True), @@ -76,30 +75,25 @@ async def update_policy(policy: Policy, new_permissions: list[str], check_permissions: bool = True) -> PolicySchemas.PolicyOutput: - # BUG: since the parent_resource is of multiple types, it is not fetched properly, so we fetch it manually - await policy.parent_resource.fetch_all_links() # type: ignore + parent_resource = await policy.get_parent_recource(fetch_links=True) # Check if the user has the required permissions to update the policy - await Permissions.check_permissions(policy.parent_resource, "update_policies", check_permissions) - ResourcePermissions = eval( - "Permissions." + policy.parent_resource.resource_type.capitalize() + "Permissions") # type: ignore + await Permissions.check_permissions(parent_resource, "update_policies", check_permissions) + permission_type = Permissions.PermissionTypes[parent_resource.resource_type] # Calculate the new permission value from request new_permission_value = 0 for i in new_permissions: try: - new_permission_value += ResourcePermissions[i].value # type: ignore + new_permission_value += permission_type[i].value except KeyError: raise ResourceExceptions.InvalidPermission(i) # Update permissions - policy.permissions = ResourcePermissions(new_permission_value) # type: ignore + policy.permissions = permission_type(new_permission_value) await Policy.save(policy) - if policy.policy_holder_type == "account": - policy_holder = await Account.get(policy.policy_holder.ref.id) - elif policy.policy_holder_type == "group": - policy_holder = await Group.get(policy.policy_holder.ref.id) + policy_holder = await policy.get_policy_holder() return PolicySchemas.PolicyOutput( - permissions=ResourcePermissions(policy.permissions).name.split('|'), # type: ignore - policy_holder=policy_holder.model_dump()) # type: ignore + permissions=permission_type(policy.permissions).name.split('|'), + policy_holder=policy_holder.model_dump()) diff --git a/src/unipoll_api/documents.py b/src/unipoll_api/documents.py index a64245b..85855bc 100644 --- a/src/unipoll_api/documents.py +++ b/src/unipoll_api/documents.py @@ -45,7 +45,7 @@ async def add_policy(self, member: "Group | Account", permissions, save: bool = new_policy = Policy(policy_holder_type='account', policy_holder=(await create_link(member)), permissions=permissions, - parent_resource=self) # type: ignore + parent_resource=(await create_link(self))) # type: ignore # Add the policy to the group self.policies.append(new_policy) # type: ignore @@ -147,17 +147,9 @@ async def remove_member(self, account, save: bool = True) -> bool: return True -class Policy(Document): - id: ResourceID = Field(default_factory=ResourceID, alias="_id") - parent_resource: BackLink["Workspace"] | BackLink["Group"] | BackLink["Poll"] = Field(original_field="policies") - policy_holder_type: Literal["account", "group"] - policy_holder: Link["Group"] | Link["Account"] - permissions: int - - class Poll(Resource): id: ResourceID = Field(default_factory=ResourceID, alias="_id") - workspace: BackLink["Workspace"] = Field(original_field="polls") + workspace: Link[Workspace] resource_type: Literal["poll"] = "poll" public: bool published: bool @@ -165,9 +157,25 @@ class Poll(Resource): policies: list[Link["Policy"]] -# NOTE: model_rebuild is used to avoid circular imports -Resource.model_rebuild() -Workspace.model_rebuild() -Group.model_rebuild() -Policy.model_rebuild() -Poll.model_rebuild() +class Policy(Document): + id: ResourceID = Field(default_factory=ResourceID, alias="_id") + parent_resource: Link[Workspace] | Link[Group] | Link[Poll] + policy_holder_type: Literal["account", "group"] + policy_holder: Link["Group"] | Link["Account"] + permissions: int + + async def get_parent_recource(self, fetch_links: bool = False) -> Workspace | Group | Poll: + from unipoll_api.exceptions.resource import ResourceNotFound + parent = await eval(self.parent_resource.ref.collection).get(self.parent_resource.ref.id, + fetch_links=fetch_links) + if not parent: + ResourceNotFound(self.parent_resource.ref.collection, self.parent_resource.ref.id) + return parent + + async def get_policy_holder(self, fetch_links: bool = False) -> Group | Account: + from unipoll_api.exceptions.policy import PolicyHolderNotFound + policy_holder = await eval(self.policy_holder.ref.collection).get(self.policy_holder.ref.id, + fetch_links=fetch_links) + if not policy_holder: + PolicyHolderNotFound(self.policy_holder.ref.id) + return policy_holder diff --git a/src/unipoll_api/utils/permissions.py b/src/unipoll_api/utils/permissions.py index 8a67189..fd6caac 100644 --- a/src/unipoll_api/utils/permissions.py +++ b/src/unipoll_api/utils/permissions.py @@ -55,6 +55,14 @@ 'delete_policies' ]) + +PermissionTypes = { + "workspace": WorkspacePermissions, + "group": GroupPermissions, + "poll": PollPermissions +} + + WORKSPACE_ALL_PERMISSIONS = WorkspacePermissions(-1) # type: ignore WORKSPACE_BASIC_PERMISSIONS = WorkspacePermissions(sum([WorkspacePermissions["get_workspace"], WorkspacePermissions["get_members"],