Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Allow Helm releases to have a random suffix #33

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion azimuth_capi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CAPIHelmConfig(Section):
#: The name of the CAPI Helm chart to use to deploy clusters
chart_name: constr(min_length = 1) = "openstack-cluster"
#: The version of the CAPI Helm chart to use to deploy clusters
chart_version: SemVerVersion = "0.1.2-dev.0.main.51"
chart_version: SemVerVersion = "0.1.2-dev.0.main.55"
#: The default values to use for all clusters
#: Values defined in templates take precedence
default_values: t.Dict[str, t.Any] = Field(default_factory = dict)
Expand Down Expand Up @@ -199,6 +199,9 @@ class Config:
#: The number of seconds to wait between timer executions
timer_interval: conint(gt = 0) = 60

#: The length of the random suffix appended to the Helm release name for a cluster
suffix_length: conint(gt = 0) = 5

#: The field manager name to use for server-side apply
easykube_field_manager: constr(min_length = 1) = "azimuth-capi-operator"

Expand Down
4 changes: 4 additions & 0 deletions azimuth_capi/models/v1alpha1/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ class Config:
None,
description = "The name of the secret containing the kubeconfig file, if known."
)
helm_release_name: t.Optional[str] = Field(
None,
description = "The name of the Helm release for the cluster."
)
phase: ClusterPhase = Field(
ClusterPhase.UNKNOWN.value,
description = "The overall phase of the cluster."
Expand Down
111 changes: 68 additions & 43 deletions azimuth_capi/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import json
import logging
import pathlib
import secrets
import ssl
import string
import sys

import kopf
Expand Down Expand Up @@ -255,6 +257,20 @@ async def on_cluster_create(instance, name, namespace, patch, **kwargs):
# Then fetch the template
ekresource = await ekresource_for_model(api.ClusterTemplate)
template = api.ClusterTemplate.parse_obj(await ekresource.fetch(instance.spec.template_name))
# Get the name for the Helm release
if not instance.status.helm_release_name:
try:
# Check if a Helm release already exists with the cluster name
_ = await helm_client.get_current_revision(name, namespace = namespace)
except helm_errors.ReleaseNotFoundError:
# This means that we have an opportunity to use a random suffix
chars = string.ascii_lowercase + string.digits
suffix = "".join(secrets.choice(chars) for _ in range(settings.suffix_length))
instance.status.helm_release_name = f"{name}-{suffix}"
else:
# This means that the cluster was created before random suffixes were introduced
instance.status.helm_release_name = name
await save_cluster_status(instance)
# Generate the Helm values for the release
helm_values = mergeconcat(
settings.capi_helm.default_values,
Expand All @@ -268,7 +284,7 @@ async def on_cluster_create(instance, name, namespace, patch, **kwargs):
)
# Use Helm to install or upgrade the release
_ = await helm_client.install_or_upgrade_release(
name,
instance.status.helm_release_name,
await helm_client.get_chart(
settings.capi_helm.chart_name,
repo = settings.capi_helm.chart_repository,
Expand All @@ -281,30 +297,32 @@ async def on_cluster_create(instance, name, namespace, patch, **kwargs):
)
# Ensure that a Zenith operator instance exists for the cluster
if settings.zenith.enabled:
operator_resources = await zenith_operator_resources(name, namespace, secret)
operator_resources = await zenith_operator_resources(instance, secret)
for resource in operator_resources:
kopf.adopt(resource, instance.dict(by_alias = True))
await ekclient.apply_object(resource, force = True)
# Patch the labels to include the cluster template
# This is used by the admission webhook to search for clusters using a template in
# order to prevent deletion of cluster templates that are in use
labels = patch.setdefault("metadata", {}).setdefault("labels", {})
labels[f"{settings.api_group}/cluster-template"] = instance.spec.template_name
# Patch the labels to include the cluster template and the release name
# These are used to search for the cluster later
patch.setdefault("metadata", {}).setdefault("labels", {}).update({
f"{settings.api_group}/cluster-template": instance.spec.template_name,
f"{settings.api_group}/release-name": instance.status.helm_release_name,
})


@model_handler(api.Cluster, kopf.on.delete)
async def on_cluster_delete(name, namespace, **kwargs):
async def on_cluster_delete(instance, name, namespace, **kwargs):
"""
Executes whenever a cluster is deleted.
"""
# Delete the corresponding Helm release
release_name = instance.status.helm_release_name or name
try:
await helm_client.uninstall_release(name, namespace = namespace)
await helm_client.uninstall_release(release_name, namespace = namespace)
except helm_errors.ReleaseNotFoundError:
pass
# Wait until the associated CAPI cluster no longer exists
ekresource = await ekclient.api(CLUSTER_API_VERSION).resource("clusters")
cluster, stream = await ekresource.watch_one(name, namespace = namespace)
cluster, stream = await ekresource.watch_one(release_name, namespace = namespace)
if cluster:
async with stream as events:
async for event in events:
Expand All @@ -313,7 +331,7 @@ async def on_cluster_delete(name, namespace, **kwargs):


@model_handler(api.Cluster, kopf.on.resume)
async def on_cluster_resume(instance, name, namespace, **kwargs):
async def on_cluster_resume(instance, name, namespace, patch, **kwargs):
"""
Executes for each cluster when the operator is resumed.
"""
Expand All @@ -322,7 +340,8 @@ async def on_cluster_resume(instance, name, namespace, **kwargs):
# However if CAPI objects have been deleted while the operator was down, we will
# not receive delete events for those
# So on resume we remove any items in the status that no longer exist
labels = { "capi.stackhpc.com/cluster": name }
release_name = instance.status.helm_release_name or name
labels = { "capi.stackhpc.com/cluster": release_name }
# Get easykube resources for the Cluster API types
ekcapi = ekclient.api(CLUSTER_API_VERSION)
ekclusters = await ekcapi.resource("clusters")
Expand Down Expand Up @@ -360,16 +379,10 @@ async def on_cluster_resume(instance, name, namespace, **kwargs):
instance,
[
addon
async for addon in ekhelmreleases.list(
namespace = namespace,
labels = { "capi.stackhpc.com/cluster": name }
)
async for addon in ekhelmreleases.list(labels = labels, namespace = namespace)
] + [
addon
async for addon in ekmanifests.list(
namespace = namespace,
labels = { "capi.stackhpc.com/cluster": name }
)
async for addon in ekmanifests.list(labels = labels, namespace = namespace)
]
)
await save_cluster_status(instance)
Expand All @@ -390,24 +403,37 @@ async def wrapper(**inner):
# Retry the fetch and updating of the state until it succeeds without conflict
# kopf retry logic does not apply to events
while True:
try:
cluster = api.Cluster.parse_obj(
await ekclusters.fetch(
inner["labels"][cluster_label],
namespace = inner["namespace"]
# Try to find the cluster with the name in the cluster label
# We search by the release name label first, falling back to fetching by name
# This is because all of these objects are created by the Helm release
cluster_name = inner["labels"][cluster_label]
cluster_namespace = inner["namespace"]
cluster = await ekclusters.first(
labels = { f"{settings.api_group}/release-name": cluster_name },
namespace = cluster_namespace
)
if not cluster:
try:
cluster = await ekclusters.fetch(
cluster_name,
namespace = cluster_namespace
)
)
except ApiError as exc:
if exc.status_code == 404:
# We couldn't find the cluster - let's give up for now
break
else:
raise
cluster = api.Cluster.parse_obj(cluster)
try:
await func(cluster = cluster, **inner)
await save_cluster_status(cluster)
except ApiError as exc:
# On a 404, don't bother trying again as the cluster is gone
if exc.status_code == 404:
break
# On a conflict response, go round again
elif exc.status_code == 409:
if exc.status_code == 409:
# On a conflict response, go round again
continue
# Any other error should be bubbled up
else:
# Any other error should be bubbled up
raise
else:
# On success, we can break the loop
Expand Down Expand Up @@ -614,8 +640,7 @@ async def on_kubernetes_app_event(


async def annotate_addon_for_reservation(
cluster_name,
cluster_namespace,
instance,
reservation,
service_name,
service_status = None
Expand All @@ -624,6 +649,8 @@ async def annotate_addon_for_reservation(
Annotates the addon for the reservation, if one exists, with information
about the reservation.
"""
cluster_name = instance.status.helm_release_name or instance.metadata.name
cluster_namespace = instance.metadata.namespace
# If the reservation is not part of a Helm release, it isn't part of an addon
annotations = reservation["metadata"].get("annotations", {})
release_namespace = annotations.get("meta.helm.sh/release-namespace")
Expand Down Expand Up @@ -698,19 +725,20 @@ def get_service_status(reservation):
@model_handler(
api.Cluster,
kopf.daemon,
include_instance = False,
cancellation_timeout = 1
)
async def monitor_cluster_services(name, namespace, **kwargs):
async def monitor_cluster_services(instance, name, namespace, **kwargs):
"""
Daemon that monitors Zenith reservations
"""
if not settings.zenith.enabled:
return
if not instance.status.kubeconfig_secret_name:
raise kopf.TemporaryError("kubeconfig for cluster not set yet")
eksecrets = await ekclient.api("v1").resource("secrets")
try:
kubeconfig = await eksecrets.fetch(
f"{name}-kubeconfig",
instance.status.kubeconfig_secret_name,
namespace = namespace
)
except ApiError as exc:
Expand Down Expand Up @@ -739,8 +767,7 @@ async def monitor_cluster_services(name, namespace, **kwargs):
service_name = get_service_name(reservation)
service_status = get_service_status(reservation)
addon = await annotate_addon_for_reservation(
name,
namespace,
instance,
reservation,
service_name,
service_status
Expand Down Expand Up @@ -769,8 +796,7 @@ async def monitor_cluster_services(name, namespace, **kwargs):
if reservation.get("status", {}).get("phase", "Unknown") == "Ready":
service_status = get_service_status(reservation)
addon = await annotate_addon_for_reservation(
name,
namespace,
instance,
reservation,
service_name,
service_status
Expand Down Expand Up @@ -800,8 +826,7 @@ async def monitor_cluster_services(name, namespace, **kwargs):
namespace = namespace
)
await annotate_addon_for_reservation(
name,
namespace,
instance,
reservation,
service_name
)
Expand Down
11 changes: 7 additions & 4 deletions azimuth_capi/zenith.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ async def zenith_values(client, cluster, addons):
)


async def zenith_operator_resources(name, namespace, cloud_credentials_secret):
async def zenith_operator_resources(instance, cloud_credentials_secret):
"""
Returns the resources required to enable the Zenith operator for the given cluster.
"""
Expand All @@ -266,12 +266,15 @@ async def zenith_operator_resources(name, namespace, cloud_credentials_secret):
repo = settings.zenith.chart_repository,
version = settings.zenith.chart_version
),
name,
instance.metadata.name,
mergeconcat(
settings.zenith.operator_defaults,
{
"kubeconfigSecret": {
"name": f"{name}-kubeconfig",
"name": (
instance.status.kubeconfig_secret_name or
f"{instance.status.helm_release_name}-kubeconfig"
),
"key": "value",
},
"config": {
Expand All @@ -284,6 +287,6 @@ async def zenith_operator_resources(name, namespace, cloud_credentials_secret):
},
}
),
namespace = namespace
namespace = instance.metadata.namespace
)
)