diff --git a/ocs_ci/framework/pytest_customization/marks.py b/ocs_ci/framework/pytest_customization/marks.py index cb00474a4cc0..da58a236f421 100644 --- a/ocs_ci/framework/pytest_customization/marks.py +++ b/ocs_ci/framework/pytest_customization/marks.py @@ -610,3 +610,16 @@ def get_current_test_marks(): """ return current_test_marks + + +# Marks to identify encryption at rest is configured. +encryption_at_rest_required = pytest.mark.skipif( + not config.ENV_DATA.get("encryption_at_rest"), + reason="This test requires encryption at rest to be enabled.", +) + +# Mark to identify encryption is configured with KMS. +skipif_kms_deployment = pytest.mark.skipif( + config.DEPLOYMENT.get("kms_deployment") is True, + reason="This test is not supported for KMS deployment.", +) diff --git a/ocs_ci/helpers/keyrotation_helper.py b/ocs_ci/helpers/keyrotation_helper.py new file mode 100644 index 000000000000..0cc54d073f1e --- /dev/null +++ b/ocs_ci/helpers/keyrotation_helper.py @@ -0,0 +1,305 @@ +import base64 +import logging + +from ocs_ci.ocs.ocp import OCP +from ocs_ci.ocs import constants +from ocs_ci.ocs.exceptions import CommandFailed +from ocs_ci.framework import config +from ocs_ci.ocs.resources.pvc import get_deviceset_pvcs + +log = logging.getLogger(__name__) + + +class KeyRotation: + """ + Handles key rotation operations for a storage cluster. + """ + + def __init__(self): + """ + Initializes KeyRotation object with necessary parameters. + """ + self.cluster_name = constants.DEFAULT_CLUSTERNAME + self.resource_name = constants.STORAGECLUSTER + self.cluster_namespace = config.ENV_DATA["cluster_namespace"] + + if config.DEPLOYMENT["external_mode"]: + self.cluster_name_name = constants.DEFAULT_CLUSTERNAME_EXTERNAL_MODE + + self.storagecluster_obj = OCP( + resource_name=self.cluster_name, + namespace=self.cluster_namespace, + kind=self.resource_name, + ) + + def _exec_oc_cmd(self, cmd, **kwargs): + """ + Executes the given command. + + Args: + cmd : command to run. + + Returns: + str: The output of the command. + + Raises: + CommandFailed: If the command fails. + """ + + try: + cmd_out = self.storagecluster_obj.exec_oc_cmd(cmd, **kwargs) + except CommandFailed as ex: + log.error(f"Error while executing command {cmd}: {ex}") + raise ex + + return cmd_out + + def get_keyrotation_schedule(self): + """ + Retrieves the current key rotation schedule for the storage cluster. + + Returns: + str: The key rotation schedule. + + Raises: + ValueError: If the key rotation schedule is not found. + """ + cmd = f"get {self.resource_name} -o jsonpath='{{.items[*].spec.encryption.keyRotation.schedule}}'" + schedule = self._exec_oc_cmd(cmd, out_yaml_format=False) + log.info(f"Keyrotation schedule set to {schedule} in storagecluster spec.") + return schedule + + def set_keyrotation_schedule(self, schedule): + """ + Sets the key rotation schedule for the storage cluster. + + Args: + schedule (str): The new key rotation schedule. + + Raises: + ValueError: If the key rotation schedule is not updated successfully. + """ + param = f'[{{"op":"replace","path":"/spec/encryption/keyRotation/schedule","value":"{schedule}"}}]' + self.storagecluster_obj.patch(params=param, format_type="json") + self.storagecluster_obj.wait_for_resource( + constants.STATUS_READY, + self.storagecluster_obj.resource_name, + column="PHASE", + timeout=180, + ) + self.storagecluster_obj.reload_data() + if self.get_keyrotation_schedule() == schedule: + log.info(f"Storagecluster keyrotation schedule is set as {schedule}") + + def enable_keyrotation(self): + """ + Enables key rotation for the storage cluster. + + Returns: + bool: True if key rotation is enabled, False otherwise. + """ + if self.is_keyrotation_enable(): + log.info("Keyrotation is Already in Enabled state.") + return True + + param = '{"spec":{"encryption":{"keyRotation":{"enable":null}}}}' + self.storagecluster_obj.patch( + params=param, format_type="merge", resource_name=self.cluster_name + ) + self.storagecluster_obj.wait_for_resource( + constants.STATUS_READY, + self.storagecluster_obj.resource_name, + column="PHASE", + timeout=180, + ) + self.storagecluster_obj.reload_data() + log.info("Keyrotation is enabled in storegeclujster object.") + return True + + def disable_keyrotation(self): + """ + Disables key rotation for the storage cluster. + + Returns: + bool: True if key rotation is disabled, False otherwise. + """ + if not self.is_keyrotation_enable(): + log.info("Keyrotation is Already in Disabled state.") + return True + + param = '[{"op":"replace","path":"/spec/encryption/keyRotation/enable","value":False}]' + self.storagecluster_obj.patch(params=param, format_type="json") + self.storagecluster_obj.wait_for_resource( + constants.STATUS_READY, + self.storagecluster_obj.resource_name, + column="PHASE", + timeout=180, + ) + self.storagecluster_obj.reload_data() + log.info("Keyrotation is Disabled in storagecluster object.") + return True + + def is_keyrotation_enable(self): + """ + Checks if key rotation is enabled for the storage cluster. + + Returns: + bool: True if key rotation is enabled, False otherwise. + """ + cmd = f" get {self.resource_name} -o jsonpath='{{.items[*].spec.encryption.keyRotation.enable}}'" + cmd_out = self._exec_oc_cmd(cmd) + if (cmd_out is True) or (cmd_out is None): + log.info("Keyrotation in storagecluster object is enabled.") + return True + log.info("Keyrotation in storagecluster object is not enabled.") + return False + + +class NoobaaKeyrotation(KeyRotation): + """ + Extends KeyRotation class to handle key rotation operations for Noobaa. + """ + + def __init__(self): + """ + Initializes NoobaaKeyrotation object. + """ + super().__init__() + + def get_noobaa_keyrotation_schedule(self): + """ + Retrieves the current key rotation schedule for Noobaa. + + Returns: + str: The key rotation schedule for Noobaa. + + Raises: + ValueError: If the key rotation schedule is not found or is invalid. + """ + cmd = " get noobaas.noobaa.io -o jsonpath='{.items[*].spec.security.kms.schedule}'" + + cmd_out = self._exec_oc_cmd(cmd=cmd, out_yaml_format=False) + if cmd_out == "": + raise ValueError("Noobaa Keyrotation schedule is not found.") + log.info(f"Noobaa Keyrotation schedule: {cmd_out}") + return cmd_out.strip() + + def is_noobaa_keyrotation_enable(self): + """ + Checks if key rotation is enabled for Noobaa. + + Returns: + bool: True if key rotation is enabled for Noobaa, False otherwise. + """ + cmd = " get noobaas.noobaa.io -o jsonpath='{.items[*].spec.security.kms.enableKeyRotation}'" + + cmd_out = self._exec_oc_cmd(cmd=cmd) + if cmd_out == "true": + log.info("Noobaa Keyrotation is Enabled.") + return True + log.info("Noobaa Keyrotation is disabled.") + return False + + def get_noobaa_backend_secret(self): + """ + Retrieves the backend secret for Noobaa. + + Returns: + tuple: A tuple containing the Noobaa backend root key and secret. + + Raises: + ValueError: If failed to retrieve the backend secret. + """ + cmd = f" get secret {constants.NOOBAA_BACKEND_SECRET} -o jsonpath='{{.data}}'" + + cmd_out = self._exec_oc_cmd(cmd=cmd) + noobaa_backend_root_key = base64.b64decode(cmd_out["active_root_key"]).decode() + noobaa_backend_secret = cmd_out[noobaa_backend_root_key] + log.info( + f"Noobaa Backend root key : {noobaa_backend_root_key}, Noobaa backend secrets : {noobaa_backend_secret}" + ) + return noobaa_backend_root_key, noobaa_backend_secret + + def get_noobaa_volume_secret(self): + """ + Retrieves the volume secret for Noobaa. + + Returns: + tuple: A tuple containing the Noobaa volume root key and secret. + + Raises: + ValueError: If failed to retrieve the volume secret. + """ + cmd = f" get secret {constants.NOOBAA_VOLUME_SECRET} -o jsonpath='{{.data}}'" + + cmd_out = self._exec_oc_cmd(cmd=cmd) + noobaa_volume_root_key = base64.b64decode(cmd_out["active_root_key"]).decode() + noobaa_volume_secret = cmd_out[noobaa_volume_root_key] + log.info( + f"Noobaa volume root key: {noobaa_volume_root_key}, Noobaa Volume sceret: {noobaa_volume_secret}" + ) + return noobaa_volume_root_key, noobaa_volume_secret + + +class OSDKeyrotation(KeyRotation): + """ + Extends KeyRotation class to handle key rotation operations for Rook. + """ + + def __init__(self): + """ + Initializes RookKeyrotation object. + """ + super().__init__() + self.deviceset = [pvc.name for pvc in get_deviceset_pvcs()] + + def is_osd_keyrotation_enabled(self): + """ + Checks if key rotation is enabled for OSD. + + Returns: + bool: True if key rotation is enabled for OSD, False otherwise. + """ + cmd = " get cephclusters.ceph.rook.io -o jsonpath='{.items[].spec.security.keyRotation.enabled}'" + + cmd_out = self._exec_oc_cmd(cmd=cmd) + if cmd_out: + log.info("OSD keyrotation is Enabled.") + return True + log.info("OSD keyrotation is Disabled.") + return False + + def get_osd_keyrotation_schedule(self): + """ + Retrieves the key rotation schedule for OSD. + + Returns: + str: The key rotation schedule for OSD. + + Raises: + ValueError: If failed to retrieve the key rotation schedule. + """ + cmd = " get cephclusters.ceph.rook.io -o jsonpath='{.items[].spec.security.keyRotation.schedule}'" + + schedule = self._exec_oc_cmd(cmd=cmd, out_yaml_format=False) + log.info(f"OSD keyrotation schedule set as {schedule}") + return schedule + + def get_osd_dm_crypt(self, device): + """ + Retrieves the dmcrypt key for OSD. + + Args: + device (str): The OSD device name. + + Returns: + str: The dmcrypt key for the specified OSD device. + + Raises: + ValueError: If failed to retrieve the OSD dmcrypt key. + """ + cmd = f" get secret rook-ceph-osd-encryption-key-{device} -o jsonpath='{{.data.dmcrypt-key}}'" + + dmcrypt_key = self._exec_oc_cmd(cmd=cmd, out_yaml_format=False) + log.info(f"dmcrypt-key of device {device} is {dmcrypt_key}") + return dmcrypt_key diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index b2c056e2c180..2f9e2e22fea7 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -2504,3 +2504,7 @@ OPERATION_START = "start" OPERATION_RESTART = "restart" OPERATION_TERMINATE = "terminate" + +# Noobaa Secrets +NOOBAA_BACKEND_SECRET = "noobaa-root-master-key-backend" +NOOBAA_VOLUME_SECRET = "noobaa-root-master-key-volume" diff --git a/tests/functional/encryption/test_encryption_keyrotation.py b/tests/functional/encryption/test_encryption_keyrotation.py new file mode 100644 index 000000000000..ee32e6ae106d --- /dev/null +++ b/tests/functional/encryption/test_encryption_keyrotation.py @@ -0,0 +1,229 @@ +import logging +import pytest + +from ocs_ci.framework.pytest_customization.marks import ( + tier1, + green_squad, + encryption_at_rest_required, + skipif_kms_deployment, +) +from ocs_ci.helpers.keyrotation_helper import ( + NoobaaKeyrotation, + OSDKeyrotation, + KeyRotation, +) + +from ocs_ci.ocs.exceptions import UnexpectedBehaviour +from ocs_ci.utility.retry import retry + + +logger = logging.getLogger(__name__) + +log = logging.getLogger(__name__) + + +@encryption_at_rest_required +@skipif_kms_deployment +@green_squad +@tier1 +class TestEncryptionKeyrotation: + @pytest.fixture(autouse=True) + def teardown(self, request): + """ + Resetting the default value of KeyRotation + """ + + def finalizer(): + kr_obj = KeyRotation() + kr_obj.set_keyrotation_schedule("@weekly") + kr_obj.enable_keyrotation() + + request.addfinalizer(finalizer) + + def test_osd_keyrotation(self): + """ + Test to verify the key rotation of the OSD + + Steps: + 1. Disable Keyrotation and verify its disable status at rook and storagecluster end. + 2. Record existing OSD keys before rotation is happen. + 3. Enable Keyrotation and verify its enable status at rook and storagecluster end. + 4. Set keyrotation status to every 3 minutes. + 5. wait for 3 minute. + 6. Verify the keyrotation is happen for each osd by comparing the old keys with new keys. + 7. Change the keyrotation value to default. + """ + osd_keyrotation = OSDKeyrotation() + + # Disable keyrotation and verify its enable status at rook and storagecluster end. + log.info("Disabling the Keyrotation in storagecluster Spec.") + osd_keyrotation.disable_keyrotation() + + assert ( + not osd_keyrotation.is_keyrotation_enable() + ), "Keyrotation is Not Disable in the storagecluster object" + assert ( + not osd_keyrotation.is_osd_keyrotation_enabled() + ), "KeyRotation is not Disable in the Rook Object." + + # Recored existing OSD keys before rotation is happen. + osd_keys_before_rotation = {} + for device in osd_keyrotation.deviceset: + osd_keys_before_rotation[device] = osd_keyrotation.get_osd_dm_crypt(device) + + # Enable Keyrotation and verify its enable status at rook and storagecluster end. + log.info("Enabling the Keyrotation in storagecluster Spec.") + osd_keyrotation.enable_keyrotation() + + assert ( + osd_keyrotation.is_keyrotation_enable() + ), "Keyrotation is Not enabled in the storagecluster object" + assert ( + osd_keyrotation.is_osd_keyrotation_enabled() + ), "KeyRotation is not enabled in the Rook Object." + + # Set Key Rotation schedule to every 3 minutes. + schedule = "*/3 * * * *" + osd_keyrotation.set_keyrotation_schedule(schedule) + + # Verify Keyrotation schedule changed at storagecluster object and rook object. + assert ( + osd_keyrotation.get_keyrotation_schedule() == schedule + ), "Keyrotation schedule is not set to 3 minutes." + assert ( + osd_keyrotation.get_osd_keyrotation_schedule() == schedule + ), "KeyRotation is not enabled in the Rook Object." + + # Wait for 3 minuted and verify the keyrotation is happen for each osd by comparing the old keys with new keys. + log.info( + "Waiting for 3 minutes to verify the keyrotation is happen for each osd." + ) + + @retry(UnexpectedBehaviour, tries=10, delay=20) + def compare_old_with_new_keys(): + for device in osd_keyrotation.deviceset: + osd_keys_after_rotation = osd_keyrotation.get_osd_dm_crypt(device) + log.info( + f"Fetching New Key for device {device}: {osd_keys_after_rotation}" + ) + if osd_keys_before_rotation[device] == osd_keys_after_rotation: + log.info(f"Keyrotation Still not happend for device {device}") + raise UnexpectedBehaviour( + f"Keyrotation is not happened for the device {device}" + ) + log.info(f"Keyrotation is happend for device {device}") + return True + + # with pytest.raises(UnexpectedBehaviour): + try: + compare_old_with_new_keys() + except UnexpectedBehaviour: + log.error("Key rotation is Not happend after schedule is passed. ") + assert False + + # Change the keyrotation value to default. + log.info("Changing the keyrotation value to default.") + osd_keyrotation.set_keyrotation_schedule("@weekly") + + def test_noobaa_keyrotation(self): + """ + Test to verify the keyrotation for noobaa. + + Steps: + 1. Disable Keyrotation and verify its Disable status at noobaa and storagecluster end. + 3. Record existing NooBaa Volume and backend keys before rotation is happen. + 4. Set keyrotation status to every 3 minutes. + 5. wait for 3 minute. + 6. Verify the keyrotation is happen NooBaa volume and backend keys + by comparing the old keys with new keys. + 7. Change the keyrotation value to default. + """ + + # Get the noobaa object. + noobaa_keyrotation = NoobaaKeyrotation() + + # Disable keyrotation and verify its disable status at noobaa and storagecluster end. + noobaa_keyrotation.disable_keyrotation() + + assert ( + not noobaa_keyrotation.is_keyrotation_enable() + ), "Keyrotation is not disabled." + assert ( + not noobaa_keyrotation.is_noobaa_keyrotation_enable() + ), "Keyrotation is not disabled." + + # Recoard Noobaa volume and backend keys before rotation. + ( + old_noobaa_backend_key, + old_noobaa_backend_secret, + ) = noobaa_keyrotation.get_noobaa_backend_secret() + log.info( + f" Noobaa backend secrets before Rotation {old_noobaa_backend_key} : {old_noobaa_backend_secret}" + ) + + ( + old_noobaa_volume_key, + old_noobaa_volume_secret, + ) = noobaa_keyrotation.get_noobaa_volume_secret() + log.info( + f"Noobaa Volume secrets before Rotation {old_noobaa_volume_key} : {old_noobaa_volume_secret}" + ) + + # Enable Keyrotation and verify its enable status at Noobaa and storagecluster end. + noobaa_keyrotation.enable_keyrotation() + + assert ( + noobaa_keyrotation.is_keyrotation_enable + ), "Keyrotation is not enabled in the storagecluster object." + assert ( + noobaa_keyrotation.is_noobaa_keyrotation_enable + ), "Keyrotation is not enabled in the noobaa object." + + # Set keyrotatiojn schedule to every 3 minutes. + schedule = "*/3 * * * *" + noobaa_keyrotation.set_keyrotation_schedule(schedule) + + # Verify keyrotation is set for every 3 minute in storagecluster and noobaa object. + assert ( + noobaa_keyrotation.get_keyrotation_schedule() == schedule + ), "Keyrotation schedule is not set to every 3 minutes in storagecluster object." + assert ( + noobaa_keyrotation.get_noobaa_keyrotation_schedule() == schedule + ), "Keyrotation schedule is not set to every 3 minutes in Noobaa object." + + @retry(UnexpectedBehaviour, tries=10, delay=20) + def compare_old_keys_with_new_keys(): + """ + Compare old keys with new keys. + """ + ( + new_noobaa_backend_key, + new_noobaa_backend_secret, + ) = noobaa_keyrotation.get_noobaa_backend_secret() + ( + new_noobaa_volume_key, + new_noobaa_volume_secret, + ) = noobaa_keyrotation.get_noobaa_volume_secret() + + if new_noobaa_backend_key == old_noobaa_backend_key: + raise UnexpectedBehaviour("Noobaa Key Rotation is not happend") + + if new_noobaa_volume_key == old_noobaa_volume_key: + raise UnexpectedBehaviour("Noobaa Key Rotation is not happend.") + + log.info( + f"Noobaa Backend key rotated {new_noobaa_backend_key} : {new_noobaa_backend_secret}" + ) + log.info( + f"Noobaa Volume key rotated {new_noobaa_volume_key} : {new_noobaa_volume_secret}" + ) + + try: + compare_old_keys_with_new_keys() + except UnexpectedBehaviour: + log.info("Noobaa Key Rotation is not happend.") + assert False + + # Change the keyrotation value to default. + log.info("Changing the keyrotation value to default.") + noobaa_keyrotation.set_keyrotation_schedule("@weekly")