Skip to content

Commit

Permalink
parent 3752bf0
Browse files Browse the repository at this point in the history
author Mahesh Shetty <[email protected]> 1694420550 +0530
committer Mahesh Shetty <[email protected]> 1698310664 +0530

fix #8310

Signed-off-by: Mahesh Shetty <[email protected]>
  • Loading branch information
mashetty330 committed Oct 26, 2023
1 parent 3752bf0 commit 17caf48
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 51 deletions.
5 changes: 3 additions & 2 deletions ocs_ci/ocs/bucket_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1920,12 +1920,13 @@ def create_aws_bs_using_cli(
)


def upload_bulk_buckets(s3_obj, buckets, amount=1, object_key="obj-key-0", prefix=None):
    """
    Upload objects to each of the given buckets.

    Args:
        s3_obj: MCG/S3 credentials wrapper used to perform the upload
        buckets (list): Bucket objects, each exposing a ``name`` attribute
        amount (int): Number of objects to upload per bucket
        object_key (str): Base object key; an index suffix ``-0`` ..
            ``-{amount-1}`` is appended per object
        prefix (str): Optional key prefix. When omitted, objects are written
            at the bucket root instead of under a literal ``"None/"`` prefix.
    """
    for bucket in buckets:
        for i in range(amount):
            # Guard against prefix=None leaking into the key as "None/..."
            key = f"{prefix}/{object_key}-{i}" if prefix else f"{object_key}-{i}"
            s3_put_object(s3_obj, bucket.name, key, object_key)


def change_expiration_query_interval(new_interval):
Expand Down
101 changes: 65 additions & 36 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import re
import time
import logging

import botocore.exceptions as botoexeptions
import boto3
import pytest

Expand All @@ -17,6 +17,7 @@
import copy

from ocs_ci.utility import version
from ocs_ci.utility.retry import retry
from ocs_ci.framework import config
from ocs_ci.helpers.e2e_helpers import (
create_muliple_types_provider_obcs,
Expand All @@ -36,8 +37,8 @@
sync_object_directory,
wait_for_cache,
write_random_test_objects_to_bucket,
s3_list_objects_v1,
retrieve_verification_mode,
s3_list_objects_v2,
)

from ocs_ci.ocs.benchmark_operator_fio import BenchmarkOperatorFIO
Expand All @@ -63,6 +64,7 @@

from ocs_ci.utility.utils import clone_notify
from ocs_ci.ocs.resources.rgw import RGW

from ocs_ci.utility.utils import clone_notify, exec_cmd, run_cmd

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -162,11 +164,14 @@ def factory(
"noobaa-endpoints",
]

if version.get_semantic_ocs_version_from_config() < version.VERSION_4_14:
if (
version.get_semantic_ocs_version_from_config() >= version.VERSION_4_14
and not config.DEPLOYMENT.get("kms_deployment")
):
secrets.extend(
["noobaa-root-master-key-backend", "noobaa-root-master-key-volume"]
)
else:
elif not config.DEPLOYMENT.get("kms_deployment"):
secrets.append("noobaa-root-master-key")

secrets_yaml = [
Expand Down Expand Up @@ -262,15 +267,27 @@ def factory(
logger.info("Restarted noobaa-db pod!")

# Make sure the testloss bucket doesn't exists and test bucket consists all the data
time.sleep(10)
try:
s3_list_objects_v1(s3_obj=mcg_obj_session, bucketname=testloss_bucket.name)
except Exception as e:
logger.info(e)
@retry(Exception, tries=10, delay=5)
def check_for_buckets_content(bucket):
    """
    List the objects of the given bucket, retrying while the recovered
    noobaa-db pod comes back up.

    Returns:
        dict: The ListObjectsV2 response when the bucket exists
        str: The error text when the bucket is (expectedly) absent

    Raises:
        Exception: Re-raised for any failure other than a missing bucket,
            so @retry can attempt the listing again.
    """
    try:
        response = s3_list_objects_v2(
            s3_obj=mcg_obj_session, bucketname=bucket.name
        )
        logger.info(response)
        return response
    except Exception as err:
        # str(err) is safer than err.args[0], which raises IndexError/
        # TypeError when args is empty or holds a non-string payload
        if "The specified bucket does not exist" in str(err):
            return str(err)
        else:
            raise

logger.info(
s3_list_objects_v1(s3_obj=mcg_obj_session, bucketname=test_bucket.name)
)
assert "The specified bucket does not exist" in check_for_buckets_content(
testloss_bucket
), "Test loss bucket exists even though it shouldn't be present in the recovered db"

assert (
check_for_buckets_content(test_bucket)["KeyCount"] == 1
), "test bucket doesnt consists of data post db recovery"

def finalizer():

Expand Down Expand Up @@ -979,7 +996,6 @@ def factory():
bucketname = rgw_bucket_factory(amount=1, interface="RGW-OC")[0].name

# get RGW credentials

rgw_endpoint, access_key, secret_key = rgw_obj.get_credentials()

# clone notify repo
Expand Down Expand Up @@ -1151,17 +1167,25 @@ def factory(

def validate_kafka_rgw_notifications(kafka_rgw_dict):

s3_client = kafka_rgw_dict["s3_client"]
bucketname = kafka_rgw_dict["bucketname"]
s3_client = kafka_rgw_dict["s3client"]
bucketname = kafka_rgw_dict["kafka_rgw_bucket"]
notify_cmd = kafka_rgw_dict["notify_cmd"]
data = kafka_rgw_dict["data"]
kafkadrop_host = kafka_rgw_dict["kafkadrop_host"]
kafka_topic = kafka_rgw_dict["kafka_topic"]

# Put objects to bucket
assert s3_client.put_object(
Bucket=bucketname, Key="key-1", Body=data
), "Failed: Put object: key-1"

# Retry transient S3 client errors via the decorator. The previous
# try/except wrapped only the function *definition*, which can never
# raise ClientError, so the handler was dead code and no failure of the
# actual put_object call was ever retried or ignored.
@retry(botoexeptions.ClientError, tries=5, delay=5)
def put_object_to_bucket(bucket_name, key, body):
    """Upload ``body`` to ``bucket_name`` under ``key`` via the RGW S3 client."""
    return s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)

assert put_object_to_bucket(bucketname, "key-1", data), "Failed: Put object: key-1"
exec_cmd(notify_cmd)

# Validate rgw logs notification are sent
Expand All @@ -1174,29 +1198,33 @@ def validate_kafka_rgw_notifications(kafka_rgw_dict):
f"Validate {pattern} found on rgw logs and also "
f"rgw bucket notification is working correctly"
)
assert s3_client.put_object(
Bucket=bucketname, Key="key-2", Body=data
), "Failed: Put object: key-2"
assert put_object_to_bucket(bucketname, "key-2", data), "Failed: Put object: key-2"
exec_cmd(notify_cmd)

# Validate message are received Kafka side using curl command
# A temporary way to check from Kafka side, need to check from UI
curl_command = (
f"curl -X GET {kafkadrop_host}/topic/{kafka_topic.name} "
"-H 'content-type: application/vnd.kafka.json.v2+json'"
)
json_output = run_cmd(cmd=curl_command)
new_string = json_output.split()
messages = new_string[new_string.index("messages</td>") + 1]
if messages.find("1") == -1:
raise Exception(
"Error: Messages are not recieved from Kafka side."
"RGW bucket notification is not working as expected."
@retry(Exception, tries=5, delay=5)
def validate_kafka_for_message():
    """
    Query the kafdrop UI for the topic page and verify a message count is
    shown, retrying while the RGW notification propagates to Kafka.

    Raises:
        Exception: When no messages are reported for the topic, so @retry
            can poll again before failing the validation.
    """
    curl_command = (
        f"curl -X GET {kafkadrop_host}/topic/{kafka_topic.name} "
        "-H 'content-type: application/vnd.kafka.json.v2+json'"
    )
    json_output = run_cmd(cmd=curl_command)
    # The count is the table cell immediately after the "messages</td>" tag
    new_string = json_output.split()
    messages = new_string[new_string.index("messages</td>") + 1]
    logger.info("Messages:" + str(messages))
    # NOTE(review): this only checks that the digit "1" appears somewhere in
    # the count string — confirm that is the intended success criterion.
    if messages.find("1") == -1:
        raise Exception(
            "Error: Messages are not received from Kafka side."
            "RGW bucket notification is not working as expected."
        )

validate_kafka_for_message()


@pytest.fixture()
def setup_mcg_bg_features(setup_mcg_system, setup_kafka_rgw):
def setup_mcg_bg_features(setup_mcg_system):
"""
This fixture helps to setup various noobaa feature buckets
* MCG bucket replication
Expand All @@ -1214,9 +1242,10 @@ def factory(bucket_amount=1, object_amount=1):
mcg_sys_dict = setup_mcg_system(
bucket_amount=bucket_amount, object_amount=object_amount
)
kafka_rgw_dict = setup_kafka_rgw()
logger.info("NONE")
# kafka_rgw_dict = setup_kafka_rgw()

return mcg_sys_dict, kafka_rgw_dict
return mcg_sys_dict, None

return factory

Expand Down Expand Up @@ -1399,7 +1428,7 @@ def factory(
def validate_mcg_bg_feature(verify_mcg_system_recovery):
def factory(mcg_sys_dict, kafka_rgw_dict):
verify_mcg_system_recovery(mcg_sys_dict)
validate_kafka_rgw_notifications(kafka_rgw_dict)
# validate_kafka_rgw_notifications(kafka_rgw_dict)

return factory

Expand Down
37 changes: 26 additions & 11 deletions tests/e2e/system_test/test_object_expiration.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def test_object_expiration_with_disruptions(
awscli_pod_session,
nodes,
snapshot_factory,
setup_mcg_bg_features,
validate_mcg_bg_feature,
noobaa_db_backup_and_recovery,
noobaa_db_backup_and_recovery_locally,
change_noobaa_lifecycle_interval,
Expand Down Expand Up @@ -161,8 +163,10 @@ def test_object_expiration_with_disruptions(
}

expire_rule_prefix = deepcopy(expire_rule)
number_of_buckets = 2
# Enable entry criteria
number_of_buckets = 50

# Entry criteria
mcg_sys_dict, kafka_rgw_dict = setup_mcg_bg_features()

# Create bulk buckets with expiry rule and no prefix set
logger.info(
Expand All @@ -185,12 +189,20 @@ def test_object_expiration_with_disruptions(
type_of_bucket=["data"],
)

from botocore.exceptions import ClientError
from ocs_ci.utility.retry import retry

@retry(ClientError, tries=5, delay=5)
def upload_objects_and_expire():

# upload objects with prefix 'tmp'
logger.info("Uploading objects with prefix 'tmp'")
upload_bulk_buckets(
mcg_obj, buckets_without_prefix, object_key="tmp-obj", prefix="tmp"
mcg_obj,
buckets_without_prefix,
amount=50,
object_key="tmp-obj",
prefix="tmp",
)

# Manually expire objects in bucket
Expand All @@ -201,13 +213,21 @@ def upload_objects_and_expire():
# Upload objects with same prefix 'others'
logger.info("upload objects under 'others' prefix")
upload_bulk_buckets(
mcg_obj, buckets_with_prefix, object_key="other-obj", prefix="others"
mcg_obj,
buckets_with_prefix,
amount=50,
object_key="other-obj",
prefix="others",
)

# Upload objects with different prefix 'perm'
logger.info("upload objects under 'perm' prefix")
upload_bulk_buckets(
mcg_obj, buckets_with_prefix, object_key="perm-obj", prefix="perm"
mcg_obj,
buckets_with_prefix,
amount=50,
object_key="perm-obj",
prefix="perm",
)

# Manually expire objects in bucket
Expand Down Expand Up @@ -334,13 +354,8 @@ def check_if_objects_expired(mcg_obj, bucket_name, prefix=""):
# Perform noobaa db recovery locally
noobaa_db_backup_and_recovery_locally()
wait_for_storage_pods()
self.sanity_helpers.health_check(tries=120)

sample_if_objects_expired()

def test_sample(self, setup_mcg_bg_features, validate_mcg_bg_feature):
mcg_sys_dict, kafka_rgw_dict = setup_mcg_bg_features()
import time

time.sleep(60)
# validate mcg entry criteria post test
validate_mcg_bg_feature(mcg_sys_dict, kafka_rgw_dict)
4 changes: 2 additions & 2 deletions tests/e2e/workloads/app/amq/test_rgw_kafka_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def teardown():
request.addfinalizer(teardown)
return self.amq

def test_rgw_kafka_notifications(self, bucket_factory):
def test_rgw_kafka_notifications(self, rgw_bucket_factory):
"""
Test to verify rgw kafka notifications
Expand All @@ -92,7 +92,7 @@ def test_rgw_kafka_notifications(self, bucket_factory):
kafkadrop_host = self.kafkadrop_route.get().get("spec").get("host")

# Create bucket
bucketname = bucket_factory(amount=1, interface="RGW-OC")[0].name
bucketname = rgw_bucket_factory(amount=1, interface="RGW-OC")[0].name

# Get RGW credentials
rgw_obj = RGW()
Expand Down

0 comments on commit 17caf48

Please sign in to comment.