diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index cecb40796f5..567b3a9e004 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -258,8 +258,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) - job := js.Spec.Job.DeepCopy() job.Name = jobName if job.Labels == nil { @@ -315,6 +313,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) } + logger.Debug("Creating job for event", + zap.String("URI", r.RequestURI), + zap.String("jobName", jobName), + zap.Any("job", job), + ) + createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { logger.Warn("Failed to create job", zap.Error(err)) @@ -323,8 +327,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - - logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + if apierrors.IsAlreadyExists(err) { + logger.Debug("Job already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + } secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{}, @@ -341,7 +346,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Kind: "Job", Name: createdJob.Name, UID: createdJob.UID, - Controller: ptr.Bool(false), + Controller: ptr.Bool(true), BlockOwnerDeletion: ptr.Bool(false), }, }, @@ -351,6 +356,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Type: corev1.SecretTypeOpaque, } + logger.Debug("Creating secret for event", + zap.String("URI", r.RequestURI), + zap.String("jobName", jobName), + zap.Any("secret.metadata", secret.ObjectMeta), + ) + _, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { logger.Warn("Failed to create secret", zap.Error(err)) @@ -359,6 +370,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } + if apierrors.IsAlreadyExists(err) { + logger.Debug("Secret already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + } w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID())) w.WriteHeader(http.StatusAccepted) diff --git a/hack/e2e-debug.sh b/hack/e2e-debug.sh index b9650e467b3..b2bc01bccd6 100755 --- a/hack/e2e-debug.sh +++ b/hack/e2e-debug.sh @@ -35,4 +35,4 @@ wait_until_pods_running knative-eventing || fail_test "Pods in knative-eventing header "Running tests" -go test -tags=e2e -v -timeout=30m -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed" +go test -tags=e2e -v -timeout=30m -parallel=12 -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed" diff --git a/test/rekt/features/jobsink/jobsink.go b/test/rekt/features/jobsink/jobsink.go index 0586b8d68b7..4d3fb117aee 100644 --- a/test/rekt/features/jobsink/jobsink.go +++ b/test/rekt/features/jobsink/jobsink.go @@ -90,9 +90,10 @@ func DeleteJobsCascadeSecretsDeletion(jobSink string) *feature.Feature { })) f.Requirement("delete jobs for jobsink", func(ctx context.Context, t feature.T) { + policy := metav1.DeletePropagationBackground err := kubeclient.Get(ctx).BatchV1(). Jobs(environment.FromContext(ctx).Namespace()). - DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + DeleteCollection(ctx, metav1.DeleteOptions{PropagationPolicy: &policy}, metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink), }) if err != nil {