Skip to content

Commit

Permalink
Check job counts equal admission counts when unsuspended (#934)
Browse files Browse the repository at this point in the history
* Check job counts equal admission counts when unsuspended

Change-Id: I181f49577fbada739df373b014e0d5794be118ce

* Unit test for equivalency check in job reconciler

Change-Id: I6aff298f10f7f51ca80eb007c9a461f7e99f79e0

* Compare error for non-matching workloads

Change-Id: I974476314e3133dd9431e7179048c5b2da2629d9
  • Loading branch information
alculquicondor authored Jul 7, 2023
1 parent cc8b0f9 commit 07fa280
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 16 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/jobframework/integrationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
Expand All @@ -35,6 +36,7 @@ var (
)

// JobReconcilerInterface is the contract a job-framework reconciler must
// satisfy: it is a controller-runtime reconcile.Reconciler that can also
// register itself with a manager via SetupWithManager.
type JobReconcilerInterface interface {
	reconcile.Reconciler
	SetupWithManager(mgr ctrl.Manager) error
}

Expand Down
38 changes: 22 additions & 16 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (
ErrChildJobOwnerNotFound = fmt.Errorf("owner isn't set even though %s annotation is set", controllerconsts.ParentWorkloadAnnotation)
ErrUnknownWorkloadOwner = errors.New("workload owner is unknown")
ErrWorkloadOwnerNotFound = errors.New("workload owner not found")
ErrNoMatchingWorkloads = errors.New("no matching workloads")
ErrExtraWorkloads = errors.New("extra workloads")
)

// JobReconciler reconciles a GenericJob object
Expand Down Expand Up @@ -159,7 +161,6 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
// If no workload exists and the job is unsuspended, we'll stop it immediately.
wl, err := r.ensureOneWorkload(ctx, job, object)
if err != nil {
log.Error(err, "Getting existing workloads")
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -337,31 +338,29 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
w = &workloads.Items[0]
}
if err := r.stopJob(ctx, job, object, w, "No matching Workload"); err != nil {
log.Error(err, "stopping job")
return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
}
}

// Delete duplicate workload instances.
existedWls := 0
for i := range toDelete {
err := r.client.Delete(ctx, toDelete[i])
if err == nil || !apierrors.IsNotFound(err) {
existedWls++
}
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "Failed to delete workload")
return nil, fmt.Errorf("deleting not matching workload: %w", err)
}
if err == nil {
existedWls++
r.record.Eventf(object, corev1.EventTypeNormal, "DeletedWorkload",
"Deleted not matching Workload: %v", workload.Key(toDelete[i]))
}
}

if existedWls != 0 {
if match == nil {
return nil, fmt.Errorf("no matching workload was found, tried deleting %d existing workload(s)", existedWls)
return nil, fmt.Errorf("%w: deleted %d workloads", ErrNoMatchingWorkloads, len(workloads.Items))
}
return nil, fmt.Errorf("only one workload should exist, found %d", len(workloads.Items))
return nil, fmt.Errorf("%w: deleted %d workloads", ErrExtraWorkloads, len(workloads.Items))
}

return match, nil
Expand All @@ -378,22 +377,29 @@ func (r *JobReconciler) equivalentToWorkload(job GenericJob, object client.Objec

jobPodSets := resetMinCounts(job.PodSets())

if !workload.CanBePartiallyAdmitted(wl) || job.IsSuspended() {
if !workload.CanBePartiallyAdmitted(wl) || !workload.IsAdmitted(wl) {
// the two sets should fully match.
return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, true)
}

// If the job is not suspended, the podSet counts of the job could be lower than
// the initial values, in case of partial admission.
// Check everything but the pod counts.
if !equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, false) {
return false
}

// Make sure the job podSets don't have higher counts, and use more resources than it originally
// requested.
// the ComparePodSetSlices fails if the lengths are not equal, don't check it once more
for i := range jobPodSets {
if jobPodSets[i].Count > wl.Spec.PodSets[i].Count || jobPodSets[i].Count < pointer.Int32Deref(wl.Spec.PodSets[i].MinCount, wl.Spec.PodSets[i].Count) {
// If the workload is admitted but the job is suspended, ignore counts.
// This might allow some violating jobs to pass equivalency checks, but their
// workloads would be invalidated in the next sync after unsuspending.
if job.IsSuspended() {
return true
}

for i, psAssigment := range wl.Status.Admission.PodSetAssignments {
assignedCount := wl.Spec.PodSets[i].Count
if jobPodSets[i].MinCount != nil {
assignedCount = pointer.Int32Deref(psAssigment.Count, assignedCount)
}
if jobPodSets[i].Count != assignedCount {
return false
}
}
Expand Down
180 changes: 180 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,20 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/pointer"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
utiltestingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
)

Expand Down Expand Up @@ -286,3 +295,174 @@ func TestPodSets(t *testing.T) {
})
}
}

var (
	// jobCmpOpts compares Jobs ignoring type/object metadata, so test
	// expectations only need to describe the spec.
	jobCmpOpts = []cmp.Option{
		cmpopts.EquateEmpty(),
		cmpopts.IgnoreFields(batchv1.Job{}, "TypeMeta", "ObjectMeta"),
	}
	// workloadCmpOpts compares Workload lists order-insensitively (sorted by
	// name), ignoring type/object metadata and condition transition times.
	workloadCmpOpts = []cmp.Option{
		cmpopts.EquateEmpty(),
		cmpopts.SortSlices(func(a, b kueue.Workload) bool {
			return a.Name < b.Name
		}),
		cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"),
		cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
	}
)

// TestReconciler runs JobReconciler.Reconcile against a fake client seeded
// with a Job and its pre-created Workloads, then checks the resulting Job
// state, the surviving Workloads, and the returned error (matched as a
// wrapped sentinel via cmpopts.EquateErrors). It covers the equivalency
// check between a Job and its admitted Workload, including partial admission.
func TestReconciler(t *testing.T) {
	// Partial admission is feature-gated; enable it for the test's duration.
	defer features.SetFeatureGateDuringTest(t, features.PartialAdmission, true)()
	cases := map[string]struct {
		job           batchv1.Job      // Job present before reconciling
		workloads     []kueue.Workload // Workloads pre-created and owned by the Job
		wantErr       error            // expected sentinel returned by Reconcile (nil for success)
		wantJob       batchv1.Job      // expected Job state after Reconcile
		wantWorkloads []kueue.Workload // expected surviving Workloads (empty means all deleted)
	}{
		"suspended job with matching admitted workload is unsuspended": {
			job: *utiltestingjob.MakeJob("job", "ns").
				Suspend(true).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
			workloads: []kueue.Workload{
				*utiltesting.MakeWorkload("a", "ns").
					PodSets(*utiltesting.MakePodSet("main", 10).Request(corev1.ResourceCPU, "1").Obj()).
					Admit(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
					Obj(),
			},
			wantJob: *utiltestingjob.MakeJob("job", "ns").
				Suspend(false).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
			wantWorkloads: []kueue.Workload{
				*utiltesting.MakeWorkload("a", "ns").
					PodSets(*utiltesting.MakePodSet("main", 10).Request(corev1.ResourceCPU, "1").Obj()).
					Admit(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
					Obj(),
			},
		},
		"non-matching admitted workload is deleted": {
			// Workload pod count (5) differs from the Job parallelism (10),
			// so the workload is deleted and Reconcile reports the sentinel.
			job: *utiltestingjob.MakeJob("job", "ns").
				Suspend(true).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
			workloads: []kueue.Workload{
				*utiltesting.MakeWorkload("a", "ns").
					PodSets(*utiltesting.MakePodSet("main", 5).Request(corev1.ResourceCPU, "1").Obj()).
					Admit(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
					Obj(),
			},
			wantErr: jobframework.ErrNoMatchingWorkloads,
			// No wantWorkloads: the non-matching Workload is expected to be gone.
			wantJob: *utiltestingjob.MakeJob("job", "ns").
				Suspend(true).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
		},
		"suspended job with partial admission and admitted workload is unsuspended": {
			// Admission assigned 8 pods out of min 5 / max 10; the Job should
			// be unsuspended with parallelism scaled down to the assignment.
			job: *utiltestingjob.MakeJob("job", "ns").
				SetAnnotation(JobMinParallelismAnnotation, "5").
				Suspend(true).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
			workloads: []kueue.Workload{
				*utiltesting.MakeWorkload("a", "ns").
					PodSets(*utiltesting.MakePodSet("main", 10).SetMinimumCount(5).Request(corev1.ResourceCPU, "1").Obj()).
					Admit(utiltesting.MakeAdmission("cq").AssignmentPodCount(8).Obj()).
					Obj(),
			},
			wantJob: *utiltestingjob.MakeJob("job", "ns").
				SetAnnotation(JobMinParallelismAnnotation, "5").
				Suspend(false).
				Parallelism(8).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
			wantWorkloads: []kueue.Workload{
				*utiltesting.MakeWorkload("a", "ns").
					PodSets(*utiltesting.MakePodSet("main", 10).SetMinimumCount(5).Request(corev1.ResourceCPU, "1").Obj()).
					Admit(utiltesting.MakeAdmission("cq").AssignmentPodCount(8).Obj()).
					Obj(),
			},
		},
		"unsuspended job with partial admission and non-matching admitted workload is suspended and workload is deleted": {
			// Running Job's parallelism (10) does not match the assigned pod
			// count (8): the Job is re-suspended and the Workload removed.
			job: *utiltestingjob.MakeJob("job", "ns").
				SetAnnotation(JobMinParallelismAnnotation, "5").
				Suspend(false).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
			workloads: []kueue.Workload{
				*utiltesting.MakeWorkload("a", "ns").
					PodSets(*utiltesting.MakePodSet("main", 10).SetMinimumCount(5).Request(corev1.ResourceCPU, "1").Obj()).
					Admit(utiltesting.MakeAdmission("cq").AssignmentPodCount(8).Obj()).
					Obj(),
			},
			wantErr: jobframework.ErrNoMatchingWorkloads,
			wantJob: *utiltestingjob.MakeJob("job", "ns").
				SetAnnotation(JobMinParallelismAnnotation, "5").
				Suspend(true).
				Parallelism(10).
				Request(corev1.ResourceCPU, "1").
				Image("", nil).
				Obj(),
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			ctx, _ := utiltesting.ContextWithLog(t)
			clientBuilder := utiltesting.NewClientBuilder()
			if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil {
				t.Fatalf("Could not setup indexes: %v", err)
			}
			kClient := clientBuilder.
				WithObjects(&tc.job).
				Build()
			for i := range tc.workloads {
				// The reconciler only considers Workloads owned by the Job.
				if err := ctrl.SetControllerReference(&tc.job, &tc.workloads[i], kClient.Scheme()); err != nil {
					t.Fatalf("Could not setup owner reference in Workloads: %v", err)
				}
				if err := kClient.Create(ctx, &tc.workloads[i]); err != nil {
					t.Fatalf("Could not create workload: %v", err)
				}
			}
			broadcaster := record.NewBroadcaster()
			recorder := broadcaster.NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"})
			// Manage Jobs even though the test Jobs carry no queue name.
			reconciler := NewReconciler(kClient, recorder, jobframework.WithManageJobsWithoutQueueName(true))

			jobKey := client.ObjectKeyFromObject(&tc.job)
			_, err := reconciler.Reconcile(ctx, reconcile.Request{
				NamespacedName: jobKey,
			})
			if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
				t.Errorf("Reconcile returned error (-want,+got):\n%s", diff)
			}

			var gotJob batchv1.Job
			if err := kClient.Get(ctx, jobKey, &gotJob); err != nil {
				t.Fatalf("Could not get Job after reconcile: %v", err)
			}
			if diff := cmp.Diff(tc.wantJob, gotJob, jobCmpOpts...); diff != "" {
				t.Errorf("Job after reconcile (-want,+got):\n%s", diff)
			}
			var gotWorkloads kueue.WorkloadList
			if err := kClient.List(ctx, &gotWorkloads); err != nil {
				t.Fatalf("Could not get Workloads after reconcile: %v", err)
			}
			if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, workloadCmpOpts...); diff != "" {
				t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff)
			}
		})
	}
}
15 changes: 15 additions & 0 deletions pkg/util/testing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package testing

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -49,3 +51,16 @@ func NewClientBuilder(addToSchemes ...func(s *runtime.Scheme) error) *fake.Clien
WithIndex(&kueue.Workload{}, indexer.WorkloadQueueKey, indexer.IndexWorkloadQueue).
WithIndex(&kueue.Workload{}, indexer.WorkloadClusterQueueKey, indexer.IndexWorkloadClusterQueue)
}

// builderIndexer adapts a fake.ClientBuilder to the client.FieldIndexer
// interface: every index registered through IndexField is recorded on the
// wrapped builder so the eventually-built fake client supports it.
type builderIndexer struct {
	*fake.ClientBuilder
}

// IndexField records the field index on the underlying builder. It never
// returns an error.
func (b *builderIndexer) IndexField(_ context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
	updated := b.ClientBuilder.WithIndex(obj, field, extractValue)
	b.ClientBuilder = updated
	return nil
}

// AsIndexer exposes builder as a client.FieldIndexer, letting setup code that
// expects a FieldIndexer register indexes on the fake client being built.
func AsIndexer(builder *fake.ClientBuilder) client.FieldIndexer {
	indexer := &builderIndexer{ClientBuilder: builder}
	return indexer
}

0 comments on commit 07fa280

Please sign in to comment.