From 80333ad5f06b4a63a546683b30b0d8420c11ac42 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 5 Aug 2024 19:55:14 +0200 Subject: [PATCH] Fix resuming JobSets after PodTemplate restore --- api/jobset/v1alpha2/jobset_types.go | 5 ++ pkg/controllers/jobset_controller.go | 52 ++++++++------- pkg/webhooks/jobset_webhook.go | 4 +- test/e2e/e2e_test.go | 99 ++++++++++++++++++++++++++++ 4 files changed, 136 insertions(+), 24 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index 1b21665fb..88a5bcd6e 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -51,6 +51,11 @@ const ( CoordinatorKey = "jobset.sigs.k8s.io/coordinator" ) +var ( + // JobManagedLabels lists the pod label keys managed by the Job controller; the legacy names are no longer defined in the api, only in k/k (k8s.io/kubernetes) pkg/apis/batch - TODO confirm path. + JobManagedLabels = []string{"job-name", "controller-uid", batchv1.JobNameLabel, batchv1.ControllerUidLabel} +) + type JobSetConditionType string // These are built-in conditions of a JobSet. diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index ef76878cb..bdb967596 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -395,12 +395,6 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a // resumeJobsIfNecessary iterates through each replicatedJob, resuming any suspended jobs if the JobSet // is not suspended. func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error { - // Store pod template for each replicatedJob. - replicatedJobTemplateMap := map[string]corev1.PodTemplateSpec{} - for _, replicatedJob := range js.Spec.ReplicatedJobs { - replicatedJobTemplateMap[replicatedJob.Name] = replicatedJob.Template.Spec.Template - } - // Map each replicatedJob to a list of its active jobs. 
replicatedJobToActiveJobs := map[string][]*batchv1.Job{} for _, job := range activeJobs { @@ -422,7 +416,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset if !jobSuspended(job) { continue } - if err := r.resumeJob(ctx, job, replicatedJobTemplateMap); err != nil { + if err := r.resumeJob(ctx, job, js, &replicatedJob); err != nil { return err } } @@ -440,7 +434,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset return nil } -func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, replicatedJobTemplateMap map[string]corev1.PodTemplateSpec) error { +func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, js *jobset.JobSet, rjob *jobset.ReplicatedJob) error { log := ctrl.LoggerFrom(ctx) // Kubernetes validates that a job template is immutable // so if the job has started i.e., startTime != nil), we must set it to nil first. @@ -452,30 +446,44 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, repl } // Get name of parent replicated job and use it to look up the pod template. - replicatedJobName := job.Labels[jobset.ReplicatedJobNameKey] - replicatedJobPodTemplate := replicatedJobTemplateMap[replicatedJobName] - if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" { + if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" && job.Labels[jobset.JobIndexKey] != "" { + jobIdx, err := strconv.Atoi(job.Labels[jobset.JobIndexKey]) + if err != nil { + return err + } // Certain fields on the Job pod template may be mutated while a JobSet is suspended, // for integration with Kueue. Ensure these updates are propagated to the child Jobs // when the JobSet is resumed. - // Merge values rather than overwriting them, since a different controller - // (e.g., the Job controller) may have added labels/annotations/etc to the - // Job that do not exist in the ReplicatedJob pod template. 
+ // We merge the PodTemplate properties from the latest PodTemplate + // in the JobSet, with the PodTemplate corresponding to a newly created + // Job. + newJob, err := constructJob(js, rjob, jobIdx) + if err != nil { + return err + } + // Copy the Job labels managed by the Job controller + for _, labelKey := range jobset.JobManagedLabels { + newJob.Spec.Template.Labels[labelKey] = job.Spec.Template.Labels[labelKey] + } job.Spec.Template.Labels = collections.MergeMaps( - job.Spec.Template.Labels, - replicatedJobPodTemplate.Labels, + newJob.Spec.Template.Labels, + rjob.Template.Spec.Template.Labels, ) job.Spec.Template.Annotations = collections.MergeMaps( - job.Spec.Template.Annotations, - replicatedJobPodTemplate.Annotations, + newJob.Spec.Template.Annotations, + rjob.Template.Spec.Template.Annotations, ) job.Spec.Template.Spec.NodeSelector = collections.MergeMaps( - job.Spec.Template.Spec.NodeSelector, - replicatedJobPodTemplate.Spec.NodeSelector, + newJob.Spec.Template.Spec.NodeSelector, + rjob.Template.Spec.Template.Spec.NodeSelector, ) job.Spec.Template.Spec.Tolerations = collections.MergeSlices( - job.Spec.Template.Spec.Tolerations, - replicatedJobPodTemplate.Spec.Tolerations, + newJob.Spec.Template.Spec.Tolerations, + rjob.Template.Spec.Template.Spec.Tolerations, + ) + job.Spec.Template.Spec.SchedulingGates = collections.MergeSlices( + newJob.Spec.Template.Spec.SchedulingGates, + rjob.Template.Spec.Template.Spec.SchedulingGates, ) } else { log.Error(nil, "job missing ReplicatedJobName label") diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index f66395901..3cdf589f0 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -314,9 +314,9 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime. } mungedSpec := js.Spec.DeepCopy() - // Allow pod template to be mutated for suspended JobSets. + // Allow pod template to be mutated for suspended JobSets, or JobSets getting suspended. 
// This is needed for integration with Kueue/DWS. - if ptr.Deref(oldJS.Spec.Suspend, false) { + if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) { for index := range js.Spec.ReplicatedJobs { // Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260 mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8c713a721..410141e2a 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" "sigs.k8s.io/jobset/pkg/util/testing" @@ -131,6 +132,82 @@ var _ = ginkgo.Describe("JobSet", func() { }) }) + ginkgo.When("job is suspended and resumed", func() { + + ginkgo.It("should allow to resume JobSet after restoring PodTemplate", func() { + ctx := context.Background() + js := shortSleepTestJobSet(ns).Obj() + jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace} + + ginkgo.By("Create a suspended JobSet", func() { + js.Spec.Suspend = ptr.To(true) + js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5) + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + }) + + ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(false) + podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template + if podTemplate.Spec.NodeSelector == nil { + podTemplate.Spec.NodeSelector = make(map[string]string) + } + podTemplate.Spec.NodeSelector["kubernetes.io/hostname"] = 
"non-existing-node" + if podTemplate.Labels == nil { + podTemplate.Labels = make(map[string]string) + } + podTemplate.Labels["custom-label-key"] = "custom-label-value" + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations["custom-annotation-key"] = "custom-annotation-value" + podTemplate.Spec.SchedulingGates = []corev1.PodSchedulingGate{ + { + Name: "example.com/gate", + }, + } + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Await for all Jobs to be active", func() { + gomega.Eventually(func() int32 { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + if len(js.Status.ReplicatedJobsStatus) == 0 { + return 0 + } + return js.Status.ReplicatedJobsStatus[0].Active + }, timeout, interval).Should(gomega.Equal(js.Spec.ReplicatedJobs[0].Replicas)) + }) + + ginkgo.By("Suspend the JobSet restoring the PodTemplate properties", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(true) + podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template + delete(podTemplate.Spec.NodeSelector, "kubernetes.io/hostname") + delete(podTemplate.Labels, "custom-label-key") + delete(podTemplate.Annotations, "custom-annotation-key") + podTemplate.Spec.SchedulingGates = nil + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(false) + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Await for the JobSet to complete successfully", func() { + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) + }) + }) // end of Describe // getPingCommand 
returns ping command for 4 hostnames @@ -246,3 +323,25 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper { Replicas(int32(replicas)). Obj()) } + +// shortSleepTestJobSet returns a wrapper for JobSet "js" holding replicated job "rjob" (3 replicas) whose only container runs "sleep 1". +func shortSleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper { + const replicaCount int32 = 3 + podSpec := corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "short-sleep-test-container", + Image: "bash:latest", + Command: []string{"bash", "-c"}, + Args: []string{"sleep 1"}, + }, + }, + } + jobTemplate := testing.MakeJobTemplate("job", ns.Name).PodSpec(podSpec).Obj() + replicatedJob := testing.MakeReplicatedJob("rjob"). + Job(jobTemplate). + Replicas(replicaCount). + Obj() + return testing.MakeJobSet("js", ns.Name).ReplicatedJob(replicatedJob) +}