diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 6af5dd513ab8..a9e24c9b6bff 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,4 +1,5 @@ import os +import re import time import logging @@ -8,6 +9,14 @@ from concurrent.futures import ThreadPoolExecutor from threading import Event + + +import boto3 +import pytest +import random +import copy + +from ocs_ci.utility import version from ocs_ci.framework import config from ocs_ci.helpers.e2e_helpers import ( create_muliple_types_provider_obcs, @@ -39,6 +48,9 @@ from ocs_ci.ocs.resources.pod import ( Pod, get_pods_having_label, + get_rgw_pods, + get_pod_logs, + ) from ocs_ci.ocs.resources.deployment import Deployment from ocs_ci.ocs.exceptions import CommandFailed @@ -48,7 +60,10 @@ validate_pv_delete, default_storage_class, ) + from ocs_ci.utility.utils import clone_notify +from ocs_ci.ocs.resources.rgw import RGW +from ocs_ci.utility.utils import clone_notify, exec_cmd, run_cmd logger = logging.getLogger(__name__) @@ -140,13 +155,20 @@ def factory( kind="secret", namespace=config.ENV_DATA["cluster_namespace"] ) secrets = [ - "noobaa-root-master-key", "noobaa-admin", "noobaa-operator", "noobaa-db", "noobaa-server", "noobaa-endpoints", ] + + if version.get_semantic_ocs_version_from_config() < version.VERSION_4_14: + secrets.extend( + ["noobaa-root-master-key-backend", "noobaa-root-master-key-volume"] + ) + else: + secrets.append("noobaa-root-master-key") + secrets_yaml = [ ocp_secret_obj.get(resource_name=f"{secret}") for secret in secrets ] @@ -589,7 +611,12 @@ def mcg_system_setup(bucket_amount=5, object_amount=10): pattern=cache_pattern, mcg_obj=mcg_obj_session, ) - wait_for_cache(mcg_obj_session, cache_bucket.name, objs_written_to_cache_bucket) + wait_for_cache( + mcg_obj_session, + cache_bucket.name, + objs_written_to_cache_bucket, + timeout=600, + ) # Write a random, larger object directly to the underlying storage of the bucket write_random_test_objects_to_bucket( awscli_pod_session, @@ -912,6 +939,7 @@ def setup_rgw_kafka_notification(request, rgw_bucket_factory, rgw_obj): notification feature """ + # setup AMQ amq = AMQ() @@ -934,6 +962,7 @@ def setup_rgw_kafka_notification(request, rgw_bucket_factory, rgw_obj): ) = amq.create_kafkadrop() def factory(): + """ Factory function implementing the fixture @@ -942,6 +971,7 @@ def factory(): kafka_topic, kafkadrop_host objects etc """ + # get the kafkadrop route kafkadrop_host = kafkadrop_route.get().get("spec").get("host") @@ -949,6 +979,7 @@ def factory(): bucketname = rgw_bucket_factory(amount=1, interface="RGW-OC")[0].name # get RGW credentials + rgw_endpoint, access_key, secret_key = rgw_obj.get_credentials() # clone notify repo @@ -982,6 +1013,8 @@ def factory(): "kafka_topic": kafka_topic, } + validate_kafka_rgw_notifications(kafka_rgw_dict) + return kafka_rgw_dict def finalizer(): @@ -1116,6 +1149,75 @@ def factory( return event, futures_obj +def validate_kafka_rgw_notifications(kafka_rgw_dict): + + s3_client = kafka_rgw_dict["s3_client"] + bucketname = kafka_rgw_dict["bucketname"] + notify_cmd = kafka_rgw_dict["notify_cmd"] + data = kafka_rgw_dict["data"] + kafkadrop_host = kafka_rgw_dict["kafkadrop_host"] + kafka_topic = kafka_rgw_dict["kafka_topic"] + + # Put objects to bucket + assert s3_client.put_object( + Bucket=bucketname, Key="key-1", Body=data + ), "Failed: Put object: key-1" + exec_cmd(notify_cmd) + + # Validate rgw logs notification are sent + # No errors are seen + pattern = "ERROR: failed to create push endpoint" + rgw_pod_obj = get_rgw_pods() + rgw_log = get_pod_logs(pod_name=rgw_pod_obj[0].name, container="rgw") + assert re.search(pattern=pattern, string=rgw_log) is None, ( + f"Error: {pattern} msg found in the rgw logs." + f"Validate {pattern} found on rgw logs and also " + f"rgw bucket notification is working correctly" + ) + assert s3_client.put_object( + Bucket=bucketname, Key="key-2", Body=data + ), "Failed: Put object: key-2" + exec_cmd(notify_cmd) + + # Validate message are received Kafka side using curl command + # A temporary way to check from Kafka side, need to check from UI + curl_command = ( + f"curl -X GET {kafkadrop_host}/topic/{kafka_topic.name} " + "-H 'content-type: application/vnd.kafka.json.v2+json'" + ) + json_output = run_cmd(cmd=curl_command) + new_string = json_output.split() + messages = new_string[new_string.index("messages") + 1] + if messages.find("1") == -1: + raise Exception( + "Error: Messages are not recieved from Kafka side." + "RGW bucket notification is not working as expected." + ) + + +@pytest.fixture() +def setup_mcg_bg_features(setup_mcg_system, setup_kafka_rgw): + """ + This fixture helps to setup various noobaa feature buckets + * MCG bucket replication + * Noobaa caching + * NSFS bucket + * RGW kafka notification + perform basic s3 ops on the buckets + + Returns: + Dict: Dictionary representing mapping between feature and related + buckets + """ + + def factory(bucket_amount=1, object_amount=1): + mcg_sys_dict = setup_mcg_system( + bucket_amount=bucket_amount, object_amount=object_amount + ) + kafka_rgw_dict = setup_kafka_rgw() + + return mcg_sys_dict, kafka_rgw_dict + return factory @@ -1293,8 +1395,174 @@ def factory( return factory -def multi_obc_setup_factory(request, bucket_factory, mcg_obj): - from tests.e2e.helpers import multi_obc_factory +def validate_mcg_bg_feature(verify_mcg_system_recovery): + def factory(mcg_sys_dict, kafka_rgw_dict): + verify_mcg_system_recovery(mcg_sys_dict) + validate_kafka_rgw_notifications(kafka_rgw_dict) + + return factory + +@pytest.fixture() +def multi_obc_setup_factory(request, bucket_factory, mcg_obj): + """ + Fixture for multi obc factory + + """ return multi_obc_factory(bucket_factory, mcg_obj) + + +def multi_obc_factory(bucket_factory, mcg_obj): + """ + This function helps to create different types of + buckets backed by different providers + + """ + + def create_obcs(num_obcs=50, type_of_bucket=None, expiration_rule=None): + """ + This helps to create buckets in bulk, apply expiration rule if any + + Args: + num_obcs (int): number of OBCs + type_of_bucket (list): List representing type fo the buckets + can have values ['data', 'cache', 'namespace'] + expiration_rule (dict): Dictionary representing the object + expiration rule + Returns: + List: List of bucket objects + + """ + + def get_all_combinations_map(providers, bucket_types): + """ + Args: + providers (dict): dictionary representing cloud + providers and the respective config + bucket_types (dict): dictionary representing different + types of bucket and the respective config + + Returns: + List: containing all the possible combination of buckets + + """ + all_combinations = dict() + + for provider, provider_config in providers.items(): + for bucket_type, type_config in bucket_types.items(): + if provider == "pv" and bucket_type != "data": + provider = random.choice(["aws", "azure"]) + provider_config = providers[provider] + bucketclass = copy.deepcopy(type_config) + + if "backingstore_dict" in bucketclass.keys(): + bucketclass["backingstore_dict"][provider] = [provider_config] + elif "namespace_policy_dict" in bucketclass.keys(): + bucketclass["namespace_policy_dict"]["namespacestore_dict"][ + provider + ] = [provider_config] + all_combinations.update({f"{bucket_type}-{provider}": bucketclass}) + return all_combinations + + cloud_providers = { + "aws": (1, "eu-central-1"), + "azure": (1, None), + "pv": ( + 1, + constants.MIN_PV_BACKINGSTORE_SIZE_IN_GB, + "ocs-storagecluster-ceph-rbd", + ), + } + + bucket_types = { + "data": { + "interface": "OC", + "backingstore_dict": {}, + }, + "namespace": { + "interface": "OC", + "namespace_policy_dict": { + "type": "Single", + "namespacestore_dict": {}, + }, + }, + "cache": { + "interface": "OC", + "namespace_policy_dict": { + "type": "Cache", + "ttl": 300000, + "namespacestore_dict": {}, + }, + "placement_policy": { + "tiers": [ + {"backingStores": [constants.DEFAULT_NOOBAA_BACKINGSTORE]} + ] + }, + }, + } + to_remove = list() + if isinstance(type_of_bucket, list): + if set(type_of_bucket).issubset(set(list(bucket_types.keys()))): + for type in bucket_types.keys(): + if type not in type_of_bucket: + to_remove.append(type) + else: + logger.error( + "Invalid bucket types, only possible types are: data, cache, namespace" + ) + elif type_of_bucket is not None: + logger.error( + "Invalid argument type for 'type_of_bucket': It should be list type" + ) + + for i in range(len(to_remove)): + del bucket_types[to_remove[i]] + + all_combination_of_obcs = get_all_combinations_map( + cloud_providers, bucket_types + ) + buckets = list() + buckets_created = dict() + num_of_buckets_each = num_obcs // len(all_combination_of_obcs.keys()) + buckets_left = num_obcs % len(all_combination_of_obcs.keys()) + if num_of_buckets_each != 0: + for combo, combo_config in all_combination_of_obcs.items(): + buckets.extend( + bucket_factory( + interface="OC", + amount=num_of_buckets_each, + bucketclass=combo_config, + ) + ) + buckets_created.update({combo: num_of_buckets_each}) + + for i in range(0, buckets_left): + buckets.extend( + bucket_factory( + interface="OC", + amount=1, + bucketclass=all_combination_of_obcs[ + list(all_combination_of_obcs.keys())[i] + ], + ) + ) + buckets_created.update( + { + list(all_combination_of_obcs.keys())[i]: ( + buckets_created[list(all_combination_of_obcs.keys())[i]] + if len(buckets) >= len(all_combination_of_obcs.keys()) + else 0 + ) + + 1 + } + ) + + for bucket in buckets: + mcg_obj.s3_client.put_bucket_lifecycle_configuration( + Bucket=bucket.name, LifecycleConfiguration=expiration_rule + ) + logger.info("These are the buckets created:" f"{buckets_created}") + return buckets + + return create_obcs diff --git a/tests/e2e/helpers.py b/tests/e2e/helpers.py deleted file mode 100644 index 52c00a5857b2..000000000000 --- a/tests/e2e/helpers.py +++ /dev/null @@ -1,132 +0,0 @@ -import logging -import random -import copy - -from ocs_ci.ocs import constants - -logger = logging.getLogger(__name__) - - -def multi_obc_factory(bucket_factory, mcg_obj): - def get_all_combinations_map(providers, bucket_types): - all_combinations = dict() - - for provider, provider_config in providers.items(): - for bucket_type, type_config in bucket_types.items(): - if provider == "pv" and bucket_type != "data": - provider = random.choice(["aws", "azure"]) - provider_config = providers[provider] - bucketclass = copy.deepcopy(type_config) - - if "backingstore_dict" in bucketclass.keys(): - bucketclass["backingstore_dict"][provider] = [provider_config] - elif "namespace_policy_dict" in bucketclass.keys(): - bucketclass["namespace_policy_dict"]["namespacestore_dict"][ - provider - ] = [provider_config] - all_combinations.update({f"{bucket_type}-{provider}": bucketclass}) - return all_combinations - - def create_obcs(num_obcs=50, type_of_bucket=None, expiration_rule=None): - - cloud_providers = { - "aws": (1, "eu-central-1"), - "azure": (1, None), - "pv": ( - 1, - constants.MIN_PV_BACKINGSTORE_SIZE_IN_GB, - "ocs-storagecluster-ceph-rbd", - ), - } - - bucket_types = { - "data": { - "interface": "OC", - "backingstore_dict": {}, - }, - "namespace": { - "interface": "OC", - "namespace_policy_dict": { - "type": "Single", - "namespacestore_dict": {}, - }, - }, - "cache": { - "interface": "OC", - "namespace_policy_dict": { - "type": "Cache", - "ttl": 300000, - "namespacestore_dict": {}, - }, - "placement_policy": { - "tiers": [ - {"backingStores": [constants.DEFAULT_NOOBAA_BACKINGSTORE]} - ] - }, - }, - } - to_remove = list() - if isinstance(type_of_bucket, list): - if set(type_of_bucket).issubset(set(list(bucket_types.keys()))): - for type in bucket_types.keys(): - if type not in type_of_bucket: - to_remove.append(type) - else: - logger.error( - "Invalid bucket types, only possible types are: data, cache, namespace" - ) - elif type_of_bucket is not None: - logger.error( - "Invalid argument type for 'type_of_bucket': It should be list type" - ) - - for i in range(len(to_remove)): - del bucket_types[to_remove[i]] - - all_combination_of_obcs = get_all_combinations_map( - cloud_providers, bucket_types - ) - buckets = list() - buckets_created = dict() - num_of_buckets_each = num_obcs // len(all_combination_of_obcs.keys()) - buckets_left = num_obcs % len(all_combination_of_obcs.keys()) - if num_of_buckets_each != 0: - for combo, combo_config in all_combination_of_obcs.items(): - buckets.extend( - bucket_factory( - interface="OC", - amount=num_of_buckets_each, - bucketclass=combo_config, - ) - ) - buckets_created.update({combo: num_of_buckets_each}) - - for i in range(0, buckets_left): - buckets.extend( - bucket_factory( - interface="OC", - amount=1, - bucketclass=all_combination_of_obcs[ - list(all_combination_of_obcs.keys())[i] - ], - ) - ) - buckets_created.update( - { - list(all_combination_of_obcs.keys())[i]: ( - buckets_created[list(all_combination_of_obcs.keys())[i]] - if len(buckets) >= len(all_combination_of_obcs.keys()) - else 0 - ) - + 1 - } - ) - - for bucket in buckets: - mcg_obj.s3_client.put_bucket_lifecycle_configuration( - Bucket=bucket.name, LifecycleConfiguration=expiration_rule - ) - logger.info("These are the buckets created:" f"{buckets_created}") - return buckets - - return create_obcs diff --git a/tests/e2e/system_test/test_object_expiration.py b/tests/e2e/system_test/test_object_expiration.py index ce2878a59d3b..bf9dcee9f89c 100644 --- a/tests/e2e/system_test/test_object_expiration.py +++ b/tests/e2e/system_test/test_object_expiration.py @@ -138,8 +138,13 @@ def test_object_expiration_with_disruptions( change_noobaa_lifecycle_interval, node_drain_teardown, ): - change_noobaa_lifecycle_interval(interval=2) + """ + Test object expiration feature when there are some sort of disruption to the noobaa + like node drain, node restart, nb db recovery etc + + """ + change_noobaa_lifecycle_interval(interval=2) expiration_days = 1 expire_rule = { "Rules": [ @@ -242,7 +247,7 @@ def check_if_objects_expired(mcg_obj, bucket_name, prefix=""): ) logger.info( - "Objects with prefix 'others' should expire but not with the prefix 'new'" + "Objects with prefix 'others' should expire but not with the prefix 'perm'" ) for bucket in buckets_with_prefix: sampler = TimeoutSampler( @@ -268,7 +273,7 @@ def check_if_objects_expired(mcg_obj, bucket_name, prefix=""): func=check_if_objects_expired, mcg_obj=mcg_obj, bucket_name=bucket.name, - prefix="new", + prefix="perm", ) if sampler.wait_for_func_status(result=False): logger.info( @@ -332,3 +337,10 @@ def check_if_objects_expired(mcg_obj, bucket_name, prefix=""): self.sanity_helpers.health_check(tries=120) sample_if_objects_expired() + + def test_sample(self, setup_mcg_bg_features, validate_mcg_bg_feature): + mcg_sys_dict, kafka_rgw_dict = setup_mcg_bg_features() + import time + + time.sleep(60) + validate_mcg_bg_feature(mcg_sys_dict, kafka_rgw_dict)