From d8b8b347ae33cbfd32bf5df81797d2d715724e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szadkowski?= Date: Wed, 18 Sep 2024 20:09:27 +0200 Subject: [PATCH] [Feature] Support managed by external controller (#2203) * Introduce ManagedBy field to RunPolicy that is used by each Kubeflow Job Spec Signed-off-by: Michal Szadkowski * Update Kubeflow JOb manifests Signed-off-by: Michal Szadkowski * Update Kubeflow Jobs Reconcile to use ManagedBy field to decide if skip the process Signed-off-by: Michal Szadkowski * job controller test Signed-off-by: Michal Szadkowski * spec validation webhook Signed-off-by: Michal Szadkowski * add manageBy maxLenght const Signed-off-by: Michal Szadkowski * generate new manifest Signed-off-by: Michal Szadkowski * revert webhook formatting Signed-off-by: Michal Szadkowski * Move allowed controllers constants in one place Signed-off-by: Michal Szadkowski * Make validatation for allowed managedBy values Signed-off-by: Michal Szadkowski * Update after controllers constants move Signed-off-by: Michal Szadkowski * Update jobs controller tests Signed-off-by: Michal Szadkowski * Update validateManagedBy webhook Signed-off-by: Michal Szadkowski * Remove validation for the length of ManagedBy field Signed-off-by: Michal Szadkowski * Update after code review Signed-off-by: Michal Szadkowski * Update ManagedBy comment Signed-off-by: Michal Szadkowski * E2E tests for managedBy Signed-off-by: Michal Szadkowski * Update generated files and manifests Signed-off-by: Michal Szadkowski * Rework after code review Signed-off-by: Michal Szadkowski * Revert kustomization change Signed-off-by: Michal Szadkowski * Update job_test and logging Signed-off-by: Michal Szadkowski * Provide immutability check for ManagedBy Signed-off-by: Michal Szadkowski * Avoid making copy of runPolicy Signed-off-by: Michal Szadkowski * Split RunPolicy validators to Update and Create Signed-off-by: Michal Szadkowski * Fix the naming and call validate always Signed-off-by: Michal Szadkowski * Update tests Signed-off-by: Michal Szadkowski --------- Signed-off-by: Michal Szadkowski --- docs/api/kubeflow.org_v1_generated.asciidoc | 8 ++ hack/python-sdk/swagger.json | 4 + manifests/base/crds/kubeflow.org_jaxjobs.yaml | 10 +++ manifests/base/crds/kubeflow.org_mpijobs.yaml | 10 +++ .../base/crds/kubeflow.org_paddlejobs.yaml | 10 +++ .../base/crds/kubeflow.org_pytorchjobs.yaml | 10 +++ manifests/base/crds/kubeflow.org_tfjobs.yaml | 10 +++ .../base/crds/kubeflow.org_xgboostjobs.yaml | 10 +++ pkg/apis/kubeflow.org/v1/common_types.go | 16 ++++ pkg/apis/kubeflow.org/v1/openapi_generated.go | 7 ++ .../kubeflow.org/v1/zz_generated.deepcopy.go | 5 ++ pkg/common/util/webhooks.go | 32 ++++++++ pkg/controller.v1/common/job.go | 7 ++ pkg/controller.v1/common/job_test.go | 42 ++++++++++ pkg/controller.v1/mpi/mpijob_controller.go | 5 ++ .../mpi/mpijob_controller_test.go | 61 +++++++++++++++ .../paddlepaddle/paddlepaddle_controller.go | 5 ++ .../paddlepaddle_controller_test.go | 61 +++++++++++++++ .../pytorch/pytorchjob_controller.go | 5 ++ .../pytorch/pytorchjob_controller_test.go | 63 +++++++++++++++ .../tensorflow/tfjob_controller.go | 5 ++ .../tensorflow/tfjob_controller_test.go | 61 +++++++++++++++ .../xgboost/xgboostjob_controller.go | 5 ++ .../xgboost/xgboostjob_controller_test.go | 61 +++++++++++++++ .../paddlepaddle/paddlepaddle_webhook.go | 24 +++--- .../paddlepaddle/paddlepaddle_webhook_test.go | 24 +++++- pkg/webhooks/pytorch/pytorchjob_webhook.go | 24 +++--- .../pytorch/pytorchjob_webhook_test.go | 59 +++++++++++++- pkg/webhooks/tensorflow/tfjob_webhook.go | 24 +++--- pkg/webhooks/tensorflow/tfjob_webhook_test.go | 24 +++++- pkg/webhooks/xgboost/xgboostjob_webhook.go | 25 +++--- .../xgboost/xgboostjob_webhook_test.go | 24 +++++- sdk/python/docs/KubeflowOrgV1RunPolicy.md | 1 + .../models/kubeflow_org_v1_run_policy.py | 30 ++++++- sdk/python/test/e2e/test_e2e_pytorchjob.py | 78 +++++++++++++++++++ .../test/test_kubeflow_org_v1_jax_job.py | 1 + .../test/test_kubeflow_org_v1_jax_job_list.py | 2 + .../test/test_kubeflow_org_v1_jax_job_spec.py | 2 + .../test/test_kubeflow_org_v1_mpi_job.py | 1 + .../test/test_kubeflow_org_v1_mpi_job_list.py | 2 + .../test/test_kubeflow_org_v1_mpi_job_spec.py | 1 + .../test/test_kubeflow_org_v1_paddle_job.py | 1 + .../test_kubeflow_org_v1_paddle_job_list.py | 2 + .../test_kubeflow_org_v1_paddle_job_spec.py | 2 + .../test/test_kubeflow_org_v1_py_torch_job.py | 1 + .../test_kubeflow_org_v1_py_torch_job_list.py | 2 + .../test_kubeflow_org_v1_py_torch_job_spec.py | 2 + .../test/test_kubeflow_org_v1_run_policy.py | 1 + .../test/test_kubeflow_org_v1_tf_job.py | 1 + .../test/test_kubeflow_org_v1_tf_job_list.py | 2 + .../test/test_kubeflow_org_v1_tf_job_spec.py | 2 + .../test/test_kubeflow_org_v1_xg_boost_job.py | 1 + .../test_kubeflow_org_v1_xg_boost_job_list.py | 2 + .../test_kubeflow_org_v1_xg_boost_job_spec.py | 2 + 54 files changed, 835 insertions(+), 45 deletions(-) create mode 100644 pkg/common/util/webhooks.go diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index 3648d52e4d..a145d9f738 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -677,6 +677,14 @@ Suspending a Job will reset the StartTime field of the Job. Defaults to false. +| *`managedBy`* __string__ | ManagedBy is used to indicate the controller or entity that manages a job. +The value must be either an empty, 'kubeflow.org/training-operator' or +'kueue.x-k8s.io/multikueue'. +The training-operator reconciles a job which doesn't have this +field at all or the field value is the reserved string +'kubeflow.org/training-operator', but delegates reconciling the job +with 'kueue.x-k8s.io/multikueue' to the Kueue. +The field is immutable. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index a158dd8748..bb5632167a 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -575,6 +575,10 @@ "description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to None.", "type": "string" }, + "managedBy": { + "description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", + "type": "string" + }, "schedulingPolicy": { "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", "$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy" diff --git a/manifests/base/crds/kubeflow.org_jaxjobs.yaml b/manifests/base/crds/kubeflow.org_jaxjobs.yaml index 79cf2c6128..0131e051e5 100644 --- a/manifests/base/crds/kubeflow.org_jaxjobs.yaml +++ b/manifests/base/crds/kubeflow.org_jaxjobs.yaml @@ -7306,6 +7306,16 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/training-operator' or + 'kueue.x-k8s.io/multikueue'. + The training-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/training-operator', but delegates reconciling the job + with 'kueue.x-k8s. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/manifests/base/crds/kubeflow.org_mpijobs.yaml b/manifests/base/crds/kubeflow.org_mpijobs.yaml index 4139d49fef..6bbdf671ad 100644 --- a/manifests/base/crds/kubeflow.org_mpijobs.yaml +++ b/manifests/base/crds/kubeflow.org_mpijobs.yaml @@ -7311,6 +7311,16 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/training-operator' or + 'kueue.x-k8s.io/multikueue'. + The training-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/training-operator', but delegates reconciling the job + with 'kueue.x-k8s. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/manifests/base/crds/kubeflow.org_paddlejobs.yaml b/manifests/base/crds/kubeflow.org_paddlejobs.yaml index c8a5acb8c3..6c69ba882b 100644 --- a/manifests/base/crds/kubeflow.org_paddlejobs.yaml +++ b/manifests/base/crds/kubeflow.org_paddlejobs.yaml @@ -7793,6 +7793,16 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/training-operator' or + 'kueue.x-k8s.io/multikueue'. + The training-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/training-operator', but delegates reconciling the job + with 'kueue.x-k8s. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index a5c261e124..b4eabcaceb 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -7830,6 +7830,16 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/training-operator' or + 'kueue.x-k8s.io/multikueue'. + The training-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/training-operator', but delegates reconciling the job + with 'kueue.x-k8s. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/manifests/base/crds/kubeflow.org_tfjobs.yaml b/manifests/base/crds/kubeflow.org_tfjobs.yaml index 153243aae6..5b1bd84fe0 100644 --- a/manifests/base/crds/kubeflow.org_tfjobs.yaml +++ b/manifests/base/crds/kubeflow.org_tfjobs.yaml @@ -71,6 +71,16 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/training-operator' or + 'kueue.x-k8s.io/multikueue'. + The training-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/training-operator', but delegates reconciling the job + with 'kueue.x-k8s. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml index 34237c6aba..4cdceca6ed 100644 --- a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml +++ b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml @@ -67,6 +67,16 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/training-operator' or + 'kueue.x-k8s.io/multikueue'. + The training-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/training-operator', but delegates reconciling the job + with 'kueue.x-k8s. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/pkg/apis/kubeflow.org/v1/common_types.go b/pkg/apis/kubeflow.org/v1/common_types.go index 42fd1d0dad..59923b4da7 100644 --- a/pkg/apis/kubeflow.org/v1/common_types.go +++ b/pkg/apis/kubeflow.org/v1/common_types.go @@ -35,6 +35,12 @@ const ( // JobRoleLabel represents the label key for the job role, e.g. master. JobRoleLabel = "training.kubeflow.org/job-role" + + // KubeflowJobsController represents the value of the default jobs controller + KubeflowJobsController = "kubeflow.org/training-operator" + + // MultiKueueController represents the MultiKueue controller + MultiKueueController = "kueue.x-k8s.io/multikueue" ) // JobStatus represents the current observed state of the training Job. @@ -221,6 +227,16 @@ type RunPolicy struct { // +kubebuilder:default:=false // +optional Suspend *bool `json:"suspend,omitempty"` + + // ManagedBy is used to indicate the controller or entity that manages a job. + // The value must be either an empty, 'kubeflow.org/training-operator' or + // 'kueue.x-k8s.io/multikueue'. + // The training-operator reconciles a job which doesn't have this + // field at all or the field value is the reserved string + // 'kubeflow.org/training-operator', but delegates reconciling the job + // with 'kueue.x-k8s.io/multikueue' to the Kueue. + // The field is immutable. + ManagedBy *string `json:"managedBy,omitempty"` } // SchedulingPolicy encapsulates various scheduling policies of the distributed training diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index d6f32b1142..560f2b278a 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -1063,6 +1063,13 @@ func schema_pkg_apis_kubefloworg_v1_RunPolicy(ref common.ReferenceCallback) comm Format: "", }, }, + "managedBy": { + SchemaProps: spec.SchemaProps{ + Description: "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go index 79ca131cd5..52ce7b75e6 100644 --- a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go @@ -680,6 +680,11 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = new(bool) **out = **in } + if in.ManagedBy != nil { + in, out := &in.ManagedBy, &out.ManagedBy + *out = new(string) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunPolicy. diff --git a/pkg/common/util/webhooks.go b/pkg/common/util/webhooks.go new file mode 100644 index 0000000000..aa4031cffe --- /dev/null +++ b/pkg/common/util/webhooks.go @@ -0,0 +1,32 @@ +package util + +import ( + v1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + + apivalidation "k8s.io/apimachinery/pkg/api/validation" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" +) + +var supportedJobControllers = sets.New( + v1.MultiKueueController, + v1.KubeflowJobsController) + +func ValidateRunPolicy(runPolicy *v1.RunPolicy) field.ErrorList { + errs := field.ErrorList{} + if runPolicy.ManagedBy != nil { + manager := *runPolicy.ManagedBy + if !supportedJobControllers.Has(manager) { + fieldPath := field.NewPath("spec", "runPolicy", "managedBy") + errs = append(errs, field.NotSupported(fieldPath, manager, supportedJobControllers.UnsortedList())) + } + } + return errs +} + +func ValidateRunPolicyUpdate(oldRunPolicy, newRunPolicy *v1.RunPolicy) field.ErrorList { + oldManager := oldRunPolicy.ManagedBy + newManager := newRunPolicy.ManagedBy + fieldPath := field.NewPath("spec", "runPolicy", "managedBy") + return apivalidation.ValidateImmutableField(newManager, oldManager, fieldPath) +} diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index d8a494dd0c..ee750c2d1d 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -455,3 +455,10 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1. func (jc *JobController) calcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) *corev1.ResourceList { return CalcPGMinResources(minMember, replicas, jc.PriorityClassLister.Get) } + +func (jc *JobController) ManagedByExternalController(controllerName *string) *string { + if controllerName != nil && *controllerName != apiv1.KubeflowJobsController { + return controllerName + } + return nil +} diff --git a/pkg/controller.v1/common/job_test.go b/pkg/controller.v1/common/job_test.go index b204cc2e89..ca948b788b 100644 --- a/pkg/controller.v1/common/job_test.go +++ b/pkg/controller.v1/common/job_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" ) func TestDeletePodsAndServices(T *testing.T) { @@ -219,6 +220,47 @@ func TestPastActiveDeadline(T *testing.T) { } } +func TestManagedByExternalController(T *testing.T) { + cases := map[string]struct { + managedBy *string + wantControllerName *string + }{ + "managedBy is nil": { + managedBy: nil, + wantControllerName: nil, + }, + "managedBy is empty": { + managedBy: ptr.To[string](""), + wantControllerName: ptr.To[string](""), + }, + "managedBy is training-operator controller": { + managedBy: ptr.To[string](apiv1.KubeflowJobsController), + wantControllerName: nil, + }, + "managedBy is not the training-operator controller": { + managedBy: ptr.To[string]("kueue.x-k8s.io/multikueue"), + wantControllerName: ptr.To[string]("kueue.x-k8s.io/multikueue"), + }, + "managedBy is other value": { + managedBy: ptr.To[string]("other-job-controller"), + wantControllerName: ptr.To[string]("other-job-controller"), + }, + } + for name, tc := range cases { + T.Run(name, func(t *testing.T) { + jobController := JobController{} + runPolicy := &apiv1.RunPolicy{ + ManagedBy: tc.managedBy, + } + + gotControllerName := jobController.ManagedByExternalController(runPolicy.ManagedBy) + if diff := cmp.Diff(tc.wantControllerName, gotControllerName); diff != "" { + t.Errorf("Unexpected manager controller (-want +got):\n%s", diff) + } + }) + } +} + func newPod(name string, phase corev1.PodPhase) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index 0067692a66..e59fd15d66 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -135,6 +135,11 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, client.IgnoreNotFound(err) } + if manager := jc.ManagedByExternalController(mpijob.Spec.RunPolicy.ManagedBy); manager != nil { + logger.Info("Skipping MPIJob managed by a custom controller", "managed-by", manager) + return ctrl.Result{}, nil + } + if err = kubeflowv1.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil { logger.Error(err, "MPIJob failed validation") jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason), diff --git a/pkg/controller.v1/mpi/mpijob_controller_test.go b/pkg/controller.v1/mpi/mpijob_controller_test.go index 2c63e484f2..024cbaaec1 100644 --- a/pkg/controller.v1/mpi/mpijob_controller_test.go +++ b/pkg/controller.v1/mpi/mpijob_controller_test.go @@ -1066,6 +1066,67 @@ var _ = Describe("MPIJob controller", func() { By("Checking if the startTime is updated") Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) }) + + It("Should not reconcile a job while managed by external controller", func() { + By("Creating a MPIJob managed by external controller") + job.Spec.RunPolicy = kubeflowv1.RunPolicy{ + ManagedBy: ptr.To(kubeflowv1.MultiKueueController), + } + job.Spec.RunPolicy.Suspend = ptr.To(true) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.MPIJob{} + By("Checking created MPIJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking created MPIJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + launcherPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + launcherSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + errMasterPod := testK8sClient.Get(ctx, launcherKey, launcherPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, launcherKey, launcherSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)") + + By("Checking if the MPIJob status was not updated") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + + By("Unsuspending the MPIJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = ptr.To(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking created MPIJob still has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the MPIJob status was not updated, even after unsuspending") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + }) }) }) diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index 7cae58e9c5..b453f0eb8e 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -131,6 +131,11 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, client.IgnoreNotFound(err) } + if manager := r.ManagedByExternalController(paddlejob.Spec.RunPolicy.ManagedBy); manager != nil { + logger.Info("Skipping PaddleJob managed by a custom controller", "managed-by", manager) + return ctrl.Result{}, nil + } + // Check if reconciliation is needed jobKey, err := common.KeyFunc(paddlejob) if err != nil { diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go index 493acf1ad5..72a851642a 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go @@ -430,6 +430,67 @@ var _ = Describe("PaddleJob controller", func() { By("Checking if the startTime is updated") Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) }) + + It("Should not reconcile a job while managed by external controller", func() { + By("Creating a PaddleJob managed by external controller") + job.Spec.RunPolicy = kubeflowv1.RunPolicy{ + ManagedBy: ptr.To(kubeflowv1.MultiKueueController), + } + job.Spec.RunPolicy.Suspend = ptr.To(true) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PaddleJob{} + By("Checking created PaddleJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking created PaddleJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)") + + By("Checking if the PaddleJob status was not updated") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + + By("Unsuspending the PaddleJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = ptr.To(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking created PaddleJob still has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the PaddleJob status was not updated, even after unsuspending") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + }) }) }) diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 7025c24396..a5effc5cc5 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -132,6 +132,11 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, client.IgnoreNotFound(err) } + if manager := r.ManagedByExternalController(pytorchjob.Spec.RunPolicy.ManagedBy); manager != nil { + logger.Info("Skipping PyTorchJob managed by a custom controller", "managed-by", manager) + return ctrl.Result{}, nil + } + // Check if reconciliation is needed jobKey, err := common.KeyFunc(pytorchjob) if err != nil { diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 0e97661f6b..37b4c9218c 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -466,6 +466,69 @@ var _ = Describe("PyTorchJob controller", func() { By("Checking if the startTime is updated") Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) }) + + It("Should not reconcile a job while managed by external controller", func() { + By("Creating a PyTorchJob managed by external controller") + job.Spec.RunPolicy = kubeflowv1.RunPolicy{ + ManagedBy: ptr.To(kubeflowv1.MultiKueueController), + } + job.Spec.RunPolicy.Suspend = ptr.To(true) + job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas = ptr.To[int32](1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PyTorchJob{} + By("Checking created PyTorchJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking created PyTorchJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)") + + By("Checking if the PyTorchJob status was not updated") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + + By("Unsuspending the PyTorchJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = ptr.To(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking created PyTorchJob still has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the PyTorchJob status was not updated, even after unsuspending") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + + }) }) Context("When creating the elastic PyTorchJob", func() { diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go index a60d65affe..8447d02f55 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller.go @@ -127,6 +127,11 @@ func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, client.IgnoreNotFound(err) } + if manager := r.ManagedByExternalController(tfjob.Spec.RunPolicy.ManagedBy); manager != nil { + logger.Info("Skipping TFJob managed by a custom controller", "managed-by", manager) + return ctrl.Result{}, nil + } + // Check if reconciliation is needed jobKey, err := common.KeyFunc(tfjob) if err != nil { diff --git a/pkg/controller.v1/tensorflow/tfjob_controller_test.go b/pkg/controller.v1/tensorflow/tfjob_controller_test.go index 1d08aba0c9..53265e358c 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller_test.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller_test.go @@ -606,5 +606,66 @@ var _ = Describe("TFJob controller", func() { By("Checking if the startTime is updated") Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) }) + + It("Should not reconcile a job while managed by external controller", func() { + By("Creating a TFJob managed by external controller") + job.Spec.RunPolicy = kubeflowv1.RunPolicy{ + ManagedBy: ptr.To(kubeflowv1.MultiKueueController), + } + job.Spec.RunPolicy.Suspend = ptr.To(true) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.TFJob{} + By("Checking created TFJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking created TFJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + chiefPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + chiefSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + errMasterPod := testK8sClient.Get(ctx, chiefKey, chiefPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, chiefKey, chiefSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)") + + By("Checking if the TFJob status was not updated") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + + By("Unsuspending the TFJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = ptr.To(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking created TFJob still has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the TFJob status was not updated, even after unsuspending") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + }) }) }) diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index fb860e462f..4733515bbe 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -130,6 +130,11 @@ func (r *XGBoostJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, client.IgnoreNotFound(err) } + if manager := r.ManagedByExternalController(xgboostjob.Spec.RunPolicy.ManagedBy); manager != nil { + logger.Info("Skipping XGBoostJob managed by a custom controller", "managed-by", manager) + return ctrl.Result{}, nil + } + // Check reconcile is required. jobKey, err := common.KeyFunc(xgboostjob) if err != nil { diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller_test.go b/pkg/controller.v1/xgboost/xgboostjob_controller_test.go index 7bf1ef8c84..b1a1fce9c6 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller_test.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller_test.go @@ -359,6 +359,67 @@ var _ = Describe("XGBoost controller", func() { By("Checking if the startTime is updated") Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) }) + + It("Should not reconcile a job while managed by external controller", func() { + By("Creating a XGBoostJob managed by external controller") + job.Spec.RunPolicy = kubeflowv1.RunPolicy{ + ManagedBy: ptr.To(kubeflowv1.MultiKueueController), + } + job.Spec.RunPolicy.Suspend = ptr.To(true) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.XGBoostJob{} + By("Checking created XGBoostJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking created XGBoostJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)") + + By("Checking if the XGBoostJob status was not updated") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + + By("Unsuspending the XGBoostJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = ptr.To(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking created XGBoostJob still has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the XGBoostJob status was not updated, even after unsuspending") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil))) + }) }) }) diff --git a/pkg/webhooks/paddlepaddle/paddlepaddle_webhook.go b/pkg/webhooks/paddlepaddle/paddlepaddle_webhook.go index dab25d419a..fedc95b5f7 100644 --- a/pkg/webhooks/paddlepaddle/paddlepaddle_webhook.go +++ b/pkg/webhooks/paddlepaddle/paddlepaddle_webhook.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" trainingoperator "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "github.com/kubeflow/training-operator/pkg/common/util" ) var ( @@ -55,26 +56,31 @@ func (w Webhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admiss job := obj.(*trainingoperator.PaddleJob) log := ctrl.LoggerFrom(ctx).WithName("paddlejob-webhook") log.V(5).Info("Validating create", "paddleJob", klog.KObj(job)) - return nil, validatePaddleJob(job).ToAggregate() + return nil, validatePaddleJob(nil, job).ToAggregate() } -func (w Webhook) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) (admission.Warnings, error) { - job := newObj.(*trainingoperator.PaddleJob) +func (w Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + oldJob := oldObj.(*trainingoperator.PaddleJob) + newJob := newObj.(*trainingoperator.PaddleJob) log := ctrl.LoggerFrom(ctx).WithName("paddlejob-webhook") - log.V(5).Info("Validating update", "paddleJob", klog.KObj(job)) - return nil, validatePaddleJob(job).ToAggregate() + log.V(5).Info("Validating update", "paddleJob", klog.KObj(newJob)) + return nil, validatePaddleJob(oldJob, newJob).ToAggregate() } func (w Webhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) { return nil, nil } -func validatePaddleJob(job *trainingoperator.PaddleJob) field.ErrorList { +func validatePaddleJob(oldJob, newJob *trainingoperator.PaddleJob) field.ErrorList { var allErrs field.ErrorList - if errors := apimachineryvalidation.NameIsDNS1035Label(job.Name, false); len(errors) != 0 { - allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), job.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) + if errors := apimachineryvalidation.NameIsDNS1035Label(newJob.Name, false); len(errors) != 0 { + allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), newJob.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) } - allErrs = append(allErrs, validateSpec(job.Spec.PaddleReplicaSpecs)...) + if oldJob != nil { + allErrs = append(allErrs, util.ValidateRunPolicyUpdate(&oldJob.Spec.RunPolicy, &newJob.Spec.RunPolicy)...) + } + allErrs = append(allErrs, util.ValidateRunPolicy(&newJob.Spec.RunPolicy)...) + allErrs = append(allErrs, validateSpec(newJob.Spec.PaddleReplicaSpecs)...) return allErrs } diff --git a/pkg/webhooks/paddlepaddle/paddlepaddle_webhook_test.go b/pkg/webhooks/paddlepaddle/paddlepaddle_webhook_test.go index 087aa5d87e..10cc0ea053 100644 --- a/pkg/webhooks/paddlepaddle/paddlepaddle_webhook_test.go +++ b/pkg/webhooks/paddlepaddle/paddlepaddle_webhook_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/utils/ptr" @@ -65,6 +66,9 @@ func TestValidateV1PaddleJob(t *testing.T) { Name: "test", }, Spec: trainingoperator.PaddleJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To(trainingoperator.KubeflowJobsController), + }, PaddleReplicaSpecs: validPaddleReplicaSpecs, }, }, @@ -169,10 +173,28 @@ func TestValidateV1PaddleJob(t *testing.T) { field.Required(paddleReplicaSpecPath, ""), }, }, + "attempt to set unsupported managedBy controller name gets rejected": { + paddleJob: &trainingoperator.PaddleJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: trainingoperator.PaddleJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To("other-job-controller"), + }, + PaddleReplicaSpecs: validPaddleReplicaSpecs, + }, + }, + wantErr: field.ErrorList{ + field.NotSupported(field.NewPath("spec", "runPolicy", "managedBy"), "", sets.List(sets.New( + trainingoperator.MultiKueueController, + trainingoperator.KubeflowJobsController))), + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - got := validatePaddleJob(tc.paddleJob) + got := validatePaddleJob(nil, tc.paddleJob) if diff := cmp.Diff(tc.wantErr, got, cmpopts.IgnoreFields(field.Error{}, "Detail", "BadValue")); len(diff) != 0 { t.Errorf("Unexpected error (-want,+got):\n%s", diff) } diff --git a/pkg/webhooks/pytorch/pytorchjob_webhook.go b/pkg/webhooks/pytorch/pytorchjob_webhook.go index 14d7b5c0eb..2459815935 100644 --- a/pkg/webhooks/pytorch/pytorchjob_webhook.go +++ b/pkg/webhooks/pytorch/pytorchjob_webhook.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" trainingoperator "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "github.com/kubeflow/training-operator/pkg/common/util" ) var ( @@ -55,15 +56,16 @@ func (w *Webhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admis job := obj.(*trainingoperator.PyTorchJob) log := ctrl.LoggerFrom(ctx).WithName("pytorchjob-webhook") log.V(5).Info("Validating create", "pytorchJob", klog.KObj(job)) - warnings, errs := validatePyTorchJob(job) + warnings, errs := validatePyTorchJob(nil, job) return warnings, errs.ToAggregate() } -func (w *Webhook) ValidateUpdate(ctx context.Context, _ runtime.Object, newObj runtime.Object) (admission.Warnings, error) { - job := newObj.(*trainingoperator.PyTorchJob) +func (w *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + oldJob := newObj.(*trainingoperator.PyTorchJob) + newJob := newObj.(*trainingoperator.PyTorchJob) log := ctrl.LoggerFrom(ctx).WithName("pytorchjob-webhook") - log.V(5).Info("Validating update", "pytorchJob", klog.KObj(job)) - warnings, errs := validatePyTorchJob(job) + log.V(5).Info("Validating update", "pytorchJob", klog.KObj(newJob)) + warnings, errs := validatePyTorchJob(oldJob, newJob) return warnings, errs.ToAggregate() } @@ -71,14 +73,18 @@ func (w *Webhook) ValidateDelete(context.Context, runtime.Object) (admission.War return nil, nil } -func validatePyTorchJob(job *trainingoperator.PyTorchJob) (admission.Warnings, field.ErrorList) { +func validatePyTorchJob(oldJob, newJob *trainingoperator.PyTorchJob) (admission.Warnings, field.ErrorList) { var allErrs field.ErrorList var warnings admission.Warnings - if errors := apimachineryvalidation.NameIsDNS1035Label(job.ObjectMeta.Name, false); len(errors) != 0 { - allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), job.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) + if errors := apimachineryvalidation.NameIsDNS1035Label(newJob.ObjectMeta.Name, false); len(errors) != 0 { + allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), newJob.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) } - ws, err := validateSpec(job.Spec) + if oldJob != nil { + allErrs = append(allErrs, util.ValidateRunPolicyUpdate(&oldJob.Spec.RunPolicy, &newJob.Spec.RunPolicy)...) + } + allErrs = append(allErrs, util.ValidateRunPolicy(&newJob.Spec.RunPolicy)...) + ws, err := validateSpec(newJob.Spec) warnings = append(warnings, ws...) allErrs = append(allErrs, err...) return warnings, allErrs diff --git a/pkg/webhooks/pytorch/pytorchjob_webhook_test.go b/pkg/webhooks/pytorch/pytorchjob_webhook_test.go index 362a6d91e9..7757e36b3e 100644 --- a/pkg/webhooks/pytorch/pytorchjob_webhook_test.go +++ b/pkg/webhooks/pytorch/pytorchjob_webhook_test.go @@ -23,7 +23,9 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" + apivalidation "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -72,9 +74,10 @@ func TestValidateV1PyTorchJob(t *testing.T) { } testCases := map[string]struct { - pytorchJob *trainingoperator.PyTorchJob - wantErr field.ErrorList - wantWarnings admission.Warnings + pytorchJob *trainingoperator.PyTorchJob + oldPytorchJob *trainingoperator.PyTorchJob + wantErr field.ErrorList + wantWarnings admission.Warnings }{ "valid PyTorchJob": { pytorchJob: &trainingoperator.PyTorchJob{ @@ -82,6 +85,9 @@ func TestValidateV1PyTorchJob(t *testing.T) { Name: "test", }, Spec: trainingoperator.PyTorchJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To(trainingoperator.KubeflowJobsController), + }, PyTorchReplicaSpecs: validPyTorchReplicaSpecs, }, }, @@ -284,11 +290,56 @@ func TestValidateV1PyTorchJob(t *testing.T) { specPath.Child("elasticPolicy").Child("nProcPerNode"), specPath.Child("nprocPerNode")), }, }, + "attempt to set unsupported managedBy controller name gets rejected": { + pytorchJob: &trainingoperator.PyTorchJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: trainingoperator.PyTorchJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To("other-job-controller"), + }, + PyTorchReplicaSpecs: validPyTorchReplicaSpecs, + }, + }, + wantErr: field.ErrorList{ + field.NotSupported(field.NewPath("spec", "runPolicy", "managedBy"), "", sets.List(sets.New( + trainingoperator.MultiKueueController, + trainingoperator.KubeflowJobsController))), + }, + }, + "attempt to update the managedBy field gets rejected": { + oldPytorchJob: &trainingoperator.PyTorchJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: trainingoperator.PyTorchJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To(trainingoperator.KubeflowJobsController), + }, + PyTorchReplicaSpecs: validPyTorchReplicaSpecs, + }, + }, + pytorchJob: &trainingoperator.PyTorchJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: trainingoperator.PyTorchJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To(trainingoperator.MultiKueueController), + }, + PyTorchReplicaSpecs: validPyTorchReplicaSpecs, + }, + }, + wantErr: field.ErrorList{ + field.Invalid(field.NewPath("spec", "runPolicy", "managedBy"), trainingoperator.MultiKueueController, apivalidation.FieldImmutableErrorMsg), + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - gotWarnings, gotError := validatePyTorchJob(tc.pytorchJob) + gotWarnings, gotError := validatePyTorchJob(tc.oldPytorchJob, tc.pytorchJob) if diff := cmp.Diff(tc.wantWarnings, gotWarnings, cmpopts.SortSlices(func(a, b string) bool { return a < b })); len(diff) != 0 { t.Errorf("Unexpected warnings (-want,+got):\n%s", diff) } diff --git a/pkg/webhooks/tensorflow/tfjob_webhook.go b/pkg/webhooks/tensorflow/tfjob_webhook.go index 9e9629a1b8..95f187f44f 100644 --- a/pkg/webhooks/tensorflow/tfjob_webhook.go +++ b/pkg/webhooks/tensorflow/tfjob_webhook.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" trainingoperator "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "github.com/kubeflow/training-operator/pkg/common/util" ) var ( @@ -54,26 +55,31 @@ func (w *Webhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admis job := obj.(*trainingoperator.TFJob) log := ctrl.LoggerFrom(ctx).WithName("tfjob-webhook") log.V(5).Info("Validating create", "TFJob", klog.KObj(job)) - return nil, validateTFJob(job).ToAggregate() + return nil, validateTFJob(nil, job).ToAggregate() } -func (w *Webhook) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) (admission.Warnings, error) { - job := newObj.(*trainingoperator.TFJob) +func (w *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + oldJob := oldObj.(*trainingoperator.TFJob) + newJob := newObj.(*trainingoperator.TFJob) log := ctrl.LoggerFrom(ctx).WithName("tfjob-webhook") - log.V(5).Info("Validating update", "NewTFJob", klog.KObj(job)) - return nil, validateTFJob(job).ToAggregate() + log.V(5).Info("Validating update", "NewTFJob", klog.KObj(newJob)) + return nil, validateTFJob(oldJob, newJob).ToAggregate() } func (w *Webhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) { return nil, nil } -func validateTFJob(job *trainingoperator.TFJob) field.ErrorList { +func validateTFJob(oldJob, newJob *trainingoperator.TFJob) field.ErrorList { var allErrs field.ErrorList - if errors := apimachineryvalidation.NameIsDNS1035Label(job.Name, false); len(errors) != 0 { - allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), job.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) + if errors := apimachineryvalidation.NameIsDNS1035Label(newJob.Name, false); len(errors) != 0 { + allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), newJob.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) } - allErrs = append(allErrs, validateSpec(job.Spec)...) + if oldJob != nil { + allErrs = append(allErrs, util.ValidateRunPolicyUpdate(&oldJob.Spec.RunPolicy, &newJob.Spec.RunPolicy)...) + } + allErrs = append(allErrs, util.ValidateRunPolicy(&newJob.Spec.RunPolicy)...) + allErrs = append(allErrs, validateSpec(newJob.Spec)...) return allErrs } diff --git a/pkg/webhooks/tensorflow/tfjob_webhook_test.go b/pkg/webhooks/tensorflow/tfjob_webhook_test.go index 236d613295..759cc1b58b 100644 --- a/pkg/webhooks/tensorflow/tfjob_webhook_test.go +++ b/pkg/webhooks/tensorflow/tfjob_webhook_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/utils/ptr" @@ -59,6 +60,9 @@ func TestValidateTFJob(t *testing.T) { Name: "test", }, Spec: trainingoperator.TFJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To(trainingoperator.KubeflowJobsController), + }, TFReplicaSpecs: validTFReplicaSpecs, }, }, @@ -180,10 +184,28 @@ func TestValidateTFJob(t *testing.T) { field.Forbidden(tfReplicaSpecPath, ""), }, }, + "attempt to set unsupported managedBy controller name gets rejected": { + tfJob: &trainingoperator.TFJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: trainingoperator.TFJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To("other-job-controller"), + }, + TFReplicaSpecs: validTFReplicaSpecs, + }, + }, + wantErr: field.ErrorList{ + field.NotSupported(field.NewPath("spec", "runPolicy", "managedBy"), "", sets.List(sets.New( + trainingoperator.MultiKueueController, + trainingoperator.KubeflowJobsController))), + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - got := validateTFJob(tc.tfJob) + got := validateTFJob(nil, tc.tfJob) if diff := cmp.Diff(tc.wantErr, got, cmpopts.IgnoreFields(field.Error{}, "Detail", "BadValue")); len(diff) != 0 { t.Errorf("Unexpected error (-want,+got):\n%s", diff) } diff --git a/pkg/webhooks/xgboost/xgboostjob_webhook.go b/pkg/webhooks/xgboost/xgboostjob_webhook.go index 826cbe2c53..5372317487 100644 --- a/pkg/webhooks/xgboost/xgboostjob_webhook.go +++ b/pkg/webhooks/xgboost/xgboostjob_webhook.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" trainingoperator "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "github.com/kubeflow/training-operator/pkg/common/util" ) var ( @@ -55,27 +56,31 @@ func (w *Webhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admis job := obj.(*trainingoperator.XGBoostJob) log := ctrl.LoggerFrom(ctx).WithName("xgboostjob-webhook") log.V(5).Info("Validating create", "xgboostJob", klog.KObj(job)) - return nil, validateXGBoostJob(job).ToAggregate() + return nil, validateXGBoostJob(nil, job).ToAggregate() } -func (w *Webhook) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) (admission.Warnings, error) { - job := newObj.(*trainingoperator.XGBoostJob) +func (w *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + oldJob := oldObj.(*trainingoperator.XGBoostJob) + newJob := newObj.(*trainingoperator.XGBoostJob) log := ctrl.LoggerFrom(ctx).WithName("xgboostjob-webhook") - log.V(5).Info("Validating create", "xgboostJob", klog.KObj(job)) - return nil, validateXGBoostJob(job).ToAggregate() + log.V(5).Info("Validating create", "xgboostJob", klog.KObj(newJob)) + return nil, validateXGBoostJob(oldJob, newJob).ToAggregate() } func (w *Webhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) { return nil, nil } -func validateXGBoostJob(job *trainingoperator.XGBoostJob) field.ErrorList { +func validateXGBoostJob(oldJob, newJob *trainingoperator.XGBoostJob) field.ErrorList { var allErrs field.ErrorList - - if errors := apimachineryvalidation.NameIsDNS1035Label(job.Name, false); len(errors) != 0 { - allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), job.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) + if errors := apimachineryvalidation.NameIsDNS1035Label(newJob.Name, false); len(errors) != 0 { + allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), newJob.Name, fmt.Sprintf("should match: %v", strings.Join(errors, ",")))) + } + if oldJob != nil { + allErrs = append(allErrs, util.ValidateRunPolicyUpdate(&oldJob.Spec.RunPolicy, &newJob.Spec.RunPolicy)...) } - allErrs = append(allErrs, validateSpec(job.Spec)...) + allErrs = append(allErrs, util.ValidateRunPolicy(&newJob.Spec.RunPolicy)...) + allErrs = append(allErrs, validateSpec(newJob.Spec)...) return allErrs } diff --git a/pkg/webhooks/xgboost/xgboostjob_webhook_test.go b/pkg/webhooks/xgboost/xgboostjob_webhook_test.go index 9bd95893c6..3c1d410598 100644 --- a/pkg/webhooks/xgboost/xgboostjob_webhook_test.go +++ b/pkg/webhooks/xgboost/xgboostjob_webhook_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/utils/ptr" @@ -91,6 +92,9 @@ func TestValidateXGBoostJob(t *testing.T) { Name: "test", }, Spec: trainingoperator.XGBoostJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To(trainingoperator.KubeflowJobsController), + }, XGBReplicaSpecs: validXGBoostReplicaSpecs, }, }, @@ -231,10 +235,28 @@ func TestValidateXGBoostJob(t *testing.T) { field.Required(xgbReplicaSpecPath.Key(string(trainingoperator.XGBoostJobReplicaTypeMaster)), ""), }, }, + "attempt to set unsupported managedBy controller name gets rejected": { + xgboostJob: &trainingoperator.XGBoostJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: trainingoperator.XGBoostJobSpec{ + RunPolicy: trainingoperator.RunPolicy{ + ManagedBy: ptr.To("other-job-controller"), + }, + XGBReplicaSpecs: validXGBoostReplicaSpecs, + }, + }, + wantErr: field.ErrorList{ + field.NotSupported(field.NewPath("spec", "runPolicy", "managedBy"), "", sets.List(sets.New( + trainingoperator.MultiKueueController, + trainingoperator.KubeflowJobsController))), + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - got := validateXGBoostJob(tc.xgboostJob) + got := validateXGBoostJob(nil, tc.xgboostJob) if diff := cmp.Diff(tc.wantErr, got, cmpopts.IgnoreFields(field.Error{}, "Detail", "BadValue")); len(diff) != 0 { t.Errorf("Unexpected errors (-want,+got):\n%s", diff) } diff --git a/sdk/python/docs/KubeflowOrgV1RunPolicy.md b/sdk/python/docs/KubeflowOrgV1RunPolicy.md index ce41fa09f6..d3b32c70c7 100644 --- a/sdk/python/docs/KubeflowOrgV1RunPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1RunPolicy.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional] **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. | [optional] +**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional] **scheduling_policy** | [**KubeflowOrgV1SchedulingPolicy**](KubeflowOrgV1SchedulingPolicy.md) | | [optional] **suspend** | **bool** | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job. Defaults to false. | [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. | [optional] diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py index c5fd48bc9a..7782720075 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py @@ -36,6 +36,7 @@ class KubeflowOrgV1RunPolicy(object): 'active_deadline_seconds': 'int', 'backoff_limit': 'int', 'clean_pod_policy': 'str', + 'managed_by': 'str', 'scheduling_policy': 'KubeflowOrgV1SchedulingPolicy', 'suspend': 'bool', 'ttl_seconds_after_finished': 'int' @@ -45,12 +46,13 @@ class KubeflowOrgV1RunPolicy(object): 'active_deadline_seconds': 'activeDeadlineSeconds', 'backoff_limit': 'backoffLimit', 'clean_pod_policy': 'cleanPodPolicy', + 'managed_by': 'managedBy', 'scheduling_policy': 'schedulingPolicy', 'suspend': 'suspend', 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, managed_by=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """KubeflowOrgV1RunPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -59,6 +61,7 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self._active_deadline_seconds = None self._backoff_limit = None self._clean_pod_policy = None + self._managed_by = None self._scheduling_policy = None self._suspend = None self._ttl_seconds_after_finished = None @@ -70,6 +73,8 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self.backoff_limit = backoff_limit if clean_pod_policy is not None: self.clean_pod_policy = clean_pod_policy + if managed_by is not None: + self.managed_by = managed_by if scheduling_policy is not None: self.scheduling_policy = scheduling_policy if suspend is not None: @@ -146,6 +151,29 @@ def clean_pod_policy(self, clean_pod_policy): self._clean_pod_policy = clean_pod_policy + @property + def managed_by(self): + """Gets the managed_by of this KubeflowOrgV1RunPolicy. # noqa: E501 + + ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + + :return: The managed_by of this KubeflowOrgV1RunPolicy. # noqa: E501 + :rtype: str + """ + return self._managed_by + + @managed_by.setter + def managed_by(self, managed_by): + """Sets the managed_by of this KubeflowOrgV1RunPolicy. + + ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + + :param managed_by: The managed_by of this KubeflowOrgV1RunPolicy. # noqa: E501 + :type: str + """ + + self._managed_by = managed_by + @property def scheduling_policy(self): """Gets the scheduling_policy of this KubeflowOrgV1RunPolicy. # noqa: E501 diff --git a/sdk/python/test/e2e/test_e2e_pytorchjob.py b/sdk/python/test/e2e/test_e2e_pytorchjob.py index c5b28faaf8..5800c0f76f 100644 --- a/sdk/python/test/e2e/test_e2e_pytorchjob.py +++ b/sdk/python/test/e2e/test_e2e_pytorchjob.py @@ -167,6 +167,82 @@ def test_sdk_e2e(job_namespace): TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace) +@pytest.mark.skipif( + GANG_SCHEDULER_NAME in GANG_SCHEDULERS, + reason="For plain scheduling", +) +def test_sdk_e2e_managed_by(job_namespace): + JOB_NAME = "pytorchjob-e2e" + container = generate_container() + + master = KubeflowOrgV1ReplicaSpec( + replicas=1, + restart_policy="OnFailure", + template=V1PodTemplateSpec( + metadata=V1ObjectMeta( + annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} + ), + spec=V1PodSpec(containers=[container]), + ), + ) + + worker = KubeflowOrgV1ReplicaSpec( + replicas=1, + restart_policy="OnFailure", + template=V1PodTemplateSpec( + metadata=V1ObjectMeta( + annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} + ), + spec=V1PodSpec(containers=[container]), + ), + ) + + #1. Job created with default value: 'kubeflow.org/training-operator' - job created and status updated + #2. Job created with kueue value: 'kueue.x-k8s.io/multikueue' - job created but status not updated + #3. Job created with invalid value (not acceptable by the webhook) - job not created + controllers = { + JOB_NAME+"-default-controller": 'kubeflow.org/training-operator', + JOB_NAME+"-multikueue-controller": 'kueue.x-k8s.io/multikueue', + JOB_NAME+"-invalid-controller": 'kueue.x-k8s.io/other-controller', + } + for job_name, managed_by in controllers.items(): + pytorchjob = generate_pytorchjob(job_namespace, job_name, master, worker, managed_by=managed_by) + try: + TRAINING_CLIENT.create_job(job=pytorchjob, namespace=job_namespace) + except Exception as e: + if "invalid" in str(job_name): + error_message = f"Failed to create PyTorchJob: {job_namespace}/{job_name}" + assert error_message in str(e), f"Unexpected error: {e}" + else: + raise Exception(f"PyTorchJob E2E fails. Exception: {e}") + + logging.info(f"List of created {TRAINING_CLIENT.job_kind}s") + jobs = TRAINING_CLIENT.list_jobs(job_namespace) + logging.info(jobs) + + try: + #Only jobs with valid controllers should be created, 2 out of 3 satisfy this condition: 'kubeflow.org/training-operator' and 'kueue.x-k8s.io/multikueue' + if len(jobs) != 2: + raise Exception(f"Too many PyTorchJobs created {jobs}") + + for job in jobs: + if job._metadata.name == 'kubeflow.org/training-operator': + utils.verify_job_e2e(TRAINING_CLIENT, job._metadata.name, job_namespace, wait_timeout=900) + if job._metadata.name == 'kueue.x-k8s.io/multikueue': + conditions = TRAINING_CLIENT.get_job_conditions(job._metadata.name, job_namespace, TRAINING_CLIENT.job_kind, job) + if len(conditions) != 0: + raise Exception(f"{TRAINING_CLIENT.job_kind} conditions {conditions} should not be updated, externally managed by {managed_by}") + + + except Exception as e: + utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace) + TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace) + raise Exception(f"PyTorchJob E2E fails. Exception: {e}") + + for job in jobs: + utils.print_job_results(TRAINING_CLIENT, job._metadata.name, job_namespace) + TRAINING_CLIENT.delete_job(job._metadata.name, job_namespace) + @pytest.mark.skipif( GANG_SCHEDULER_NAME in GANG_SCHEDULERS, reason="For plain scheduling", @@ -246,6 +322,7 @@ def generate_pytorchjob( master: KubeflowOrgV1ReplicaSpec, worker: KubeflowOrgV1ReplicaSpec, scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None, + managed_by: Optional[str] = None, ) -> KubeflowOrgV1PyTorchJob: return KubeflowOrgV1PyTorchJob( api_version=constants.API_VERSION, @@ -255,6 +332,7 @@ def generate_pytorchjob( run_policy=KubeflowOrgV1RunPolicy( clean_pod_policy="None", scheduling_policy=scheduling_policy, + managed_by=managed_by, ), pytorch_replica_specs={"Master": master, "Worker": worker}, ), diff --git a/sdk/python/test/test_kubeflow_org_v1_jax_job.py b/sdk/python/test/test_kubeflow_org_v1_jax_job.py index 05c9e07d4c..f252ecd54d 100644 --- a/sdk/python/test/test_kubeflow_org_v1_jax_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_jax_job.py @@ -50,6 +50,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_jax_job_list.py b/sdk/python/test/test_kubeflow_org_v1_jax_job_list.py index 4f92324581..cfdf269e43 100644 --- a/sdk/python/test/test_kubeflow_org_v1_jax_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_jax_job_list.py @@ -53,6 +53,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -106,6 +107,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_jax_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_jax_job_spec.py index 7e023b261b..02da4bb580 100644 --- a/sdk/python/test/test_kubeflow_org_v1_jax_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_jax_job_spec.py @@ -46,6 +46,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -69,6 +70,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job.py index 1f8f164cf9..2f2eccdf8e 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job.py @@ -52,6 +52,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py index 4406dc67e1..ee3248e160 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py @@ -55,6 +55,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -111,6 +112,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py index c042104169..4142c43a0c 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py @@ -48,6 +48,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job.py index 17759aa30a..e0e213f6cb 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job.py @@ -57,6 +57,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py index e82d533a1f..d11b9484fa 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py @@ -60,6 +60,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -120,6 +121,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py index 71029311b3..4b7e759a42 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py @@ -53,6 +53,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -76,6 +77,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index 4a85f5ed46..716555cf75 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -69,6 +69,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index 2cbafc65de..be2e21f2c4 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -72,6 +72,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -144,6 +145,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index f19f3f2fd1..eb73d0c63c 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -65,6 +65,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -88,6 +89,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_run_policy.py b/sdk/python/test/test_kubeflow_org_v1_run_policy.py index 525be1d785..a279e0c722 100644 --- a/sdk/python/test/test_kubeflow_org_v1_run_policy.py +++ b/sdk/python/test/test_kubeflow_org_v1_run_policy.py @@ -39,6 +39,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job.py b/sdk/python/test/test_kubeflow_org_v1_tf_job.py index fa1cf4a6bc..8d8e89c94d 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job.py @@ -45,6 +45,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py b/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py index 6f7c46d212..5dbb8fd138 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py @@ -48,6 +48,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -103,6 +104,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py index 892fb43578..9448020c3e 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py @@ -41,6 +41,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -65,6 +66,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py index 88e6de1793..324bad50d7 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py @@ -44,6 +44,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py index 5cdfdd931d..de4dd000f2 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py @@ -47,6 +47,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -100,6 +101,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py index 222f29f84a..c7206b01f1 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py @@ -40,6 +40,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = { @@ -63,6 +64,7 @@ def make_instance(self, include_optional): active_deadline_seconds = 56, backoff_limit = 56, clean_pod_policy = '0', + managed_by = '0', scheduling_policy = kubeflow_org_v1_scheduling_policy.KubeflowOrgV1SchedulingPolicy( min_available = 56, min_resources = {