From be1c044de032d9786518ebffad37d424af8e98c3 Mon Sep 17 00:00:00 2001 From: Mahesh Shetty Date: Tue, 26 Sep 2023 23:24:59 +0530 Subject: [PATCH 1/4] Entry critieria for MCG system tests Signed-off-by: Mahesh Shetty --- ocs_ci/helpers/e2e_helpers.py | 363 +++++++++++++ ocs_ci/ocs/bucket_utils.py | 127 ++++- ocs_ci/ocs/resources/pod.py | 20 + tests/conftest.py | 39 ++ tests/e2e/conftest.py | 559 ++++++++++++++++++++- tests/e2e/system_test/test_mcg_recovery.py | 20 + 6 files changed, 1116 insertions(+), 12 deletions(-) create mode 100644 ocs_ci/helpers/e2e_helpers.py diff --git a/ocs_ci/helpers/e2e_helpers.py b/ocs_ci/helpers/e2e_helpers.py new file mode 100644 index 00000000000..5342baa7d81 --- /dev/null +++ b/ocs_ci/helpers/e2e_helpers.py @@ -0,0 +1,363 @@ +import logging + +import random +import copy +import re +import time + +import botocore.exceptions as botoexceptions +from ocs_ci.utility.retry import retry +from ocs_ci.ocs.bucket_utils import ( + random_object_round_trip_verification, + write_random_test_objects_to_bucket, + wait_for_cache, + sync_object_directory, + verify_s3_object_integrity, + s3_put_object, + expire_objects_in_bucket, + sample_if_objects_expired, +) +from ocs_ci.ocs.resources.pod import get_rgw_pods, get_pod_logs +from ocs_ci.utility.utils import exec_cmd, run_cmd + + +logger = logging.getLogger(__name__) + + +def create_muliple_types_provider_obcs( + num_of_buckets, bucket_types, cloud_providers, bucket_factory +): + def get_all_combinations_map(providers, bucket_types): + """ + Args: + providers (dict): dictionary representing cloud + providers and the respective config + bucket_types (dict): dictionary representing different + types of bucket and the respective config + Returns: + List: containing all the possible combination of buckets + """ + all_combinations = dict() + + for provider, provider_config in providers.items(): + for bucket_type, type_config in bucket_types.items(): + if provider == "pv" and bucket_type != "data": + provider = random.choice(["aws", "azure"]) + provider_config = providers[provider] + bucketclass = copy.deepcopy(type_config) + + if "backingstore_dict" in bucketclass.keys(): + bucketclass["backingstore_dict"][provider] = [provider_config] + elif "namespace_policy_dict" in bucketclass.keys(): + bucketclass["namespace_policy_dict"]["namespacestore_dict"][ + provider + ] = [provider_config] + all_combinations.update({f"{bucket_type}-{provider}": bucketclass}) + return all_combinations + + all_combination_of_obcs = get_all_combinations_map(cloud_providers, bucket_types) + buckets = list() + num_of_buckets_each = num_of_buckets // len(all_combination_of_obcs.keys()) + buckets_left = num_of_buckets % len(all_combination_of_obcs.keys()) + if num_of_buckets_each != 0: + for combo, combo_config in all_combination_of_obcs.items(): + buckets.extend( + bucket_factory( + interface="OC", + amount=num_of_buckets_each, + bucketclass=combo_config, + ) + ) + + for i in range(0, buckets_left): + buckets.extend( + bucket_factory( + interface="OC", + amount=1, + bucketclass=all_combination_of_obcs[ + list(all_combination_of_obcs.keys())[i] + ], + ) + ) + + return buckets + + +def validate_mcg_bucket_replicaton( + awscli_pod_session, + mcg_obj_session, + source_target_map, + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=False, + object_amount=5, +): + """ + Validate MCG bucket replication feature + + Args: + awscli_pod_session (Pod): Pod object representing aws-cli pod + mcg_obj_session (MCG): MCG object + source_target_map (Dict): Dictionary consisting of source - target buckets + uploaded_objects_dir (str): directory where uploaded objects are kept + downloaded_obejcts_dir (str): directory where downloaded objects are kept + event (threading.Event()): Event() object + run_in_bg (bool): If True, validation is run in background + object_amount (int): Amounts of objects + + """ + bidi_uploaded_objs_dir_1 = uploaded_objects_dir + "/bidi_1" + bidi_uploaded_objs_dir_2 = uploaded_objects_dir + "/bidi_2" + bidi_downloaded_objs_dir_1 = downloaded_obejcts_dir + "/bidi_1" + bidi_downloaded_objs_dir_2 = downloaded_obejcts_dir + "/bidi_2" + + # Verify replication is working as expected by performing a two-way round-trip object verification + while True: + for first_bucket, second_bucket in source_target_map.items(): + random_object_round_trip_verification( + io_pod=awscli_pod_session, + bucket_name=first_bucket.name, + upload_dir=bidi_uploaded_objs_dir_1, + download_dir=bidi_downloaded_objs_dir_1, + amount=object_amount, + pattern="FirstBiDi", + wait_for_replication=True, + second_bucket_name=second_bucket.name, + mcg_obj=mcg_obj_session, + cleanup=True, + ) + + random_object_round_trip_verification( + io_pod=awscli_pod_session, + bucket_name=second_bucket.name, + upload_dir=bidi_uploaded_objs_dir_2, + download_dir=bidi_downloaded_objs_dir_2, + amount=object_amount, + pattern="SecondBiDi", + wait_for_replication=True, + second_bucket_name=first_bucket.name, + mcg_obj=mcg_obj_session, + cleanup=True, + ) + if event.is_set(): + run_in_bg = False + break + + if not run_in_bg: + logger.info("Verified bi-direction replication successfully") + logger.warning("Stopping bi-direction replication verification") + break + time.sleep(30) + + +def validate_mcg_caching( + awscli_pod_session, + mcg_obj_session, + cld_mgr, + cache_buckets, + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=False, +): + """ + Validate noobaa caching feature against the cache buckets + + Args: + awscli_pod_session (Pod): Pod object representing aws-cli pod + mcg_obj_session (MCG): MCG object + cld_mgr (cld_mgr): cld_mgr object + cache_buckets (List): List consisting of cache buckets + uploaded_objects_dir (str): directory where uploaded objects are kept + downloaded_obejcts_dir (str): directory where downloaded objects are kept + event (threading.Event()): Event() object + run_in_bg (bool): If True, validation is run in background + + """ + while True: + verified_buckets = list() + for bucket in cache_buckets: + try: + cache_uploaded_objs_dir = uploaded_objects_dir + "/cache" + cache_uploaded_objs_dir_2 = uploaded_objects_dir + "/cache_2" + cache_downloaded_objs_dir = downloaded_obejcts_dir + "/cache" + underlying_bucket_name = bucket.bucketclass.namespacestores[0].uls_name + + # Upload a random object to the bucket + logger.info(f"Uploading to the cache bucket: {bucket.name}") + objs_written_to_cache_bucket = write_random_test_objects_to_bucket( + awscli_pod_session, + bucket.name, + cache_uploaded_objs_dir, + pattern="Cache-", + mcg_obj=mcg_obj_session, + ) + wait_for_cache( + mcg_obj_session, + bucket.name, + objs_written_to_cache_bucket, + timeout=300, + ) + + # Write a random, larger object directly to the underlying storage of the bucket + logger.info( + f"Uploading to the underlying bucket {underlying_bucket_name} directly" + ) + write_random_test_objects_to_bucket( + awscli_pod_session, + underlying_bucket_name, + cache_uploaded_objs_dir_2, + pattern="Cache-", + s3_creds=cld_mgr.aws_client.nss_creds, + bs="2M", + ) + + # Download the object from the cache bucket + sync_object_directory( + awscli_pod_session, + f"s3://{bucket.name}", + cache_downloaded_objs_dir, + mcg_obj_session, + ) + + assert verify_s3_object_integrity( + original_object_path=f"{cache_uploaded_objs_dir}/Cache-0", + result_object_path=f"{cache_downloaded_objs_dir}/Cache-0", + awscli_pod=awscli_pod_session, + ), "The uploaded and downloaded cached objects have different checksums" + + assert ( + verify_s3_object_integrity( + original_object_path=f"{cache_uploaded_objs_dir_2}/Cache-0", + result_object_path=f"{cache_downloaded_objs_dir}/Cache-0", + awscli_pod=awscli_pod_session, + ) + is False + ), "The cached object was replaced by the new one before the TTL has expired" + verified_buckets.append(bucket.name) + if event.is_set(): + run_in_bg = False + break + except Exception: + logger.warning(f"verified so far: {verified_buckets}") + + raise + + if not run_in_bg: + logger.warning("Stopping noobaa caching verification") + break + time.sleep(30) + + +def validate_rgw_kafka_notification(kafka_rgw_dict, event, run_in_bg=False): + """ + Validate kafka notifications for RGW buckets + + Args: + kafka_rgw_dict (Dict): Dict consisting of rgw bucket, + kafka_topic, kafkadrop_host etc + event (threading.Event()): Event() object + run_in_bg (Bool): True if you want to run in the background + + """ + s3_client = kafka_rgw_dict["s3client"] + bucketname = kafka_rgw_dict["kafka_rgw_bucket"] + notify_cmd = kafka_rgw_dict["notify_cmd"] + data = kafka_rgw_dict["data"] + kafkadrop_host = kafka_rgw_dict["kafkadrop_host"] + kafka_topic = kafka_rgw_dict["kafka_topic"] + + while True: + + try: + + def put_object_to_bucket(bucket_name, key, body): + return s3_client.put_object(Bucket=bucket_name, Key=key, Body=body) + + except botoexceptions.ClientError: + logger.warning("s3 put object timedout but ignoring as of now") + + assert put_object_to_bucket( + bucketname, "key-1", data + ), "Failed: Put object: key-1" + exec_cmd(notify_cmd) + + # Validate rgw logs notification are sent + # No errors are seen + pattern = "ERROR: failed to create push endpoint" + rgw_pod_obj = get_rgw_pods() + rgw_log = get_pod_logs(pod_name=rgw_pod_obj[0].name, container="rgw") + assert re.search(pattern=pattern, string=rgw_log) is None, ( + f"Error: {pattern} msg found in the rgw logs." + f"Validate {pattern} found on rgw logs and also " + f"rgw bucket notification is working correctly" + ) + assert put_object_to_bucket( + bucketname, "key-2", data + ), "Failed: Put object: key-2" + exec_cmd(notify_cmd) + + # Validate message are received Kafka side using curl command + # A temporary way to check from Kafka side, need to check from UI + @retry(Exception, tries=5, delay=5) + def validate_kafa_for_message(): + curl_command = ( + f"curl -X GET {kafkadrop_host}/topic/{kafka_topic.name} " + "-H 'content-type: application/vnd.kafka.json.v2+json'" + ) + json_output = run_cmd(cmd=curl_command) + # logger.info("Json output:" f"{json_output}") + new_string = json_output.split() + messages = new_string[new_string.index("messages") + 1] + logger.info("Messages:" + str(messages)) + if messages.find("1") == -1: + raise Exception( + "Error: Messages are not recieved from Kafka side." + "RGW bucket notification is not working as expected." + ) + + validate_kafa_for_message() + + if event.is_set() or not run_in_bg: + logger.warning("Stopping kafka rgw notification verification") + break + time.sleep(30) + + +def validate_mcg_object_expiration( + mcg_obj, buckets, event, run_in_bg=False, object_amount=5, prefix="" +): + """ + Validates objects expiration for MCG buckets + + Args: + mcg_obj (MCG): MCG object + buckets (List): List of MCG buckets + event (threading.Event()): Event() object + run_in_bg (Bool): True if wants to run in background + object_amount (Int): Amount of objects + prefix (str): Any prefix used for objects + + """ + while True: + for bucket in buckets: + + for i in range(object_amount): + s3_put_object( + mcg_obj, bucket.name, f"{prefix}/obj-key-{i}", "Some random data" + ) + expire_objects_in_bucket(bucket.name) + sample_if_objects_expired(mcg_obj, bucket.name) + if event.is_set(): + run_in_bg = False + break + + if not run_in_bg: + logger.warning("Stopping MCG object expiration verification") + break + time.sleep(30) + + +def validate_mcg_nsfs_feature(): + logger.info("This is not implemented") diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py index 2f54b93213d..83f8f08bc4e 100644 --- a/ocs_ci/ocs/bucket_utils.py +++ b/ocs_ci/ocs/bucket_utils.py @@ -6,6 +6,7 @@ import os import shlex from uuid import uuid4 +from datetime import date import boto3 from botocore.handlers import disable_signing @@ -1343,18 +1344,20 @@ def check_cached_objects_by_name(mcg_obj, bucket_name, expected_objects_names=No list_objects_res = [name["key"] for name in res.get("reply").get("objects")] if not expected_objects_names: expected_objects_names = [] - if set(expected_objects_names) == set(list_objects_res): - logger.info("Files cached as expected") - return True - logger.warning( - "Objects did not cache properly, \n" - f"Expected: [{expected_objects_names}]\n" - f"Cached: [{list_objects_res}]" - ) - return False + # if set(expected_objects_names) == set(list_objects_res): + for obj in expected_objects_names: + if obj not in list_objects_res: + logger.warning( + "Objects did not cache properly, \n" + f"Expected: [{expected_objects_names}]\n" + f"Cached: [{list_objects_res}]" + ) + return False + logger.info("Files cached as expected") + return True -def wait_for_cache(mcg_obj, bucket_name, expected_objects_names=None): +def wait_for_cache(mcg_obj, bucket_name, expected_objects_names=None, timeout=60): """ wait for existing cache bucket to cache all required objects @@ -1365,7 +1368,7 @@ def wait_for_cache(mcg_obj, bucket_name, expected_objects_names=None): """ sample = TimeoutSampler( - timeout=60, + timeout=timeout, sleep=10, func=check_cached_objects_by_name, mcg_obj=mcg_obj, @@ -1912,3 +1915,105 @@ def create_aws_bs_using_cli( f"--target-bucket {uls_name} --region {region}", use_yes=True, ) + + +def change_expiration_query_interval(new_interval): + """ + Change how often noobaa should check for object expiration + By default it will be 8 hours + Args: + new_interval (int): New interval in minutes + """ + + from ocs_ci.ocs.resources.pod import ( + get_noobaa_core_pod, + wait_for_pods_to_be_running, + ) + + nb_core_pod = get_noobaa_core_pod() + new_interval = new_interval * 60 * 1000 + params = ( + '[{"op": "add", "path": "/spec/template/spec/containers/0/env/-", ' + f'"value": {{ "name": "CONFIG_JS_LIFECYCLE_INTERVAL", "value": "{new_interval}" }}}}]' + ) + OCP(kind="statefulset", namespace=constants.OPENSHIFT_STORAGE_NAMESPACE).patch( + resource_name=constants.NOOBAA_CORE_STATEFULSET, + params=params, + format_type="json", + ) + logger.info(f"Updated the expiration query interval to {new_interval} ms") + nb_core_pod.delete() + wait_for_pods_to_be_running(pod_names=[nb_core_pod.name], timeout=300) + + +def expire_objects_in_bucket(bucket_name, new_expire_interval=None): + """ + Manually expire the objects in a bucket + Args: + bucket_name (str): Name of the bucket + new_expire_interval (int): New expiration interval + """ + + from ocs_ci.ocs.resources.pod import ( + get_noobaa_db_pod, + ) + + if new_expire_interval is not None and isinstance(new_expire_interval, int): + change_expiration_query_interval(new_expire_interval) + + creation_time = f"{date.today().year-1}-06-25T14:18:28.712Z" + nb_db_pod = get_noobaa_db_pod() + query = ( + 'UPDATE objectmds SET "data"=jsonb_set("data", \'{create_time}\',' + f"'\\\"{creation_time}\\\"') WHERE data ->> 'bucket' IN " + f"( SELECT _id FROM buckets WHERE data ->> 'name' = '{bucket_name}' );" + ) + command = f'psql -h 127.0.0.1 -p 5432 -U postgres -d nbcore -c "{query}"' + + nb_db_pod.exec_cmd_on_pod(command=command, out_yaml_format=False) + + +def check_if_objects_expired(mcg_obj, bucket_name, prefix=""): + """ + Checks if objects in the bucket is expired + + Args: + mcg_obj(MCG): MCG object + bucket_name(str): Name of the bucket + prefix(str): Objects prefix + + Returns: + Bool: True if objects are expired, else False + + """ + + response = s3_list_objects_v2( + mcg_obj, bucketname=bucket_name, prefix=prefix, delimiter="/" + ) + if response["KeyCount"] != 0: + return False + return True + + +def sample_if_objects_expired(mcg_obj, bucket_name, prefix="", timeout=600, sleep=30): + """ + Sample if the objects in a bucket expired using + TimeoutSampler + + """ + message = ( + f"Objects in bucket with prefix {prefix} " + if prefix != "" + else "Objects in the bucket " + ) + sampler = TimeoutSampler( + timeout=timeout, + sleep=sleep, + func=check_if_objects_expired, + mcg_obj=mcg_obj, + bucket_name=bucket_name, + prefix=prefix, + ) + + assert sampler.wait_for_func_status(result=True), f"{message} are not expired" + logger.info(f"{message} are expired") diff --git a/ocs_ci/ocs/resources/pod.py b/ocs_ci/ocs/resources/pod.py index 47b4ffd8e19..b22ececa589 100644 --- a/ocs_ci/ocs/resources/pod.py +++ b/ocs_ci/ocs/resources/pod.py @@ -893,6 +893,26 @@ def get_noobaa_operator_pod( return noobaa_operator_pod +def get_noobaa_db_pod(): + """ + Get noobaa db pod obj + Returns: + Pod object: Noobaa db pod object + """ + if version.get_semantic_ocs_version_from_config() > version.VERSION_4_6: + nb_db = get_pods_having_label( + label=constants.NOOBAA_DB_LABEL_47_AND_ABOVE, + namespace=config.ENV_DATA["cluster_namespace"], + ) + else: + nb_db = get_pods_having_label( + label=constants.NOOBAA_DB_LABEL_46_AND_UNDER, + namespace=config.ENV_DATA["cluster_namespace"], + ) + nb_db_pod = Pod(**nb_db[0]) + return nb_db_pod + + def get_noobaa_core_pod(): """ Fetches Noobaa core pod details diff --git a/tests/conftest.py b/tests/conftest.py index 9e45c157e63..7b8cbb3d2a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,7 +35,9 @@ from ocs_ci.ocs.bucket_utils import ( craft_s3_command, put_bucket_policy, + change_expiration_query_interval, ) + from ocs_ci.ocs.dr.dr_workload import BusyBox, BusyBox_AppSet from ocs_ci.ocs.exceptions import ( CommandFailed, @@ -92,8 +94,12 @@ get_all_pods, verify_data_integrity_for_multi_pvc_objs, get_noobaa_pods, +<<<<<<< HEAD get_pod_count, wait_for_pods_by_label_count, +======= + get_noobaa_core_pod, +>>>>>>> 3ef4de76 (Entry critieria for MCG system tests) ) from ocs_ci.ocs.resources.pvc import PVC, create_restore_pvc from ocs_ci.ocs.version import get_ocs_version, get_ocp_version_dict, report_ocs_version @@ -6885,3 +6891,36 @@ def setup_logwriter_rbd_workload_factory(request, project_factory, teardown_fact ) return logwriter_sts + + +@pytest.fixture() +def change_noobaa_lifecycle_interval(request): + nb_core_pod = get_noobaa_core_pod() + interval_changed = False + + def factory(interval): + nonlocal interval_changed + interval_changed = True + change_expiration_query_interval(new_interval=interval) + + def finalizer(): + if interval_changed: + params = ( + f'[{{"op": "test", "path": "/spec/template/spec/containers/0/env/20/name",' + f'"value": "CONFIG_JS_LIFECYCLE_INTERVAL"}},' + f'{{"op": "remove", "path": "/spec/template/spec/containers/0/env/20"}}]' + ) + + OCP( + kind="statefulset", namespace=constants.OPENSHIFT_STORAGE_NAMESPACE + ).patch( + resource_name=constants.NOOBAA_CORE_STATEFULSET, + params=params, + format_type="json", + ) + nb_core_pod.delete() + wait_for_pods_to_be_running(pod_names=[nb_core_pod.name], timeout=300) + log.info("Switched back to default lifecycle interval") + + request.addfinalizer(finalizer) + return factory diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index adb1cd7d754..9d31854746e 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,10 +1,24 @@ import os import time import logging + +import boto3 import pytest + +from concurrent.futures import ThreadPoolExecutor +from threading import Event from ocs_ci.framework import config +from ocs_ci.helpers.e2e_helpers import ( + create_muliple_types_provider_obcs, + validate_mcg_bucket_replicaton, + validate_mcg_caching, + validate_mcg_object_expiration, + validate_rgw_kafka_notification, + validate_mcg_nsfs_feature, +) from ocs_ci.ocs import constants +from ocs_ci.ocs.amq import AMQ from ocs_ci.ocs.bucket_utils import ( compare_object_checksums_between_bucket_and_local, compare_directory, @@ -14,20 +28,27 @@ wait_for_cache, write_random_test_objects_to_bucket, s3_list_objects_v1, + retrieve_verification_mode, ) from ocs_ci.ocs.benchmark_operator_fio import BenchmarkOperatorFIO from ocs_ci.ocs.ocp import OCP from ocs_ci.ocs.resources import pod, pvc +from ocs_ci.ocs.resources.objectbucket import OBC from ocs_ci.ocs.resources.ocs import OCS -from ocs_ci.ocs.resources.pod import Pod, get_pods_having_label +from ocs_ci.ocs.resources.pod import ( + Pod, + get_pods_having_label, +) from ocs_ci.ocs.resources.deployment import Deployment from ocs_ci.ocs.exceptions import CommandFailed from ocs_ci.helpers.helpers import ( wait_for_resource_state, modify_statefulset_replica_count, validate_pv_delete, + default_storage_class, ) +from ocs_ci.utility.utils import clone_notify logger = logging.getLogger(__name__) @@ -725,3 +746,539 @@ def pytest_collection_modifyitems(items): f" since it requires Managed service platform" ) items.remove(item) + + +@pytest.fixture() +def setup_mcg_replication_feature_buckets(request, bucket_factory): + """ + This fixture does the setup for validating MCG replication + feature + + """ + + def factory(number_of_buckets, bucket_types, cloud_providers): + """ + factory function implementing the fixture + + Args: + number_of_buckets (int): number of buckets + bucket_types (dict): dictionary mapping bucket types and + configuration + cloud_providers (dict): dictionary mapping cloud provider + and configuration + + Returns: + Dict: source bucket to target bucket map + + """ + all_buckets = create_muliple_types_provider_obcs( + number_of_buckets, bucket_types, cloud_providers, bucket_factory + ) + + if len(all_buckets) % 2 != 0: + all_buckets[len(all_buckets) - 1].delete() + + source_target_map = dict() + index = 0 + for i in range(len(all_buckets) // 2): + source_target_map[all_buckets[index]] = all_buckets[index + 1] + patch_replication_policy_to_bucket( + all_buckets[index].name, + "basic-replication-rule-1", + all_buckets[index + 1].name, + ) + patch_replication_policy_to_bucket( + all_buckets[index + 1].name, + "basic-replication-rule-2", + all_buckets[index].name, + ) + + index += 2 + + return all_buckets, source_target_map + + return factory + + +@pytest.fixture() +def setup_mcg_caching_feature_buckets(request, bucket_factory): + """ + This fixture does the setup for Noobaa cache buckets validation + + """ + + def factory(number_of_buckets, bucket_types, cloud_providers): + """ + factory function implementing fixture + + Args: + number_of_buckets (int): number of buckets + bucket_types (dict): dictionary mapping bucket types and + configuration + cloud_providers (dict): dictionary mapping cloud provider + and configuration + + Returns: + List: List of cache buckets + + """ + cache_type = dict() + cache_type["cache"] = bucket_types["cache"] + all_buckets = create_muliple_types_provider_obcs( + number_of_buckets, cache_type, cloud_providers, bucket_factory + ) + return all_buckets + + return factory + + +@pytest.fixture() +def setup_mcg_expiration_feature_buckets( + request, bucket_factory, mcg_obj, change_noobaa_lifecycle_interval +): + """ + This fixture does the setup for validating MCG replication + feature + + """ + + def factory(number_of_buckets, bucket_types, cloud_providers): + """ + Factory function implementing the fixture + + Args: + number_of_buckets (int): number of buckets + bucket_types (dict): dictionary mapping bucket types and + configuration + cloud_providers (dict): dictionary mapping cloud provider + and configuration + + Returns: + List: list of buckets + + """ + type = dict() + type["data"] = bucket_types["data"] + + logger.info("Changed noobaa lifecycle interval to 2 minutes") + change_noobaa_lifecycle_interval(interval=2) + expiration_rule = { + "Rules": [ + { + "Expiration": { + "Days": 1, + "ExpiredObjectDeleteMarker": False, + }, + "Filter": {"Prefix": ""}, + "ID": "data-expire", + "Status": "Enabled", + } + ] + } + + all_buckets = create_muliple_types_provider_obcs( + number_of_buckets, type, cloud_providers, bucket_factory + ) + for bucket in all_buckets: + mcg_obj.s3_client.put_bucket_lifecycle_configuration( + Bucket=bucket.name, LifecycleConfiguration=expiration_rule + ) + + return all_buckets + + return factory + + +@pytest.fixture() +def setup_mcg_nsfs_feature_buckets(request): + def factory(): + pass + + +@pytest.fixture() +def setup_rgw_kafka_notification(request, rgw_bucket_factory, rgw_obj): + """ + This fixture does the setup for validating RGW kafka + notification feature + + """ + # setup AMQ + amq = AMQ() + + kafka_topic = kafkadrop_pod = kafkadrop_svc = kafkadrop_route = None + + # get storageclass + storage_class = default_storage_class(interface_type=constants.CEPHBLOCKPOOL) + + # setup AMQ cluster + amq.setup_amq_cluster(storage_class.name) + + # create kafka topic + kafka_topic = amq.create_kafka_topic() + + # create kafkadrop pod + ( + kafkadrop_pod, + kafkadrop_svc, + kafkadrop_route, + ) = amq.create_kafkadrop() + + def factory(): + """ + Factory function implementing the fixture + + Returns: + Dict: This consists of mapping of rgw buckets, + kafka_topic, kafkadrop_host objects etc + + """ + # get the kafkadrop route + kafkadrop_host = kafkadrop_route.get().get("spec").get("host") + + # create the bucket + bucketname = rgw_bucket_factory(amount=1, interface="RGW-OC")[0].name + + # get RGW credentials + rgw_endpoint, access_key, secret_key = rgw_obj.get_credentials() + + # clone notify repo + notify_path = clone_notify() + + # initilize to upload data + data = "A random string data to write on created rgw bucket" + obc_obj = OBC(bucketname) + s3_resource = boto3.resource( + "s3", + verify=retrieve_verification_mode(), + endpoint_url=rgw_endpoint, + aws_access_key_id=obc_obj.access_key_id, + aws_secret_access_key=obc_obj.access_key, + ) + s3_client = s3_resource.meta.client + + # Initialize notify command to run + notify_cmd = ( + f"python {notify_path} -e {rgw_endpoint} -a {obc_obj.access_key_id} " + f"-s {obc_obj.access_key} -b {bucketname} " + f"-ke {constants.KAFKA_ENDPOINT} -t {kafka_topic.name}" + ) + + kafka_rgw_dict = { + "s3client": s3_client, + "kafka_rgw_bucket": bucketname, + "notify_cmd": notify_cmd, + "data": data, + "kafkadrop_host": kafkadrop_host, + "kafka_topic": kafka_topic, + } + + return kafka_rgw_dict + + def finalizer(): + if kafka_topic: + kafka_topic.delete() + if kafkadrop_pod: + kafkadrop_pod.delete() + if kafkadrop_svc: + kafkadrop_svc.delete() + if kafkadrop_route: + kafkadrop_route.delete() + + amq.cleanup() + + request.addfinalizer(finalizer) + return factory + + +@pytest.fixture() +def validate_mcg_bg_features( + request, awscli_pod_session, mcg_obj_session, test_directory_setup, cld_mgr +): + """ + This fixture validates specified features provided neccesary + feature setup map. It has option to run the validation to run + in the background while not blocking the execution of rest of + the code + + """ + + def factory( + feature_setup_map, run_in_bg=False, skip_any_features=None, object_amount=5 + ): + """ + factory functon implementing the fixture + + Args: + feature_setup_map (Dict): This has feature to setup of buckets map + consists of buckets, executor, event objects + run_in_bg (Bool): True if want to run the validation in background + skip_any_features (List): List consisting of features that dont need + to be validated + object_amount (int): Number of objects that you wanna use while doing + the validation + + Returns: + Event(): this is a threading.Event() object used to send signals to the + threads to stop + List: List consisting of all the futures objects, ie. threads + + """ + uploaded_objects_dir = test_directory_setup.origin_dir + downloaded_obejcts_dir = test_directory_setup.result_dir + futures_obj = list() + + # if any already running background validation threads + # then stop those threads + if feature_setup_map["executor"] is not None: + feature_setup_map["executor"]["event"].set() + for t in feature_setup_map["executor"]["threads"]: + t.result() + + event = Event() + executor = ThreadPoolExecutor( + max_workers=5 - len(skip_any_features) + if skip_any_features is not None + else 5 + ) + skip_any_features = list() if skip_any_features is None else skip_any_features + + if "replication" not in skip_any_features: + validate_replication = executor.submit( + validate_mcg_bucket_replicaton, + awscli_pod_session, + mcg_obj_session, + feature_setup_map["replication"], + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=run_in_bg, + object_amount=object_amount, + ) + futures_obj.append(validate_replication) + + if "caching" not in skip_any_features: + validate_caching = executor.submit( + validate_mcg_caching, + awscli_pod_session, + mcg_obj_session, + cld_mgr, + feature_setup_map["caching"], + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=run_in_bg, + ) + futures_obj.append(validate_caching) + + if "expiration" not in skip_any_features: + validate_expiration = executor.submit( + validate_mcg_object_expiration, + mcg_obj_session, + feature_setup_map["expiration"], + event, + run_in_bg=run_in_bg, + object_amount=object_amount, + prefix="", + ) + futures_obj.append(validate_expiration) + + if "rgw kafka" not in skip_any_features: + validate_rgw_kafka = executor.submit( + validate_rgw_kafka_notification, + feature_setup_map["rgw kafka"], + event, + run_in_bg=run_in_bg, + ) + futures_obj.append(validate_rgw_kafka) + + if "nsfs" not in skip_any_features: + validate_nsfs = executor.submit( + validate_mcg_nsfs_feature, + ) + futures_obj.append(validate_nsfs) + + # if not run in background we wait until the + # threads are finsihed executing, ie. single iteration + if not run_in_bg: + for t in futures_obj: + t.result() + event = None + + return event, futures_obj + + return factory + + +@pytest.fixture() +def setup_mcg_bg_features( + request, + test_directory_setup, + awscli_pod_session, + mcg_obj_session, + setup_mcg_replication_feature_buckets, + setup_mcg_caching_feature_buckets, + setup_mcg_nsfs_feature_buckets, + setup_mcg_expiration_feature_buckets, + # setup_rgw_kafka_notification, + validate_mcg_bg_features, +): + """ + Fixture to setup MCG features buckets, run IOs, validate IOs + + 1. Bucket replication + 2. Noobaa caching + 3. Object expiration + 4. MCG NSFS + 5. RGW kafka notification + + """ + + def factory( + num_of_buckets=10, + object_amount=5, + is_disruptive=True, + skip_any_type=None, + skip_any_provider=None, + skip_any_features=None, + ): + """ + Args: + num_of_buckets(int): Number of buckets for each MCG features + is_disruptive(bool): Is the test calling this has disruptive flow? + skip_any_type(list): If you want to skip any types of OBCs + skip_any_provider(list): If you want to skip any cloud provider + skip_any_features(list): If you want to skip any MCG features + + Returns: + Dict: Representing all the buckets created for the respective features, + executor and event objects + + """ + + bucket_types = { + "data": { + "interface": "OC", + "backingstore_dict": {}, + }, + "namespace": { + "interface": "OC", + "namespace_policy_dict": { + "type": "Single", + "namespacestore_dict": {}, + }, + }, + "cache": { + "interface": "OC", + "namespace_policy_dict": { + "type": "Cache", + "ttl": 300000, + "namespacestore_dict": {}, + }, + "placement_policy": { + "tiers": [ + {"backingStores": [constants.DEFAULT_NOOBAA_BACKINGSTORE]} + ] + }, + }, + } + + # skip if any bucket types one wants to skip + if skip_any_type is not None: + for type in skip_any_type: + if type not in bucket_types.keys(): + logger.error( + f"Bucket type {type} you asked to skip is not valid type " + f"and valid are {list(bucket_types.keys())}" + ) + else: + bucket_types.pop(type) + + cloud_providers = { + "aws": (1, "eu-central-1"), + "azure": (1, None), + "pv": ( + 1, + constants.MIN_PV_BACKINGSTORE_SIZE_IN_GB, + "ocs-storagecluster-ceph-rbd", + ), + } + + # skip any cloud providers if one wants to skip + if skip_any_provider is not None: + for provider in skip_any_provider: + if provider not in cloud_providers.keys(): + logger.error( + f"Bucket type {provider} you asked to skip is not valid type " + f"and valid are {list(cloud_providers.keys())}" + ) + else: + bucket_types.pop(provider) + + all_buckets = list() + feature_setup_map = dict() + feature_setup_map["executor"] = None + + # skip any features if one wants to skip + features = ["replication", "caching", "expiration", "nsfs", "rgw kafka"] + assert isinstance(skip_any_features, list) and set(skip_any_features).issubset( + set(features) + ), f"Features asked to skip either not present or you havent provided through a list, valid: {features}" + + if "replication" not in skip_any_features: + buckets, source_target_map = setup_mcg_replication_feature_buckets( + num_of_buckets, bucket_types, cloud_providers + ) + all_buckets.extend(buckets) + feature_setup_map["replication"] = source_target_map + + if "caching" not in skip_any_features: + cache_buckets = setup_mcg_caching_feature_buckets( + num_of_buckets, bucket_types, cloud_providers + ) + all_buckets.extend(cache_buckets) + feature_setup_map["caching"] = cache_buckets + + if "expiration" not in skip_any_features: + buckets_with_expiration_policy = setup_mcg_expiration_feature_buckets( + num_of_buckets, bucket_types, cloud_providers + ) + all_buckets.extend(buckets_with_expiration_policy) + feature_setup_map["expiration"] = buckets_with_expiration_policy + + if "nsfs" not in skip_any_features: + setup_mcg_nsfs_feature_buckets() + feature_setup_map["nsfs"] = None + + # if "rgw kafka" not in skip_any_features: + # kafka_rgw_dict = setup_rgw_kafka_notification() + # all_buckets.extend([OBC(kafka_rgw_dict["kafka_rgw_bucket"])]) + # feature_setup_map["rgw kafka"] = kafka_rgw_dict + + uploaded_objects_dir = test_directory_setup.origin_dir + downloaded_obejcts_dir = test_directory_setup.result_dir + + for count, bucket in enumerate(all_buckets): + assert random_object_round_trip_verification( + io_pod=awscli_pod_session, + bucket_name=bucket.name, + upload_dir=uploaded_objects_dir + f"Bucket{count}", + download_dir=downloaded_obejcts_dir + f"Bucket{count}", + amount=object_amount, + pattern="Random_object", + mcg_obj=mcg_obj_session, + cleanup=True, + ), "Some or all written objects were not found in the list of downloaded objects" + logger.info("Successful object round trip verification") + + event, threads = validate_mcg_bg_features( + feature_setup_map, + run_in_bg=not is_disruptive, + skip_any_features=skip_any_features, + object_amount=object_amount, + ) + feature_setup_map["executor"] = dict() + feature_setup_map["executor"]["event"] = event + feature_setup_map["executor"]["threads"] = threads + return feature_setup_map + + return factory diff --git a/tests/e2e/system_test/test_mcg_recovery.py b/tests/e2e/system_test/test_mcg_recovery.py index 035a107c857..4149e478fab 100644 --- a/tests/e2e/system_test/test_mcg_recovery.py +++ b/tests/e2e/system_test/test_mcg_recovery.py @@ -42,3 +42,23 @@ def test_mcg_db_backup_recovery( noobaa_db_backup_and_recovery(snapshot_factory=snapshot_factory) verify_mcg_system_recovery(mcg_sys_dict) + + def test_sample(self, setup_mcg_bg_features): + + feature_setup_map = setup_mcg_bg_features( + num_of_buckets=10, + is_disruptive=False, + skip_any_features=["caching", "nsfs", "rgw kafka"], + ) + + import time + + log.error("Waiting for 5 mins") + time.sleep(300) + + feature_setup_map["executor"]["event"].set() + log.error("asked the background process to stop executing") + for th in feature_setup_map["executor"]["threads"]: + th.result() + + log.error("Done executing") From ce8ed30be66fdbb0debbbad36921e24ce9d19673 Mon Sep 17 00:00:00 2001 From: Mahesh Shetty Date: Tue, 3 Oct 2023 14:48:13 +0530 Subject: [PATCH 2/4] test this setup using test_mcg_db_backup_recovery test Signed-off-by: Mahesh Shetty --- tests/e2e/conftest.py | 6 ++-- tests/e2e/system_test/test_mcg_recovery.py | 34 ++++++++++------------ 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 9d31854746e..63e0d35e137 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1029,7 +1029,7 @@ def factory( # if any already running background validation threads # then stop those threads - if feature_setup_map["executor"] is not None: + if feature_setup_map["executor"]["event"] is not None: feature_setup_map["executor"]["event"].set() for t in feature_setup_map["executor"]["threads"]: t.result() @@ -1216,7 +1216,8 @@ def factory( all_buckets = list() feature_setup_map = dict() - feature_setup_map["executor"] = None + feature_setup_map["executor"] = dict() + feature_setup_map["executor"]["event"] = None # skip any features if one wants to skip features = ["replication", "caching", "expiration", "nsfs", "rgw kafka"] @@ -1276,7 +1277,6 @@ def factory( skip_any_features=skip_any_features, object_amount=object_amount, ) - feature_setup_map["executor"] = dict() feature_setup_map["executor"]["event"] = event feature_setup_map["executor"]["threads"] = threads return feature_setup_map diff --git a/tests/e2e/system_test/test_mcg_recovery.py b/tests/e2e/system_test/test_mcg_recovery.py index 4149e478fab..e38f52cb0b9 100644 --- a/tests/e2e/system_test/test_mcg_recovery.py +++ b/tests/e2e/system_test/test_mcg_recovery.py @@ -30,35 +30,33 @@ class TestMCGRecovery(E2ETest): ) def test_mcg_db_backup_recovery( self, - setup_mcg_system, + setup_mcg_bg_features, bucket_amount, object_amount, - verify_mcg_system_recovery, snapshot_factory, noobaa_db_backup_and_recovery, + validate_mcg_bg_features, ): - mcg_sys_dict = setup_mcg_system(bucket_amount, object_amount) - - noobaa_db_backup_and_recovery(snapshot_factory=snapshot_factory) - - verify_mcg_system_recovery(mcg_sys_dict) - - def test_sample(self, setup_mcg_bg_features): feature_setup_map = setup_mcg_bg_features( - num_of_buckets=10, - is_disruptive=False, + num_of_buckets=bucket_amount, + object_amount=object_amount, + is_disruptive=True, skip_any_features=["caching", "nsfs", "rgw kafka"], ) + noobaa_db_backup_and_recovery(snapshot_factory=snapshot_factory) import time - log.error("Waiting for 5 mins") - time.sleep(300) + time.sleep(60) + event, threads = validate_mcg_bg_features( + feature_setup_map, + run_in_bg=False, + skip_any_features=["caching", "nsfs", "rgw kafka"], + object_amount=object_amount, + ) - feature_setup_map["executor"]["event"].set() - log.error("asked the background process to stop executing") - for th in feature_setup_map["executor"]["threads"]: + event.set() + for th in threads: th.result() - - log.error("Done executing") + log.info("No issues seen with the MCG bg feature validation") From a02260b18a984d2729506272452ec0a7f0e4830c Mon Sep 17 00:00:00 2001 From: Mahesh Shetty Date: Wed, 4 Oct 2023 11:06:50 +0530 Subject: [PATCH 3/4] objects should be unique identifiers and data every time validation is called Signed-off-by: Mahesh Shetty --- ocs_ci/helpers/e2e_helpers.py | 168 ++++++++++++--------- ocs_ci/ocs/bucket_utils.py | 43 +----- ocs_ci/ocs/constants.py | 2 + ocs_ci/ocs/resources/pod.py | 14 +- tests/conftest.py | 44 ++---- tests/e2e/conftest.py | 9 +- tests/e2e/system_test/test_mcg_recovery.py | 16 +- 7 files changed, 133 insertions(+), 163 deletions(-) diff --git a/ocs_ci/helpers/e2e_helpers.py b/ocs_ci/helpers/e2e_helpers.py index 5342baa7d81..55723cbb04a 100644 --- a/ocs_ci/helpers/e2e_helpers.py +++ b/ocs_ci/helpers/e2e_helpers.py @@ -5,7 +5,7 @@ import re import time -import botocore.exceptions as botoexceptions +from uuid import uuid4 from ocs_ci.utility.retry import retry from ocs_ci.ocs.bucket_utils import ( random_object_round_trip_verification, @@ -27,8 +27,27 @@ def create_muliple_types_provider_obcs( num_of_buckets, bucket_types, cloud_providers, bucket_factory ): + """ + This function creates valid OBCs of different cloud providers + and bucket types + + Args: + num_of_buckets (int): Number of buckets + bucket_types (dict): Dict representing mapping between + bucket type and relevant configuration + cloud_providers (dict): Dict representing mapping between + cloud providers and relevant configuration + bucket_factory (fixture): bucket_factory fixture method + + Returns: + List: list of created buckets + + """ + def get_all_combinations_map(providers, bucket_types): """ + Create valid combination of cloud-providers and bucket-types + Args: providers (dict): dictionary representing cloud providers and the respective config @@ -36,6 +55,7 @@ def get_all_combinations_map(providers, bucket_types): types of bucket and the respective config Returns: List: containing all the possible combination of buckets + """ all_combinations = dict() @@ -69,13 +89,13 @@ def get_all_combinations_map(providers, bucket_types): ) ) - for i in range(0, buckets_left): + for index in range(0, buckets_left): buckets.extend( bucket_factory( interface="OC", amount=1, bucketclass=all_combination_of_obcs[ - list(all_combination_of_obcs.keys())[i] + list(all_combination_of_obcs.keys())[index] ], ) ) @@ -121,11 +141,12 @@ def validate_mcg_bucket_replicaton( upload_dir=bidi_uploaded_objs_dir_1, download_dir=bidi_downloaded_objs_dir_1, amount=object_amount, - pattern="FirstBiDi", + pattern=f"FirstBiDi-{uuid4().hex}", wait_for_replication=True, second_bucket_name=second_bucket.name, mcg_obj=mcg_obj_session, cleanup=True, + timeout=1200, ) random_object_round_trip_verification( @@ -134,11 +155,12 @@ def validate_mcg_bucket_replicaton( upload_dir=bidi_uploaded_objs_dir_2, download_dir=bidi_downloaded_objs_dir_2, amount=object_amount, - pattern="SecondBiDi", + pattern=f"SecondBiDi-{uuid4().hex}", wait_for_replication=True, second_bucket_name=first_bucket.name, mcg_obj=mcg_obj_session, cleanup=True, + timeout=1200, ) if event.is_set(): run_in_bg = False @@ -176,73 +198,70 @@ def validate_mcg_caching( """ while True: - verified_buckets = list() for bucket in cache_buckets: - try: - cache_uploaded_objs_dir = uploaded_objects_dir + "/cache" - cache_uploaded_objs_dir_2 = uploaded_objects_dir + "/cache_2" - cache_downloaded_objs_dir = downloaded_obejcts_dir + "/cache" - underlying_bucket_name = bucket.bucketclass.namespacestores[0].uls_name - - # Upload a random object to the bucket - logger.info(f"Uploading to the cache bucket: {bucket.name}") - objs_written_to_cache_bucket = write_random_test_objects_to_bucket( - awscli_pod_session, - bucket.name, - cache_uploaded_objs_dir, - pattern="Cache-", - mcg_obj=mcg_obj_session, - ) - wait_for_cache( - mcg_obj_session, - bucket.name, - objs_written_to_cache_bucket, - timeout=300, - ) + cache_uploaded_objs_dir = uploaded_objects_dir + "/cache" + cache_uploaded_objs_dir_2 = uploaded_objects_dir + "/cache_2" + cache_downloaded_objs_dir = downloaded_obejcts_dir + "/cache" + underlying_bucket_name = bucket.bucketclass.namespacestores[0].uls_name + + # Upload a random object to the bucket + logger.info(f"Uploading to the cache bucket: {bucket.name}") + obj_name = f"Cache-{uuid4().hex}" + objs_written_to_cache_bucket = write_random_test_objects_to_bucket( + awscli_pod_session, + bucket.name, + cache_uploaded_objs_dir, + pattern=obj_name, + mcg_obj=mcg_obj_session, + ) + wait_for_cache( + mcg_obj_session, + bucket.name, + objs_written_to_cache_bucket, + timeout=300, + ) - # Write a random, larger object directly to the underlying storage of the bucket - logger.info( - f"Uploading to the underlying bucket {underlying_bucket_name} directly" - ) - write_random_test_objects_to_bucket( - awscli_pod_session, - underlying_bucket_name, - cache_uploaded_objs_dir_2, - pattern="Cache-", - s3_creds=cld_mgr.aws_client.nss_creds, - bs="2M", - ) + # Write a random, larger object directly to the underlying storage of the bucket + logger.info( + f"Uploading to the underlying bucket {underlying_bucket_name} directly" + ) + write_random_test_objects_to_bucket( + awscli_pod_session, + underlying_bucket_name, + cache_uploaded_objs_dir_2, + pattern=obj_name, + s3_creds=cld_mgr.aws_client.nss_creds, + bs="2M", + ) - # Download the object from the cache bucket - sync_object_directory( - awscli_pod_session, - f"s3://{bucket.name}", - cache_downloaded_objs_dir, - mcg_obj_session, - ) + # Download the object from the cache bucket + awscli_pod_session.exec_cmd_on_pod(f"mkdir -p {cache_downloaded_objs_dir}") + sync_object_directory( + awscli_pod_session, + f"s3://{bucket.name}", + cache_downloaded_objs_dir, + mcg_obj_session, + ) - assert verify_s3_object_integrity( - original_object_path=f"{cache_uploaded_objs_dir}/Cache-0", - result_object_path=f"{cache_downloaded_objs_dir}/Cache-0", + assert verify_s3_object_integrity( + original_object_path=f"{cache_uploaded_objs_dir}/{obj_name}0", + result_object_path=f"{cache_downloaded_objs_dir}/{obj_name}0", + awscli_pod=awscli_pod_session, + ), "The uploaded and downloaded cached objects have different checksums" + + assert ( + verify_s3_object_integrity( + original_object_path=f"{cache_uploaded_objs_dir_2}/{obj_name}0", + result_object_path=f"{cache_downloaded_objs_dir}/{obj_name}0", awscli_pod=awscli_pod_session, - ), "The uploaded and downloaded cached objects have different checksums" - - assert ( - verify_s3_object_integrity( - original_object_path=f"{cache_uploaded_objs_dir_2}/Cache-0", - result_object_path=f"{cache_downloaded_objs_dir}/Cache-0", - awscli_pod=awscli_pod_session, - ) - is False - ), "The cached object was replaced by the new one before the TTL has expired" - verified_buckets.append(bucket.name) - if event.is_set(): - run_in_bg = False - break - except Exception: - logger.warning(f"verified so far: {verified_buckets}") - - raise + ) + is False + ), "The cached object was replaced by the new one before the TTL has expired" + logger.info(f"Verified caching for bucket: {bucket.name}") + + if event.is_set(): + run_in_bg = False + break if not run_in_bg: logger.warning("Stopping noobaa caching verification") @@ -269,14 +288,10 @@ def validate_rgw_kafka_notification(kafka_rgw_dict, event, run_in_bg=False): kafka_topic = kafka_rgw_dict["kafka_topic"] while True: + data = data + f"{uuid4().hex}" - try: - - def put_object_to_bucket(bucket_name, key, body): - return s3_client.put_object(Bucket=bucket_name, Key=key, Body=body) - - except botoexceptions.ClientError: - logger.warning("s3 put object timedout but ignoring as of now") + def put_object_to_bucket(bucket_name, key, body): + return s3_client.put_object(Bucket=bucket_name, Key=key, Body=body) assert put_object_to_bucket( bucketname, "key-1", data @@ -345,7 +360,10 @@ def validate_mcg_object_expiration( for i in range(object_amount): s3_put_object( - mcg_obj, bucket.name, f"{prefix}/obj-key-{i}", "Some random data" + mcg_obj, + bucket.name, + f"{prefix}/obj-key-{uuid4().hex}", + "Some random data", ) expire_objects_in_bucket(bucket.name) sample_if_objects_expired(mcg_obj, bucket.name) diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py index 83f8f08bc4e..35af84bd9ab 100644 --- a/ocs_ci/ocs/bucket_utils.py +++ b/ocs_ci/ocs/bucket_utils.py @@ -1333,6 +1333,7 @@ def check_cached_objects_by_name(mcg_obj, bucket_name, expected_objects_names=No Returns: bool: True if all the objects exist in the cache as expected, False otherwise + """ res = mcg_obj.send_rpc_query( "object_api", @@ -1344,7 +1345,7 @@ def check_cached_objects_by_name(mcg_obj, bucket_name, expected_objects_names=No list_objects_res = [name["key"] for name in res.get("reply").get("objects")] if not expected_objects_names: expected_objects_names = [] - # if set(expected_objects_names) == set(list_objects_res): + for obj in expected_objects_names: if obj not in list_objects_res: logger.warning( @@ -1774,6 +1775,7 @@ def random_object_round_trip_verification( cleanup=False, result_pod=None, result_pod_path=None, + **kwargs, ): """ Writes random objects in a pod, uploads them to a bucket, @@ -1817,7 +1819,7 @@ def random_object_round_trip_verification( ) written_objects = io_pod.exec_cmd_on_pod(f"ls -A1 {upload_dir}").split(" ") if wait_for_replication: - compare_bucket_object_list(mcg_obj, bucket_name, second_bucket_name) + compare_bucket_object_list(mcg_obj, bucket_name, second_bucket_name, **kwargs) bucket_name = second_bucket_name # Download the random objects that were uploaded to the bucket sync_object_directory( @@ -1917,50 +1919,19 @@ def create_aws_bs_using_cli( ) -def change_expiration_query_interval(new_interval): - """ - Change how often noobaa should check for object expiration - By default it will be 8 hours - Args: - new_interval (int): New interval in minutes - """ - - from ocs_ci.ocs.resources.pod import ( - get_noobaa_core_pod, - wait_for_pods_to_be_running, - ) - - nb_core_pod = get_noobaa_core_pod() - new_interval = new_interval * 60 * 1000 - params = ( - '[{"op": "add", "path": "/spec/template/spec/containers/0/env/-", ' - f'"value": {{ "name": "CONFIG_JS_LIFECYCLE_INTERVAL", "value": "{new_interval}" }}}}]' - ) - OCP(kind="statefulset", namespace=constants.OPENSHIFT_STORAGE_NAMESPACE).patch( - resource_name=constants.NOOBAA_CORE_STATEFULSET, - params=params, - format_type="json", - ) - logger.info(f"Updated the expiration query interval to {new_interval} ms") - nb_core_pod.delete() - wait_for_pods_to_be_running(pod_names=[nb_core_pod.name], timeout=300) - - -def expire_objects_in_bucket(bucket_name, new_expire_interval=None): +def expire_objects_in_bucket(bucket_name): """ Manually expire the objects in a bucket + Args: bucket_name (str): Name of the bucket - new_expire_interval (int): New expiration interval + """ from ocs_ci.ocs.resources.pod import ( get_noobaa_db_pod, ) - if new_expire_interval is not None and isinstance(new_expire_interval, int): - change_expiration_query_interval(new_expire_interval) - creation_time = f"{date.today().year-1}-06-25T14:18:28.712Z" nb_db_pod = get_noobaa_db_pod() query = ( diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index 1d024076888..fb0e87df186 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -116,6 +116,7 @@ CONFIG_JS_PREFIX = "CONFIG_JS_" BUCKET_REPLICATOR_DELAY_PARAM = CONFIG_JS_PREFIX + "BUCKET_REPLICATOR_DELAY" BUCKET_LOG_REPLICATOR_DELAY_PARAM = CONFIG_JS_PREFIX + "BUCKET_LOG_REPLICATOR_DELAY" +LIFECYCLE_INTERVAL_PARAM = "CONFIG_JS_LIFECYCLE_INTERVAL" # Resources / Kinds CEPHFILESYSTEM = "CephFileSystem" @@ -1756,6 +1757,7 @@ AWS_S3_ENDPOINT = "https://s3.amazonaws.com" NAMESPACE_FILESYSTEM = "nsfs" + # Cosbench constants COSBENCH = "cosbench" COSBENCH_PROJECT = "cosbench-project" diff --git a/ocs_ci/ocs/resources/pod.py b/ocs_ci/ocs/resources/pod.py index b22ececa589..a1a69779e2d 100644 --- a/ocs_ci/ocs/resources/pod.py +++ b/ocs_ci/ocs/resources/pod.py @@ -899,16 +899,10 @@ def get_noobaa_db_pod(): Returns: Pod object: Noobaa db pod object """ - if version.get_semantic_ocs_version_from_config() > version.VERSION_4_6: - nb_db = get_pods_having_label( - label=constants.NOOBAA_DB_LABEL_47_AND_ABOVE, - namespace=config.ENV_DATA["cluster_namespace"], - ) - else: - nb_db = get_pods_having_label( - label=constants.NOOBAA_DB_LABEL_46_AND_UNDER, - namespace=config.ENV_DATA["cluster_namespace"], - ) + nb_db = get_pods_having_label( + label=constants.NOOBAA_DB_LABEL_47_AND_ABOVE, + namespace=config.ENV_DATA["cluster_namespace"], + ) nb_db_pod = Pod(**nb_db[0]) return nb_db_pod diff --git a/tests/conftest.py b/tests/conftest.py index 7b8cbb3d2a3..a24ae3174d3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,9 +35,7 @@ from ocs_ci.ocs.bucket_utils import ( craft_s3_command, put_bucket_policy, - change_expiration_query_interval, ) - from ocs_ci.ocs.dr.dr_workload import BusyBox, BusyBox_AppSet from ocs_ci.ocs.exceptions import ( CommandFailed, @@ -94,12 +92,9 @@ get_all_pods, verify_data_integrity_for_multi_pvc_objs, get_noobaa_pods, -<<<<<<< HEAD get_pod_count, wait_for_pods_by_label_count, -======= get_noobaa_core_pod, ->>>>>>> 3ef4de76 (Entry critieria for MCG system tests) ) from ocs_ci.ocs.resources.pvc import PVC, create_restore_pvc from ocs_ci.ocs.version import get_ocs_version, get_ocp_version_dict, report_ocs_version @@ -6894,33 +6889,22 @@ def setup_logwriter_rbd_workload_factory(request, project_factory, teardown_fact @pytest.fixture() -def change_noobaa_lifecycle_interval(request): - nb_core_pod = get_noobaa_core_pod() - interval_changed = False +def reduce_expiration_interval(add_env_vars_to_noobaa_core_class): + """ + Reduce the interval in which the lifecycle + background worker is running - def factory(interval): - nonlocal interval_changed - interval_changed = True - change_expiration_query_interval(new_interval=interval) + """ - def finalizer(): - if interval_changed: - params = ( - f'[{{"op": "test", "path": "/spec/template/spec/containers/0/env/20/name",' - f'"value": "CONFIG_JS_LIFECYCLE_INTERVAL"}},' - f'{{"op": "remove", "path": "/spec/template/spec/containers/0/env/20"}}]' - ) + def factory(interval): + """ + Args: + interval (int): new interval in minutes - OCP( - kind="statefulset", namespace=constants.OPENSHIFT_STORAGE_NAMESPACE - ).patch( - resource_name=constants.NOOBAA_CORE_STATEFULSET, - params=params, - format_type="json", - ) - nb_core_pod.delete() - wait_for_pods_to_be_running(pod_names=[nb_core_pod.name], timeout=300) - log.info("Switched back to default lifecycle interval") + """ + new_intervals_in_miliseconds = 60 * interval * 1000 + add_env_vars_to_noobaa_core_class( + [(constants.LIFECYCLE_INTERVAL_PARAM, new_intervals_in_miliseconds)] + ) - request.addfinalizer(finalizer) return factory diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 63e0d35e137..f91def5ad87 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -827,6 +827,9 @@ def factory(number_of_buckets, bucket_types, cloud_providers): all_buckets = create_muliple_types_provider_obcs( number_of_buckets, cache_type, cloud_providers, bucket_factory ) + logger.info( + f"These are the cache buckets created: {[bucket.name for bucket in all_buckets]}" + ) return all_buckets return factory @@ -834,7 +837,7 @@ def factory(number_of_buckets, bucket_types, cloud_providers): @pytest.fixture() def setup_mcg_expiration_feature_buckets( - request, bucket_factory, mcg_obj, change_noobaa_lifecycle_interval + request, bucket_factory, mcg_obj, reduce_expiration_interval ): """ This fixture does the setup for validating MCG replication @@ -859,9 +862,9 @@ def factory(number_of_buckets, bucket_types, cloud_providers): """ type = dict() type["data"] = bucket_types["data"] - + reduce_expiration_interval(interval=2) logger.info("Changed noobaa lifecycle interval to 2 minutes") - change_noobaa_lifecycle_interval(interval=2) + expiration_rule = { "Rules": [ { diff --git a/tests/e2e/system_test/test_mcg_recovery.py b/tests/e2e/system_test/test_mcg_recovery.py index e38f52cb0b9..cbc622edec0 100644 --- a/tests/e2e/system_test/test_mcg_recovery.py +++ b/tests/e2e/system_test/test_mcg_recovery.py @@ -1,5 +1,6 @@ import logging import pytest +import time from ocs_ci.framework.pytest_customization.marks import ( system_test, @@ -26,7 +27,7 @@ class TestMCGRecovery(E2ETest): @pytest.mark.parametrize( argnames=["bucket_amount", "object_amount"], - argvalues=[pytest.param(2, 15)], + argvalues=[pytest.param(5, 5)], ) def test_mcg_db_backup_recovery( self, @@ -42,21 +43,18 @@ def test_mcg_db_backup_recovery( num_of_buckets=bucket_amount, object_amount=object_amount, is_disruptive=True, - skip_any_features=["caching", "nsfs", "rgw kafka"], + skip_any_features=["nsfs", "rgw kafka", "caching"], ) noobaa_db_backup_and_recovery(snapshot_factory=snapshot_factory) - import time + # wait 1 min for complete stabilization time.sleep(60) - event, threads = validate_mcg_bg_features( + + validate_mcg_bg_features( feature_setup_map, run_in_bg=False, - skip_any_features=["caching", "nsfs", "rgw kafka"], + skip_any_features=["nsfs", "rgw kafka", "caching"], object_amount=object_amount, ) - - event.set() - for th in threads: - th.result() log.info("No issues seen with the MCG bg feature validation") From 6bf3727d15084c33a6269d8ad86352002fee8ecd Mon Sep 17 00:00:00 2001 From: Mahesh Shetty Date: Wed, 18 Oct 2023 12:46:14 +0530 Subject: [PATCH 4/4] fix minor issues Signed-off-by: Mahesh Shetty --- tests/conftest.py | 1 - tests/e2e/conftest.py | 7 +++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index a24ae3174d3..c2f854b90a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -94,7 +94,6 @@ get_noobaa_pods, get_pod_count, wait_for_pods_by_label_count, - get_noobaa_core_pod, ) from ocs_ci.ocs.resources.pvc import PVC, create_restore_pvc from ocs_ci.ocs.version import get_ocs_version, get_ocp_version_dict, report_ocs_version diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index f91def5ad87..62d91ec3e7c 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -777,6 +777,7 @@ def factory(number_of_buckets, bucket_types, cloud_providers): if len(all_buckets) % 2 != 0: all_buckets[len(all_buckets) - 1].delete() + all_buckets.remove(all_buckets[len(all_buckets) - 1]) source_target_map = dict() index = 0 @@ -795,6 +796,9 @@ def factory(number_of_buckets, bucket_types, cloud_providers): index += 2 + logger.info( + f"Buckets created under replication setup: {[bucket.name for bucket in all_buckets]}" + ) return all_buckets, source_target_map return factory @@ -887,6 +891,9 @@ def factory(number_of_buckets, bucket_types, cloud_providers): Bucket=bucket.name, LifecycleConfiguration=expiration_rule ) + logger.info( + f"Buckets created under expiration setup: {[bucket.name for bucket in all_buckets]}" + ) return all_buckets return factory