diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index 4e796c4dea5..162324df25f 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -257,6 +257,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + js = js.DeepCopy() // Do not modify informer copy. + js.SetDefaults(ctx) + job := js.Spec.Job.DeepCopy() job.Name = jobName if job.Labels == nil { diff --git a/pkg/apis/sinks/v1alpha1/job_sink_defaults.go b/pkg/apis/sinks/v1alpha1/job_sink_defaults.go index 13f62e868b9..3bd18fbf3f0 100644 --- a/pkg/apis/sinks/v1alpha1/job_sink_defaults.go +++ b/pkg/apis/sinks/v1alpha1/job_sink_defaults.go @@ -18,7 +18,48 @@ package v1alpha1 import ( "context" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" ) func (sink *JobSink) SetDefaults(ctx context.Context) { + if sink.Spec.Job != nil { + setBatchJobDefaults(sink.Spec.Job) + } +} + +func setBatchJobDefaults(job *batchv1.Job) { + for i := range job.Spec.Template.Spec.Containers { + executionModeFound := false + for j := range job.Spec.Template.Spec.Containers[i].Env { + if job.Spec.Template.Spec.Containers[i].Env[j].Name == ExecutionModeEnvVar { + executionModeFound = true + break + } + } + if executionModeFound { + continue + } + job.Spec.Template.Spec.Containers[i].Env = append(job.Spec.Template.Spec.Containers[i].Env, corev1.EnvVar{ + Name: ExecutionModeEnvVar, + Value: string(ExecutionModeBatch), + }) + } + for i := range job.Spec.Template.Spec.InitContainers { + executionModeFound := false + for j := range job.Spec.Template.Spec.InitContainers[i].Env { + if job.Spec.Template.Spec.InitContainers[i].Env[j].Name == ExecutionModeEnvVar { + executionModeFound = true + break + } + } + if executionModeFound { + continue + } + job.Spec.Template.Spec.InitContainers[i].Env = append(job.Spec.Template.Spec.InitContainers[i].Env, corev1.EnvVar{ + Name: ExecutionModeEnvVar, + Value: string(ExecutionModeBatch), + }) + } } diff --git a/pkg/apis/sinks/v1alpha1/job_sink_defaults_test.go b/pkg/apis/sinks/v1alpha1/job_sink_defaults_test.go index 9df0edde347..766ab45b0d6 100644 --- a/pkg/apis/sinks/v1alpha1/job_sink_defaults_test.go +++ b/pkg/apis/sinks/v1alpha1/job_sink_defaults_test.go @@ -21,13 +21,111 @@ import ( "testing" "github.com/google/go-cmp/cmp" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" ) func TestSetDefaults(t *testing.T) { testCases := map[string]struct { initial JobSink expected JobSink - }{} + }{ + "execution mode": { + initial: JobSink{ + Spec: JobSinkSpec{ + Job: &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "cnt", + Image: "img", + }, + { + Name: "cnt2", + Image: "img2", + }, + { + Name: "cnt3", + Image: "img3", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "something"}, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "cnt", + Image: "img", + }, + { + Name: "cnt-ini2", + Image: "img-ini2", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "something"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: JobSink{ + Spec: JobSinkSpec{ + Job: &batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + { + Name: "cnt", + Image: "img", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "batch"}, + }, + }, + { + Name: "cnt-ini2", + Image: "img-ini2", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "something"}, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "cnt", + Image: "img", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "batch"}, + }, + }, + { + Name: "cnt2", + Image: "img2", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "batch"}, + }, + }, + { + Name: "cnt3", + Image: "img3", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "something"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } for n, tc := range testCases { t.Run(n, func(t *testing.T) { tc.initial.SetDefaults(context.TODO()) diff --git a/pkg/apis/sinks/v1alpha1/job_sink_types.go b/pkg/apis/sinks/v1alpha1/job_sink_types.go index 857e9f79a17..da69766bdd0 100644 --- a/pkg/apis/sinks/v1alpha1/job_sink_types.go +++ b/pkg/apis/sinks/v1alpha1/job_sink_types.go @@ -22,9 +22,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" +) + +const ( + ExecutionModeEnvVar = "KNATIVE_EXECUTION_MODE" +) + +type ExecutionMode string + +const ( + ExecutionModeBatch ExecutionMode = "batch" ) // +genclient diff --git a/pkg/reconciler/jobsink/jobsink_test.go b/pkg/reconciler/jobsink/jobsink_test.go index fba76c2ef10..5ee0d119f65 100644 --- a/pkg/reconciler/jobsink/jobsink_test.go +++ b/pkg/reconciler/jobsink/jobsink_test.go @@ -263,6 +263,9 @@ func testJob(name string) *batchv1.Job { Containers: []corev1.Container{ { Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KNATIVE_EXECUTION_MODE", Value: "batch"}, + }, }, }, },