Skip to content

Commit

Permalink
fix(validation): Validate Dask Cluster Names
Browse files Browse the repository at this point in the history
This commit introduces cluster name validation in order to avoid the
invalid state in which a `DaskCluster` resource with a too-long or
RFC-1123-noncompliant name is created but cannot be
deleted while the operator retries infinitely to create a scheduler service (see dask#826 for more details on this bug).

Issues fixed:
dask#870
dask#826
  • Loading branch information
Johanna Goergen committed Feb 27, 2024
1 parent 39038be commit 850aaa6
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import re
import time
from collections import defaultdict
from contextlib import suppress
Expand Down Expand Up @@ -31,6 +32,19 @@
KUBERNETES_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION = "kubernetes.dask.org/cooldown-until"
KUBERNETES_MAX_RESOURCE_NAME_LENGTH = 63
SCHEDULER_NAME_TEMPLATE = "{cluster_name}-scheduler"
MAX_CLUSTER_NAME_LEN = KUBERNETES_MAX_RESOURCE_NAME_LENGTH - len(
SCHEDULER_NAME_TEMPLATE.format(cluster_name="")
)
VALID_CLUSTER_NAME = re.compile(
rf"^(?=.{{,{MAX_CLUSTER_NAME_LEN}}}$)[a-z0-9]([-a-z0-9]*[a-z0-9])?$"
)


def _validate_cluster_name(cluster_name: str) -> bool:
return bool(VALID_CLUSTER_NAME.match(cluster_name))


# Load operator plugins from other packages
PLUGINS = []
Expand Down Expand Up @@ -75,7 +89,7 @@ def build_scheduler_deployment_spec(
}
)
metadata = {
"name": f"{cluster_name}-scheduler",
"name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name),
"labels": labels,
"annotations": annotations,
}
Expand Down Expand Up @@ -107,7 +121,7 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": f"{cluster_name}-scheduler",
"name": SCHEDULER_NAME_TEMPLATE.format(cluster_name=cluster_name),
"labels": labels,
"annotations": annotations,
},
Expand Down Expand Up @@ -273,6 +287,16 @@ async def daskcluster_create(name, namespace, logger, patch, **kwargs):
This allows us to track that the operator is running.
"""
logger.info(f"DaskCluster {name} created in {namespace}.")

if not _validate_cluster_name(name):
patch.status["phase"] = "Failed"
raise kopf.PermanentError(
f"The DaskCluster {name} is invalid: a lowercase RFC 1123 subdomain must "
"consist of lower case alphanumeric characters, '-' or '.', and must start "
"and end with an alphanumeric character. DaskCluster name must also be under "
f"{MAX_CLUSTER_NAME_LEN} characters."
)

patch.status["phase"] = "Created"


Expand Down Expand Up @@ -331,6 +355,12 @@ async def daskcluster_create_components(
patch.status["phase"] = "Pending"


@kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Failed")
async def daskcluster_delete(spec, name, namespace, logger, patch, meta, **kwargs):
cluster = await DaskCluster.get(name, namespace=namespace)
await cluster.delete()


@kopf.on.field("service", field="status", labels={"dask.org/component": "scheduler"})
async def handle_scheduler_service_status(
spec, labels, status, namespace, logger, **kwargs
Expand Down Expand Up @@ -599,7 +629,7 @@ async def daskworkergroup_replica_update(
if workers_needed < 0:
worker_ids = await retire_workers(
n_workers=-workers_needed,
scheduler_service_name=f"{cluster_name}-scheduler",
scheduler_service_name=SCHEDULER_NAME_TEMPLATE.format(cluster_name),
worker_group_name=name,
namespace=namespace,
logger=logger,
Expand Down

0 comments on commit 850aaa6

Please sign in to comment.