From adc50c0237a6f75ca0a3959ddff456969250e929 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 1 Aug 2023 16:50:33 +0100 Subject: [PATCH 1/6] Migrate KubeCluster create resource to kr8s --- .../operator/kubecluster/kubecluster.py | 142 +++++++++--------- requirements.txt | 2 +- 2 files changed, 68 insertions(+), 76 deletions(-) diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 861ee15b2..b27c45223 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -12,6 +12,7 @@ import warnings import weakref import uuid +import httpx from rich import box from rich.live import Live @@ -332,86 +333,77 @@ async def _(): async def _create_cluster(self): if self.shutdown_on_close is None: self.shutdown_on_close = True - async with kubernetes.client.api_client.ApiClient() as api_client: - core_api = kubernetes.client.CoreV1Api(api_client) - custom_objects_api = kubernetes.client.CustomObjectsApi(api_client) - if not self._custom_cluster_spec: - self._log("Generating cluster spec") - data = make_cluster_spec( - name=self.name, - env=self.env, - resources=self.resources, - worker_command=self.worker_command, - n_workers=self.n_workers, - image=self.image, - scheduler_service_type=self.scheduler_service_type, - idle_timeout=self.idle_timeout, - ) + if not self._custom_cluster_spec: + self._log("Generating cluster spec") + data = make_cluster_spec( + name=self.name, + env=self.env, + resources=self.resources, + worker_command=self.worker_command, + n_workers=self.n_workers, + image=self.image, + scheduler_service_type=self.scheduler_service_type, + idle_timeout=self.idle_timeout, + ) + else: + data = self._custom_cluster_spec + try: + self._log("Creating DaskCluster object") + cluster = await DaskCluster(data, namespace=self.namespace) + await cluster.create() + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise RuntimeError( + "Failed to create DaskCluster resource." + "Are the Dask Custom Resource Definitions installed? " + "https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator" + ) from e else: - data = self._custom_cluster_spec - try: - self._log("Creating DaskCluster object") - await custom_objects_api.create_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskclusters", - namespace=self.namespace, - body=data, - ) - except kubernetes.client.ApiException as e: - if e.status == 404: - raise RuntimeError( - "Failed to create DaskCluster resource." - "Are the Dask Custom Resource Definitions installed? " - "https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator" - ) from e - else: - raise e - - try: - self._log("Waiting for controller to action cluster") - await self._wait_for_controller() - except TimeoutError as e: - await self._close() raise e - try: - self._log("Waiting for scheduler pod") - await wait_for_scheduler( - self.name, - self.namespace, - timeout=self._resource_timeout, - ) - except CrashLoopBackOffError as e: - logs = await self._get_logs() - pods = await core_api.list_namespaced_pod( - namespace=self.namespace, - label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", - ) - await self._close() - raise SchedulerStartupError( - "Scheduler failed to start.", - "Scheduler Pod logs:", - logs[pods.items[0].metadata.name], - ) from e - self._log("Waiting for scheduler service") - await wait_for_service(f"{self.name}-scheduler", self.namespace) - scheduler_address = await self._get_scheduler_address() - self._log("Connecting to scheduler") - await wait_for_scheduler_comm(scheduler_address) - self.scheduler_comm = rpc(scheduler_address) - local_port = self.scheduler_forward_port - if local_port: - local_port = int(local_port) - self._log("Getting dashboard URL") - dashboard_address = await get_scheduler_address( - f"{self.name}-scheduler", + + try: + self._log("Waiting for controller to action cluster") + await self._wait_for_controller() + except TimeoutError as e: + await self._close() + raise e + try: + self._log("Waiting for scheduler pod") + await wait_for_scheduler( + self.name, self.namespace, - port_name="http-dashboard", - port_forward_cluster_ip=self.port_forward_cluster_ip, - local_port=local_port, + timeout=self._resource_timeout, ) - self.forwarded_dashboard_port = dashboard_address.split(":")[-1] + except CrashLoopBackOffError as e: + scheduler_pod = await Pod.get( + namespace=self.namespace, + label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", + ) + await self._close() + raise SchedulerStartupError( + "Scheduler failed to start.", + "Scheduler Pod logs:", + scheduler_pod.logs(), + ) from e + self._log("Waiting for scheduler service") + await wait_for_service(f"{self.name}-scheduler", self.namespace) + scheduler_address = await self._get_scheduler_address() + self._log("Connecting to scheduler") + await wait_for_scheduler_comm(scheduler_address) + self.scheduler_comm = rpc(scheduler_address) + local_port = self.scheduler_forward_port + if local_port: + local_port = int(local_port) + self._log("Getting dashboard URL") + dashboard_address = await get_scheduler_address( + f"{self.name}-scheduler", + self.namespace, + port_name="http-dashboard", + port_forward_cluster_ip=self.port_forward_cluster_ip, + local_port=local_port, + ) + self.forwarded_dashboard_port = dashboard_address.split(":")[-1] async def _connect_cluster(self): if self.shutdown_on_close is None: diff --git a/requirements.txt b/requirements.txt index b08b2b081..a8e3ca7b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1 kopf>=1.35.3 pykube-ng>=22.9.0 rich>=12.5.1 -kr8s==0.8.9 +kr8s==0.8.10 From cb06050cb047f09658d4594051ae948da58df6e1 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 1 Aug 2023 17:42:01 +0100 Subject: [PATCH 2/6] Make failure more graceful --- .../operator/kubecluster/kubecluster.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index b27c45223..cd6aa538b 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -376,15 +376,19 @@ async def _create_cluster(self): timeout=self._resource_timeout, ) except CrashLoopBackOffError as e: - scheduler_pod = await Pod.get( - namespace=self.namespace, - label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", - ) + try: + scheduler_pod = await Pod.get( + namespace=self.namespace, + label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}", + ) + logs = await scheduler_pod.logs() + except Exception: + logs = "Failed to get logs." await self._close() raise SchedulerStartupError( "Scheduler failed to start.", "Scheduler Pod logs:", - scheduler_pod.logs(), + logs, ) from e self._log("Waiting for scheduler service") await wait_for_service(f"{self.name}-scheduler", self.namespace) From c8f5dc956d37deb6a565dc99e851e2af9ae3a445 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 1 Aug 2023 21:35:05 +0100 Subject: [PATCH 3/6] Explicit kwarg --- dask_kubernetes/operator/kubecluster/kubecluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index cd6aa538b..890129696 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -1008,7 +1008,7 @@ def make_scheduler_spec( async def wait_for_service(service_name, namespace): """Block until service is available.""" - service = await Service.get(service_name, namespace) + service = await Service.get(service_name, namespace=namespace) while not await service.ready(): await asyncio.sleep(0.1) From bc93cc0092124adbec55d9c45379a4566d4c94a4 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 2 Aug 2023 11:09:33 +0100 Subject: [PATCH 4/6] Reduce logging noise --- dask_kubernetes/conftest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index 20bcab40e..1fbcfd35d 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -1,5 +1,6 @@ import pytest +import logging import pathlib import os import subprocess @@ -17,6 +18,9 @@ check_dependency("kubectl") check_dependency("docker") +for mod in ("httpx", "httpcore"): + logging.getLogger().setLevel(logging.WARNING) + @pytest.fixture() def kopf_runner(k8s_cluster): From 15020d8fb85058fcf1705d237dc89fb391e191a6 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 2 Aug 2023 11:41:50 +0100 Subject: [PATCH 5/6] More logging and timeout tweaks --- dask_kubernetes/conftest.py | 2 +- dask_kubernetes/operator/_objects.py | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index 1fbcfd35d..ca020cd52 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -18,7 +18,7 @@ check_dependency("kubectl") check_dependency("docker") -for mod in ("httpx", "httpcore"): +for mod in ("httpx", "httpcore", "httpcore.http11"): logging.getLogger().setLevel(logging.WARNING) diff --git a/dask_kubernetes/operator/_objects.py b/dask_kubernetes/operator/_objects.py index 3f7ce6f00..4c7fc3c3d 100644 --- a/dask_kubernetes/operator/_objects.py +++ b/dask_kubernetes/operator/_objects.py @@ -3,6 +3,8 @@ from kr8s.asyncio.objects import APIObject, Pod, Deployment, Service +WAIT_TIMEOUT = 30 + class DaskCluster(APIObject): version = "kubernetes.dask.org/v1" @@ -115,7 +117,9 @@ async def deployments(self) -> List[Deployment]: ) async def cluster(self) -> DaskCluster: - return await DaskCluster.get(self.spec.cluster, namespace=self.namespace) + return await DaskCluster.get( + self.spec.cluster, namespace=self.namespace, timeout=WAIT_TIMEOUT + ) class DaskAutoscaler(APIObject): @@ -127,7 +131,9 @@ class DaskAutoscaler(APIObject): namespaced = True async def cluster(self) -> DaskCluster: - return await DaskCluster.get(self.spec.cluster, namespace=self.namespace) + return await DaskCluster.get( + self.spec.cluster, namespace=self.namespace, timeout=WAIT_TIMEOUT + ) class DaskJob(APIObject): @@ -139,7 +145,9 @@ class DaskJob(APIObject): namespaced = True async def cluster(self) -> DaskCluster: - return await DaskCluster.get(self.name, namespace=self.namespace) + return await DaskCluster.get( + self.name, namespace=self.namespace, timeout=WAIT_TIMEOUT + ) async def pod(self) -> Pod: pods = [] From cdc56ac0d96a3028aa92611ee84a18e6d6830cc3 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 2 Aug 2023 12:04:59 +0100 Subject: [PATCH 6/6] Run test in own namespace --- .../operator/kubecluster/tests/test_kubecluster.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py index 2d71ab9fc..6ed05bd7e 100644 --- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py @@ -139,14 +139,19 @@ def test_cluster_without_operator(docker_image): KubeCluster(name="noop", n_workers=1, image=docker_image, resource_timeout=1) -def test_cluster_crashloopbackoff(kopf_runner, docker_image): +def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns): with kopf_runner: with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"): spec = make_cluster_spec(name="crashloopbackoff", n_workers=1) spec["spec"]["scheduler"]["spec"]["containers"][0]["args"][ 0 ] = "dask-schmeduler" - KubeCluster(custom_cluster_spec=spec, resource_timeout=1, idle_timeout=2) + KubeCluster( + custom_cluster_spec=spec, + namespace=ns, + resource_timeout=1, + idle_timeout=2, + ) def test_adapt(kopf_runner, docker_image):