Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate KubeCluster create resource to kr8s #796

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dask_kubernetes/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

import logging
import pathlib
import os
import subprocess
Expand All @@ -17,6 +18,9 @@
check_dependency("kubectl")
check_dependency("docker")

for mod in ("httpx", "httpcore", "httpcore.http11"):
logging.getLogger().setLevel(logging.WARNING)


@pytest.fixture()
def kopf_runner(k8s_cluster):
Expand Down
14 changes: 11 additions & 3 deletions dask_kubernetes/operator/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from kr8s.asyncio.objects import APIObject, Pod, Deployment, Service

WAIT_TIMEOUT = 30


class DaskCluster(APIObject):
version = "kubernetes.dask.org/v1"
Expand Down Expand Up @@ -115,7 +117,9 @@ async def deployments(self) -> List[Deployment]:
)

async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)
return await DaskCluster.get(
self.spec.cluster, namespace=self.namespace, timeout=WAIT_TIMEOUT
)


class DaskAutoscaler(APIObject):
Expand All @@ -127,7 +131,9 @@ class DaskAutoscaler(APIObject):
namespaced = True

async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)
return await DaskCluster.get(
self.spec.cluster, namespace=self.namespace, timeout=WAIT_TIMEOUT
)


class DaskJob(APIObject):
Expand All @@ -139,7 +145,9 @@ class DaskJob(APIObject):
namespaced = True

async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.name, namespace=self.namespace)
return await DaskCluster.get(
self.name, namespace=self.namespace, timeout=WAIT_TIMEOUT
)

async def pod(self) -> Pod:
pods = []
Expand Down
144 changes: 70 additions & 74 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import warnings
import weakref
import uuid
import httpx

from rich import box
from rich.live import Live
Expand Down Expand Up @@ -332,86 +333,81 @@ async def _():
async def _create_cluster(self):
if self.shutdown_on_close is None:
self.shutdown_on_close = True
async with kubernetes.client.api_client.ApiClient() as api_client:
core_api = kubernetes.client.CoreV1Api(api_client)
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)

if not self._custom_cluster_spec:
self._log("Generating cluster spec")
data = make_cluster_spec(
name=self.name,
env=self.env,
resources=self.resources,
worker_command=self.worker_command,
n_workers=self.n_workers,
image=self.image,
scheduler_service_type=self.scheduler_service_type,
idle_timeout=self.idle_timeout,
)
if not self._custom_cluster_spec:
self._log("Generating cluster spec")
data = make_cluster_spec(
name=self.name,
env=self.env,
resources=self.resources,
worker_command=self.worker_command,
n_workers=self.n_workers,
image=self.image,
scheduler_service_type=self.scheduler_service_type,
idle_timeout=self.idle_timeout,
)
else:
data = self._custom_cluster_spec
try:
self._log("Creating DaskCluster object")
cluster = await DaskCluster(data, namespace=self.namespace)
await cluster.create()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise RuntimeError(
"Failed to create DaskCluster resource."
"Are the Dask Custom Resource Definitions installed? "
"https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator"
) from e
else:
data = self._custom_cluster_spec
try:
self._log("Creating DaskCluster object")
await custom_objects_api.create_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=self.namespace,
body=data,
)
except kubernetes.client.ApiException as e:
if e.status == 404:
raise RuntimeError(
"Failed to create DaskCluster resource."
"Are the Dask Custom Resource Definitions installed? "
"https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator"
) from e
else:
raise e

try:
self._log("Waiting for controller to action cluster")
await self._wait_for_controller()
except TimeoutError as e:
await self._close()
raise e

try:
self._log("Waiting for controller to action cluster")
await self._wait_for_controller()
except TimeoutError as e:
await self._close()
raise e
try:
self._log("Waiting for scheduler pod")
await wait_for_scheduler(
self.name,
self.namespace,
timeout=self._resource_timeout,
)
except CrashLoopBackOffError as e:
try:
self._log("Waiting for scheduler pod")
await wait_for_scheduler(
self.name,
self.namespace,
timeout=self._resource_timeout,
)
except CrashLoopBackOffError as e:
logs = await self._get_logs()
pods = await core_api.list_namespaced_pod(
scheduler_pod = await Pod.get(
namespace=self.namespace,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
)
await self._close()
raise SchedulerStartupError(
"Scheduler failed to start.",
"Scheduler Pod logs:",
logs[pods.items[0].metadata.name],
) from e
self._log("Waiting for scheduler service")
await wait_for_service(f"{self.name}-scheduler", self.namespace)
scheduler_address = await self._get_scheduler_address()
self._log("Connecting to scheduler")
await wait_for_scheduler_comm(scheduler_address)
self.scheduler_comm = rpc(scheduler_address)
local_port = self.scheduler_forward_port
if local_port:
local_port = int(local_port)
self._log("Getting dashboard URL")
dashboard_address = await get_scheduler_address(
f"{self.name}-scheduler",
self.namespace,
port_name="http-dashboard",
port_forward_cluster_ip=self.port_forward_cluster_ip,
local_port=local_port,
)
self.forwarded_dashboard_port = dashboard_address.split(":")[-1]
logs = await scheduler_pod.logs()
except Exception:
logs = "Failed to get logs."
await self._close()
raise SchedulerStartupError(
"Scheduler failed to start.",
"Scheduler Pod logs:",
logs,
) from e
self._log("Waiting for scheduler service")
await wait_for_service(f"{self.name}-scheduler", self.namespace)
scheduler_address = await self._get_scheduler_address()
self._log("Connecting to scheduler")
await wait_for_scheduler_comm(scheduler_address)
self.scheduler_comm = rpc(scheduler_address)
local_port = self.scheduler_forward_port
if local_port:
local_port = int(local_port)
self._log("Getting dashboard URL")
dashboard_address = await get_scheduler_address(
f"{self.name}-scheduler",
self.namespace,
port_name="http-dashboard",
port_forward_cluster_ip=self.port_forward_cluster_ip,
local_port=local_port,
)
self.forwarded_dashboard_port = dashboard_address.split(":")[-1]

async def _connect_cluster(self):
if self.shutdown_on_close is None:
Expand Down Expand Up @@ -1012,7 +1008,7 @@ def make_scheduler_spec(

async def wait_for_service(service_name, namespace):
"""Block until service is available."""
service = await Service.get(service_name, namespace)
service = await Service.get(service_name, namespace=namespace)
while not await service.ready():
await asyncio.sleep(0.1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,19 @@ def test_cluster_without_operator(docker_image):
KubeCluster(name="noop", n_workers=1, image=docker_image, resource_timeout=1)


def test_cluster_crashloopbackoff(kopf_runner, docker_image):
def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns):
with kopf_runner:
with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"):
spec = make_cluster_spec(name="crashloopbackoff", n_workers=1)
spec["spec"]["scheduler"]["spec"]["containers"][0]["args"][
0
] = "dask-schmeduler"
KubeCluster(custom_cluster_spec=spec, resource_timeout=1, idle_timeout=2)
KubeCluster(
custom_cluster_spec=spec,
namespace=ns,
resource_timeout=1,
idle_timeout=2,
)


def test_adapt(kopf_runner, docker_image):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
kopf>=1.35.3
pykube-ng>=22.9.0
rich>=12.5.1
kr8s==0.8.9
kr8s==0.8.10