Skip to content

Commit

Permalink
Test Automation for feature PV key rotation. (#10344)
Browse files Browse the repository at this point in the history
Signed-off-by: Parag Kamble <[email protected]>
  • Loading branch information
paraggit authored Sep 1, 2024
1 parent c95770a commit 047ca53
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 11 deletions.
58 changes: 58 additions & 0 deletions ocs_ci/helpers/keyrotation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ocs_ci.ocs.resources.pvc import get_deviceset_pvcs
from ocs_ci.ocs.exceptions import UnexpectedBehaviour
from ocs_ci.utility.retry import retry
from ocs_ci.utility.kms import get_kms_deployment

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -350,3 +351,60 @@ def compare_old_with_new_keys():

log.info("Keyrotation is sucessfully done for the all OSD.")
return True


class PVKeyrotation(KeyRotation):
def __init__(self, sc_obj):
self.sc_obj = sc_obj
self.kms = get_kms_deployment()

def annotate_storageclass_key_rotation(self, schedule="@weekly"):
"""
Annotate Storageclass To enable keyrotation for encrypted PV
"""
annot_str = f"keyrotation.csiaddons.openshift.io/schedule='{schedule}'"
log.info(f"Adding annotation to the storage class: {annot_str}")
self.sc_obj.annotate(annotation=annot_str)

@retry(UnexpectedBehaviour, tries=5, delay=20)
def compare_keys(self, device_handle, old_key):
"""
Compares the current key with the rotated key.
Args:
device_handle (str): The handle or identifier for the device.
old_key (str): The current key before rotation.
Returns:
bool: True if the key has rotated successfully.
Raises:
UnexpectedBehaviour: If the keys have not rotated.
"""
rotated_key = self.kms.get_pv_secret(device_handle)
if old_key == rotated_key:
raise UnexpectedBehaviour(
f"Keys are not rotated for device handle {device_handle}"
)
log.info(f"PV key rotated with new key : {rotated_key}")
return True

def wait_till_keyrotation(self, device_handle):
"""
Waits until the key rotation occurs for a given device handle.
Args:
device_handle (str): The handle or identifier for the device whose key
rotation is to be checked.
Returns:
bool: True if the key rotation is successful, otherwise False.
"""
old_key = self.kms.get_pv_secret(device_handle)
try:
self.compare_keys(device_handle, old_key)
except UnexpectedBehaviour:
log.error(f"Keys are not rotated for device handle {device_handle}")
assert False

return True
11 changes: 11 additions & 0 deletions ocs_ci/ocs/resources/ocs.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ def add_label(self, label):
self.reload()
return status

def annotate(self, annotation):
"""
Add annotation to the OCS Object.
Args:
annotation (str): New annotation to the OCS object.
"""
status = self.ocp.annotate(annotation, resource_name=self.name)
self.reload()
return status

def delete_temp_yaml_file(self):
utils.delete_file(self.temp_yaml)

Expand Down
101 changes: 90 additions & 11 deletions ocs_ci/utility/kms.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __init__(self):
self.vault_namespace = None
self.vault_deploy_mode = config.ENV_DATA.get("vault_deploy_mode")
self.vault_backend_path = None
self.csi_vault_backend_path = None
self.vault_backend_version = config.ENV_DATA.get(
"VAULT_BACKEND", defaults.VAULT_DEFAULT_BACKEND_VERSION
)
Expand Down Expand Up @@ -605,22 +606,46 @@ def gather_vault_config(self):
vault_conf = load_auth_config()["vault"]
return vault_conf

def get_vault_connection_info(self, resource_name=None):
def get_vault_connection_info(
self,
resource_name=None,
resource_configmap=constants.VAULT_KMS_CONNECTION_DETAILS_RESOURCE,
):
"""
Get resource info from ocs-kms-connection-defatils
Get resource info from ocs-kms-connection-details or csi-kms-connection-details
ConfigMap.
Args:
resource_name (str): name of the resource
Returns:
str or None: The resource information, or None if not found.
"""
connection_details = ocp.OCP(
cm_obj = ocp.OCP(
kind="ConfigMap",
resource_name=constants.VAULT_KMS_CONNECTION_DETAILS_RESOURCE,
namespace=config.ENV_DATA["cluster_namespace"],
)
return connection_details.get().get("data")[resource_name]

def get_vault_backend_path(self):
if not cm_obj.is_exist(resource_configmap):
logger.info(f"Resource ConfigMap {resource_configmap} does not exist")
return None

cm_data = cm_obj.get(resource_configmap).get("data", {})

if resource_configmap == constants.VAULT_KMS_CONNECTION_DETAILS_RESOURCE:
return cm_data.get(resource_name)

if resource_configmap == constants.VAULT_KMS_CSI_CONNECTION_DETAILS:
for v in cm_data.values():
json_out = json.loads(v)
if json_out.get("KMS_SERVICE_NAME") == "vault":
return json_out.get(resource_name)

return None

def get_vault_backend_path(
self, resource_configmap=constants.VAULT_KMS_CONNECTION_DETAILS_RESOURCE
):
"""
Fetch the vault backend path used for this deployment
This can be obtained from kubernetes secret resource
Expand All @@ -636,11 +661,23 @@ def get_vault_backend_path(self):
VAULT_BACKEND_PATH: ocs
"""
if not self.vault_backend_path:
self.vault_backend_path = self.get_vault_connection_info(
"VAULT_BACKEND_PATH"
)
logger.info(f"setting vault_backend_path = {self.vault_backend_path}")
if resource_configmap == constants.VAULT_KMS_CONNECTION_DETAILS_RESOURCE:
if not self.vault_backend_path:
self.vault_backend_path = self.get_vault_connection_info(
resource_name="VAULT_BACKEND_PATH",
resource_configmap=resource_configmap,
)
logger.info(f"setting vault_backend_path = {self.vault_backend_path}")

elif resource_configmap == constants.VAULT_KMS_CSI_CONNECTION_DETAILS:
if not self.csi_vault_backend_path:
self.csi_vault_backend_path = self.get_vault_connection_info(
resource_name="VAULT_BACKEND_PATH",
resource_configmap=resource_configmap,
)
logger.info(f"setting vault_backend_path = {self.vault_backend_path}")
else:
logger.error(f"Wrong resource_configmap : {resource_configmap}.")

def get_vault_path_token(self):
"""
Expand Down Expand Up @@ -1206,6 +1243,48 @@ def create_vault_kube_auth_role(
if "Success" in out.decode():
logger.info(f"Role {role_name} created successfully")

def get_pv_secret(self, device_handle):
"""
Get secret stored in the vault KMS for the given device_handle
Args:
device_handle (str): PV device handle string
Returns:
secret (str): passphrase stored in the vault KMS for given device handle.
"""
if not self.csi_vault_backend_path:
self.get_vault_backend_path(
resource_configmap=constants.VAULT_KMS_CSI_CONNECTION_DETAILS
)

cmd = f"vault kv get -format=json {self.csi_vault_backend_path}/{device_handle}"
out = subprocess.check_output(shlex.split(cmd))
json_out = json.loads(out)

def find_passphrase(obj):
"""
Recursively searches for the 'passphrase' key in the JSON object.
"""
if isinstance(obj, dict):
for key, value in obj.items():
if key == "passphrase":
return value
elif isinstance(value, dict) or isinstance(value, list):
result = find_passphrase(value)
if result:
return result
elif isinstance(obj, list):
for item in obj:
result = find_passphrase(item)
if result:
return result
return None

secret = find_passphrase(json_out)

return secret


class HPCS(KMS):
"""
Expand Down
158 changes: 158 additions & 0 deletions tests/functional/pv/pv_encryption/test_pv_keyrotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import logging
import pytest

from ocs_ci.framework.pytest_customization.marks import green_squad
from ocs_ci.framework.testlib import (
ManageTest,
tier1,
skipif_ocs_version,
kms_config_required,
skipif_managed_service,
skipif_hci_provider_and_client,
skipif_disconnected_cluster,
skipif_proxy_cluster,
config,
)
from ocs_ci.ocs import constants
from ocs_ci.helpers.keyrotation_helper import PVKeyrotation


log = logging.getLogger(__name__)

# Set the arg values based on KMS provider.
if config.ENV_DATA["KMS_PROVIDER"].lower() == constants.HPCS_KMS_PROVIDER:
kmsprovider = constants.HPCS_KMS_PROVIDER
argnames = ["kv_version", "kms_provider"]
argvalues = [
pytest.param("v1", kmsprovider),
]
else:
kmsprovider = constants.VAULT_KMS_PROVIDER
argnames = ["kv_version", "kms_provider", "use_vault_namespace"]
if config.ENV_DATA.get("vault_hcp"):
argvalues = [
pytest.param(
"v1", kmsprovider, True, marks=pytest.mark.polarion_id("OCS-6179")
),
pytest.param(
"v2", kmsprovider, True, marks=pytest.mark.polarion_id("OCS-6180")
),
]
else:
argvalues = [
pytest.param(
"v1", kmsprovider, False, marks=pytest.mark.polarion_id("OCS-6181")
),
pytest.param(
"v2", kmsprovider, False, marks=pytest.mark.polarion_id("OCS-6182")
),
]


@green_squad
@skipif_ocs_version("<4.17")
@kms_config_required
@skipif_managed_service
@skipif_hci_provider_and_client
@skipif_disconnected_cluster
@skipif_proxy_cluster
class TestPVKeyRotationWithVaultKMS(ManageTest):
"""
Test Key Rotation for encrypted PV.
"""

@pytest.fixture(autouse=True)
def setup(
self,
kv_version,
use_vault_namespace,
pv_encryption_kms_setup_factory,
):
"""
Setup csi-kms-connection-details configmap
"""
log.info("Setting up csi-kms-connection-details configmap")
self.kms = pv_encryption_kms_setup_factory(kv_version, use_vault_namespace)
log.info("csi-kms-connection-details setup successful")

@tier1
@pytest.mark.parametrize(
argnames=argnames,
argvalues=argvalues,
)
def test_encrypted_pvc_key_rotation(
self,
kms_provider,
project_factory,
storageclass_factory,
pvc_factory,
pod_factory,
):
"""
Test Encrypted PVC keyrotation.
Steps:
1. Create an encrypted RBD storage class.
2. Add an annotation to the encrypted storage class.
"keyrotation.csiaddons.openshift.io/schedule='*/3 * * * *'"
3. Configure the Persistent Volume (PV) encryption settings with the Vault service.
4. Create a PVC using the encrypted storage class.
5. Deploy a Pod that utilizes the previously created PVC for storage.
6. Start an IO workload with verify=True option.
7. Wait for the key rotation to occur for the PV.
8. Check for any error messages that appear during the IO operation.
"""
# Create a project
proj_obj = project_factory()

# Create an encryption enabled storageclass for RBD
sc_obj = storageclass_factory(
interface=constants.CEPHBLOCKPOOL,
encrypted=True,
encryption_kms_id=self.kms.kmsid,
allow_volume_expansion=False,
)

if kms_provider == constants.VAULT_KMS_PROVIDER:
# Create ceph-csi-kms-token in the tenant namespace
self.kms.vault_path_token = self.kms.generate_vault_token()
self.kms.create_vault_csi_kms_token(namespace=proj_obj.namespace)

# Annotate Storageclass for keyrotation.
pvk_obj = PVKeyrotation(sc_obj)
pvk_obj.annotate_storageclass_key_rotation(schedule="*/3 * * * *")

# Create RBD PVCs with volume mode Block
pvc_obj = pvc_factory(
interface=constants.CEPHBLOCKPOOL,
project=proj_obj,
storageclass=sc_obj,
size=10,
status=constants.STATUS_BOUND,
)

pod_obj = pod_factory(pvc=pvc_obj)

# Verify the pod status
log.info("Verifying the pod status.")
assert (
pod_obj.data["status"]["phase"] == constants.STATUS_RUNNING
), f"Pod {pod_obj.name} is not in {constants.STATUS_RUNNING} state."

pod_obj.run_io("fs", size="5G", verify=True, runtime=180)

# Verify PV Keyrotation.
assert pvk_obj.wait_till_keyrotation(
pvc_obj.get_pv_volume_handle_name
), f"Failed to rotate Key for the PVC {pvc_obj.name}"

log.info("Getting FIO results")
result = pod_obj.get_fio_results(timeout=180)

assert (
"Error" not in result
), f" IO Failed when Keyrotation operation happen for the PVC: {pvc_obj.name}"

0 comments on commit 047ca53

Please sign in to comment.