Add capacity to the stretch cluster (#9315)
Signed-off-by: Mahesh Shetty <[email protected]>
mashetty330 authored Apr 25, 2024
1 parent c504521 commit 0e1402e
Showing 9 changed files with 269 additions and 33 deletions.
4 changes: 3 additions & 1 deletion ocs_ci/ocs/constants.py
@@ -2479,7 +2479,9 @@
DATA_ZONE_1 = "b"
DATA_ZONE_2 = "c"

ZONES_LABELS = ["data-1", "data-2", "arbiter"]
DATA_ZONE_LABELS = ["data-1", "data-2"]
ARBITER_ZONE_LABEL = ["arbiter"]
ZONES_LABELS = DATA_ZONE_LABELS + ARBITER_ZONE_LABEL  # concatenate; list.extend() returns None

RGW_SVC_TOPOLOGY_ANNOTATIONS = "service.kubernetes.io/topology-mode: Auto"

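A quick note on the `ZONES_LABELS` line above: `list.extend()` mutates in place and returns `None`, so the combined list has to be built by concatenation. A minimal standalone sketch (plain Python, illustrative only, not part of the commit):

    DATA_ZONE_LABELS = ["data-1", "data-2"]
    ARBITER_ZONE_LABEL = ["arbiter"]

    # Wrong: extend() returns None, which would leave ZONES_LABELS set to None
    # ZONES_LABELS = DATA_ZONE_LABELS.extend(ARBITER_ZONE_LABEL)

    # Correct: concatenation returns a new list
    ZONES_LABELS = DATA_ZONE_LABELS + ARBITER_ZONE_LABEL
    assert ZONES_LABELS == ["data-1", "data-2", "arbiter"]
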
19 changes: 19 additions & 0 deletions ocs_ci/ocs/node.py
@@ -43,6 +43,7 @@
)
from ocs_ci.utility.decorators import switch_to_orig_index_at_last


log = logging.getLogger(__name__)


@@ -2061,6 +2062,24 @@ def add_new_disk_for_vsphere(sc_name):
add_disk_to_node(node_with_min_pvs)


def add_disk_stretch_arbiter():
"""
Adds disks to the storage nodes of a stretch cluster with arbiter
configuration, spread evenly across the two data zones. A stretch
cluster uses replica 4, hence two disks are added to each data zone.
"""

from ocs_ci.ocs.resources.stretchcluster import StretchCluster

data_zones = constants.DATA_ZONE_LABELS
sc_obj = StretchCluster()

for zone in data_zones:
for node in sc_obj.get_ocs_nodes_in_zone(zone)[:2]:
add_disk_to_node(node)


def get_odf_zone_count():
"""
Get the number of Availability zones used by ODF cluster
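
A hedged usage sketch for the new helper (not part of the commit): it only attaches the disks; new OSD pods appear once the StorageCluster device set count is bumped, which `add_capacity_lso` in `storage_cluster.py` below does after calling it. `wait_for_new_osd_pods_to_come_up` is the existing helper whose signature is visible in the next hunk.

    from ocs_ci.ocs.node import add_disk_stretch_arbiter
    from ocs_ci.ocs.resources.pod import get_osd_pods, wait_for_new_osd_pods_to_come_up

    # Replica-4 stretch cluster: 2 disks per data zone -> 4 new PVs for OSDs
    osd_pods_before = len(get_osd_pods())
    add_disk_stretch_arbiter()
    # ... add_capacity_lso then raises the device set count before waiting ...
    wait_for_new_osd_pods_to_come_up(number_of_osd_pods_before=osd_pods_before)
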
13 changes: 11 additions & 2 deletions ocs_ci/ocs/resources/pod.py
@@ -2258,15 +2258,24 @@ def wait_for_new_osd_pods_to_come_up(number_of_osd_pods_before):
logger.warning("None of the new osd pods reached the desired status")


def get_pod_restarts_count(namespace=config.ENV_DATA["cluster_namespace"]):
def get_pod_restarts_count(namespace=config.ENV_DATA["cluster_namespace"], label=None):
"""
Gets the dictionary of pod and its restart count for all the pods in a given namespace

Args:
    namespace (str): Namespace of the pods
    label (str): Optional "key=value" label selector used to filter the pods

Returns:
    dict: dictionary of pod name and its corresponding restart count
"""
list_of_pods = get_all_pods(namespace)
if label:
    selector_label, selector = label.split("=", 1)
    list_of_pods = get_all_pods(
        namespace=namespace, selector=[selector], selector_label=selector_label
    )
else:
    # no label given: keep the original behaviour of listing all pods
    list_of_pods = get_all_pods(namespace=namespace)

restart_dict = {}
ocp_pod_obj = OCP(kind=constants.POD, namespace=namespace)
for p in list_of_pods:
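
For context, a short usage sketch of the new `label` parameter, mirroring how the new test below calls it. The exact value of `constants.OSD_APP_LABEL` is an assumption here (something of the form `app=rook-ceph-osd`); it is split into `selector_label`/`selector` before being passed to `get_all_pods`.

    from ocs_ci.ocs import constants
    from ocs_ci.ocs.resources.pod import get_pod_restarts_count

    # e.g. "app=rook-ceph-osd" -> selector_label="app", selector="rook-ceph-osd"
    osd_restarts = get_pod_restarts_count(label=constants.OSD_APP_LABEL)
    total_restarts = sum(osd_restarts.values())
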
11 changes: 9 additions & 2 deletions ocs_ci/ocs/resources/storage_cluster.py
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@
get_nodes,
get_nodes_where_ocs_pods_running,
get_provider_internal_node_ips,
add_disk_stretch_arbiter,
)
from ocs_ci.ocs.version import get_ocp_version
from ocs_ci.utility.version import get_semantic_version, VERSION_4_11
@@ -1563,8 +1564,14 @@ def add_capacity_lso(ui_flag=False):
num_available_pv = 2
set_count = deviceset_count + 2
else:
add_new_disk_for_vsphere(sc_name=constants.LOCALSTORAGE_SC)
num_available_pv = 3
num_available_pv = get_osd_replica_count()
if (
config.DEPLOYMENT.get("arbiter_deployment") is True
and num_available_pv == 4
):
add_disk_stretch_arbiter()
else:
add_new_disk_for_vsphere(sc_name=constants.LOCALSTORAGE_SC)
set_count = deviceset_count + 1
localstorage.check_pvs_created(num_pvs_required=num_available_pv)
if ui_add_capacity_conditions() and ui_flag:
56 changes: 55 additions & 1 deletion ocs_ci/ocs/resources/stretchcluster.py
@@ -4,9 +4,10 @@

from datetime import timedelta

from ocs_ci.ocs.node import get_nodes_having_label, get_node_objs
from ocs_ci.ocs.resources import pod
from ocs_ci.ocs.node import get_nodes_having_label, get_ocs_nodes, get_node_objs
from ocs_ci.ocs.resources.ocs import OCS
from ocs_ci.ocs.resources.pvc import get_pvc_objs
from ocs_ci.utility.retry import retry
from ocs_ci.ocs.exceptions import (
CommandFailed,
@@ -114,6 +115,42 @@ def cephfs_old_log(self):
def rbd_old_log(self):
return self.logfile_map[constants.LOGWRITER_RBD_LABEL][2]

def get_workload_pvc_obj(self, workload_label):
"""
Gets the PVC objects for the volumes attached
to the workload type identified by the given label
Args:
workload_label (str): Label for the workload
Returns:
list: PVC objects for the given workload
"""
pvcs = None

if (
workload_label == constants.LOGWRITER_CEPHFS_LABEL
or workload_label == constants.LOGREADER_CEPHFS_LABEL
):
pvcs = get_pvc_objs(
pvc_names=[
self.cephfs_logwriter_dep.get()["spec"]["template"]["spec"][
"volumes"
][0]["persistentVolumeClaim"]["claimName"]
],
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
elif workload_label == constants.LOGWRITER_RBD_LABEL:
pvc_names = list()
for pod_obj in self.workload_map[workload_label]:
pvc_names.append(f"logwriter-rbd-{pod_obj.name}")
pvcs = get_pvc_objs(
pvc_names=pvc_names, namespace=constants.STRETCH_CLUSTER_NAMESPACE
)

return pvcs

def get_nodes_in_zone(self, zone):
"""
This will return the list containing OCS objects
@@ -130,6 +167,23 @@ def get_nodes_in_zone(self, zone):
label = f"{constants.ZONE_LABEL}={zone}"
return [OCS(**node_info) for node_info in get_nodes_having_label(label)]

def get_ocs_nodes_in_zone(self, zone):
"""
Get the OCS nodes in a particular zone
Args:
zone (str): Zone that the nodes belong to
Returns:
List: Node(OCS) objects
"""

nodes_in_zone = set([node.name for node in self.get_nodes_in_zone(zone)])
ocs_nodes = set([node.name for node in get_ocs_nodes()])
ocs_nodes_in_zone = nodes_in_zone.intersection(ocs_nodes)
return get_node_objs(list(ocs_nodes_in_zone))

@retry(CommandFailed, tries=10, delay=10)
def check_for_read_pause(self, label, start_time, end_time):
"""
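
An illustrative usage sketch for the two new StretchCluster helpers (not part of the commit). It assumes a deployed stretch cluster and, for `get_workload_pvc_obj`, that the CephFS logwriter workload has already been set up so `cephfs_logwriter_dep` is populated, as the test below does.

    from ocs_ci.ocs import constants
    from ocs_ci.ocs.resources.stretchcluster import StretchCluster

    sc_obj = StretchCluster()

    # OCS (storage) nodes per data zone -- the nodes add_disk_stretch_arbiter targets
    for zone in constants.DATA_ZONE_LABELS:
        nodes = sc_obj.get_ocs_nodes_in_zone(zone)
        print(f"OCS nodes in zone {zone}: {[node.name for node in nodes]}")

    # PVC backing the CephFS logwriter workload (returned as a list, hence [0])
    cephfs_pvc = sc_obj.get_workload_pvc_obj(constants.LOGWRITER_CEPHFS_LABEL)[0]
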
22 changes: 0 additions & 22 deletions tests/disaster-recovery/sc_arbiter/conftest.py

This file was deleted.

167 changes: 167 additions & 0 deletions tests/disaster-recovery/sc_arbiter/test_add_capacity.py
@@ -0,0 +1,167 @@
import pytest
import logging

from datetime import datetime, timezone
from ocs_ci.framework.pytest_customization.marks import (
turquoise_squad,
stretchcluster_required,
)
from ocs_ci.ocs import constants
from ocs_ci.ocs.resources import storage_cluster
from ocs_ci.ocs.resources.pod import (
get_pod_restarts_count,
get_ceph_tools_pod,
wait_for_pods_to_be_in_statuses,
)
from ocs_ci.ocs.resources.stretchcluster import StretchCluster

logger = logging.getLogger(__name__)


@turquoise_squad
@stretchcluster_required
class TestAddCapacityStretchCluster:
"""
Add capacity to the Stretch cluster with arbiter configuration
"""

@staticmethod
def add_capacity_to_stretch_cluster():
"""
Perform add capacity on a stretch cluster
"""
# get osd pods restart count before
osd_pods_restart_count_before = get_pod_restarts_count(
label=constants.OSD_APP_LABEL
)

# add capacity to the cluster
storage_cluster.add_capacity_lso(ui_flag=False)
logger.info("Successfully added capacity")

# get osd pods restart count after
osd_pods_restart_count_after = get_pod_restarts_count(
label=constants.OSD_APP_LABEL
)

# assert that no osd pods restarted during add capacity
assert sum(osd_pods_restart_count_before.values()) == sum(
osd_pods_restart_count_after.values()
), "Some of the osd pods have restarted during the add capacity"
logger.info("osd pod restarts counts are same before and after.")

# assert that osd weights for both data zones are balanced
tools_pod = get_ceph_tools_pod()
zone1_osd_weight = tools_pod.exec_sh_cmd_on_pod(
command=f"ceph osd tree | grep 'zone {constants.DATA_ZONE_LABELS[0]}' | awk '{{print $2}}'",
)
zone2_osd_weight = tools_pod.exec_sh_cmd_on_pod(
command=f"ceph osd tree | grep 'zone {constants.DATA_ZONE_LABELS[1]}' | awk '{{print $2}}'",
)

assert float(zone1_osd_weight.strip()) == float(
zone2_osd_weight.strip()
), "OSD weights are not balanced"
logger.info("OSD weights are balanced")

@pytest.mark.last
@pytest.mark.parametrize(
argnames=["iterations"],
argvalues=[
pytest.param(
3,
marks=[
pytest.mark.polarion_id("OCS-5474"),
pytest.mark.bugzilla("2143858"),
],
),
],
)
def test_cluster_expansion(
self,
setup_logwriter_cephfs_workload_factory,
setup_logwriter_rbd_workload_factory,
logreader_workload_factory,
iterations,
):

"""
Test cluster expansion and health when add capacity is performed
continuously
"""

sc_obj = StretchCluster()

# setup logwriter workloads in the background
(
sc_obj.cephfs_logwriter_dep,
sc_obj.cephfs_logreader_job,
) = setup_logwriter_cephfs_workload_factory(read_duration=0)

sc_obj.get_logwriter_reader_pods(label=constants.LOGWRITER_CEPHFS_LABEL)
sc_obj.get_logwriter_reader_pods(label=constants.LOGREADER_CEPHFS_LABEL)
sc_obj.get_logwriter_reader_pods(
label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2
)
logger.info("All the workloads pods are successfully up and running")

start_time = datetime.now(timezone.utc)

sc_obj.get_logfile_map(label=constants.LOGWRITER_CEPHFS_LABEL)
sc_obj.get_logfile_map(label=constants.LOGWRITER_RBD_LABEL)

# add capacity to the cluster
for iteration in range(iterations):
logger.info(f"[{iteration+1}] adding capacity to the cluster now...")
self.add_capacity_to_stretch_cluster()
logger.info("successfully added capacity to the cluster")

# check IO for any failures
end_time = datetime.now(timezone.utc)
sc_obj.post_failure_checks(start_time, end_time, wait_for_read_completion=False)
logger.info("Successfully verified with post failure checks for the workloads")

sc_obj.cephfs_logreader_job.delete()
logger.info(sc_obj.cephfs_logreader_pods)
for pod in sc_obj.cephfs_logreader_pods:
pod.wait_for_pod_delete(timeout=120)
logger.info("All old CephFS logreader pods are deleted")

# check for any data loss
assert sc_obj.check_for_data_loss(
constants.LOGWRITER_CEPHFS_LABEL
), "[CephFS] Data is lost"
logger.info("[CephFS] No data loss is seen")
assert sc_obj.check_for_data_loss(
constants.LOGWRITER_RBD_LABEL
), "[RBD] Data is lost"
logger.info("[RBD] No data loss is seen")

# check for data corruption
logreader_workload_factory(
pvc=sc_obj.get_workload_pvc_obj(constants.LOGWRITER_CEPHFS_LABEL)[0],
logreader_path=constants.LOGWRITER_CEPHFS_READER,
duration=5,
)
sc_obj.get_logwriter_reader_pods(constants.LOGREADER_CEPHFS_LABEL)

wait_for_pods_to_be_in_statuses(
expected_statuses=constants.STATUS_COMPLETED,
pod_names=[pod.name for pod in sc_obj.cephfs_logreader_pods],
timeout=900,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
logger.info("[CephFS] Logreader job pods have reached 'Completed' state!")

assert sc_obj.check_for_data_corruption(
label=constants.LOGREADER_CEPHFS_LABEL
), "Data is corrupted for cephFS workloads"
logger.info("No data corruption is seen in CephFS workloads")

assert sc_obj.check_for_data_corruption(
label=constants.LOGWRITER_RBD_LABEL
), "Data is corrupted for RBD workloads"
logger.info("No data corruption is seen in RBD workloads")
@@ -42,10 +42,7 @@
@turquoise_squad
class TestZoneShutdownsAndCrashes:

zones = constants.ZONES_LABELS
# We dont want to select arbiter zone randomly for the shutdown/crash
# because its not valid test scenario
zones.remove("arbiter")
zones = constants.DATA_ZONE_LABELS

@pytest.fixture()
def init_sanity(self, request, nodes):
@@ -26,7 +26,10 @@
)
from ocs_ci.ocs import constants
from ocs_ci.ocs.ocp import OCP
from ocs_ci.ocs.resources.pod import get_osd_pods, get_ceph_tools_pod
from ocs_ci.ocs.resources.pod import (
get_osd_pods,
get_ceph_tools_pod,
)
from ocs_ci.ocs.resources import storage_cluster
from ocs_ci.ocs.cluster import (
check_ceph_health_after_add_capacity,
