From 2e1e125d7c6d1d4ccd1f25cc2e62212a4deaa225 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 21:07:14 +0000 Subject: [PATCH] KEP-2170: Implement Initializer builders in the JobSet plugin (#2316) * KEP-2170: Implement Initializer builder in the JobSet plugin Signed-off-by: Andrey Velichkevich * Update the SDK models Signed-off-by: Andrey Velichkevich * Remove Info from Initializer builder Signed-off-by: Andrey Velichkevich * Update manifests Signed-off-by: Andrey Velichkevich * Update pkg/constants/constants.go Co-authored-by: Yuki Iwai Signed-off-by: Andrey Velichkevich * Use var for envs Signed-off-by: Andrey Velichkevich * Remove check manifests from GitHub actions Signed-off-by: Andrey Velichkevich * Move consts to JobSet plugin Signed-off-by: Andrey Velichkevich --------- Signed-off-by: Andrey Velichkevich Co-authored-by: Yuki Iwai --- .github/workflows/test-go.yaml | 10 +- Makefile | 2 +- api.v2/openapi-spec/swagger.json | 12 +- .../2170-kubeflow-training-v2/README.md | 6 +- .../v2/base/crds/kubeflow.org_trainjobs.yaml | 44 ++- .../v2alpha1/openapi_generated.go | 18 +- .../kubeflow.org/v2alpha1/trainjob_types.go | 15 +- .../v2alpha1/zz_generated.deepcopy.go | 6 +- pkg/controller.v1/pytorch/elastic.go | 2 +- pkg/controller.v1/pytorch/master.go | 2 +- .../core/clustertrainingruntime_test.go | 6 +- pkg/runtime.v2/core/trainingruntime_test.go | 120 ++++++- .../framework/core/framework_test.go | 2 +- .../framework/plugins/jobset/builder.go | 88 ++++- .../framework/plugins/jobset/constants.go | 48 +++ .../framework/plugins/jobset/jobset.go | 2 +- pkg/util.v2/testing/wrapper.go | 302 +++++++++++++++--- .../docs/KubeflowOrgV2alpha1DatasetConfig.md | 2 +- sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md | 2 +- sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md | 2 +- .../kubeflow_org_v2alpha1_dataset_config.py | 6 +- .../kubeflow_org_v2alpha1_input_model.py | 6 +- .../kubeflow_org_v2alpha1_output_model.py | 6 +- .../controller.v2/trainjob_controller_test.go | 29 +- 24 files changed, 595 insertions(+), 143 deletions(-) create mode 100644 pkg/runtime.v2/framework/plugins/jobset/constants.go diff --git a/.github/workflows/test-go.yaml b/.github/workflows/test-go.yaml index de9c242398..b314f73724 100644 --- a/.github/workflows/test-go.yaml +++ b/.github/workflows/test-go.yaml @@ -29,14 +29,10 @@ jobs: run: | go mod tidy && pushd hack/swagger && go mod tidy && popd && git add go.* && git diff --cached --exit-code || (echo 'Please run "go mod tidy" to sync Go modules' && exit 1); - - name: Check manifests + - name: Check auto-generated assets run: | - make manifests && git add manifests && - git diff --cached --exit-code || (echo 'Please run "make manifests" to generate manifests' && exit 1); - - name: Check auto-generated codes - run: | - make generate && git add pkg sdk && - git diff --cached --exit-code || (echo 'Please run "make generate" to generate Go codes' && exit 1); + make generate && git add pkg sdk manifests && + git diff --cached --exit-code || (echo 'Please run "make generate" to generate assets' && exit 1); - name: Verify gofmt run: | make fmt && git add pkg cmd && diff --git a/Makefile b/Makefile index df7f4969fa..97dc64660f 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust output:rbac:artifacts:config=manifests/v2/base/rbac \ output:webhook:artifacts:config=manifests/v2/base/webhook -generate: controller-gen ## Generate apidoc, sdk and code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. +generate: controller-gen manifests ## Generate apidoc, sdk and code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. $(CONTROLLER_GEN) object:headerFile="hack/boilerplate/boilerplate.go.txt" paths="./pkg/apis/..." hack/update-codegen.sh hack/python-sdk/gen-sdk.sh diff --git a/api.v2/openapi-spec/swagger.json b/api.v2/openapi-spec/swagger.json index eadb6bfc06..83cbd5dedc 100644 --- a/api.v2/openapi-spec/swagger.json +++ b/api.v2/openapi-spec/swagger.json @@ -137,8 +137,8 @@ } }, "secretRef": { - "description": "Reference to the TrainJob's secrets to download dataset.", - "$ref": "#/definitions/v1.SecretReference" + "description": "Reference to the secret with credentials to download dataset. Secret must be created in the TrainJob's namespace.", + "$ref": "#/definitions/v1.LocalObjectReference" }, "storageUri": { "description": "Storage uri for the dataset provider.", @@ -159,8 +159,8 @@ } }, "secretRef": { - "description": "Reference to the TrainJob's secrets to download model.", - "$ref": "#/definitions/v1.SecretReference" + "description": "Reference to the secret with credentials to download model. Secret must be created in the TrainJob's namespace.", + "$ref": "#/definitions/v1.LocalObjectReference" }, "storageUri": { "description": "Storage uri for the model provider.", @@ -315,8 +315,8 @@ } }, "secretRef": { - "description": "Reference to the TrainJob's secrets to export model.", - "$ref": "#/definitions/v1.SecretReference" + "description": "Reference to the secret with credentials to export model. Secret must be created in the TrainJob's namespace.", + "$ref": "#/definitions/v1.LocalObjectReference" }, "storageUri": { "description": "Storage uri for the model exporter.", diff --git a/docs/proposals/2170-kubeflow-training-v2/README.md b/docs/proposals/2170-kubeflow-training-v2/README.md index c04b615dcc..be0f16b957 100644 --- a/docs/proposals/2170-kubeflow-training-v2/README.md +++ b/docs/proposals/2170-kubeflow-training-v2/README.md @@ -597,7 +597,7 @@ type DatasetConfig struct { Env []corev1.EnvVar `json:"env,omitempty"` // Reference to the TrainJob's secrets to download dataset. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } ``` @@ -665,7 +665,7 @@ type InputModel struct { Env []corev1.EnvVar `json:"env,omitempty"` // Reference to the TrainJob's secrets to download model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } type OutputModel struct { @@ -677,7 +677,7 @@ type OutputModel struct { Env []corev1.EnvVar `json:"env,omitempty"` // Reference to the TrainJob's secrets to export model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } ``` diff --git a/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml b/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml index 8307cdc51e..ed6cda3760 100644 --- a/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml +++ b/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml @@ -173,15 +173,15 @@ spec: type: object type: array secretRef: - description: Reference to the TrainJob's secrets to download dataset. + description: |- + Reference to the secret with credentials to download dataset. + Secret must be created in the TrainJob's namespace. properties: name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which the - secret name must be unique. + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? type: string type: object x-kubernetes-map-type: atomic @@ -339,16 +339,15 @@ spec: type: object type: array secretRef: - description: Reference to the TrainJob's secrets to download - model. + description: |- + Reference to the secret with credentials to download model. + Secret must be created in the TrainJob's namespace. properties: name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which - the secret name must be unique. + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? type: string type: object x-kubernetes-map-type: atomic @@ -479,16 +478,15 @@ spec: type: object type: array secretRef: - description: Reference to the TrainJob's secrets to export - model. + description: |- + Reference to the secret with credentials to export model. + Secret must be created in the TrainJob's namespace. properties: name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which - the secret name must be unique. + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? type: string type: object x-kubernetes-map-type: atomic diff --git a/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go b/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go index 24f7b3d19f..5394285cda 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go @@ -299,15 +299,15 @@ func schema_pkg_apis_kubefloworg_v2alpha1_DatasetConfig(ref common.ReferenceCall }, "secretRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the TrainJob's secrets to download dataset.", - Ref: ref("k8s.io/api/core/v1.SecretReference"), + Description: "Reference to the secret with credentials to download dataset. Secret must be created in the TrainJob's namespace.", + Ref: ref("k8s.io/api/core/v1.LocalObjectReference"), }, }, }, }, }, Dependencies: []string{ - "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.SecretReference"}, + "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference"}, } } @@ -341,15 +341,15 @@ func schema_pkg_apis_kubefloworg_v2alpha1_InputModel(ref common.ReferenceCallbac }, "secretRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the TrainJob's secrets to download model.", - Ref: ref("k8s.io/api/core/v1.SecretReference"), + Description: "Reference to the secret with credentials to download model. Secret must be created in the TrainJob's namespace.", + Ref: ref("k8s.io/api/core/v1.LocalObjectReference"), }, }, }, }, }, Dependencies: []string{ - "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.SecretReference"}, + "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference"}, } } @@ -602,15 +602,15 @@ func schema_pkg_apis_kubefloworg_v2alpha1_OutputModel(ref common.ReferenceCallba }, "secretRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the TrainJob's secrets to export model.", - Ref: ref("k8s.io/api/core/v1.SecretReference"), + Description: "Reference to the secret with credentials to export model. Secret must be created in the TrainJob's namespace.", + Ref: ref("k8s.io/api/core/v1.LocalObjectReference"), }, }, }, }, }, Dependencies: []string{ - "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.SecretReference"}, + "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference"}, } } diff --git a/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go b/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go index 38f740a9c2..55a813350e 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go +++ b/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go @@ -167,8 +167,9 @@ type DatasetConfig struct { // These values will be merged with the TrainingRuntime's dataset initializer environments. Env []corev1.EnvVar `json:"env,omitempty"` - // Reference to the TrainJob's secrets to download dataset. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + // Reference to the secret with credentials to download dataset. + // Secret must be created in the TrainJob's namespace. + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } // ModelConfig represents the desired model configuration. @@ -193,8 +194,9 @@ type InputModel struct { // These values will be merged with the TrainingRuntime's model initializer environments. Env []corev1.EnvVar `json:"env,omitempty"` - // Reference to the TrainJob's secrets to download model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + // Reference to the secret with credentials to download model. + // Secret must be created in the TrainJob's namespace. + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } // OutputModel represents the desired trained model configuration. @@ -206,8 +208,9 @@ type OutputModel struct { // These values will be merged with the TrainingRuntime's model exporter environments. Env []corev1.EnvVar `json:"env,omitempty"` - // Reference to the TrainJob's secrets to export model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + // Reference to the secret with credentials to export model. + // Secret must be created in the TrainJob's namespace. + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } // PodSpecOverride represents the custom overrides that will be applied for the TrainJob's resources. diff --git a/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go index e643a85bdb..f482715d9a 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go @@ -166,7 +166,7 @@ func (in *DatasetConfig) DeepCopyInto(out *DatasetConfig) { } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef - *out = new(v1.SecretReference) + *out = new(v1.LocalObjectReference) **out = **in } } @@ -198,7 +198,7 @@ func (in *InputModel) DeepCopyInto(out *InputModel) { } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef - *out = new(v1.SecretReference) + *out = new(v1.LocalObjectReference) **out = **in } } @@ -368,7 +368,7 @@ func (in *OutputModel) DeepCopyInto(out *OutputModel) { } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef - *out = new(v1.SecretReference) + *out = new(v1.LocalObjectReference) **out = **in } } diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index f5c2008b60..ee0a4f6308 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -67,7 +67,7 @@ func GetElasticEnvVarGenerator() EnvVarGenerator { func (e ElasticEnvVarGenerator) Generate( job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error) { - envVars := []corev1.EnvVar{} + var envVars []corev1.EnvVar elasticPolicy := job.Spec.ElasticPolicy if elasticPolicy == nil { diff --git a/pkg/controller.v1/pytorch/master.go b/pkg/controller.v1/pytorch/master.go index 064c7054b9..c46a031cca 100644 --- a/pkg/controller.v1/pytorch/master.go +++ b/pkg/controller.v1/pytorch/master.go @@ -32,7 +32,7 @@ func GetMasterEnvVarGenerator() EnvVarGenerator { func (e MasterEnvVarGenerator) Generate( job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error) { - envVars := []corev1.EnvVar{} + var envVars []corev1.EnvVar if job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] != nil { masterPort, err := getPortFromPyTorchJob(job, kubeflowv1.PyTorchJobReplicaTypeMaster) if err != nil { diff --git a/pkg/runtime.v2/core/clustertrainingruntime_test.go b/pkg/runtime.v2/core/clustertrainingruntime_test.go index 40062c2d45..050d86a23a 100644 --- a/pkg/runtime.v2/core/clustertrainingruntime_test.go +++ b/pkg/runtime.v2/core/clustertrainingruntime_test.go @@ -47,9 +47,9 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) { "succeeded to build PodGroup and JobSet with NumNodes from the Runtime and container from the Trainer.": { clusterTrainingRuntime: testingutil.MakeClusterTrainingRuntimeWrapper("test-runtime").RuntimeSpec( testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeClusterTrainingRuntimeWrapper("test-runtime").Spec). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(100). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). PodGroupPolicyCoschedulingSchedulingTimeout(120). Obj(), ).Obj(), @@ -59,15 +59,15 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) { RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.ClusterTrainingRuntimeKind), "test-runtime"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). Obj(), ). Obj(), wantObjs: []client.Object{ testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job"). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(100). ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). Suspend(true). PodLabel(schedulerpluginsv1alpha1.PodGroupLabel, "test-job"). ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "test-job", "uid"). diff --git a/pkg/runtime.v2/core/trainingruntime_test.go b/pkg/runtime.v2/core/trainingruntime_test.go index b1cb5bf5e2..2a11716cd3 100644 --- a/pkg/runtime.v2/core/trainingruntime_test.go +++ b/pkg/runtime.v2/core/trainingruntime_test.go @@ -31,6 +31,7 @@ import ( kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" "github.com/kubeflow/training-operator/pkg/constants" + jobsetplugin "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins/jobset" testingutil "github.com/kubeflow/training-operator/pkg/util.v2/testing" ) @@ -47,15 +48,15 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { wantError error }{ // Test cases for the PlainML MLPolicy. - "succeeded to build PodGroup and JobSet with NumNodes from the TrainJob and container from Runtime.": { + "succeeded to build PodGroup and JobSet with NumNodes from the TrainJob and container from the Runtime.": { trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime"). Label("conflictLabel", "overridden"). Annotation("conflictAnnotation", "overridden"). RuntimeSpec( testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(100). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). PodGroupPolicyCoschedulingSchedulingTimeout(120). Obj(), ).Obj(), @@ -73,9 +74,9 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { Obj(), wantObjs: []client.Object{ testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job"). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(30). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). Suspend(true). Label("conflictLabel", "override"). Annotation("conflictAnnotation", "override"). @@ -86,8 +87,10 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "test-job", "uid"). MinMember(31). // 31 replicas = 30 Trainer nodes + 1 Initializer. MinResources(corev1.ResourceList{ - // TODO (andreyvelich): Create helper function to calculate PodGroup resources in the unit tests. - corev1.ResourceCPU: resource.MustParse("31"), // Every replica has 1 CPU = 31 CPUs in total. + // Every replica has 1 CPU = 31 CPUs in total. + // Initializer uses InitContainers which execute sequentially. + // Thus, the MinResources is equal to the maximum from the initContainer resources. + corev1.ResourceCPU: resource.MustParse("31"), }). SchedulingTimeout(120). Obj(), @@ -117,8 +120,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "test-runtime"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerTrainerEnv( + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + ContainerEnv( []corev1.EnvVar{ { Name: "TRAIN_JOB", @@ -157,6 +160,105 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { Obj(), }, }, + "succeeded to build JobSet with dataset and model initializer from the TrainJob.": { + trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec( + testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + NumNodes(100). + ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + Obj(), + ).Obj(), + trainJob: testingutil.MakeTrainJobWrapper(metav1.NamespaceDefault, "test-job"). + UID("uid"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "test-runtime"). + Trainer( + testingutil.MakeTrainJobTrainerWrapper(). + Obj(), + ). + DatasetConfig( + testingutil.MakeTrainJobDatasetConfigWrapper(). + StorageUri("hf://trainjob-dataset"). + ContainerEnv( + []corev1.EnvVar{ + { + Name: "TRAIN_JOB", + Value: "test:trainjob:dataset", + }, + }, + ). + SecretRef(corev1.LocalObjectReference{Name: "trainjob-secret-dataset"}). + Obj(), + ). + ModelConfig( + testingutil.MakeTrainJobModelConfigWrapper(). + StorageUri("hf://trainjob-model"). + ContainerEnv( + []corev1.EnvVar{ + { + Name: "TRAIN_JOB", + Value: "test:trainjob:model", + }, + }, + ). + SecretRef(corev1.LocalObjectReference{Name: "trainjob-secret-model"}). + Obj(), + ). + Obj(), + wantObjs: []client.Object{ + testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job"). + NumNodes(100). + ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetInitializerEnv( + []corev1.EnvVar{ + { + Name: jobsetplugin.InitializerEnvStorageUri, + Value: "hf://trainjob-dataset", + }, + { + Name: "TRAIN_JOB", + Value: "test:trainjob:dataset", + }, + }, + ). + InitContainerDatasetInitializerEnvFrom( + []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "trainjob-secret-dataset", + }, + }, + }, + }, + ). + InitContainerModelInitializerEnv( + []corev1.EnvVar{ + { + Name: jobsetplugin.InitializerEnvStorageUri, + Value: "hf://trainjob-model", + }, + { + Name: "TRAIN_JOB", + Value: "test:trainjob:model", + }, + }, + ). + InitContainerModelInitializerEnvFrom( + []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "trainjob-secret-model", + }, + }, + }, + }, + ). + ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "test-job", "uid"). + Obj(), + }, + }, // Test cases for the Torch MLPolicy. "succeeded to build JobSet with Torch values from the TrainJob": { trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec( @@ -236,8 +338,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "test-runtime"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerTrainerEnv( + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + ContainerEnv( []corev1.EnvVar{ { Name: "TRAIN_JOB", diff --git a/pkg/runtime.v2/framework/core/framework_test.go b/pkg/runtime.v2/framework/core/framework_test.go index c8d0b1ca8f..69255c4016 100644 --- a/pkg/runtime.v2/framework/core/framework_test.go +++ b/pkg/runtime.v2/framework/core/framework_test.go @@ -427,7 +427,7 @@ func TestRunComponentBuilderPlugins(t *testing.T) { Trainer( testingutil.MakeTrainJobTrainerWrapper(). NumNodes(100). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). Obj(), ). Obj(), diff --git a/pkg/runtime.v2/framework/plugins/jobset/builder.go b/pkg/runtime.v2/framework/plugins/jobset/builder.go index 6849634ee2..dc104bdd28 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/builder.go +++ b/pkg/runtime.v2/framework/plugins/jobset/builder.go @@ -19,6 +19,7 @@ package jobset import ( "maps" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" @@ -51,6 +52,90 @@ func NewBuilder(objectKey client.ObjectKey, jobSetTemplateSpec kubeflowv2.JobSet } } +// mergeInitializerEnvs merges the TrainJob and Runtime Pod envs. +func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev1.EnvVar) []corev1.EnvVar { + envNames := sets.New[string]() + var envs []corev1.EnvVar + // Add the Storage URI env. + if storageUri != nil { + envNames.Insert(InitializerEnvStorageUri) + envs = append(envs, corev1.EnvVar{ + Name: InitializerEnvStorageUri, + Value: *storageUri, + }) + } + // Add the rest TrainJob envs. + // TODO (andreyvelich): Validate that TrainJob dataset and model envs don't have the STORAGE_URI env. + for _, e := range trainJobEnvs { + envNames.Insert(e.Name) + envs = append(envs, e) + } + + // TrainJob envs take precedence over the TrainingRuntime envs. + for _, e := range containerEnv { + if !envNames.Has(e.Name) { + envs = append(envs, e) + } + } + return envs +} + +// Initializer updates JobSet values for the initializer Job. +func (b *Builder) Initializer(trainJob *kubeflowv2.TrainJob) *Builder { + for i, rJob := range b.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + // TODO (andreyvelich): Currently, we use initContainers for the initializers. + // Once JobSet supports execution policy for the ReplicatedJobs, we should migrate to containers. + // Ref: https://github.com/kubernetes-sigs/jobset/issues/672 + for j, container := range rJob.Template.Spec.Template.Spec.InitContainers { + // Update values for the dataset initializer container. + if container.Name == constants.ContainerDatasetInitializer && trainJob.Spec.DatasetConfig != nil { + // Update the dataset initializer envs. + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Env = mergeInitializerEnvs( + trainJob.Spec.DatasetConfig.StorageUri, + trainJob.Spec.DatasetConfig.Env, + container.Env, + ) + // Update the dataset initializer secret reference. + if trainJob.Spec.DatasetConfig.SecretRef != nil { + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom = append( + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom, + corev1.EnvFromSource{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: *trainJob.Spec.DatasetConfig.SecretRef, + }, + }, + ) + } + } + // TODO (andreyvelich): Add the model exporter when we support it. + // Update values for the model initializer container. + if container.Name == constants.ContainerModelInitializer && trainJob.Spec.ModelConfig != nil && trainJob.Spec.ModelConfig.Input != nil { + // Update the model initializer envs. + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Env = mergeInitializerEnvs( + trainJob.Spec.ModelConfig.Input.StorageUri, + trainJob.Spec.ModelConfig.Input.Env, + container.Env, + ) + // Update the model initializer secret reference. + if trainJob.Spec.ModelConfig.Input.SecretRef != nil { + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom = append( + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom, + corev1.EnvFromSource{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: *trainJob.Spec.ModelConfig.Input.SecretRef, + }, + }, + ) + } + + } + } + } + } + return b +} + // Trainer updates JobSet values for the trainer Job. func (b *Builder) Trainer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Builder { for i, rJob := range b.Spec.ReplicatedJobs { @@ -85,7 +170,7 @@ func (b *Builder) Trainer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Bu envNames.Insert(env.Name) } trainerEnvs := info.Trainer.Env - // Info envs take precedence over TrainingRuntime envs. + // Info envs take precedence over the TrainingRuntime envs. for _, env := range container.Env { if !envNames.Has(env.Name) { trainerEnvs = append(trainerEnvs, env) @@ -93,6 +178,7 @@ func (b *Builder) Trainer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Bu } b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env = trainerEnvs } + // Update the Trainer container port. if info.Trainer.ContainerPort != nil { b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Ports = append( b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Ports, *info.Trainer.ContainerPort) diff --git a/pkg/runtime.v2/framework/plugins/jobset/constants.go b/pkg/runtime.v2/framework/plugins/jobset/constants.go new file mode 100644 index 0000000000..3a8a8e07e6 --- /dev/null +++ b/pkg/runtime.v2/framework/plugins/jobset/constants.go @@ -0,0 +1,48 @@ +package jobset + +import ( + corev1 "k8s.io/api/core/v1" +) + +const ( + + // VolumeNameInitializer is the name for the initializer Pod's Volume and VolumeMount. + // TODO (andreyvelich): Add validation to check that initializer Pod has the correct volume. + VolumeNameInitializer string = "initializer" + + // InitializerEnvStorageUri is the env name for the initializer storage uri. + InitializerEnvStorageUri string = "STORAGE_URI" +) + +var ( + // This is the temporary container that we use in the initializer ReplicatedJob. + // TODO (andreyvelich): Once JobSet supports execution policy, we can remove it. + ContainerBusyBox corev1.Container = corev1.Container{ + Name: "busybox", + Image: "busybox:stable-glibc", + } + + // VolumeMountModelInitializer is the volume mount for the model initializer container. + // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has the following volumes. + VolumeMountModelInitializer = corev1.VolumeMount{ + Name: VolumeNameInitializer, + MountPath: "/workspace/model", + } + + // VolumeMountModelInitializer is the volume mount for the dataset initializer container. + VolumeMountDatasetInitializer = corev1.VolumeMount{ + Name: VolumeNameInitializer, + MountPath: "/workspace/dataset", + } + + // VolumeInitializer is the volume for the initializer ReplicatedJob. + // TODO (andreyvelich): We should make VolumeSource configurable. + VolumeInitializer = corev1.Volume{ + Name: VolumeNameInitializer, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: VolumeNameInitializer, + }, + }, + } +) diff --git a/pkg/runtime.v2/framework/plugins/jobset/jobset.go b/pkg/runtime.v2/framework/plugins/jobset/jobset.go index 5a6400e46d..ef04890b39 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/jobset.go +++ b/pkg/runtime.v2/framework/plugins/jobset/jobset.go @@ -98,10 +98,10 @@ func (j *JobSet) Build(ctx context.Context, runtimeJobTemplate client.Object, in } } - // TODO (andreyvelich): add support for model and dataset initializers. // TODO (andreyvelich): Add support for the PodSpecOverride. // TODO (andreyvelich): Refactor the builder with wrappers for PodSpec. jobSet := jobSetBuilder. + Initializer(trainJob). Trainer(info, trainJob). PodLabels(info.PodLabels). Suspend(trainJob.Spec.Suspend). diff --git a/pkg/util.v2/testing/wrapper.go b/pkg/util.v2/testing/wrapper.go index 818d746403..4f19ba3d2c 100644 --- a/pkg/util.v2/testing/wrapper.go +++ b/pkg/util.v2/testing/wrapper.go @@ -28,6 +28,7 @@ import ( kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" "github.com/kubeflow/training-operator/pkg/constants" + jobsetplugin "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins/jobset" ) type JobSetWrapper struct { @@ -53,11 +54,26 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper { Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{ + InitContainers: []corev1.Container{ { Name: constants.ContainerDatasetInitializer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountDatasetInitializer, + }, + }, + { + Name: constants.ContainerModelInitializer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountModelInitializer, + }, }, }, + Containers: []corev1.Container{ + jobsetplugin.ContainerBusyBox, + }, + Volumes: []corev1.Volume{ + jobsetplugin.VolumeInitializer, + }, }, }, }, @@ -72,8 +88,15 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper { Containers: []corev1.Container{ { Name: constants.ContainerTrainer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountModelInitializer, + }, }, }, + Volumes: []corev1.Volume{ + jobsetplugin.VolumeInitializer, + }, }, }, }, @@ -144,15 +167,71 @@ func (j *JobSetWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *JobSetWrapper return j } -func (j *JobSetWrapper) ContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *JobSetWrapper { +func (j *JobSetWrapper) InitContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *JobSetWrapper { for i, rJob := range j.Spec.ReplicatedJobs { if rJob.Name == constants.JobInitializer { - for k, container := range rJob.Template.Spec.Template.Spec.Containers { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { if container.Name == constants.ContainerDatasetInitializer || container.Name == constants.ContainerModelInitializer { - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Image = image - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Command = command - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Args = args - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Resources.Requests = res + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Image = image + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Command = command + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Args = args + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Resources.Requests = res + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerDatasetInitializerEnv(env []corev1.EnvVar) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerDatasetInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Env = env + + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerDatasetInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerDatasetInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].EnvFrom = envFrom + + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerModelInitializerEnv(env []corev1.EnvVar) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerModelInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Env = env + + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerModelInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerModelInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].EnvFrom = envFrom + } } } @@ -227,31 +306,6 @@ func MakeTrainJobWrapper(namespace, name string) *TrainJobWrapper { } } -func (t *TrainJobTrainerWrapper) NumNodes(numNodes int32) *TrainJobTrainerWrapper { - t.Trainer.NumNodes = &numNodes - return t -} - -func (t *TrainJobTrainerWrapper) NumProcPerNode(numProcPerNode string) *TrainJobTrainerWrapper { - t.Trainer.NumProcPerNode = &numProcPerNode - return t -} - -func (t *TrainJobTrainerWrapper) ContainerTrainer(image string, command []string, args []string, resRequests corev1.ResourceList) *TrainJobTrainerWrapper { - t.Trainer.Image = &image - t.Trainer.Command = command - t.Trainer.Args = args - t.Trainer.ResourcesPerNode = &corev1.ResourceRequirements{ - Requests: resRequests, - } - return t -} - -func (t *TrainJobTrainerWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *TrainJobTrainerWrapper { - t.Trainer.Env = env - return t -} - func (t *TrainJobWrapper) Suspend(suspend bool) *TrainJobWrapper { t.Spec.Suspend = &suspend return t @@ -297,6 +351,16 @@ func (t *TrainJobWrapper) Trainer(trainer *kubeflowv2.Trainer) *TrainJobWrapper return t } +func (t *TrainJobWrapper) DatasetConfig(datasetConfig *kubeflowv2.DatasetConfig) *TrainJobWrapper { + t.Spec.DatasetConfig = datasetConfig + return t +} + +func (t *TrainJobWrapper) ModelConfig(modelConfig *kubeflowv2.ModelConfig) *TrainJobWrapper { + t.Spec.ModelConfig = modelConfig + return t +} + func (t *TrainJobWrapper) ManagedBy(m string) *TrainJobWrapper { t.Spec.ManagedBy = &m return t @@ -316,10 +380,96 @@ func MakeTrainJobTrainerWrapper() *TrainJobTrainerWrapper { } } +func (t *TrainJobTrainerWrapper) NumNodes(numNodes int32) *TrainJobTrainerWrapper { + t.Trainer.NumNodes = &numNodes + return t +} + +func (t *TrainJobTrainerWrapper) NumProcPerNode(numProcPerNode string) *TrainJobTrainerWrapper { + t.Trainer.NumProcPerNode = &numProcPerNode + return t +} + +func (t *TrainJobTrainerWrapper) Container(image string, command []string, args []string, resRequests corev1.ResourceList) *TrainJobTrainerWrapper { + t.Trainer.Image = &image + t.Trainer.Command = command + t.Trainer.Args = args + t.Trainer.ResourcesPerNode = &corev1.ResourceRequirements{ + Requests: resRequests, + } + return t +} + +func (t *TrainJobTrainerWrapper) ContainerEnv(env []corev1.EnvVar) *TrainJobTrainerWrapper { + t.Trainer.Env = env + return t +} + func (t *TrainJobTrainerWrapper) Obj() *kubeflowv2.Trainer { return &t.Trainer } +type TrainJobDatasetConfigWrapper struct { + kubeflowv2.DatasetConfig +} + +func MakeTrainJobDatasetConfigWrapper() *TrainJobDatasetConfigWrapper { + return &TrainJobDatasetConfigWrapper{ + DatasetConfig: kubeflowv2.DatasetConfig{}, + } +} + +func (t *TrainJobDatasetConfigWrapper) StorageUri(storageUri string) *TrainJobDatasetConfigWrapper { + t.DatasetConfig.StorageUri = &storageUri + return t +} + +func (t *TrainJobDatasetConfigWrapper) ContainerEnv(env []corev1.EnvVar) *TrainJobDatasetConfigWrapper { + t.DatasetConfig.Env = env + return t +} + +func (t *TrainJobDatasetConfigWrapper) SecretRef(secretRef corev1.LocalObjectReference) *TrainJobDatasetConfigWrapper { + t.DatasetConfig.SecretRef = &secretRef + return t +} + +func (t *TrainJobDatasetConfigWrapper) Obj() *kubeflowv2.DatasetConfig { + return &t.DatasetConfig +} + +type TrainJobModelConfigWrapper struct { + kubeflowv2.ModelConfig +} + +func MakeTrainJobModelConfigWrapper() *TrainJobModelConfigWrapper { + return &TrainJobModelConfigWrapper{ + ModelConfig: kubeflowv2.ModelConfig{ + // TODO (andreyvelich): Add support for output model when implemented. + Input: &kubeflowv2.InputModel{}, + }, + } +} + +func (t *TrainJobModelConfigWrapper) StorageUri(storageUri string) *TrainJobModelConfigWrapper { + t.ModelConfig.Input.StorageUri = &storageUri + return t +} + +func (t *TrainJobModelConfigWrapper) ContainerEnv(env []corev1.EnvVar) *TrainJobModelConfigWrapper { + t.ModelConfig.Input.Env = env + return t +} + +func (t *TrainJobModelConfigWrapper) SecretRef(secretRef corev1.LocalObjectReference) *TrainJobModelConfigWrapper { + t.ModelConfig.Input.SecretRef = &secretRef + return t +} + +func (t *TrainJobModelConfigWrapper) Obj() *kubeflowv2.ModelConfig { + return &t.ModelConfig +} + type TrainingRuntimeWrapper struct { kubeflowv2.TrainingRuntime } @@ -345,9 +495,26 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerDatasetInitializer, - }}, + InitContainers: []corev1.Container{ + { + Name: constants.ContainerDatasetInitializer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountDatasetInitializer, + }, + }, + { + Name: constants.ContainerModelInitializer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountModelInitializer, + }, + }, + }, + Containers: []corev1.Container{ + jobsetplugin.ContainerBusyBox, + }, + Volumes: []corev1.Volume{ + jobsetplugin.VolumeInitializer, + }, }, }, }, @@ -359,9 +526,18 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerTrainer, - }}, + Containers: []corev1.Container{ + { + Name: constants.ContainerTrainer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountModelInitializer, + }, + }, + }, + Volumes: []corev1.Volume{ + jobsetplugin.VolumeInitializer, + }, }, }, }, @@ -424,9 +600,26 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerDatasetInitializer, - }}, + InitContainers: []corev1.Container{ + { + Name: constants.ContainerDatasetInitializer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountDatasetInitializer, + }, + }, + { + Name: constants.ContainerModelInitializer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountModelInitializer, + }, + }, + }, + Containers: []corev1.Container{ + jobsetplugin.ContainerBusyBox, + }, + Volumes: []corev1.Volume{ + jobsetplugin.VolumeInitializer, + }, }, }, }, @@ -438,9 +631,18 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerTrainer, - }}, + Containers: []corev1.Container{ + { + Name: constants.ContainerTrainer, + VolumeMounts: []corev1.VolumeMount{ + jobsetplugin.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountModelInitializer, + }, + }, + }, + Volumes: []corev1.Volume{ + jobsetplugin.VolumeInitializer, + }, }, }, }, @@ -521,15 +723,15 @@ func (s *TrainingRuntimeSpecWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *T return s } -func (s *TrainingRuntimeSpecWrapper) ContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *TrainingRuntimeSpecWrapper { +func (s *TrainingRuntimeSpecWrapper) InitContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *TrainingRuntimeSpecWrapper { for i, rJob := range s.Template.Spec.ReplicatedJobs { if rJob.Name == constants.JobInitializer { - for j, container := range rJob.Template.Spec.Template.Spec.Containers { + for j, container := range rJob.Template.Spec.Template.Spec.InitContainers { if container.Name == constants.ContainerDatasetInitializer || container.Name == constants.ContainerModelInitializer { - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Image = image - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Command = command - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Args = args - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Resources.Requests = res + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Image = image + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Command = command + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Args = args + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Resources.Requests = res } } } diff --git a/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md b/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md index 39edeefdab..342306b212 100644 --- a/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md +++ b/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md @@ -5,7 +5,7 @@ DatasetConfig represents the desired dataset configuration. When this API is use Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **env** | [**list[V1EnvVar]**](V1EnvVar.md) | List of environment variables to set in the dataset initializer container. These values will be merged with the TrainingRuntime's dataset initializer environments. | [optional] -**secret_ref** | [**V1SecretReference**](V1SecretReference.md) | | [optional] +**secret_ref** | [**V1LocalObjectReference**](V1LocalObjectReference.md) | | [optional] **storage_uri** | **str** | Storage uri for the dataset provider. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md b/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md index ae22fddd4f..385295c819 100644 --- a/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md +++ b/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md @@ -5,7 +5,7 @@ InputModel represents the desired pre-trained model configuration. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **env** | [**list[V1EnvVar]**](V1EnvVar.md) | List of environment variables to set in the model initializer container. These values will be merged with the TrainingRuntime's model initializer environments. | [optional] -**secret_ref** | [**V1SecretReference**](V1SecretReference.md) | | [optional] +**secret_ref** | [**V1LocalObjectReference**](V1LocalObjectReference.md) | | [optional] **storage_uri** | **str** | Storage uri for the model provider. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md b/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md index 10563c4697..754c62ea76 100644 --- a/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md +++ b/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md @@ -5,7 +5,7 @@ OutputModel represents the desired trained model configuration. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **env** | [**list[V1EnvVar]**](V1EnvVar.md) | List of environment variables to set in the model exporter container. These values will be merged with the TrainingRuntime's model exporter environments. | [optional] -**secret_ref** | [**V1SecretReference**](V1SecretReference.md) | | [optional] +**secret_ref** | [**V1LocalObjectReference**](V1LocalObjectReference.md) | | [optional] **storage_uri** | **str** | Storage uri for the model exporter. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py index 406bcebca6..d706b36fa0 100644 --- a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py +++ b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py @@ -34,7 +34,7 @@ class KubeflowOrgV2alpha1DatasetConfig(object): """ openapi_types = { 'env': 'list[V1EnvVar]', - 'secret_ref': 'V1SecretReference', + 'secret_ref': 'V1LocalObjectReference', 'storage_uri': 'str' } @@ -91,7 +91,7 @@ def secret_ref(self): :return: The secret_ref of this KubeflowOrgV2alpha1DatasetConfig. # noqa: E501 - :rtype: V1SecretReference + :rtype: V1LocalObjectReference """ return self._secret_ref @@ -101,7 +101,7 @@ def secret_ref(self, secret_ref): :param secret_ref: The secret_ref of this KubeflowOrgV2alpha1DatasetConfig. # noqa: E501 - :type: V1SecretReference + :type: V1LocalObjectReference """ self._secret_ref = secret_ref diff --git a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py index 3a06d04596..92038070ef 100644 --- a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py +++ b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py @@ -34,7 +34,7 @@ class KubeflowOrgV2alpha1InputModel(object): """ openapi_types = { 'env': 'list[V1EnvVar]', - 'secret_ref': 'V1SecretReference', + 'secret_ref': 'V1LocalObjectReference', 'storage_uri': 'str' } @@ -91,7 +91,7 @@ def secret_ref(self): :return: The secret_ref of this KubeflowOrgV2alpha1InputModel. # noqa: E501 - :rtype: V1SecretReference + :rtype: V1LocalObjectReference """ return self._secret_ref @@ -101,7 +101,7 @@ def secret_ref(self, secret_ref): :param secret_ref: The secret_ref of this KubeflowOrgV2alpha1InputModel. # noqa: E501 - :type: V1SecretReference + :type: V1LocalObjectReference """ self._secret_ref = secret_ref diff --git a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py index 501fcb39ab..22447d8559 100644 --- a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py +++ b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py @@ -34,7 +34,7 @@ class KubeflowOrgV2alpha1OutputModel(object): """ openapi_types = { 'env': 'list[V1EnvVar]', - 'secret_ref': 'V1SecretReference', + 'secret_ref': 'V1LocalObjectReference', 'storage_uri': 'str' } @@ -91,7 +91,7 @@ def secret_ref(self): :return: The secret_ref of this KubeflowOrgV2alpha1OutputModel. # noqa: E501 - :rtype: V1SecretReference + :rtype: V1LocalObjectReference """ return self._secret_ref @@ -101,7 +101,7 @@ def secret_ref(self, secret_ref): :param secret_ref: The secret_ref of this KubeflowOrgV2alpha1OutputModel. # noqa: E501 - :type: V1SecretReference + :type: V1LocalObjectReference """ self._secret_ref = secret_ref diff --git a/test/integration/controller.v2/trainjob_controller_test.go b/test/integration/controller.v2/trainjob_controller_test.go index 663658e817..39ce245227 100644 --- a/test/integration/controller.v2/trainjob_controller_test.go +++ b/test/integration/controller.v2/trainjob_controller_test.go @@ -32,6 +32,7 @@ import ( kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" "github.com/kubeflow/training-operator/pkg/constants" + jobsetplugin "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins/jobset" testingutil "github.com/kubeflow/training-operator/pkg/util.v2/testing" "github.com/kubeflow/training-operator/test/integration/framework" "github.com/kubeflow/training-operator/test/util" @@ -86,8 +87,18 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { SpecAnnotation("testingKey", "testingVal"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + Container("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). Obj()). + DatasetConfig( + testingutil.MakeTrainJobDatasetConfigWrapper(). + StorageUri("hf://trainjob-dataset"). + Obj(), + ). + ModelConfig( + testingutil.MakeTrainJobModelConfigWrapper(). + StorageUri("hf://trainjob-model"). + Obj(), + ). Obj() trainJobKey = client.ObjectKeyFromObject(trainJob) @@ -96,12 +107,13 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "alpha").Spec). NumNodes(100). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). PodGroupPolicyCoscheduling(&kubeflowv2.CoschedulingPodGroupPolicySource{ScheduleTimeoutSeconds: ptr.To[int32](100)}). Obj()). Obj() }) + // Integration tests for the PlainML Runtime. ginkgo.It("Should succeed to create TrainJob with TrainingRuntime", func() { ginkgo.By("Creating TrainingRuntime and TrainJob") gomega.Expect(k8sClient.Create(ctx, trainingRuntime)).Should(gomega.Succeed()) @@ -116,7 +128,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { Replicas(1). NumNodes(100). ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). + InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). Suspend(true). Label("testingKey", "testingVal"). Annotation("testingKey", "testingVal"). @@ -169,7 +183,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { Replicas(1). NumNodes(100). ContainerTrainer(updatedImageName, []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). + InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). Suspend(true). Label("testingKey", "testingVal"). Annotation("testingKey", "testingVal"). @@ -244,14 +260,15 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) + // Integration tests for the Torch Runtime. ginkgo.It("Should succeed to create TrainJob with Torch TrainingRuntime", func() { ginkgo.By("Creating Torch TrainingRuntime and TrainJob") trainJob = testingutil.MakeTrainJobWrapper(ns.Name, "alpha"). RuntimeRef(kubeflowv2.GroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "alpha"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerTrainerEnv([]corev1.EnvVar{{Name: "TRAIN_JOB", Value: "value"}}). + Container("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + ContainerEnv([]corev1.EnvVar{{Name: "TRAIN_JOB", Value: "value"}}). Obj()). Obj() trainJobKey = client.ObjectKeyFromObject(trainJob)