Skip to content

Commit

Permalink
Fix resuming JobSets after PodTemplate restore
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Aug 9, 2024
1 parent 8ab9738 commit 80333ad
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
5 changes: 5 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ const (
CoordinatorKey = "jobset.sigs.k8s.io/coordinator"
)

// JobManagedLabels lists the pod-template label keys that are copied from an
// existing Job rather than recomputed when the Job is resumed: the two legacy
// unprefixed keys plus the batchv1 JobNameLabel and ControllerUidLabel keys.
//
// The legacy keys are spelled out as string literals because the api package
// no longer defines constants for them.
// NOTE(review): presumably they are now only defined in the internal
// Kubernetes batch API package — confirm.
var JobManagedLabels = []string{
	"job-name",
	"controller-uid",
	batchv1.JobNameLabel,
	batchv1.ControllerUidLabel,
}

type JobSetConditionType string

// These are built-in conditions of a JobSet.
Expand Down
52 changes: 30 additions & 22 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,6 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a
// resumeJobsIfNecessary iterates through each replicatedJob, resuming any suspended jobs if the JobSet
// is not suspended.
func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error {
// Store pod template for each replicatedJob.
replicatedJobTemplateMap := map[string]corev1.PodTemplateSpec{}
for _, replicatedJob := range js.Spec.ReplicatedJobs {
replicatedJobTemplateMap[replicatedJob.Name] = replicatedJob.Template.Spec.Template
}

// Map each replicatedJob to a list of its active jobs.
replicatedJobToActiveJobs := map[string][]*batchv1.Job{}
for _, job := range activeJobs {
Expand All @@ -422,7 +416,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset
if !jobSuspended(job) {
continue
}
if err := r.resumeJob(ctx, job, replicatedJobTemplateMap); err != nil {
if err := r.resumeJob(ctx, job, js, &replicatedJob); err != nil {
return err
}
}
Expand All @@ -440,7 +434,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset
return nil
}

func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, replicatedJobTemplateMap map[string]corev1.PodTemplateSpec) error {
func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, js *jobset.JobSet, rjob *jobset.ReplicatedJob) error {
log := ctrl.LoggerFrom(ctx)
// Kubernetes validates that a job template is immutable
// so if the job has started (i.e., startTime != nil), we must set it to nil first.
Expand All @@ -452,30 +446,44 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, repl
}

// Get name of parent replicated job and use it to look up the pod template.
replicatedJobName := job.Labels[jobset.ReplicatedJobNameKey]
replicatedJobPodTemplate := replicatedJobTemplateMap[replicatedJobName]
if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" {
if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" && job.Labels[jobset.JobIndexKey] != "" {
jobIdx, err := strconv.Atoi(job.Labels[jobset.JobIndexKey])
if err != nil {
return err
}
// Certain fields on the Job pod template may be mutated while a JobSet is suspended,
// for integration with Kueue. Ensure these updates are propagated to the child Jobs
// when the JobSet is resumed.
// Merge values rather than overwriting them, since a different controller
// (e.g., the Job controller) may have added labels/annotations/etc to the
// Job that do not exist in the ReplicatedJob pod template.
// We merge the PodTemplate properties from the latest PodTemplate
// in the JobSet, with the PodTemplate corresponding to a newly created
// Job.
newJob, err := constructJob(js, rjob, jobIdx)
if err != nil {
return err
}
// Copy the Job labels managed by the Job controller
for _, labelKey := range jobset.JobManagedLabels {
newJob.Spec.Template.Labels[labelKey] = job.Spec.Template.Labels[labelKey]
}
job.Spec.Template.Labels = collections.MergeMaps(
job.Spec.Template.Labels,
replicatedJobPodTemplate.Labels,
newJob.Spec.Template.Labels,
rjob.Template.Spec.Template.Labels,
)
job.Spec.Template.Annotations = collections.MergeMaps(
job.Spec.Template.Annotations,
replicatedJobPodTemplate.Annotations,
newJob.Spec.Template.Annotations,
rjob.Template.Spec.Template.Annotations,
)
job.Spec.Template.Spec.NodeSelector = collections.MergeMaps(
job.Spec.Template.Spec.NodeSelector,
replicatedJobPodTemplate.Spec.NodeSelector,
newJob.Spec.Template.Spec.NodeSelector,
rjob.Template.Spec.Template.Spec.NodeSelector,
)
job.Spec.Template.Spec.Tolerations = collections.MergeSlices(
job.Spec.Template.Spec.Tolerations,
replicatedJobPodTemplate.Spec.Tolerations,
newJob.Spec.Template.Spec.Tolerations,
rjob.Template.Spec.Template.Spec.Tolerations,
)
job.Spec.Template.Spec.SchedulingGates = collections.MergeSlices(
newJob.Spec.Template.Spec.SchedulingGates,
rjob.Template.Spec.Template.Spec.SchedulingGates,
)
} else {
log.Error(nil, "job missing ReplicatedJobName label")
Expand Down
4 changes: 2 additions & 2 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
}
mungedSpec := js.Spec.DeepCopy()

// Allow pod template to be mutated for suspended JobSets.
// Allow pod template to be mutated for suspended JobSets, or JobSets getting suspended.
// This is needed for integration with Kueue/DWS.
if ptr.Deref(oldJS.Spec.Suspend, false) {
if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) {
for index := range js.Spec.ReplicatedJobs {
// Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations
Expand Down
99 changes: 99 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/util/testing"
Expand Down Expand Up @@ -131,6 +132,82 @@ var _ = ginkgo.Describe("JobSet", func() {
})
})

ginkgo.When("job is suspended and resumed", func() {

	// Regression test: a JobSet is resumed with extra PodTemplate properties
	// (nodeSelector, label, annotation, scheduling gate), suspended again with
	// those properties removed, and must then be resumable and run to completion.
	//
	// This spec uses It rather than FIt: a focused spec makes Ginkgo skip every
	// other spec in the suite and report a non-zero exit code.
	ginkgo.It("should allow to resume JobSet after restoring PodTemplate", func() {
		ctx := context.Background()
		js := shortSleepTestJobSet(ns).Obj()
		jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}

		ginkgo.By("Create a suspended JobSet", func() {
			js.Spec.Suspend = ptr.To(true)
			js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5)
			gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
		})

		ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() {
			gomega.Eventually(func() error {
				// Return the error instead of asserting so that Eventually
				// retries on a transient failure rather than aborting the spec.
				if err := k8sClient.Get(ctx, jsKey, js); err != nil {
					return err
				}
				js.Spec.Suspend = ptr.To(false)
				podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
				if podTemplate.Spec.NodeSelector == nil {
					podTemplate.Spec.NodeSelector = make(map[string]string)
				}
				podTemplate.Spec.NodeSelector["kubernetes.io/hostname"] = "non-existing-node"
				if podTemplate.Labels == nil {
					podTemplate.Labels = make(map[string]string)
				}
				podTemplate.Labels["custom-label-key"] = "custom-label-value"
				if podTemplate.Annotations == nil {
					podTemplate.Annotations = make(map[string]string)
				}
				podTemplate.Annotations["custom-annotation-key"] = "custom-annotation-value"
				podTemplate.Spec.SchedulingGates = []corev1.PodSchedulingGate{
					{
						Name: "example.com/gate",
					},
				}
				return k8sClient.Update(ctx, js)
			}, timeout, interval).Should(gomega.Succeed())
		})

		ginkgo.By("Await for all Jobs to be active", func() {
			// The scoped Gomega makes a failed Get count as a failed poll
			// attempt instead of failing the whole spec immediately.
			gomega.Eventually(func(g gomega.Gomega) int32 {
				g.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
				// len() also covers an empty, non-nil status slice, which
				// would otherwise panic on the index below.
				if len(js.Status.ReplicatedJobsStatus) == 0 {
					return 0
				}
				return js.Status.ReplicatedJobsStatus[0].Active
			}, timeout, interval).Should(gomega.Equal(js.Spec.ReplicatedJobs[0].Replicas))
		})

		ginkgo.By("Suspend the JobSet restoring the PodTemplate properties", func() {
			gomega.Eventually(func() error {
				if err := k8sClient.Get(ctx, jsKey, js); err != nil {
					return err
				}
				js.Spec.Suspend = ptr.To(true)
				podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
				delete(podTemplate.Spec.NodeSelector, "kubernetes.io/hostname")
				delete(podTemplate.Labels, "custom-label-key")
				delete(podTemplate.Annotations, "custom-annotation-key")
				podTemplate.Spec.SchedulingGates = nil
				return k8sClient.Update(ctx, js)
			}, timeout, interval).Should(gomega.Succeed())
		})

		ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() {
			gomega.Eventually(func() error {
				if err := k8sClient.Get(ctx, jsKey, js); err != nil {
					return err
				}
				js.Spec.Suspend = ptr.To(false)
				return k8sClient.Update(ctx, js)
			}, timeout, interval).Should(gomega.Succeed())
		})

		ginkgo.By("Await for the JobSet to complete successfully", func() {
			util.JobSetCompleted(ctx, k8sClient, js, timeout)
		})
	})
})

}) // end of Describe

// getPingCommand returns ping command for 4 hostnames
Expand Down Expand Up @@ -246,3 +323,25 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
Replicas(int32(replicas)).
Obj())
}

// shortSleepTestJobSet returns a JobSet wrapper named "js" in the namespace ns.
// It holds a single replicated job "rjob" with 3 replicas; each Job's pod has
// one bash container that runs `sleep 1` and a restart policy of "Never".
func shortSleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
	const (
		jobSetName              = "js"
		replicatedJobName       = "rjob"
		replicaCount      int32 = 3
	)

	sleepContainer := corev1.Container{
		Name:    "short-sleep-test-container",
		Image:   "bash:latest",
		Command: []string{"bash", "-c"},
		Args:    []string{"sleep 1"},
	}
	podSpec := corev1.PodSpec{
		RestartPolicy: "Never",
		Containers:    []corev1.Container{sleepContainer},
	}

	jobTemplate := testing.MakeJobTemplate("job", ns.Name).
		PodSpec(podSpec).
		Obj()
	replicatedJob := testing.MakeReplicatedJob(replicatedJobName).
		Job(jobTemplate).
		Replicas(replicaCount).
		Obj()

	return testing.MakeJobSet(jobSetName, ns.Name).ReplicatedJob(replicatedJob)
}

0 comments on commit 80333ad

Please sign in to comment.