fixture to set up MCG background features
Signed-off-by: Mahesh Shetty <[email protected]>
mashetty330 committed Oct 26, 2023
1 parent 64a234c commit 3752bf0
Showing 3 changed files with 287 additions and 139 deletions.
276 changes: 272 additions & 4 deletions tests/e2e/conftest.py
@@ -1,4 +1,5 @@
import os
import re
import time
import logging

@@ -8,6 +9,14 @@

from concurrent.futures import ThreadPoolExecutor
from threading import Event


import boto3
import pytest
import random
import copy

from ocs_ci.utility import version
from ocs_ci.framework import config
from ocs_ci.helpers.e2e_helpers import (
create_muliple_types_provider_obcs,
@@ -39,6 +48,9 @@
from ocs_ci.ocs.resources.pod import (
Pod,
get_pods_having_label,
get_rgw_pods,
    get_pod_logs,
)
from ocs_ci.ocs.resources.deployment import Deployment
from ocs_ci.ocs.exceptions import CommandFailed
@@ -48,7 +60,10 @@
validate_pv_delete,
default_storage_class,
)

from ocs_ci.ocs.resources.rgw import RGW
from ocs_ci.utility.utils import clone_notify, exec_cmd, run_cmd

logger = logging.getLogger(__name__)

@@ -140,13 +155,20 @@ def factory(
kind="secret", namespace=config.ENV_DATA["cluster_namespace"]
)
secrets = [
"noobaa-root-master-key",
"noobaa-admin",
"noobaa-operator",
"noobaa-db",
"noobaa-server",
"noobaa-endpoints",
]

if version.get_semantic_ocs_version_from_config() < version.VERSION_4_14:
secrets.extend(
["noobaa-root-master-key-backend", "noobaa-root-master-key-volume"]
)
else:
secrets.append("noobaa-root-master-key")

secrets_yaml = [
ocp_secret_obj.get(resource_name=f"{secret}") for secret in secrets
]
@@ -589,7 +611,12 @@ def mcg_system_setup(bucket_amount=5, object_amount=10):
pattern=cache_pattern,
mcg_obj=mcg_obj_session,
)
wait_for_cache(
mcg_obj_session,
cache_bucket.name,
objs_written_to_cache_bucket,
timeout=600,
)
# Write a random, larger object directly to the underlying storage of the bucket
write_random_test_objects_to_bucket(
awscli_pod_session,
@@ -912,6 +939,7 @@ def setup_rgw_kafka_notification(request, rgw_bucket_factory, rgw_obj):
notification feature
"""

# setup AMQ
amq = AMQ()

@@ -934,6 +962,7 @@ def setup_rgw_kafka_notification(request, rgw_bucket_factory, rgw_obj):
) = amq.create_kafkadrop()

def factory():

"""
Factory function implementing the fixture
@@ -942,13 +971,15 @@ def factory():
kafka_topic, kafkadrop_host objects etc
"""

# get the kafkadrop route
kafkadrop_host = kafkadrop_route.get().get("spec").get("host")

# create the bucket
bucketname = rgw_bucket_factory(amount=1, interface="RGW-OC")[0].name

# get RGW credentials

rgw_endpoint, access_key, secret_key = rgw_obj.get_credentials()

# clone notify repo
@@ -982,6 +1013,8 @@ def factory():
"kafka_topic": kafka_topic,
}

validate_kafka_rgw_notifications(kafka_rgw_dict)

return kafka_rgw_dict

def finalizer():
@@ -1116,6 +1149,75 @@ def factory(

return event, futures_obj

def validate_kafka_rgw_notifications(kafka_rgw_dict):
    """
    Validate that RGW bucket notifications reach the Kafka topic

    Args:
        kafka_rgw_dict (dict): Dictionary holding the s3 client, bucket name,
            notify command, test data, kafkadrop host and kafka topic

    """
s3_client = kafka_rgw_dict["s3_client"]
bucketname = kafka_rgw_dict["bucketname"]
notify_cmd = kafka_rgw_dict["notify_cmd"]
data = kafka_rgw_dict["data"]
kafkadrop_host = kafka_rgw_dict["kafkadrop_host"]
kafka_topic = kafka_rgw_dict["kafka_topic"]

# Put objects to bucket
assert s3_client.put_object(
Bucket=bucketname, Key="key-1", Body=data
), "Failed: Put object: key-1"
exec_cmd(notify_cmd)

    # Validate from the rgw logs that the notification was sent
    # and that no errors were logged
pattern = "ERROR: failed to create push endpoint"
rgw_pod_obj = get_rgw_pods()
rgw_log = get_pod_logs(pod_name=rgw_pod_obj[0].name, container="rgw")
    assert re.search(pattern=pattern, string=rgw_log) is None, (
        f"Error: '{pattern}' found in the rgw logs. "
        f"Check why the push endpoint creation failed and whether "
        f"rgw bucket notification is working correctly"
    )
assert s3_client.put_object(
Bucket=bucketname, Key="key-2", Body=data
), "Failed: Put object: key-2"
exec_cmd(notify_cmd)

    # Validate that messages are received on the Kafka side using curl;
    # a temporary check from the Kafka side, ideally verified from the UI
curl_command = (
f"curl -X GET {kafkadrop_host}/topic/{kafka_topic.name} "
"-H 'content-type: application/vnd.kafka.json.v2+json'"
)
json_output = run_cmd(cmd=curl_command)
new_string = json_output.split()
messages = new_string[new_string.index("messages</td>") + 1]
if messages.find("1") == -1:
raise Exception(
"Error: Messages are not recieved from Kafka side."
"RGW bucket notification is not working as expected."
)


@pytest.fixture()
def setup_mcg_bg_features(setup_mcg_system, setup_kafka_rgw):
"""
    This fixture helps to set up buckets for various noobaa features
    * MCG bucket replication
    * Noobaa caching
    * NSFS bucket
    * RGW kafka notification
    and performs basic s3 ops on the buckets

    Returns:
        tuple: dictionaries mapping each feature to its related buckets
"""

def factory(bucket_amount=1, object_amount=1):
mcg_sys_dict = setup_mcg_system(
bucket_amount=bucket_amount, object_amount=object_amount
)
kafka_rgw_dict = setup_kafka_rgw()

return mcg_sys_dict, kafka_rgw_dict

return factory


@@ -1293,8 +1395,174 @@ def factory(

return factory

@pytest.fixture()
def validate_mcg_bg_feature(verify_mcg_system_recovery):
    """
    Fixture to validate the MCG background feature buckets
    """
def factory(mcg_sys_dict, kafka_rgw_dict):
verify_mcg_system_recovery(mcg_sys_dict)
validate_kafka_rgw_notifications(kafka_rgw_dict)

return factory


@pytest.fixture()
def multi_obc_setup_factory(request, bucket_factory, mcg_obj):
"""
Fixture for multi obc factory
"""
return multi_obc_factory(bucket_factory, mcg_obj)


def multi_obc_factory(bucket_factory, mcg_obj):
"""
This function helps to create different types of
buckets backed by different providers
"""

def create_obcs(num_obcs=50, type_of_bucket=None, expiration_rule=None):
"""
        This helps to create buckets in bulk and apply an expiration rule, if any

        Args:
            num_obcs (int): number of OBCs
            type_of_bucket (list): list representing the type of the buckets;
                can have values ['data', 'cache', 'namespace']
expiration_rule (dict): Dictionary representing the object
expiration rule
Returns:
List: List of bucket objects
"""

def get_all_combinations_map(providers, bucket_types):
"""
Args:
providers (dict): dictionary representing cloud
providers and the respective config
bucket_types (dict): dictionary representing different
types of bucket and the respective config
            Returns:
                dict: mapping of every bucket type/provider combination to
                    its bucketclass config
"""
all_combinations = dict()

for provider, provider_config in providers.items():
for bucket_type, type_config in bucket_types.items():
if provider == "pv" and bucket_type != "data":
provider = random.choice(["aws", "azure"])
provider_config = providers[provider]
bucketclass = copy.deepcopy(type_config)

if "backingstore_dict" in bucketclass.keys():
bucketclass["backingstore_dict"][provider] = [provider_config]
elif "namespace_policy_dict" in bucketclass.keys():
bucketclass["namespace_policy_dict"]["namespacestore_dict"][
provider
] = [provider_config]
all_combinations.update({f"{bucket_type}-{provider}": bucketclass})
return all_combinations

cloud_providers = {
"aws": (1, "eu-central-1"),
"azure": (1, None),
"pv": (
1,
constants.MIN_PV_BACKINGSTORE_SIZE_IN_GB,
"ocs-storagecluster-ceph-rbd",
),
}

bucket_types = {
"data": {
"interface": "OC",
"backingstore_dict": {},
},
"namespace": {
"interface": "OC",
"namespace_policy_dict": {
"type": "Single",
"namespacestore_dict": {},
},
},
"cache": {
"interface": "OC",
"namespace_policy_dict": {
"type": "Cache",
"ttl": 300000,
"namespacestore_dict": {},
},
"placement_policy": {
"tiers": [
{"backingStores": [constants.DEFAULT_NOOBAA_BACKINGSTORE]}
]
},
},
}
to_remove = list()
if isinstance(type_of_bucket, list):
if set(type_of_bucket).issubset(set(list(bucket_types.keys()))):
                for bucket_type in bucket_types.keys():
                    if bucket_type not in type_of_bucket:
                        to_remove.append(bucket_type)
else:
logger.error(
"Invalid bucket types, only possible types are: data, cache, namespace"
)
elif type_of_bucket is not None:
            logger.error(
                "Invalid argument type for 'type_of_bucket': it should be a list"
            )

        for bucket_type in to_remove:
            del bucket_types[bucket_type]

all_combination_of_obcs = get_all_combinations_map(
cloud_providers, bucket_types
)
buckets = list()
buckets_created = dict()
num_of_buckets_each = num_obcs // len(all_combination_of_obcs.keys())
buckets_left = num_obcs % len(all_combination_of_obcs.keys())
if num_of_buckets_each != 0:
for combo, combo_config in all_combination_of_obcs.items():
buckets.extend(
bucket_factory(
interface="OC",
amount=num_of_buckets_each,
bucketclass=combo_config,
)
)
buckets_created.update({combo: num_of_buckets_each})

for i in range(0, buckets_left):
buckets.extend(
bucket_factory(
interface="OC",
amount=1,
bucketclass=all_combination_of_obcs[
list(all_combination_of_obcs.keys())[i]
],
)
)
buckets_created.update(
{
list(all_combination_of_obcs.keys())[i]: (
buckets_created[list(all_combination_of_obcs.keys())[i]]
if len(buckets) >= len(all_combination_of_obcs.keys())
else 0
)
+ 1
}
)

        if expiration_rule:
            for bucket in buckets:
                mcg_obj.s3_client.put_bucket_lifecycle_configuration(
                    Bucket=bucket.name, LifecycleConfiguration=expiration_rule
                )
        logger.info(f"These are the buckets created: {buckets_created}")
return buckets

return create_obcs
