diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index aa9d3ed5294..b0d66298fec 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -257,8 +257,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) - job := js.Spec.Job.DeepCopy() job.Name = jobName if job.Labels == nil { @@ -314,6 +312,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) } + logger.Debug("Creating job for event", + zap.String("URI", r.RequestURI), + zap.String("jobName", jobName), + zap.Any("job", job), + ) + createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { logger.Warn("Failed to create job", zap.Error(err)) @@ -322,8 +326,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - - logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + if apierrors.IsAlreadyExists(err) { + logger.Debug("Job already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + } secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{}, @@ -340,7 +345,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Kind: "Job", Name: createdJob.Name, UID: createdJob.UID, - Controller: ptr.Bool(false), + Controller: ptr.Bool(true), BlockOwnerDeletion: ptr.Bool(false), }, }, @@ -350,6 +355,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Type: corev1.SecretTypeOpaque, } + logger.Debug("Creating secret for event", + zap.String("URI", r.RequestURI), + zap.String("jobName", jobName), + zap.Any("secret.metadata", secret.ObjectMeta), + ) + _, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { logger.Warn("Failed to create secret", zap.Error(err)) @@ -358,6 +369,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } + if apierrors.IsAlreadyExists(err) { + logger.Debug("Secret already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + } w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID())) w.WriteHeader(http.StatusAccepted) diff --git a/hack/e2e-debug.sh b/hack/e2e-debug.sh index b9650e467b3..b2bc01bccd6 100755 --- a/hack/e2e-debug.sh +++ b/hack/e2e-debug.sh @@ -35,4 +35,4 @@ wait_until_pods_running knative-eventing || fail_test "Pods in knative-eventing header "Running tests" -go test -tags=e2e -v -timeout=30m -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed" +go test -tags=e2e -v -timeout=30m -parallel=12 -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed" diff --git a/test/rekt/features/jobsink/jobsink.go b/test/rekt/features/jobsink/jobsink.go index 437f41cfdd3..4ad54779160 100644 --- a/test/rekt/features/jobsink/jobsink.go +++ b/test/rekt/features/jobsink/jobsink.go @@ -95,9 +95,10 @@ func DeleteJobsCascadeSecretsDeletion(jobSink string) *feature.Feature { })) f.Requirement("delete jobs for jobsink", func(ctx context.Context, t feature.T) { + policy := metav1.DeletePropagationBackground err := kubeclient.Get(ctx).BatchV1(). Jobs(environment.FromContext(ctx).Namespace()). - DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + DeleteCollection(ctx, metav1.DeleteOptions{PropagationPolicy: &policy}, metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink), }) if err != nil {