diff --git a/ocs_ci/ocs/amq.py b/ocs_ci/ocs/amq.py index 155478fb178..44250f0d1c0 100644 --- a/ocs_ci/ocs/amq.py +++ b/ocs_ci/ocs/amq.py @@ -161,9 +161,10 @@ def setup_amq_cluster_operator(self, namespace=constants.AMQ_NAMESPACE): log.warn( "Some amq leftovers are present, please cleanup the cluster" ) - pytest.skip( - "AMQ leftovers are present needs to cleanup the cluster" - ) + # TODO: Check why this skip was here in the first place + # pytest.skip( + # "AMQ leftovers are present needs to cleanup the cluster" + # ) time.sleep(30) # Check strimzi-cluster-operator pod created if self.is_amq_pod_running(pod_pattern="cluster-operator", expected_pods=1): diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index a419766b34f..ac533947709 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -438,6 +438,7 @@ DEFAULT_NOOBAA_BACKINGSTORE = "noobaa-default-backing-store" DEFAULT_NOOBAA_BUCKETCLASS = "noobaa-default-bucket-class" DEFAULT_MCG_BUCKET_LOGS_PVC = "noobaa-bucket-logging-pvc" +DEFAULT_MCG_BUCKET_NOTIFS_PVC = "noobaa-bucket-notifications-pvc" CUSTOM_MCG_LABEL = "custom=mcg-label" NOOBAA_RESOURCE_NAME = "noobaa" NOOBAA_DB_PVC_NAME = "db-noobaa-db-pg-0" @@ -619,7 +620,9 @@ NOOBAA_OPERATOR_DEPLOYMENT = "noobaa-operator" NOOBAA_ENDPOINT_DEPLOYMENT = "noobaa-endpoint" NOOBAA_DB_STATEFULSET = "noobaa-db-pg" +NOOBAA_DB_POD = "noobaa-db-pg-0" NOOBAA_CORE_STATEFULSET = "noobaa-core" +NOOBAA_CORE_POD = "noobaa-core-0" # Noobaa db secret NOOBAA_DB_SECRET = "noobaa-db" diff --git a/ocs_ci/ocs/resources/bucket_notifications_manager.py b/ocs_ci/ocs/resources/bucket_notifications_manager.py new file mode 100644 index 00000000000..9fad088d362 --- /dev/null +++ b/ocs_ci/ocs/resources/bucket_notifications_manager.py @@ -0,0 +1,256 @@ +import json +import logging +import tempfile + +from ocs_ci.framework import config +from ocs_ci.helpers.helpers import create_unique_resource_name, default_storage_class +from ocs_ci.ocs import constants +from ocs_ci.ocs.ocp import OCP +from ocs_ci.ocs.amq import AMQ +from ocs_ci.ocs.exceptions import CommandFailed +from ocs_ci.ocs.resources.pod import wait_for_pods_to_be_running + +logger = logging.getLogger(__name__) + +NOTIFS_YAML_PATH_NB_CR = "/spec/bucketNotifications" + + +class BucketNotificationsManager: + """ + A class to manage the MCG bucket notifications feature + """ + + @property + def nb_config_resource(self): + """ + Return the NooBaa configuration resource + Note that this might change in the future. + + Returns: + ocs_ci.ocs.ocp.OCP: OCP instance of the NooBaa configuration resource + """ + return OCP( + kind="noobaa", + namespace=config.ENV_DATA["cluster_namespace"], + resource_name="noobaa", + ) + + def __init__(self): + self.amq = AMQ() + self.kafka_topics = [] + self.conn_secrets = [] + self.cur_logs_pvc = constants.DEFAULT_MCG_BUCKET_NOTIFS_PVC + self.kafkadrop_pod = self.kafkadrop_svc = self.kafkadrop_route = None + + def setup_kafka(self): + """ + TODO + """ + # Get sc + sc = default_storage_class(interface_type=constants.CEPHBLOCKPOOL) + + # Deploy amq cluster + self.amq.setup_amq_cluster(sc.name) + + # Create Kafkadrop pod + ( + self.kafkadrop_pod, + self.kafkadrop_pod, + self.kafkadrop_route, + ) = self.amq.create_kafkadrop() + + def cleanup_kafka(self): + for topic in self.kafka_topics: + topic.delete() + self.kafka_topics = [] + if self.kafkadrop_pod: + self.kafkadrop_pod.delete() + if self.kafkadrop_route: + self.kafkadrop_route.delete() + + self.amq.cleanup() + + def enable_bucket_notifs_on_cr(self, notifs_pvc=None): + """ + Set the bucket notifications feature on the NooBaa CR + + Args: + notifs_pvc(str|optional): Name of a provided PVC for MCG to use for + intermediate logging of the events. + Note: + If not provided, a PVC will be automatically be created + by MCG when first enabling the feature. + """ + logger.info("Enabling bucket notifications on the NooBaa CR") + + # Build a patch command to enable guaranteed bucket logs + bucket_notifs_dict = {"connections": [], "enabled": True} + + # Add the bucketLoggingPVC field if provided + if notifs_pvc: + bucket_notifs_dict["pvc"] = notifs_pvc + + patch_params = [ + { + "op": "add", + "path": NOTIFS_YAML_PATH_NB_CR, + "value": bucket_notifs_dict, + } + ] + + # Try patching via add, and if it fails - replace instead + try: + self.nb_config_resource.patch( + params=json.dumps(patch_params), + format_type="json", + ) + except CommandFailed as e: + if "already exists" in str(e).lower(): + patch_params[0]["op"] = "replace" + self.nb_config_resource.patch( + params=json.dumps(patch_params), + format_type="json", + ) + else: + logger.error(f"Failed to enable bucket notifications: {e}") + raise e + + self.cur_logs_pvc = ( + notifs_pvc if notifs_pvc else constants.DEFAULT_MCG_BUCKET_NOTIFS_PVC + ) + + wait_for_pods_to_be_running( + pod_names=[constants.NOOBAA_CORE_POD], + timeout=60, + sleep=10, + ) + + logger.info("Guaranteed bucket logs have been enabled") + + def disable_bucket_logging_on_cr(self): + """ + Unset the bucket notifications feature on the NooBaa CR + """ + logger.info("Disabling bucket notifications on the NooBaa CR") + + try: + patch_params = [ + { + "op": "replace", + "path": NOTIFS_YAML_PATH_NB_CR, + "value": None, + }, + ] + self.nb_config_resource.patch( + params=json.dumps(patch_params), + format_type="json", + ) + + except CommandFailed as e: + if "not found" in str(e): + logger.info("The bucketNotifications field was not found") + else: + logger.error(f"Failed to disable bucket notifications: {e}") + raise e + + wait_for_pods_to_be_running( + pod_names=[constants.NOOBAA_CORE_POD], + timeout=60, + sleep=10, + ) + + logger.info("Bucket notifications have been disabled") + + def add_new_notif_conn(self, name=""): + """ + 1. Create a Kafka topic + 2. Create a secret with the Kafka connection details + 3. Add the connection to the NooBaa CR + """ + topic_name = name or create_unique_resource_name( + resource_description="nb-notif", resource_type="kafka-topic" + ) + topic_obj = self.amq.create_kafka_topic(topic_name) + self.kafka_topics.append(topic_obj) + secret, conn_file_name = self.create_kafka_connection_secret(topic_name) + self.add_notif_conn_to_noobaa_cr(secret) + + return conn_file_name + + def create_kafka_connection_secret(self, topic): + """ + TODO + """ + namespace = config.ENV_DATA["cluster_namespace"] + conn_name = create_unique_resource_name(resource_type="kafka-conn") + secret_name = conn_name + "-secret" + file_name = "" + + kafka_conn_config = { + "metadata.broker.list": "my-cluster-kafka-bootstrap.myproject.svc.cluster.local:9092", + "notification_protocol": "kafka", + "topic": topic, + "name": conn_name, + } + + with tempfile.NamedTemporaryFile( + mode="w+", prefix="kafka_conn_", suffix=".kv", delete=True + ) as conn_file: + file_name = conn_file.name + for key, value in kafka_conn_config.items(): + conn_file.write(f"{key}={value}\n") + + OCP().exec_oc_cmd( + f"create secret generic {secret_name} --from-file={conn_file.name} -n {namespace}" + ) + + secret_ocp_obj = OCP( + kind="secret", + namespace=namespace, + resource_name=secret_name, + ) + self.conn_secrets.append(secret_ocp_obj) + + return secret_ocp_obj, file_name + + def add_notif_conn_to_noobaa_cr(self, secret): + """ + TODO + """ + nb_ocp_obj = OCP( + kind="noobaa", + namespace=config.ENV_DATA["cluster_namespace"], + resource_name="noobaa", + ) + conn_data = { + "name": secret.name, + "namespace": secret.namespace, + } + patch_path = "/spec/bucketNotification/connections" + add_op = [{"op": "add", "path": f"{patch_path}/-", "value": conn_data}] + nb_ocp_obj.patch( + resource_name=constants.NOOBAA_RESOURCE_NAME, + params=json.dumps(add_op), + format_type="json", + ) + + wait_for_pods_to_be_running( + pod_names=[constants.NOOBAA_CORE_POD], + timeout=60, + sleep=10, + ) + + def get_events(self, topic): + """ + TODO + """ + pass + + def cleanup(self): + """ + TODO + """ + self.disable_bucket_logging_on_cr() + for secret in self.conn_secrets: + secret.delete() + self.cleanup_kafka() diff --git a/tests/functional/object/mcg/test_bucket_notifications.py b/tests/functional/object/mcg/test_bucket_notifications.py new file mode 100644 index 00000000000..36f5f2d3bd9 --- /dev/null +++ b/tests/functional/object/mcg/test_bucket_notifications.py @@ -0,0 +1,114 @@ +import pytest + +from ocs_ci.framework import config +from ocs_ci.framework.testlib import ( + MCGTest, + mcg, + polarion_id, + red_squad, + skipif_mcg_only, + tier1, +) +from ocs_ci.ocs import constants +from ocs_ci.ocs.bucket_utils import write_random_test_objects_to_bucket +from ocs_ci.ocs.ocp import OCP +from ocs_ci.ocs.resources.bucket_notifications_manager import BucketNotificationsManager +from ocs_ci.utility.utils import TimeoutSampler + + +@mcg +@red_squad +class TestBucketNotifications(MCGTest): + """ + Test the MCG bucket notifications feature + """ + + @pytest.fixture(autouse=True, scope="class") + def notif_manager(self, request): + """ + TODO + """ + notif_manager = BucketNotificationsManager() + request.addfinalizer(notif_manager.cleanup) + + notif_manager.setup_kafka() + return notif_manager + + @tier1 + @pytest.mark.parametrize( + argnames=["use_provided_notifs_pvc"], + argvalues=[ + # pytest.param(False, marks=[polarion_id("OCS-6242"), bugzilla("2302842")]), + pytest.param( + True, + marks=[polarion_id("OCS-6243"), skipif_mcg_only], + ), + ], + ids=[ + # "default-logs-pvc", + "provided-logs-pvc", + ], + ) + def test_bucket_notifications( + self, + mcg_obj, + awscli_pod, + bucket_factory, + pvc_factory, + test_directory_setup, + notif_manager, + use_provided_notifs_pvc, + ): + """ + Test the MCG bucket notifications feature + # TODO + """ + # Enable bucket notifications on the NooBaa CR + provided_notifs_pvc = None + if use_provided_notifs_pvc: + clstr_proj_obj = OCP(namespace=config.ENV_DATA["cluster_namespace"]) + provided_notifs_pvc = pvc_factory( + interface=constants.CEPHFILESYSTEM, + project=clstr_proj_obj, + size=20, + access_mode=constants.ACCESS_MODE_RWX, + ) + notif_manager.enable_bucket_notifs_on_cr(notifs_pvc=provided_notifs_pvc) + else: + notif_manager.enable_bucket_notifs_on_cr() + + # Add a Kafka notif connection to the NooBaa CR + topic = notif_manager.add_new_notif_conn() + + # Create a bucket and configure bucket notifs on it using the new connection + bucket = bucket_factory()[0].name + notif_manager.put_bucket_notification( + events=["s3:ObjectCreated:*"], + topic=topic, + ) + + # Verify the bucket notification configuration was set correctly + resp = notif_manager.get_bucket_notification() + assert resp["TopicConfigurations"]["Topic"] == topic + + # Verify that uploads are generating the expected events in Kafka + obj_keys = write_random_test_objects_to_bucket( + io_pod=awscli_pod, + bucket_to_write=bucket, + file_dir=test_directory_setup.origin_dir, + amount=20, + mcg_obj=mcg_obj, + ) + + # Wait for Kafka to process the notifications + for events in TimeoutSampler( + timeout=300, + sleep=5, + func=notif_manager.get_events, + topic=topic, + ): + # TODO compare obj_keys with the received events + for obj in obj_keys: + assert obj in events + + pass