diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index 20bcab40e..ca020cd52 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -1,5 +1,6 @@ import pytest +import logging import pathlib import os import subprocess @@ -17,6 +18,9 @@ check_dependency("kubectl") check_dependency("docker") +for mod in ("httpx", "httpcore", "httpcore.http11"): + logging.getLogger().setLevel(logging.WARNING) + @pytest.fixture() def kopf_runner(k8s_cluster): diff --git a/dask_kubernetes/operator/_objects.py b/dask_kubernetes/operator/_objects.py index 3f7ce6f00..4c7fc3c3d 100644 --- a/dask_kubernetes/operator/_objects.py +++ b/dask_kubernetes/operator/_objects.py @@ -3,6 +3,8 @@ from kr8s.asyncio.objects import APIObject, Pod, Deployment, Service +WAIT_TIMEOUT = 30 + class DaskCluster(APIObject): version = "kubernetes.dask.org/v1" @@ -115,7 +117,9 @@ async def deployments(self) -> List[Deployment]: ) async def cluster(self) -> DaskCluster: - return await DaskCluster.get(self.spec.cluster, namespace=self.namespace) + return await DaskCluster.get( + self.spec.cluster, namespace=self.namespace, timeout=WAIT_TIMEOUT + ) class DaskAutoscaler(APIObject): @@ -127,7 +131,9 @@ class DaskAutoscaler(APIObject): namespaced = True async def cluster(self) -> DaskCluster: - return await DaskCluster.get(self.spec.cluster, namespace=self.namespace) + return await DaskCluster.get( + self.spec.cluster, namespace=self.namespace, timeout=WAIT_TIMEOUT + ) class DaskJob(APIObject): @@ -139,7 +145,9 @@ class DaskJob(APIObject): namespaced = True async def cluster(self) -> DaskCluster: - return await DaskCluster.get(self.name, namespace=self.namespace) + return await DaskCluster.get( + self.name, namespace=self.namespace, timeout=WAIT_TIMEOUT + ) async def pod(self) -> Pod: pods = [] diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 861ee15b2..890129696 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -12,6 +12,7 @@ import warnings import weakref import uuid +import httpx from rich import box from rich.live import Live @@ -332,86 +333,81 @@ async def _(): async def _create_cluster(self): if self.shutdown_on_close is None: self.shutdown_on_close = True - async with kubernetes.client.api_client.ApiClient() as api_client: - core_api = kubernetes.client.CoreV1Api(api_client) - custom_objects_api = kubernetes.client.CustomObjectsApi(api_client) - if not self._custom_cluster_spec: - self._log("Generating cluster spec") - data = make_cluster_spec( - name=self.name, - env=self.env, - resources=self.resources, - worker_command=self.worker_command, - n_workers=self.n_workers, - image=self.image, - scheduler_service_type=self.scheduler_service_type, - idle_timeout=self.idle_timeout, - ) + if not self._custom_cluster_spec: + self._log("Generating cluster spec") + data = make_cluster_spec( + name=self.name, + env=self.env, + resources=self.resources, + worker_command=self.worker_command, + n_workers=self.n_workers, + image=self.image, + scheduler_service_type=self.scheduler_service_type, + idle_timeout=self.idle_timeout, + ) + else: + data = self._custom_cluster_spec + try: + self._log("Creating DaskCluster object") + cluster = await DaskCluster(data, namespace=self.namespace) + await cluster.create() + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise RuntimeError( + "Failed to create DaskCluster resource." + "Are the Dask Custom Resource Definitions installed? " + "https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator" + ) from e else: - data = self._custom_cluster_spec - try: - self._log("Creating DaskCluster object") - await custom_objects_api.create_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskclusters", - namespace=self.namespace, - body=data, - ) - except kubernetes.client.ApiException as e: - if e.status == 404: - raise RuntimeError( - "Failed to create DaskCluster resource." - "Are the Dask Custom Resource Definitions installed? " - "https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator" - ) from e - else: - raise e - - try: - self._log("Waiting for controller to action cluster") - await self._wait_for_controller() - except TimeoutError as e: - await self._close() raise e + + try: + self._log("Waiting for controller to action cluster") + await self._wait_for_controller() + except TimeoutError as e: + await self._close() + raise e + try: + self._log("Waiting for scheduler pod") + await wait_for_scheduler( + self.name, + self.namespace, + timeout=self._resource_timeout, + ) + except CrashLoopBackOffError as e: try: - self._log("Waiting for scheduler pod") - await wait_for_scheduler( - self.name, - self.namespace, - timeout=self._resource_timeout, - ) - except CrashLoopBackOffError as e: - logs = await self._get_logs() - pods = await core_api.list_namespaced_pod( + scheduler_pod = await Pod.get( namespace=self.namespace, label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", ) - await self._close() - raise SchedulerStartupError( - "Scheduler failed to start.", - "Scheduler Pod logs:", - logs[pods.items[0].metadata.name], - ) from e - self._log("Waiting for scheduler service") - await wait_for_service(f"{self.name}-scheduler", self.namespace) - scheduler_address = await self._get_scheduler_address() - self._log("Connecting to scheduler") - await wait_for_scheduler_comm(scheduler_address) - self.scheduler_comm = rpc(scheduler_address) - local_port = self.scheduler_forward_port - if local_port: - local_port = int(local_port) - self._log("Getting dashboard URL") - dashboard_address = await get_scheduler_address( - f"{self.name}-scheduler", - self.namespace, - port_name="http-dashboard", - port_forward_cluster_ip=self.port_forward_cluster_ip, - local_port=local_port, - ) - self.forwarded_dashboard_port = dashboard_address.split(":")[-1] + logs = await scheduler_pod.logs() + except Exception: + logs = "Failed to get logs." + await self._close() + raise SchedulerStartupError( + "Scheduler failed to start.", + "Scheduler Pod logs:", + logs, + ) from e + self._log("Waiting for scheduler service") + await wait_for_service(f"{self.name}-scheduler", self.namespace) + scheduler_address = await self._get_scheduler_address() + self._log("Connecting to scheduler") + await wait_for_scheduler_comm(scheduler_address) + self.scheduler_comm = rpc(scheduler_address) + local_port = self.scheduler_forward_port + if local_port: + local_port = int(local_port) + self._log("Getting dashboard URL") + dashboard_address = await get_scheduler_address( + f"{self.name}-scheduler", + self.namespace, + port_name="http-dashboard", + port_forward_cluster_ip=self.port_forward_cluster_ip, + local_port=local_port, + ) + self.forwarded_dashboard_port = dashboard_address.split(":")[-1] async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -1012,7 +1008,7 @@ def make_scheduler_spec( async def wait_for_service(service_name, namespace): """Block until service is available.""" - service = await Service.get(service_name, namespace) + service = await Service.get(service_name, namespace=namespace) while not await service.ready(): await asyncio.sleep(0.1) diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py index 2d71ab9fc..6ed05bd7e 100644 --- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py @@ -139,14 +139,19 @@ def test_cluster_without_operator(docker_image): KubeCluster(name="noop", n_workers=1, image=docker_image, resource_timeout=1) -def test_cluster_crashloopbackoff(kopf_runner, docker_image): +def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns): with kopf_runner: with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"): spec = make_cluster_spec(name="crashloopbackoff", n_workers=1) spec["spec"]["scheduler"]["spec"]["containers"][0]["args"][ 0 ] = "dask-schmeduler" - KubeCluster(custom_cluster_spec=spec, resource_timeout=1, idle_timeout=2) + KubeCluster( + custom_cluster_spec=spec, + namespace=ns, + resource_timeout=1, + idle_timeout=2, + ) def test_adapt(kopf_runner, docker_image): diff --git a/requirements.txt b/requirements.txt index b08b2b081..a8e3ca7b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1 kopf>=1.35.3 pykube-ng>=22.9.0 rich>=12.5.1 -kr8s==0.8.9 +kr8s==0.8.10