Skip to content

Commit

Permalink
Improve docstrings and implement delete objects steps
Browse files Browse the repository at this point in the history
Signed-off-by: Mahesh Shetty <[email protected]>
  • Loading branch information
Mahesh Shetty authored and Mahesh Shetty committed Dec 4, 2024
1 parent 1696ba8 commit 98e44ac
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 21 deletions.
71 changes: 63 additions & 8 deletions ocs_ci/helpers/mcg_stress_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
sync_object_directory,
rm_object_recursive,
s3_list_objects_v2,
list_objects_in_batches,
s3_delete_objects,
)

logger = logging.getLogger(__name__)
Expand All @@ -25,9 +27,12 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None):
mcg_obj (MCG): MCG object
pod_obj (Pod): Pod object
buckets (Dict): Map of bucket type and bucket object
iteration_no (int): Integer value representing iteration
event (threading.Event()): Event object to signal the execution
completion
"""
src_path = "/complex_directory/dir_0_0/dir_1_0/dir_2_0/dir_3_0/"
src_path = "/complex_directory/"

logger.info(f"Uploading objects to all the buckets under prefix {iteration_no}")
try:
Expand All @@ -47,7 +52,7 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None):
src_path,
f"s3://{bucket.name}/{iteration_no}/",
s3_obj,
timeout=1200,
timeout=20000,
)
futures.append(future)

Expand All @@ -67,6 +72,19 @@ def upload_objs_to_buckets(mcg_obj, pod_obj, buckets, iteration_no, event=None):
def run_noobaa_metadata_intense_ops(
mcg_obj, pod_obj, bucket_factory, bucket, iteration_no, event=None
):
"""
    Perform metadata-intensive operations to stress Noobaa
Args:
mcg_obj (MCG): MCG object
pod_obj (Pod): Noobaa stress CLI pod object
bucket_factory (fixture): Pytest fixture for creating bucket
bucket (tuple): Tuple consisting of backend storage type and bucket object
        iteration_no (int): Iteration number, i.e. the prefix under which the operations are performed
event (threading.Event()): Event object to signal the execution
completion
"""
bucket_type, bucket_obj = bucket
bucket_name = bucket_obj.name

Expand Down Expand Up @@ -126,6 +144,7 @@ def _run_object_metadata_ops():
target=bucket_name,
prefix=iteration_no,
s3_obj=s3_obj,
timeout=6000,
recursive=True,
)
while True:
Expand Down Expand Up @@ -207,9 +226,11 @@ def delete_objs_from_bucket(pod_obj, bucket, iteration_no, event=None):
Args:
pod_obj (Pod): Noobaa stress CLI pod object
mcg_obj (MCG): MCG object
bucket_name (str): Name of the bucket
bucket (Tuple): Tuple consisting of backend storage type and bucket object
iteration_no (int): Iteration number or prefix from where should delete objects
event (threading.Event()): Event object to signal the execution
completion
"""
bucket_type, bucket_obj = bucket
bucket_name = bucket_obj.name
Expand All @@ -227,6 +248,7 @@ def delete_objs_from_bucket(pod_obj, bucket, iteration_no, event=None):
bucket_name,
mcg_obj,
prefix=iteration_no,
timeout=6000,
)
logger.info(
f"Successfully completed object deletion operation on bucket {bucket_name} under prefix {iteration_no}"
Expand All @@ -238,9 +260,11 @@ def list_objs_from_bucket(bucket, iteration_no, event=None):
List objects from bucket
Args:
mcg_obj (MCG): MCG object
bucket_name (str): Name of the bucket
bucket (Tuple): Tuple consisting of backend storage type and bucket object
iteration_no (int): Iteration number or prefix from where should list objects
event (threading.Event()): Event object to signal the execution
completion
"""
bucket_type, bucket_obj = bucket
bucket_name = bucket_obj.name
Expand Down Expand Up @@ -270,10 +294,12 @@ def download_objs_from_bucket(pod_obj, bucket, target_dir, iteration_no, event=N
Args:
pod_obj (Pod): Noobaa stress CLI pod object
mcg_obj (MCG): MCG object
bucket_name (str): Name of the bucket
bucket (Tuple): Tuple consisting of backend storage type and bucket object
target_dir (str): Target directory to download objects
iteration_no (int): Iteration number or prefix from where should download objects
event (threading.Event()): Event object to signal the execution
completion
"""
bucket_type, bucket_obj = bucket
bucket_name = bucket_obj.name
Expand All @@ -291,10 +317,39 @@ def download_objs_from_bucket(pod_obj, bucket, target_dir, iteration_no, event=N
f"s3://{bucket_name}/{iteration_no}",
target_dir,
mcg_obj,
timeout=6000,
)

if event.is_set():
logger.info(
f"Successfully completed object download operation on bucket {bucket_name} under prefix {iteration_no}"
)
break


def delete_objects_in_batches(bucket, batch_size):
    """
    Delete all objects from the given bucket in batches

    Args:
        bucket (tuple): Tuple consisting of backend storage type and bucket object
        batch_size (int): Maximum number of objects to delete per request

    """
    bucket_type, bucket_obj = bucket
    bucket_name = bucket_obj.name
    # RGW buckets are accessed through their OBC credentials; everything
    # else goes through the Noobaa MCG endpoint
    if bucket_type.upper() == "RGW":
        mcg_obj = OBC(bucket_name)
    else:
        mcg_obj = MCG()

    logger.info(f"Deleting objects in bucket {bucket_name} in batches of {batch_size}")
    total_objs_deleted = 0
    for obj_batch in list_objects_in_batches(
        mcg_obj, bucket_name, batch_size=batch_size, yield_individual=False
    ):
        s3_delete_objects(mcg_obj, bucket_name, obj_batch)
        # Count the actual batch size: the final batch is usually smaller
        # than batch_size, so adding batch_size would over-count
        total_objs_deleted += len(obj_batch)
    logger.info(
        f"Total objects deleted {total_objs_deleted} in bucket {bucket_name}"
    )
3 changes: 2 additions & 1 deletion ocs_ci/ocs/bucket_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def download_objects_using_s3cmd(
), "Failed to download objects"


def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None):
def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None, timeout=600):
"""
Remove bucket objects with --recursive option
Expand All @@ -606,6 +606,7 @@ def rm_object_recursive(podobj, target, mcg_obj, option="", prefix=None):
mcg_obj.access_key,
mcg_obj.s3_internal_endpoint,
],
timeout=timeout,
)


Expand Down
52 changes: 40 additions & 12 deletions tests/cross_functional/stress/test_noobaa_under_stress.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import random

from ocs_ci.framework.pytest_customization.marks import magenta_squad
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from ocs_ci.helpers.mcg_stress_helper import (
Expand All @@ -9,11 +10,13 @@
delete_objs_from_bucket,
list_objs_from_bucket,
download_objs_from_bucket,
delete_objects_in_batches,
)

logger = logging.getLogger(__name__)


@magenta_squad
class TestNoobaaUnderStress:

base_setup_buckets = list()
Expand All @@ -27,34 +30,44 @@ def test_noobaa_under_stress(
stress_test_directory_setup,
bucket_factory,
):

# fetch buckets created for stress testing
"""
        Stress Noobaa by performing bulk s3 operations. This mainly consists of 3 stages
mentioned below
1. Base setup: Here we create the buckets of all possible types and then
load them with million objects in deep directory
2. S3 bulk operations: Here we perform various s3 operations such as list,
download, delete, metadata intense op etc concurrently on each of the bucket
respectively.
3. At the end delete objects from all the bucket in batches
"""

# Fetch buckets created for stress testing
self.base_setup_buckets = setup_stress_testing_bucket()

# upload objects to the buckets created concurrently
# Upload objects to the buckets created concurrently
upload_objs_to_buckets(
mcg_obj_session,
nb_stress_cli_pod,
self.base_setup_buckets,
iteration_no=0,
)

# iterate and stress the cluster with object upload
# Iterate and stress the cluster with object upload
# and other IO operations
total_iterations = 4
executor = ThreadPoolExecutor(max_workers=5)
futures_obj = list()
for i in range(1, total_iterations):
logger.info(f"Performing Iteration {i} of stressing the cluster")
# buckets = list(self.base_setup_buckets.keys())
buckets = [
(type, bucket) for type, bucket in self.base_setup_buckets.items()
]

# instantiate event object
# Instantiate event object
event = Event()

# perform object upload operation
# Perform object upload operation
# concurrently
futures_obj.append(
executor.submit(
Expand All @@ -67,7 +80,7 @@ def test_noobaa_under_stress(
)
)

# perform metadata intense operations
# Perform metadata intense operations
# on randomly selected bucket
bucket = random.choice(buckets)
futures_obj.append(
Expand All @@ -83,7 +96,7 @@ def test_noobaa_under_stress(
)
buckets.remove(bucket)

# perform object deletion on a
# Perform object deletion on a
# randomly selected bucket
bucket = random.choice(buckets)
futures_obj.append(
Expand All @@ -97,7 +110,7 @@ def test_noobaa_under_stress(
)
buckets.remove(bucket)

# perform object listing on a
# Perform object listing on a
# randomly selected bucket
bucket = random.choice(buckets)
futures_obj.append(
Expand All @@ -110,7 +123,7 @@ def test_noobaa_under_stress(
)
buckets.remove(bucket)

# perform object download on
# Perform object download on
# a randomly selected bucket
bucket = random.choice(buckets)
futures_obj.append(
Expand All @@ -128,9 +141,24 @@ def test_noobaa_under_stress(
f"rm -rf {stress_test_directory_setup.result_dir}/"
)

# wait until all the object operations are done
# Wait until all the object operations are done
logger.info(
"Waiting all the Object upload and IO operations for the current iteration is completed"
)
for future in futures_obj:
future.result()

# Delete all the objects from the bucket
# in batches of 20K objects at a time
buckets = [(type, bucket) for type, bucket in self.base_setup_buckets.items()]
with ThreadPoolExecutor() as executor:
futures = list()
for bucket in buckets:
future = executor.submit(
delete_objects_in_batches, bucket, batch_size=20000
)
futures.append(future)

logger.info("Waiting for all the delete object operations to complete")
for future in futures:
future.result()

0 comments on commit 98e44ac

Please sign in to comment.