Skip to content

Commit

Permalink
fix: Updated policy as a workaround for #75
Browse files Browse the repository at this point in the history
A workaround for issue #75, the `parent_resource` was converted to the **Link** type, which is manually populated using create_link() method. Since Beanie cannot/struggles to fetch links of multiple types, two new class methods were added: get_parent_resource() uses link reference to find and return parent document(Workspace/Group/Poll), and get_policy_holder() finds and returns document of policy holder(Account/Group).
  • Loading branch information
mike-pisman committed Oct 17, 2023
1 parent ff0a65f commit 7ac8180
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 44 deletions.
50 changes: 22 additions & 28 deletions src/unipoll_api/actions/policy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unipoll_api import AccountManager
from unipoll_api.documents import Account, Workspace, Group, Policy, Resource
from unipoll_api.schemas import MemberSchemas, PolicySchemas
from unipoll_api.exceptions import PolicyExceptions, ResourceExceptions
from unipoll_api.exceptions import ResourceExceptions
from unipoll_api.utils import Permissions
from unipoll_api.utils.permissions import check_permissions

Expand Down Expand Up @@ -51,21 +51,20 @@ async def get_policies(policy_holder: Account | Group | None = None,
return PolicySchemas.PolicyList(policies=policy_list)


# @check_permissions(resource=policy.parent_resource, required_permissions="get_policy", permission_check=True)
async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySchemas.PolicyShort:
await policy.parent_resource.fetch_all_links() # type: ignore
await check_permissions(policy.parent_resource, "get_policies", permission_check)

# Convert policy_holder link to Member object
ph_type = policy.policy_holder_type
ph_ref = policy.policy_holder.ref.id
policy_holder = await Account.get(ph_ref) if ph_type == "account" else await Group.get(ph_ref)
if not policy_holder:
raise PolicyExceptions.PolicyHolderNotFound(ph_ref)
policy_holder = MemberSchemas.Member(**policy_holder.model_dump()) # type: ignore
resource_type: str = policy.parent_resource.resource_type # type: ignore
PermissionType = eval("Permissions." + resource_type.capitalize() + "Permissions")
permissions = PermissionType(policy.permissions).name.split('|') # type: ignore
# Get the parent resource of the policy
parent_resource = await policy.get_parent_recource(fetch_links=True)
await check_permissions(parent_resource, "get_policies", permission_check)

# Get the policy holder
policy_holder = await policy.get_policy_holder()
policy_holder = MemberSchemas.Member(**policy_holder.model_dump())

# Get the permissions based on the resource type and convert it to a list of strings
permission_type = Permissions.PermissionTypes[parent_resource.resource_type]
permissions = permission_type(policy.permissions).name.split('|')

# Return the policy
return PolicySchemas.PolicyShort(id=policy.id,
policy_holder_type=policy.policy_holder_type,
policy_holder=policy_holder.model_dump(exclude_unset=True),
Expand All @@ -76,30 +75,25 @@ async def update_policy(policy: Policy,
new_permissions: list[str],
check_permissions: bool = True) -> PolicySchemas.PolicyOutput:

# BUG: since the parent_resource is of multiple types, it is not fetched properly, so we fetch it manually
await policy.parent_resource.fetch_all_links() # type: ignore
parent_resource = await policy.get_parent_recource(fetch_links=True)

# Check if the user has the required permissions to update the policy
await Permissions.check_permissions(policy.parent_resource, "update_policies", check_permissions)
ResourcePermissions = eval(
"Permissions." + policy.parent_resource.resource_type.capitalize() + "Permissions") # type: ignore
await Permissions.check_permissions(parent_resource, "update_policies", check_permissions)
permission_type = Permissions.PermissionTypes[parent_resource.resource_type]

# Calculate the new permission value from request
new_permission_value = 0
for i in new_permissions:
try:
new_permission_value += ResourcePermissions[i].value # type: ignore
new_permission_value += permission_type[i].value
except KeyError:
raise ResourceExceptions.InvalidPermission(i)
# Update permissions
policy.permissions = ResourcePermissions(new_permission_value) # type: ignore
policy.permissions = permission_type(new_permission_value)
await Policy.save(policy)

if policy.policy_holder_type == "account":
policy_holder = await Account.get(policy.policy_holder.ref.id)
elif policy.policy_holder_type == "group":
policy_holder = await Group.get(policy.policy_holder.ref.id)
policy_holder = await policy.get_policy_holder()

return PolicySchemas.PolicyOutput(
permissions=ResourcePermissions(policy.permissions).name.split('|'), # type: ignore
policy_holder=policy_holder.model_dump()) # type: ignore
permissions=permission_type(policy.permissions).name.split('|'),
policy_holder=policy_holder.model_dump())
40 changes: 24 additions & 16 deletions src/unipoll_api/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def add_policy(self, member: "Group | Account", permissions, save: bool =
new_policy = Policy(policy_holder_type='account',
policy_holder=(await create_link(member)),
permissions=permissions,
parent_resource=self) # type: ignore
parent_resource=(await create_link(self))) # type: ignore

# Add the policy to the group
self.policies.append(new_policy) # type: ignore
Expand Down Expand Up @@ -147,27 +147,35 @@ async def remove_member(self, account, save: bool = True) -> bool:
return True


class Policy(Document):
id: ResourceID = Field(default_factory=ResourceID, alias="_id")
parent_resource: BackLink["Workspace"] | BackLink["Group"] | BackLink["Poll"] = Field(original_field="policies")
policy_holder_type: Literal["account", "group"]
policy_holder: Link["Group"] | Link["Account"]
permissions: int


class Poll(Resource):
id: ResourceID = Field(default_factory=ResourceID, alias="_id")
workspace: BackLink["Workspace"] = Field(original_field="polls")
workspace: Link[Workspace]
resource_type: Literal["poll"] = "poll"
public: bool
published: bool
questions: list
policies: list[Link["Policy"]]


# NOTE: model_rebuild is used to avoid circular imports
Resource.model_rebuild()
Workspace.model_rebuild()
Group.model_rebuild()
Policy.model_rebuild()
Poll.model_rebuild()
class Policy(Document):
id: ResourceID = Field(default_factory=ResourceID, alias="_id")
parent_resource: Link[Workspace] | Link[Group] | Link[Poll]
policy_holder_type: Literal["account", "group"]
policy_holder: Link["Group"] | Link["Account"]
permissions: int

async def get_parent_recource(self, fetch_links: bool = False) -> Workspace | Group | Poll:
from unipoll_api.exceptions.resource import ResourceNotFound
parent = await eval(self.parent_resource.ref.collection).get(self.parent_resource.ref.id,
fetch_links=fetch_links)
if not parent:
ResourceNotFound(self.parent_resource.ref.collection, self.parent_resource.ref.id)
return parent

async def get_policy_holder(self, fetch_links: bool = False) -> Group | Account:
from unipoll_api.exceptions.policy import PolicyHolderNotFound
policy_holder = await eval(self.policy_holder.ref.collection).get(self.policy_holder.ref.id,
fetch_links=fetch_links)
if not policy_holder:
PolicyHolderNotFound(self.policy_holder.ref.id)
return policy_holder
8 changes: 8 additions & 0 deletions src/unipoll_api/utils/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@
'delete_policies'
])


PermissionTypes = {
"workspace": WorkspacePermissions,
"group": GroupPermissions,
"poll": PollPermissions
}


WORKSPACE_ALL_PERMISSIONS = WorkspacePermissions(-1) # type: ignore
WORKSPACE_BASIC_PERMISSIONS = WorkspacePermissions(sum([WorkspacePermissions["get_workspace"],
WorkspacePermissions["get_members"],
Expand Down

0 comments on commit 7ac8180

Please sign in to comment.