From 82c1d939b719ad7f961836f93dd7ea8c9e988514 Mon Sep 17 00:00:00 2001
From: Jonas Dedden
Date: Fri, 19 Apr 2024 15:38:31 +0200
Subject: [PATCH] General code quality improvements for the controller part of
 this repository.

- Make sure exported symbols are set correctly in __init__.py via __all__.
- Rename the pytest fixture that creates a k8s namespace from ns to namespace,
  since ns is a bit overloaded, especially in test_controller.py.
- Introduce type hints in controller.py and test_controller.py to improve the
  developer experience (previously one had to guess what each function is for
  and which types it expects).
- Remove unused function parameters.
- Make function headers (i.e. parameter and type definitions) compatible with
  what `kopf.on.[create/change/...]` expects.
- Introduce mypy (and missing stubs for external packages where available).
- Introduce mypy ignores for (still) untyped parts of the codebase for now.
---
 dask_kubernetes/conftest.py                   |  29 +-
 dask_kubernetes/operator/__init__.py          |   8 +
 .../operator/controller/controller.py         | 401 ++++++++++++------
 .../controller/tests/test_controller.py       | 196 ++++++---
 .../operator/kubecluster/__init__.py          |   8 +
 .../kubecluster/tests/test_kubecluster.py     |  73 ++--
 requirements-test.txt                         |   2 +
 7 files changed, 480 insertions(+), 237 deletions(-)

diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py
index 52a036f59..ce165b9e3 100644
--- a/dask_kubernetes/conftest.py
+++ b/dask_kubernetes/conftest.py
@@ -6,12 +6,13 @@
 import sys
 import tempfile
 import uuid
+from typing import Final, Iterator
 
 import pytest
 from kopf.testing import KopfRunner
 from pytest_kind.cluster import KindCluster
 
-DIR = pathlib.Path(__file__).parent.absolute()
+DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.absolute()
 
 
 def check_dependency(dependency):
@@ -26,24 +27,24 @@ def check_dependency(dependency):
 check_dependency("kubectl")
 check_dependency("docker")
 
-DISABLE_LOGGERS = ["httpcore", "httpx"]
+DISABLE_LOGGERS: Final[list[str]] = ["httpcore", "httpx"]
 
 
-def pytest_configure():
+def pytest_configure() -> None:
     for logger_name in DISABLE_LOGGERS:
         logger = logging.getLogger(logger_name)
         logger.disabled = True
 
 
 @pytest.fixture()
-def kopf_runner(k8s_cluster, ns):
+def kopf_runner(k8s_cluster: KindCluster, namespace: str) -> KopfRunner:
     yield KopfRunner(
-        ["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", ns]
+        ["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", namespace]
     )
 
 
 @pytest.fixture(scope="session")
-def docker_image():
+def docker_image() -> str:
     image_name = "dask-kubernetes:dev"
     python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
     subprocess.run(
@@ -62,7 +63,9 @@ def docker_image():
 
 
 @pytest.fixture(scope="session")
-def k8s_cluster(request, docker_image):
+def k8s_cluster(
+    request: pytest.FixtureRequest, docker_image: str
+) -> Iterator[KindCluster]:
     image = None
     if version := os.environ.get("KUBERNETES_VERSION"):
         image = f"kindest/node:v{version}"
@@ -81,7 +84,7 @@
 
 
 @pytest.fixture(scope="session", autouse=True)
-def install_istio(k8s_cluster):
+def install_istio(k8s_cluster: KindCluster) -> None:
     if bool(os.environ.get("TEST_ISTIO", False)):
         check_dependency("istioctl")
         subprocess.run(
@@ -93,7 +96,7 @@
 
 
 @pytest.fixture(autouse=True)
-def ns(k8s_cluster):
+def namespace(k8s_cluster: KindCluster) -> Iterator[str]:
     ns = "dask-k8s-pytest-" + uuid.uuid4().hex[:10]
     k8s_cluster.kubectl("create", "ns", ns)
     yield ns
@@ -101,7 +104,7 
@@ def ns(k8s_cluster): @pytest.fixture(scope="session", autouse=True) -def install_gateway(k8s_cluster): +def install_gateway(k8s_cluster: KindCluster) -> Iterator[None]: if bool(os.environ.get("TEST_DASK_GATEWAY", False)): check_dependency("helm") # To ensure the operator can coexist with Gateway @@ -137,11 +140,11 @@ def install_gateway(k8s_cluster): @pytest.fixture(scope="session", autouse=True) -def customresources(k8s_cluster): +def customresources(k8s_cluster: KindCluster) -> Iterator[None]: temp_dir = tempfile.TemporaryDirectory() crd_path = os.path.join(DIR, "operator", "customresources") - def run_generate(crd_path, patch_path, temp_path): + def run_generate(crd_path: str, patch_path: str, temp_path: str) -> None: subprocess.run( ["k8s-crd-resolver", "-r", "-j", patch_path, crd_path, temp_path], check=True, @@ -162,5 +165,5 @@ def run_generate(crd_path, patch_path, temp_path): @pytest.fixture -def anyio_backend(): +def anyio_backend() -> str: return "asyncio" diff --git a/dask_kubernetes/operator/__init__.py b/dask_kubernetes/operator/__init__.py index 4afe88da7..98d418a17 100644 --- a/dask_kubernetes/operator/__init__.py +++ b/dask_kubernetes/operator/__init__.py @@ -5,3 +5,11 @@ make_worker_spec, discover, ) + +__all__ = [ + "KubeCluster", + "make_cluster_spec", + "make_scheduler_spec", + "make_worker_spec", + "discover", +] diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 320ea7579..9dae024f1 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -1,19 +1,22 @@ +from __future__ import annotations + import asyncio import copy import time from collections import defaultdict from contextlib import suppress from datetime import datetime +from typing import TYPE_CHECKING, Any, Awaitable, Final, cast from uuid import uuid4 import aiohttp import dask.config import kopf -import kr8s +import kr8s # type: ignore[import-untyped] import pkg_resources from distributed.core import clean_exception, rpc from distributed.protocol.pickle import dumps -from kr8s.asyncio.objects import Deployment, Pod, Service +from kr8s.asyncio.objects import Deployment, Pod, Service # type: ignore[import-untyped] from dask_kubernetes.constants import SCHEDULER_NAME_TEMPLATE from dask_kubernetes.exceptions import ValidationError @@ -26,18 +29,23 @@ from dask_kubernetes.operator.networking import get_scheduler_address from dask_kubernetes.operator.validation import validate_cluster_name -_ANNOTATION_NAMESPACES_TO_IGNORE = ( +if TYPE_CHECKING: + from distributed import Scheduler + +_ANNOTATION_NAMESPACES_TO_IGNORE: Final[tuple[str, ...]] = ( "kopf.zalando.org", "kubectl.kubernetes.io", ) -_LABEL_NAMESPACES_TO_IGNORE = () +_LABEL_NAMESPACES_TO_IGNORE: Final[tuple[str, ...]] = () -KUBERNETES_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +KUBERNETES_DATETIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ" -DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION = "kubernetes.dask.org/cooldown-until" +DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: Final[ + str +] = "kubernetes.dask.org/cooldown-until" # Load operator plugins from other packages -PLUGINS = [] +PLUGINS: list[Any] = [] for ep in pkg_resources.iter_entry_points(group="dask_operator_plugin"): with suppress(AttributeError, ImportError): PLUGINS.append(ep.load()) @@ -47,7 +55,7 @@ class SchedulerCommError(Exception): """Raised when unable to communicate with a scheduler.""" -def _get_annotations(meta): +def _get_annotations(meta: kopf.Meta) -> 
dict[str, str]: return { annotation_key: annotation_value for annotation_key, annotation_value in meta.annotations.items() @@ -58,7 +66,7 @@ def _get_annotations(meta): } -def _get_labels(meta): +def _get_labels(meta: kopf.Meta) -> dict[str, str]: return { label_key: label_value for label_key, label_value in meta.labels.items() @@ -69,15 +77,16 @@ def _get_labels(meta): def build_scheduler_deployment_spec( - cluster_name, namespace, pod_spec, annotations, labels -): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "scheduler", - "sidecar.istio.io/inject": "false", - } - ) + cluster_name: str, + pod_spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "scheduler", + "sidecar.istio.io/inject": "false", + } metadata = { "name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name), "labels": labels, @@ -101,13 +110,16 @@ def build_scheduler_deployment_spec( } -def build_scheduler_service_spec(cluster_name, spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "scheduler", - } - ) +def build_scheduler_service_spec( + cluster_name: str, + spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "scheduler", + } return { "apiVersion": "v1", "kind": "Service", @@ -121,37 +133,40 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels): def build_worker_deployment_spec( - worker_group_name, namespace, cluster_name, uuid, pod_spec, annotations, labels -): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/workergroup-name": worker_group_name, - "dask.org/component": "worker", - "sidecar.istio.io/inject": "false", - } - ) + worker_group_name: str, + namespace: str, + cluster_name: str, + uuid: str, + pod_spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/workergroup-name": worker_group_name, + "dask.org/component": "worker", + "sidecar.istio.io/inject": "false", + } worker_name = f"{worker_group_name}-worker-{uuid}" metadata = { "name": worker_name, "labels": labels, "annotations": annotations, } - spec = { - "replicas": 1, - "selector": { - "matchLabels": labels, - }, - "template": { - "metadata": metadata, - "spec": copy.deepcopy(pod_spec), - }, - } - deployment_spec = { + deployment_spec: dict[str, Any] = { "apiVersion": "apps/v1", "kind": "Deployment", "metadata": metadata, - "spec": spec, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": labels, + }, + "template": { + "metadata": metadata, + "spec": copy.deepcopy(pod_spec), + }, + }, } worker_env = { "name": "DASK_WORKER_NAME", @@ -175,19 +190,24 @@ def build_worker_deployment_spec( return deployment_spec -def get_job_runner_pod_name(job_name): +def get_job_runner_pod_name(job_name: str) -> str: return f"{job_name}-runner" -def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "job-runner", - "sidecar.istio.io/inject": "false", - } - ) - pod_spec = { +def build_job_pod_spec( + job_name: str, + cluster_name: str, + namespace: str, + spec: kopf.Spec, + annotations: kopf.Annotations, + 
labels: kopf.Labels, +) -> dict[str, Any]: + labels = dict(labels) | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "job-runner", + "sidecar.istio.io/inject": "false", + } + pod_spec: dict[str, Any] = { "apiVersion": "v1", "kind": "Pod", "metadata": { @@ -213,19 +233,22 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, lab return pod_spec -def build_default_worker_group_spec(cluster_name, spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": cluster_name, - "dask.org/component": "workergroup", - } - ) +def build_default_worker_group_spec( + cluster_name: str, + spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: return { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskWorkerGroup", "metadata": { "name": f"{cluster_name}-default", - "labels": labels, + "labels": dict(labels) + | { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "workergroup", + }, "annotations": annotations, }, "spec": { @@ -235,18 +258,22 @@ def build_default_worker_group_spec(cluster_name, spec, annotations, labels): } -def build_cluster_spec(name, worker_spec, scheduler_spec, annotations, labels): - labels.update( - **{ - "dask.org/cluster-name": name, - } - ) +def build_cluster_spec( + name: str, + worker_spec: kopf.Spec, + scheduler_spec: kopf.Spec, + annotations: kopf.Annotations, + labels: kopf.Labels, +) -> dict[str, Any]: return { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskCluster", "metadata": { "name": name, - "labels": labels, + "labels": dict(labels) + | { + "dask.org/cluster-name": name, + }, "annotations": annotations, }, "spec": {"worker": worker_spec, "scheduler": scheduler_spec}, @@ -254,7 +281,7 @@ def build_cluster_spec(name, worker_spec, scheduler_spec, annotations, labels): @kopf.on.startup() -async def startup(settings: kopf.OperatorSettings, **kwargs): +async def startup(settings: kopf.OperatorSettings, **__: Any) -> None: # Set server and client timeouts to reconnect from time to time. # In rare occasions the connection might go idle we will no longer receive any events. # These timeouts should help in those cases. @@ -272,16 +299,24 @@ async def startup(settings: kopf.OperatorSettings, **kwargs): # There may be useful things for us to expose via the liveness probe # https://kopf.readthedocs.io/en/stable/probing/#probe-handlers @kopf.on.probe(id="now") -def get_current_timestamp(**kwargs): +def get_current_timestamp(**__: Any) -> str: return datetime.utcnow().isoformat() @kopf.on.create("daskcluster.kubernetes.dask.org") -async def daskcluster_create(name, namespace, logger, patch, **kwargs): +async def daskcluster_create( + name: str | None, + namespace: str | None, + patch: kopf.Patch, + logger: kopf.Logger, + **__: Any, +) -> None: """When DaskCluster resource is created set the status.phase. This allows us to track that the operator is running. 
""" + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") logger.info(f"DaskCluster {name} created in {namespace}.") try: validate_cluster_name(name) @@ -294,9 +329,17 @@ async def daskcluster_create(name, namespace, logger, patch, **kwargs): @kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Created") async def daskcluster_create_components( - spec, name, namespace, logger, patch, meta, **kwargs -): + spec: kopf.Spec, + name: str | None, + namespace: str | None, + logger: kopf.Logger, + patch: kopf.Patch, + meta: kopf.Meta, + **__: Any, +) -> None: """When the DaskCluster status.phase goes into Created create the cluster components.""" + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") logger.info("Creating Dask cluster components.") # Create scheduler deployment @@ -309,7 +352,7 @@ async def daskcluster_create_components( if "labels" in scheduler_spec["metadata"]: labels.update(**scheduler_spec["metadata"]["labels"]) data = build_scheduler_deployment_spec( - name, namespace, scheduler_spec.get("spec"), annotations, labels + name, scheduler_spec.get("spec"), annotations, labels ) kopf.adopt(data) scheduler_deployment = await Deployment(data, namespace=namespace) @@ -349,8 +392,14 @@ async def daskcluster_create_components( @kopf.on.field("service", field="status", labels={"dask.org/component": "scheduler"}) async def handle_scheduler_service_status( - spec, labels, status, namespace, logger, **kwargs -): + spec: kopf.Spec, + labels: kopf.Labels, + status: kopf.Status, + namespace: str | None, + **__: Any, +) -> None: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") # If the Service is a LoadBalancer with no ingress endpoints mark the cluster as Pending if spec["type"] == "LoadBalancer" and not len( status.get("load_balancer", {}).get("ingress", []) @@ -366,27 +415,40 @@ async def handle_scheduler_service_status( @kopf.on.create("daskworkergroup.kubernetes.dask.org") -async def daskworkergroup_create(body, namespace, logger, **kwargs): +async def daskworkergroup_create( + body: kopf.Body, namespace: str | None, logger: kopf.Logger, **kwargs: Any +) -> None: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") wg = await DaskWorkerGroup(body, namespace=namespace) cluster = await wg.cluster() await cluster.adopt(wg) logger.info(f"Successfully adopted by {cluster.name}") del kwargs["new"] - await daskworkergroup_replica_update( - body=body, - logger=logger, - new=wg.replicas, - namespace=namespace, - **kwargs, + await cast( + Awaitable[None], + daskworkergroup_replica_update( + body=body, + logger=logger, + new=wg.replicas, + namespace=namespace, + **kwargs, + ), ) async def retire_workers( - n_workers, scheduler_service_name, worker_group_name, namespace, logger -): + n_workers: int, + scheduler_service_name: str, + worker_group_name: str, + namespace: str | None, + logger: kopf.Logger, +) -> list[str]: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") # Try gracefully retiring via the HTTP API - dashboard_address = await get_scheduler_address( + dashboard_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, port_name="http-dashboard", @@ -412,18 +474,18 @@ async def retire_workers( ) # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways with suppress(Exception): - comm_address = 
await get_scheduler_address( + comm_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, allow_external=False, ) - async with rpc(comm_address) as scheduler_comm: + async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call] workers_to_close = await scheduler_comm.workers_to_close( n=n_workers, attribute="name", ) await scheduler_comm.retire_workers(names=workers_to_close) - return workers_to_close + return cast(list[str], workers_to_close) # Finally fall back to last-in-first-out scaling logger.warning( @@ -438,9 +500,13 @@ async def retire_workers( return [w.name for w in workers[:-n_workers]] -async def check_scheduler_idle(scheduler_service_name, namespace, logger): +async def check_scheduler_idle( + scheduler_service_name: str, namespace: str | None, logger: kopf.Logger +) -> str: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") # Try getting idle time via HTTP API - dashboard_address = await get_scheduler_address( + dashboard_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, port_name="http-dashboard", @@ -453,7 +519,7 @@ async def check_scheduler_idle(scheduler_service_name, namespace, logger): idle_since = (await resp.json())["idle_since"] if idle_since: logger.debug("Scheduler idle since: %s", idle_since) - return idle_since + return cast(str, idle_since) logger.debug( "Received %d response from scheduler API with body %s", resp.status, @@ -466,50 +532,61 @@ async def check_scheduler_idle(scheduler_service_name, namespace, logger): ) # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways with suppress(Exception): - comm_address = await get_scheduler_address( + comm_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, allow_external=False, ) - async with rpc(comm_address) as scheduler_comm: + async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call] idle_since = await scheduler_comm.check_idle() if idle_since: logger.debug("Scheduler idle since: %s", idle_since) - return idle_since + return cast(str, idle_since) # Finally fall back to code injection via the Dask RPC for distributed<=2023.3.1 logger.debug( f"Checking {scheduler_service_name} idleness failed via the Dask RPC, falling back to run_on_scheduler" ) - def idle_since(dask_scheduler=None): + def idle_since_func( + dask_scheduler: Scheduler + | None = None, # TODO: why is None allowed here? It will crash immediately. 
+ ) -> float: + if not dask_scheduler: + raise TypeError("Dask Scheduler is None.") if not dask_scheduler.idle_timeout: dask_scheduler.idle_timeout = 300 dask_scheduler.check_idle() - return dask_scheduler.idle_since + return cast(float, dask_scheduler.idle_since) - comm_address = await get_scheduler_address( + comm_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, allow_external=False, ) - async with rpc(comm_address) as scheduler_comm: + async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call] response = await scheduler_comm.run_function( - function=dumps(idle_since), + function=dumps(idle_since_func), # type: ignore[no-untyped-call] ) if response["status"] == "error": typ, exc, tb = clean_exception(**response) + if exc is None: + raise TypeError("Exception was None.") raise exc.with_traceback(tb) else: idle_since = response["result"] if idle_since: logger.debug("Scheduler idle since: %s", idle_since) - return idle_since + return cast(str, idle_since) -async def get_desired_workers(scheduler_service_name, namespace, logger): +async def get_desired_workers( + scheduler_service_name: str, namespace: str | None +) -> Any: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") # Try gracefully retiring via the HTTP API - dashboard_address = await get_scheduler_address( + dashboard_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, port_name="http-dashboard", @@ -525,12 +602,12 @@ async def get_desired_workers(scheduler_service_name, namespace, logger): # Otherwise try gracefully retiring via the RPC # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways try: - comm_address = await get_scheduler_address( + comm_address = await get_scheduler_address( # type: ignore[no-untyped-call] scheduler_service_name, namespace, allow_external=False, ) - async with rpc(comm_address) as scheduler_comm: + async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call] return await scheduler_comm.adaptive_target() except Exception as e: raise SchedulerCommError( @@ -538,22 +615,39 @@ async def get_desired_workers(scheduler_service_name, namespace, logger): ) from e -worker_group_scale_locks = defaultdict(lambda: asyncio.Lock()) +worker_group_scale_locks: dict[str, asyncio.Lock] = defaultdict(lambda: asyncio.Lock()) @kopf.on.field("daskcluster.kubernetes.dask.org", field="spec.worker.replicas") async def daskcluster_default_worker_group_replica_update( - name, namespace, old, new, **kwargs -): + name: str | None, + namespace: str | None, + old: Any | None, + new: Any | None, + **__: Any, +) -> None: + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") if old is not None: + if not all(isinstance(num, int) for num in (old, new)): + raise kopf.PermanentError("Old or new replicas are not int.") wg = await DaskWorkerGroup.get(f"{name}-default", namespace=namespace) await wg.scale(new) @kopf.on.field("daskworkergroup.kubernetes.dask.org", field="spec.worker.replicas") async def daskworkergroup_replica_update( - name, namespace, meta, spec, new, body, logger, **kwargs -): + name: str | None, + namespace: str | None, + meta: kopf.Meta, + spec: kopf.Spec, + new: Any | None, + body: kopf.Body, + logger: kopf.Logger, + **__: Any, +) -> None: + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") 
cluster_name = spec["cluster"] wg = await DaskWorkerGroup(body, namespace=namespace) try: @@ -572,6 +666,8 @@ async def daskworkergroup_replica_update( label_selector={"dask.org/workergroup-name": name}, ) ) + if not isinstance(new, int): + raise kopf.PermanentError("New is not an int.") desired_workers = new workers_needed = desired_workers - current_workers labels = _get_labels(meta) @@ -632,14 +728,26 @@ async def daskworkergroup_replica_update( @kopf.on.delete("daskworkergroup.kubernetes.dask.org", optional=True) -async def daskworkergroup_remove(name, namespace, **kwargs): +async def daskworkergroup_remove( + name: str | None, namespace: str | None, **__: Any +) -> None: + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") lock_key = f"{name}/{namespace}" if lock_key in worker_group_scale_locks: del worker_group_scale_locks[lock_key] @kopf.on.create("daskjob.kubernetes.dask.org") -async def daskjob_create(name, namespace, logger, patch, **kwargs): +async def daskjob_create( + name: str | None, + namespace: str | None, + logger: kopf.Logger, + patch: kopf.Patch, + **__: Any, +) -> None: + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") logger.info(f"A DaskJob has been created called {name} in {namespace}.") patch.status["jobStatus"] = "JobCreated" @@ -648,8 +756,16 @@ async def daskjob_create(name, namespace, logger, patch, **kwargs): "daskjob.kubernetes.dask.org", field="status.jobStatus", new="JobCreated" ) async def daskjob_create_components( - spec, name, namespace, logger, patch, meta, **kwargs -): + spec: kopf.Spec, + name: str | None, + namespace: str | None, + logger: kopf.Logger, + patch: kopf.Patch, + meta: kopf.Meta, + **__: Any, +) -> None: + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") logger.info("Creating Dask job components.") cluster_name = f"{name}" labels = _get_labels(meta) @@ -704,7 +820,11 @@ async def daskjob_create_components( labels={"dask.org/component": "job-runner"}, new="Running", ) -async def handle_runner_status_change_running(meta, namespace, logger, **kwargs): +async def handle_runner_status_change_running( + meta: kopf.Meta, namespace: str | None, logger: kopf.Logger, **__: Any +) -> None: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") logger.info("Job now in running") name = meta["labels"]["dask.org/cluster-name"] job = await DaskJob.get(name, namespace=namespace) @@ -725,7 +845,11 @@ async def handle_runner_status_change_running(meta, namespace, logger, **kwargs) labels={"dask.org/component": "job-runner"}, new="Succeeded", ) -async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs): +async def handle_runner_status_change_succeeded( + meta: kopf.Meta, namespace: str | None, logger: kopf.Logger, **__: Any +) -> None: + if namespace is None: + raise kopf.PermanentError("Namespace not given.") logger.info("Job succeeded, deleting Dask cluster.") name = meta["labels"]["dask.org/cluster-name"] cluster = await DaskCluster.get(name, namespace=namespace) @@ -748,7 +872,11 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg labels={"dask.org/component": "job-runner"}, new="Failed", ) -async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs): +async def handle_runner_status_change_failed( + meta: kopf.Meta, namespace: str | None, logger: kopf.Logger, **__: Any +) -> None: + if namespace is None: + 
raise kopf.PermanentError("Namespace not given.") logger.info("Job failed, deleting Dask cluster.") name = meta["labels"]["dask.org/cluster-name"] cluster = await DaskCluster.get(name, namespace=namespace) @@ -766,7 +894,9 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg @kopf.on.create("daskautoscaler.kubernetes.dask.org") -async def daskautoscaler_create(body, logger, **_): +async def daskautoscaler_create( + body: kopf.Body, logger: kopf.Logger, **__: Any +) -> None: """When an autoscaler is created make it a child of the associated cluster for cascade deletion.""" autoscaler = await DaskAutoscaler(body) cluster = await autoscaler.cluster() @@ -775,7 +905,15 @@ async def daskautoscaler_create(body, logger, **_): @kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0) -async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): +async def daskautoscaler_adapt( + spec: kopf.Spec, + name: str | None, + namespace: str | None, + logger: kopf.Logger, + **__: Any, +) -> None: + if name is None or namespace is None: + raise kopf.PermanentError("Name or Namespace not given.") try: scheduler = await Pod.get( label_selector={ @@ -785,7 +923,7 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): namespace=namespace, ) if not await scheduler.ready(): - raise ValueError() + raise ValueError except ValueError: logger.info("Scheduler not ready, skipping autoscaling") return @@ -812,7 +950,6 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): desired_workers = await get_desired_workers( scheduler_service_name=f"{spec['cluster']}-scheduler", namespace=namespace, - logger=logger, ) except SchedulerCommError: logger.error("Unable to get desired number of workers from scheduler.") @@ -850,7 +987,13 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): @kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0) -async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs): +async def daskcluster_autoshutdown( + spec: kopf.Spec, + name: str | None, + namespace: str | None, + logger: kopf.Logger, + **__: Any, +) -> None: idle_timeout = spec.get("idleTimeout", 0) if idle_timeout: try: @@ -859,8 +1002,10 @@ async def daskcluster_autoshutdown(spec, name, namespace, logger, **kwargs): namespace=namespace, logger=logger, ) - except Exception: - logger.warn("Unable to connect to scheduler, skipping autoshutdown check.") + except Exception: # TODO: Not use broad "Exception" catch here + logger.warning( + "Unable to connect to scheduler, skipping autoshutdown check." 
+ ) return if idle_since and time.time() > idle_since + idle_timeout: cluster = await DaskCluster.get(name, namespace=namespace) diff --git a/dask_kubernetes/operator/controller/tests/test_controller.py b/dask_kubernetes/operator/controller/tests/test_controller.py index 33abf9c4f..a121f4323 100644 --- a/dask_kubernetes/operator/controller/tests/test_controller.py +++ b/dask_kubernetes/operator/controller/tests/test_controller.py @@ -1,15 +1,26 @@ +from __future__ import annotations + import asyncio import json import os.path import pathlib from contextlib import asynccontextmanager from datetime import datetime, timedelta +from typing import ( + TYPE_CHECKING, + Any, + AsyncContextManager, + AsyncIterator, + Callable, + Final, + Iterator, +) import dask.config import pytest import yaml from dask.distributed import Client -from kr8s.asyncio.objects import Deployment, Pod, Service +from kr8s.asyncio.objects import Deployment, Pod, Service # type: ignore[import-untyped] from dask_kubernetes.constants import MAX_CLUSTER_NAME_LEN from dask_kubernetes.operator._objects import DaskCluster, DaskJob, DaskWorkerGroup @@ -18,16 +29,20 @@ get_job_runner_pod_name, ) -DIR = pathlib.Path(__file__).parent.absolute() +if TYPE_CHECKING: + from kopf.testing import KopfRunner + from pytest_kind.cluster import KindCluster # type: ignore[import-untyped] -_EXPECTED_ANNOTATIONS = {"test-annotation": "annotation-value"} -_EXPECTED_LABELS = {"test-label": "label-value"} -DEFAULT_CLUSTER_NAME = "simple" +DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.absolute() + +_EXPECTED_ANNOTATIONS: Final[dict[str, str]] = {"test-annotation": "annotation-value"} +_EXPECTED_LABELS: Final[dict[str, str]] = {"test-label": "label-value"} +DEFAULT_CLUSTER_NAME: Final[str] = "simple" @pytest.fixture() -def gen_cluster_manifest(tmp_path): - def factory(cluster_name=DEFAULT_CLUSTER_NAME): +def gen_cluster_manifest(tmp_path: pathlib.Path) -> Callable[..., pathlib.Path]: + def factory(cluster_name: str = DEFAULT_CLUSTER_NAME) -> pathlib.Path: original_manifest_path = os.path.join(DIR, "resources", "simplecluster.yaml") with open(original_manifest_path, "r") as original_manifest_file: manifest = yaml.safe_load(original_manifest_file) @@ -41,52 +56,60 @@ def factory(cluster_name=DEFAULT_CLUSTER_NAME): @pytest.fixture() -def gen_cluster(k8s_cluster, ns, gen_cluster_manifest): +def gen_cluster( + k8s_cluster: KindCluster, + namespace: str, + gen_cluster_manifest: Callable[..., pathlib.Path], +) -> Iterator[Callable[..., AsyncContextManager[tuple[str, str]]]]: """Yields an instantiated context manager for creating/deleting a simple cluster.""" @asynccontextmanager - async def cm(cluster_name=DEFAULT_CLUSTER_NAME): + async def cm( + cluster_name: str = DEFAULT_CLUSTER_NAME, + ) -> AsyncIterator[tuple[str, str]]: cluster_path = gen_cluster_manifest(cluster_name) # Create cluster resource - k8s_cluster.kubectl("apply", "-n", ns, "-f", cluster_path) + k8s_cluster.kubectl("apply", "-n", namespace, "-f", str(cluster_path)) while cluster_name not in k8s_cluster.kubectl( - "get", "daskclusters.kubernetes.dask.org", "-n", ns + "get", "daskclusters.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) try: - yield cluster_name, ns + yield cluster_name, namespace finally: # Test: remove the wait=True, because I think this is blocking the operator - k8s_cluster.kubectl("delete", "-n", ns, "-f", cluster_path) + k8s_cluster.kubectl("delete", "-n", namespace, "-f", str(cluster_path)) yield cm @pytest.fixture() -def gen_job(k8s_cluster, 
ns): +def gen_job( + k8s_cluster: KindCluster, namespace: str +) -> Iterator[Callable[[str], AsyncContextManager[tuple[str, str]]]]: """Yields an instantiated context manager for creating/deleting a simple job.""" @asynccontextmanager - async def cm(job_file): + async def cm(job_file: str) -> AsyncIterator[tuple[str, str]]: job_path = os.path.join(DIR, "resources", job_file) with open(job_path) as f: job_name = yaml.load(f, yaml.Loader)["metadata"]["name"] # Create cluster resource - k8s_cluster.kubectl("apply", "-n", ns, "-f", job_path) + k8s_cluster.kubectl("apply", "-n", namespace, "-f", job_path) while job_name not in k8s_cluster.kubectl( - "get", "daskjobs.kubernetes.dask.org", "-n", ns + "get", "daskjobs.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) try: - yield job_name, ns + yield job_name, namespace finally: # Test: remove the wait=True, because I think this is blocking the operator - k8s_cluster.kubectl("delete", "-n", ns, "-f", job_path) + k8s_cluster.kubectl("delete", "-n", namespace, "-f", job_path) while job_name in k8s_cluster.kubectl( - "get", "daskjobs.kubernetes.dask.org", "-n", ns + "get", "daskjobs.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) @@ -94,42 +117,44 @@ async def cm(job_file): @pytest.fixture() -def gen_worker_group(k8s_cluster, ns): +def gen_worker_group( + k8s_cluster: KindCluster, namespace: str +) -> Iterator[Callable[[str], AsyncContextManager[tuple[str, str]]]]: """Yields an instantiated context manager for creating/deleting a worker group.""" @asynccontextmanager - async def cm(worker_group_file): + async def cm(worker_group_file: str) -> AsyncIterator[tuple[str, str]]: worker_group_path = os.path.join(DIR, "resources", worker_group_file) with open(worker_group_path) as f: worker_group_name = yaml.load(f, yaml.Loader)["metadata"]["name"] # Create cluster resource - k8s_cluster.kubectl("apply", "-n", ns, "-f", worker_group_path) + k8s_cluster.kubectl("apply", "-n", namespace, "-f", worker_group_path) while worker_group_name not in k8s_cluster.kubectl( - "get", "daskworkergroups.kubernetes.dask.org", "-n", ns + "get", "daskworkergroups.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) try: - yield worker_group_name, ns + yield worker_group_name, namespace finally: # Test: remove the wait=True, because I think this is blocking the operator - k8s_cluster.kubectl("delete", "-n", ns, "-f", worker_group_path) + k8s_cluster.kubectl("delete", "-n", namespace, "-f", worker_group_path) while worker_group_name in k8s_cluster.kubectl( - "get", "daskworkergroups.kubernetes.dask.org", "-n", ns + "get", "daskworkergroups.kubernetes.dask.org", "-n", namespace ): await asyncio.sleep(0.1) yield cm -def test_customresources(k8s_cluster): +def test_customresources(k8s_cluster: KindCluster) -> None: assert "daskclusters.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd") assert "daskworkergroups.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd") assert "daskjobs.kubernetes.dask.org" in k8s_cluster.kubectl("get", "crd") -def test_operator_runs(kopf_runner): +def test_operator_runs(kopf_runner: KopfRunner) -> None: with kopf_runner as runner: pass @@ -137,7 +162,7 @@ def test_operator_runs(kopf_runner): assert runner.exception is None -def test_operator_plugins(kopf_runner): +def test_operator_plugins(kopf_runner: KopfRunner) -> None: with kopf_runner as runner: pass @@ -148,7 +173,11 @@ def test_operator_plugins(kopf_runner): @pytest.mark.timeout(180) @pytest.mark.anyio -async def 
test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): +async def test_simplecluster( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -167,7 +196,7 @@ async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: await client.wait_for_workers(2) @@ -292,7 +321,11 @@ async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio -async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): +async def test_scalesimplecluster( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -310,7 +343,7 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: k8s_cluster.kubectl( @@ -338,8 +371,10 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio async def test_scalesimplecluster_from_cluster_spec( - k8s_cluster, kopf_runner, gen_cluster -): + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -357,7 +392,7 @@ async def test_scalesimplecluster_from_cluster_spec( with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: k8s_cluster.kubectl( @@ -384,7 +419,11 @@ async def test_scalesimplecluster_from_cluster_spec( @pytest.mark.anyio -async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster): +async def test_recreate_scheduler_pod( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): scheduler_deployment_name = "simple-scheduler" @@ -423,7 +462,11 @@ async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio @pytest.mark.skip(reason="Flaky in CI") -async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster): +async def test_recreate_worker_pods( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): cluster = await DaskCluster.get(cluster_name, namespace=ns) @@ -453,8 +496,10 @@ async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster): @pytest.mark.anyio async def test_simplecluster_batched_worker_deployments( - k8s_cluster, kopf_runner, gen_cluster -): + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., 
AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: with dask.config.set( { @@ -480,7 +525,7 @@ async def test_simplecluster_batched_worker_deployments( with k8s_cluster.port_forward( f"service/{service_name}", 8786, "-n", ns ) as port: - async with Client( + async with Client( # type: ignore[no-untyped-call] f"tcp://localhost:{port}", asynchronous=True ) as client: await client.wait_for_workers(2) @@ -489,8 +534,8 @@ async def test_simplecluster_batched_worker_deployments( assert (await total) == sum(map(lambda x: x + 1, range(10))) -def _get_job_status(k8s_cluster, ns): - return json.loads( +def _get_job_status(k8s_cluster: KindCluster, ns: str) -> dict[str, Any]: + return json.loads( # type: ignore[no-any-return] k8s_cluster.kubectl( "get", "-n", @@ -502,17 +547,17 @@ def _get_job_status(k8s_cluster, ns): ) -def _assert_job_status_created(job_status): +def _assert_job_status_created(job_status: dict[str, Any]) -> None: assert "jobStatus" in job_status -def _assert_job_status_cluster_created(job, job_status): +def _assert_job_status_cluster_created(job: str, job_status: dict[str, Any]) -> None: assert "jobStatus" in job_status assert job_status["clusterName"] == job assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job) -def _assert_job_status_running(job, job_status): +def _assert_job_status_running(job: str, job_status: dict[str, Any]) -> None: assert "jobStatus" in job_status assert job_status["clusterName"] == job assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job) @@ -520,7 +565,9 @@ def _assert_job_status_running(job, job_status): assert datetime.utcnow() > start_time > (datetime.utcnow() - timedelta(seconds=10)) -def _assert_final_job_status(job, job_status, expected_status): +def _assert_final_job_status( + job: str, job_status: dict[str, Any], expected_status: str +) -> None: assert job_status["jobStatus"] == expected_status assert job_status["clusterName"] == job assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job) @@ -538,7 +585,11 @@ def _assert_final_job_status(job, job_status, expected_status): @pytest.mark.anyio -async def test_job(k8s_cluster, kopf_runner, gen_job): +async def test_job( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_job: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner as runner: async with gen_job("simplejob.yaml") as (job, ns): assert job @@ -609,7 +660,11 @@ async def test_job(k8s_cluster, kopf_runner, gen_job): @pytest.mark.anyio -async def test_failed_job(k8s_cluster, kopf_runner, gen_job): +async def test_failed_job( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_job: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner as runner: async with gen_job("failedjob.yaml") as (job, ns): assert job @@ -667,12 +722,16 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job): @pytest.mark.anyio -async def test_object_dask_cluster(k8s_cluster, kopf_runner, gen_cluster): +async def test_object_dask_cluster( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster() as (cluster_name, ns): cluster = await DaskCluster.get(cluster_name, namespace=ns) - worker_groups = [] + worker_groups: list[DaskWorkerGroup] = [] while not worker_groups: worker_groups = await cluster.worker_groups() await asyncio.sleep(0.1) @@ -700,8 +759,11 @@ async def test_object_dask_cluster(k8s_cluster, 
kopf_runner, gen_cluster): @pytest.mark.anyio async def test_object_dask_worker_group( - k8s_cluster, kopf_runner, gen_cluster, gen_worker_group -): + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], + gen_worker_group: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with ( gen_cluster() as (cluster_name, ns), @@ -715,7 +777,7 @@ async def test_object_dask_worker_group( additional_workergroup_name, namespace=ns ) - worker_groups = [] + worker_groups: list[DaskWorkerGroup] = [] while not worker_groups: worker_groups = await cluster.worker_groups() await asyncio.sleep(0.1) @@ -725,13 +787,13 @@ async def test_object_dask_worker_group( for wg in worker_groups: assert isinstance(wg, DaskWorkerGroup) - deployments = [] + deployments: list[Deployment] = [] while not deployments: deployments = await wg.deployments() await asyncio.sleep(0.1) assert all([isinstance(d, Deployment) for d in deployments]) - pods = [] + pods: list[Pod] = [] while not pods: pods = await wg.pods() await asyncio.sleep(0.1) @@ -756,7 +818,11 @@ async def test_object_dask_worker_group( @pytest.mark.anyio @pytest.mark.skip(reason="Flaky in CI") -async def test_object_dask_job(k8s_cluster, kopf_runner, gen_job): +async def test_object_dask_job( + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_job: Callable[[str], AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_job("simplejob.yaml") as (job_name, ns): job = await DaskJob.get(job_name, namespace=ns) @@ -768,13 +834,15 @@ async def test_object_dask_job(k8s_cluster, kopf_runner, gen_job): assert isinstance(cluster, DaskCluster) -async def _get_cluster_status(k8s_cluster, ns, cluster_name): +async def _get_cluster_status( + k8s_cluster: KindCluster, ns: str, cluster_name: str +) -> str: """ Will loop infinitely in search of non-falsey cluster status. Make sure there is a timeout on any test which calls this. 
""" while True: - cluster_status = k8s_cluster.kubectl( + cluster_status: str = k8s_cluster.kubectl( "get", "-n", ns, @@ -799,8 +867,12 @@ async def _get_cluster_status(k8s_cluster, ns, cluster_name): ], ) async def test_create_cluster_validates_name( - cluster_name, expected_status, k8s_cluster, kopf_runner, gen_cluster -): + cluster_name: str, + expected_status: str, + k8s_cluster: KindCluster, + kopf_runner: KopfRunner, + gen_cluster: Callable[..., AsyncContextManager[tuple[str, str]]], +) -> None: with kopf_runner: async with gen_cluster(cluster_name=cluster_name) as (_, ns): actual_status = await _get_cluster_status(k8s_cluster, ns, cluster_name) diff --git a/dask_kubernetes/operator/kubecluster/__init__.py b/dask_kubernetes/operator/kubecluster/__init__.py index a7c4fde0f..ef9104129 100644 --- a/dask_kubernetes/operator/kubecluster/__init__.py +++ b/dask_kubernetes/operator/kubecluster/__init__.py @@ -5,3 +5,11 @@ make_worker_spec, ) from .discovery import discover + +__all__ = [ + "KubeCluster", + "make_cluster_spec", + "make_scheduler_spec", + "make_worker_spec", + "discover", +] diff --git a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py index 60b8777d6..a834e0671 100644 --- a/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py @@ -7,10 +7,10 @@ from dask_kubernetes.operator import KubeCluster, make_cluster_spec -def test_kubecluster(kopf_runner, docker_image, ns): +def test_kubecluster(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="cluster", namespace=ns, image=docker_image, n_workers=1 + name="cluster", namespace=namespace, image=docker_image, n_workers=1 ) as cluster: with Client(cluster) as client: client.scheduler_info() @@ -18,11 +18,11 @@ def test_kubecluster(kopf_runner, docker_image, ns): @pytest.mark.anyio -async def test_kubecluster_async(kopf_runner, docker_image, ns): +async def test_kubecluster_async(kopf_runner, docker_image, namespace): with kopf_runner: async with KubeCluster( name="async", - namespace=ns, + namespace=namespace, image=docker_image, n_workers=1, asynchronous=True, @@ -31,55 +31,55 @@ async def test_kubecluster_async(kopf_runner, docker_image, ns): assert await client.submit(lambda x: x + 1, 10).result() == 11 -def test_custom_worker_command(kopf_runner, docker_image, ns): +def test_custom_worker_command(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( name="customworker", image=docker_image, worker_command=["python", "-m", "distributed.cli.dask_worker"], n_workers=1, - namespace=ns, + namespace=namespace, ) as cluster: with Client(cluster) as client: assert client.submit(lambda x: x + 1, 10).result() == 11 -def test_multiple_clusters(kopf_runner, docker_image, ns): +def test_multiple_clusters(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="bar", image=docker_image, n_workers=1, namespace=ns + name="bar", image=docker_image, n_workers=1, namespace=namespace ) as cluster1: with Client(cluster1) as client1: assert client1.submit(lambda x: x + 1, 10).result() == 11 with KubeCluster( - name="baz", image=docker_image, n_workers=1, namespace=ns + name="baz", image=docker_image, n_workers=1, namespace=namespace ) as cluster2: with Client(cluster2) as client2: assert client2.submit(lambda x: x + 1, 10).result() == 11 -def test_clusters_with_custom_port_forward(kopf_runner, docker_image, ns): +def 
test_clusters_with_custom_port_forward(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( name="bar", image=docker_image, n_workers=1, scheduler_forward_port=8888, - namespace=ns, + namespace=namespace, ) as cluster1: assert cluster1.forwarded_dashboard_port == "8888" with Client(cluster1) as client1: assert client1.submit(lambda x: x + 1, 10).result() == 11 -def test_multiple_clusters_simultaneously(kopf_runner, docker_image, ns): +def test_multiple_clusters_simultaneously(kopf_runner, docker_image, namespace): with kopf_runner: with ( KubeCluster( - name="fizz", image=docker_image, n_workers=1, namespace=ns + name="fizz", image=docker_image, n_workers=1, namespace=namespace ) as cluster1, KubeCluster( - name="buzz", image=docker_image, n_workers=1, namespace=ns + name="buzz", image=docker_image, n_workers=1, namespace=namespace ) as cluster2, ): with Client(cluster1) as client1, Client(cluster2) as client2: @@ -87,18 +87,20 @@ def test_multiple_clusters_simultaneously(kopf_runner, docker_image, ns): assert client2.submit(lambda x: x + 1, 10).result() == 11 -def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image, ns): +def test_multiple_clusters_simultaneously_same_loop( + kopf_runner, docker_image, namespace +): with kopf_runner: with ( KubeCluster( - name="fizz", image=docker_image, n_workers=1, namespace=ns + name="fizz", image=docker_image, n_workers=1, namespace=namespace ) as cluster1, KubeCluster( name="buzz", image=docker_image, loop=cluster1.loop, n_workers=1, - namespace=ns, + namespace=namespace, ) as cluster2, ): with Client(cluster1) as client1, Client(cluster2) as client2: @@ -108,27 +110,30 @@ def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image, n @pytest.mark.anyio -async def test_cluster_from_name(kopf_runner, docker_image, ns): +async def test_cluster_from_name(kopf_runner, docker_image, namespace): with kopf_runner: async with KubeCluster( name="abc", - namespace=ns, + namespace=namespace, image=docker_image, n_workers=1, asynchronous=True, ) as firstcluster: async with KubeCluster.from_name( - "abc", namespace=ns, asynchronous=True + "abc", namespace=namespace, asynchronous=True ) as secondcluster: assert firstcluster == secondcluster firstcluster.scale(2) assert firstcluster.scheduler_info == secondcluster.scheduler_info -def test_additional_worker_groups(kopf_runner, docker_image, ns): +def test_additional_worker_groups(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( - name="additionalgroups", n_workers=1, image=docker_image, namespace=ns + name="additionalgroups", + n_workers=1, + image=docker_image, + namespace=namespace, ) as cluster: cluster.add_worker_group(name="more", n_workers=1) with Client(cluster) as client: @@ -137,18 +142,18 @@ def test_additional_worker_groups(kopf_runner, docker_image, ns): cluster.delete_worker_group(name="more") -def test_cluster_without_operator(docker_image, ns): +def test_cluster_without_operator(docker_image, namespace): with pytest.raises(TimeoutError, match="is the Dask Operator running"): KubeCluster( name="noop", n_workers=1, image=docker_image, resource_timeout=1, - namespace=ns, + namespace=namespace, ) -def test_cluster_crashloopbackoff(kopf_runner, docker_image, ns): +def test_cluster_crashloopbackoff(kopf_runner, docker_image, namespace): with kopf_runner: with pytest.raises(SchedulerStartupError, match="Scheduler failed to start"): spec = make_cluster_spec(name="crashloopbackoff", n_workers=1) @@ -157,19 +162,19 @@ def 
test_cluster_crashloopbackoff(kopf_runner, docker_image, ns): ] = "dask-schmeduler" KubeCluster( custom_cluster_spec=spec, - namespace=ns, + namespace=namespace, resource_timeout=1, idle_timeout=2, ) -def test_adapt(kopf_runner, docker_image, ns): +def test_adapt(kopf_runner, docker_image, namespace): with kopf_runner: with KubeCluster( name="adaptive", image=docker_image, n_workers=0, - namespace=ns, + namespace=namespace, ) as cluster: cluster.adapt(minimum=0, maximum=1) with Client(cluster) as client: @@ -181,17 +186,17 @@ def test_adapt(kopf_runner, docker_image, ns): cluster.scale(0) -def test_custom_spec(kopf_runner, docker_image, ns): +def test_custom_spec(kopf_runner, docker_image, namespace): with kopf_runner: spec = make_cluster_spec("customspec", image=docker_image) with KubeCluster( - custom_cluster_spec=spec, n_workers=1, namespace=ns + custom_cluster_spec=spec, n_workers=1, namespace=namespace ) as cluster: with Client(cluster) as client: assert client.submit(lambda x: x + 1, 10).result() == 11 -def test_typo_resource_limits(ns): +def test_typo_resource_limits(namespace): with pytest.raises(ValueError): KubeCluster( name="foo", @@ -200,7 +205,7 @@ def test_typo_resource_limits(ns): "CPU": "1", }, }, - namespace=ns, + namespace=namespace, ) @@ -211,11 +216,11 @@ def test_typo_resource_limits(ns): "invalid.chars.in.name", ], ) -def test_invalid_cluster_name_fails(cluster_name, kopf_runner, docker_image, ns): +def test_invalid_cluster_name_fails(cluster_name, kopf_runner, docker_image, namespace): with kopf_runner: with pytest.raises(ValidationError): KubeCluster( name=cluster_name, - namespace=ns, + namespace=namespace, image=docker_image, ) diff --git a/requirements-test.txt b/requirements-test.txt index c44e8bbba..4d9b61c55 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -9,3 +9,5 @@ git+https://github.com/elemental-lf/k8s-crd-resolver@v0.14.0 jsonschema==4.17.3 dask[complete] anyio +mypy +types-PyYAML
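
---

Illustrative example (not part of the diff above): the controller handlers in this
patch all follow the same typing pattern -- keyword arguments annotated with kopf's
own types, a `**__: Any` catch-all for the remaining kopf-injected kwargs, and an
explicit guard that fails permanently when `name`/`namespace` are missing. A minimal
standalone sketch of that pattern (the handler name and log message are made up for
illustration):

    from __future__ import annotations

    from typing import Any

    import kopf


    @kopf.on.create("daskcluster.kubernetes.dask.org")
    async def example_daskcluster_create(
        name: str | None,
        namespace: str | None,
        patch: kopf.Patch,
        logger: kopf.Logger,
        **__: Any,
    ) -> None:
        # kopf injects handler kwargs dynamically, so name/namespace are typed as
        # optional and guarded explicitly instead of being assumed present.
        if name is None or namespace is None:
            raise kopf.PermanentError("Name or Namespace not given.")
        logger.info(f"Example: DaskCluster {name} created in {namespace}.")
        # Status updates go through the kopf.Patch object handed to the handler.
        patch.status["phase"] = "Created"

With mypy and types-PyYAML added to requirements-test.txt, handlers like this can be
checked with a plain `mypy dask_kubernetes` run; this patch adds no mypy
configuration, so strictness is whatever mypy defaults to plus the inline
`# type: ignore[...]` comments introduced above.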