diff --git a/src/toil/lib/aws/iam.py b/src/toil/lib/aws/iam.py index bece2e0c19..b32a1b0f15 100644 --- a/src/toil/lib/aws/iam.py +++ b/src/toil/lib/aws/iam.py @@ -7,7 +7,7 @@ import boto3 from mypy_boto3_iam import IAMClient -from mypy_boto3_iam.type_defs import AttachedPolicyTypeDef +from mypy_boto3_iam.type_defs import AttachedPolicyTypeDef, PolicyDocumentDictTypeDef from mypy_boto3_sts import STSClient from toil.lib.aws import zone_to_region @@ -121,7 +121,7 @@ def permission_matches_any(perm: str, list_perms: List[str]) -> bool: return True return False -def get_actions_from_policy_document(policy_doc: Dict[str, Any]) -> AllowedActionCollection: +def get_actions_from_policy_document(policy_doc: PolicyDocumentDictTypeDef) -> AllowedActionCollection: ''' Given a policy document, go through each statement and create an AllowedActionCollection representing the permissions granted in the policy document. @@ -138,11 +138,16 @@ def get_actions_from_policy_document(policy_doc: Dict[str, Any]) -> AllowedActio for resource in statement["Resource"]: for key in ["Action", "NotAction"]: if key in statement.keys(): - if isinstance(statement[key], list): - allowed_actions[resource][key] += statement[key] + # mypy_boto3_iam declares policy document as a TypedDict + # This type expects 4 string keys, of which NotAction is not an option + # Thus mypy complains. NotAction seems to be valid according to Amazon: + # https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_notaction.html + # so type: ignore for now + if isinstance(statement[key], list): # type: ignore[literal-required] + allowed_actions[resource][key] += statement[key] # type: ignore[literal-required] else: #Assumes that if it isn't a list it's probably a string - allowed_actions[resource][key].append(statement[key]) + allowed_actions[resource][key].append(statement[key]) # type: ignore[literal-required] return allowed_actions def allowed_actions_attached(iam: IAMClient, attached_policies: List[AttachedPolicyTypeDef]) -> AllowedActionCollection: @@ -181,24 +186,28 @@ def allowed_actions_roles(iam: IAMClient, policy_names: List[str], role_name: st PolicyName=policy_name ) logger.debug("Checking role policy") - policy_document = json.loads(role_policy["PolicyDocument"]) + # PolicyDocument is now a TypedDict, but an instance of TypedDict is not an instance of dict? + if isinstance(role_policy["PolicyDocument"], str): + policy_document = json.loads(role_policy["PolicyDocument"]) + else: + policy_document = role_policy["PolicyDocument"] allowed_actions = add_to_action_collection(allowed_actions, get_actions_from_policy_document(policy_document)) return allowed_actions -def collect_policy_actions(policy_documents: Sequence[Union[str, Dict[str, Any]]]) -> AllowedActionCollection: +def collect_policy_actions(policy_documents: List[Union[str, PolicyDocumentDictTypeDef]]) -> AllowedActionCollection: """ Collect all of the actions allowed by the given policy documents into one AllowedActionCollection. """ allowed_actions: AllowedActionCollection = init_action_collection() for policy_str in policy_documents: # sometimes a string is returned from the api, so convert to a dictionary - if isinstance(policy_str, dict): - policy_dict = policy_str - else: + if isinstance(policy_str, str): policy_dict = json.loads(policy_str) + else: + policy_dict = policy_str allowed_actions = add_to_action_collection(allowed_actions, get_actions_from_policy_document(policy_dict)) return allowed_actions