diff --git a/ocs_ci/helpers/e2e_helpers.py b/ocs_ci/helpers/e2e_helpers.py new file mode 100644 index 00000000000..55723cbb04a --- /dev/null +++ b/ocs_ci/helpers/e2e_helpers.py @@ -0,0 +1,381 @@ +import logging + +import random +import copy +import re +import time + +from uuid import uuid4 +from ocs_ci.utility.retry import retry +from ocs_ci.ocs.bucket_utils import ( + random_object_round_trip_verification, + write_random_test_objects_to_bucket, + wait_for_cache, + sync_object_directory, + verify_s3_object_integrity, + s3_put_object, + expire_objects_in_bucket, + sample_if_objects_expired, +) +from ocs_ci.ocs.resources.pod import get_rgw_pods, get_pod_logs +from ocs_ci.utility.utils import exec_cmd, run_cmd + + +logger = logging.getLogger(__name__) + + +def create_muliple_types_provider_obcs( + num_of_buckets, bucket_types, cloud_providers, bucket_factory +): + """ + This function creates valid OBCs of different cloud providers + and bucket types + + Args: + num_of_buckets (int): Number of buckets + bucket_types (dict): Dict representing mapping between + bucket type and relevant configuration + cloud_providers (dict): Dict representing mapping between + cloud providers and relevant configuration + bucket_factory (fixture): bucket_factory fixture method + + Returns: + List: list of created buckets + + """ + + def get_all_combinations_map(providers, bucket_types): + """ + Create valid combination of cloud-providers and bucket-types + + Args: + providers (dict): dictionary representing cloud + providers and the respective config + bucket_types (dict): dictionary representing different + types of bucket and the respective config + Returns: + List: containing all the possible combination of buckets + + """ + all_combinations = dict() + + for provider, provider_config in providers.items(): + for bucket_type, type_config in bucket_types.items(): + if provider == "pv" and bucket_type != "data": + provider = random.choice(["aws", "azure"]) + provider_config = providers[provider] + bucketclass = copy.deepcopy(type_config) + + if "backingstore_dict" in bucketclass.keys(): + bucketclass["backingstore_dict"][provider] = [provider_config] + elif "namespace_policy_dict" in bucketclass.keys(): + bucketclass["namespace_policy_dict"]["namespacestore_dict"][ + provider + ] = [provider_config] + all_combinations.update({f"{bucket_type}-{provider}": bucketclass}) + return all_combinations + + all_combination_of_obcs = get_all_combinations_map(cloud_providers, bucket_types) + buckets = list() + num_of_buckets_each = num_of_buckets // len(all_combination_of_obcs.keys()) + buckets_left = num_of_buckets % len(all_combination_of_obcs.keys()) + if num_of_buckets_each != 0: + for combo, combo_config in all_combination_of_obcs.items(): + buckets.extend( + bucket_factory( + interface="OC", + amount=num_of_buckets_each, + bucketclass=combo_config, + ) + ) + + for index in range(0, buckets_left): + buckets.extend( + bucket_factory( + interface="OC", + amount=1, + bucketclass=all_combination_of_obcs[ + list(all_combination_of_obcs.keys())[index] + ], + ) + ) + + return buckets + + +def validate_mcg_bucket_replicaton( + awscli_pod_session, + mcg_obj_session, + source_target_map, + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=False, + object_amount=5, +): + """ + Validate MCG bucket replication feature + + Args: + awscli_pod_session (Pod): Pod object representing aws-cli pod + mcg_obj_session (MCG): MCG object + source_target_map (Dict): Dictionary consisting of source - target buckets + 
uploaded_objects_dir (str): directory where uploaded objects are kept + downloaded_obejcts_dir (str): directory where downloaded objects are kept + event (threading.Event()): Event() object + run_in_bg (bool): If True, validation is run in background + object_amount (int): Amounts of objects + + """ + bidi_uploaded_objs_dir_1 = uploaded_objects_dir + "/bidi_1" + bidi_uploaded_objs_dir_2 = uploaded_objects_dir + "/bidi_2" + bidi_downloaded_objs_dir_1 = downloaded_obejcts_dir + "/bidi_1" + bidi_downloaded_objs_dir_2 = downloaded_obejcts_dir + "/bidi_2" + + # Verify replication is working as expected by performing a two-way round-trip object verification + while True: + for first_bucket, second_bucket in source_target_map.items(): + random_object_round_trip_verification( + io_pod=awscli_pod_session, + bucket_name=first_bucket.name, + upload_dir=bidi_uploaded_objs_dir_1, + download_dir=bidi_downloaded_objs_dir_1, + amount=object_amount, + pattern=f"FirstBiDi-{uuid4().hex}", + wait_for_replication=True, + second_bucket_name=second_bucket.name, + mcg_obj=mcg_obj_session, + cleanup=True, + timeout=1200, + ) + + random_object_round_trip_verification( + io_pod=awscli_pod_session, + bucket_name=second_bucket.name, + upload_dir=bidi_uploaded_objs_dir_2, + download_dir=bidi_downloaded_objs_dir_2, + amount=object_amount, + pattern=f"SecondBiDi-{uuid4().hex}", + wait_for_replication=True, + second_bucket_name=first_bucket.name, + mcg_obj=mcg_obj_session, + cleanup=True, + timeout=1200, + ) + if event.is_set(): + run_in_bg = False + break + + if not run_in_bg: + logger.info("Verified bi-direction replication successfully") + logger.warning("Stopping bi-direction replication verification") + break + time.sleep(30) + + +def validate_mcg_caching( + awscli_pod_session, + mcg_obj_session, + cld_mgr, + cache_buckets, + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=False, +): + """ + Validate noobaa caching feature against the cache buckets + + Args: + awscli_pod_session (Pod): Pod object representing aws-cli pod + mcg_obj_session (MCG): MCG object + cld_mgr (cld_mgr): cld_mgr object + cache_buckets (List): List consisting of cache buckets + uploaded_objects_dir (str): directory where uploaded objects are kept + downloaded_obejcts_dir (str): directory where downloaded objects are kept + event (threading.Event()): Event() object + run_in_bg (bool): If True, validation is run in background + + """ + while True: + for bucket in cache_buckets: + cache_uploaded_objs_dir = uploaded_objects_dir + "/cache" + cache_uploaded_objs_dir_2 = uploaded_objects_dir + "/cache_2" + cache_downloaded_objs_dir = downloaded_obejcts_dir + "/cache" + underlying_bucket_name = bucket.bucketclass.namespacestores[0].uls_name + + # Upload a random object to the bucket + logger.info(f"Uploading to the cache bucket: {bucket.name}") + obj_name = f"Cache-{uuid4().hex}" + objs_written_to_cache_bucket = write_random_test_objects_to_bucket( + awscli_pod_session, + bucket.name, + cache_uploaded_objs_dir, + pattern=obj_name, + mcg_obj=mcg_obj_session, + ) + wait_for_cache( + mcg_obj_session, + bucket.name, + objs_written_to_cache_bucket, + timeout=300, + ) + + # Write a random, larger object directly to the underlying storage of the bucket + logger.info( + f"Uploading to the underlying bucket {underlying_bucket_name} directly" + ) + write_random_test_objects_to_bucket( + awscli_pod_session, + underlying_bucket_name, + cache_uploaded_objs_dir_2, + pattern=obj_name, + s3_creds=cld_mgr.aws_client.nss_creds, + bs="2M", + ) + + # 
Download the object from the cache bucket + awscli_pod_session.exec_cmd_on_pod(f"mkdir -p {cache_downloaded_objs_dir}") + sync_object_directory( + awscli_pod_session, + f"s3://{bucket.name}", + cache_downloaded_objs_dir, + mcg_obj_session, + ) + + assert verify_s3_object_integrity( + original_object_path=f"{cache_uploaded_objs_dir}/{obj_name}0", + result_object_path=f"{cache_downloaded_objs_dir}/{obj_name}0", + awscli_pod=awscli_pod_session, + ), "The uploaded and downloaded cached objects have different checksums" + + assert ( + verify_s3_object_integrity( + original_object_path=f"{cache_uploaded_objs_dir_2}/{obj_name}0", + result_object_path=f"{cache_downloaded_objs_dir}/{obj_name}0", + awscli_pod=awscli_pod_session, + ) + is False + ), "The cached object was replaced by the new one before the TTL has expired" + logger.info(f"Verified caching for bucket: {bucket.name}") + + if event.is_set(): + run_in_bg = False + break + + if not run_in_bg: + logger.warning("Stopping noobaa caching verification") + break + time.sleep(30) + + +def validate_rgw_kafka_notification(kafka_rgw_dict, event, run_in_bg=False): + """ + Validate kafka notifications for RGW buckets + + Args: + kafka_rgw_dict (Dict): Dict consisting of rgw bucket, + kafka_topic, kafkadrop_host etc + event (threading.Event()): Event() object + run_in_bg (Bool): True if you want to run in the background + + """ + s3_client = kafka_rgw_dict["s3client"] + bucketname = kafka_rgw_dict["kafka_rgw_bucket"] + notify_cmd = kafka_rgw_dict["notify_cmd"] + data = kafka_rgw_dict["data"] + kafkadrop_host = kafka_rgw_dict["kafkadrop_host"] + kafka_topic = kafka_rgw_dict["kafka_topic"] + + while True: + data = data + f"{uuid4().hex}" + + def put_object_to_bucket(bucket_name, key, body): + return s3_client.put_object(Bucket=bucket_name, Key=key, Body=body) + + assert put_object_to_bucket( + bucketname, "key-1", data + ), "Failed: Put object: key-1" + exec_cmd(notify_cmd) + + # Validate rgw logs notification are sent + # No errors are seen + pattern = "ERROR: failed to create push endpoint" + rgw_pod_obj = get_rgw_pods() + rgw_log = get_pod_logs(pod_name=rgw_pod_obj[0].name, container="rgw") + assert re.search(pattern=pattern, string=rgw_log) is None, ( + f"Error: {pattern} msg found in the rgw logs." + f"Validate {pattern} found on rgw logs and also " + f"rgw bucket notification is working correctly" + ) + assert put_object_to_bucket( + bucketname, "key-2", data + ), "Failed: Put object: key-2" + exec_cmd(notify_cmd) + + # Validate message are received Kafka side using curl command + # A temporary way to check from Kafka side, need to check from UI + @retry(Exception, tries=5, delay=5) + def validate_kafa_for_message(): + curl_command = ( + f"curl -X GET {kafkadrop_host}/topic/{kafka_topic.name} " + "-H 'content-type: application/vnd.kafka.json.v2+json'" + ) + json_output = run_cmd(cmd=curl_command) + # logger.info("Json output:" f"{json_output}") + new_string = json_output.split() + messages = new_string[new_string.index("messages") + 1] + logger.info("Messages:" + str(messages)) + if messages.find("1") == -1: + raise Exception( + "Error: Messages are not recieved from Kafka side." + "RGW bucket notification is not working as expected." 
+ ) + + validate_kafa_for_message() + + if event.is_set() or not run_in_bg: + logger.warning("Stopping kafka rgw notification verification") + break + time.sleep(30) + + +def validate_mcg_object_expiration( + mcg_obj, buckets, event, run_in_bg=False, object_amount=5, prefix="" +): + """ + Validates objects expiration for MCG buckets + + Args: + mcg_obj (MCG): MCG object + buckets (List): List of MCG buckets + event (threading.Event()): Event() object + run_in_bg (Bool): True if wants to run in background + object_amount (Int): Amount of objects + prefix (str): Any prefix used for objects + + """ + while True: + for bucket in buckets: + + for i in range(object_amount): + s3_put_object( + mcg_obj, + bucket.name, + f"{prefix}/obj-key-{uuid4().hex}", + "Some random data", + ) + expire_objects_in_bucket(bucket.name) + sample_if_objects_expired(mcg_obj, bucket.name) + if event.is_set(): + run_in_bg = False + break + + if not run_in_bg: + logger.warning("Stopping MCG object expiration verification") + break + time.sleep(30) + + +def validate_mcg_nsfs_feature(): + logger.info("This is not implemented") diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py index 2f54b93213d..35af84bd9ab 100644 --- a/ocs_ci/ocs/bucket_utils.py +++ b/ocs_ci/ocs/bucket_utils.py @@ -6,6 +6,7 @@ import os import shlex from uuid import uuid4 +from datetime import date import boto3 from botocore.handlers import disable_signing @@ -1332,6 +1333,7 @@ def check_cached_objects_by_name(mcg_obj, bucket_name, expected_objects_names=No Returns: bool: True if all the objects exist in the cache as expected, False otherwise + """ res = mcg_obj.send_rpc_query( "object_api", @@ -1343,18 +1345,20 @@ def check_cached_objects_by_name(mcg_obj, bucket_name, expected_objects_names=No list_objects_res = [name["key"] for name in res.get("reply").get("objects")] if not expected_objects_names: expected_objects_names = [] - if set(expected_objects_names) == set(list_objects_res): - logger.info("Files cached as expected") - return True - logger.warning( - "Objects did not cache properly, \n" - f"Expected: [{expected_objects_names}]\n" - f"Cached: [{list_objects_res}]" - ) - return False + + for obj in expected_objects_names: + if obj not in list_objects_res: + logger.warning( + "Objects did not cache properly, \n" + f"Expected: [{expected_objects_names}]\n" + f"Cached: [{list_objects_res}]" + ) + return False + logger.info("Files cached as expected") + return True -def wait_for_cache(mcg_obj, bucket_name, expected_objects_names=None): +def wait_for_cache(mcg_obj, bucket_name, expected_objects_names=None, timeout=60): """ wait for existing cache bucket to cache all required objects @@ -1365,7 +1369,7 @@ def wait_for_cache(mcg_obj, bucket_name, expected_objects_names=None): """ sample = TimeoutSampler( - timeout=60, + timeout=timeout, sleep=10, func=check_cached_objects_by_name, mcg_obj=mcg_obj, @@ -1771,6 +1775,7 @@ def random_object_round_trip_verification( cleanup=False, result_pod=None, result_pod_path=None, + **kwargs, ): """ Writes random objects in a pod, uploads them to a bucket, @@ -1814,7 +1819,7 @@ def random_object_round_trip_verification( ) written_objects = io_pod.exec_cmd_on_pod(f"ls -A1 {upload_dir}").split(" ") if wait_for_replication: - compare_bucket_object_list(mcg_obj, bucket_name, second_bucket_name) + compare_bucket_object_list(mcg_obj, bucket_name, second_bucket_name, **kwargs) bucket_name = second_bucket_name # Download the random objects that were uploaded to the bucket sync_object_directory( @@ -1912,3 
+1917,74 @@ def create_aws_bs_using_cli( f"--target-bucket {uls_name} --region {region}", use_yes=True, ) + + +def expire_objects_in_bucket(bucket_name): + """ + Manually expire the objects in a bucket + + Args: + bucket_name (str): Name of the bucket + + """ + + from ocs_ci.ocs.resources.pod import ( + get_noobaa_db_pod, + ) + + creation_time = f"{date.today().year-1}-06-25T14:18:28.712Z" + nb_db_pod = get_noobaa_db_pod() + query = ( + 'UPDATE objectmds SET "data"=jsonb_set("data", \'{create_time}\',' + f"'\\\"{creation_time}\\\"') WHERE data ->> 'bucket' IN " + f"( SELECT _id FROM buckets WHERE data ->> 'name' = '{bucket_name}' );" + ) + command = f'psql -h 127.0.0.1 -p 5432 -U postgres -d nbcore -c "{query}"' + + nb_db_pod.exec_cmd_on_pod(command=command, out_yaml_format=False) + + +def check_if_objects_expired(mcg_obj, bucket_name, prefix=""): + """ + Checks if objects in the bucket is expired + + Args: + mcg_obj(MCG): MCG object + bucket_name(str): Name of the bucket + prefix(str): Objects prefix + + Returns: + Bool: True if objects are expired, else False + + """ + + response = s3_list_objects_v2( + mcg_obj, bucketname=bucket_name, prefix=prefix, delimiter="/" + ) + if response["KeyCount"] != 0: + return False + return True + + +def sample_if_objects_expired(mcg_obj, bucket_name, prefix="", timeout=600, sleep=30): + """ + Sample if the objects in a bucket expired using + TimeoutSampler + + """ + message = ( + f"Objects in bucket with prefix {prefix} " + if prefix != "" + else "Objects in the bucket " + ) + sampler = TimeoutSampler( + timeout=timeout, + sleep=sleep, + func=check_if_objects_expired, + mcg_obj=mcg_obj, + bucket_name=bucket_name, + prefix=prefix, + ) + + assert sampler.wait_for_func_status(result=True), f"{message} are not expired" + logger.info(f"{message} are expired") diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index 1d024076888..fb0e87df186 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -116,6 +116,7 @@ CONFIG_JS_PREFIX = "CONFIG_JS_" BUCKET_REPLICATOR_DELAY_PARAM = CONFIG_JS_PREFIX + "BUCKET_REPLICATOR_DELAY" BUCKET_LOG_REPLICATOR_DELAY_PARAM = CONFIG_JS_PREFIX + "BUCKET_LOG_REPLICATOR_DELAY" +LIFECYCLE_INTERVAL_PARAM = "CONFIG_JS_LIFECYCLE_INTERVAL" # Resources / Kinds CEPHFILESYSTEM = "CephFileSystem" @@ -1756,6 +1757,7 @@ AWS_S3_ENDPOINT = "https://s3.amazonaws.com" NAMESPACE_FILESYSTEM = "nsfs" + # Cosbench constants COSBENCH = "cosbench" COSBENCH_PROJECT = "cosbench-project" diff --git a/ocs_ci/ocs/resources/pod.py b/ocs_ci/ocs/resources/pod.py index 47b4ffd8e19..a1a69779e2d 100644 --- a/ocs_ci/ocs/resources/pod.py +++ b/ocs_ci/ocs/resources/pod.py @@ -893,6 +893,20 @@ def get_noobaa_operator_pod( return noobaa_operator_pod +def get_noobaa_db_pod(): + """ + Get noobaa db pod obj + Returns: + Pod object: Noobaa db pod object + """ + nb_db = get_pods_having_label( + label=constants.NOOBAA_DB_LABEL_47_AND_ABOVE, + namespace=config.ENV_DATA["cluster_namespace"], + ) + nb_db_pod = Pod(**nb_db[0]) + return nb_db_pod + + def get_noobaa_core_pod(): """ Fetches Noobaa core pod details diff --git a/tests/conftest.py b/tests/conftest.py index 9e45c157e63..c2f854b90a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6885,3 +6885,25 @@ def setup_logwriter_rbd_workload_factory(request, project_factory, teardown_fact ) return logwriter_sts + + +@pytest.fixture() +def reduce_expiration_interval(add_env_vars_to_noobaa_core_class): + """ + Reduce the interval in which the lifecycle + background worker is running + + """ + + def 
factory(interval): + """ + Args: + interval (int): new interval in minutes + + """ + new_intervals_in_miliseconds = 60 * interval * 1000 + add_env_vars_to_noobaa_core_class( + [(constants.LIFECYCLE_INTERVAL_PARAM, new_intervals_in_miliseconds)] + ) + + return factory diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index adb1cd7d754..62d91ec3e7c 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,10 +1,24 @@ import os import time import logging + +import boto3 import pytest + +from concurrent.futures import ThreadPoolExecutor +from threading import Event from ocs_ci.framework import config +from ocs_ci.helpers.e2e_helpers import ( + create_muliple_types_provider_obcs, + validate_mcg_bucket_replicaton, + validate_mcg_caching, + validate_mcg_object_expiration, + validate_rgw_kafka_notification, + validate_mcg_nsfs_feature, +) from ocs_ci.ocs import constants +from ocs_ci.ocs.amq import AMQ from ocs_ci.ocs.bucket_utils import ( compare_object_checksums_between_bucket_and_local, compare_directory, @@ -14,20 +28,27 @@ wait_for_cache, write_random_test_objects_to_bucket, s3_list_objects_v1, + retrieve_verification_mode, ) from ocs_ci.ocs.benchmark_operator_fio import BenchmarkOperatorFIO from ocs_ci.ocs.ocp import OCP from ocs_ci.ocs.resources import pod, pvc +from ocs_ci.ocs.resources.objectbucket import OBC from ocs_ci.ocs.resources.ocs import OCS -from ocs_ci.ocs.resources.pod import Pod, get_pods_having_label +from ocs_ci.ocs.resources.pod import ( + Pod, + get_pods_having_label, +) from ocs_ci.ocs.resources.deployment import Deployment from ocs_ci.ocs.exceptions import CommandFailed from ocs_ci.helpers.helpers import ( wait_for_resource_state, modify_statefulset_replica_count, validate_pv_delete, + default_storage_class, ) +from ocs_ci.utility.utils import clone_notify logger = logging.getLogger(__name__) @@ -725,3 +746,549 @@ def pytest_collection_modifyitems(items): f" since it requires Managed service platform" ) items.remove(item) + + +@pytest.fixture() +def setup_mcg_replication_feature_buckets(request, bucket_factory): + """ + This fixture does the setup for validating MCG replication + feature + + """ + + def factory(number_of_buckets, bucket_types, cloud_providers): + """ + factory function implementing the fixture + + Args: + number_of_buckets (int): number of buckets + bucket_types (dict): dictionary mapping bucket types and + configuration + cloud_providers (dict): dictionary mapping cloud provider + and configuration + + Returns: + Dict: source bucket to target bucket map + + """ + all_buckets = create_muliple_types_provider_obcs( + number_of_buckets, bucket_types, cloud_providers, bucket_factory + ) + + if len(all_buckets) % 2 != 0: + all_buckets[len(all_buckets) - 1].delete() + all_buckets.remove(all_buckets[len(all_buckets) - 1]) + + source_target_map = dict() + index = 0 + for i in range(len(all_buckets) // 2): + source_target_map[all_buckets[index]] = all_buckets[index + 1] + patch_replication_policy_to_bucket( + all_buckets[index].name, + "basic-replication-rule-1", + all_buckets[index + 1].name, + ) + patch_replication_policy_to_bucket( + all_buckets[index + 1].name, + "basic-replication-rule-2", + all_buckets[index].name, + ) + + index += 2 + + logger.info( + f"Buckets created under replication setup: {[bucket.name for bucket in all_buckets]}" + ) + return all_buckets, source_target_map + + return factory + + +@pytest.fixture() +def setup_mcg_caching_feature_buckets(request, bucket_factory): + """ + This fixture does the setup for Noobaa cache 
buckets validation + + """ + + def factory(number_of_buckets, bucket_types, cloud_providers): + """ + factory function implementing fixture + + Args: + number_of_buckets (int): number of buckets + bucket_types (dict): dictionary mapping bucket types and + configuration + cloud_providers (dict): dictionary mapping cloud provider + and configuration + + Returns: + List: List of cache buckets + + """ + cache_type = dict() + cache_type["cache"] = bucket_types["cache"] + all_buckets = create_muliple_types_provider_obcs( + number_of_buckets, cache_type, cloud_providers, bucket_factory + ) + logger.info( + f"These are the cache buckets created: {[bucket.name for bucket in all_buckets]}" + ) + return all_buckets + + return factory + + +@pytest.fixture() +def setup_mcg_expiration_feature_buckets( + request, bucket_factory, mcg_obj, reduce_expiration_interval +): + """ + This fixture does the setup for validating MCG replication + feature + + """ + + def factory(number_of_buckets, bucket_types, cloud_providers): + """ + Factory function implementing the fixture + + Args: + number_of_buckets (int): number of buckets + bucket_types (dict): dictionary mapping bucket types and + configuration + cloud_providers (dict): dictionary mapping cloud provider + and configuration + + Returns: + List: list of buckets + + """ + type = dict() + type["data"] = bucket_types["data"] + reduce_expiration_interval(interval=2) + logger.info("Changed noobaa lifecycle interval to 2 minutes") + + expiration_rule = { + "Rules": [ + { + "Expiration": { + "Days": 1, + "ExpiredObjectDeleteMarker": False, + }, + "Filter": {"Prefix": ""}, + "ID": "data-expire", + "Status": "Enabled", + } + ] + } + + all_buckets = create_muliple_types_provider_obcs( + number_of_buckets, type, cloud_providers, bucket_factory + ) + for bucket in all_buckets: + mcg_obj.s3_client.put_bucket_lifecycle_configuration( + Bucket=bucket.name, LifecycleConfiguration=expiration_rule + ) + + logger.info( + f"Buckets created under expiration setup: {[bucket.name for bucket in all_buckets]}" + ) + return all_buckets + + return factory + + +@pytest.fixture() +def setup_mcg_nsfs_feature_buckets(request): + def factory(): + pass + + +@pytest.fixture() +def setup_rgw_kafka_notification(request, rgw_bucket_factory, rgw_obj): + """ + This fixture does the setup for validating RGW kafka + notification feature + + """ + # setup AMQ + amq = AMQ() + + kafka_topic = kafkadrop_pod = kafkadrop_svc = kafkadrop_route = None + + # get storageclass + storage_class = default_storage_class(interface_type=constants.CEPHBLOCKPOOL) + + # setup AMQ cluster + amq.setup_amq_cluster(storage_class.name) + + # create kafka topic + kafka_topic = amq.create_kafka_topic() + + # create kafkadrop pod + ( + kafkadrop_pod, + kafkadrop_svc, + kafkadrop_route, + ) = amq.create_kafkadrop() + + def factory(): + """ + Factory function implementing the fixture + + Returns: + Dict: This consists of mapping of rgw buckets, + kafka_topic, kafkadrop_host objects etc + + """ + # get the kafkadrop route + kafkadrop_host = kafkadrop_route.get().get("spec").get("host") + + # create the bucket + bucketname = rgw_bucket_factory(amount=1, interface="RGW-OC")[0].name + + # get RGW credentials + rgw_endpoint, access_key, secret_key = rgw_obj.get_credentials() + + # clone notify repo + notify_path = clone_notify() + + # initilize to upload data + data = "A random string data to write on created rgw bucket" + obc_obj = OBC(bucketname) + s3_resource = boto3.resource( + "s3", + verify=retrieve_verification_mode(), + 
endpoint_url=rgw_endpoint, + aws_access_key_id=obc_obj.access_key_id, + aws_secret_access_key=obc_obj.access_key, + ) + s3_client = s3_resource.meta.client + + # Initialize notify command to run + notify_cmd = ( + f"python {notify_path} -e {rgw_endpoint} -a {obc_obj.access_key_id} " + f"-s {obc_obj.access_key} -b {bucketname} " + f"-ke {constants.KAFKA_ENDPOINT} -t {kafka_topic.name}" + ) + + kafka_rgw_dict = { + "s3client": s3_client, + "kafka_rgw_bucket": bucketname, + "notify_cmd": notify_cmd, + "data": data, + "kafkadrop_host": kafkadrop_host, + "kafka_topic": kafka_topic, + } + + return kafka_rgw_dict + + def finalizer(): + if kafka_topic: + kafka_topic.delete() + if kafkadrop_pod: + kafkadrop_pod.delete() + if kafkadrop_svc: + kafkadrop_svc.delete() + if kafkadrop_route: + kafkadrop_route.delete() + + amq.cleanup() + + request.addfinalizer(finalizer) + return factory + + +@pytest.fixture() +def validate_mcg_bg_features( + request, awscli_pod_session, mcg_obj_session, test_directory_setup, cld_mgr +): + """ + This fixture validates specified features provided neccesary + feature setup map. It has option to run the validation to run + in the background while not blocking the execution of rest of + the code + + """ + + def factory( + feature_setup_map, run_in_bg=False, skip_any_features=None, object_amount=5 + ): + """ + factory functon implementing the fixture + + Args: + feature_setup_map (Dict): This has feature to setup of buckets map + consists of buckets, executor, event objects + run_in_bg (Bool): True if want to run the validation in background + skip_any_features (List): List consisting of features that dont need + to be validated + object_amount (int): Number of objects that you wanna use while doing + the validation + + Returns: + Event(): this is a threading.Event() object used to send signals to the + threads to stop + List: List consisting of all the futures objects, ie. 
threads + + """ + uploaded_objects_dir = test_directory_setup.origin_dir + downloaded_obejcts_dir = test_directory_setup.result_dir + futures_obj = list() + + # if any already running background validation threads + # then stop those threads + if feature_setup_map["executor"]["event"] is not None: + feature_setup_map["executor"]["event"].set() + for t in feature_setup_map["executor"]["threads"]: + t.result() + + event = Event() + executor = ThreadPoolExecutor( + max_workers=5 - len(skip_any_features) + if skip_any_features is not None + else 5 + ) + skip_any_features = list() if skip_any_features is None else skip_any_features + + if "replication" not in skip_any_features: + validate_replication = executor.submit( + validate_mcg_bucket_replicaton, + awscli_pod_session, + mcg_obj_session, + feature_setup_map["replication"], + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=run_in_bg, + object_amount=object_amount, + ) + futures_obj.append(validate_replication) + + if "caching" not in skip_any_features: + validate_caching = executor.submit( + validate_mcg_caching, + awscli_pod_session, + mcg_obj_session, + cld_mgr, + feature_setup_map["caching"], + uploaded_objects_dir, + downloaded_obejcts_dir, + event, + run_in_bg=run_in_bg, + ) + futures_obj.append(validate_caching) + + if "expiration" not in skip_any_features: + validate_expiration = executor.submit( + validate_mcg_object_expiration, + mcg_obj_session, + feature_setup_map["expiration"], + event, + run_in_bg=run_in_bg, + object_amount=object_amount, + prefix="", + ) + futures_obj.append(validate_expiration) + + if "rgw kafka" not in skip_any_features: + validate_rgw_kafka = executor.submit( + validate_rgw_kafka_notification, + feature_setup_map["rgw kafka"], + event, + run_in_bg=run_in_bg, + ) + futures_obj.append(validate_rgw_kafka) + + if "nsfs" not in skip_any_features: + validate_nsfs = executor.submit( + validate_mcg_nsfs_feature, + ) + futures_obj.append(validate_nsfs) + + # if not run in background we wait until the + # threads are finsihed executing, ie. single iteration + if not run_in_bg: + for t in futures_obj: + t.result() + event = None + + return event, futures_obj + + return factory + + +@pytest.fixture() +def setup_mcg_bg_features( + request, + test_directory_setup, + awscli_pod_session, + mcg_obj_session, + setup_mcg_replication_feature_buckets, + setup_mcg_caching_feature_buckets, + setup_mcg_nsfs_feature_buckets, + setup_mcg_expiration_feature_buckets, + # setup_rgw_kafka_notification, + validate_mcg_bg_features, +): + """ + Fixture to setup MCG features buckets, run IOs, validate IOs + + 1. Bucket replication + 2. Noobaa caching + 3. Object expiration + 4. MCG NSFS + 5. RGW kafka notification + + """ + + def factory( + num_of_buckets=10, + object_amount=5, + is_disruptive=True, + skip_any_type=None, + skip_any_provider=None, + skip_any_features=None, + ): + """ + Args: + num_of_buckets(int): Number of buckets for each MCG features + is_disruptive(bool): Is the test calling this has disruptive flow? 
+            skip_any_type(list): If you want to skip any types of OBCs
+            skip_any_provider(list): If you want to skip any cloud provider
+            skip_any_features(list): If you want to skip any MCG features
+
+        Returns:
+            Dict: Representing all the buckets created for the respective features,
+                executor and event objects
+
+        """
+
+        bucket_types = {
+            "data": {
+                "interface": "OC",
+                "backingstore_dict": {},
+            },
+            "namespace": {
+                "interface": "OC",
+                "namespace_policy_dict": {
+                    "type": "Single",
+                    "namespacestore_dict": {},
+                },
+            },
+            "cache": {
+                "interface": "OC",
+                "namespace_policy_dict": {
+                    "type": "Cache",
+                    "ttl": 300000,
+                    "namespacestore_dict": {},
+                },
+                "placement_policy": {
+                    "tiers": [
+                        {"backingStores": [constants.DEFAULT_NOOBAA_BACKINGSTORE]}
+                    ]
+                },
+            },
+        }
+
+        # skip any bucket types one wants to skip
+        if skip_any_type is not None:
+            for type in skip_any_type:
+                if type not in bucket_types.keys():
+                    logger.error(
+                        f"Bucket type {type} you asked to skip is not a valid type, "
+                        f"valid types are {list(bucket_types.keys())}"
+                    )
+                else:
+                    bucket_types.pop(type)
+
+        cloud_providers = {
+            "aws": (1, "eu-central-1"),
+            "azure": (1, None),
+            "pv": (
+                1,
+                constants.MIN_PV_BACKINGSTORE_SIZE_IN_GB,
+                "ocs-storagecluster-ceph-rbd",
+            ),
+        }
+
+        # skip any cloud providers one wants to skip
+        if skip_any_provider is not None:
+            for provider in skip_any_provider:
+                if provider not in cloud_providers.keys():
+                    logger.error(
+                        f"Cloud provider {provider} you asked to skip is not a valid provider, "
+                        f"valid providers are {list(cloud_providers.keys())}"
+                    )
+                else:
+                    cloud_providers.pop(provider)
+
+        all_buckets = list()
+        feature_setup_map = dict()
+        feature_setup_map["executor"] = dict()
+        feature_setup_map["executor"]["event"] = None
+
+        # skip any features one wants to skip
+        features = ["replication", "caching", "expiration", "nsfs", "rgw kafka"]
+        assert isinstance(skip_any_features, list) and set(skip_any_features).issubset(
+            set(features)
+        ), f"Features asked to skip are either not valid or were not provided as a list, valid: {features}"
+
+        if "replication" not in skip_any_features:
+            buckets, source_target_map = setup_mcg_replication_feature_buckets(
+                num_of_buckets, bucket_types, cloud_providers
+            )
+            all_buckets.extend(buckets)
+            feature_setup_map["replication"] = source_target_map
+
+        if "caching" not in skip_any_features:
+            cache_buckets = setup_mcg_caching_feature_buckets(
+                num_of_buckets, bucket_types, cloud_providers
+            )
+            all_buckets.extend(cache_buckets)
+            feature_setup_map["caching"] = cache_buckets
+
+        if "expiration" not in skip_any_features:
+            buckets_with_expiration_policy = setup_mcg_expiration_feature_buckets(
+                num_of_buckets, bucket_types, cloud_providers
+            )
+            all_buckets.extend(buckets_with_expiration_policy)
+            feature_setup_map["expiration"] = buckets_with_expiration_policy
+
+        if "nsfs" not in skip_any_features:
+            setup_mcg_nsfs_feature_buckets()
+            feature_setup_map["nsfs"] = None
+
+        # if "rgw kafka" not in skip_any_features:
+        #     kafka_rgw_dict = setup_rgw_kafka_notification()
+        #     all_buckets.extend([OBC(kafka_rgw_dict["kafka_rgw_bucket"])])
+        #     feature_setup_map["rgw kafka"] = kafka_rgw_dict
+
+        uploaded_objects_dir = test_directory_setup.origin_dir
+        downloaded_obejcts_dir = test_directory_setup.result_dir
+
+        for count, bucket in enumerate(all_buckets):
+            assert random_object_round_trip_verification(
+                io_pod=awscli_pod_session,
+                bucket_name=bucket.name,
+                upload_dir=uploaded_objects_dir + f"Bucket{count}",
+                download_dir=downloaded_obejcts_dir + f"Bucket{count}",
f"Bucket{count}", + amount=object_amount, + pattern="Random_object", + mcg_obj=mcg_obj_session, + cleanup=True, + ), "Some or all written objects were not found in the list of downloaded objects" + logger.info("Successful object round trip verification") + + event, threads = validate_mcg_bg_features( + feature_setup_map, + run_in_bg=not is_disruptive, + skip_any_features=skip_any_features, + object_amount=object_amount, + ) + feature_setup_map["executor"]["event"] = event + feature_setup_map["executor"]["threads"] = threads + return feature_setup_map + + return factory diff --git a/tests/e2e/system_test/test_mcg_recovery.py b/tests/e2e/system_test/test_mcg_recovery.py index 035a107c857..cbc622edec0 100644 --- a/tests/e2e/system_test/test_mcg_recovery.py +++ b/tests/e2e/system_test/test_mcg_recovery.py @@ -1,5 +1,6 @@ import logging import pytest +import time from ocs_ci.framework.pytest_customization.marks import ( system_test, @@ -26,19 +27,34 @@ class TestMCGRecovery(E2ETest): @pytest.mark.parametrize( argnames=["bucket_amount", "object_amount"], - argvalues=[pytest.param(2, 15)], + argvalues=[pytest.param(5, 5)], ) def test_mcg_db_backup_recovery( self, - setup_mcg_system, + setup_mcg_bg_features, bucket_amount, object_amount, - verify_mcg_system_recovery, snapshot_factory, noobaa_db_backup_and_recovery, + validate_mcg_bg_features, ): - mcg_sys_dict = setup_mcg_system(bucket_amount, object_amount) + + feature_setup_map = setup_mcg_bg_features( + num_of_buckets=bucket_amount, + object_amount=object_amount, + is_disruptive=True, + skip_any_features=["nsfs", "rgw kafka", "caching"], + ) noobaa_db_backup_and_recovery(snapshot_factory=snapshot_factory) - verify_mcg_system_recovery(mcg_sys_dict) + # wait 1 min for complete stabilization + time.sleep(60) + + validate_mcg_bg_features( + feature_setup_map, + run_in_bg=False, + skip_any_features=["nsfs", "rgw kafka", "caching"], + object_amount=object_amount, + ) + log.info("No issues seen with the MCG bg feature validation")