Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate KubeCluster component status to kr8s #792

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 28 additions & 44 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from rich.console import Group
from rich.panel import Panel
from rich.spinner import Spinner
import pykube.exceptions
import kubernetes_asyncio as kubernetes
import yaml
import kr8s
from kr8s.asyncio.objects import Pod, Service

import dask.config
from distributed.core import Status, rpc
Expand All @@ -44,9 +44,7 @@
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.dask import (
DaskCluster as AIODaskCluster,
DaskWorkerGroup as AIODaskWorkerGroup,
)
from dask_kubernetes.aiopykube.objects import Pod, Service
from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
from dask_kubernetes.operator._objects import (
DaskCluster,
Expand Down Expand Up @@ -504,53 +502,39 @@ async def _wait_for_controller(self):
async def _watch_component_status(self):
while True:
# Get DaskCluster status
with suppress(pykube.exceptions.ObjectDoesNotExist):
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
self._startup_component_status["cluster"] = cluster.obj["status"][
"phase"
]
with suppress(kr8s.NotFoundError):
cluster = await DaskCluster.get(self.name, namespace=self.namespace)
if "status" in cluster.raw and "phase" in cluster.status:
self._startup_component_status["cluster"] = cluster.status.phase

# Get Scheduler Pod status
with suppress(pykube.exceptions.ObjectDoesNotExist):
async with kubernetes.client.api_client.ApiClient() as api_client:
core_api = kubernetes.client.CoreV1Api(api_client)
pods = await core_api.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
)
if pods.items:
pod = await Pod.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(pods.items[0].metadata.name)
phase = pod.obj["status"]["phase"]
if phase == "Running":
conditions = {
c["type"]: c["status"]
for c in pod.obj["status"]["conditions"]
}
if "Ready" not in conditions or conditions["Ready"] != "True":
phase = "Health Checking"
if "containerStatuses" in pod.obj["status"]:
for container in pod.obj["status"]["containerStatuses"]:
if "waiting" in container["state"]:
phase = container["state"]["waiting"]["reason"]

self._startup_component_status["schedulerpod"] = phase
with suppress(kr8s.NotFoundError):
scheduler_pod = await Pod.get(
namespace=self.namespace,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
)

phase = scheduler_pod.status.phase
if scheduler_pod.status.phase == "Running":
if not await scheduler_pod.ready():
phase = "Health Checking"
if "container_statuses" in scheduler_pod.status:
for container in scheduler_pod.status.container_statuses:
if "waiting" in container.state:
phase = container.state.waiting.reason

self._startup_component_status["schedulerpod"] = phase

# Get Scheduler Service status
with suppress(pykube.exceptions.ObjectDoesNotExist):
await Service.objects(self.k8s_api).get_by_name(
self.name + "-scheduler"
)
with suppress(kr8s.NotFoundError):
await Service.get(self.name + "-scheduler", namespace=self.namespace)
self._startup_component_status["schedulerservice"] = "Created"

# Get DaskWorkerGroup status
with suppress(pykube.exceptions.ObjectDoesNotExist):
await AIODaskWorkerGroup.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name + "-default")
with suppress(kr8s.NotFoundError):
await DaskWorkerGroup.get(
self.name + "-default", namespace=self.namespace
)
self._startup_component_status["workergroup"] = "Created"

await asyncio.sleep(1)
Expand Down Expand Up @@ -745,7 +729,7 @@ def close(self, timeout=3600):
async def _close(self, timeout=3600):
await super()._close()
if self.shutdown_on_close:
cluster = await DaskCluster(self.name, namespace=self.namespace)
cluster = await DaskCluster.get(self.name, namespace=self.namespace)
try:
await cluster.delete()
except kr8s.NotFoundError:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
kopf>=1.35.3
pykube-ng>=22.9.0
rich>=12.5.1
kr8s==0.8.7
kr8s==0.8.8