Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RHEL-9435: Get AWS metadata via IMDSv2 #3351

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/cloud_what/providers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]:

def _get_metadata_from_server(self) -> Union[str, None]:
"""
Try to get metadata from server as is described in this document:
Try to get metadata from server as described in these documents:
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata).
If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console).

It is possible to use two versions. We will try to use version IMDSv1 first (this version requires
only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version.
The version requires two requests (get session TOKEN and then get own metadata using token)
:return: String with metadata or None
"""
metadata = self._get_metadata_from_server_imds_v2()
if metadata is not None:
return metadata

if self._token_exists() is False:
# First try to get metadata using IMDSv1
metadata = self._get_metadata_from_server_imds_v1()

if metadata is not None:
return metadata

# When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2
return self._get_metadata_from_server_imds_v2()
return self._get_metadata_from_server_imds_v1()

def _get_signature_from_cache_file(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/rhsm/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _uniquify(module_list: list) -> list:
return list(ret.values())

@staticmethod
def fix_aws_rhui_repos(base: dnf.Base) -> None:
def fix_aws_rhui_repos(base: "dnf.Base") -> None:
"""
Try to fix RHUI repos on AWS systems. When the system is running on AWS, then we have
to fix repository URL. See: https://bugzilla.redhat.com/show_bug.cgi?id=1924126
Expand Down
122 changes: 68 additions & 54 deletions test/test_auto_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import unittest
import base64
from unittest.mock import patch, Mock
from unittest.mock import Mock

from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info
from .rhsmlib.facts.test_cloud_facts import AWS_METADATA
Expand Down Expand Up @@ -47,57 +47,72 @@
AWS_TOKEN = "ABCDEFGHIJKLMNOPQRSTVWXYZabcdefghijklmnopqrstvwxyz0123=="


def send_only_imds_v2_is_supported(request, *args, **kwargs):
def send__aws_imdsv2_only(request, *args, **kwargs):
"""
Mock result, when we try to get metadata using GET method against
AWS metadata provider. This mock is for the case, when only IMDSv2
is supported by instance.
Mock result for metadata request on AWS where only IMDSv2 is supported.
This function should be used to replace function `requests.Session.send()`.

:param request: HTTP request
:return: Mock with result
:return: Mocked server result.
"""
mock_result = Mock()
result = Mock()

if request.method == "PUT":
if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL:
if "X-aws-ec2-metadata-token-ttl-seconds" in request.headers:
mock_result.status_code = 200
mock_result.text = AWS_TOKEN
else:
mock_result.status_code = 400
mock_result.text = "Error: TTL for token not specified"
else:
mock_result.status_code = 400
mock_result.text = "Error: Invalid URL"
elif request.method == "GET":
if "X-aws-ec2-metadata-token" in request.headers.keys():
if request.headers["X-aws-ec2-metadata-token"] == AWS_TOKEN:
if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL:
mock_result.status_code = 200
mock_result.text = AWS_METADATA
elif request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL:
mock_result.status_code = 200
mock_result.text = AWS_SIGNATURE
else:
mock_result.status_code = 400
mock_result.text = "Error: Invalid URL"
else:
mock_result.status_code = 400
mock_result.text = "Error: Invalid metadata token provided"
else:
mock_result.status_code = 400
mock_result.text = "Error: IMDSv1 is not supported on this instance"
else:
mock_result.status_code = 400
mock_result.text = "Error: not supported request method"

return mock_result


def mock_prepare_request(request):
return request
if request.url != aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL:
result.status_code = 400
result.text = "Error: Invalid URL"
return result

if "X-aws-ec2-metadata-token-ttl-seconds" not in request.headers:
result.status_code = 400
result.text = "Error: TTL for token not specified"
return result

result.status_code = 200
result.text = AWS_TOKEN
return result

if request.method == "GET":
if "X-aws-ec2-metadata-token" not in request.headers.keys():
result.status_code = 400
result.text = "Error: IMDSv1 is not supported on this instance"
return result

if request.headers["X-aws-ec2-metadata-token"] != AWS_TOKEN:
result.status_code = 400
result.text = "Error: Invalid metadata token provided"
return result

if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL:
result.status_code = 200
result.text = AWS_METADATA
return result

if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL:
result.status_code = 200
result.text = AWS_SIGNATURE
return result

result.status_code = 400
result.text = "Error: Invalid URL"
return result

result.status_code = 400
result.text = "Error: not supported request method"
return result


class TestAutomaticRegistration(unittest.TestCase):
def setUp(self):
_ = aws.AWSCloudProvider({})
aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None)
aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock()

_ = azure.AzureCloudProvider({})
azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="")

def tearDown(self):
aws.AWSCloudProvider._instance = None
aws.AWSCloudProvider._initialized = False
Expand All @@ -106,17 +121,16 @@ def tearDown(self):
gcp.GCPCloudProvider._instance = None
gcp.GCPCloudProvider._initialized = False

@patch("cloud_what.providers.aws.requests.Session")
def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class):
def test_collect_cloud_info_one_cloud_provider_detected(self):
"""
Test the case, when we try to collect cloud info only for
one detected cloud provider
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.send = send__aws_imdsv2_only
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
mock_session_class.return_value = mock_session
aws.AWSCloudProvider._instance._session = mock_session

cloud_list = ["aws"]
cloud_info = _collect_cloud_info(cloud_list, Mock())
Expand All @@ -136,18 +150,18 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class
signature = base64.b64decode(b64_signature).decode("utf-8")
self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----")

@patch("cloud_what.providers.aws.requests.Session")
def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class):
def test_collect_cloud_info_more_cloud_providers_detected(self):
"""
Test the case, when we try to collect cloud info only for
more than one cloud providers, because more than one cloud
providers were detected
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.send = send__aws_imdsv2_only
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
mock_session_class.return_value = mock_session
aws.AWSCloudProvider._instance._session = mock_session
azure.AzureCloudProvider._instance._session = Mock()

# More cloud providers detected
cloud_list = ["azure", "aws"]
Expand Down
Loading