KEP-2170: Adding validation webhook for v2 trainjob #2307
base: master
```diff
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/validation/field"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+	jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 
 	kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
 	runtime "github.com/kubeflow/training-operator/pkg/runtime.v2"
```
```diff
@@ -69,14 +70,19 @@ func (r *ClusterTrainingRuntime) EventHandlerRegistrars() []runtime.ReconcilerBuilder {
 }
 
 func (r *ClusterTrainingRuntime) ValidateObjects(ctx context.Context, old, new *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) {
+	clusterTrainingRuntime := &kubeflowv2.ClusterTrainingRuntime{}
 	if err := r.client.Get(ctx, client.ObjectKey{
-		Namespace: old.Namespace,
-		Name:      old.Spec.RuntimeRef.Name,
-	}, &kubeflowv2.ClusterTrainingRuntime{}); err != nil {
+		Namespace: new.Namespace,
+		Name:      new.Spec.RuntimeRef.Name,
+	}, clusterTrainingRuntime); err != nil {
 		return nil, field.ErrorList{
-			field.Invalid(field.NewPath("spec", "RuntimeRef"), old.Spec.RuntimeRef,
+			field.Invalid(field.NewPath("spec", "RuntimeRef"), new.Spec.RuntimeRef,
 				fmt.Sprintf("%v: specified clusterTrainingRuntime must be created before the TrainJob is created", err)),
 		}
 	}
-	return r.framework.RunCustomValidationPlugins(old, new)
+	info := r.runtimeInfo(ctx, new, clusterTrainingRuntime.Spec.Template, clusterTrainingRuntime.Spec.MLPolicy, clusterTrainingRuntime.Spec.PodGroupPolicy)
+	jobSetTemplate := jobsetv1alpha2.JobSet{
+		Spec: clusterTrainingRuntime.Spec.Template.Spec,
+	}
+	return r.framework.RunCustomValidationPlugins(jobSetTemplate.DeepCopy(), info, old, new)
 }
```

Review thread on lines +75 to +76:

- Have you ever seen issues when we use the old object names?
- Why do we get the runtime using the new object's name and namespace?
- Here I am validating the updated object instead of the existing one.
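Taken together with the plugin changes below, the new `RunCustomValidationPlugins(jobSetTemplate.DeepCopy(), info, old, new)` call implies an updated plugin contract along these lines. This is a sketch inferred from the call sites in this diff; the exact declaration and its package location are not shown in the PR:

```go
package framework

import (
	"k8s.io/apimachinery/pkg/util/validation/field"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

	kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
	runtime "github.com/kubeflow/training-operator/pkg/runtime.v2"
)

// CustomValidationPlugin now receives the resolved runtime job template and
// the runtime Info, so a plugin can validate the TrainJob against the
// runtime it references instead of against the TrainJob alone.
type CustomValidationPlugin interface {
	Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info,
		oldObj, newObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList)
}
```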
```diff
@@ -83,6 +83,26 @@ func (r *TrainingRuntime) NewObjects(ctx context.Context, trainJob *kubeflowv2.TrainJob
 func (r *TrainingRuntime) buildObjects(
 	ctx context.Context, trainJob *kubeflowv2.TrainJob, jobSetTemplateSpec kubeflowv2.JobSetTemplateSpec, mlPolicy *kubeflowv2.MLPolicy, podGroupPolicy *kubeflowv2.PodGroupPolicy,
 ) ([]client.Object, error) {
+	info := r.runtimeInfo(ctx, trainJob, jobSetTemplateSpec, mlPolicy, podGroupPolicy)
+	if err := r.framework.RunEnforceMLPolicyPlugins(info, trainJob); err != nil {
+		return nil, err
+	}
+
+	if err := r.framework.RunEnforcePodGroupPolicyPlugins(info, trainJob); err != nil {
+		return nil, err
+	}
+
+	jobSetTemplate := jobsetv1alpha2.JobSet{
+		Spec: jobSetTemplateSpec.Spec,
+	}
+
+	return r.framework.RunComponentBuilderPlugins(ctx, jobSetTemplate.DeepCopy(), info, trainJob)
+}
+
+func (r *TrainingRuntime) runtimeInfo(
+	ctx context.Context, trainJob *kubeflowv2.TrainJob, jobSetTemplateSpec kubeflowv2.JobSetTemplateSpec, mlPolicy *kubeflowv2.MLPolicy, podGroupPolicy *kubeflowv2.PodGroupPolicy) *runtime.Info {
+
 	propagationLabels := jobSetTemplateSpec.Labels
 	if propagationLabels == nil && trainJob.Spec.Labels != nil {
 		propagationLabels = make(map[string]string, len(trainJob.Spec.Labels))
```

Review thread on the new runtimeInfo function:

- Should this be part of the Runtime interface? And should we name this API more explicitly (e.g. getRuntimeInfo() or initializeRuntimeInfo())?
- I think it should be part of trainingRuntime, as it depends on config from the trainJob/trainingRuntime resources.
- Yeah, but the Info object will be used for every runtime that we register with our manager.
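For reference, hoisting the method onto the Runtime interface, as suggested in the thread above, might look roughly like this. The sketch is an assumption (the method set is abbreviated and the name RuntimeInfo is one of the suggested options), not code from this PR:

```go
package runtime

import (
	"context"

	"k8s.io/apimachinery/pkg/util/validation/field"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

	kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
)

// Runtime is abbreviated here; TerminalCondition and EventHandlerRegistrars
// are omitted for brevity.
type Runtime interface {
	NewObjects(ctx context.Context, trainJob *kubeflowv2.TrainJob) ([]client.Object, error)
	ValidateObjects(ctx context.Context, old, new *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList)
	// RuntimeInfo would resolve labels and runtime policies (MLPolicy,
	// PodGroupPolicy) from the referenced runtime template, so that every
	// registered runtime can produce an Info object for the framework.
	RuntimeInfo(ctx context.Context, trainJob *kubeflowv2.TrainJob) (*Info, error)
}
```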
```diff
@@ -113,19 +133,7 @@ func (r *TrainingRuntime) buildObjects(
 	info := runtime.NewInfo(opts...)
 
-	if err := r.framework.RunEnforceMLPolicyPlugins(info, trainJob); err != nil {
-		return nil, err
-	}
-
-	if err := r.framework.RunEnforcePodGroupPolicyPlugins(info, trainJob); err != nil {
-		return nil, err
-	}
-
-	jobSetTemplate := jobsetv1alpha2.JobSet{
-		Spec: jobSetTemplateSpec.Spec,
-	}
-
-	return r.framework.RunComponentBuilderPlugins(ctx, jobSetTemplate.DeepCopy(), info, trainJob)
+	return info
 }
 
 func (r *TrainingRuntime) TerminalCondition(ctx context.Context, trainJob *kubeflowv2.TrainJob) (*metav1.Condition, error) {
```
```diff
@@ -141,14 +149,19 @@ func (r *TrainingRuntime) EventHandlerRegistrars() []runtime.ReconcilerBuilder {
 }
 
 func (r *TrainingRuntime) ValidateObjects(ctx context.Context, old, new *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) {
+	trainingRuntime := &kubeflowv2.TrainingRuntime{}
 	if err := r.client.Get(ctx, client.ObjectKey{
-		Namespace: old.Namespace,
-		Name:      old.Spec.RuntimeRef.Name,
-	}, &kubeflowv2.TrainingRuntime{}); err != nil {
+		Namespace: new.Namespace,
+		Name:      new.Spec.RuntimeRef.Name,
+	}, trainingRuntime); err != nil {
 		return nil, field.ErrorList{
-			field.Invalid(field.NewPath("spec", "runtimeRef"), old.Spec.RuntimeRef,
+			field.Invalid(field.NewPath("spec", "runtimeRef"), new.Spec.RuntimeRef,
 				fmt.Sprintf("%v: specified trainingRuntime must be created before the TrainJob is created", err)),
 		}
 	}
-	return r.framework.RunCustomValidationPlugins(old, new)
+	info := r.runtimeInfo(ctx, new, trainingRuntime.Spec.Template, trainingRuntime.Spec.MLPolicy, trainingRuntime.Spec.PodGroupPolicy)
+	jobSetTemplate := jobsetv1alpha2.JobSet{
+		Spec: trainingRuntime.Spec.Template.Spec,
+	}
+	return r.framework.RunCustomValidationPlugins(jobSetTemplate.DeepCopy(), info, old, new)
 }
```

Review thread on the new RunCustomValidationPlugins call:

- What is the main goal of passing the runtime info here?
- It is required to access the runtimePolicy for now, which is configured in the trainingRuntime.
- @tenzen-y What do you think about it?
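To show where ValidateObjects sits in the admission flow, here is a hypothetical controller-runtime CustomValidator that dispatches to a runtime. The webhook wiring is not part of this diff, and the single-runtime field is a simplification; in the operator the runtime would be selected via TrainJob.Spec.RuntimeRef:

```go
package webhooks

import (
	"context"

	apiruntime "k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

	kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
	runtime "github.com/kubeflow/training-operator/pkg/runtime.v2"
)

// TrainJobWebhook is hypothetical; it holds a single runtime to keep the
// sketch small, where the real webhook would pick one based on RuntimeRef.
type TrainJobWebhook struct {
	run runtime.Runtime
}

var _ admission.CustomValidator = (*TrainJobWebhook)(nil)

func (w *TrainJobWebhook) ValidateCreate(ctx context.Context, obj apiruntime.Object) (admission.Warnings, error) {
	trainJob := obj.(*kubeflowv2.TrainJob)
	// On create there is no old object; passing the new object twice is a
	// simplification in this sketch.
	warnings, errList := w.run.ValidateObjects(ctx, trainJob, trainJob)
	return warnings, errList.ToAggregate()
}

func (w *TrainJobWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj apiruntime.Object) (admission.Warnings, error) {
	warnings, errList := w.run.ValidateObjects(ctx, oldObj.(*kubeflowv2.TrainJob), newObj.(*kubeflowv2.TrainJob))
	return warnings, errList.ToAggregate()
}

func (w *TrainJobWebhook) ValidateDelete(ctx context.Context, obj apiruntime.Object) (admission.Warnings, error) {
	return nil, nil
}
```

`field.ErrorList.ToAggregate()` returns nil for an empty list, so a clean validation result admits the object.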
```diff
@@ -20,19 +20,23 @@ import (
 	"context"
 	"fmt"
 	"maps"
+	"slices"
 
 	"github.com/go-logr/logr"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	apiruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/validation/field"
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
 	jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 
 	kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
```
```diff
@@ -51,6 +55,7 @@ type JobSet struct {
 var _ framework.WatchExtensionPlugin = (*JobSet)(nil)
 var _ framework.ComponentBuilderPlugin = (*JobSet)(nil)
 var _ framework.TerminalConditionPlugin = (*JobSet)(nil)
+var _ framework.CustomValidationPlugin = (*JobSet)(nil)
 
 const Name = constants.JobSetKind
```
```diff
@@ -157,3 +162,98 @@ func (j *JobSet) ReconcilerBuilders() []runtime.ReconcilerBuilder {
 		},
 	}
 }
+
+func (j *JobSet) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldObj, newObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) {
+	var allErrs field.ErrorList
+	specPath := field.NewPath("spec")
+	runtimeRefPath := specPath.Child("runtimeRef")
+
+	jobSet, ok := runtimeJobTemplate.(*jobsetv1alpha2.JobSet)
+	if !ok {
+		return nil, nil
+	}
+
+	if newObj.Spec.ModelConfig != nil && newObj.Spec.ModelConfig.Input != nil {
+		if !slices.ContainsFunc(jobSet.Spec.ReplicatedJobs, func(x jobsetv1alpha2.ReplicatedJob) bool {
+			return x.Name == constants.JobInitializer
+		}) {
+			allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, fmt.Sprintf("trainingRuntime should have %s job when trainJob is configured with input modelConfig", constants.JobInitializer)))
+		} else {
+			for _, job := range jobSet.Spec.ReplicatedJobs {
+				if job.Name == constants.JobInitializer {
+					if !slices.ContainsFunc(job.Template.Spec.Template.Spec.InitContainers, func(x corev1.Container) bool {
+						return x.Name == constants.ContainerModelInitializer
+					}) {
+						allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, fmt.Sprintf("trainingRuntime should have container with name - %s in the %s job", constants.ContainerModelInitializer, constants.JobInitializer)))
+					}
+				}
+			}
+		}
+	}
+
+	if newObj.Spec.DatasetConfig != nil {
+		if !slices.ContainsFunc(jobSet.Spec.ReplicatedJobs, func(x jobsetv1alpha2.ReplicatedJob) bool {
+			return x.Name == constants.JobInitializer
+		}) {
+			allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, fmt.Sprintf("trainingRuntime should have %s job when trainJob is configured with input datasetConfig", constants.JobInitializer)))
+		} else {
+			for _, job := range jobSet.Spec.ReplicatedJobs {
+				if job.Name == constants.JobInitializer {
+					if !slices.ContainsFunc(job.Template.Spec.Template.Spec.InitContainers, func(x corev1.Container) bool {
+						return x.Name == constants.ContainerDatasetInitializer
+					}) {
+						allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, fmt.Sprintf("trainingRuntime should have container with name - %s in the %s job", constants.ContainerDatasetInitializer, constants.JobInitializer)))
+					}
+				}
+			}
+		}
+	}
+
+	if len(newObj.Spec.PodSpecOverrides) != 0 {
+		podSpecOverridesPath := specPath.Child("podSpecOverrides")
+		jobsMap := map[string]bool{}
+		for _, job := range jobSet.Spec.ReplicatedJobs {
+			jobsMap[job.Name] = true
+		}
+		// Validate that every targeted job exists in the runtime.
+		for idx, override := range newObj.Spec.PodSpecOverrides {
+			for _, job := range override.TargetJobs {
+				if _, found := jobsMap[job.Name]; !found {
+					allErrs = append(allErrs, field.Invalid(podSpecOverridesPath, newObj.Spec.PodSpecOverrides, fmt.Sprintf("job: %s, configured in the podOverride should be present in the referenced training runtime", job.Name)))
+				}
+			}
+			if len(override.Containers) != 0 {
+				// Validate that overridden containers exist in the runtime.
+				containerMap := map[string]bool{}
+				for _, job := range jobSet.Spec.ReplicatedJobs {
+					for _, container := range job.Template.Spec.Template.Spec.Containers {
+						containerMap[container.Name] = true
+					}
+				}
+				containerOverridePath := podSpecOverridesPath.Index(idx)
+				for _, container := range override.Containers {
+					if _, found := containerMap[container.Name]; !found {
+						allErrs = append(allErrs, field.Invalid(containerOverridePath, override.Containers, fmt.Sprintf("container: %s, configured in the containerOverride should be present in the referenced training runtime", container.Name)))
+					}
+				}
+			}
+			if len(override.InitContainers) != 0 {
+				// Validate that overridden init containers exist in the runtime.
+				initContainerMap := map[string]bool{}
+				for _, job := range jobSet.Spec.ReplicatedJobs {
+					for _, initContainer := range job.Template.Spec.Template.Spec.InitContainers {
+						initContainerMap[initContainer.Name] = true
+					}
+				}
+				initContainerOverridePath := podSpecOverridesPath.Index(idx)
+				for _, initContainer := range override.InitContainers {
+					if _, found := initContainerMap[initContainer.Name]; !found {
+						allErrs = append(allErrs, field.Invalid(initContainerOverridePath, override.InitContainers, fmt.Sprintf("initContainer: %s, configured in the initContainerOverride should be present in the referenced training runtime", initContainer.Name)))
+					}
+				}
+			}
+		}
+	}
+	return nil, allErrs
+}
```

Review thread on the modelConfig check:

- I think, for now, we should check the initContainers in the JobSet, as I mentioned here: https://github.com/kubeflow/training-operator/blob/master/pkg/runtime.v2/framework/plugins/jobset/builder.go#L87-L89
- I am checking the initContainers here: https://github.com/kubeflow/training-operator/pull/2307/files#diff-935da6e0f990201db2f6ddf15c768526f70993d5a2408814013e96e3fedd5ebfR165. The condition here only checks for the presence of the initializer job when an input modelConfig or datasetConfig is present in the trainJob.

Review thread on the PodSpecOverrides check:

- Should we implement this validation when we support PodSpecOverride?
- I don't have any preference. Any reason to omit it now? Or can we update it later if we see issues?
- I would suggest that we remove it from this PR, since we haven't discussed how we will implement PodSpecOverrides yet.
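The modelConfig and datasetConfig branches above are structurally identical, so a small helper could consolidate them. A refactoring sketch only, reusing the file's existing imports (slices, fmt, corev1, field, jobsetv1alpha2, constants); not code from this PR:

```go
// checkInitializer returns errors unless the runtime JobSet defines the
// Initializer replicated job with the named init container. configField is
// only used in the error message (e.g. "modelConfig", "datasetConfig").
func checkInitializer(jobSet *jobsetv1alpha2.JobSet, runtimeRefPath *field.Path, runtimeRef any, containerName, configField string) field.ErrorList {
	var errs field.ErrorList
	idx := slices.IndexFunc(jobSet.Spec.ReplicatedJobs, func(x jobsetv1alpha2.ReplicatedJob) bool {
		return x.Name == constants.JobInitializer
	})
	if idx < 0 {
		// No initializer job at all in the referenced runtime.
		return append(errs, field.Invalid(runtimeRefPath, runtimeRef,
			fmt.Sprintf("trainingRuntime should have %s job when trainJob is configured with input %s", constants.JobInitializer, configField)))
	}
	if !slices.ContainsFunc(jobSet.Spec.ReplicatedJobs[idx].Template.Spec.Template.Spec.InitContainers, func(x corev1.Container) bool {
		return x.Name == containerName
	}) {
		// Initializer job exists but lacks the expected init container.
		errs = append(errs, field.Invalid(runtimeRefPath, runtimeRef,
			fmt.Sprintf("trainingRuntime should have container with name - %s in the %s job", containerName, constants.JobInitializer)))
	}
	return errs
}
```

Each branch in Validate would then reduce to a single append, e.g. `allErrs = append(allErrs, checkInitializer(jobSet, runtimeRefPath, newObj.Spec.RuntimeRef, constants.ContainerModelInitializer, "modelConfig")...)`.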
```diff
@@ -18,6 +18,7 @@ package mpi
 
 import (
 	"context"
+	"strconv"
 
 	"k8s.io/apimachinery/pkg/util/validation/field"
 	"sigs.k8s.io/controller-runtime/pkg/client"
```

```diff
@@ -55,7 +56,16 @@ func (m *MPI) EnforceMLPolicy(info *runtime.Info, trainJob *kubeflowv2.TrainJob)
 	return nil
 }
 
-// TODO: Need to implement validations for MPIJob.
-func (m *MPI) Validate(oldObj, newObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) {
-	return nil, nil
+func (m *MPI) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldJobObj, newJobObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) {
+	var allErrs field.ErrorList
+	specPath := field.NewPath("spec")
+	if newJobObj.Spec.Trainer != nil && newJobObj.Spec.Trainer.NumProcPerNode != nil {
+		numProcPerNodePath := specPath.Child("trainer").Child("numProcPerNode")
+		if runtimeInfo.RuntimePolicy.MLPolicy != nil && runtimeInfo.RuntimePolicy.MLPolicy.MPI != nil {
+			if _, err := strconv.Atoi(*newJobObj.Spec.Trainer.NumProcPerNode); err != nil {
+				allErrs = append(allErrs, field.Invalid(numProcPerNodePath, newJobObj.Spec.Trainer.NumProcPerNode, "should have an int value"))
+			}
+		}
+	}
+	return nil, allErrs
 }
```

Review thread on numProcPerNode:

- Uhm, considering these string-to-integer conversions everywhere, we may want to define numProcPerNode as a typed IntOrString.
- I think so. Is this value compatible with the k8s API conventions: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md ?
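A sketch of the IntOrString idea from the thread above. The struct and field placement are assumptions about a possible API change, not code from this PR:

```go
package v2alpha1

import "k8s.io/apimachinery/pkg/util/intstr"

type Trainer struct {
	// NumProcPerNode is either an integer ("2") or a keyword such as
	// "auto"; intstr.IntOrString round-trips both through JSON without
	// ad-hoc strconv checks in the plugins.
	NumProcPerNode *intstr.IntOrString `json:"numProcPerNode,omitempty"`
}
```

Validation would then become a check on `NumProcPerNode.Type` (intstr.Int vs. intstr.String) plus an allow-list for the string case, instead of strconv.Atoi.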
Review thread on the exporter:

- Please can we implement the validation for the exporter in the future, once we design it as part of #2245? We should discuss whether we want to use a sidecar container or another ReplicatedJob for model checkpointing. cc @saileshd1402 @akshaychitneni @tenzen-y
- Ack. Makes sense.
- @akshaychitneni Please can you remove the values from your PR that we will not use for now (e.g. JobExporter).