Skip to content

Commit

Permalink
Merge branch 'master' of github.com:DataBiosphere/toil into issues/45…
Browse files Browse the repository at this point in the history
…54-string-to-file-wdl
  • Loading branch information
stxue1 committed Aug 31, 2023
2 parents 67092ce + 87f858d commit f3d114d
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions src/toil/lib/aws/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import boto3
from mypy_boto3_iam import IAMClient
from mypy_boto3_iam.type_defs import AttachedPolicyTypeDef
from mypy_boto3_iam.type_defs import AttachedPolicyTypeDef, PolicyDocumentDictTypeDef
from mypy_boto3_sts import STSClient

from toil.lib.aws import zone_to_region
Expand Down Expand Up @@ -121,7 +121,7 @@ def permission_matches_any(perm: str, list_perms: List[str]) -> bool:
return True
return False

def get_actions_from_policy_document(policy_doc: Dict[str, Any]) -> AllowedActionCollection:
def get_actions_from_policy_document(policy_doc: PolicyDocumentDictTypeDef) -> AllowedActionCollection:
'''
Given a policy document, go through each statement and create an AllowedActionCollection representing the
permissions granted in the policy document.
Expand All @@ -138,11 +138,16 @@ def get_actions_from_policy_document(policy_doc: Dict[str, Any]) -> AllowedActio
for resource in statement["Resource"]:
for key in ["Action", "NotAction"]:
if key in statement.keys():
if isinstance(statement[key], list):
allowed_actions[resource][key] += statement[key]
# mypy_boto3_iam declares policy document as a TypedDict
# This type expects 4 string keys, of which NotAction is not an option
# Thus mypy complains. NotAction seems to be valid according to Amazon:
# https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_notaction.html
# so type: ignore for now
if isinstance(statement[key], list): # type: ignore[literal-required]
allowed_actions[resource][key] += statement[key] # type: ignore[literal-required]
else:
#Assumes that if it isn't a list it's probably a string
allowed_actions[resource][key].append(statement[key])
allowed_actions[resource][key].append(statement[key]) # type: ignore[literal-required]

return allowed_actions
def allowed_actions_attached(iam: IAMClient, attached_policies: List[AttachedPolicyTypeDef]) -> AllowedActionCollection:
Expand Down Expand Up @@ -181,24 +186,28 @@ def allowed_actions_roles(iam: IAMClient, policy_names: List[str], role_name: st
PolicyName=policy_name
)
logger.debug("Checking role policy")
policy_document = json.loads(role_policy["PolicyDocument"])
# PolicyDocument is now a TypedDict, but an instance of TypedDict is not an instance of dict?
if isinstance(role_policy["PolicyDocument"], str):
policy_document = json.loads(role_policy["PolicyDocument"])
else:
policy_document = role_policy["PolicyDocument"]

allowed_actions = add_to_action_collection(allowed_actions, get_actions_from_policy_document(policy_document))

return allowed_actions


def collect_policy_actions(policy_documents: Sequence[Union[str, Dict[str, Any]]]) -> AllowedActionCollection:
def collect_policy_actions(policy_documents: List[Union[str, PolicyDocumentDictTypeDef]]) -> AllowedActionCollection:
"""
Collect all of the actions allowed by the given policy documents into one AllowedActionCollection.
"""
allowed_actions: AllowedActionCollection = init_action_collection()
for policy_str in policy_documents:
# sometimes a string is returned from the api, so convert to a dictionary
if isinstance(policy_str, dict):
policy_dict = policy_str
else:
if isinstance(policy_str, str):
policy_dict = json.loads(policy_str)
else:
policy_dict = policy_str
allowed_actions = add_to_action_collection(allowed_actions, get_actions_from_policy_document(policy_dict))
return allowed_actions

Expand Down

0 comments on commit f3d114d

Please sign in to comment.