Migrate KubeCluster startup-status polling and shutdown to kr8s.

  * _watch_component_status: look up the DaskCluster, scheduler Pod,
    scheduler Service and default DaskWorkerGroup through kr8s `.get(...)`
    calls, suppressing kr8s.NotFoundError instead of
    pykube.exceptions.ObjectDoesNotExist. The Service lookup now passes
    the cluster namespace explicitly.
  * _close: fetch the DaskCluster with DaskCluster.get(...) before deleting.
  * requirements.txt: bump the kr8s pin from 0.8.7 to 0.8.8.

diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index fc6ae443a..054f3826a 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -19,10 +19,10 @@
 from rich.console import Group
 from rich.panel import Panel
 from rich.spinner import Spinner
-import pykube.exceptions
 import kubernetes_asyncio as kubernetes
 import yaml
 import kr8s
+from kr8s.asyncio.objects import Pod, Service
 
 import dask.config
 from distributed.core import Status, rpc
@@ -44,9 +44,7 @@
 from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
 from dask_kubernetes.aiopykube.dask import (
     DaskCluster as AIODaskCluster,
-    DaskWorkerGroup as AIODaskWorkerGroup,
 )
-from dask_kubernetes.aiopykube.objects import Pod, Service
 from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
 from dask_kubernetes.operator._objects import (
     DaskCluster,
@@ -504,53 +502,39 @@ async def _wait_for_controller(self):
     async def _watch_component_status(self):
         while True:
             # Get DaskCluster status
-            with suppress(pykube.exceptions.ObjectDoesNotExist):
-                cluster = await AIODaskCluster.objects(
-                    self.k8s_api, namespace=self.namespace
-                ).get_by_name(self.name)
-                self._startup_component_status["cluster"] = cluster.obj["status"][
-                    "phase"
-                ]
+            with suppress(kr8s.NotFoundError):
+                cluster = await DaskCluster.get(self.name, namespace=self.namespace)
+                if "status" in cluster.raw and "phase" in cluster.status:
+                    self._startup_component_status["cluster"] = cluster.status.phase
 
             # Get Scheduler Pod status
-            with suppress(pykube.exceptions.ObjectDoesNotExist):
-                async with kubernetes.client.api_client.ApiClient() as api_client:
-                    core_api = kubernetes.client.CoreV1Api(api_client)
-                    pods = await core_api.list_namespaced_pod(
-                        namespace=self.namespace,
-                        label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
-                    )
-                if pods.items:
-                    pod = await Pod.objects(
-                        self.k8s_api, namespace=self.namespace
-                    ).get_by_name(pods.items[0].metadata.name)
-                    phase = pod.obj["status"]["phase"]
-                    if phase == "Running":
-                        conditions = {
-                            c["type"]: c["status"]
-                            for c in pod.obj["status"]["conditions"]
-                        }
-                        if "Ready" not in conditions or conditions["Ready"] != "True":
-                            phase = "Health Checking"
-                    if "containerStatuses" in pod.obj["status"]:
-                        for container in pod.obj["status"]["containerStatuses"]:
-                            if "waiting" in container["state"]:
-                                phase = container["state"]["waiting"]["reason"]
-
-                    self._startup_component_status["schedulerpod"] = phase
+            with suppress(kr8s.NotFoundError):
+                scheduler_pod = await Pod.get(
+                    namespace=self.namespace,
+                    label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
+                )
+
+                phase = scheduler_pod.status.phase
+                if scheduler_pod.status.phase == "Running":
+                    if not await scheduler_pod.ready():
+                        phase = "Health Checking"
+                if "container_statuses" in scheduler_pod.status:
+                    for container in scheduler_pod.status.container_statuses:
+                        if "waiting" in container.state:
+                            phase = container.state.waiting.reason
+
+                self._startup_component_status["schedulerpod"] = phase
 
             # Get Scheduler Service status
-            with suppress(pykube.exceptions.ObjectDoesNotExist):
-                await Service.objects(self.k8s_api).get_by_name(
-                    self.name + "-scheduler"
-                )
+            with suppress(kr8s.NotFoundError):
+                await Service.get(self.name + "-scheduler", namespace=self.namespace)
                 self._startup_component_status["schedulerservice"] = "Created"
 
             # Get DaskWorkerGroup status
-            with suppress(pykube.exceptions.ObjectDoesNotExist):
-                await AIODaskWorkerGroup.objects(
-                    self.k8s_api, namespace=self.namespace
-                ).get_by_name(self.name + "-default")
+            with suppress(kr8s.NotFoundError):
+                await DaskWorkerGroup.get(
+                    self.name + "-default", namespace=self.namespace
+                )
                 self._startup_component_status["workergroup"] = "Created"
 
             await asyncio.sleep(1)
@@ -745,7 +729,7 @@ def close(self, timeout=3600):
     async def _close(self, timeout=3600):
         await super()._close()
         if self.shutdown_on_close:
-            cluster = await DaskCluster(self.name, namespace=self.namespace)
+            cluster = await DaskCluster.get(self.name, namespace=self.namespace)
             try:
                 await cluster.delete()
             except kr8s.NotFoundError:
diff --git a/requirements.txt b/requirements.txt
index 76f3bf206..552e91d80 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
 kopf>=1.35.3
 pykube-ng>=22.9.0
 rich>=12.5.1
-kr8s==0.8.7
+kr8s==0.8.8