From 0eab79be0254bf698b479bb2918866deb9b85e69 Mon Sep 17 00:00:00 2001 From: mhorky Date: Mon, 30 Oct 2023 11:42:56 +0100 Subject: [PATCH 1/3] RHEL-9435: Get AWS metadata via IMDSv2 * Card ID: RHEL-9435 Even though both versions are officially supported, the AWS teams are tracking connections making v1 requests as WARNINGs [0]. This patch switches the order to try to use IMDSv2 first. [0]: https://github.com/aws/aws-imds-packet-analyzer --- src/cloud_what/providers/aws.py | 23 +++++++++-------------- test/test_auto_registration.py | 33 +++++++++++++++++++-------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/cloud_what/providers/aws.py b/src/cloud_what/providers/aws.py index dc1fca1827..869778ea6c 100644 --- a/src/cloud_what/providers/aws.py +++ b/src/cloud_what/providers/aws.py @@ -281,25 +281,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]: def _get_metadata_from_server(self) -> Union[str, None]: """ - Try to get metadata from server as is described in this document: + Try to get metadata from server as described in these documents: + - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata). + If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console). - It is possible to use two versions. We will try to use version IMDSv1 first (this version requires - only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version. - The version requires two requests (get session TOKEN and then get own metadata using token) :return: String with metadata or None """ + metadata = self._get_metadata_from_server_imds_v2() + if metadata is not None: + return metadata - if self._token_exists() is False: - # First try to get metadata using IMDSv1 - metadata = self._get_metadata_from_server_imds_v1() - - if metadata is not None: - return metadata - - # When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2 - return self._get_metadata_from_server_imds_v2() + return self._get_metadata_from_server_imds_v1() def _get_signature_from_cache_file(self) -> None: """ diff --git a/test/test_auto_registration.py b/test/test_auto_registration.py index 72e67a330d..4168fa9bc1 100644 --- a/test/test_auto_registration.py +++ b/test/test_auto_registration.py @@ -18,7 +18,7 @@ import unittest import base64 -from unittest.mock import patch, Mock +from unittest.mock import Mock from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info from .rhsmlib.facts.test_cloud_facts import AWS_METADATA @@ -93,11 +93,17 @@ def send_only_imds_v2_is_supported(request, *args, **kwargs): return mock_result -def mock_prepare_request(request): - return request +class TestAutomaticRegistration(unittest.TestCase): + def setUp(self): + _ = aws.AWSCloudProvider({}) + aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None) + aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None) + aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock() + _ = azure.AzureCloudProvider({}) + azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None) + azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="") -class TestAutomaticRegistration(unittest.TestCase): def tearDown(self): aws.AWSCloudProvider._instance = None aws.AWSCloudProvider._initialized = False @@ -106,17 +112,16 @@ def tearDown(self): gcp.GCPCloudProvider._instance = None gcp.GCPCloudProvider._initialized = False - @patch("cloud_what.providers.aws.requests.Session") - def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class): + def test_collect_cloud_info_one_cloud_provider_detected(self): """ Test the case, when we try to collect cloud info only for one detected cloud provider """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported - mock_session.prepare_request = Mock(side_effect=mock_prepare_request) + mock_session.send = send_only_imds_v2_is_supported() + mock_session.prepare_request = Mock(side_effect=lambda request: request) mock_session.hooks = {"response": []} - mock_session_class.return_value = mock_session + aws.AWSCloudProvider._instance._session = mock_session cloud_list = ["aws"] cloud_info = _collect_cloud_info(cloud_list, Mock()) @@ -136,18 +141,18 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class signature = base64.b64decode(b64_signature).decode("utf-8") self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----") - @patch("cloud_what.providers.aws.requests.Session") - def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class): + def test_collect_cloud_info_more_cloud_providers_detected(self): """ Test the case, when we try to collect cloud info only for more than one cloud providers, because more than one cloud providers were detected """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported - mock_session.prepare_request = Mock(side_effect=mock_prepare_request) + mock_session.send = send_only_imds_v2_is_supported() + mock_session.prepare_request = Mock(side_effect=lambda request: request) mock_session.hooks = {"response": []} - mock_session_class.return_value = mock_session + aws.AWSCloudProvider._instance._session = mock_session + azure.AzureCloudProvider._instance._session = Mock() # More cloud providers detected cloud_list = ["azure", "aws"] From 4d8fcd45bab2a7d776a7e8eed8572539c1180281 Mon Sep 17 00:00:00 2001 From: mhorky Date: Wed, 1 Nov 2023 16:56:21 +0100 Subject: [PATCH 2/3] Simplify autoregistration test setup The function responsible for returning 'server' responses was altered to be less nested, which should improve its readability. --- test/test_auto_registration.py | 91 +++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 41 deletions(-) diff --git a/test/test_auto_registration.py b/test/test_auto_registration.py index 4168fa9bc1..734090cbd2 100644 --- a/test/test_auto_registration.py +++ b/test/test_auto_registration.py @@ -47,50 +47,59 @@ AWS_TOKEN = "ABCDEFGHIJKLMNOPQRSTVWXYZabcdefghijklmnopqrstvwxyz0123==" -def send_only_imds_v2_is_supported(request, *args, **kwargs): +def send__aws_imdsv2_only(request, *args, **kwargs): """ - Mock result, when we try to get metadata using GET method against - AWS metadata provider. This mock is for the case, when only IMDSv2 - is supported by instance. + Mock result for metadata request on AWS where only IMDSv2 is supported. + This function should be used to replace function `requests.Session.send()`. + :param request: HTTP request - :return: Mock with result + :return: Mocked server result. """ - mock_result = Mock() + result = Mock() if request.method == "PUT": - if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL: - if "X-aws-ec2-metadata-token-ttl-seconds" in request.headers: - mock_result.status_code = 200 - mock_result.text = AWS_TOKEN - else: - mock_result.status_code = 400 - mock_result.text = "Error: TTL for token not specified" - else: - mock_result.status_code = 400 - mock_result.text = "Error: Invalid URL" - elif request.method == "GET": - if "X-aws-ec2-metadata-token" in request.headers.keys(): - if request.headers["X-aws-ec2-metadata-token"] == AWS_TOKEN: - if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL: - mock_result.status_code = 200 - mock_result.text = AWS_METADATA - elif request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL: - mock_result.status_code = 200 - mock_result.text = AWS_SIGNATURE - else: - mock_result.status_code = 400 - mock_result.text = "Error: Invalid URL" - else: - mock_result.status_code = 400 - mock_result.text = "Error: Invalid metadata token provided" - else: - mock_result.status_code = 400 - mock_result.text = "Error: IMDSv1 is not supported on this instance" - else: - mock_result.status_code = 400 - mock_result.text = "Error: not supported request method" - - return mock_result + if request.url != aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL: + result.status_code = 400 + result.text = "Error: Invalid URL" + return result + + if "X-aws-ec2-metadata-token-ttl-seconds" not in request.headers: + result.status_code = 400 + result.text = "Error: TTL for token not specified" + return result + + result.status_code = 200 + result.text = AWS_TOKEN + return result + + if request.method == "GET": + if "X-aws-ec2-metadata-token" not in request.headers.keys(): + result.status_code = 400 + result.text = "Error: IMDSv1 is not supported on this instance" + return result + + if request.headers["X-aws-ec2-metadata-token"] != AWS_TOKEN: + result.status_code = 400 + result.text = "Error: Invalid metadata token provided" + return result + + if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL: + result.status_code = 200 + result.text = AWS_METADATA + return result + + if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL: + result.status_code = 200 + result.text = AWS_SIGNATURE + return result + + result.status_code = 400 + result.text = "Error: Invalid URL" + return result + + result.status_code = 400 + result.text = "Error: not supported request method" + return result class TestAutomaticRegistration(unittest.TestCase): @@ -118,7 +127,7 @@ def test_collect_cloud_info_one_cloud_provider_detected(self): one detected cloud provider """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported() + mock_session.send = send__aws_imdsv2_only mock_session.prepare_request = Mock(side_effect=lambda request: request) mock_session.hooks = {"response": []} aws.AWSCloudProvider._instance._session = mock_session @@ -148,7 +157,7 @@ def test_collect_cloud_info_more_cloud_providers_detected(self): providers were detected """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported() + mock_session.send = send__aws_imdsv2_only mock_session.prepare_request = Mock(side_effect=lambda request: request) mock_session.hooks = {"response": []} aws.AWSCloudProvider._instance._session = mock_session From c8b7cb72fd5194dd9f6eb56b041508b833f71d21 Mon Sep 17 00:00:00 2001 From: mhorky Date: Wed, 1 Nov 2023 16:56:35 +0100 Subject: [PATCH 3/3] Fix type hint in rhsm When type hints for 'dnf' are not available, loading this module would result in a test failure. --- src/rhsm/profile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rhsm/profile.py b/src/rhsm/profile.py index de570c115f..374df83b52 100644 --- a/src/rhsm/profile.py +++ b/src/rhsm/profile.py @@ -86,7 +86,7 @@ def _uniquify(module_list: list) -> list: return list(ret.values()) @staticmethod - def fix_aws_rhui_repos(base: dnf.Base) -> None: + def fix_aws_rhui_repos(base: "dnf.Base") -> None: """ Try to fix RHUI repos on AWS systems. When the system is running on AWS, then we have to fix repository URL. See: https://bugzilla.redhat.com/show_bug.cgi?id=1924126