From 2feb2f092aee1338c9d158a01d64af38c62301a2 Mon Sep 17 00:00:00 2001 From: Mahesh Shetty Date: Tue, 26 Nov 2024 17:47:51 +0530 Subject: [PATCH] Implement delete objects, list objects, download objects Signed-off-by: Mahesh Shetty --- ocs_ci/helpers/mcg_stress_helper.py | 300 +++++++++++++----- ocs_ci/ocs/bucket_utils.py | 9 +- tests/conftest.py | 9 + tests/cross_functional/conftest.py | 1 + .../stress/test_noobaa_under_stress.py | 118 ++++++- 5 files changed, 362 insertions(+), 75 deletions(-) diff --git a/ocs_ci/helpers/mcg_stress_helper.py b/ocs_ci/helpers/mcg_stress_helper.py index d8bc1c30e766..0dc6e29648bd 100644 --- a/ocs_ci/helpers/mcg_stress_helper.py +++ b/ocs_ci/helpers/mcg_stress_helper.py @@ -1,6 +1,7 @@ import logging import concurrent.futures +from ocs_ci.ocs.resources.mcg import MCG from ocs_ci.ocs.resources.objectbucket import OBC from ocs_ci.ocs.resources.bucket_policy import NoobaaAccount from ocs_ci.ocs.resources.mcg_lifecycle_policies import LifecyclePolicy, ExpirationRule @@ -8,12 +9,14 @@ s3_copy_object, list_objects_from_bucket, sync_object_directory, + rm_object_recursive, + s3_list_objects_v2, ) logger = logging.getLogger(__name__) -def upload_objs_to_buckets(mcg_obj, pod_obj, buckets): +def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None): """ This will upload objects present in the stress-cli pod to the buckets provided concurrently @@ -24,29 +27,48 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets): buckets (Dict): Map of bucket type and bucket object """ - src_path = "/complex_directory/dir_0_0/dir_1_0/dir_2_0/dir_3_0/dir_4_0/dir_5_0/dir_6_0/dir_7_0/dir_8_0/dir_9_0/" - - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = list() - for type, bucket in buckets.items(): - if type == "rgw": - s3_obj = OBC(bucket.name) - else: - s3_obj = mcg_obj - logger.info(f"OBJECT UPLOAD: Uploading objects to the bucket {bucket.name}") - future = executor.submit( - sync_object_directory, pod_obj, 
src_path, f"s3://{bucket.name}", s3_obj - ) - futures.append(future) + src_path = "/complex_directory/dir_0_0/dir_1_0/dir_2_0/dir_3_0/" + + logger.info(f"Uploading objects to all the buckets under prefix {iteration_no}") + try: + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = list() + for type, bucket in buckets.items(): + if type == "rgw": + s3_obj = OBC(bucket.name) + else: + s3_obj = mcg_obj + logger.info( + f"OBJECT UPLOAD: Uploading objects to the bucket {bucket.name}" + ) + future = executor.submit( + sync_object_directory, + pod_obj, + src_path, + f"s3://{bucket.name}/{iteration_no}/", + s3_obj, + timeout=1200, + ) + futures.append(future) + logger.info( + "OBJECT UPLOAD: Waiting for the objects upload to complete for all the buckets" + ) + for future in concurrent.futures.as_completed(futures): + future.result() + finally: logger.info( - "OBJECT UPLOAD: Waiting for the objects upload to complete for all the buckets" + "Setting the event to indicate that upload objects operation is either completed or failed" ) - for future in concurrent.futures.as_completed(futures): - future.result() + if event: + event.set() -def run_noobaa_metadata_intense_ops(mcg_obj, pod_obj, bucket_factory, bucket_name): +def run_noobaa_metadata_intense_ops( + mcg_obj, pod_obj, bucket_factory, bucket, iteration_no, event=None +): + bucket_type, bucket_obj = bucket + bucket_name = bucket_obj.name # Run metadata specific to bucket def _run_bucket_ops(): @@ -56,27 +78,36 @@ def _run_bucket_ops(): stressing the noobaa db through lot of metadata related operations """ - buckets_created = list() - for i in range(0, 10): - # create 100K buckets - bucket = bucket_factory()[0] - logger.info(f"METADATA OP: Created bucket {bucket.name}") + while True: + buckets_created = list() + for i in range(0, 10): + # create 100K buckets + bucket = bucket_factory()[0] + logger.info(f"METADATA OP: Created bucket {bucket.name}") - # set lifecycle config for each buckets - 
lifecycle_policy = LifecyclePolicy(ExpirationRule(days=1)) - mcg_obj.s3_client.put_bucket_lifecycle_configuration( - Bucket=bucket.name, LifecycleConfiguration=lifecycle_policy.as_dict() - ) - logger.info( - f"METADATA OP: Applied bucket lifecycle policy for the bucket {bucket.name}" - ) - buckets_created.append(bucket) + # set lifecycle config for each buckets + lifecycle_policy = LifecyclePolicy(ExpirationRule(days=1)) + mcg_obj.s3_client.put_bucket_lifecycle_configuration( + Bucket=bucket.name, + LifecycleConfiguration=lifecycle_policy.as_dict(), + ) + logger.info( + f"METADATA OP: Applied bucket lifecycle policy for the bucket {bucket.name}" + ) + buckets_created.append(bucket) - # delete the buckets - for bucket in buckets_created: - bucket.delete() - logger.info(f"METADATA OP: Deleted bucket {bucket.name}") + # delete the buckets + for bucket in buckets_created: + bucket.delete() + logger.info(f"METADATA OP: Deleted bucket {bucket.name}") + + if event.is_set(): + logger.info( + f"Successfully completed bucket creation/deletion operation in the background" + f" for the current iteration {iteration_no+1}" + ) + break def _run_object_metadata_ops(): """ @@ -85,26 +116,41 @@ def _run_object_metadata_ops(): """ # set metadata for each object present in the given bucket + if bucket_type.upper() == "RGW": + s3_obj = OBC(bucket_name) + else: + s3_obj = mcg_obj + objs_in_bucket = list_objects_from_bucket( pod_obj=pod_obj, target=bucket_name, - s3_obj=mcg_obj, + prefix=iteration_no, + s3_obj=s3_obj, recursive=True, ) + while True: + for obj in objs_in_bucket: + object_key = obj.split("/")[-1] + metadata = {f"new-{object_key}": f"new-{object_key}"} + s3_copy_object( + s3_obj, + bucket_name, + source=f"{bucket_name}/{obj}", + object_key=object_key, + metadata=metadata, + ) + logger.info( + f"METADATA OP: Updated metadata for object {object_key} in bucket {bucket_name}" + ) - for obj in objs_in_bucket: - object_key = obj.split("/")[-1] - metadata = {f"new-{object_key}": 
f"new-{object_key}"} - s3_copy_object( - mcg_obj, - bucket_name, - source=f"{bucket_name}/{obj}", - object_key=object_key, - metadata=metadata, - ) - logger.info( - f"METADATA OP: Updated metadata for object {object_key} in bucket {bucket_name}" - ) + if event.is_set(): + break + if event.is_set(): + logger.info( + f"Successfully completed the metadata update operation" + f" in the background for the iteration {iteration_no+1}" + ) + break def _run_noobaa_account_ops(): """ @@ -113,30 +159,142 @@ def _run_noobaa_account_ops(): """ # create 100K of noobaa accounts - nb_accounts_created = list() - for i in range(0, 10): - nb_account = NoobaaAccount( - mcg_obj, - name=f"nb-acc-{i}", - email=f"nb-acc-{i}@email", - ) - nb_accounts_created.append(nb_account) + while True: + nb_accounts_created = list() + for i in range(0, 10): + nb_account = NoobaaAccount( + mcg_obj, + name=f"nb-acc-{i}", + email=f"nb-acc-{i}@email", + ) + nb_accounts_created.append(nb_account) + logger.info( + f"METADATA OP: Created Noobaa account {nb_account.account_name}" + ) + + # for nb_acc in nb_accounts_created: + # nb_acc.update_account(new_email=f"new-{nb_acc.email_id}") + # logger.info(f"METADATA OP: Updated noobaa account {nb_acc.account_name}") + + for nb_acc in nb_accounts_created: + nb_acc.delete_account() + logger.info( + f"METADATA OP: Deleted noobaa account {nb_acc.account_name}" + ) + + if event.is_set(): + logger.info( + f"Successfully completed noobaa account creation/update/deletion operation" + f" in the background for the iteration {iteration_no+1}" + ) + break + + # run the above metadata intense ops parallel + logger.info("Initiating metadata ops") + executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) + futures_obj = list() + futures_obj.append(executor.submit(_run_bucket_ops)) + futures_obj.append(executor.submit(_run_object_metadata_ops)) + futures_obj.append(executor.submit(_run_noobaa_account_ops)) + logger.info("Waiting until all the upload objects operations are 
completed") + for future in futures_obj: + future.result() + + +def delete_objs_from_bucket(pod_obj, bucket, iteration_no, event=None): + """ + Delete all the objects from a bucket + + Args: + pod_obj (Pod): Noobaa stress CLI pod object + mcg_obj (MCG): MCG object + bucket_name (str): Name of the bucket + iteration_no (int): Iteration number or prefix from where should delete objects + """ + bucket_type, bucket_obj = bucket + bucket_name = bucket_obj.name + if bucket_type.upper() == "RGW": + mcg_obj = OBC(bucket_name) + else: + mcg_obj = MCG() + + logger.info( + f"DELETE OP: Delete objects recursively from the bucket {bucket_name} under prefix {iteration_no}" + ) + + rm_object_recursive( + pod_obj, + bucket_name, + mcg_obj, + prefix=iteration_no, + ) + logger.info( + f"Successfully completed object deletion operation on bucket {bucket_name} under prefix {iteration_no}" + ) + + +def list_objs_from_bucket(bucket, iteration_no, event=None): + """ + List objects from bucket + + Args: + mcg_obj (MCG): MCG object + bucket_name (str): Name of the bucket + iteration_no (int): Iteration number or prefix from where should list objects + """ + bucket_type, bucket_obj = bucket + bucket_name = bucket_obj.name + if bucket_type.upper() == "RGW": + mcg_obj = OBC(bucket_name) + else: + mcg_obj = MCG() + + logger.info( + f"LIST OP: Listing objects from the bucket {bucket_name} under prefix {iteration_no}" + ) + while True: + s3_list_objects_v2( + mcg_obj, bucket_name, prefix=str(iteration_no), delimiter="/" + ) + + if event.is_set(): logger.info( - f"METADATA OP: Created Noobaa account {nb_account.account_name}" + f"Successfully completed object list operation on bucket {bucket_name} under prefix {iteration_no}" ) + break - for nb_acc in nb_accounts_created: - nb_acc.update_account(new_email=f"new-{nb_acc.email_id}") - logger.info(f"METADATA OP: Updated noobaa account {nb_acc.account_name}") - for nb_acc in nb_accounts_created: - nb_acc.delete_account() - logger.info(f"METADATA OP: 
Deleted noobaa account {nb_acc.account_name}") +def download_objs_from_bucket(pod_obj, bucket, target_dir, iteration_no, event=None): + """ + Download objects from a bucket back to local directory + + Args: + pod_obj (Pod): Noobaa stress CLI pod object + mcg_obj (MCG): MCG object + bucket_name (str): Name of the bucket + target_dir (str): Target directory to download objects + iteration_no (int): Iteration number or prefix from where should download objects + """ + bucket_type, bucket_obj = bucket + bucket_name = bucket_obj.name + if bucket_type.upper() == "RGW": + mcg_obj = OBC(bucket_name) + else: + mcg_obj = MCG() - # run the above metadata intense ops parallel logger.info( - "---------------------------------Initiating metadata ops---------------------------------" + f"DOWNLOAD OP: Download objects from the bucket {bucket_name} under prefix {iteration_no}" ) - _run_bucket_ops() - _run_object_metadata_ops() - _run_noobaa_account_ops() + while True: + sync_object_directory( + pod_obj, + f"s3://{bucket_name}/{iteration_no}", + target_dir, + mcg_obj, + ) + + if event.is_set(): + logger.info( + f"Successfully completed object download operation on bucket {bucket_name} under prefix {iteration_no}" + ) + break diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py index 4eede4de26a0..1cec01bc3b69 100644 --- a/ocs_ci/ocs/bucket_utils.py +++ b/ocs_ci/ocs/bucket_utils.py @@ -579,7 +579,7 @@ def download_objects_using_s3cmd( ), "Failed to download objects" -def rm_object_recursive(podobj, target, mcg_obj, option=""): +def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None): """ Remove bucket objects with --recursive option @@ -592,7 +592,12 @@ def rm_object_recursive(podobj, target, mcg_obj, option=""): option (str): Extra s3 remove command option """ - rm_command = f"rm s3://{target} --recursive {option}" + + rm_command = ( + f"rm s3://{target} --recursive {option}" + if not prefix + else f"rm s3://{target}/{prefix} --recursive {option}" + 
) podobj.exec_cmd_on_pod( command=craft_s3_command(rm_command, mcg_obj), out_yaml_format=False, diff --git a/tests/conftest.py b/tests/conftest.py index a42176e8a5c2..628e2ef7c9f9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2843,6 +2843,15 @@ def cleanup(): return stress_cli_pod_obj +@pytest.fixture() +def stress_test_directory_setup(request, nb_stress_cli_pod): + """ + Setup test directories on noobaa stress CLI pod + + """ + return test_directory_setup_fixture(request, nb_stress_cli_pod) + + @pytest.fixture() def test_directory_setup(request, awscli_pod_session): return test_directory_setup_fixture(request, awscli_pod_session) diff --git a/tests/cross_functional/conftest.py b/tests/cross_functional/conftest.py index f1c35ff4160d..59826951ff37 100644 --- a/tests/cross_functional/conftest.py +++ b/tests/cross_functional/conftest.py @@ -1383,6 +1383,7 @@ def factory(): bucket_objects = dict() + logger.info("Creating buckets for stress testing") for type, bucketclass_dict in bucket_configs.items(): if type == "rgw": bucket = rgw_bucket_factory_session(interface="rgw-oc")[0] diff --git a/tests/cross_functional/stress/test_noobaa_under_stress.py b/tests/cross_functional/stress/test_noobaa_under_stress.py index ccee60a75be3..402a9f24e505 100644 --- a/tests/cross_functional/stress/test_noobaa_under_stress.py +++ b/tests/cross_functional/stress/test_noobaa_under_stress.py @@ -1,9 +1,22 @@ -from ocs_ci.helpers.mcg_stress_helper import upload_objs_to_buckets +import logging +import random + +from threading import Event +from concurrent.futures import ThreadPoolExecutor +from ocs_ci.helpers.mcg_stress_helper import ( + upload_objs_to_buckets, + run_noobaa_metadata_intense_ops, + delete_objs_from_bucket, + list_objs_from_bucket, + download_objs_from_bucket, +) + +logger = logging.getLogger(__name__) class TestNoobaaUnderStress: - base_setup_buckets = None + base_setup_buckets = list() def test_noobaa_under_stress( self, @@ -11,12 +24,113 @@ def 
test_noobaa_under_stress( nb_stress_cli_pod, mcg_obj_session, rgw_obj_session, + stress_test_directory_setup, + bucket_factory, ): + # fetch buckets created for stress testing self.base_setup_buckets = setup_stress_testing_bucket() + # upload objects to the buckets created concurrently upload_objs_to_buckets( mcg_obj_session, nb_stress_cli_pod, self.base_setup_buckets, + iteration_no=0, ) + + # iterate and stress the cluster with object upload + # and other IO operations + total_iterations = 4 + executor = ThreadPoolExecutor(max_workers=5) + futures_obj = list() + for i in range(1, total_iterations): + logger.info(f"Performing Iteration {i} of stressing the cluster") + # buckets = list(self.base_setup_buckets.keys()) + buckets = [ + (type, bucket) for type, bucket in self.base_setup_buckets.items() + ] + + # instantiate event object + event = Event() + + # perform object upload operation + # concurrently + futures_obj.append( + executor.submit( + upload_objs_to_buckets, + mcg_obj_session, + nb_stress_cli_pod, + self.base_setup_buckets, + iteration_no=i, + event=event, + ) + ) + + # perform metadata intense operations + # on randomly selected bucket + bucket = random.choice(buckets) + futures_obj.append( + executor.submit( + run_noobaa_metadata_intense_ops, + mcg_obj_session, + nb_stress_cli_pod, + bucket_factory, + bucket, + iteration_no=i - 1, + event=event, + ) + ) + buckets.remove(bucket) + + # perform object deletion on a + # randomly selected bucket + bucket = random.choice(buckets) + futures_obj.append( + executor.submit( + delete_objs_from_bucket, + nb_stress_cli_pod, + bucket, + iteration_no=i - 1, + event=event, + ) + ) + buckets.remove(bucket) + + # perform object listing on a + # randomly selected bucket + bucket = random.choice(buckets) + futures_obj.append( + executor.submit( + list_objs_from_bucket, + bucket, + iteration_no=i - 1, + event=event, + ) + ) + buckets.remove(bucket) + + # perform object download on + # a randomly selected bucket + bucket = 
random.choice(buckets) + futures_obj.append( + executor.submit( + download_objs_from_bucket, + nb_stress_cli_pod, + bucket, + stress_test_directory_setup.result_dir, + iteration_no=i - 1, + event=event, + ) + ) + buckets.remove(bucket) + nb_stress_cli_pod.exec_cmd_on_pod( + f"rm -rf {stress_test_directory_setup.result_dir}/" + ) + + # wait until all the object operations are done + logger.info( + "Waiting all the Object upload and IO operations for the current iteration is completed" + ) + for future in futures_obj: + future.result()