Skip to content

Commit

Permalink
Merge pull request #3 from stackhpc/add-schedule-delete
Browse files Browse the repository at this point in the history
Add scheduled delete
  • Loading branch information
JohnGarbutt authored Mar 19, 2024
2 parents 5450fcc + f5e3bc4 commit ea79d63
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 51 deletions.
14 changes: 6 additions & 8 deletions azimuth_schedule_operator/models/v1alpha1/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@

class ScheduleStatus(schema.BaseModel):
# updated to show operator found CRD
ref_found: bool = False
ref_exists: bool = False
# updated when delete has been triggered
delete_triggered: bool = False
ref_delete_triggered: bool = False
updated_at: schema.Optional[datetime.datetime] = None


class ScheduleRef(schema.BaseModel):
apiVersion: str
api_version: str
kind: str
name: str


class ScheduleSpec(schema.BaseModel):
ref: ScheduleRef
notBefore: datetime.datetime
notAfter: datetime.datetime
not_after: datetime.datetime


class Schedule(
Expand All @@ -44,8 +44,6 @@ def get_fake_dict():
metadata=dict(name="test1", uid="fakeuid1", namespace="ns1"),
spec=dict(
ref=dict(apiVersion="v1", kind="Pod", name="test1"),
notBefore=datetime.datetime.now(),
notAfter=datetime.datetime.now() + datetime.timedelta(days=1),
notAfter=datetime.datetime.now(datetime.timezone.utc),
),
status=dict(ref_found=True, delete_triggered=False),
)
74 changes: 68 additions & 6 deletions azimuth_schedule_operator/operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
import logging
import os
import sys
Expand All @@ -12,6 +13,10 @@
LOG = logging.getLogger(__name__)
K8S_CLIENT = None

CHECK_INTERVAL_SECONDS = int(
os.environ.get("AZIMUTH_SCHEDULE_CHECK_INTERVAL_SECONDS", "60")
)


@kopf.on.startup()
async def startup(settings, **kwargs):
Expand Down Expand Up @@ -46,15 +51,72 @@ async def startup(settings, **kwargs):


@kopf.on.cleanup()
async def cleanup(**kwargs):
async def cleanup(**_):
if K8S_CLIENT:
await K8S_CLIENT.aclose()
LOG.info("Cleanup complete.")


@kopf.on.create(registry.API_GROUP, "schedule")
@kopf.on.update(registry.API_GROUP, "schedule")
@kopf.on.resume(registry.API_GROUP, "schedule")
async def schedule_changed(body, name, namespace, labels, **kwargs):
async def get_reference(namespace: str, ref: schedule_crd.ScheduleRef):
resource = await K8S_CLIENT.api(ref.api_version).resource(ref.kind)
object = await resource.fetch(ref.name, namespace=namespace)
return object


async def delete_reference(namespace: str, ref: schedule_crd.ScheduleRef):
resource = await K8S_CLIENT.api(ref.api_version).resource(ref.kind)
await resource.delete(ref.name, namespace=namespace)


async def update_schedule_status(namespace: str, name: str, status_updates: dict):
status_resource = await K8S_CLIENT.api(registry.API_VERSION).resource(
"schedules/status"
)
await status_resource.patch(
name,
dict(status=status_updates),
namespace=namespace,
)


async def check_for_delete(namespace: str, schedule: schedule_crd.Schedule):
now = datetime.datetime.now(datetime.timezone.utc)
if now >= schedule.spec.not_after:
LOG.info(f"Attempting delete for {namespace} and {schedule.metadata.name}.")
await delete_reference(namespace, schedule.spec.ref)
await update_schedule(
namespace, schedule.metadata.name, ref_delete_triggered=True
)
else:
LOG.info(f"No delete for {namespace} and {schedule.metadata.name}.")


async def update_schedule(
namespace: str,
name: str,
ref_exists: bool = None,
ref_delete_triggered: bool = None,
):
now = datetime.datetime.now(datetime.timezone.utc)
now_string = now.strftime("%Y-%m-%dT%H:%M:%SZ")
status_updates = dict(updatedAt=now_string)

if ref_exists is not None:
status_updates["refExists"] = ref_exists
if ref_delete_triggered is not None:
status_updates["refDeleteTriggered"] = ref_delete_triggered

LOG.info(f"Updating status for {name} in {namespace} with: {status_updates}")
await update_schedule_status(namespace, name, status_updates)


@kopf.timer(registry.API_GROUP, "schedule", interval=CHECK_INTERVAL_SECONDS)
async def schedule_check(body, namespace, **_):
schedule = schedule_crd.Schedule(**body)
LOG.error(f"seen schedule changed {schedule.spec}")

if not schedule.status.ref_exists:
await get_reference(namespace, schedule.spec.ref)
await update_schedule(namespace, schedule.metadata.name, ref_exists=True)

if not schedule.status.ref_delete_triggered:
await check_for_delete(namespace, schedule)
14 changes: 7 additions & 7 deletions azimuth_schedule_operator/tests/models/test_crds.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,29 @@ def test_cluster_type_crd_json(self):
],
"type": "object"
},
"notBefore": {
"format": "date-time",
"type": "string"
},
"notAfter": {
"format": "date-time",
"type": "string"
}
},
"required": [
"ref",
"notBefore",
"notAfter"
],
"type": "object"
},
"status": {
"properties": {
"refFound": {
"refExists": {
"type": "boolean"
},
"deleteTriggered": {
"refDeleteTriggered": {
"type": "boolean"
},
"updatedAt": {
"format": "date-time",
"nullable": true,
"type": "string"
}
},
"type": "object"
Expand Down
128 changes: 126 additions & 2 deletions azimuth_schedule_operator/tests/test_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import unittest
from unittest import mock

Expand Down Expand Up @@ -48,5 +49,128 @@ async def test_cleanup_calls_aclose(self, mock_client):
await operator.cleanup()
mock_client.aclose.assert_awaited_once_with()

async def test_cluster_type_create_success(self):
await operator.schedule_changed(schedule_crd.get_fake_dict(), "type1", "ns", {})
@mock.patch.object(operator, "update_schedule")
@mock.patch.object(operator, "check_for_delete")
@mock.patch.object(operator, "get_reference")
async def test_schedule_check(
self, mock_get_reference, mock_check_for_delete, mock_update_schedule
):
body = schedule_crd.get_fake_dict()
fake = schedule_crd.Schedule(**body)
namespace = "ns1"

await operator.schedule_check(body, namespace)

mock_get_reference.assert_awaited_once_with(namespace, fake.spec.ref)
mock_check_for_delete.assert_awaited_once_with(namespace, fake)
mock_update_schedule.assert_awaited_once_with(
namespace,
fake.metadata.name,
ref_exists=True,
)

@mock.patch.object(operator, "update_schedule")
@mock.patch.object(operator, "check_for_delete")
@mock.patch.object(operator, "get_reference")
async def test_schedule_check_skip(
self, mock_get_reference, mock_check_for_delete, mock_update_schedule
):
body = schedule_crd.get_fake_dict()
body["status"] = {"refExists": True, "refDeleteTriggered": True}
namespace = "ns1"

await operator.schedule_check(body, namespace)

mock_get_reference.assert_not_called()
mock_check_for_delete.assert_not_called()
mock_update_schedule.assert_not_called()

@mock.patch.object(operator, "update_schedule")
@mock.patch.object(operator, "delete_reference")
async def test_check_for_delete(self, mock_delete_reference, mock_update_schedule):
namespace = "ns1"
schedule = schedule_crd.get_fake()

await operator.check_for_delete(namespace, schedule)

mock_delete_reference.assert_awaited_once_with(namespace, schedule.spec.ref)
mock_update_schedule.assert_awaited_once_with(
namespace, schedule.metadata.name, ref_delete_triggered=True
)

@mock.patch.object(operator, "update_schedule")
@mock.patch.object(operator, "delete_reference")
async def test_check_for_delete_skip(
self, mock_delete_reference, mock_update_schedule
):
namespace = "ns1"
schedule = schedule_crd.get_fake()
now = datetime.datetime.now(datetime.timezone.utc)
schedule.spec.not_after = now + datetime.timedelta(seconds=5)

await operator.check_for_delete(namespace, schedule)

mock_delete_reference.assert_not_called()
mock_update_schedule.assert_not_called()

@mock.patch.object(operator, "update_schedule_status")
async def test_update_schedule(self, mock_update_schedule_status):
name = "schedule1"
namespace = "ns1"

await operator.update_schedule(
namespace, name, ref_exists=True, ref_delete_triggered=False
)

mock_update_schedule_status.assert_awaited_once_with(
namespace,
name,
{"updatedAt": mock.ANY, "refExists": True, "refDeleteTriggered": False},
)

@mock.patch.object(operator, "K8S_CLIENT", new_callable=mock.Mock)
async def test_get_reference(self, mock_client):
mock_resource = mock.AsyncMock()
mock_api = mock.AsyncMock()
mock_api.resource.return_value = mock_resource
mock_client.api.return_value = mock_api
mock_resource.fetch.return_value = "result"
ref = schedule_crd.ScheduleRef(api_version="v1", kind="Pod", name="pod1")

result = await operator.get_reference("ns1", ref)

self.assertEqual(result, "result")
mock_client.api.assert_called_once_with("v1")
mock_api.resource.assert_awaited_once_with("Pod")
mock_resource.fetch.assert_awaited_once_with("pod1", namespace="ns1")

@mock.patch.object(operator, "K8S_CLIENT", new_callable=mock.Mock)
async def test_delete_reference(self, mock_client):
mock_resource = mock.AsyncMock()
mock_api = mock.AsyncMock()
mock_api.resource.return_value = mock_resource
mock_client.api.return_value = mock_api
ref = schedule_crd.ScheduleRef(api_version="v1", kind="Pod", name="pod1")

await operator.delete_reference("ns1", ref)

mock_client.api.assert_called_once_with("v1")
mock_api.resource.assert_awaited_once_with("Pod")
mock_resource.delete.assert_awaited_once_with("pod1", namespace="ns1")

@mock.patch.object(operator, "K8S_CLIENT", new_callable=mock.Mock)
async def test_update_schedule_status(self, mock_client):
mock_resource = mock.AsyncMock()
mock_api = mock.AsyncMock()
mock_api.resource.return_value = mock_resource
mock_client.api.return_value = mock_api

await operator.update_schedule_status("ns1", "test1", {"a": "asdf"})

mock_client.api.assert_called_once_with(
"scheduling.azimuth.stackhpc.com/v1alpha1"
)
mock_api.resource.assert_awaited_once_with("schedules/status")
mock_resource.patch.assert_awaited_once_with(
"test1", {"status": {"a": "asdf"}}, namespace="ns1"
)
Empty file.
6 changes: 5 additions & 1 deletion charts/operator/templates/clusterrole-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ rules:
- apiGroups: ["scheduling.azimuth.stackhpc.com"]
resources: ["*"]
verbs: ["*"]
# allow these things to be deleted by the operator
- apiGroups: ["caas.azimuth.stackhpc.com"]
resources: ["*"]
resources: ["clusters"]
verbs: ["*"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["*"]
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["clusterrolebindings"]
Expand Down
12 changes: 6 additions & 6 deletions charts/operator/templates/prometheusrule.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ spec:
groups:
- name: azimuth-schedule-operator.rules
rules:
- alert: AzimuthCaasClusterNotReady
- alert: AzimuthScheduleRefNotFound
expr: >-
sum(azimuth_schedule_clusters_phase{phase!="Ready"}) by(cluster_namespace, cluster_name) > 0
for: 1h
sum(azimuth_schedule_delete_triggered{schedule_ref_found!="true"}) by(schedule_namespace, schedule_name) > 0
for: 15m
annotations:
description: >-
Azimuth schedule
{{ "{{" }} $labels.cluster_namespace {{ "}}" }}/{{ "{{" }} $labels.cluster_name {{ "}}" }}
has been in a non-ready state for longer than one hour.
summary: Azimuth schedule has been in a non-ready state for more than one hour.
{{ "{{" }} $labels.schedule_namespace {{ "}}" }}/{{ "{{" }} $labels.schedule_name {{ "}}" }}
has been in a not found state for longer than 15 mins.
summary: Azimuth schedule has not found its ref for longer than 15 mins.
labels:
severity: warning
{{- end }}
23 changes: 10 additions & 13 deletions charts/operator/values.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# Config for the operator
config:
placeholder: "asdf"

# The operator image to use
image:
repository: ghcr.io/stackhpc/azimuth-schedule-operator
Expand Down Expand Up @@ -70,7 +66,7 @@ metrics:
create: true
extraRules:
- apiGroups:
- schedule.azimuth.stackhpc.com
- scheduling.azimuth.stackhpc.com
resources:
- schedules
verbs:
Expand All @@ -87,20 +83,21 @@ metrics:
spec:
resources:
- groupVersionKind:
group: schedule.azimuth.stackhpc.com
group: scheduling.azimuth.stackhpc.com
version: v1alpha1
kind: Schedule
metricNamePrefix: azimuth_schedule_clusters
metricNamePrefix: azimuth_schedule
labelsFromPath:
cluster_namespace: [metadata, namespace]
cluster_name: [metadata, name]
cluster_type_name: [spec, clusterTypeName]
cluster_type_version: [spec, clusterTypeVersion]
schedule_namespace: [metadata, namespace]
schedule_name: [metadata, name]
schedule_ref_kind: [spec, ref, kind]
schedule_ref_name: [spec, ref, name]
schedule_ref_found: [status, refExists]
metrics:
- name: phase
- name: delete_triggered
help: "Schedule phase"
each:
type: Info
info:
labelsFromPath:
phase: [status, phase]
delete_triggered: [status, refDeleteTriggered]
Loading

0 comments on commit ea79d63

Please sign in to comment.