From 6697ef76e998b9ec0e2d373153d8b463386d7a20 Mon Sep 17 00:00:00 2001 From: Mahesh Shetty Date: Mon, 2 Dec 2024 13:21:21 +0530 Subject: [PATCH] Improve docstrings and implement delete objects steps Signed-off-by: Mahesh Shetty --- ocs_ci/helpers/mcg_stress_helper.py | 71 ++++++++++++++++--- ocs_ci/ocs/bucket_utils.py | 3 +- .../stress/test_noobaa_under_stress.py | 52 ++++++++++---- 3 files changed, 105 insertions(+), 21 deletions(-) diff --git a/ocs_ci/helpers/mcg_stress_helper.py b/ocs_ci/helpers/mcg_stress_helper.py index 0dc6e29648b..f78ca692ba7 100644 --- a/ocs_ci/helpers/mcg_stress_helper.py +++ b/ocs_ci/helpers/mcg_stress_helper.py @@ -11,6 +11,8 @@ sync_object_directory, rm_object_recursive, s3_list_objects_v2, + list_objects_in_batches, + s3_delete_objects, ) logger = logging.getLogger(__name__) @@ -25,9 +27,12 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None): mcg_obj (MCG): MCG object pod_obj (Pod): Pod object buckets (Dict): Map of bucket type and bucket object + iteration_no (int): Integer value representing iteration + event (threading.Event()): Event object to signal the execution + completion """ - src_path = "/complex_directory/dir_0_0/dir_1_0/dir_2_0/dir_3_0/" + src_path = "/complex_directory/" logger.info(f"Uploading objects to all the buckets under prefix {iteration_no}") try: @@ -47,7 +52,7 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None): src_path, f"s3://{bucket.name}/{iteration_no}/", s3_obj, - timeout=1200, + timeout=20000, ) futures.append(future) @@ -67,6 +72,19 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None): def run_noobaa_metadata_intense_ops( mcg_obj, pod_obj, bucket_factory, bucket, iteration_no, event=None ): + """ + Perform metadata intense operations to stress Noobaa + + Args: + mcg_obj (MCG): MCG object + pod_obj (Pod): Noobaa stress CLI pod object + bucket_factory (fixture): Pytest fixture for creating bucket + bucket (tuple): 
Tuple consisting of backend storage type and bucket object + iteration_no (int): Iteration number or prefix under which the metadata operations should be performed + event (threading.Event()): Event object to signal the execution + completion + + """ bucket_type, bucket_obj = bucket bucket_name = bucket_obj.name @@ -126,6 +144,7 @@ def _run_object_metadata_ops(): target=bucket_name, prefix=iteration_no, s3_obj=s3_obj, + timeout=6000, recursive=True, ) while True: @@ -207,9 +226,11 @@ def delete_objs_from_bucket(pod_obj, bucket, iteration_no, event=None): Args: pod_obj (Pod): Noobaa stress CLI pod object - mcg_obj (MCG): MCG object - bucket_name (str): Name of the bucket + bucket (Tuple): Tuple consisting of backend storage type and bucket object iteration_no (int): Iteration number or prefix from where should delete objects + event (threading.Event()): Event object to signal the execution + completion + """ bucket_type, bucket_obj = bucket bucket_name = bucket_obj.name @@ -227,6 +248,7 @@ def delete_objs_from_bucket(pod_obj, bucket, iteration_no, event=None): bucket_name, mcg_obj, prefix=iteration_no, + timeout=6000, ) logger.info( f"Successfully completed object deletion operation on bucket {bucket_name} under prefix {iteration_no}" ) @@ -238,9 +260,11 @@ def list_objs_from_bucket(bucket, iteration_no, event=None): List objects from bucket Args: - mcg_obj (MCG): MCG object - bucket_name (str): Name of the bucket + bucket (Tuple): Tuple consisting of backend storage type and bucket object iteration_no (int): Iteration number or prefix from where should list objects + event (threading.Event()): Event object to signal the execution + completion + """ bucket_type, bucket_obj = bucket bucket_name = bucket_obj.name @@ -270,10 +294,12 @@ def download_objs_from_bucket(pod_obj, bucket, target_dir, iteration_no, event=N Args: pod_obj (Pod): Noobaa stress CLI pod object - mcg_obj (MCG): MCG object - bucket_name (str): Name of the bucket + bucket (Tuple): Tuple consisting of backend storage type and 
bucket object target_dir (str): Target directory to download objects iteration_no (int): Iteration number or prefix from where should download objects + event (threading.Event()): Event object to signal the execution + completion + """ bucket_type, bucket_obj = bucket bucket_name = bucket_obj.name @@ -291,6 +317,7 @@ def download_objs_from_bucket(pod_obj, bucket, target_dir, iteration_no, event=N f"s3://{bucket_name}/{iteration_no}", target_dir, mcg_obj, + timeout=6000, ) if event.is_set(): @@ -298,3 +325,31 @@ def download_objs_from_bucket(pod_obj, bucket, target_dir, iteration_no, event=N f"Successfully completed object download operation on bucket {bucket_name} under prefix {iteration_no}" ) break + + +def delete_objects_in_batches(bucket, batch_size): + """ + Delete objects from the bucket in batches + + Args: + bucket (tuple): Tuple consisting of backend storage type and bucket object + batch_size (int): Number of objects to delete at a time + + """ + bucket_type, bucket_obj = bucket + bucket_name = bucket_obj.name + if bucket_type.upper() == "RGW": + mcg_obj = OBC(bucket_name) + else: + mcg_obj = MCG() + + logger.info(f"Deleting objects in bucket {bucket_name} in batches of {batch_size}") + total_objs_deleted = 0 + for obj_batch in list_objects_in_batches( + mcg_obj, bucket_name, batch_size=batch_size, yield_individual=False + ): + s3_delete_objects(mcg_obj, bucket_name, obj_batch) + total_objs_deleted += batch_size + logger.info( + f"Total objects deleted {total_objs_deleted} in bucket {bucket_name}" + ) diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py index 257d976b13a..f5aec741f91 100644 --- a/ocs_ci/ocs/bucket_utils.py +++ b/ocs_ci/ocs/bucket_utils.py @@ -579,7 +579,7 @@ def download_objects_using_s3cmd( ), "Failed to download objects" -def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None): +def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None, timeout=600): """ Remove bucket objects with 
--recursive option @@ -606,6 +606,7 @@ def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None): mcg_obj.access_key, mcg_obj.s3_internal_endpoint, ], + timeout=timeout, ) diff --git a/tests/cross_functional/stress/test_noobaa_under_stress.py b/tests/cross_functional/stress/test_noobaa_under_stress.py index 402a9f24e50..23b9646d72c 100644 --- a/tests/cross_functional/stress/test_noobaa_under_stress.py +++ b/tests/cross_functional/stress/test_noobaa_under_stress.py @@ -1,6 +1,7 @@ import logging import random +from ocs_ci.framework.pytest_customization.marks import magenta_squad from threading import Event from concurrent.futures import ThreadPoolExecutor from ocs_ci.helpers.mcg_stress_helper import ( @@ -9,11 +10,13 @@ delete_objs_from_bucket, list_objs_from_bucket, download_objs_from_bucket, + delete_objects_in_batches, ) logger = logging.getLogger(__name__) +@magenta_squad class TestNoobaaUnderStress: base_setup_buckets = list() @@ -27,11 +30,22 @@ def test_noobaa_under_stress( stress_test_directory_setup, bucket_factory, ): - - # fetch buckets created for stress testing + """ + Stress Noobaa by performing bulk s3 operations. This consists mainly of 3 stages + mentioned below + 1. Base setup: Here we create the buckets of all possible types and then + load them with a million objects in a deep directory + 2. S3 bulk operations: Here we perform various s3 operations such as list, + download, delete, metadata intense op etc concurrently on each of the buckets + respectively. + 3. 
At the end delete objects from all the bucket in batches + + """ + + # Fetch buckets created for stress testing self.base_setup_buckets = setup_stress_testing_bucket() - # upload objects to the buckets created concurrently + # Upload objects to the buckets created concurrently upload_objs_to_buckets( mcg_obj_session, nb_stress_cli_pod, @@ -39,22 +53,21 @@ def test_noobaa_under_stress( iteration_no=0, ) - # iterate and stress the cluster with object upload + # Iterate and stress the cluster with object upload # and other IO operations total_iterations = 4 executor = ThreadPoolExecutor(max_workers=5) futures_obj = list() for i in range(1, total_iterations): logger.info(f"Performing Iteration {i} of stressing the cluster") - # buckets = list(self.base_setup_buckets.keys()) buckets = [ (type, bucket) for type, bucket in self.base_setup_buckets.items() ] - # instantiate event object + # Instantiate event object event = Event() - # perform object upload operation + # Perform object upload operation # concurrently futures_obj.append( executor.submit( @@ -67,7 +80,7 @@ def test_noobaa_under_stress( ) ) - # perform metadata intense operations + # Perform metadata intense operations # on randomly selected bucket bucket = random.choice(buckets) futures_obj.append( @@ -83,7 +96,7 @@ def test_noobaa_under_stress( ) buckets.remove(bucket) - # perform object deletion on a + # Perform object deletion on a # randomly selected bucket bucket = random.choice(buckets) futures_obj.append( @@ -97,7 +110,7 @@ def test_noobaa_under_stress( ) buckets.remove(bucket) - # perform object listing on a + # Perform object listing on a # randomly selected bucket bucket = random.choice(buckets) futures_obj.append( @@ -110,7 +123,7 @@ def test_noobaa_under_stress( ) buckets.remove(bucket) - # perform object download on + # Perform object download on # a randomly selected bucket bucket = random.choice(buckets) futures_obj.append( @@ -128,9 +141,24 @@ def test_noobaa_under_stress( f"rm -rf 
{stress_test_directory_setup.result_dir}/" ) - # wait until all the object operations are done + # Wait until all the object operations are done logger.info( "Waiting all the Object upload and IO operations for the current iteration is completed" ) for future in futures_obj: future.result() + + # Delete all the objects from the bucket + # in batches of 20K objects at a time + buckets = [(type, bucket) for type, bucket in self.base_setup_buckets.items()] + with ThreadPoolExecutor() as executor: + futures = list() + for bucket in buckets: + future = executor.submit( + delete_objects_in_batches, bucket, batch_size=20000 + ) + futures.append(future) + + logger.info("Waiting for all the delete object operations to complete") + for future in futures: + future.result()