diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index 00c47aeed..7d89ca6af 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -1,4 +1,5 @@ import asyncio +import re import time from collections import defaultdict from contextlib import suppress @@ -31,6 +32,19 @@ KUBERNETES_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION = "kubernetes.dask.org/cooldown-until" +KUBERNETES_MAX_RESOURCE_NAME_LENGTH = 63 +SCHEDULER_NAME_TEMPLATE = "{cluster_name}-scheduler" +MAX_CLUSTER_NAME_LEN = KUBERNETES_MAX_RESOURCE_NAME_LENGTH - len( + SCHEDULER_NAME_TEMPLATE.format(cluster_name="") +) +VALID_CLUSTER_NAME = re.compile( + rf"^(?=.{{,{MAX_CLUSTER_NAME_LEN}}}$)[a-z0-9]([-a-z0-9]*[a-z0-9])?$" +) + + +def _validate_cluster_name(cluster_name: str) -> bool: + return bool(VALID_CLUSTER_NAME.match(cluster_name)) + # Load operator plugins from other packages PLUGINS = [] @@ -75,7 +89,7 @@ def build_scheduler_deployment_spec( } ) metadata = { - "name": f"{cluster_name}-scheduler", + "name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name), "labels": labels, "annotations": annotations, } @@ -107,7 +121,7 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels): "apiVersion": "v1", "kind": "Service", "metadata": { - "name": f"{cluster_name}-scheduler", + "name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name), "labels": labels, "annotations": annotations, }, @@ -273,6 +287,16 @@ async def daskcluster_create(name, namespace, logger, patch, **kwargs): This allows us to track that the operator is running. """ logger.info(f"DaskCluster {name} created in {namespace}.") + + if not _validate_cluster_name(name): + patch.status["phase"] = "Failed" + raise kopf.PermanentError( + f"The DaskCluster {name} is invalid: a lowercase RFC 1123 subdomain must " + "consist of lower case alphanumeric characters, '-' or '.', and must start " + "and end with an alphanumeric character. DaskCluster name must also be under " + f"{MAX_CLUSTER_NAME_LEN} characters." + ) + patch.status["phase"] = "Created" @@ -331,6 +355,12 @@ async def daskcluster_create_components( patch.status["phase"] = "Pending" +@kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Failed") +async def daskcluster_delete(spec, name, namespace, logger, patch, meta, **kwargs): + cluster = await DaskCluster.get(name, namespace=namespace) + await cluster.delete() + + @kopf.on.field("service", field="status", labels={"dask.org/component": "scheduler"}) async def handle_scheduler_service_status( spec, labels, status, namespace, logger, **kwargs @@ -599,7 +629,7 @@ async def daskworkergroup_replica_update( if workers_needed < 0: worker_ids = await retire_workers( n_workers=-workers_needed, - scheduler_service_name=f"{cluster_name}-scheduler", + scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(cluster_name), worker_group_name=name, namespace=namespace, logger=logger,