JobSink: Delete secrets associated with jobs when jobs are deleted
As reported in #8323, leftover JobSink secrets cause old events to be
processed again while new events are lost.

Using an OwnerReference and Kubernetes garbage collection, the secret
created for a given event is now bound to the lifecycle of its Job, so
that when the job is deleted, the associated secret is deleted as well.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
pierDipi committed Nov 18, 2024
1 parent bc6e878 commit 9a2dec7
Showing 3 changed files with 117 additions and 43 deletions.
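For orientation, the sketch below illustrates the ownership chain this change introduces: the Job is created first, and the event Secret then carries an OwnerReference holding the created Job's UID, so Kubernetes garbage collection removes the Secret when the Job is deleted. This is not code from the commit; the helper name eventSecretFor and the values used in main are hypothetical.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// eventSecretFor builds a Secret owned by an already-created Job. The owner's
// UID must come from the Job object returned by the API server, which is why
// the handler now creates the Job first and the Secret second.
func eventSecretFor(job *batchv1.Job, eventBytes []byte) *corev1.Secret {
	controller := false
	blockOwnerDeletion := false
	return &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      job.Name,
			Namespace: job.Namespace,
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion:         "batch/v1",
				Kind:               "Job",
				Name:               job.Name,
				UID:                job.UID, // garbage collection matches owners by UID
				Controller:         &controller,
				BlockOwnerDeletion: &blockOwnerDeletion,
			}},
		},
		Data: map[string][]byte{"event": eventBytes},
		Type: corev1.SecretTypeOpaque,
	}
}

func main() {
	// Illustrative values only; in the JobSink handler the Job comes from the Create call.
	job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: "example-job", Namespace: "default", UID: "1234"}}
	secret := eventSecretFor(job, []byte(`{"type":"example"}`))
	fmt.Println(secret.OwnerReferences[0].Name) // the Secret is deleted when example-job is deleted
}

Creating the Job before the Secret matters because the owner's UID is only known once the API server has persisted the Job, and the diff below reorders the handler accordingly.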
87 changes: 46 additions & 41 deletions cmd/jobsink/main.go
@@ -258,44 +258,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

jobName := kmeta.ChildName(ref.Name, id)

logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))

jobSinkUID := js.GetUID()

or := metav1.OwnerReference{
APIVersion: sinksv.SchemeGroupVersion.String(),
Kind: sinks.JobSinkResource.Resource,
Name: js.GetName(),
UID: jobSinkUID,
Controller: ptr.Bool(true),
BlockOwnerDeletion: ptr.Bool(false),
}

secret := &corev1.Secret{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: ref.Namespace,
Labels: map[string]string{
sinks.JobSinkIDLabel: id,
sinks.JobSinkNameLabel: ref.Name,
},
OwnerReferences: []metav1.OwnerReference{or},
},
Immutable: ptr.Bool(true),
Data: map[string][]byte{"event": eventBytes},
Type: corev1.SecretTypeOpaque,
}

_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.Warn("Failed to create secret", zap.Error(err))

w.Header().Add("Reason", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}

logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))

job := js.Spec.Job.DeepCopy()
@@ -305,7 +267,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
job.Labels[sinks.JobSinkIDLabel] = id
job.Labels[sinks.JobSinkNameLabel] = ref.Name
job.OwnerReferences = append(job.OwnerReferences, or)
job.OwnerReferences = append(job.OwnerReferences, metav1.OwnerReference{
APIVersion: sinksv.SchemeGroupVersion.String(),
Kind: sinks.JobSinkResource.Resource,
Name: js.GetName(),
UID: js.GetUID(),
Controller: ptr.Bool(true),
BlockOwnerDeletion: ptr.Bool(false),
})
var mountPathName string
for i := range job.Spec.Template.Spec.Containers {
found := false
@@ -346,15 +315,51 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
})
}

_, err = h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
if err != nil {
createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.Warn("Failed to create job", zap.Error(err))

w.Header().Add("Reason", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}

logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))

secret := &corev1.Secret{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: jobName,
Namespace: ref.Namespace,
Labels: map[string]string{
sinks.JobSinkIDLabel: id,
sinks.JobSinkNameLabel: ref.Name,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: createdJob.APIVersion,
Kind: createdJob.Kind,
Name: createdJob.Name,
UID: createdJob.UID,
Controller: ptr.Bool(false),
BlockOwnerDeletion: ptr.Bool(false),
},
},
},
Immutable: ptr.Bool(true),
Data: map[string][]byte{"event": eventBytes},
Type: corev1.SecretTypeOpaque,
}

_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
logger.Warn("Failed to create secret", zap.Error(err))

w.Header().Add("Reason", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID()))
w.WriteHeader(http.StatusAccepted)
}
55 changes: 54 additions & 1 deletion test/rekt/features/jobsink/jobsink.go
@@ -24,6 +24,7 @@ import (
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
@@ -43,11 +44,14 @@ import (
"knative.dev/eventing/test/rekt/resources/jobsink"
)

func Success() *feature.Feature {
func Success(jobSinkName string) *feature.Feature {
f := feature.NewFeature()

sink := feature.MakeRandomK8sName("sink")
jobSink := feature.MakeRandomK8sName("jobsink")
if jobSinkName != "" {
jobSink = jobSinkName
}
source := feature.MakeRandomK8sName("source")

event := cetest.FullEvent()
@@ -83,6 +87,31 @@ func Success() *feature.Feature {
return f
}

func DeleteJobsCascadeSecretsDeletion(jobSink string) *feature.Feature {
f := feature.NewFeature()

f.Setup("Prerequisite: At least one secret for jobsink present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool {
return len(secrets.Items) > 0
}))

f.Requirement("delete jobs for jobsink", func(ctx context.Context, t feature.T) {
err := kubeclient.Get(ctx).BatchV1().
Jobs(environment.FromContext(ctx).Namespace()).
DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink),
})
if err != nil {
t.Error(err)
}
})

f.Assert("No secrets for jobsink are present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool {
return len(secrets.Items) == 0
}))

return f
}

func SuccessTLS() *feature.Feature {
f := feature.NewFeature()

@@ -239,3 +268,27 @@ func AtLeastOneJobIsComplete(jobSinkName string) feature.StepFn {
t.Errorf("No job is complete:\n%v", string(bytes))
}
}

func verifySecretsForJobSink(jobSink string, verify func(secrets *corev1.SecretList) bool) feature.StepFn {
return func(ctx context.Context, t feature.T) {

interval, timeout := environment.PollTimingsFromContext(ctx)
lastSecretList := &corev1.SecretList{}
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
var err error
lastSecretList, err = kubeclient.Get(ctx).CoreV1().
Secrets(environment.FromContext(ctx).Namespace()).
List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink),
})
if err != nil {
return false, fmt.Errorf("failed to list secrets: %w", err)
}
return verify(lastSecretList), nil
})
if err != nil {
bytes, _ := json.Marshal(lastSecretList)
t.Errorf("failed to wait for no secrets: %v\nSecret list:\n%s", err, string(bytes))
}
}
}
18 changes: 17 additions & 1 deletion test/rekt/job_sink_test.go
@@ -46,7 +46,23 @@ func TestJobSinkSuccess(t *testing.T) {
environment.Managed(t),
)

env.Test(ctx, t, jobsink.Success())
env.Test(ctx, t, jobsink.Success(""))
}

func TestJobSinkDeleteJobCascadeSecretDeletion(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

jobSinkName := feature.MakeRandomK8sName("jobsink")
env.Test(ctx, t, jobsink.Success(jobSinkName))
env.Test(ctx, t, jobsink.DeleteJobsCascadeSecretsDeletion(jobSinkName))
}

func TestJobSinkSuccessTLS(t *testing.T) {
