Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Aug 29, 2024
1 parent b291f50 commit 716c1ed
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 51 deletions.
105 changes: 59 additions & 46 deletions src/awx_plugins/credentials/aws_assumerole.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import boto3
import hashlib
import datetime
import hashlib

from .plugin import CredentialPlugin
from django.utils.translation import gettext_lazy as _

import boto3

from .plugin import CredentialPlugin


try:
from botocore.exceptions import ClientError
except ImportError:
Expand All @@ -13,55 +16,59 @@
_aws_cred_cache = {}


assume_role_inputs = {
'fields': [
{
'id': 'access_key',
'label': _('AWS Access Key'),
'type': 'string',
'secret': True,
'help_text': _('The optional AWS access key for the user who will assume the role'),
},
{
'id': 'secret_key',
'label': 'AWS Secret Key',
'type': 'string',
'secret': True,
'help_text': _('The optional AWS secret key for the user who will assume the role'),
},
{
'id': 'external_id',
'label': 'External ID',
'type': 'string',
'help_text': _('The optional External ID which will be provided to the assume role API'),
},
{'id': 'role_arn', 'label': 'AWS ARN Role Name', 'type': 'string', 'secret': True, 'help_text': _('The ARN Role Name to be assumed in AWS')},
],
'metadata': [
{
'id': 'identifier',
'label': 'Identifier',
'type': 'string',
'help_text': _('The name of the key in the assumed AWS role to fetch [AccessKeyId | SecretAccessKey | SessionToken].'),
},
],
'required': ['role_arn'],
}
assume_role_inputs = {'fields': [{'id': 'access_key',
'label': _('AWS Access Key'),
'type': 'string',
'secret': True,
'help_text': _('The optional AWS access key for the user who will assume the role'),
},
{'id': 'secret_key',
'label': 'AWS Secret Key',
'type': 'string',
'secret': True,
'help_text': _('The optional AWS secret key for the user who will assume the role'),
},
{'id': 'external_id',
'label': 'External ID',
'type': 'string',
'help_text': _('The optional External ID which will be provided to the assume role API'),
},
{'id': 'role_arn',
'label': 'AWS ARN Role Name',
'type': 'string',
'secret': True,
'help_text': _('The ARN Role Name to be assumed in AWS')},
],
'metadata': [{'id': 'identifier',
'label': 'Identifier',
'type': 'string',
'help_text': _('The name of the key in the assumed AWS role to fetch [AccessKeyId | SecretAccessKey | SessionToken].'),
},
],
'required': ['role_arn'],
}


def aws_assumerole_getcreds(access_key, secret_key, role_arn, external_id):
if (access_key is None or len(access_key) == 0) and (secret_key is None or len(secret_key) == 0):
if (access_key is None or len(access_key) == 0) and (
secret_key is None or len(secret_key) == 0):
# Connect using credentials in the EE
connection = boto3.client(service_name="sts")
connection = boto3.client(service_name='sts')
else:
# Connect to AWS using provided credentials
connection = boto3.client(service_name="sts", aws_access_key_id=access_key, aws_secret_access_key=secret_key)
connection = boto3.client(
service_name='sts',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
try:
response = connection.assume_role(RoleArn=role_arn, RoleSessionName='AAP_AWS_Role_Session1', ExternalId=external_id)
response = connection.assume_role(
RoleArn=role_arn,
RoleSessionName='AAP_AWS_Role_Session1',
ExternalId=external_id)
except ClientError as ce:
raise ValueError(f'Got a bad client response from AWS: {ce.msg}.')

credentials = response.get("Credentials", {})
credentials = response.get('Credentials', {})

return credentials

Expand All @@ -79,17 +86,20 @@ def aws_assumerole_backend(**kwargs):
# separate credentials, and should allow the same user to request
# multiple roles.
#
credential_key_hash = hashlib.sha256((str(access_key or '') + role_arn).encode('utf-8'))
credential_key_hash = hashlib.sha256(
(str(access_key or '') + role_arn).encode('utf-8'))
credential_key = credential_key_hash.hexdigest()

credentials = _aws_cred_cache.get(credential_key, None)

# If there are no credentials for this user/ARN *or* the credentials
# we have in the cache have expired, then we need to contact AWS again.
#
if (credentials is None) or (credentials['Expiration'] < datetime.datetime.now(credentials['Expiration'].tzinfo)):
if (credentials is None) or (credentials['Expiration'] < datetime.datetime.now(
credentials['Expiration'].tzinfo)):

credentials = aws_assumerole_getcreds(access_key, secret_key, role_arn, external_id)
credentials = aws_assumerole_getcreds(
access_key, secret_key, role_arn, external_id)

_aws_cred_cache[credential_key] = credentials

Expand All @@ -101,4 +111,7 @@ def aws_assumerole_backend(**kwargs):
raise ValueError(f'Could not find a value for {identifier}.')


aws_assumerole_plugin = CredentialPlugin('AWS Assume Role Plugin', inputs=assume_role_inputs, backend=aws_assumerole_backend)
aws_assumerole_plugin = CredentialPlugin(
'AWS Assume Role Plugin',
inputs=assume_role_inputs,
backend=aws_assumerole_backend)
15 changes: 10 additions & 5 deletions tests/credential_plugins_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import datetime
from unittest import mock

import datetime
import pytest

from awx_plugins.credentials import hashivault
from awx_plugins.credentials import aws_assumerole
from awx_plugins.credentials import aws_assumerole, hashivault


def test_imported_azure_cloud_sdk_vars():
Expand Down Expand Up @@ -132,6 +131,7 @@ def test_hashivault_handle_auth_not_enough_args():
with pytest.raises(Exception):
hashivault.handle_auth()


def test_aws_assumerole_with_accesssecret():
kwargs = {
'access_key': 'my_access_key',
Expand All @@ -147,7 +147,11 @@ def test_aws_assumerole_with_accesssecret():
'Expiration': datetime.datetime.today() + datetime.timedelta(days=1),
}
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_called_with(kwargs.get('access_key'), kwargs.get('secret_key'), kwargs.get('role_arn'), None)
method_mock.assert_called_with(
kwargs.get('access_key'),
kwargs.get('secret_key'),
kwargs.get('role_arn'),
None)
assert token == 'the_access_token'
kwargs['identifier'] = 'secret_key'
method_mock.reset_mock()
Expand All @@ -174,7 +178,8 @@ def test_aws_assumerole_with_arnonly():
'Expiration': datetime.datetime.today() + datetime.timedelta(days=1),
}
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_called_with(None, None, kwargs.get('role_arn'), None)
method_mock.assert_called_with(
None, None, kwargs.get('role_arn'), None)
assert token == 'the_access_token'
kwargs['identifier'] = 'secret_key'
method_mock.reset_mock()
Expand Down

0 comments on commit 716c1ed

Please sign in to comment.