Skip to content

Commit

Permalink
Discovered Apps Failover and Relocate (#9860)
Browse files Browse the repository at this point in the history
* Discovered Apps Failover and Relocate

Signed-off-by: prsurve <[email protected]>


---------

Signed-off-by: prsurve <[email protected]>
  • Loading branch information
prsurve authored Sep 12, 2024
1 parent 7c8abb5 commit 348b59a
Show file tree
Hide file tree
Showing 9 changed files with 583 additions and 15 deletions.
12 changes: 12 additions & 0 deletions conf/ocsci/dr_workload.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,16 @@ ENV_DATA:
dr_workload_app_pvc_selector: { 'appname': 'kubevirt' }, pod_count: 1, pvc_count: 1
},
]
dr_workload_discovered_apps_rbd: [
{ name: "busybox-dict-1", workload_dir: "rdr/busybox/app-busybox-1/resources/deployment",
pod_count: 10, pvc_count: 10,
dr_workload_app_pod_selector_key: "workloadpattern",
dr_workload_app_pod_selector_value: "simple_io",
dr_workload_app_pvc_selector_key: "appname",
dr_workload_app_pvc_selector_value: "busybox_app1",
workload_namespace: "busybox-dict-1",
dr_workload_app_placement_name: "busybox-dict-1"
}
]

# dr_policy_name: PLACEHOLDER
135 changes: 123 additions & 12 deletions ocs_ci/helpers/dr_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
logger = logging.getLogger(__name__)


def get_current_primary_cluster_name(namespace, workload_type=constants.SUBSCRIPTION):
def get_current_primary_cluster_name(
namespace, workload_type=constants.SUBSCRIPTION, discovered_apps=False
):
"""
Get current primary cluster name based on workload namespace
Args:
namespace (str): Name of the namespace
workload_type (str): Type of workload, i.e., Subscription or ApplicationSet
discovered_apps (bool): If true then deployed workload is discovered_apps
Returns:
str: Current primary cluster name
Expand All @@ -56,6 +59,8 @@ def get_current_primary_cluster_name(namespace, workload_type=constants.SUBSCRIP
restore_index = config.cur_index
if workload_type == constants.APPLICATION_SET:
namespace = constants.GITOPS_CLUSTER_NAMESPACE
if discovered_apps:
namespace = constants.DR_OPS_NAMESAPCE
drpc_data = DRPC(namespace=namespace).get()
if drpc_data.get("spec").get("action") == constants.ACTION_FAILOVER:
cluster_name = drpc_data["spec"]["failoverCluster"]
Expand All @@ -65,13 +70,16 @@ def get_current_primary_cluster_name(namespace, workload_type=constants.SUBSCRIP
return cluster_name


def get_current_secondary_cluster_name(namespace, workload_type=constants.SUBSCRIPTION):
def get_current_secondary_cluster_name(
namespace, workload_type=constants.SUBSCRIPTION, discovered_apps=False
):
"""
Get current secondary cluster name based on workload namespace
Args:
namespace (str): Name of the namespace
workload_type (str): Type of workload, i.e., Subscription or ApplicationSet
discovered_apps (bool): If true then deployed workload is discovered_apps
Returns:
str: Current secondary cluster name
Expand All @@ -80,6 +88,8 @@ def get_current_secondary_cluster_name(namespace, workload_type=constants.SUBSCR
restore_index = config.cur_index
if workload_type == constants.APPLICATION_SET:
namespace = constants.GITOPS_CLUSTER_NAMESPACE
if discovered_apps:
namespace = constants.DR_OPS_NAMESAPCE
primary_cluster_name = get_current_primary_cluster_name(namespace)
drpolicy_data = DRPC(namespace=namespace).drpolicy_obj.get()
config.switch_ctx(restore_index)
Expand Down Expand Up @@ -122,13 +132,16 @@ def set_current_secondary_cluster_context(
config.switch_to_cluster_by_name(cluster_name)


def get_scheduling_interval(namespace, workload_type=constants.SUBSCRIPTION):
def get_scheduling_interval(
namespace, workload_type=constants.SUBSCRIPTION, discovered_apps=False
):
"""
Get scheduling interval for the workload in the given namespace
Args:
namespace (str): Name of the namespace
workload_type (str): Type of workload, i.e., Subscription or ApplicationSet
discovered_apps (bool): If true then deployed workload is discovered_apps
Returns:
int: scheduling interval value from DRPolicy
Expand All @@ -137,6 +150,8 @@ def get_scheduling_interval(namespace, workload_type=constants.SUBSCRIPTION):
restore_index = config.cur_index
if workload_type == constants.APPLICATION_SET:
namespace = constants.GITOPS_CLUSTER_NAMESPACE
if discovered_apps:
namespace = constants.DR_OPS_NAMESAPCE
drpolicy_obj = DRPC(namespace=namespace).drpolicy_obj
interval_value = int(drpolicy_obj.get()["spec"]["schedulingInterval"][:-1])
config.switch_ctx(restore_index)
Expand All @@ -149,6 +164,8 @@ def failover(
workload_type=constants.SUBSCRIPTION,
workload_placement_name=None,
switch_ctx=None,
discovered_apps=False,
old_primary=None,
):
"""
Initiates Failover action to the specified cluster
Expand All @@ -159,6 +176,8 @@ def failover(
workload_type (str): Type of workload, i.e., Subscription or ApplicationSet
workload_placement_name (str): Placement name
switch_ctx (int): The cluster index by the cluster name
discovered_apps (bool): True when cluster is failing over DiscoveredApps
old_primary (str): Name of cluster where workload were running
"""
restore_index = config.cur_index
Expand All @@ -171,9 +190,16 @@ def failover(
resource_name=f"{workload_placement_name}-drpc",
switch_ctx=switch_ctx,
)
elif discovered_apps:
failover_params = (
f'{{"spec":{{"action":"{constants.ACTION_FAILOVER}",'
f'"failoverCluster":"{failover_cluster}",'
f'"preferredCluster":"{old_primary}"}}}}'
)
namespace = constants.DR_OPS_NAMESAPCE
drpc_obj = DRPC(namespace=namespace, resource_name=f"{workload_placement_name}")
else:
drpc_obj = DRPC(namespace=namespace, switch_ctx=switch_ctx)

drpc_obj.wait_for_peer_ready_status()
logger.info(f"Initiating Failover action with failoverCluster:{failover_cluster}")
assert drpc_obj.patch(
Expand All @@ -183,6 +209,7 @@ def failover(
logger.info(
f"Wait for {constants.DRPC}: {drpc_obj.resource_name} to reach {constants.STATUS_FAILEDOVER} phase"
)

drpc_obj.wait_for_phase(constants.STATUS_FAILEDOVER)
config.switch_ctx(restore_index)

Expand All @@ -193,6 +220,9 @@ def relocate(
workload_type=constants.SUBSCRIPTION,
workload_placement_name=None,
switch_ctx=None,
discovered_apps=False,
old_primary=None,
workload_instance=None,
):
"""
Initiates Relocate action to the specified cluster
Expand All @@ -203,6 +233,10 @@ def relocate(
workload_type (str): Type of workload, i.e., Subscription or ApplicationSet
workload_placement_name (str): Placement name
switch_ctx (int): The cluster index by the cluster name
discovered_apps (bool): If true then deployed workload is discovered_apps
old_primary (str): Name of cluster where workload were running
workload_instance (object): Discovered App instance to get namespace and dir location
"""
restore_index = config.cur_index
Expand All @@ -215,6 +249,14 @@ def relocate(
resource_name=f"{workload_placement_name}-drpc",
switch_ctx=switch_ctx,
)
elif discovered_apps:
relocate_params = (
f'{{"spec":{{"action":"{constants.ACTION_RELOCATE}",'
f'"failoverCluster":"{old_primary}",'
f'"preferredCluster":"{preferred_cluster}"}}}}'
)
namespace = constants.DR_OPS_NAMESAPCE
drpc_obj = DRPC(namespace=namespace, resource_name=f"{workload_placement_name}")
else:
drpc_obj = DRPC(namespace=namespace, switch_ctx=switch_ctx)
drpc_obj.wait_for_peer_ready_status()
Expand All @@ -226,7 +268,19 @@ def relocate(
logger.info(
f"Wait for {constants.DRPC}: {drpc_obj.resource_name} to reach {constants.STATUS_RELOCATED} phase"
)
drpc_obj.wait_for_phase(constants.STATUS_RELOCATED)
relocate_condition = constants.STATUS_RELOCATED
if discovered_apps:
relocate_condition = constants.STATUS_RELOCATING
drpc_obj.wait_for_phase(relocate_condition)

if discovered_apps and workload_instance:
logger.info("Doing Cleanup Operations")
do_discovered_apps_cleanup(
drpc_name=workload_placement_name,
old_primary=old_primary,
workload_namespace=workload_instance.workload_namespace,
workload_dir=workload_instance.workload_dir,
)
config.switch_ctx(restore_index)


Expand Down Expand Up @@ -487,7 +541,9 @@ def check_vrg_state(state, namespace):
return False


def wait_for_replication_resources_creation(vr_count, namespace, timeout):
def wait_for_replication_resources_creation(
vr_count, namespace, timeout, discovered_apps=False
):
"""
Wait for replication resources to be created
Expand All @@ -496,13 +552,18 @@ def wait_for_replication_resources_creation(vr_count, namespace, timeout):
namespace (str): the namespace of the VR or ReplicationSource resources
timeout (int): time in seconds to wait for VR or ReplicationSource resources to be created
or reach expected state
discovered_apps (bool): If true then deployed workload is discovered_apps
Raises:
TimeoutExpiredError: In case replication resources not created
"""
logger.info("Waiting for VRG to be created")
if discovered_apps:
vrg_namespace = constants.DR_OPS_NAMESAPCE

sample = TimeoutSampler(
timeout=timeout, sleep=5, func=check_vrg_existence, namespace=namespace
timeout=timeout, sleep=5, func=check_vrg_existence, namespace=vrg_namespace
)
if not sample.wait_for_func_status(result=True):
error_msg = "VRG resource is not created"
Expand All @@ -516,7 +577,6 @@ def wait_for_replication_resources_creation(vr_count, namespace, timeout):
else:
resource_kind = constants.VOLUME_REPLICATION
count_function = get_vr_count

if config.MULTICLUSTER["multicluster_mode"] != "metro-dr":
logger.info(f"Waiting for {vr_count} {resource_kind}s to be created")
sample = TimeoutSampler(
Expand Down Expand Up @@ -549,7 +609,7 @@ def wait_for_replication_resources_creation(vr_count, namespace, timeout):
sleep=5,
func=check_vrg_state,
state="primary",
namespace=namespace,
namespace=vrg_namespace,
)
if not sample.wait_for_func_status(result=True):
error_msg = "VRG hasn't reached expected state primary within the time limit."
Expand Down Expand Up @@ -631,7 +691,12 @@ def wait_for_replication_resources_deletion(namespace, timeout, check_state=True


def wait_for_all_resources_creation(
pvc_count, pod_count, namespace, timeout=900, skip_replication_resources=False
pvc_count,
pod_count,
namespace,
timeout=900,
skip_replication_resources=False,
discovered_apps=False,
):
"""
Wait for workload and replication resources to be created
Expand All @@ -642,6 +707,8 @@ def wait_for_all_resources_creation(
namespace (str): the namespace of the workload
timeout (int): time in seconds to wait for resource creation
skip_replication_resources (bool): if true vr status wont't be check
discovered_apps (bool): If true then deployed workload is discovered_apps
"""
logger.info(f"Waiting for {pvc_count} PVCs to reach {constants.STATUS_BOUND} state")
Expand All @@ -660,9 +727,10 @@ def wait_for_all_resources_creation(
timeout=timeout,
sleep=5,
)

if not skip_replication_resources:
wait_for_replication_resources_creation(pvc_count, namespace, timeout)
wait_for_replication_resources_creation(
pvc_count, namespace, timeout, discovered_apps
)


def wait_for_all_resources_deletion(
Expand Down Expand Up @@ -1493,3 +1561,46 @@ def replace_cluster(workload, primary_cluster_name, secondary_cluster_name):

# Configure DRClusters for fencing automation
configure_drcluster_for_fencing()


def do_discovered_apps_cleanup(
    drpc_name, old_primary, workload_namespace, workload_dir
):
    """
    Clean up discovered-app resources on the old primary cluster.

    Waits on the hub for the DRPC to report that user cleanup is expected,
    deletes the workload from the old primary cluster and waits for the
    deletion to finish, then waits on the hub for the DRPC progression to
    complete. The original cluster context is restored before returning.

    Args:
        drpc_name (str): Name of DRPC
        old_primary (str): Name of old primary where cleanup will happen
        workload_namespace (str): Workload namespace
        workload_dir (str): Dir location of workload

    """
    initial_index = config.cur_index

    # On the hub: wait until Ramen signals that user cleanup is expected
    config.switch_acm_ctx()
    drpc = DRPC(namespace=constants.DR_OPS_NAMESAPCE, resource_name=drpc_name)
    drpc.wait_for_progression_status(status=constants.STATUS_WAITFORUSERTOCLEANUP)

    # On the old primary: delete the workload and wait for full removal
    config.switch_to_cluster_by_name(old_primary)
    workload_path = f"{constants.DR_WORKLOAD_REPO_BASE_DIR}/{workload_dir}"
    run_cmd(f"oc delete -k {workload_path} -n {workload_namespace} --wait=false")
    wait_for_all_resources_deletion(namespace=workload_namespace)

    # Back on the hub: wait for the DRPC progression to report completion
    config.switch_acm_ctx()
    drpc.wait_for_progression_status(status=constants.STATUS_COMPLETED)
    config.switch_ctx(initial_index)


def generate_kubeobject_capture_interval():
    """
    Generate KubeObject Capture Interval from the first DRPolicy's
    scheduling interval.

    The capture interval is kept strictly below the scheduling interval:
    values 2..5 are reduced by one, anything 6 or larger is capped at 5,
    and a scheduling interval of 1 is used as-is (it cannot be reduced
    further).

    Returns:
        int: capture interval value to be used

    """
    # schedulingInterval is a string like "5m"; strip the unit suffix
    scheduling_interval = int(
        get_all_drpolicy()[0]["spec"]["schedulingInterval"][:-1]
    )

    if scheduling_interval == 1:
        # Cannot go below 1
        return 1
    if scheduling_interval <= 5:
        return scheduling_interval - 1
    # BUGFIX: the original condition was "> 6", which left 6 unhandled
    # and returned 6 — a capture interval equal to the scheduling
    # interval, inconsistent with every neighbouring value (5 -> 4,
    # 7 -> 5). Cap every value of 6 or more at 5.
    return 5
8 changes: 8 additions & 0 deletions ocs_ci/ocs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@
STATUS_FAILED = "Failed"
STATUS_FAILEDOVER = "FailedOver"
STATUS_RELOCATED = "Relocated"
STATUS_RELOCATING = "Relocating"
STATUS_CONTAINER_STATUS_UNKNOWN = "ContainerStatusUnknown"
STATUS_WAITFORUSERTOCLEANUP = "WaitOnUserToCleanUp"
STATUS_POWERON = "ON"
STATUS_POWEROFF = "OFF"

# NooBaa statuses
BS_AUTH_FAILED = "AUTH_FAILED"
Expand Down Expand Up @@ -2842,6 +2846,7 @@

# DR
DRPC_PATH = os.path.join(TEMPLATE_DIR, "DR", "drpc.yaml")
PLACEMENT_PATH = os.path.join(TEMPLATE_DIR, "DR", "placement.yaml")
CLUSTERROLEBINDING_APPSET_PULLMODEL_PATH = os.path.join(
TEMPLATE_DIR, "DR", "clusterrolebinding_appset_pullmodel.yaml"
)
Expand All @@ -2857,6 +2862,9 @@
GITOPS_CLUSTER_NAMESPACE = "openshift-gitops"
APPLICATION_ARGOCD = "applications.argoproj.io"
PLACEMENT_KIND = "placements.cluster.open-cluster-management.io"

DISCOVERED_APPS = "DiscoveredApps"
DR_OPS_NAMESAPCE = "openshift-dr-ops"
DPA_DISCOVERED_APPS_PATH = os.path.join(TEMPLATE_DIR, "DR", "dpa_discovered_apps.yaml")

DISABLE_DR_EACH_APP = os.path.join(TEMPLATE_DIR, "DR", "disable_dr_each_app.sh")
Expand Down
Loading

0 comments on commit 348b59a

Please sign in to comment.