Tests to verify RDR failover with CephFS (#8818)
Signed-off-by: Jilju Joy <[email protected]>
jilju authored Dec 4, 2023
1 parent 9b2ac9f commit 5f306b5
Showing 6 changed files with 239 additions and 63 deletions.
6 changes: 6 additions & 0 deletions conf/ocsci/dr_workload.yaml
@@ -30,4 +30,10 @@ ENV_DATA:
dr_workload_app_pvc_selector: {'appname': 'busybox_app5'}, pod_count: 20, pvc_count: 20
},
]

dr_workload_subscription_cephfs: [
{ name: "busybox-1", workload_dir: "rdr/busybox/cephfs/app-busybox-1", pod_count: 20, pvc_count: 20 },
{ name: "busybox-2", workload_dir: "rdr/busybox/cephfs/app-busybox-2", pod_count: 20, pvc_count: 4 },
]

# dr_policy_name: PLACEHOLDER
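
For context, a minimal sketch (not part of this commit) of how the new dr_workload_subscription_cephfs entries are consumed: the dr_workload fixture factory updated in tests/conftest.py below selects this key when a test requests CephFS-backed PVCs. Field names match the YAML above; everything else is illustrative.

# Illustrative sketch mirroring the workload_key selection added to the
# dr_workload fixture in tests/conftest.py (see that hunk below).
from ocs_ci.framework import config as ocsci_config
from ocs_ci.ocs import constants

pvc_interface = constants.CEPHFILESYSTEM  # chosen by the test
workload_key = (
    "dr_workload_subscription_cephfs"
    if pvc_interface == constants.CEPHFILESYSTEM
    else "dr_workload_subscription"
)

for workload_details in ocsci_config.ENV_DATA[workload_key]:
    # Each entry supplies the workload directory plus expected pod and PVC counts.
    print(
        workload_details["name"],
        workload_details["workload_dir"],
        workload_details["pod_count"],
        workload_details["pvc_count"],
    )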
206 changes: 160 additions & 46 deletions ocs_ci/helpers/dr_helpers.py
@@ -330,6 +330,38 @@ def get_vr_count(namespace):
return len(vr_items)


def get_replicationsources_count(namespace):
"""
Gets ReplicationSource resource count in given namespace
Args:
namespace (str): the namespace of the ReplicationSource resources
Returns:
int: ReplicationSource resource count
"""
rs_obj = ocp.OCP(kind=constants.REPLICATION_SOURCE, namespace=namespace)
rs_items = rs_obj.get().get("items")
return len(rs_items)


def get_replicationdestinations_count(namespace):
"""
Gets ReplicationDestination resource count in given namespace
Args:
namespace (str): the namespace of the ReplicationDestination resources
Returns:
int: ReplicationDestination resource count
"""
rd_obj = ocp.OCP(kind=constants.REPLICATIONDESTINATION, namespace=namespace)
rd_items = rd_obj.get().get("items")
return len(rd_items)


def check_vr_state(state, namespace):
"""
Check if all VRs in the given namespace are in expected state
@@ -435,9 +467,9 @@ def wait_for_replication_resources_creation(vr_count, namespace, timeout):
Wait for replication resources to be created
Args:
vr_count (int): Expected number of VR resources
namespace (str): the namespace of the VR resources
timeout (int): time in seconds to wait for VR resources to be created
vr_count (int): Expected number of VR or ReplicationSource resources
namespace (str): the namespace of the VR or ReplicationSource resources
timeout (int): time in seconds to wait for VR or ReplicationSource resources to be created
or reach expected state
Raises:
TimeoutExpiredError: In case replication resources not created
@@ -452,28 +484,39 @@ def wait_for_replication_resources_creation(vr_count, namespace, timeout):
logger.error(error_msg)
raise TimeoutExpiredError(error_msg)

# TODO: Improve the parameter for condition
if "cephfs" in namespace:
resource_kind = constants.REPLICATION_SOURCE
count_function = get_replicationsources_count
else:
resource_kind = constants.VOLUME_REPLICATION
count_function = get_vr_count

if config.MULTICLUSTER["multicluster_mode"] != "metro-dr":
logger.info(f"Waiting for {vr_count} VRs to be created")
logger.info(f"Waiting for {vr_count} {resource_kind}s to be created")
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=get_vr_count,
func=count_function,
namespace=namespace,
)
sample.wait_for_func_value(vr_count)

logger.info(f"Waiting for {vr_count} VRs to reach primary state")
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=check_vr_state,
state="primary",
namespace=namespace,
)
if not sample.wait_for_func_status(result=True):
error_msg = "One or more VR haven't reached expected state primary within the time limit."
logger.error(error_msg)
raise TimeoutExpiredError(error_msg)
if resource_kind == constants.VOLUME_REPLICATION:
logger.info(
f"Waiting for {vr_count} {resource_kind}s to reach primary state"
)
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=check_vr_state,
state="primary",
namespace=namespace,
)
if not sample.wait_for_func_status(result=True):
error_msg = "One or more VR haven't reached expected state primary within the time limit."
logger.error(error_msg)
raise TimeoutExpiredError(error_msg)

logger.info("Waiting for VRG to reach primary state")
sample = TimeoutSampler(
@@ -503,19 +546,28 @@ def wait_for_replication_resources_deletion(namespace, timeout, check_state=True
TimeoutExpiredError: In case replication resources not deleted
"""
# TODO: Improve the parameter for condition
if "cephfs" in namespace:
resource_kind = constants.REPLICATION_SOURCE
count_function = get_replicationsources_count
else:
resource_kind = constants.VOLUME_REPLICATION
count_function = get_vr_count

if check_state:
logger.info("Waiting for all VRs to reach secondary state")
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=check_vr_state,
state="secondary",
namespace=namespace,
)
if not sample.wait_for_func_status(result=True):
error_msg = "One or more VR haven't reached expected state secondary within the time limit."
logger.error(error_msg)
raise TimeoutExpiredError(error_msg)
if resource_kind == constants.VOLUME_REPLICATION:
logger.info("Waiting for all VRs to reach secondary state")
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=check_vr_state,
state="secondary",
namespace=namespace,
)
if not sample.wait_for_func_status(result=True):
error_msg = "One or more VR haven't reached expected state secondary within the time limit."
logger.error(error_msg)
raise TimeoutExpiredError(error_msg)

logger.info("Waiting for VRG to reach secondary state")
sample = TimeoutSampler(
@@ -532,21 +584,22 @@ def wait_for_replication_resources_deletion(namespace, timeout, check_state=True
logger.info(error_msg)
raise TimeoutExpiredError(error_msg)

logger.info("Waiting for VRG to be deleted")
sample = TimeoutSampler(
timeout=timeout, sleep=5, func=check_vrg_existence, namespace=namespace
)
if not sample.wait_for_func_status(result=False):
error_msg = "VRG resource not deleted"
logger.info(error_msg)
raise TimeoutExpiredError(error_msg)
if "cephfs" not in namespace:
logger.info("Waiting for VRG to be deleted")
sample = TimeoutSampler(
timeout=timeout, sleep=5, func=check_vrg_existence, namespace=namespace
)
if not sample.wait_for_func_status(result=False):
error_msg = "VRG resource not deleted"
logger.info(error_msg)
raise TimeoutExpiredError(error_msg)

if config.MULTICLUSTER["multicluster_mode"] != "metro-dr":
logger.info("Waiting for all VRs to be deleted")
logger.info(f"Waiting for all {resource_kind} to be deleted")
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=get_vr_count,
func=count_function,
namespace=namespace,
)
sample.wait_for_func_value(0)
@@ -602,30 +655,91 @@ def wait_for_all_resources_deletion(
logger.info("Waiting for all pods to be deleted")
all_pods = get_all_pods(namespace=namespace)
for pod_obj in all_pods:
pod_obj.ocp.wait_for_delete(
resource_name=pod_obj.name, timeout=timeout, sleep=5
)
if "volsync-rsync-tls-dst" not in pod_obj.name:
pod_obj.ocp.wait_for_delete(
resource_name=pod_obj.name, timeout=timeout, sleep=5
)

wait_for_replication_resources_deletion(
namespace, timeout, check_replication_resources_state
)

logger.info("Waiting for all PVCs to be deleted")
all_pvcs = get_all_pvc_objs(namespace=namespace)

for pvc_obj in all_pvcs:
pvc_obj.ocp.wait_for_delete(
resource_name=pvc_obj.name, timeout=timeout, sleep=5
)
if "volsync-" not in pvc_obj.name:
pvc_obj.ocp.wait_for_delete(
resource_name=pvc_obj.name, timeout=timeout, sleep=5
)

if config.MULTICLUSTER["multicluster_mode"] != "metro-dr":
logger.info("Waiting for all PVs to be deleted")
# Check whether volsync PVCs are still present. The count from the previous step is not reused because the
# PVCs might be in a deleting state and the count may change.
num_of_volsync_pvc = len(
[
pvc_obj.name
for pvc_obj in get_all_pvc_objs(namespace=namespace)
if "volsync-" in pvc_obj.name
]
)
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=get_pv_count,
namespace=namespace,
)
sample.wait_for_func_value(0)
sample.wait_for_func_value(num_of_volsync_pvc)


def wait_for_replication_destinations_creation(rep_dest_count, namespace, timeout=900):
"""
Wait for ReplicationDestination resources to be created
Args:
rep_dest_count (int): Expected number of ReplicationDestination resources
namespace (str): The namespace of the ReplicationDestination resources
timeout (int): Time in seconds to wait for ReplicationDestination resources to be created
Raises:
TimeoutExpiredError: If expected number of ReplicationDestination resources not created
"""

logger.info(
f"Waiting for {rep_dest_count} {constants.REPLICATIONDESTINATION} to be created"
)
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=get_replicationdestinations_count,
namespace=namespace,
)
sample.wait_for_func_value(rep_dest_count)


def wait_for_replication_destinations_deletion(namespace, timeout=900):
"""
Wait for ReplicationDestination resources to be deleted
Args:
namespace (str): The namespace of the ReplicationDestination resources
timeout (int): Time in seconds to wait for ReplicationDestination resources to be deleted
Raises:
TimeoutExpiredError: If expected number of ReplicationDestination resources not deleted
"""

logger.info(f"Waiting for all {constants.REPLICATIONDESTINATION} to be deleted")
sample = TimeoutSampler(
timeout=timeout,
sleep=5,
func=get_replicationdestinations_count,
namespace=namespace,
)
sample.wait_for_func_value(0)


def get_image_uuids(namespace):
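
A hedged usage sketch of the helpers added above for a CephFS (VolSync-based) RDR workload: ReplicationSource resources are expected on the managed cluster where the application currently runs, ReplicationDestination resources on the peer cluster. The namespace, cluster indexes, and PVC count below are placeholders, not values taken from this commit.

# Illustrative only: exercises the new CephFS-aware helpers.
from ocs_ci.framework import config
from ocs_ci.helpers import dr_helpers

namespace = "busybox-workloads-cephfs-1"  # hypothetical; must contain "cephfs"
pvc_count = 20                # placeholder, matches the protected PVC count
primary_cluster_index = 1     # assumed index of the primary managed cluster
secondary_cluster_index = 2   # assumed index of the secondary managed cluster

# On the primary managed cluster one ReplicationSource per protected PVC is
# expected; the helper counts ReplicationSource because "cephfs" is in the namespace.
config.switch_ctx(primary_cluster_index)
dr_helpers.wait_for_replication_resources_creation(
    vr_count=pvc_count, namespace=namespace, timeout=900
)

# On the secondary managed cluster one ReplicationDestination per PVC is expected.
config.switch_ctx(secondary_cluster_index)
dr_helpers.wait_for_replication_destinations_creation(
    rep_dest_count=pvc_count, namespace=namespace
)

# After failover, the cluster that becomes primary should no longer hold
# ReplicationDestination resources for this namespace.
dr_helpers.wait_for_replication_destinations_deletion(namespace=namespace)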
2 changes: 2 additions & 0 deletions ocs_ci/ocs/constants.py
@@ -180,6 +180,8 @@
ENDPOINTS = "Endpoints"
WEBHOOK = "ValidatingWebhookConfiguration"
ROOK_CEPH_WEBHOOK = "rook-ceph-webhook"
REPLICATION_SOURCE = "ReplicationSource"
REPLICATIONDESTINATION = "ReplicationDestination"

# Provisioners
AWS_EFS_PROVISIONER = "openshift.org/aws-efs"
20 changes: 13 additions & 7 deletions ocs_ci/ocs/dr/dr_workload.py
@@ -239,7 +239,7 @@ def delete_workload(self, force=False, rbd_name="rbd"):
check_replication_resources_state=False,
)

log.info("Verify backend RBD images are deleted")
log.info("Verify backend images or subvolumes are deleted")
for cluster in get_non_acm_cluster_config():
config.switch_ctx(cluster.MULTICLUSTER["multicluster_index"])
rbd_pool_name = (
@@ -248,14 +248,20 @@
else constants.DEFAULT_CEPHBLOCKPOOL
)
for image_uuid in image_uuids:
status = verify_volume_deleted_in_backend(
interface=constants.CEPHBLOCKPOOL,
image_uuid=image_uuid,
pool_name=rbd_pool_name,
)
# TODO: Add a better condition to identify CephFS or RBD
if "cephfs" in self.workload_namespace:
status = verify_volume_deleted_in_backend(
interface=constants.CEPHFILESYSTEM, image_uuid=image_uuid
)
else:
status = verify_volume_deleted_in_backend(
interface=constants.CEPHBLOCKPOOL,
image_uuid=image_uuid,
pool_name=rbd_pool_name,
)
if not status:
raise UnexpectedBehaviour(
"RBD image(s) still exists on backend"
"Images/subvolumes still exists on backend"
)

except (
17 changes: 14 additions & 3 deletions tests/conftest.py
@@ -6390,11 +6390,15 @@ def dr_workload(request):
"""
instances = []

def factory(num_of_subscription=1, num_of_appset=0):
def factory(
num_of_subscription=1, num_of_appset=0, pvc_interface=constants.CEPHBLOCKPOOL
):
"""
Args:
num_of_subscription (int): Number of Subscription type workload to be created
num_of_appset (int): Number of ApplicationSet type workload to be created
pvc_interface (str): 'CephBlockPool' or 'CephFileSystem'.
This decides whether an RBD-based or CephFS-based resource is created. RBD is the default.
Raises:
ResourceNotDeleted: In case workload resources not deleted properly
@@ -6404,8 +6408,12 @@ def factory(num_of_subscription=1, num_of_appset=0):
"""
total_pvc_count = 0
workload_key = "dr_workload_subscription"
if pvc_interface == constants.CEPHFILESYSTEM:
workload_key = "dr_workload_subscription_cephfs"

for index in range(num_of_subscription):
workload_details = ocsci_config.ENV_DATA["dr_workload_subscription"][index]
workload_details = ocsci_config.ENV_DATA[workload_key][index]
workload = BusyBox(
workload_dir=workload_details["workload_dir"],
workload_pod_count=workload_details["pod_count"],
@@ -6430,7 +6438,10 @@ def factory(num_of_subscription=1, num_of_appset=0):
total_pvc_count += workload_details["pvc_count"]
workload.deploy_workload()
if ocsci_config.MULTICLUSTER["multicluster_mode"] != "metro-dr":
dr_helpers.wait_for_mirroring_status_ok(replaying_images=total_pvc_count)
if pvc_interface != constants.CEPHFILESYSTEM:
dr_helpers.wait_for_mirroring_status_ok(
replaying_images=total_pvc_count
)
return instances

def teardown():
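
Putting the pieces together, a hypothetical test sketch (not part of this commit) showing how the extended fixture might be invoked for CephFS-backed workloads. Attribute names on the workload object are assumed from the BusyBox arguments visible in the factory above, and context switching to the primary managed cluster is omitted for brevity.

# Hypothetical usage of the extended dr_workload fixture; illustrative only.
from ocs_ci.helpers import dr_helpers
from ocs_ci.ocs import constants


def test_rdr_cephfs_deploy_example(dr_workload):
    # Deploy both Subscription-type CephFS workloads defined in
    # conf/ocsci/dr_workload.yaml; mirroring status is not checked for CephFS.
    workloads = dr_workload(
        num_of_subscription=2, pvc_interface=constants.CEPHFILESYSTEM
    )
    for wl in workloads:
        # For CephFS namespaces the helper waits for ReplicationSource resources
        # instead of VolumeReplication resources (see dr_helpers.py above).
        dr_helpers.wait_for_replication_resources_creation(
            vr_count=wl.workload_pvc_count,
            namespace=wl.workload_namespace,
            timeout=900,
        )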
