From 8e670efc1e359bf425e09daf6d79934471a60458 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 00:08:43 +0000 Subject: [PATCH 1/8] KEP-2170: Implement Initializer builder in the JobSet plugin Signed-off-by: Andrey Velichkevich --- .../v2alpha1/openapi_generated.go | 18 +- .../kubeflow.org/v2alpha1/trainjob_types.go | 15 +- .../v2alpha1/zz_generated.deepcopy.go | 6 +- pkg/constants/constants.go | 40 +++ .../core/clustertrainingruntime_test.go | 6 +- pkg/runtime.v2/core/trainingruntime_test.go | 119 ++++++- .../framework/core/framework_test.go | 2 +- .../framework/plugins/jobset/builder.go | 90 +++++- .../framework/plugins/jobset/jobset.go | 2 +- pkg/util.v2/testing/wrapper.go | 301 +++++++++++++++--- .../controller.v2/trainjob_controller_test.go | 28 +- 11 files changed, 538 insertions(+), 89 deletions(-) diff --git a/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go b/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go index 24f7b3d19f..5394285cda 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v2alpha1/openapi_generated.go @@ -299,15 +299,15 @@ func schema_pkg_apis_kubefloworg_v2alpha1_DatasetConfig(ref common.ReferenceCall }, "secretRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the TrainJob's secrets to download dataset.", - Ref: ref("k8s.io/api/core/v1.SecretReference"), + Description: "Reference to the secret with credentials to download dataset. Secret must be created in the TrainJob's namespace.", + Ref: ref("k8s.io/api/core/v1.LocalObjectReference"), }, }, }, }, }, Dependencies: []string{ - "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.SecretReference"}, + "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference"}, } } @@ -341,15 +341,15 @@ func schema_pkg_apis_kubefloworg_v2alpha1_InputModel(ref common.ReferenceCallbac }, "secretRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the TrainJob's secrets to download model.", - Ref: ref("k8s.io/api/core/v1.SecretReference"), + Description: "Reference to the secret with credentials to download model. Secret must be created in the TrainJob's namespace.", + Ref: ref("k8s.io/api/core/v1.LocalObjectReference"), }, }, }, }, }, Dependencies: []string{ - "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.SecretReference"}, + "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference"}, } } @@ -602,15 +602,15 @@ func schema_pkg_apis_kubefloworg_v2alpha1_OutputModel(ref common.ReferenceCallba }, "secretRef": { SchemaProps: spec.SchemaProps{ - Description: "Reference to the TrainJob's secrets to export model.", - Ref: ref("k8s.io/api/core/v1.SecretReference"), + Description: "Reference to the secret with credentials to export model. Secret must be created in the TrainJob's namespace.", + Ref: ref("k8s.io/api/core/v1.LocalObjectReference"), }, }, }, }, }, Dependencies: []string{ - "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.SecretReference"}, + "k8s.io/api/core/v1.EnvVar", "k8s.io/api/core/v1.LocalObjectReference"}, } } diff --git a/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go b/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go index 38f740a9c2..55a813350e 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go +++ b/pkg/apis/kubeflow.org/v2alpha1/trainjob_types.go @@ -167,8 +167,9 @@ type DatasetConfig struct { // These values will be merged with the TrainingRuntime's dataset initializer environments. Env []corev1.EnvVar `json:"env,omitempty"` - // Reference to the TrainJob's secrets to download dataset. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + // Reference to the secret with credentials to download dataset. + // Secret must be created in the TrainJob's namespace. + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } // ModelConfig represents the desired model configuration. @@ -193,8 +194,9 @@ type InputModel struct { // These values will be merged with the TrainingRuntime's model initializer environments. Env []corev1.EnvVar `json:"env,omitempty"` - // Reference to the TrainJob's secrets to download model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + // Reference to the secret with credentials to download model. + // Secret must be created in the TrainJob's namespace. + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } // OutputModel represents the desired trained model configuration. @@ -206,8 +208,9 @@ type OutputModel struct { // These values will be merged with the TrainingRuntime's model exporter environments. Env []corev1.EnvVar `json:"env,omitempty"` - // Reference to the TrainJob's secrets to export model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + // Reference to the secret with credentials to export model. + // Secret must be created in the TrainJob's namespace. + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } // PodSpecOverride represents the custom overrides that will be applied for the TrainJob's resources. diff --git a/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go index e643a85bdb..f482715d9a 100644 --- a/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v2alpha1/zz_generated.deepcopy.go @@ -166,7 +166,7 @@ func (in *DatasetConfig) DeepCopyInto(out *DatasetConfig) { } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef - *out = new(v1.SecretReference) + *out = new(v1.LocalObjectReference) **out = **in } } @@ -198,7 +198,7 @@ func (in *InputModel) DeepCopyInto(out *InputModel) { } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef - *out = new(v1.SecretReference) + *out = new(v1.LocalObjectReference) **out = **in } } @@ -368,7 +368,7 @@ func (in *OutputModel) DeepCopyInto(out *OutputModel) { } if in.SecretRef != nil { in, out := &in.SecretRef, &out.SecretRef - *out = new(v1.SecretReference) + *out = new(v1.LocalObjectReference) **out = **in } } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 29288d2d0e..bfbfd04a3a 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -4,6 +4,7 @@ import ( "fmt" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" ) const ( @@ -26,12 +27,19 @@ const ( // JobInitializer is the Job name for the initializer. JobInitializer string = "initializer" + // VolumeNameInitializer is the name for the initializer Pod's Volume and VolumeMount. + // TODO (andreyvelich): Add validation to check that initializer Pod has the correct volume. + VolumeNameInitializer string = "initializer" + // ContainerModelInitializer is the container name for the model initializer. ContainerModelInitializer string = "model-initializer" // ContainerDatasetInitializer is the container name for the dataset initializer. ContainerDatasetInitializer string = "dataset-initializer" + // InitializerEnvStorageUri is the env name for the initializer storage uri. + InitializerEnvStorageUri string = "STORAGE_URI" + // PodGroupKind is the Kind name for the PodGroup. PodGroupKind string = "PodGroup" @@ -56,4 +64,36 @@ const ( var ( // JobCompletionIndexFieldPath is the field path for the Job completion index annotation. JobCompletionIndexFieldPath string = fmt.Sprintf("metadata.annotations['%s']", batchv1.JobCompletionIndexAnnotation) + + // This is temp container that we use in the initializer ReplicatedJob. + // TODO (andreyvelich): Once JobSet supports execution policy, we can remove it. + // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has this container. + ContainerBusyBox corev1.Container = corev1.Container{ + Name: "busybox", + Image: "busybox", + } + + // VolumeMountModelInitializer is the volume mount for the model initializer container. + // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has the following volumes. + VolumeMountModelInitializer = corev1.VolumeMount{ + Name: VolumeNameInitializer, + MountPath: "/workspace/model", + } + + // VolumeMountModelInitializer is the volume mount for the dataset initializer container. + VolumeMountDatasetInitializer = corev1.VolumeMount{ + Name: VolumeNameInitializer, + MountPath: "/workspace/dataset", + } + + // VolumeInitializer is the volume for the initializer ReplicatedJob. + // TODO (andreyvelich): We should make VolumeSource configurable. + VolumeInitializer = corev1.Volume{ + Name: VolumeNameInitializer, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: VolumeNameInitializer, + }, + }, + } ) diff --git a/pkg/runtime.v2/core/clustertrainingruntime_test.go b/pkg/runtime.v2/core/clustertrainingruntime_test.go index 40062c2d45..050d86a23a 100644 --- a/pkg/runtime.v2/core/clustertrainingruntime_test.go +++ b/pkg/runtime.v2/core/clustertrainingruntime_test.go @@ -47,9 +47,9 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) { "succeeded to build PodGroup and JobSet with NumNodes from the Runtime and container from the Trainer.": { clusterTrainingRuntime: testingutil.MakeClusterTrainingRuntimeWrapper("test-runtime").RuntimeSpec( testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeClusterTrainingRuntimeWrapper("test-runtime").Spec). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(100). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). PodGroupPolicyCoschedulingSchedulingTimeout(120). Obj(), ).Obj(), @@ -59,15 +59,15 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) { RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.ClusterTrainingRuntimeKind), "test-runtime"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). Obj(), ). Obj(), wantObjs: []client.Object{ testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job"). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(100). ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). Suspend(true). PodLabel(schedulerpluginsv1alpha1.PodGroupLabel, "test-job"). ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "test-job", "uid"). diff --git a/pkg/runtime.v2/core/trainingruntime_test.go b/pkg/runtime.v2/core/trainingruntime_test.go index b1cb5bf5e2..31f942818d 100644 --- a/pkg/runtime.v2/core/trainingruntime_test.go +++ b/pkg/runtime.v2/core/trainingruntime_test.go @@ -47,15 +47,15 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { wantError error }{ // Test cases for the PlainML MLPolicy. - "succeeded to build PodGroup and JobSet with NumNodes from the TrainJob and container from Runtime.": { + "succeeded to build PodGroup and JobSet with NumNodes from the TrainJob and container from the Runtime.": { trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime"). Label("conflictLabel", "overridden"). Annotation("conflictAnnotation", "overridden"). RuntimeSpec( testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(100). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). PodGroupPolicyCoschedulingSchedulingTimeout(120). Obj(), ).Obj(), @@ -73,9 +73,9 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { Obj(), wantObjs: []client.Object{ testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job"). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). NumNodes(30). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). Suspend(true). Label("conflictLabel", "override"). Annotation("conflictAnnotation", "override"). @@ -86,8 +86,10 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "test-job", "uid"). MinMember(31). // 31 replicas = 30 Trainer nodes + 1 Initializer. MinResources(corev1.ResourceList{ - // TODO (andreyvelich): Create helper function to calculate PodGroup resources in the unit tests. - corev1.ResourceCPU: resource.MustParse("31"), // Every replica has 1 CPU = 31 CPUs in total. + // Every replica has 1 CPU = 31 CPUs in total. + // Since initializers use init containers, they execute sequentially. + // MinResources is equal to the maximum from the initContainer resources. + corev1.ResourceCPU: resource.MustParse("31"), }). SchedulingTimeout(120). Obj(), @@ -117,8 +119,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "test-runtime"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerTrainerEnv( + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + ContainerEnv( []corev1.EnvVar{ { Name: "TRAIN_JOB", @@ -157,6 +159,105 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { Obj(), }, }, + "succeeded to build JobSet with dataset and model initializer from the TrainJob.": { + trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec( + testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + NumNodes(100). + ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + Obj(), + ).Obj(), + trainJob: testingutil.MakeTrainJobWrapper(metav1.NamespaceDefault, "test-job"). + UID("uid"). + RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "test-runtime"). + Trainer( + testingutil.MakeTrainJobTrainerWrapper(). + Obj(), + ). + DatasetConfig( + testingutil.MakeTrainJobDatasetConfigWrapper(). + StorageUri("hf://trainjob-dataset"). + ContainerEnv( + []corev1.EnvVar{ + { + Name: "TRAIN_JOB", + Value: "test:trainjob:dataset", + }, + }, + ). + SecretRef(corev1.LocalObjectReference{Name: "trainjob-secret-dataset"}). + Obj(), + ). + ModelConfig( + testingutil.MakeTrainJobModelConfigWrapper(). + StorageUri("hf://trainjob-model"). + ContainerEnv( + []corev1.EnvVar{ + { + Name: "TRAIN_JOB", + Value: "test:trainjob:model", + }, + }, + ). + SecretRef(corev1.LocalObjectReference{Name: "trainjob-secret-model"}). + Obj(), + ). + Obj(), + wantObjs: []client.Object{ + testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job"). + NumNodes(100). + ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetInitializerEnv( + []corev1.EnvVar{ + { + Name: constants.InitializerEnvStorageUri, + Value: "hf://trainjob-dataset", + }, + { + Name: "TRAIN_JOB", + Value: "test:trainjob:dataset", + }, + }, + ). + InitContainerDatasetInitializerEnvFrom( + []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "trainjob-secret-dataset", + }, + }, + }, + }, + ). + InitContainerModelInitializerEnv( + []corev1.EnvVar{ + { + Name: constants.InitializerEnvStorageUri, + Value: "hf://trainjob-model", + }, + { + Name: "TRAIN_JOB", + Value: "test:trainjob:model", + }, + }, + ). + InitContainerModelInitializerEnvFrom( + []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "trainjob-secret-model", + }, + }, + }, + }, + ). + ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), "test-job", "uid"). + Obj(), + }, + }, // Test cases for the Torch MLPolicy. "succeeded to build JobSet with Torch values from the TrainJob": { trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec( @@ -236,8 +337,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { RuntimeRef(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "test-runtime"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerTrainerEnv( + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + ContainerEnv( []corev1.EnvVar{ { Name: "TRAIN_JOB", diff --git a/pkg/runtime.v2/framework/core/framework_test.go b/pkg/runtime.v2/framework/core/framework_test.go index c8d0b1ca8f..69255c4016 100644 --- a/pkg/runtime.v2/framework/core/framework_test.go +++ b/pkg/runtime.v2/framework/core/framework_test.go @@ -427,7 +427,7 @@ func TestRunComponentBuilderPlugins(t *testing.T) { Trainer( testingutil.MakeTrainJobTrainerWrapper(). NumNodes(100). - ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + Container("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests). Obj(), ). Obj(), diff --git a/pkg/runtime.v2/framework/plugins/jobset/builder.go b/pkg/runtime.v2/framework/plugins/jobset/builder.go index 6849634ee2..6146930122 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/builder.go +++ b/pkg/runtime.v2/framework/plugins/jobset/builder.go @@ -19,6 +19,7 @@ package jobset import ( "maps" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" @@ -51,6 +52,92 @@ func NewBuilder(objectKey client.ObjectKey, jobSetTemplateSpec kubeflowv2.JobSet } } +// mergeInitializerEnvs merge the TrainJob and Runtime Pod envs. +func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev1.EnvVar) []corev1.EnvVar { + envNames := sets.New[string]() + envs := []corev1.EnvVar{} + // Add the Storage URI env. + if storageUri != nil { + envNames.Insert(constants.InitializerEnvStorageUri) + envs = append(envs, corev1.EnvVar{ + Name: constants.InitializerEnvStorageUri, + Value: *storageUri, + }) + } + // Add the rest TrainJob envs. + // TODO (andreyvelich): Validate that TrainJob dataset and model envs don't have the STORAGE_URI env. + if trainJobEnvs != nil { + for _, e := range trainJobEnvs { + envNames.Insert(e.Name) + envs = append(envs, e) + } + } + + // TrainJob envs take precedence over the TrainingRuntime envs. + for _, e := range containerEnv { + if !envNames.Has(e.Name) { + envs = append(envs, e) + } + } + return envs +} + +// Initializer updates JobSet values for the initializer Job. +func (b *Builder) Initializer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Builder { + for i, rJob := range b.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + // TODO (andreyvelich): Currently, we use initContainers for the initializers. + // Once JobSet supports execution policy for the ReplicatedJobs, we should migrate to containers. + // Ref: https://github.com/kubernetes-sigs/jobset/issues/672 + for j, container := range rJob.Template.Spec.Template.Spec.InitContainers { + // Update values for the dataset initializer container. + if container.Name == constants.ContainerDatasetInitializer && trainJob.Spec.DatasetConfig != nil { + // Update the dataset initializer envs. + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Env = mergeInitializerEnvs( + trainJob.Spec.DatasetConfig.StorageUri, + trainJob.Spec.DatasetConfig.Env, + container.Env, + ) + // Update the dataset initializer secret reference. + if trainJob.Spec.DatasetConfig.SecretRef != nil { + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom = append( + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom, + corev1.EnvFromSource{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: *trainJob.Spec.DatasetConfig.SecretRef, + }, + }, + ) + } + } + // TODO (andreyvelich): Add support for the model exporter when we support it. + // Update values for the model initializer container. + if container.Name == constants.ContainerModelInitializer && trainJob.Spec.ModelConfig != nil && trainJob.Spec.ModelConfig.Input != nil { + // Update the model initializer envs. + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Env = mergeInitializerEnvs( + trainJob.Spec.ModelConfig.Input.StorageUri, + trainJob.Spec.ModelConfig.Input.Env, + container.Env, + ) + // Update the model initializer secret reference. + if trainJob.Spec.ModelConfig.Input.SecretRef != nil { + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom = append( + b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].EnvFrom, + corev1.EnvFromSource{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: *trainJob.Spec.ModelConfig.Input.SecretRef, + }, + }, + ) + } + + } + } + } + } + return b +} + // Trainer updates JobSet values for the trainer Job. func (b *Builder) Trainer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Builder { for i, rJob := range b.Spec.ReplicatedJobs { @@ -85,7 +172,7 @@ func (b *Builder) Trainer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Bu envNames.Insert(env.Name) } trainerEnvs := info.Trainer.Env - // Info envs take precedence over TrainingRuntime envs. + // Info envs take precedence over the TrainingRuntime envs. for _, env := range container.Env { if !envNames.Has(env.Name) { trainerEnvs = append(trainerEnvs, env) @@ -93,6 +180,7 @@ func (b *Builder) Trainer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Bu } b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env = trainerEnvs } + // Update the Trainer container port. if info.Trainer.ContainerPort != nil { b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Ports = append( b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Ports, *info.Trainer.ContainerPort) diff --git a/pkg/runtime.v2/framework/plugins/jobset/jobset.go b/pkg/runtime.v2/framework/plugins/jobset/jobset.go index 5a6400e46d..ff54d757bd 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/jobset.go +++ b/pkg/runtime.v2/framework/plugins/jobset/jobset.go @@ -98,10 +98,10 @@ func (j *JobSet) Build(ctx context.Context, runtimeJobTemplate client.Object, in } } - // TODO (andreyvelich): add support for model and dataset initializers. // TODO (andreyvelich): Add support for the PodSpecOverride. // TODO (andreyvelich): Refactor the builder with wrappers for PodSpec. jobSet := jobSetBuilder. + Initializer(info, trainJob). Trainer(info, trainJob). PodLabels(info.PodLabels). Suspend(trainJob.Spec.Suspend). diff --git a/pkg/util.v2/testing/wrapper.go b/pkg/util.v2/testing/wrapper.go index 818d746403..6b1a3e7349 100644 --- a/pkg/util.v2/testing/wrapper.go +++ b/pkg/util.v2/testing/wrapper.go @@ -53,11 +53,26 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper { Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{ + InitContainers: []corev1.Container{ { Name: constants.ContainerDatasetInitializer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountDatasetInitializer, + }, + }, + { + Name: constants.ContainerModelInitializer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountModelInitializer, + }, }, }, + Containers: []corev1.Container{ + constants.ContainerBusyBox, + }, + Volumes: []corev1.Volume{ + constants.VolumeInitializer, + }, }, }, }, @@ -72,8 +87,15 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper { Containers: []corev1.Container{ { Name: constants.ContainerTrainer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountDatasetInitializer, + constants.VolumeMountModelInitializer, + }, }, }, + Volumes: []corev1.Volume{ + constants.VolumeInitializer, + }, }, }, }, @@ -144,15 +166,71 @@ func (j *JobSetWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *JobSetWrapper return j } -func (j *JobSetWrapper) ContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *JobSetWrapper { +func (j *JobSetWrapper) InitContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *JobSetWrapper { for i, rJob := range j.Spec.ReplicatedJobs { if rJob.Name == constants.JobInitializer { - for k, container := range rJob.Template.Spec.Template.Spec.Containers { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { if container.Name == constants.ContainerDatasetInitializer || container.Name == constants.ContainerModelInitializer { - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Image = image - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Command = command - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Args = args - j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Resources.Requests = res + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Image = image + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Command = command + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Args = args + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Resources.Requests = res + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerDatasetInitializerEnv(env []corev1.EnvVar) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerDatasetInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Env = env + + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerDatasetInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerDatasetInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].EnvFrom = envFrom + + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerModelInitializerEnv(env []corev1.EnvVar) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerModelInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Env = env + + } + } + } + } + return j +} + +func (j *JobSetWrapper) InitContainerModelInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper { + for i, rJob := range j.Spec.ReplicatedJobs { + if rJob.Name == constants.JobInitializer { + for k, container := range rJob.Template.Spec.Template.Spec.InitContainers { + if container.Name == constants.ContainerModelInitializer { + j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].EnvFrom = envFrom + } } } @@ -227,31 +305,6 @@ func MakeTrainJobWrapper(namespace, name string) *TrainJobWrapper { } } -func (t *TrainJobTrainerWrapper) NumNodes(numNodes int32) *TrainJobTrainerWrapper { - t.Trainer.NumNodes = &numNodes - return t -} - -func (t *TrainJobTrainerWrapper) NumProcPerNode(numProcPerNode string) *TrainJobTrainerWrapper { - t.Trainer.NumProcPerNode = &numProcPerNode - return t -} - -func (t *TrainJobTrainerWrapper) ContainerTrainer(image string, command []string, args []string, resRequests corev1.ResourceList) *TrainJobTrainerWrapper { - t.Trainer.Image = &image - t.Trainer.Command = command - t.Trainer.Args = args - t.Trainer.ResourcesPerNode = &corev1.ResourceRequirements{ - Requests: resRequests, - } - return t -} - -func (t *TrainJobTrainerWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *TrainJobTrainerWrapper { - t.Trainer.Env = env - return t -} - func (t *TrainJobWrapper) Suspend(suspend bool) *TrainJobWrapper { t.Spec.Suspend = &suspend return t @@ -297,6 +350,16 @@ func (t *TrainJobWrapper) Trainer(trainer *kubeflowv2.Trainer) *TrainJobWrapper return t } +func (t *TrainJobWrapper) DatasetConfig(datasetConfig *kubeflowv2.DatasetConfig) *TrainJobWrapper { + t.Spec.DatasetConfig = datasetConfig + return t +} + +func (t *TrainJobWrapper) ModelConfig(modelConfig *kubeflowv2.ModelConfig) *TrainJobWrapper { + t.Spec.ModelConfig = modelConfig + return t +} + func (t *TrainJobWrapper) ManagedBy(m string) *TrainJobWrapper { t.Spec.ManagedBy = &m return t @@ -316,10 +379,96 @@ func MakeTrainJobTrainerWrapper() *TrainJobTrainerWrapper { } } +func (t *TrainJobTrainerWrapper) NumNodes(numNodes int32) *TrainJobTrainerWrapper { + t.Trainer.NumNodes = &numNodes + return t +} + +func (t *TrainJobTrainerWrapper) NumProcPerNode(numProcPerNode string) *TrainJobTrainerWrapper { + t.Trainer.NumProcPerNode = &numProcPerNode + return t +} + +func (t *TrainJobTrainerWrapper) Container(image string, command []string, args []string, resRequests corev1.ResourceList) *TrainJobTrainerWrapper { + t.Trainer.Image = &image + t.Trainer.Command = command + t.Trainer.Args = args + t.Trainer.ResourcesPerNode = &corev1.ResourceRequirements{ + Requests: resRequests, + } + return t +} + +func (t *TrainJobTrainerWrapper) ContainerEnv(env []corev1.EnvVar) *TrainJobTrainerWrapper { + t.Trainer.Env = env + return t +} + func (t *TrainJobTrainerWrapper) Obj() *kubeflowv2.Trainer { return &t.Trainer } +type TrainJobDatasetConfigWrapper struct { + kubeflowv2.DatasetConfig +} + +func MakeTrainJobDatasetConfigWrapper() *TrainJobDatasetConfigWrapper { + return &TrainJobDatasetConfigWrapper{ + DatasetConfig: kubeflowv2.DatasetConfig{}, + } +} + +func (t *TrainJobDatasetConfigWrapper) StorageUri(storageUri string) *TrainJobDatasetConfigWrapper { + t.DatasetConfig.StorageUri = &storageUri + return t +} + +func (t *TrainJobDatasetConfigWrapper) ContainerEnv(env []corev1.EnvVar) *TrainJobDatasetConfigWrapper { + t.DatasetConfig.Env = env + return t +} + +func (t *TrainJobDatasetConfigWrapper) SecretRef(secretRef corev1.LocalObjectReference) *TrainJobDatasetConfigWrapper { + t.DatasetConfig.SecretRef = &secretRef + return t +} + +func (t *TrainJobDatasetConfigWrapper) Obj() *kubeflowv2.DatasetConfig { + return &t.DatasetConfig +} + +type TrainJobModelConfigWrapper struct { + kubeflowv2.ModelConfig +} + +func MakeTrainJobModelConfigWrapper() *TrainJobModelConfigWrapper { + return &TrainJobModelConfigWrapper{ + ModelConfig: kubeflowv2.ModelConfig{ + // TODO (andreyvelich): Add support for output model when implemented. + Input: &kubeflowv2.InputModel{}, + }, + } +} + +func (t *TrainJobModelConfigWrapper) StorageUri(storageUri string) *TrainJobModelConfigWrapper { + t.ModelConfig.Input.StorageUri = &storageUri + return t +} + +func (t *TrainJobModelConfigWrapper) ContainerEnv(env []corev1.EnvVar) *TrainJobModelConfigWrapper { + t.ModelConfig.Input.Env = env + return t +} + +func (t *TrainJobModelConfigWrapper) SecretRef(secretRef corev1.LocalObjectReference) *TrainJobModelConfigWrapper { + t.ModelConfig.Input.SecretRef = &secretRef + return t +} + +func (t *TrainJobModelConfigWrapper) Obj() *kubeflowv2.ModelConfig { + return &t.ModelConfig +} + type TrainingRuntimeWrapper struct { kubeflowv2.TrainingRuntime } @@ -345,9 +494,26 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerDatasetInitializer, - }}, + InitContainers: []corev1.Container{ + { + Name: constants.ContainerDatasetInitializer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountDatasetInitializer, + }, + }, + { + Name: constants.ContainerModelInitializer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountModelInitializer, + }, + }, + }, + Containers: []corev1.Container{ + constants.ContainerBusyBox, + }, + Volumes: []corev1.Volume{ + constants.VolumeInitializer, + }, }, }, }, @@ -359,9 +525,18 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerTrainer, - }}, + Containers: []corev1.Container{ + { + Name: constants.ContainerTrainer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountDatasetInitializer, + constants.VolumeMountModelInitializer, + }, + }, + }, + Volumes: []corev1.Volume{ + constants.VolumeInitializer, + }, }, }, }, @@ -424,9 +599,26 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerDatasetInitializer, - }}, + InitContainers: []corev1.Container{ + { + Name: constants.ContainerDatasetInitializer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountDatasetInitializer, + }, + }, + { + Name: constants.ContainerModelInitializer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountModelInitializer, + }, + }, + }, + Containers: []corev1.Container{ + constants.ContainerBusyBox, + }, + Volumes: []corev1.Volume{ + constants.VolumeInitializer, + }, }, }, }, @@ -438,9 +630,18 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp Spec: batchv1.JobSpec{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: constants.ContainerTrainer, - }}, + Containers: []corev1.Container{ + { + Name: constants.ContainerTrainer, + VolumeMounts: []corev1.VolumeMount{ + constants.VolumeMountDatasetInitializer, + constants.VolumeMountModelInitializer, + }, + }, + }, + Volumes: []corev1.Volume{ + constants.VolumeInitializer, + }, }, }, }, @@ -521,15 +722,15 @@ func (s *TrainingRuntimeSpecWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *T return s } -func (s *TrainingRuntimeSpecWrapper) ContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *TrainingRuntimeSpecWrapper { +func (s *TrainingRuntimeSpecWrapper) InitContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *TrainingRuntimeSpecWrapper { for i, rJob := range s.Template.Spec.ReplicatedJobs { if rJob.Name == constants.JobInitializer { - for j, container := range rJob.Template.Spec.Template.Spec.Containers { + for j, container := range rJob.Template.Spec.Template.Spec.InitContainers { if container.Name == constants.ContainerDatasetInitializer || container.Name == constants.ContainerModelInitializer { - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Image = image - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Command = command - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Args = args - s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Resources.Requests = res + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Image = image + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Command = command + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Args = args + s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Resources.Requests = res } } } diff --git a/test/integration/controller.v2/trainjob_controller_test.go b/test/integration/controller.v2/trainjob_controller_test.go index 663658e817..a961b36b22 100644 --- a/test/integration/controller.v2/trainjob_controller_test.go +++ b/test/integration/controller.v2/trainjob_controller_test.go @@ -86,8 +86,18 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { SpecAnnotation("testingKey", "testingVal"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + Container("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). Obj()). + DatasetConfig( + testingutil.MakeTrainJobDatasetConfigWrapper(). + StorageUri("hf://trainjob-dataset"). + Obj(), + ). + ModelConfig( + testingutil.MakeTrainJobModelConfigWrapper(). + StorageUri("hf://trainjob-model"). + Obj(), + ). Obj() trainJobKey = client.ObjectKeyFromObject(trainJob) @@ -96,12 +106,13 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "alpha").Spec). NumNodes(100). ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). PodGroupPolicyCoscheduling(&kubeflowv2.CoschedulingPodGroupPolicySource{ScheduleTimeoutSeconds: ptr.To[int32](100)}). Obj()). Obj() }) + // Integration tests for the PlainML Runtime. ginkgo.It("Should succeed to create TrainJob with TrainingRuntime", func() { ginkgo.By("Creating TrainingRuntime and TrainJob") gomega.Expect(k8sClient.Create(ctx, trainingRuntime)).Should(gomega.Succeed()) @@ -116,7 +127,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { Replicas(1). NumNodes(100). ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). + InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). Suspend(true). Label("testingKey", "testingVal"). Annotation("testingKey", "testingVal"). @@ -169,7 +182,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { Replicas(1). NumNodes(100). ContainerTrainer(updatedImageName, []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). + InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). + InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). Suspend(true). Label("testingKey", "testingVal"). Annotation("testingKey", "testingVal"). @@ -244,14 +259,15 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) + // Integration tests for the Torch Runtime. ginkgo.It("Should succeed to create TrainJob with Torch TrainingRuntime", func() { ginkgo.By("Creating Torch TrainingRuntime and TrainJob") trainJob = testingutil.MakeTrainJobWrapper(ns.Name, "alpha"). RuntimeRef(kubeflowv2.GroupVersion.WithKind(kubeflowv2.TrainingRuntimeKind), "alpha"). Trainer( testingutil.MakeTrainJobTrainerWrapper(). - ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). - ContainerTrainerEnv([]corev1.EnvVar{{Name: "TRAIN_JOB", Value: "value"}}). + Container("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). + ContainerEnv([]corev1.EnvVar{{Name: "TRAIN_JOB", Value: "value"}}). Obj()). Obj() trainJobKey = client.ObjectKeyFromObject(trainJob) From ba9e215b486b951c01420e0d64a13c431b4750af Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 00:10:54 +0000 Subject: [PATCH 2/8] Update the SDK models Signed-off-by: Andrey Velichkevich --- api.v2/openapi-spec/swagger.json | 12 ++++++------ docs/proposals/2170-kubeflow-training-v2/README.md | 6 +++--- sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md | 2 +- sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md | 2 +- sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md | 2 +- .../models/kubeflow_org_v2alpha1_dataset_config.py | 6 +++--- .../models/kubeflow_org_v2alpha1_input_model.py | 6 +++--- .../models/kubeflow_org_v2alpha1_output_model.py | 6 +++--- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/api.v2/openapi-spec/swagger.json b/api.v2/openapi-spec/swagger.json index eadb6bfc06..83cbd5dedc 100644 --- a/api.v2/openapi-spec/swagger.json +++ b/api.v2/openapi-spec/swagger.json @@ -137,8 +137,8 @@ } }, "secretRef": { - "description": "Reference to the TrainJob's secrets to download dataset.", - "$ref": "#/definitions/v1.SecretReference" + "description": "Reference to the secret with credentials to download dataset. Secret must be created in the TrainJob's namespace.", + "$ref": "#/definitions/v1.LocalObjectReference" }, "storageUri": { "description": "Storage uri for the dataset provider.", @@ -159,8 +159,8 @@ } }, "secretRef": { - "description": "Reference to the TrainJob's secrets to download model.", - "$ref": "#/definitions/v1.SecretReference" + "description": "Reference to the secret with credentials to download model. Secret must be created in the TrainJob's namespace.", + "$ref": "#/definitions/v1.LocalObjectReference" }, "storageUri": { "description": "Storage uri for the model provider.", @@ -315,8 +315,8 @@ } }, "secretRef": { - "description": "Reference to the TrainJob's secrets to export model.", - "$ref": "#/definitions/v1.SecretReference" + "description": "Reference to the secret with credentials to export model. Secret must be created in the TrainJob's namespace.", + "$ref": "#/definitions/v1.LocalObjectReference" }, "storageUri": { "description": "Storage uri for the model exporter.", diff --git a/docs/proposals/2170-kubeflow-training-v2/README.md b/docs/proposals/2170-kubeflow-training-v2/README.md index c04b615dcc..be0f16b957 100644 --- a/docs/proposals/2170-kubeflow-training-v2/README.md +++ b/docs/proposals/2170-kubeflow-training-v2/README.md @@ -597,7 +597,7 @@ type DatasetConfig struct { Env []corev1.EnvVar `json:"env,omitempty"` // Reference to the TrainJob's secrets to download dataset. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } ``` @@ -665,7 +665,7 @@ type InputModel struct { Env []corev1.EnvVar `json:"env,omitempty"` // Reference to the TrainJob's secrets to download model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } type OutputModel struct { @@ -677,7 +677,7 @@ type OutputModel struct { Env []corev1.EnvVar `json:"env,omitempty"` // Reference to the TrainJob's secrets to export model. - SecretRef *corev1.SecretReference `json:"secretRef,omitempty"` + SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty"` } ``` diff --git a/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md b/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md index 39edeefdab..342306b212 100644 --- a/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md +++ b/sdk_v2/docs/KubeflowOrgV2alpha1DatasetConfig.md @@ -5,7 +5,7 @@ DatasetConfig represents the desired dataset configuration. When this API is use Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **env** | [**list[V1EnvVar]**](V1EnvVar.md) | List of environment variables to set in the dataset initializer container. These values will be merged with the TrainingRuntime's dataset initializer environments. | [optional] -**secret_ref** | [**V1SecretReference**](V1SecretReference.md) | | [optional] +**secret_ref** | [**V1LocalObjectReference**](V1LocalObjectReference.md) | | [optional] **storage_uri** | **str** | Storage uri for the dataset provider. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md b/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md index ae22fddd4f..385295c819 100644 --- a/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md +++ b/sdk_v2/docs/KubeflowOrgV2alpha1InputModel.md @@ -5,7 +5,7 @@ InputModel represents the desired pre-trained model configuration. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **env** | [**list[V1EnvVar]**](V1EnvVar.md) | List of environment variables to set in the model initializer container. These values will be merged with the TrainingRuntime's model initializer environments. | [optional] -**secret_ref** | [**V1SecretReference**](V1SecretReference.md) | | [optional] +**secret_ref** | [**V1LocalObjectReference**](V1LocalObjectReference.md) | | [optional] **storage_uri** | **str** | Storage uri for the model provider. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md b/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md index 10563c4697..754c62ea76 100644 --- a/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md +++ b/sdk_v2/docs/KubeflowOrgV2alpha1OutputModel.md @@ -5,7 +5,7 @@ OutputModel represents the desired trained model configuration. Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **env** | [**list[V1EnvVar]**](V1EnvVar.md) | List of environment variables to set in the model exporter container. These values will be merged with the TrainingRuntime's model exporter environments. | [optional] -**secret_ref** | [**V1SecretReference**](V1SecretReference.md) | | [optional] +**secret_ref** | [**V1LocalObjectReference**](V1LocalObjectReference.md) | | [optional] **storage_uri** | **str** | Storage uri for the model exporter. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py index 406bcebca6..d706b36fa0 100644 --- a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py +++ b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_dataset_config.py @@ -34,7 +34,7 @@ class KubeflowOrgV2alpha1DatasetConfig(object): """ openapi_types = { 'env': 'list[V1EnvVar]', - 'secret_ref': 'V1SecretReference', + 'secret_ref': 'V1LocalObjectReference', 'storage_uri': 'str' } @@ -91,7 +91,7 @@ def secret_ref(self): :return: The secret_ref of this KubeflowOrgV2alpha1DatasetConfig. # noqa: E501 - :rtype: V1SecretReference + :rtype: V1LocalObjectReference """ return self._secret_ref @@ -101,7 +101,7 @@ def secret_ref(self, secret_ref): :param secret_ref: The secret_ref of this KubeflowOrgV2alpha1DatasetConfig. # noqa: E501 - :type: V1SecretReference + :type: V1LocalObjectReference """ self._secret_ref = secret_ref diff --git a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py index 3a06d04596..92038070ef 100644 --- a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py +++ b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_input_model.py @@ -34,7 +34,7 @@ class KubeflowOrgV2alpha1InputModel(object): """ openapi_types = { 'env': 'list[V1EnvVar]', - 'secret_ref': 'V1SecretReference', + 'secret_ref': 'V1LocalObjectReference', 'storage_uri': 'str' } @@ -91,7 +91,7 @@ def secret_ref(self): :return: The secret_ref of this KubeflowOrgV2alpha1InputModel. # noqa: E501 - :rtype: V1SecretReference + :rtype: V1LocalObjectReference """ return self._secret_ref @@ -101,7 +101,7 @@ def secret_ref(self, secret_ref): :param secret_ref: The secret_ref of this KubeflowOrgV2alpha1InputModel. # noqa: E501 - :type: V1SecretReference + :type: V1LocalObjectReference """ self._secret_ref = secret_ref diff --git a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py index 501fcb39ab..22447d8559 100644 --- a/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py +++ b/sdk_v2/kubeflow/training/models/kubeflow_org_v2alpha1_output_model.py @@ -34,7 +34,7 @@ class KubeflowOrgV2alpha1OutputModel(object): """ openapi_types = { 'env': 'list[V1EnvVar]', - 'secret_ref': 'V1SecretReference', + 'secret_ref': 'V1LocalObjectReference', 'storage_uri': 'str' } @@ -91,7 +91,7 @@ def secret_ref(self): :return: The secret_ref of this KubeflowOrgV2alpha1OutputModel. # noqa: E501 - :rtype: V1SecretReference + :rtype: V1LocalObjectReference """ return self._secret_ref @@ -101,7 +101,7 @@ def secret_ref(self, secret_ref): :param secret_ref: The secret_ref of this KubeflowOrgV2alpha1OutputModel. # noqa: E501 - :type: V1SecretReference + :type: V1LocalObjectReference """ self._secret_ref = secret_ref From 58d563d5e799f2e30974e59749300f3290029bde Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 01:03:35 +0000 Subject: [PATCH 3/8] Remove Info from Initializer builder Signed-off-by: Andrey Velichkevich --- pkg/constants/constants.go | 2 +- pkg/runtime.v2/core/trainingruntime_test.go | 4 ++-- pkg/runtime.v2/framework/plugins/jobset/builder.go | 14 ++++++-------- pkg/runtime.v2/framework/plugins/jobset/jobset.go | 2 +- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index bfbfd04a3a..3e025de83b 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -65,7 +65,7 @@ var ( // JobCompletionIndexFieldPath is the field path for the Job completion index annotation. JobCompletionIndexFieldPath string = fmt.Sprintf("metadata.annotations['%s']", batchv1.JobCompletionIndexAnnotation) - // This is temp container that we use in the initializer ReplicatedJob. + // This is the temporary container that we use in the initializer ReplicatedJob. // TODO (andreyvelich): Once JobSet supports execution policy, we can remove it. // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has this container. ContainerBusyBox corev1.Container = corev1.Container{ diff --git a/pkg/runtime.v2/core/trainingruntime_test.go b/pkg/runtime.v2/core/trainingruntime_test.go index 31f942818d..5ca6bfc57c 100644 --- a/pkg/runtime.v2/core/trainingruntime_test.go +++ b/pkg/runtime.v2/core/trainingruntime_test.go @@ -87,8 +87,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { MinMember(31). // 31 replicas = 30 Trainer nodes + 1 Initializer. MinResources(corev1.ResourceList{ // Every replica has 1 CPU = 31 CPUs in total. - // Since initializers use init containers, they execute sequentially. - // MinResources is equal to the maximum from the initContainer resources. + // Initializer uses InitContainers which execute sequentially. + // Thus, the MinResources is equal to the maximum from the initContainer resources. corev1.ResourceCPU: resource.MustParse("31"), }). SchedulingTimeout(120). diff --git a/pkg/runtime.v2/framework/plugins/jobset/builder.go b/pkg/runtime.v2/framework/plugins/jobset/builder.go index 6146930122..34485bb9c9 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/builder.go +++ b/pkg/runtime.v2/framework/plugins/jobset/builder.go @@ -52,7 +52,7 @@ func NewBuilder(objectKey client.ObjectKey, jobSetTemplateSpec kubeflowv2.JobSet } } -// mergeInitializerEnvs merge the TrainJob and Runtime Pod envs. +// mergeInitializerEnvs merges the TrainJob and Runtime Pod envs. func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev1.EnvVar) []corev1.EnvVar { envNames := sets.New[string]() envs := []corev1.EnvVar{} @@ -66,11 +66,9 @@ func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev } // Add the rest TrainJob envs. // TODO (andreyvelich): Validate that TrainJob dataset and model envs don't have the STORAGE_URI env. - if trainJobEnvs != nil { - for _, e := range trainJobEnvs { - envNames.Insert(e.Name) - envs = append(envs, e) - } + for _, e := range trainJobEnvs { + envNames.Insert(e.Name) + envs = append(envs, e) } // TrainJob envs take precedence over the TrainingRuntime envs. @@ -83,7 +81,7 @@ func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev } // Initializer updates JobSet values for the initializer Job. -func (b *Builder) Initializer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) *Builder { +func (b *Builder) Initializer(trainJob *kubeflowv2.TrainJob) *Builder { for i, rJob := range b.Spec.ReplicatedJobs { if rJob.Name == constants.JobInitializer { // TODO (andreyvelich): Currently, we use initContainers for the initializers. @@ -110,7 +108,7 @@ func (b *Builder) Initializer(info *runtime.Info, trainJob *kubeflowv2.TrainJob) ) } } - // TODO (andreyvelich): Add support for the model exporter when we support it. + // TODO (andreyvelich): Add the model exporter when we support it. // Update values for the model initializer container. if container.Name == constants.ContainerModelInitializer && trainJob.Spec.ModelConfig != nil && trainJob.Spec.ModelConfig.Input != nil { // Update the model initializer envs. diff --git a/pkg/runtime.v2/framework/plugins/jobset/jobset.go b/pkg/runtime.v2/framework/plugins/jobset/jobset.go index ff54d757bd..ef04890b39 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/jobset.go +++ b/pkg/runtime.v2/framework/plugins/jobset/jobset.go @@ -101,7 +101,7 @@ func (j *JobSet) Build(ctx context.Context, runtimeJobTemplate client.Object, in // TODO (andreyvelich): Add support for the PodSpecOverride. // TODO (andreyvelich): Refactor the builder with wrappers for PodSpec. jobSet := jobSetBuilder. - Initializer(info, trainJob). + Initializer(trainJob). Trainer(info, trainJob). PodLabels(info.PodLabels). Suspend(trainJob.Spec.Suspend). From e9f017c62f724cc23535fc06f1f1c0ddc00e054e Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 01:11:24 +0000 Subject: [PATCH 4/8] Update manifests Signed-off-by: Andrey Velichkevich --- Makefile | 2 +- .../v2/base/crds/kubeflow.org_trainjobs.yaml | 44 +++++++++---------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/Makefile b/Makefile index df7f4969fa..97dc64660f 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust output:rbac:artifacts:config=manifests/v2/base/rbac \ output:webhook:artifacts:config=manifests/v2/base/webhook -generate: controller-gen ## Generate apidoc, sdk and code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. +generate: controller-gen manifests ## Generate apidoc, sdk and code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. $(CONTROLLER_GEN) object:headerFile="hack/boilerplate/boilerplate.go.txt" paths="./pkg/apis/..." hack/update-codegen.sh hack/python-sdk/gen-sdk.sh diff --git a/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml b/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml index 8307cdc51e..ed6cda3760 100644 --- a/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml +++ b/manifests/v2/base/crds/kubeflow.org_trainjobs.yaml @@ -173,15 +173,15 @@ spec: type: object type: array secretRef: - description: Reference to the TrainJob's secrets to download dataset. + description: |- + Reference to the secret with credentials to download dataset. + Secret must be created in the TrainJob's namespace. properties: name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which the - secret name must be unique. + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? type: string type: object x-kubernetes-map-type: atomic @@ -339,16 +339,15 @@ spec: type: object type: array secretRef: - description: Reference to the TrainJob's secrets to download - model. + description: |- + Reference to the secret with credentials to download model. + Secret must be created in the TrainJob's namespace. properties: name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which - the secret name must be unique. + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? type: string type: object x-kubernetes-map-type: atomic @@ -479,16 +478,15 @@ spec: type: object type: array secretRef: - description: Reference to the TrainJob's secrets to export - model. + description: |- + Reference to the secret with credentials to export model. + Secret must be created in the TrainJob's namespace. properties: name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which - the secret name must be unique. + description: |- + Name of the referent. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, kind, uid? type: string type: object x-kubernetes-map-type: atomic From 1e14800424151fca76fb841e282eb13a3e86d6e5 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 17:20:46 +0000 Subject: [PATCH 5/8] Update pkg/constants/constants.go Co-authored-by: Yuki Iwai Signed-off-by: Andrey Velichkevich --- pkg/constants/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 3e025de83b..3243d184ac 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -70,7 +70,7 @@ var ( // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has this container. ContainerBusyBox corev1.Container = corev1.Container{ Name: "busybox", - Image: "busybox", + Image: "busybox:stable-glibc", } // VolumeMountModelInitializer is the volume mount for the model initializer container. From b9f9e8a5b96b84dc69ca9479aa91147d8cb8a612 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 17:24:54 +0000 Subject: [PATCH 6/8] Use var for envs Signed-off-by: Andrey Velichkevich --- pkg/controller.v1/pytorch/elastic.go | 2 +- pkg/controller.v1/pytorch/master.go | 2 +- pkg/runtime.v2/framework/plugins/jobset/builder.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller.v1/pytorch/elastic.go b/pkg/controller.v1/pytorch/elastic.go index f5c2008b60..ee0a4f6308 100644 --- a/pkg/controller.v1/pytorch/elastic.go +++ b/pkg/controller.v1/pytorch/elastic.go @@ -67,7 +67,7 @@ func GetElasticEnvVarGenerator() EnvVarGenerator { func (e ElasticEnvVarGenerator) Generate( job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error) { - envVars := []corev1.EnvVar{} + var envVars []corev1.EnvVar elasticPolicy := job.Spec.ElasticPolicy if elasticPolicy == nil { diff --git a/pkg/controller.v1/pytorch/master.go b/pkg/controller.v1/pytorch/master.go index 064c7054b9..c46a031cca 100644 --- a/pkg/controller.v1/pytorch/master.go +++ b/pkg/controller.v1/pytorch/master.go @@ -32,7 +32,7 @@ func GetMasterEnvVarGenerator() EnvVarGenerator { func (e MasterEnvVarGenerator) Generate( job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error) { - envVars := []corev1.EnvVar{} + var envVars []corev1.EnvVar if job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] != nil { masterPort, err := getPortFromPyTorchJob(job, kubeflowv1.PyTorchJobReplicaTypeMaster) if err != nil { diff --git a/pkg/runtime.v2/framework/plugins/jobset/builder.go b/pkg/runtime.v2/framework/plugins/jobset/builder.go index 34485bb9c9..d264982ee9 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/builder.go +++ b/pkg/runtime.v2/framework/plugins/jobset/builder.go @@ -55,7 +55,7 @@ func NewBuilder(objectKey client.ObjectKey, jobSetTemplateSpec kubeflowv2.JobSet // mergeInitializerEnvs merges the TrainJob and Runtime Pod envs. func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev1.EnvVar) []corev1.EnvVar { envNames := sets.New[string]() - envs := []corev1.EnvVar{} + var envs []corev1.EnvVar // Add the Storage URI env. if storageUri != nil { envNames.Insert(constants.InitializerEnvStorageUri) From c35b8d1a8a2e8bec4a795f9301aa042fa5c8c72a Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 17:27:18 +0000 Subject: [PATCH 7/8] Remove check manifests from GitHub actions Signed-off-by: Andrey Velichkevich --- .github/workflows/test-go.yaml | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test-go.yaml b/.github/workflows/test-go.yaml index de9c242398..b314f73724 100644 --- a/.github/workflows/test-go.yaml +++ b/.github/workflows/test-go.yaml @@ -29,14 +29,10 @@ jobs: run: | go mod tidy && pushd hack/swagger && go mod tidy && popd && git add go.* && git diff --cached --exit-code || (echo 'Please run "go mod tidy" to sync Go modules' && exit 1); - - name: Check manifests + - name: Check auto-generated assets run: | - make manifests && git add manifests && - git diff --cached --exit-code || (echo 'Please run "make manifests" to generate manifests' && exit 1); - - name: Check auto-generated codes - run: | - make generate && git add pkg sdk && - git diff --cached --exit-code || (echo 'Please run "make generate" to generate Go codes' && exit 1); + make generate && git add pkg sdk manifests && + git diff --cached --exit-code || (echo 'Please run "make generate" to generate assets' && exit 1); - name: Verify gofmt run: | make fmt && git add pkg cmd && From cd0bc23d9412b358065687e5654a36ae2af025e8 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Fri, 1 Nov 2024 20:38:06 +0000 Subject: [PATCH 8/8] Move consts to JobSet plugin Signed-off-by: Andrey Velichkevich --- pkg/constants/constants.go | 40 ---------------- pkg/runtime.v2/core/trainingruntime_test.go | 5 +- .../framework/plugins/jobset/builder.go | 4 +- .../framework/plugins/jobset/constants.go | 48 +++++++++++++++++++ pkg/util.v2/testing/wrapper.go | 43 +++++++++-------- .../controller.v2/trainjob_controller_test.go | 9 ++-- 6 files changed, 80 insertions(+), 69 deletions(-) create mode 100644 pkg/runtime.v2/framework/plugins/jobset/constants.go diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 3243d184ac..29288d2d0e 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -4,7 +4,6 @@ import ( "fmt" batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" ) const ( @@ -27,19 +26,12 @@ const ( // JobInitializer is the Job name for the initializer. JobInitializer string = "initializer" - // VolumeNameInitializer is the name for the initializer Pod's Volume and VolumeMount. - // TODO (andreyvelich): Add validation to check that initializer Pod has the correct volume. - VolumeNameInitializer string = "initializer" - // ContainerModelInitializer is the container name for the model initializer. ContainerModelInitializer string = "model-initializer" // ContainerDatasetInitializer is the container name for the dataset initializer. ContainerDatasetInitializer string = "dataset-initializer" - // InitializerEnvStorageUri is the env name for the initializer storage uri. - InitializerEnvStorageUri string = "STORAGE_URI" - // PodGroupKind is the Kind name for the PodGroup. PodGroupKind string = "PodGroup" @@ -64,36 +56,4 @@ const ( var ( // JobCompletionIndexFieldPath is the field path for the Job completion index annotation. JobCompletionIndexFieldPath string = fmt.Sprintf("metadata.annotations['%s']", batchv1.JobCompletionIndexAnnotation) - - // This is the temporary container that we use in the initializer ReplicatedJob. - // TODO (andreyvelich): Once JobSet supports execution policy, we can remove it. - // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has this container. - ContainerBusyBox corev1.Container = corev1.Container{ - Name: "busybox", - Image: "busybox:stable-glibc", - } - - // VolumeMountModelInitializer is the volume mount for the model initializer container. - // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has the following volumes. - VolumeMountModelInitializer = corev1.VolumeMount{ - Name: VolumeNameInitializer, - MountPath: "/workspace/model", - } - - // VolumeMountModelInitializer is the volume mount for the dataset initializer container. - VolumeMountDatasetInitializer = corev1.VolumeMount{ - Name: VolumeNameInitializer, - MountPath: "/workspace/dataset", - } - - // VolumeInitializer is the volume for the initializer ReplicatedJob. - // TODO (andreyvelich): We should make VolumeSource configurable. - VolumeInitializer = corev1.Volume{ - Name: VolumeNameInitializer, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: VolumeNameInitializer, - }, - }, - } ) diff --git a/pkg/runtime.v2/core/trainingruntime_test.go b/pkg/runtime.v2/core/trainingruntime_test.go index 5ca6bfc57c..2a11716cd3 100644 --- a/pkg/runtime.v2/core/trainingruntime_test.go +++ b/pkg/runtime.v2/core/trainingruntime_test.go @@ -31,6 +31,7 @@ import ( kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" "github.com/kubeflow/training-operator/pkg/constants" + jobsetplugin "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins/jobset" testingutil "github.com/kubeflow/training-operator/pkg/util.v2/testing" ) @@ -211,7 +212,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { InitContainerDatasetInitializerEnv( []corev1.EnvVar{ { - Name: constants.InitializerEnvStorageUri, + Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset", }, { @@ -234,7 +235,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) { InitContainerModelInitializerEnv( []corev1.EnvVar{ { - Name: constants.InitializerEnvStorageUri, + Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model", }, { diff --git a/pkg/runtime.v2/framework/plugins/jobset/builder.go b/pkg/runtime.v2/framework/plugins/jobset/builder.go index d264982ee9..dc104bdd28 100644 --- a/pkg/runtime.v2/framework/plugins/jobset/builder.go +++ b/pkg/runtime.v2/framework/plugins/jobset/builder.go @@ -58,9 +58,9 @@ func mergeInitializerEnvs(storageUri *string, trainJobEnvs, containerEnv []corev var envs []corev1.EnvVar // Add the Storage URI env. if storageUri != nil { - envNames.Insert(constants.InitializerEnvStorageUri) + envNames.Insert(InitializerEnvStorageUri) envs = append(envs, corev1.EnvVar{ - Name: constants.InitializerEnvStorageUri, + Name: InitializerEnvStorageUri, Value: *storageUri, }) } diff --git a/pkg/runtime.v2/framework/plugins/jobset/constants.go b/pkg/runtime.v2/framework/plugins/jobset/constants.go new file mode 100644 index 0000000000..3a8a8e07e6 --- /dev/null +++ b/pkg/runtime.v2/framework/plugins/jobset/constants.go @@ -0,0 +1,48 @@ +package jobset + +import ( + corev1 "k8s.io/api/core/v1" +) + +const ( + + // VolumeNameInitializer is the name for the initializer Pod's Volume and VolumeMount. + // TODO (andreyvelich): Add validation to check that initializer Pod has the correct volume. + VolumeNameInitializer string = "initializer" + + // InitializerEnvStorageUri is the env name for the initializer storage uri. + InitializerEnvStorageUri string = "STORAGE_URI" +) + +var ( + // This is the temporary container that we use in the initializer ReplicatedJob. + // TODO (andreyvelich): Once JobSet supports execution policy, we can remove it. + ContainerBusyBox corev1.Container = corev1.Container{ + Name: "busybox", + Image: "busybox:stable-glibc", + } + + // VolumeMountModelInitializer is the volume mount for the model initializer container. + // TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has the following volumes. + VolumeMountModelInitializer = corev1.VolumeMount{ + Name: VolumeNameInitializer, + MountPath: "/workspace/model", + } + + // VolumeMountModelInitializer is the volume mount for the dataset initializer container. + VolumeMountDatasetInitializer = corev1.VolumeMount{ + Name: VolumeNameInitializer, + MountPath: "/workspace/dataset", + } + + // VolumeInitializer is the volume for the initializer ReplicatedJob. + // TODO (andreyvelich): We should make VolumeSource configurable. + VolumeInitializer = corev1.Volume{ + Name: VolumeNameInitializer, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: VolumeNameInitializer, + }, + }, + } +) diff --git a/pkg/util.v2/testing/wrapper.go b/pkg/util.v2/testing/wrapper.go index 6b1a3e7349..4f19ba3d2c 100644 --- a/pkg/util.v2/testing/wrapper.go +++ b/pkg/util.v2/testing/wrapper.go @@ -28,6 +28,7 @@ import ( kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" "github.com/kubeflow/training-operator/pkg/constants" + jobsetplugin "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins/jobset" ) type JobSetWrapper struct { @@ -57,21 +58,21 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper { { Name: constants.ContainerDatasetInitializer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountDatasetInitializer, }, }, { Name: constants.ContainerModelInitializer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountModelInitializer, + jobsetplugin.VolumeMountModelInitializer, }, }, }, Containers: []corev1.Container{ - constants.ContainerBusyBox, + jobsetplugin.ContainerBusyBox, }, Volumes: []corev1.Volume{ - constants.VolumeInitializer, + jobsetplugin.VolumeInitializer, }, }, }, @@ -88,13 +89,13 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper { { Name: constants.ContainerTrainer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountDatasetInitializer, - constants.VolumeMountModelInitializer, + jobsetplugin.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountModelInitializer, }, }, }, Volumes: []corev1.Volume{ - constants.VolumeInitializer, + jobsetplugin.VolumeInitializer, }, }, }, @@ -498,21 +499,21 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper { Name: constants.ContainerDatasetInitializer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountDatasetInitializer, }, }, { Name: constants.ContainerModelInitializer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountModelInitializer, + jobsetplugin.VolumeMountModelInitializer, }, }, }, Containers: []corev1.Container{ - constants.ContainerBusyBox, + jobsetplugin.ContainerBusyBox, }, Volumes: []corev1.Volume{ - constants.VolumeInitializer, + jobsetplugin.VolumeInitializer, }, }, }, @@ -529,13 +530,13 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper { Name: constants.ContainerTrainer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountDatasetInitializer, - constants.VolumeMountModelInitializer, + jobsetplugin.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountModelInitializer, }, }, }, Volumes: []corev1.Volume{ - constants.VolumeInitializer, + jobsetplugin.VolumeInitializer, }, }, }, @@ -603,21 +604,21 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp { Name: constants.ContainerDatasetInitializer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountDatasetInitializer, }, }, { Name: constants.ContainerModelInitializer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountModelInitializer, + jobsetplugin.VolumeMountModelInitializer, }, }, }, Containers: []corev1.Container{ - constants.ContainerBusyBox, + jobsetplugin.ContainerBusyBox, }, Volumes: []corev1.Volume{ - constants.VolumeInitializer, + jobsetplugin.VolumeInitializer, }, }, }, @@ -634,13 +635,13 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp { Name: constants.ContainerTrainer, VolumeMounts: []corev1.VolumeMount{ - constants.VolumeMountDatasetInitializer, - constants.VolumeMountModelInitializer, + jobsetplugin.VolumeMountDatasetInitializer, + jobsetplugin.VolumeMountModelInitializer, }, }, }, Volumes: []corev1.Volume{ - constants.VolumeInitializer, + jobsetplugin.VolumeInitializer, }, }, }, diff --git a/test/integration/controller.v2/trainjob_controller_test.go b/test/integration/controller.v2/trainjob_controller_test.go index a961b36b22..39ce245227 100644 --- a/test/integration/controller.v2/trainjob_controller_test.go +++ b/test/integration/controller.v2/trainjob_controller_test.go @@ -32,6 +32,7 @@ import ( kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" "github.com/kubeflow/training-operator/pkg/constants" + jobsetplugin "github.com/kubeflow/training-operator/pkg/runtime.v2/framework/plugins/jobset" testingutil "github.com/kubeflow/training-operator/pkg/util.v2/testing" "github.com/kubeflow/training-operator/test/integration/framework" "github.com/kubeflow/training-operator/test/util" @@ -128,8 +129,8 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { NumNodes(100). ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests). InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). - InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). + InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). + InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). Suspend(true). Label("testingKey", "testingVal"). Annotation("testingKey", "testingVal"). @@ -183,8 +184,8 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() { NumNodes(100). ContainerTrainer(updatedImageName, []string{"trainjob"}, []string{"trainjob"}, resRequests). InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests). - InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). - InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: constants.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). + InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}). + InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}). Suspend(true). Label("testingKey", "testingVal"). Annotation("testingKey", "testingVal").