From dd0382fc3f503feefa9028359e2fda58504fb415 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 31 Jul 2023 11:51:36 +0100 Subject: [PATCH 1/3] Remove unused import --- .../operator/kubecluster/kubecluster.py | 80 ++++++++----------- 1 file changed, 34 insertions(+), 46 deletions(-) diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 62ab7707f..a9a9724ed 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -19,7 +19,6 @@ from rich.console import Group from rich.panel import Panel from rich.spinner import Spinner -import pykube.exceptions import kubernetes_asyncio as kubernetes import yaml import kr8s @@ -33,6 +32,7 @@ TimeoutError, format_dashboard_link, ) +from kr8s.asyncio.objects import Pod, Service from kubernetes_asyncio.client.exceptions import ApiException from dask_kubernetes.common.auth import ClusterAuth @@ -45,12 +45,14 @@ from dask_kubernetes.common.utils import get_current_namespace from dask_kubernetes.aiopykube import HTTPClient, KubeConfig from dask_kubernetes.aiopykube.dask import ( - DaskCluster, - DaskWorkerGroup as AIODaskWorkerGroup, + DaskCluster as AIODaskCluster, ) -from dask_kubernetes.aiopykube.objects import Pod, Service from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError -from dask_kubernetes.operator._objects import DaskWorkerGroup, DaskAutoscaler +from dask_kubernetes.operator._objects import ( + DaskCluster, + DaskWorkerGroup, + DaskAutoscaler, +) logger = logging.getLogger(__name__) @@ -485,7 +487,7 @@ async def _wait_for_controller(self): """Wait for the operator to set the status.phase.""" start = time.time() while start + self._resource_timeout > time.time(): - cluster = await DaskCluster.objects( + cluster = await AIODaskCluster.objects( self.k8s_api, namespace=self.namespace ).get_by_name(self.name) if ( @@ -502,53 +504,39 @@ async def _wait_for_controller(self): async def _watch_component_status(self): while True: # Get DaskCluster status - with suppress(pykube.exceptions.ObjectDoesNotExist): - cluster = await DaskCluster.objects( - self.k8s_api, namespace=self.namespace - ).get_by_name(self.name) - self._startup_component_status["cluster"] = cluster.obj["status"][ - "phase" - ] + with suppress(kr8s.NotFoundError): + cluster = await DaskCluster.get(self.name, namespace=self.namespace) + if "status" in cluster.raw and "phase" in cluster.status: + self._startup_component_status["cluster"] = cluster.status.phase # Get Scheduler Pod status - with suppress(pykube.exceptions.ObjectDoesNotExist): - async with kubernetes.client.api_client.ApiClient() as api_client: - core_api = kubernetes.client.CoreV1Api(api_client) - pods = await core_api.list_namespaced_pod( - namespace=self.namespace, - label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", - ) - if pods.items: - pod = await Pod.objects( - self.k8s_api, namespace=self.namespace - ).get_by_name(pods.items[0].metadata.name) - phase = pod.obj["status"]["phase"] - if phase == "Running": - conditions = { - c["type"]: c["status"] - for c in pod.obj["status"]["conditions"] - } - if "Ready" not in conditions or conditions["Ready"] != "True": - phase = "Health Checking" - if "containerStatuses" in pod.obj["status"]: - for container in pod.obj["status"]["containerStatuses"]: - if "waiting" in container["state"]: - phase = container["state"]["waiting"]["reason"] - - self._startup_component_status["schedulerpod"] = phase + with suppress(kr8s.NotFoundError): + scheduler_pod = await Pod.get( + namespace=self.namespace, + label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", + ) + + phase = scheduler_pod.status.phase + if scheduler_pod.status.phase == "Running": + if not await scheduler_pod.ready(): + phase = "Health Checking" + if "container_statuses" in scheduler_pod.status: + for container in scheduler_pod.status.container_statuses: + if "waiting" in container.state: + phase = container.state.waiting.reason + + self._startup_component_status["schedulerpod"] = phase # Get Scheduler Service status - with suppress(pykube.exceptions.ObjectDoesNotExist): - await Service.objects(self.k8s_api).get_by_name( - self.name + "-scheduler" - ) + with suppress(kr8s.NotFoundError): + await Service.get(self.name + "-scheduler", namespace=self.namespace) self._startup_component_status["schedulerservice"] = "Created" # Get DaskWorkerGroup status - with suppress(pykube.exceptions.ObjectDoesNotExist): - await AIODaskWorkerGroup.objects( - self.k8s_api, namespace=self.namespace - ).get_by_name(self.name + "-default") + with suppress(kr8s.NotFoundError): + await DaskWorkerGroup.get( + self.name + "-default", namespace=self.namespace + ) self._startup_component_status["workergroup"] = "Created" await asyncio.sleep(1) From 0b096dc7bd9a31cea1f6319e6322cddd09e8d8b0 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 31 Jul 2023 14:28:48 +0100 Subject: [PATCH 2/3] Get before deletion --- dask_kubernetes/operator/kubecluster/kubecluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index ec5f47629..054f3826a 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -729,7 +729,7 @@ def close(self, timeout=3600): async def _close(self, timeout=3600): await super()._close() if self.shutdown_on_close: - cluster = await DaskCluster(self.name, namespace=self.namespace) + cluster = await DaskCluster.get(self.name, namespace=self.namespace) try: await cluster.delete() except kr8s.NotFoundError: From 82cd22b412b23f38331597e35628f78fc6550220 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 31 Jul 2023 15:09:23 +0100 Subject: [PATCH 3/3] Bump kr8s to 0.8.8 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 76f3bf206..552e91d80 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1 kopf>=1.35.3 pykube-ng>=22.9.0 rich>=12.5.1 -kr8s==0.8.7 +kr8s==0.8.8