Skip to content

Commit

Permalink
Test Automation for validation of PV encryption with Azure KV
Browse files Browse the repository at this point in the history
Signed-off-by: Parag Kamble <[email protected]>
  • Loading branch information
paraggit committed May 31, 2024
1 parent 6b56278 commit 15d4a07
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 4 deletions.
54 changes: 50 additions & 4 deletions ocs_ci/utility/kms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
currently supported KMSs: Vault and HPCS
"""

import logging
import os

Expand Down Expand Up @@ -1929,11 +1930,18 @@ def create_azure_kv_csi_kms_connection_details(self):
logger.info(
f"Adding Azure connection to existing ConfigMap {constants.AZURE_KV_CSI_CONNECTION_DETAILS}"
)
param = (
f'[{{"op": "add", "path": "/data/{self.azure_kms_connection_name}", '
f'"value": "{json.dumps(azure_conn)}"}}]'

param = json.dumps(
[
{
"op": "add",
"path": f"/data/{self.azure_kms_connection_name}",
"value": json.dumps(azure_conn),
}
]
)
csi_kms_configmap.patch(params=param, format_type="merge")

csi_kms_configmap.patch(params=param, format_type="json")

# verifying ConfigMap is created or not.
self.is_azure_kv_connection_exists()
Expand Down Expand Up @@ -2059,6 +2067,44 @@ def verify_osd_keys_present_on_azure_kv(self):
logger.info("All OSD keys are present in the Azure KV ")
return True

def remove_kmsid(self):
"""
Removing azure kmsid from the configmap `csi-kms-connection-details`.
"""
if not self.is_azure_kv_connection_exists():
logger.info(
f"There is no KMS connection {self.azure_kms_connection_name} available in the configmap"
)
return False

csi_kms_configmap = ocp.OCP(
kind=constants.CONFIGMAP,
resource_name=constants.VAULT_KMS_CSI_CONNECTION_DETAILS,
namespace=self.namespace,
)

if len(get_encryption_kmsid()) == 1:
# removing configmap csi-kms-connection-details.
csi_kms_configmap.delete()
else:
params = json.dumps(
[{"op": "remove", "path": f"/data/{self.azure_kms_connection_name}"}]
)
csi_kms_configmap.patch(params=params, format_type="json")
return True

def verify_pv_secrets_present_in_azure_kv(self, vol_handle):
"""
Verify Azure KV has the secrets for given volume handle.
"""
secrets = self.azure_kv_secrets()
if vol_handle in secrets:
logger.info(f"PV sceret for {vol_handle} is found in the Azure KV.")
return True

logger.info(f"PV secret for {vol_handle} not found in the Azure KV.")
return False


kms_map = {"vault": Vault, "hpcs": HPCS, "kmip": KMIP, "azure-kv": AzureKV}

Expand Down
26 changes: 26 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4928,6 +4928,8 @@ def pv_encryption_kms_setup_factory(request):
# set the KMS provider based on KMS_PROVIDER env value.
if ocsci_config.ENV_DATA["KMS_PROVIDER"].lower() == constants.HPCS_KMS_PROVIDER:
return pv_encryption_hpcs_setup_factory(request)
elif ocsci_config.ENV_DATA["KMS_PROVIDER"] == constants.AZURE_KV_PROVIDER_NAME:
return pv_encryption_azure_kv_setup_factory(request)
else:
return pv_encryption_vault_setup_factory(request)

Expand Down Expand Up @@ -5168,6 +5170,30 @@ def finalizer():
return factory


def pv_encryption_azure_kv_setup_factory(request):
"""
Create a Azure KV resource and returh the azure KV Object.
"""
kms = KMS.AzureKV()

def factory():
"""
Create a Azure KV resources in the cluster
"""
# setup KMS connection details.
kms.create_azure_kv_csi_kms_connection_details()
return kms

def finalizer():
"""
Cleanup Azure KV resources from the cluster.
"""
kms.remove_kmsid()

request.addfinalizer(finalizer)
return factory


@pytest.fixture(scope="class")
def cephblockpool_factory_ui_class(request, setup_ui_class):
return cephblockpool_factory_ui_fixture(request, setup_ui_class)
Expand Down
147 changes: 147 additions & 0 deletions tests/functional/pv/pv_encryption/test_azure_kms_pv_encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import pytest
import logging

from ocs_ci.framework.pytest_customization.marks import (
tier1,
green_squad,
azure_platform_required,
kms_config_required,
polarion_id,
skipif_ocs_version,
skipif_managed_service,
skipif_hci_provider_and_client,
skipif_disconnected_cluster,
skipif_proxy_cluster,
)

from ocs_ci.ocs import constants
from ocs_ci.helpers.helpers import create_pods
from ocs_ci.ocs.node import verify_crypt_device_present_onnode

log = logging.getLogger(__name__)


@tier1
@green_squad
@azure_platform_required
@kms_config_required
@skipif_ocs_version("<4.16")
@skipif_managed_service
@skipif_hci_provider_and_client
@skipif_disconnected_cluster
@skipif_proxy_cluster
class TestAzureKMSPVEncryption:
@pytest.fixture(autouse=True)
def setup(
self,
pv_encryption_kms_setup_factory,
):
"""
Setup csi-kms-connection-details configmap
"""
log.info("Setting up csi-kms-connection-details configmap")
self.kms = pv_encryption_kms_setup_factory()
log.info("csi-kms-connection-details setup successful")

@polarion_id("OCS-5795")
def test_azure_kms_pv_encryption(
self, project_factory, storageclass_factory, multi_pvc_factory, pod_factory
):
"""
Verify Azure KV encryption operation with PV encryption.
Steps:
1. Set up Azure KV with the cluster.
2. Create multiple PVCs and attach them to pods.
3. Verify in Azure secrets that each PV has its corresponding secrets stored.
4. Start IO from the pods on the encrypted PVCs.
5. Verify that each PV's encrypted device is present on the node.
6. Wait until IO is complete.
7. Delete all the pods and PVCs that were created.
8. Verify that after deleting the PVCs, the respective secrets are also removed from the Azure Vault.
"""
# Create a project
proj_obj = project_factory()

# Create an encryption enabled storageclass for RBD
sc_obj = storageclass_factory(
interface=constants.CEPHBLOCKPOOL,
encrypted=True,
encryption_kms_id=self.kms.azure_kms_connection_name,
)

# Create RBD PVCs with volume mode Block
pvc_size = 5
pvc_objs = multi_pvc_factory(
interface=constants.CEPHBLOCKPOOL,
project=proj_obj,
storageclass=sc_obj,
size=pvc_size,
access_modes=[
f"{constants.ACCESS_MODE_RWX}-Block",
f"{constants.ACCESS_MODE_RWO}-Block",
],
status=constants.STATUS_BOUND,
num_of_pvc=3,
wait_each=False,
)

# Create pods
pod_objs = create_pods(
pvc_objs,
pod_factory,
constants.CEPHBLOCKPOOL,
pods_for_rwx=1,
status=constants.STATUS_RUNNING,
)

# # Verify Keys are create on the Azure KV
vol_handles = []
for pvc_obj in pvc_objs:
pv_obj = pvc_obj.backed_pv_obj
vol_handle = pv_obj.get().get("spec").get("csi").get("volumeHandle")
vol_handles.append(vol_handle)

assert self.kms.verify_pv_secrets_present_in_azure_kv(
vol_handle
), f"PV secret for vol_handle : {vol_handle} not found in the Azure KV"

# pass

# Verify whether encrypted device is present inside the pod and run IO
for vol_handle, pod_obj in zip(vol_handles, pod_objs):
node = pod_obj.get_node()
assert verify_crypt_device_present_onnode(
node, vol_handle
), f"Crypt devicve {vol_handle} not found on node:{node}"

pod_obj.run_io(
storage_type="block",
size=f"{pvc_size - 1}G",
io_direction="write",
runtime=60,
)
log.info("IO started on all pods")

# Wait for IO completion
for pod_obj in pod_objs:
pod_obj.get_fio_results()
log.info("IO completed on all pods")

# Delete the pod
for pod_obj in pod_objs:
pod_obj.delete()
pod_obj.ocp.wait_for_delete(resource_name=pod_obj.name)

# Delete the PVC
for pvc_obj in pvc_objs:
pv_obj = pvc_obj.backed_pv_obj
pvc_obj.delete()
pv_obj.ocp.wait_for_delete(resource_name=pv_obj.name)

for vol_handle in vol_handles:
assert not self.kms.verify_pv_secrets_present_in_azure_kv(
vol_handle
), f"PV secret for vol_handle : {vol_handle} not removed the Azure KV"

0 comments on commit 15d4a07

Please sign in to comment.