diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 62ab7707f..fc6ae443a 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -33,10 +33,8 @@ TimeoutError, format_dashboard_link, ) -from kubernetes_asyncio.client.exceptions import ApiException from dask_kubernetes.common.auth import ClusterAuth - from dask_kubernetes.common.networking import ( get_scheduler_address, wait_for_scheduler, @@ -45,12 +43,16 @@ from dask_kubernetes.common.utils import get_current_namespace from dask_kubernetes.aiopykube import HTTPClient, KubeConfig from dask_kubernetes.aiopykube.dask import ( - DaskCluster, + DaskCluster as AIODaskCluster, DaskWorkerGroup as AIODaskWorkerGroup, ) from dask_kubernetes.aiopykube.objects import Pod, Service from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError -from dask_kubernetes.operator._objects import DaskWorkerGroup, DaskAutoscaler +from dask_kubernetes.operator._objects import ( + DaskCluster, + DaskWorkerGroup, + DaskAutoscaler, +) logger = logging.getLogger(__name__) @@ -485,7 +487,7 @@ async def _wait_for_controller(self): """Wait for the operator to set the status.phase.""" start = time.time() while start + self._resource_timeout > time.time(): - cluster = await DaskCluster.objects( + cluster = await AIODaskCluster.objects( self.k8s_api, namespace=self.namespace ).get_by_name(self.name) if ( @@ -503,7 +505,7 @@ async def _watch_component_status(self): while True: # Get DaskCluster status with suppress(pykube.exceptions.ObjectDoesNotExist): - cluster = await DaskCluster.objects( + cluster = await AIODaskCluster.objects( self.k8s_api, namespace=self.namespace ).get_by_name(self.name) self._startup_component_status["cluster"] = cluster.obj["status"][ @@ -743,25 +745,15 @@ def close(self, timeout=3600): async def _close(self, timeout=3600): await super()._close() if self.shutdown_on_close: - async with kubernetes.client.api_client.ApiClient() as api_client: - custom_objects_api = kubernetes.client.CustomObjectsApi(api_client) - try: - await custom_objects_api.delete_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskclusters", - namespace=self.namespace, - name=self.name, - ) - except ApiException as e: - if e.reason == "Not Found": - logger.warning( - "Failed to delete DaskCluster, looks like it has already been deleted." - ) - else: - raise + cluster = await DaskCluster(self.name, namespace=self.namespace) + try: + await cluster.delete() + except kr8s.NotFoundError: + logger.warning( + "Failed to delete DaskCluster, looks like it has already been deleted." + ) start = time.time() - while (await self._get_cluster()) is not None: + while await cluster.exists(): if time.time() > start + timeout: raise TimeoutError( f"Timed out deleting cluster resource {self.name}"