From 9a2dec7cc7a7a83528a03ac0a4c84896384e847b Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 18 Nov 2024 11:42:08 +0100 Subject: [PATCH] JobSink: Delete secrets associated with jobs when jobs are deleted As reported in https://github.com/knative/eventing/issues/8323 old JobSink secrets lead to processing old events again while new events are lost. Using OwnerReference and k8s garbage collection, now a secret created for a given event is bound to a given Job lifecycle, so that when a job is deleted, the associated secret will be deleted. Signed-off-by: Pierangelo Di Pilato --- cmd/jobsink/main.go | 87 ++++++++++++++------------- test/rekt/features/jobsink/jobsink.go | 55 ++++++++++++++++- test/rekt/job_sink_test.go | 18 +++++- 3 files changed, 117 insertions(+), 43 deletions(-) diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index a79cf5d7655..e128a783b21 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -258,44 +258,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { jobName := kmeta.ChildName(ref.Name, id) - logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) - - jobSinkUID := js.GetUID() - - or := metav1.OwnerReference{ - APIVersion: sinksv.SchemeGroupVersion.String(), - Kind: sinks.JobSinkResource.Resource, - Name: js.GetName(), - UID: jobSinkUID, - Controller: ptr.Bool(true), - BlockOwnerDeletion: ptr.Bool(false), - } - - secret := &corev1.Secret{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - Name: jobName, - Namespace: ref.Namespace, - Labels: map[string]string{ - sinks.JobSinkIDLabel: id, - sinks.JobSinkNameLabel: ref.Name, - }, - OwnerReferences: []metav1.OwnerReference{or}, - }, - Immutable: ptr.Bool(true), - Data: map[string][]byte{"event": eventBytes}, - Type: corev1.SecretTypeOpaque, - } - - _, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - logger.Warn("Failed to create secret", zap.Error(err)) - - w.Header().Add("Reason", err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) job := js.Spec.Job.DeepCopy() @@ -305,7 +267,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } job.Labels[sinks.JobSinkIDLabel] = id job.Labels[sinks.JobSinkNameLabel] = ref.Name - job.OwnerReferences = append(job.OwnerReferences, or) + job.OwnerReferences = append(job.OwnerReferences, metav1.OwnerReference{ + APIVersion: sinksv.SchemeGroupVersion.String(), + Kind: sinks.JobSinkResource.Resource, + Name: js.GetName(), + UID: js.GetUID(), + Controller: ptr.Bool(true), + BlockOwnerDeletion: ptr.Bool(false), + }) var mountPathName string for i := range job.Spec.Template.Spec.Containers { found := false @@ -346,8 +315,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) } - _, err = h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{}) - if err != nil { + createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { logger.Warn("Failed to create job", zap.Error(err)) w.Header().Add("Reason", err.Error()) @@ -355,6 +324,42 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + + secret := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: ref.Namespace, + Labels: map[string]string{ + sinks.JobSinkIDLabel: id, + sinks.JobSinkNameLabel: ref.Name, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: createdJob.APIVersion, + Kind: createdJob.Kind, + Name: createdJob.Name, + UID: createdJob.UID, + Controller: ptr.Bool(false), + BlockOwnerDeletion: ptr.Bool(false), + }, + }, + }, + Immutable: ptr.Bool(true), + Data: map[string][]byte{"event": eventBytes}, + Type: corev1.SecretTypeOpaque, + } + + _, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + logger.Warn("Failed to create secret", zap.Error(err)) + + w.Header().Add("Reason", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID())) w.WriteHeader(http.StatusAccepted) } diff --git a/test/rekt/features/jobsink/jobsink.go b/test/rekt/features/jobsink/jobsink.go index c872dc64f46..539835f614d 100644 --- a/test/rekt/features/jobsink/jobsink.go +++ b/test/rekt/features/jobsink/jobsink.go @@ -24,6 +24,7 @@ import ( cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" @@ -43,11 +44,14 @@ import ( "knative.dev/eventing/test/rekt/resources/jobsink" ) -func Success() *feature.Feature { +func Success(jobSinkName string) *feature.Feature { f := feature.NewFeature() sink := feature.MakeRandomK8sName("sink") jobSink := feature.MakeRandomK8sName("jobsink") + if jobSinkName != "" { + jobSink = jobSinkName + } source := feature.MakeRandomK8sName("source") event := cetest.FullEvent() @@ -83,6 +87,31 @@ func Success() *feature.Feature { return f } +func DeleteJobsCascadeSecretsDeletion(jobSink string) *feature.Feature { + f := feature.NewFeature() + + f.Setup("Prerequisite: At least one secret for jobsink present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool { + return len(secrets.Items) > 0 + })) + + f.Requirement("delete jobs for jobsink", func(ctx context.Context, t feature.T) { + err := kubeclient.Get(ctx).BatchV1(). + Jobs(environment.FromContext(ctx).Namespace()). + DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink), + }) + if err != nil { + t.Error(err) + } + }) + + f.Assert("No secrets for jobsink are present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool { + return len(secrets.Items) == 0 + })) + + return f +} + func SuccessTLS() *feature.Feature { f := feature.NewFeature() @@ -239,3 +268,27 @@ func AtLeastOneJobIsComplete(jobSinkName string) feature.StepFn { t.Errorf("No job is complete:\n%v", string(bytes)) } } + +func verifySecretsForJobSink(jobSink string, verify func(secrets *corev1.SecretList) bool) feature.StepFn { + return func(ctx context.Context, t feature.T) { + + interval, timeout := environment.PollTimingsFromContext(ctx) + lastSecretList := &corev1.SecretList{} + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { + var err error + lastSecretList, err = kubeclient.Get(ctx).CoreV1(). + Secrets(environment.FromContext(ctx).Namespace()). + List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink), + }) + if err != nil { + return false, fmt.Errorf("failed to list secrets: %w", err) + } + return verify(lastSecretList), nil + }) + if err != nil { + bytes, _ := json.Marshal(lastSecretList) + t.Errorf("failed to wait for no secrets: %v\nSecret list:\n%s", err, string(bytes)) + } + } +} diff --git a/test/rekt/job_sink_test.go b/test/rekt/job_sink_test.go index 3263c450a9a..f933bdc65c1 100644 --- a/test/rekt/job_sink_test.go +++ b/test/rekt/job_sink_test.go @@ -46,7 +46,23 @@ func TestJobSinkSuccess(t *testing.T) { environment.Managed(t), ) - env.Test(ctx, t, jobsink.Success()) + env.Test(ctx, t, jobsink.Success("")) +} + +func TestJobSinkDeleteJobCascadeSecretDeletion(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + jobSinkName := feature.MakeRandomK8sName("jobsink") + env.Test(ctx, t, jobsink.Success(jobSinkName)) + env.Test(ctx, t, jobsink.DeleteJobsCascadeSecretsDeletion(jobSinkName)) } func TestJobSinkSuccessTLS(t *testing.T) {