Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate KubeCluster close to kr8s #791

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
TimeoutError,
format_dashboard_link,
)
from kubernetes_asyncio.client.exceptions import ApiException

from dask_kubernetes.common.auth import ClusterAuth

from dask_kubernetes.common.networking import (
get_scheduler_address,
wait_for_scheduler,
Expand All @@ -45,12 +43,16 @@
from dask_kubernetes.common.utils import get_current_namespace
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.dask import (
DaskCluster,
DaskCluster as AIODaskCluster,
DaskWorkerGroup as AIODaskWorkerGroup,
)
from dask_kubernetes.aiopykube.objects import Pod, Service
from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
from dask_kubernetes.operator._objects import DaskWorkerGroup, DaskAutoscaler
from dask_kubernetes.operator._objects import (
DaskCluster,
DaskWorkerGroup,
DaskAutoscaler,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -485,7 +487,7 @@ async def _wait_for_controller(self):
"""Wait for the operator to set the status.phase."""
start = time.time()
while start + self._resource_timeout > time.time():
cluster = await DaskCluster.objects(
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
if (
Expand All @@ -503,7 +505,7 @@ async def _watch_component_status(self):
while True:
# Get DaskCluster status
with suppress(pykube.exceptions.ObjectDoesNotExist):
cluster = await DaskCluster.objects(
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
self._startup_component_status["cluster"] = cluster.obj["status"][
Expand Down Expand Up @@ -743,25 +745,15 @@ def close(self, timeout=3600):
async def _close(self, timeout=3600):
await super()._close()
if self.shutdown_on_close:
async with kubernetes.client.api_client.ApiClient() as api_client:
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
try:
await custom_objects_api.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=self.namespace,
name=self.name,
)
except ApiException as e:
if e.reason == "Not Found":
logger.warning(
"Failed to delete DaskCluster, looks like it has already been deleted."
)
else:
raise
cluster = await DaskCluster(self.name, namespace=self.namespace)
try:
await cluster.delete()
except kr8s.NotFoundError:
logger.warning(
"Failed to delete DaskCluster, looks like it has already been deleted."
)
start = time.time()
while (await self._get_cluster()) is not None:
while await cluster.exists():
if time.time() > start + timeout:
raise TimeoutError(
f"Timed out deleting cluster resource {self.name}"
Expand Down