Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keyrotation verification added in the add_capacity test #9919

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion ocs_ci/helpers/keyrotation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from ocs_ci.ocs.exceptions import CommandFailed
from ocs_ci.framework import config
from ocs_ci.ocs.resources.pvc import get_deviceset_pvcs
from ocs_ci.ocs.exceptions import UnexpectedBehaviour
from ocs_ci.utility.retry import retry

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -259,7 +261,13 @@ def __init__(self):
Initializes RookKeyrotation object.
"""
super().__init__()
self.deviceset = [pvc.name for pvc in get_deviceset_pvcs()]
self.deviceset = self._get_deviceset()

def _get_deviceset(self):
"""
Listing deviceset for OSD.
"""
return [pvc.name for pvc in get_deviceset_pvcs()]

def is_osd_keyrotation_enabled(self):
"""
Expand Down Expand Up @@ -306,3 +314,39 @@ def get_osd_dm_crypt(self, device):
dmcrypt_key = self._exec_oc_cmd(cmd=cmd, out_yaml_format=False)
log.info(f"dmcrypt-key of device {device} is {dmcrypt_key}")
return dmcrypt_key

def verify_keyrotation(self, old_keys, tries=10, delay=20):
"""
Verify Keyrotation is suceeded for all OSD devices.

Args:
old_keys (dict): osd devices and their keys.

Returns:
bool: True if all OSD keyrotation is happend, orherwise False.
"""
log.info("Verifying OSD keyrotation is happening")

@retry(UnexpectedBehaviour, tries=tries, delay=delay)
def compare_old_with_new_keys():
for device in self._get_deviceset():
osd_keys_after_rotation = self.get_osd_dm_crypt(device)
log.info(
f"Fetching New Key for device {device}: {osd_keys_after_rotation}"
)
if old_keys[device] == osd_keys_after_rotation:
log.info(f"Keyrotation Still not happend for device {device}")
raise UnexpectedBehaviour(
f"Keyrotation is not happened for the device {device}"
)
log.info(f"Keyrotation is happend for device {device}")
return True

try:
compare_old_with_new_keys()
except UnexpectedBehaviour:
log.error("Key rotation is Not happend after schedule is passed. ")
assert False

log.info("Keyrotation is sucessfully done for the all OSD.")
return True
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from ocs_ci.helpers.pvc_ops import test_create_delete_pvcs
from ocs_ci.ocs.resources.storage_cluster import osd_encryption_verification
from ocs_ci.helpers.sanity_helpers import Sanity
from ocs_ci.utility.version import get_semantic_ocp_running_version, VERSION_4_16
from ocs_ci.helpers.keyrotation_helper import OSDKeyrotation

logger = logging.getLogger(__name__)

Expand All @@ -46,6 +48,19 @@
@skipif_managed_service
@skipif_hci_provider_and_client
class TestAddCapacity(ManageTest):
@pytest.fixture(autouse=True)
def teardown(self, request):
"""
Resetting the default value of KeyRotation
"""

def finalizer():
kr_obj = OSDKeyrotation()
kr_obj.set_keyrotation_schedule("@weekly")
kr_obj.enable_keyrotation()

request.addfinalizer(finalizer)

@pytest.fixture(autouse=True)
def setup(self):
"""
Expand Down Expand Up @@ -387,6 +402,37 @@ def test_add_capacity(
cluster_obj.get_ceph_health() != "HEALTH_ERR"
), "Ceph cluster health checking failed"

# Verify Keyrotation for newly added OSD are happning or not.
if (get_semantic_ocp_running_version() >= VERSION_4_16) and (
config.ENV_DATA.get("encryption_at_rest")
and (not config.DEPLOYMENT.get("kms_deployment"))
):
logger.info("Verifying Keyrotation for OSD")
osd_keyrotation = OSDKeyrotation()

# Recored existing OSD keys before rotation is happen.
osd_keys_before_rotation = {}
for device in osd_keyrotation.deviceset:
osd_keys_before_rotation[device] = osd_keyrotation.get_osd_dm_crypt(
device
)

# Enable Keyrotation and verify its enable status at rook and storagecluster end.
logger.info("Enabling the Keyrotation in storagecluster Spec.")
osd_keyrotation.enable_keyrotation()

# Set Key Rotation schedule to every 3 minutes.
schedule = "*/3 * * * *"
osd_keyrotation.set_keyrotation_schedule(schedule)

assert osd_keyrotation.verify_keyrotation(
osd_keys_before_rotation
), "Keyrotation not happend for the OSD."

# Change the keyrotation value to default.
logger.info("Changing the keyrotation value to default.")
osd_keyrotation.set_keyrotation_schedule("@weekly")

logger.info("ALL Exit criteria verification successfully")
logger.info(
"********************** TEST PASSED *********************************"
Expand Down
Loading