Fix resuming JobSet after restoring PodTemplate (by Jobs update) #640

Closed · wants to merge 1 commit
5 changes: 5 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
@@ -51,6 +51,11 @@ const (
CoordinatorKey = "jobset.sigs.k8s.io/coordinator"
)

var (
// The legacy names are no longer defined in the API (k8s.io/api/batch/v1), only in k8s.io/kubernetes/pkg/apis/batch.
Contributor:

Can we not introduce the legacy labels now? I think we can remove the original ones now that 1.27 is out of support.

Contributor Author:

I think it is better to copy them, because the Job controller still sets them.

Here are the labels from an example Pod template created on a 1.30.2 cluster:

        batch.kubernetes.io/controller-uid: c46c3b0d-76d8-427b-bda6-a9cb06c77cfd
        batch.kubernetes.io/job-name: sample-job-a-lv6wd
        controller-uid: c46c3b0d-76d8-427b-bda6-a9cb06c77cfd
        job-name: sample-job-a-lv6wd

If they are not present on the original Job template (null), I wouldn't copy them to the newJob template.
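
For illustration, a minimal sketch of the "copy them over" idea discussed here (not part of the diff): it carries over only the Job-controller-managed label keys, and skips any that are absent on the old template. The oldJob/newJob names and the copyJobManagedLabels helper are placeholders; the prefixed constants come from k8s.io/api/batch/v1.

```go
package sketch

import (
	batchv1 "k8s.io/api/batch/v1"
)

// copyJobManagedLabels carries over only the pod-template labels that the
// built-in Job controller manages, and only if they are set on the old template.
func copyJobManagedLabels(oldJob, newJob *batchv1.Job) {
	// Legacy keys plus the batch.kubernetes.io/-prefixed ones added in 1.27.
	managed := []string{"job-name", "controller-uid", batchv1.JobNameLabel, batchv1.ControllerUidLabel}
	if newJob.Spec.Template.Labels == nil {
		newJob.Spec.Template.Labels = map[string]string{}
	}
	for _, key := range managed {
		if value, ok := oldJob.Spec.Template.Labels[key]; ok {
			newJob.Spec.Template.Labels[key] = value
		}
	}
}
```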

Contributor:

I still think there is no reason to use them in JobSet even if they are used in the Job API. We won't ever drop them from the kube API, but they are really just there for public consumption.

Contributor Author (@mimowo, Aug 9, 2024):

Note that I don't insert or generate them; I just copy them over from the old template to the new one, if they are already present on the old.

I'm also not sure this will pass validation; let me test.

Contributor Author (@mimowo, Aug 9, 2024):

We should not need to expose the details of those labels, nor copy all labels. For now, I don't see another way of fixing the bug than either:

  1. copy over the Job labels, or
  2. delete and recreate the Jobs, letting the API server default all the fields and labels (a rough sketch of this option follows below).

This is partly why I prefer (2.), which I implemented initially in #625. However, I understand that updating the Jobs might be preferred, as deleting and recreating is a bigger change.
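
For context, a rough sketch of option (2.) — what #625 explored, not what this PR implements — assuming it would live in pkg/controllers next to the existing constructJob helper and use a controller-runtime client. The recreateJob name is a placeholder:

```go
package controllers

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// recreateJob deletes the old Job and rebuilds it from the JobSet spec,
// letting constructJob and API-server defaulting restore all fields and labels.
func recreateJob(ctx context.Context, c client.Client, js *jobset.JobSet, rjob *jobset.ReplicatedJob, oldJob *batchv1.Job, jobIdx int) error {
	// Background propagation so the old pods are garbage-collected asynchronously.
	if err := c.Delete(ctx, oldJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !apierrors.IsNotFound(err) {
		return err
	}
	newJob, err := constructJob(js, rjob, jobIdx)
	if err != nil {
		return err
	}
	// Note: a real controller would need to wait or retry until the old Job
	// (same name) is actually gone before the Create can succeed.
	return c.Create(ctx, newJob)
}
```

The trade-off is the one mentioned above: recreation leaves nothing stale behind, but deleting and recreating Jobs is a bigger behavioral change than updating them in place.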

Contributor:

My point is that 1 could be done just by copying ALL labels from the previous job.

Contributor Author (@mimowo, Aug 9, 2024):

So, could we copy all the labels from the old Job instead of looping over the special Job-managed ones?

Consider the following transitions driven by Kueue: resume (RF1) -> suspend -> resume (RF2).

It is possible that resuming on RF2 does not add the same labels as RF1 did. If we copied all labels, we would always trail the old ones from RF1.

Indeed, from a practical standpoint this is mostly a problem for nodeSelectors, where Kueue may want to choose a new set of nodeSelectors for RF2. I would be OK accepting this issue for labels, as it does not cause much practical trouble for Kueue currently, but it would be a "known issue" and may hit us one day.
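
To make the "trailing labels from RF1" concern concrete, here is a tiny standalone example. The label keys are invented, and the merge helper simply assumes later-maps-win semantics, in the spirit of collections.MergeMaps:

```go
package main

import "fmt"

// merge applies a simple "later maps win" merge of string maps.
func merge(maps ...map[string]string) map[string]string {
	out := map[string]string{}
	for _, m := range maps {
		for k, v := range m {
			out[k] = v
		}
	}
	return out
}

func main() {
	// Labels already on the child Job's pod template after the first resume (RF1).
	// "example.com/injected-by-rf1" stands in for anything added for RF1 only.
	onJob := map[string]string{
		"example.com/injected-by-rf1": "true",
		"example.com/flavor":          "rf1",
	}

	// Labels the JobSet spec carries for the second resume (RF2);
	// the RF1-only key is intentionally gone.
	fromSpec := map[string]string{
		"example.com/flavor": "rf2",
	}

	// Merging ALL old labels keeps the stale RF1-only key around forever.
	fmt.Println(merge(onJob, fromSpec))
	// Output: map[example.com/flavor:rf2 example.com/injected-by-rf1:true]
}
```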

Contributor Author:

> My point is that 1 could be done just by copying ALL labels from the previous job.

Seems like a race condition :). Consider the scenario I describe here: #640 (comment). The code would then remain problematic if RF1 adds a label that is not expected in RF2, because with this approach we would never get rid of it.

It would remain problematic in some corner cases and be hard to debug. It would also be surprising that we restore nodeSelectors but not labels.

Contributor Author:

I synced with @kannon92 on Slack and there is a simpler solution, which I implement in #644. It does not fully cover the case of "restoring" a PodTemplate, but it should cover most cases where admission to RF2 overrides the previous values.

That change is needed anyway as a first step, and it is part of this PR. We can return to these bits if they turn out to be needed.

JobManagedLabels = []string{"job-name", "controller-uid", batchv1.JobNameLabel, batchv1.ControllerUidLabel}
)

type JobSetConditionType string

// These are built-in conditions of a JobSet.
52 changes: 30 additions & 22 deletions pkg/controllers/jobset_controller.go
@@ -395,12 +395,6 @@ func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, a
// resumeJobsIfNecessary iterates through each replicatedJob, resuming any suspended jobs if the JobSet
// is not suspended.
func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error {
// Store pod template for each replicatedJob.
replicatedJobTemplateMap := map[string]corev1.PodTemplateSpec{}
for _, replicatedJob := range js.Spec.ReplicatedJobs {
replicatedJobTemplateMap[replicatedJob.Name] = replicatedJob.Template.Spec.Template
}

// Map each replicatedJob to a list of its active jobs.
replicatedJobToActiveJobs := map[string][]*batchv1.Job{}
for _, job := range activeJobs {
@@ -422,7 +416,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset
if !jobSuspended(job) {
continue
}
if err := r.resumeJob(ctx, job, replicatedJobTemplateMap); err != nil {
if err := r.resumeJob(ctx, job, js, &replicatedJob); err != nil {
return err
}
}
Expand All @@ -440,7 +434,7 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset
return nil
}

func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, replicatedJobTemplateMap map[string]corev1.PodTemplateSpec) error {
func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, js *jobset.JobSet, rjob *jobset.ReplicatedJob) error {
log := ctrl.LoggerFrom(ctx)
// Kubernetes validates that a job template is immutable
// so if the job has started i.e., startTime != nil), we must set it to nil first.
@@ -452,30 +446,44 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, repl
}

// Get name of parent replicated job and use it to look up the pod template.
replicatedJobName := job.Labels[jobset.ReplicatedJobNameKey]
replicatedJobPodTemplate := replicatedJobTemplateMap[replicatedJobName]
if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" {
if job.Labels != nil && job.Labels[jobset.ReplicatedJobNameKey] != "" && job.Labels[jobset.JobIndexKey] != "" {
jobIdx, err := strconv.Atoi(job.Labels[jobset.JobIndexKey])
if err != nil {
return err
}
// Certain fields on the Job pod template may be mutated while a JobSet is suspended,
// for integration with Kueue. Ensure these updates are propagated to the child Jobs
// when the JobSet is resumed.
// Merge values rather than overwriting them, since a different controller
// (e.g., the Job controller) may have added labels/annotations/etc to the
// Job that do not exist in the ReplicatedJob pod template.
// We merge the PodTemplate properties from the latest PodTemplate
// in the JobSet, with the PodTemplate corresponding to a newly created
// Job.
newJob, err := constructJob(js, rjob, jobIdx)
if err != nil {
return err
}
// Copy the Job labels managed by the Job controller
for _, labelKey := range jobset.JobManagedLabels {
newJob.Spec.Template.Labels[labelKey] = job.Spec.Template.Labels[labelKey]
}
job.Spec.Template.Labels = collections.MergeMaps(
job.Spec.Template.Labels,
replicatedJobPodTemplate.Labels,
newJob.Spec.Template.Labels,
rjob.Template.Spec.Template.Labels,
)
job.Spec.Template.Annotations = collections.MergeMaps(
job.Spec.Template.Annotations,
replicatedJobPodTemplate.Annotations,
newJob.Spec.Template.Annotations,
rjob.Template.Spec.Template.Annotations,
)
job.Spec.Template.Spec.NodeSelector = collections.MergeMaps(
job.Spec.Template.Spec.NodeSelector,
replicatedJobPodTemplate.Spec.NodeSelector,
newJob.Spec.Template.Spec.NodeSelector,
rjob.Template.Spec.Template.Spec.NodeSelector,
)
job.Spec.Template.Spec.Tolerations = collections.MergeSlices(
job.Spec.Template.Spec.Tolerations,
replicatedJobPodTemplate.Spec.Tolerations,
newJob.Spec.Template.Spec.Tolerations,
rjob.Template.Spec.Template.Spec.Tolerations,
)
job.Spec.Template.Spec.SchedulingGates = collections.MergeSlices(
newJob.Spec.Template.Spec.SchedulingGates,
rjob.Template.Spec.Template.Spec.SchedulingGates,
)
} else {
log.Error(nil, "job missing ReplicatedJobName label")
4 changes: 2 additions & 2 deletions pkg/webhooks/jobset_webhook.go
@@ -314,9 +314,9 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
}
mungedSpec := js.Spec.DeepCopy()

// Allow pod template to be mutated for suspended JobSets.
// Allow pod template to be mutated for suspended JobSets, or JobSets getting suspended.
// This is needed for integration with Kueue/DWS.
if ptr.Deref(oldJS.Spec.Suspend, false) {
if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) {
for index := range js.Spec.ReplicatedJobs {
// Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations
83 changes: 80 additions & 3 deletions test/e2e/e2e_test.go
@@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/util/testing"
@@ -112,7 +113,7 @@ var _ = ginkgo.Describe("JobSet", func() {
// Create JobSet.
testFinalizer := "fake.example.com/blockDeletion"
ginkgo.By("creating jobset with ttl seconds after finished")
js := sleepTestJobSet(ns).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj()
js := sleepTestJobSet(ns, 20).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj()

// Verify jobset created successfully.
ginkgo.By("checking that jobset creation succeeds")
@@ -131,6 +132,82 @@
})
})

ginkgo.When("job is suspended and resumed", func() {

ginkgo.It("should allow to resume JobSet after restoring PodTemplate", func() {
ctx := context.Background()
js := sleepTestJobSet(ns, 1).Obj()
jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}

ginkgo.By("Create a suspended JobSet", func() {
js.Spec.Suspend = ptr.To(true)
js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5)
gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
})

ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
if podTemplate.Spec.NodeSelector == nil {
podTemplate.Spec.NodeSelector = make(map[string]string)
}
podTemplate.Spec.NodeSelector["kubernetes.io/hostname"] = "non-existing-node"
if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
}
podTemplate.Labels["custom-label-key"] = "custom-label-value"
if podTemplate.Annotations == nil {
podTemplate.Annotations = make(map[string]string)
}
podTemplate.Annotations["custom-annotation-key"] = "custom-annotation-value"
podTemplate.Spec.SchedulingGates = []corev1.PodSchedulingGate{
{
Name: "example.com/gate",
},
}
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for all Jobs to be active", func() {
gomega.Eventually(func() int32 {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
if js.Status.ReplicatedJobsStatus == nil {
return 0
}
return js.Status.ReplicatedJobsStatus[0].Active
}, timeout, interval).Should(gomega.Equal(js.Spec.ReplicatedJobs[0].Replicas))
})

ginkgo.By("Suspend the JobSet restoring the PodTemplate properties", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(true)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
delete(podTemplate.Spec.NodeSelector, "kubernetes.io/hostname")
delete(podTemplate.Labels, "custom-label-key")
delete(podTemplate.Annotations, "custom-annotation-key")
podTemplate.Spec.SchedulingGates = nil
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for the JobSet to complete successfully", func() {
util.JobSetCompleted(ctx, k8sClient, js, timeout)
})
})
})

}) // end of Describe

// getPingCommand returns ping command for 4 hostnames
@@ -225,7 +302,7 @@ func pingTestJobSetSubdomain(ns *corev1.Namespace) *testing.JobSetWrapper {
Obj())
}

func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
func sleepTestJobSet(ns *corev1.Namespace, durationSeconds int32) *testing.JobSetWrapper {
jsName := "js"
rjobName := "rjob"
replicas := 4
@@ -239,7 +316,7 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
Name: "sleep-test-container",
Image: "bash:latest",
Command: []string{"bash", "-c"},
Args: []string{"sleep 20"},
Args: []string{fmt.Sprintf("sleep %d", durationSeconds)},
},
},
}).Obj()).