diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py
index 4d4995c3d..6b25cbae2 100644
--- a/dask_kubernetes/operator/kubecluster/kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/kubecluster.py
@@ -455,7 +455,7 @@ async def _get_scheduler_address(self):
     async def _wait_for_controller(self):
         """Wait for the operator to set the status.phase."""
         start = time.time()
-        cluster = await DaskCluster.get(self.name, namespace=self.namespace)
+        cluster = await DaskCluster.get(self.name, namespace=self.namespace, timeout=30)
         while start + self._resource_timeout > time.time():
             if await cluster.ready():
                 return
diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
index ce3cf8de3..105231dd9 100644
--- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
+++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
@@ -24,7 +24,6 @@ def test_kubecluster(kopf_runner, docker_image, ns):
                 assert client.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 @pytest.mark.anyio
 async def test_kubecluster_async(kopf_runner, docker_image, ns):
     with kopf_runner:
@@ -39,7 +38,6 @@ async def test_kubecluster_async(kopf_runner, docker_image, ns):
                 assert await client.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_custom_worker_command(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
@@ -53,7 +51,6 @@ def test_custom_worker_command(kopf_runner, docker_image, ns):
                 assert client.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_multiple_clusters(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
@@ -68,7 +65,6 @@ def test_multiple_clusters(kopf_runner, docker_image, ns):
                 assert client2.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_clusters_with_custom_port_forward(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
@@ -83,7 +79,6 @@ def test_clusters_with_custom_port_forward(kopf_runner, docker_image, ns):
                 assert client1.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_multiple_clusters_simultaneously(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
@@ -96,7 +91,6 @@ def test_multiple_clusters_simultaneously(kopf_runner, docker_image, ns):
                 assert client2.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
@@ -114,7 +108,6 @@ def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image, ns):
                 assert client2.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 @pytest.mark.anyio
 async def test_cluster_from_name(kopf_runner, docker_image, ns):
     with kopf_runner:
@@ -133,7 +126,6 @@ async def test_cluster_from_name(kopf_runner, docker_image, ns):
             assert firstcluster.scheduler_info == secondcluster.scheduler_info
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_additional_worker_groups(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
@@ -146,7 +138,6 @@ def test_additional_worker_groups(kopf_runner, docker_image, ns):
             cluster.delete_worker_group(name="more")
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
 def test_cluster_without_operator(docker_image, ns):
     with pytest.raises(TimeoutError, match="is the Dask Operator running"):
         KubeCluster(
@@ -158,25 +149,28 @@ def test_cluster_without_operator(docker_image, ns):
         )
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
-@pytest.mark.flaky(reruns=0)
-def test_cluster_crashloopbackoff(kopf_runner, docker_image):
+def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns):
     with kopf_runner:
         with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"):
             spec = make_cluster_spec(name="crashloopbackoff", n_workers=1)
             spec["spec"]["scheduler"]["spec"]["containers"][0]["args"][
                 0
             ] = "dask-schmeduler"
-            KubeCluster(custom_cluster_spec=spec, resource_timeout=1, idle_timeout=2)
+            KubeCluster(
+                custom_cluster_spec=spec,
+                namespace=ns,
+                resource_timeout=1,
+                idle_timeout=2,
+            )
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
-def test_adapt(kopf_runner, docker_image):
+def test_adapt(kopf_runner, docker_image, ns):
     with kopf_runner:
         with KubeCluster(
             name="adaptive",
             image=docker_image,
             n_workers=0,
+            namespace=ns,
         ) as cluster:
             cluster.adapt(minimum=0, maximum=1)
             with Client(cluster) as client:
@@ -188,17 +182,17 @@ def test_adapt(kopf_runner, docker_image):
             cluster.scale(0)
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
-def test_custom_spec(kopf_runner, docker_image):
+def test_custom_spec(kopf_runner, docker_image, ns):
     with kopf_runner:
         spec = make_cluster_spec("customspec", image=docker_image)
-        with KubeCluster(custom_cluster_spec=spec, n_workers=1) as cluster:
+        with KubeCluster(
+            custom_cluster_spec=spec, n_workers=1, namespace=ns
+        ) as cluster:
             with Client(cluster) as client:
                 assert client.submit(lambda x: x + 1, 10).result() == 11
 
 
-@pytest.mark.skip(reason="Skip while refactoring")
-def test_typo_resource_limits():
+def test_typo_resource_limits(ns):
     with pytest.raises(ValueError):
         KubeCluster(
             name="foo",
@@ -207,4 +201,5 @@ def test_typo_resource_limits():
                 "CPU": "1",
             },
         },
+        namespace=ns,
     )
diff --git a/setup.cfg b/setup.cfg
index 1e7ab33fa..8c72c205a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -39,4 +39,4 @@ parentdir_prefix = dask-kubernetes-
 addopts = -v --keep-cluster --durations=10
 timeout = 60
 timeout_func_only = true
-reruns = 5
+reruns = 10
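The un-skipped tests above all depend on an ns fixture that gives each test an isolated Kubernetes namespace, and the clusters are now created with namespace=ns so concurrent tests cannot collide. That fixture lives in the test suite's conftest.py and is not part of this diff; the sketch below is only an illustration of what such a per-test namespace fixture could look like, assuming plain kubectl access, and is not the repository's actual implementation.

    # Hypothetical sketch of a per-test namespace fixture; the real `ns` fixture
    # in conftest.py may be implemented differently.
    import subprocess
    import uuid

    import pytest


    @pytest.fixture
    def ns():
        # Create a uniquely named namespace so each test gets its own sandbox.
        name = "pytest-dask-" + uuid.uuid4().hex[:10]
        subprocess.run(["kubectl", "create", "namespace", name], check=True)
        yield name
        # Tear the namespace down without waiting for every resource to finalize.
        subprocess.run(
            ["kubectl", "delete", "namespace", name, "--wait=false"], check=True
        )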