From 8796bea6be5249ff3b3c5e0506b5587d77513184 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Thu, 3 Aug 2023 15:34:07 +0530 Subject: [PATCH 1/8] Removing dead code --- pkg/reconciler.v1/common/README.md | 24 - pkg/reconciler.v1/common/gang.go | 34 -- .../common/gang_scheduler_framework.go | 189 ------- pkg/reconciler.v1/common/gang_volcano.go | 203 -------- pkg/reconciler.v1/common/interface.go | 241 --------- pkg/reconciler.v1/common/job.go | 473 ------------------ pkg/reconciler.v1/common/pod.go | 278 ---------- pkg/reconciler.v1/common/pod_test.go | 140 ------ pkg/reconciler.v1/common/service.go | 226 --------- pkg/reconciler.v1/common/service_test.go | 97 ---- pkg/reconciler.v1/common/utils.go | 66 --- pkg/reconciler.v1/common/utils_test.go | 60 --- .../test_job/test_job_reconciler.go | 164 ------ 13 files changed, 2195 deletions(-) delete mode 100644 pkg/reconciler.v1/common/README.md delete mode 100644 pkg/reconciler.v1/common/gang.go delete mode 100644 pkg/reconciler.v1/common/gang_scheduler_framework.go delete mode 100644 pkg/reconciler.v1/common/gang_volcano.go delete mode 100644 pkg/reconciler.v1/common/interface.go delete mode 100644 pkg/reconciler.v1/common/job.go delete mode 100644 pkg/reconciler.v1/common/pod.go delete mode 100644 pkg/reconciler.v1/common/pod_test.go delete mode 100644 pkg/reconciler.v1/common/service.go delete mode 100644 pkg/reconciler.v1/common/service_test.go delete mode 100644 pkg/reconciler.v1/common/utils.go delete mode 100644 pkg/reconciler.v1/common/utils_test.go delete mode 100644 test_job/reconciler.v1/test_job/test_job_reconciler.go diff --git a/pkg/reconciler.v1/common/README.md b/pkg/reconciler.v1/common/README.md deleted file mode 100644 index 5a1b0a6832..0000000000 --- a/pkg/reconciler.v1/common/README.md +++ /dev/null @@ -1,24 +0,0 @@ -## Reconciler.v1 - -This is package providing most functionalities in `pkg/controller.v1` in the form of [reconciler](https://book.kubebuilder.io/cronjob-tutorial/controller-overview.html). - -To use the reconciler, following methods must be overridden according to the APIs the reconciler handles. - -```go -// GetJob returns the job that matches the request -func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) - -// ExtractReplicasSpec extracts the ReplicasSpec map from this job -func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) - -// ExtractRunPolicy extracts the RunPolicy from this job -func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) - -// ExtractJobStatus extracts the JobStatus from this job -func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) - -// IsMasterRole checks if Pod is the master Pod -func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool -``` - -A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`. diff --git a/pkg/reconciler.v1/common/gang.go b/pkg/reconciler.v1/common/gang.go deleted file mode 100644 index 79966da687..0000000000 --- a/pkg/reconciler.v1/common/gang.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// BaseGangReconciler defines a basic gang reconciler -type BaseGangReconciler struct { - Enabled bool -} - -// GangSchedulingEnabled returns if gang-scheduling is enabled for all jobs -func (r *BaseGangReconciler) GangSchedulingEnabled() bool { - return r.Enabled -} - -// GetPodGroupName returns the name of PodGroup for this job -func (r *BaseGangReconciler) GetPodGroupName(job client.Object) string { - return job.GetName() -} diff --git a/pkg/reconciler.v1/common/gang_scheduler_framework.go b/pkg/reconciler.v1/common/gang_scheduler_framework.go deleted file mode 100644 index 49beb748e4..0000000000 --- a/pkg/reconciler.v1/common/gang_scheduler_framework.go +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2023 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - controllerv1 "github.com/kubeflow/training-operator/pkg/controller.v1/common" - "github.com/kubeflow/training-operator/pkg/util/k8sutil" - - log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" -) - -// SchedulerFrameworkReconciler defines a gang-scheduling reconciler for Kubernetes Scheduler Framework -type SchedulerFrameworkReconciler struct { - BaseGangReconciler - ReconcilerUtilInterface - client.Client - SchedulerName string -} - -func BareSchedulerFrameworkReconciler(client client.Client, bgReconciler *BaseGangReconciler, enabled bool) *SchedulerFrameworkReconciler { - if bgReconciler == nil { - bgReconciler = &BaseGangReconciler{} - } - bgReconciler.Enabled = enabled - return &SchedulerFrameworkReconciler{ - BaseGangReconciler: *bgReconciler, - Client: client, - } -} - -func (r *SchedulerFrameworkReconciler) OverrideForGangSchedulingInterface(ui ReconcilerUtilInterface) { - if ui != nil { - r.ReconcilerUtilInterface = ui - } -} - -// GetGangSchedulerName returns the name of Gang Scheduler will be used. -func (r *SchedulerFrameworkReconciler) GetGangSchedulerName() string { - return r.SchedulerName -} - -// GangSchedulingEnabled returns if gang-scheduling is enabled for all jobs -func (r *SchedulerFrameworkReconciler) GangSchedulingEnabled() bool { - return r.BaseGangReconciler.GangSchedulingEnabled() -} - -// GetPodGroupName returns the name of PodGroup for this job -func (r *SchedulerFrameworkReconciler) GetPodGroupName(job client.Object) string { - return r.BaseGangReconciler.GetPodGroupName(job) -} - -// GetPodGroupForJob returns the PodGroup associated with this job -func (r *SchedulerFrameworkReconciler) GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) { - var pg *schedulerpluginsv1alpha1.PodGroup = nil - err := r.Get(ctx, types.NamespacedName{ - Namespace: job.GetNamespace(), - Name: r.GetPodGroupName(job), - }, pg) - return pg, err -} - -// DeletePodGroup delete the PodGroup associated with this job -func (r *SchedulerFrameworkReconciler) DeletePodGroup(ctx context.Context, job client.Object) error { - pg := &schedulerpluginsv1alpha1.PodGroup{} - pg.SetNamespace(job.GetNamespace()) - pg.SetName(r.GetPodGroupName(job)) - return client.IgnoreNotFound(r.Delete(ctx, pg)) -} - -// ReconcilePodGroup reconciles the PodGroup resource for this job -func (r *SchedulerFrameworkReconciler) ReconcilePodGroup( - ctx context.Context, - job client.Object, - runPolicy *kubeflowv1.RunPolicy, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, -) error { - minMember := k8sutil.GetTotalReplicas(replicas) - var scheduleTimeoutSeconds *int32 - var minResources *corev1.ResourceList - - if runPolicy.SchedulingPolicy != nil { - if minAvailable := runPolicy.SchedulingPolicy.MinAvailable; minAvailable != nil { - minMember = *minAvailable - } - if timeout := runPolicy.SchedulingPolicy.ScheduleTimeoutSeconds; timeout != nil { - scheduleTimeoutSeconds = timeout - } - if mr := runPolicy.SchedulingPolicy.MinResources; mr != nil { - minResources = (*corev1.ResourceList)(mr) - } - } - - if minResources == nil { - minResources = r.calcPGMinResources(minMember, replicas) - } - - pgSpec := schedulerpluginsv1alpha1.PodGroupSpec{ - MinMember: minMember, - MinResources: *minResources, - ScheduleTimeoutSeconds: scheduleTimeoutSeconds, - } - - // Check if exist - pg := &schedulerpluginsv1alpha1.PodGroup{} - err := r.Get(ctx, types.NamespacedName{ - Name: r.GetPodGroupName(job), - Namespace: job.GetNamespace(), - }, pg) - // If created, check updates, otherwise create it. - if err == nil { - pg.Spec = pgSpec - err = r.Update(ctx, pg) - } - - if errors.IsNotFound(err) { - pg.TypeMeta = metav1.TypeMeta{ - APIVersion: schedulerpluginsv1alpha1.SchemeGroupVersion.String(), - Kind: "PodGroup", - } - pg.ObjectMeta = metav1.ObjectMeta{ - Name: r.GetPodGroupName(job), - Namespace: job.GetNamespace(), - } - pg.Spec = pgSpec - err = controllerutil.SetControllerReference(job, pg, r.GetScheme()) - if err == nil { - err = r.Create(ctx, pg) - } - } - - if err != nil { - log.Warnf("Sync PodGroup %v: %v", klog.KObj(pg), err) - return err - } - return nil -} - -// DecoratePodForGangScheduling decorates the podTemplate before it's used to generate a pod with information for gang-scheduling -func (r *SchedulerFrameworkReconciler) DecoratePodForGangScheduling( - _ string, - podTemplate *corev1.PodTemplateSpec, - job client.Object, -) { - if podTemplate.Spec.SchedulerName == "" { - podTemplate.Spec.SchedulerName = r.GetGangSchedulerName() - } - - if podTemplate.Labels == nil { - podTemplate.Labels = make(map[string]string) - } - podTemplate.Labels[schedulerpluginsv1alpha1.PodGroupLabel] = job.GetName() -} - -// calcPGMinResources calculates the minimal resources needed for this job. The value will be embedded into the associated PodGroup -func (r *SchedulerFrameworkReconciler) calcPGMinResources( - minMember int32, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, -) *corev1.ResourceList { - return controllerv1.CalcPGMinResources(minMember, replicas, - func(pc string) (*schedulingv1.PriorityClass, error) { - priorityClass := &schedulingv1.PriorityClass{} - err := r.Get(context.TODO(), types.NamespacedName{Name: pc}, priorityClass) - return priorityClass, err - }) -} diff --git a/pkg/reconciler.v1/common/gang_volcano.go b/pkg/reconciler.v1/common/gang_volcano.go deleted file mode 100644 index eddaa6e362..0000000000 --- a/pkg/reconciler.v1/common/gang_volcano.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - controllerv1 "github.com/kubeflow/training-operator/pkg/controller.v1/common" - commonutil "github.com/kubeflow/training-operator/pkg/util" - "github.com/kubeflow/training-operator/pkg/util/k8sutil" - - log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - volcano "volcano.sh/apis/pkg/apis/scheduling/v1beta1" -) - -// VolcanoReconciler defines a gang-scheduling reconciler for volcano.sh/volcano -type VolcanoReconciler struct { - BaseGangReconciler - ReconcilerUtilInterface - client.Client -} - -const ( - // VolcanoPodGroupAnnotation defines which PodGroup is linked to this Pod in annotation - VolcanoPodGroupAnnotation = "scheduling.k8s.io/group-name" -) - -// BareVolcanoReconciler returns a VolcanoReconciler pointer with minimal components defined -func BareVolcanoReconciler(client client.Client, bgReconciler *BaseGangReconciler, enabled bool) *VolcanoReconciler { - if bgReconciler == nil { - bgReconciler = &BaseGangReconciler{} - } - bgReconciler.Enabled = enabled - return &VolcanoReconciler{ - BaseGangReconciler: *bgReconciler, - Client: client, - } -} - -// OverrideForGangSchedulingInterface reset ReconcilerUtilInterface used in this VolcanoReconciler -func (r *VolcanoReconciler) OverrideForGangSchedulingInterface(ui ReconcilerUtilInterface) { - if ui != nil { - r.ReconcilerUtilInterface = ui - } -} - -// GetGangSchedulerName returns the name of Gang Scheduler will be used, which is "volcano" for VolcanoReconciler -func (r *VolcanoReconciler) GetGangSchedulerName() string { - return "volcano" -} - -// GangSchedulingEnabled returns if gang-scheduling is enabled for all jobs -func (r *VolcanoReconciler) GangSchedulingEnabled() bool { - return r.BaseGangReconciler.GangSchedulingEnabled() -} - -// GetPodGroupName returns the name of PodGroup for this job -func (r *VolcanoReconciler) GetPodGroupName(job client.Object) string { - return r.BaseGangReconciler.GetPodGroupName(job) -} - -// GetPodGroupForJob returns the PodGroup associated with this job -func (r *VolcanoReconciler) GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) { - var pg *volcano.PodGroup = nil - err := r.Get(ctx, types.NamespacedName{ - Namespace: job.GetNamespace(), - Name: r.GetPodGroupName(job), - }, pg) - - return pg, err -} - -// DeletePodGroup delete the PodGroup associated with this job -func (r *VolcanoReconciler) DeletePodGroup(ctx context.Context, job client.Object) error { - pg := &volcano.PodGroup{} - pg.SetNamespace(job.GetNamespace()) - pg.SetName(r.GetPodGroupName(job)) - - err := r.Delete(ctx, pg) - if errors.IsNotFound(err) { - return nil - } - return err -} - -// ReconcilePodGroup reconciles the PodGroup resource for this job -func (r *VolcanoReconciler) ReconcilePodGroup( - ctx context.Context, - job client.Object, - runPolicy *kubeflowv1.RunPolicy, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error { - - minMember := k8sutil.GetTotalReplicas(replicas) - queue := "" - priorityClass := "" - var minResources *corev1.ResourceList - - if runPolicy.SchedulingPolicy != nil { - if runPolicy.SchedulingPolicy.MinAvailable != nil { - minMember = *runPolicy.SchedulingPolicy.MinAvailable - } - - if runPolicy.SchedulingPolicy.Queue != "" { - queue = runPolicy.SchedulingPolicy.Queue - } - - if runPolicy.SchedulingPolicy.PriorityClass != "" { - priorityClass = runPolicy.SchedulingPolicy.PriorityClass - } - - if runPolicy.SchedulingPolicy.MinResources != nil { - minResources = (*corev1.ResourceList)(runPolicy.SchedulingPolicy.MinResources) - } - } - - if minResources == nil { - minResources = r.calcPGMinResources(minMember, replicas) - } - - pgSpec := volcano.PodGroupSpec{ - MinMember: minMember, - Queue: queue, - PriorityClassName: priorityClass, - MinResources: minResources, - } - - // Check if exist - pg := &volcano.PodGroup{} - err := r.Get(ctx, types.NamespacedName{Namespace: job.GetNamespace(), Name: r.GetPodGroupName(job)}, pg) - // If Created, check updates, otherwise create it - if err == nil { - pg.Spec = pgSpec - err = r.Update(ctx, pg) - } - - if errors.IsNotFound(err) { - pg.ObjectMeta = metav1.ObjectMeta{ - Name: r.GetPodGroupName(job), - Namespace: job.GetNamespace(), - } - pg.Spec = pgSpec - err = controllerutil.SetControllerReference(job, pg, r.GetScheme()) - if err == nil { - err = r.Create(ctx, pg) - } - } - - if err != nil { - log.Warnf("Sync PodGroup %v: %v", - types.NamespacedName{Namespace: job.GetNamespace(), Name: r.GetPodGroupName(job)}, err) - return err - } - - return nil -} - -// DecoratePodForGangScheduling decorates the podTemplate before it's used to generate a pod with information for gang-scheduling -func (r *VolcanoReconciler) DecoratePodForGangScheduling(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) { - if podTemplate.Spec.SchedulerName == "" || podTemplate.Spec.SchedulerName == r.GetGangSchedulerName() { - podTemplate.Spec.SchedulerName = r.GetGangSchedulerName() - } else { - warnMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten" - commonutil.LoggerForReplica(job, rtype).Warn(warnMsg) - r.GetRecorder().Event(job, corev1.EventTypeWarning, "PodTemplateSchedulerNameAlreadySet", warnMsg) - } - - if podTemplate.Annotations == nil { - podTemplate.Annotations = map[string]string{} - } - - podTemplate.Annotations[VolcanoPodGroupAnnotation] = job.GetName() -} - -// calcPGMinResources calculates the minimal resources needed for this job. The value will be embedded into the associated PodGroup -func (r *VolcanoReconciler) calcPGMinResources(minMember int32, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) *corev1.ResourceList { - pcGetFunc := func(pc string) (*schedulingv1.PriorityClass, error) { - priorityClass := &schedulingv1.PriorityClass{} - err := r.Get(context.Background(), types.NamespacedName{Name: pc}, priorityClass) - return priorityClass, err - } - - return controllerv1.CalcPGMinResources(minMember, replicas, pcGetFunc) -} diff --git a/pkg/reconciler.v1/common/interface.go b/pkg/reconciler.v1/common/interface.go deleted file mode 100644 index 59e13cd0ef..0000000000 --- a/pkg/reconciler.v1/common/interface.go +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - - "github.com/go-logr/logr" - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// ReconcilerUtilInterface defines the abstract interface of reconciler on utility features, such like get event -// recorder or logger -type ReconcilerUtilInterface interface { - // GetReconcilerName SHOULD be overridden if a new Reconciler is defined. The default implementation returns - // "common-reconciler" - GetReconcilerName() string - - // GetRecorder CAN be overridden to customize EventRecorder - GetRecorder() record.EventRecorder - - // GetLogger CAN be overridden to customize logger - GetLogger(job client.Object) logr.Logger - - // GetScheme CAN be overridden to customize runtime scheme - GetScheme() *runtime.Scheme -} - -// GangSchedulingInterface defines the abstract interface for gang-scheduling related actions, such like get, create or -// delete PodGroup -type GangSchedulingInterface interface { - // OverrideForGangSchedulingInterface MUST NOT be overridden as it resets ReconcilerUtilInterface - OverrideForGangSchedulingInterface(ui ReconcilerUtilInterface) - - // GangSchedulingEnabled CAN be overridden if definition of gang-scheduling enabling changes. - GangSchedulingEnabled() bool - - // GetGangSchedulerName CAN be overridden to customize the name of gang scheduler. This name will be used to check - // the value of podTemplateSpec.Spec.SchedulerName. For volcano, it is "volcano". - GetGangSchedulerName() string - - // GetPodGroupName CAN be overridden to customize the name of PodGroup generated for the job. For example: - // podGroupName := fmt.Sprintf("%s-podgroup", job.GetName()) or podGroupName := job.GetName() - GetPodGroupName(job client.Object) string - - // GetPodGroupForJob SHOULD be overridden if Group, APIVersion or Kind changes for PodGroup. The PodGroup is - // defined in different gang-scheduler as: - // Kube-Batch: "scheduling.incubator.k8s.io/v1alpha1/PodGroup", "scheduling.sigs.dev/v1alpha2/PodGroup" - // Volcano: "scheduling.volcano.sh/v1beta1/PodGroup" - // Scheduler-Framework: "scheduling.sigs.k8s.io/v1alpha1/PodGroup" - GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) - - // DeletePodGroup SHOULD be overridden if Group, APIVersion and Kind changes for PodGroup. - DeletePodGroup(ctx context.Context, job client.Object) error - - // ReconcilePodGroup CAN be overridden if the logic to reconcile PodGroup changes. - ReconcilePodGroup(ctx context.Context, job client.Object, runPolicy *kubeflowv1.RunPolicy, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error - - // DecoratePodForGangScheduling SHOULD be overridden if gang scheduler demands Pods associated with PodGroup to be - // decorated with specific requests. - DecoratePodForGangScheduling(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) -} - -// PodInterface defines the abstract interface for Pod related actions, such like get, create or delete Pod -type PodInterface interface { - // OverrideForPodInterface MUST NOT be overridden as it reset ReconcilerUtilInterface, GangSchedulingInterface, JobInterface - OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) - - // GetDefaultContainerName CAN be overridden if the default container name is not "kubeflow". - GetDefaultContainerName() string - - // GenPodName CAN be overridden to customize Pod name. - GenPodName(jobName string, rtype string, index string) string - - // GetPodsForJob CAN be overridden to customize how to list all pods with the job. - GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) - - // FilterPodsForReplicaType CAN be overridden if the linking approach between pods and replicaType changes as this - // function filters out pods for specific replica type from all pods associated with the job. - FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) - - // GetPodSlices SHOULD NOT be overridden as it generates pod slices for further pod processing. - GetPodSlices(pods []*corev1.Pod, replicas int, logger *logrus.Entry) [][]*corev1.Pod - - // ReconcilePods CAN be overridden if the logic to reconcile all Pods for the job changes. - ReconcilePods( - ctx context.Context, - job client.Object, - jobStatus *kubeflowv1.JobStatus, - pods []*corev1.Pod, - rtype kubeflowv1.ReplicaType, - spec *kubeflowv1.ReplicaSpec, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error - - // CreateNewPod CAN be overridden to customize how to create a new pod. - CreateNewPod(job client.Object, rt string, index string, - spec *kubeflowv1.ReplicaSpec, masterRole bool, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error - - // DeletePod CAN be overridden to customize how to delete a pod of {name} in namespace {ns}. - DeletePod(ctx context.Context, ns string, name string) error - - // DecoratePod CAN be overridden if customization to the pod is needed. The default implementation applies nothing - // to the pod. - DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) -} - -// ServiceInterface defines the abstract interface for Pod related actions, such like get, create or delete Service -type ServiceInterface interface { - // OverrideForServiceInterface MUST NOT be overridden as it reset ReconcilerUtilInterface, PodInterface, JobInterface - OverrideForServiceInterface(ui ReconcilerUtilInterface, pi PodInterface, ji JobInterface) - - // GetPortsFromJob CAN be overridden to customize how to find ports defined in the ReplicasSpec. - GetPortsFromJob(spec *kubeflowv1.ReplicaSpec) (map[string]int32, error) - - // GetServicesForJob CAN be overridden to customize how to find all services associated with this job. - GetServicesForJob(ctx context.Context, job client.Object) ([]*corev1.Service, error) - - // FilterServicesForReplicaType CAN be overridden to customize how to filter out services for this Replica Type. - FilterServicesForReplicaType(services []*corev1.Service, replicaType string) ([]*corev1.Service, error) - - // GetServiceSlices CAN be overridden to customize how to generate service slices. - GetServiceSlices(services []*corev1.Service, replicas int, logger *logrus.Entry) [][]*corev1.Service - - // ReconcileServices CAN be overridden to customize how to reconcile services for this job. - ReconcileServices( - job client.Object, - services []*corev1.Service, - rtype kubeflowv1.ReplicaType, - spec *kubeflowv1.ReplicaSpec) error - - // CreateNewService CAN be overridden to customize how to create a new service. - CreateNewService(job client.Object, rtype kubeflowv1.ReplicaType, - spec *kubeflowv1.ReplicaSpec, index string) error - - // DeleteService CAN be overridden to customize how to delete the service of {name} in namespace {ns}. - DeleteService(ns string, name string, job client.Object) error - - // DecorateService CAN be overridden to customize this service right before being created - DecorateService(rtype string, svc *corev1.Service, job client.Object) -} - -// JobInterface defines the abstract interface for Pod related actions, such like get, create or delete TFJob, -// PyTorchJob or KFJob, etc. -type JobInterface interface { - // OverrideForJobInterface MUST NOT be overridden as it reset ReconcilerUtilInterface, PodInterface, ServiceInterface, JobInterface - OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) - - // GenLabels CAN be overridden to customize generic label generated for Pods and Services - GenLabels(jobName string) map[string]string - - // GetGroupNameLabelValue CAN be overridden to customize value used in labels regarding Group of job processed. - GetGroupNameLabelValue() string - - // GetJob MUST be overridden to get jobs with specified kind - GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) - - // ExtractReplicasSpec MUST be overridden to extract ReplicasSpec from a job - ExtractReplicasSpec(job client.Object) (map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, error) - - // ExtractRunPolicy MUST be overridden to extract the pointer of RunPolicy from a job - ExtractRunPolicy(job client.Object) (*kubeflowv1.RunPolicy, error) - - // ExtractJobStatus MUST be overridden to extract the pointer of JobStatus from a job - ExtractJobStatus(job client.Object) (*kubeflowv1.JobStatus, error) - - // IsMasterRole MUST be overridden to determine whether this ReplicaType with index specified is a master role. - // MasterRole pod will have "job-role=master" set in its label - IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, rtype kubeflowv1.ReplicaType, index int) bool - - // ReconcileJob CAN be overridden to customize how to reconcile a job. - ReconcileJob( - ctx context.Context, - job client.Object, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, - status *kubeflowv1.JobStatus, - runPolicy *kubeflowv1.RunPolicy) error - - // DeleteJob CAN be overridden to customize how to delete a job. - DeleteJob(job client.Object) error - - // UpdateJobStatus CAN be overridden to customize how to update job status without submitting to APIServer. - UpdateJobStatus( - job client.Object, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, - jobStatus *kubeflowv1.JobStatus) error - - // UpdateJobStatusInAPIServer CAN be overridden to customize how to update job status directly to APIServer. - UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error - - // CleanupResources CAN be overridden to customize how to delete all resources associated with this job. - CleanupResources(runPolicy *kubeflowv1.RunPolicy, status kubeflowv1.JobStatus, job client.Object) error - - // CleanupJob CAN be overridden to customize how to clean up this job. - CleanupJob(runPolicy *kubeflowv1.RunPolicy, status kubeflowv1.JobStatus, job client.Object) error - - // RecordAbnormalPods CAN be overridden to customize how to record abnormal pods - RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) - - // SetStatusForSuccessJob CAN be overridden to customize how to set status for success job - SetStatusForSuccessJob(status *kubeflowv1.JobStatus) - - // IsFlagReplicaTypeForJobStatus CAN be overridden to customize how to determine if this ReplicaType is the - // flag ReplicaType for the status of this kind of job - IsFlagReplicaTypeForJobStatus(rtype string) bool - - // IsJobSucceeded CAN be overridden to customize how to determine if this job is succeeded. - IsJobSucceeded(status kubeflowv1.JobStatus) bool - - // IsJobFailed CAN be overridden to customize how to determine if this job is failed. - IsJobFailed(status kubeflowv1.JobStatus) bool - - // ShouldCleanUp CAN be overridden to customize how to determine if this job should be cleaned up. - ShouldCleanUp(status kubeflowv1.JobStatus) bool - - // PastBackoffLimit CAN be overridden to customize how to determine if this job has past backoff limit. - PastBackoffLimit(jobName string, runPolicy *kubeflowv1.RunPolicy, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, pods []*corev1.Pod) (bool, error) - - // PastActiveDeadline CAN be overridden to customize how to determine if this job has past activate deadline. - PastActiveDeadline(runPolicy *kubeflowv1.RunPolicy, jobStatus *kubeflowv1.JobStatus) bool -} diff --git a/pkg/reconciler.v1/common/job.go b/pkg/reconciler.v1/common/job.go deleted file mode 100644 index 0d24654337..0000000000 --- a/pkg/reconciler.v1/common/job.go +++ /dev/null @@ -1,473 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - "fmt" - "reflect" - "strings" - "time" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/core" - commonutil "github.com/kubeflow/training-operator/pkg/util" - "github.com/kubeflow/training-operator/pkg/util/k8sutil" - - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ( - GroupName = "kubeflow.org" - - ReasonKey = "reason" - ReasonJobDeleted = "job deleted" - - MsgReconcileCancelled = "Reconcile Cancelled" - MsgReconcileStart = "Reconcile Starts" - - MsgGetPodsFailed = "Get Pods Failed" - MsgGetServicesFailed = "Get Services Failed" - - MsgBackoffLimitReachedTemplate = "Job %s has failed because it has reached the specified backoff limit" - MsgActiveDeadlineReachedTemplate = "Job %s has failed because it was active longer than specified deadline" - - ErrUpdateJobConditionsFailed = "failed to update job conditions" - - ErrUpdateJobErrorTemplate = "UpdateJobStatus error %v" - ErrAppendJobConditionTemplate = "Append job condition error %v" - ErrReconcilePodsTemplate = "ReconcilePods error %v" - ErrReconcileServicesTemplate = "ReconcileServices error %v" - ErrReconcileGangTemplate = "ReconcilePodGroups error %v" - ErrGetReplicasStatusFromStatusFailedTemplate = "failed to get ReplicasStatus for %s from status" - - WarnDefaultImplementationTemplate = "Warning: executing default implementation for JobReconciler.%s" - WarnNotCountedInBackoffLimit = "The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit." -) - -// JobReconciler defines a Reconciler dealing with generic training job -type JobReconciler struct { - client.Client - ReconcilerUtilInterface - PodInterface - ServiceInterface - GangSchedulingInterface - counter *commonutil.Counter -} - -// BareJobReconciler returns the pointer of a JobReconciler with minimal implementation -func BareJobReconciler(client client.Client) *JobReconciler { - return &JobReconciler{ - Client: client, - counter: commonutil.NewCounter(), - } -} - -// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in JobReconciler -func (r *JobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) { - if ui != nil { - r.ReconcilerUtilInterface = ui - } - if pi != nil { - r.PodInterface = pi - } - if si != nil { - r.ServiceInterface = si - } - if gi != nil { - r.GangSchedulingInterface = gi - } -} - -// GenLabels returns labels used for this job (based on the name of this generic training job) -func (r *JobReconciler) GenLabels(jobName string) map[string]string { - jobName = strings.Replace(jobName, "/", "-", -1) - return map[string]string{ - kubeflowv1.OperatorNameLabel: r.GetReconcilerName(), - kubeflowv1.JobNameLabel: jobName, - } -} - -// GetGroupNameLabelValue returns the Group Name for the generic training job, which is "kubeflow.org" -func (r *JobReconciler) GetGroupNameLabelValue() string { - return GroupName -} - -// ReconcileJob reconciles generic training job -func (r *JobReconciler) ReconcileJob( - ctx context.Context, - job client.Object, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, - status *kubeflowv1.JobStatus, - runPolicy *kubeflowv1.RunPolicy) error { - - logger := r.GetLogger(job) - logger.Info(MsgReconcileStart) - jobKind := job.GetObjectKind().GroupVersionKind().Kind - - oldStatus := status.DeepCopy() - - var err error - if r.ShouldCleanUp(*status) { - if err = r.CleanupResources(runPolicy, *status, job); err != nil { - return err - } - if err = r.CleanupJob(runPolicy, *status, job); err != nil { - return err - } - if r.IsJobSucceeded(*status) { - r.SetStatusForSuccessJob(status) - } - if !reflect.DeepEqual(*oldStatus, *status) { - return r.UpdateJobStatusInAPIServer(ctx, job) - } - return nil - } - - pods, err := r.GetPodsForJob(ctx, job) - if err != nil { - logger.Info(MsgGetPodsFailed) - return err - } - - services, err := r.GetServicesForJob(ctx, job) - if err != nil { - logger.Info(MsgGetServicesFailed) - return err - } - - previousRetry, _ := r.counter.Counts(types.NamespacedName{ - Namespace: job.GetNamespace(), - Name: job.GetName(), - }.String()) - if previousRetry < 0 { - // TODO: may be we should abort here? - previousRetry = 0 - } - - activePods := k8sutil.FilterActivePods(pods) - r.RecordAbnormalPods(activePods, job) - - active := int32(len(activePods)) - failed := k8sutil.FilterPodCount(pods, corev1.PodFailed) - totalReplicas := k8sutil.GetTotalReplicas(replicas) - prevReplicasFailedNum := k8sutil.GetTotalFailedReplicas(status.ReplicaStatuses) - - var failureMessage string - jobExceedsLimit := false - exceedsBackoffLimit := false - pastBackoffLimit := false - - if runPolicy.BackoffLimit != nil { - jobHasNewFailure := failed > prevReplicasFailedNum - exceedsBackoffLimit = jobHasNewFailure && (active != totalReplicas) && - (int32(previousRetry)+1 > *runPolicy.BackoffLimit) - - pastBackoffLimit, err = r.PastBackoffLimit(job.GetName(), runPolicy, replicas, pods) - if err != nil { - return err - } - } - - if exceedsBackoffLimit || pastBackoffLimit { - // check if the number of pod restart exceeds backoff (for restart OnFailure only) - // OR if the number of failed jobs increased since the last syncJob - jobExceedsLimit = true - failureMessage = fmt.Sprintf(MsgBackoffLimitReachedTemplate, job.GetName()) - } else if r.PastActiveDeadline(runPolicy, status) { - failureMessage = fmt.Sprintf(MsgActiveDeadlineReachedTemplate, job.GetName()) - jobExceedsLimit = true - } - - if jobExceedsLimit { - if status.CompletionTime == nil { - now := metav1.Now() - status.CompletionTime = &now - } - if err = r.CleanupResources(runPolicy, *status, job); err != nil { - return err - } - if err = r.CleanupJob(runPolicy, *status, job); err != nil { - return err - } - if r.IsJobSucceeded(*status) { - r.SetStatusForSuccessJob(status) - } - r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - - return r.UpdateJobStatusInAPIServer(ctx, job) - } - - if r.GangSchedulingEnabled() { - err = r.ReconcilePodGroup(ctx, job, runPolicy, replicas) - if err != nil { - logrus.Warnf(ErrReconcileGangTemplate, err) - return err - } - } - - for rtype, spec := range replicas { - core.InitializeReplicaStatuses(status, rtype) - - err = r.ReconcilePods(ctx, job, status, pods, rtype, spec, replicas) - if err != nil { - logrus.Warnf(ErrReconcilePodsTemplate, err) - return err - } - - err = r.ReconcileServices(job, services, rtype, spec) - if err != nil { - logrus.Warnf(ErrReconcileServicesTemplate, err) - return err - } - } - - err = r.UpdateJobStatus(job, replicas, status) - if err != nil { - logrus.Warnf(ErrUpdateJobErrorTemplate, err) - return err - } - - if !reflect.DeepEqual(*oldStatus, status) { - return r.UpdateJobStatusInAPIServer(ctx, job) - } - - return nil -} - -// DeleteJob deletes this generic training job -func (r *JobReconciler) DeleteJob(job client.Object) error { - return r.Delete(context.Background(), job) -} - -// RecordAbnormalPods records abnormal pods during the reconciliation of jobs -func (r *JobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) { - core.RecordAbnormalPods(activePods, object, r.GetRecorder()) -} - -// SetStatusForSuccessJob sets the status for job that succeed -func (r *JobReconciler) SetStatusForSuccessJob(status *kubeflowv1.JobStatus) { - for rytpe := range status.ReplicaStatuses { - status.ReplicaStatuses[rytpe].Succeeded += status.ReplicaStatuses[rytpe].Active - status.ReplicaStatuses[rytpe].Active = 0 - } -} - -// UpdateJobStatus updates the status of this generic training job WITHOUT pushing the updated status to the APIServer -func (r *JobReconciler) UpdateJobStatus( - job client.Object, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, - jobStatus *kubeflowv1.JobStatus) error { - - logrus.Warnf(WarnDefaultImplementationTemplate, "UpdateJobStatus") - - jobKind := job.GetObjectKind().GroupVersionKind().Kind - jobNamespacedName := types.NamespacedName{Namespace: job.GetNamespace(), Name: job.GetName()}.String() - - logger := r.GetLogger(job) - - for rtype, spec := range replicas { - status, ok := jobStatus.ReplicaStatuses[rtype] - if !ok { - return fmt.Errorf(ErrGetReplicasStatusFromStatusFailedTemplate, rtype) - } - - succeeded := status.Succeeded - expected := *(spec.Replicas) - succeeded - running := status.Active - failed := status.Failed - - logrus.Infof("%s=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d", - jobKind, jobNamespacedName, rtype, expected, running, succeeded, failed) - - if r.IsFlagReplicaTypeForJobStatus(string(rtype)) { - if running > 0 { - msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) - } - - if expected == 0 { - msg := fmt.Sprintf("%s %s is successfully completed.", jobKind, jobNamespacedName) - logrus.Info(msg) - r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) - if jobStatus.CompletionTime == nil { - now := metav1.Now() - jobStatus.CompletionTime = &now - } - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) - return nil - } - } - - if failed > 0 { - if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { - msg := fmt.Sprintf("%s %s is restarting because %d %s replica(s) failed.", - jobKind, jobNamespacedName, failed, rtype) - r.GetRecorder().Event(job, corev1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) - } else { - msg := fmt.Sprintf("%s %s is failed because %d %s replica(s) failed.", - jobKind, jobNamespacedName, failed, rtype) - if jobStatus.CompletionTime == nil { - now := metav1.Now() - jobStatus.CompletionTime = &now - } - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg) - } - } - - } - - msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) - logger.Info(msg) - commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) - - return nil -} - -// UpdateJobStatusInAPIServer updates the status of this generic training job in APIServer -func (r *JobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error { - return r.Status().Update(ctx, job) -} - -// CleanupResources cleans up all resources associated with this generic training job -func (r *JobReconciler) CleanupResources(runPolicy *kubeflowv1.RunPolicy, status kubeflowv1.JobStatus, job client.Object) error { - if *runPolicy.CleanPodPolicy == kubeflowv1.CleanPodPolicyNone { - return nil - } - ctx := context.Background() - cleanRunningPod := *runPolicy.CleanPodPolicy == kubeflowv1.CleanPodPolicyRunning - - if err := r.DeletePodGroup(ctx, job); err != nil { - return err - } - - pods, err := r.GetPodsForJob(ctx, job) - if err != nil { - return err - } - - for _, pod := range pods { - if cleanRunningPod && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending { - continue - } - if err = r.Delete(ctx, pod); err != nil { - return err - } - // Each Pod may or may not has its service with same name - svc := &corev1.Service{} - err = r.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, svc) - if errors.IsNotFound(err) { - continue - } - if err != nil { - return err - } - if err = r.Delete(ctx, svc); err != nil { - return err - } - - } - - return nil -} - -// CleanupJob cleans up all resources associated with this generic training job as well as the job itself -func (r *JobReconciler) CleanupJob(runPolicy *kubeflowv1.RunPolicy, status kubeflowv1.JobStatus, job client.Object) error { - currentTime := time.Now() - - ttl := runPolicy.TTLSecondsAfterFinished - if ttl == nil { - return nil - } - - duration := time.Second * time.Duration(*ttl) - // todo: Is the jobStatus.CompletionTime maybe nil ? - finishTime := status.CompletionTime - expireTime := finishTime.Add(duration) - - if currentTime.After(expireTime) { - err := r.DeleteJob(job) - if err != nil { - commonutil.LoggerForJob(job).Warnf("Cleanup Job error: %v.", err) - return err - } - return nil - } else { - if finishTime.After(currentTime) { - commonutil.LoggerForJob(job).Warnf("Found Job finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.") - } - } - return nil -} - -// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of generic training job -func (r *JobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool { - logrus.Warnf(WarnDefaultImplementationTemplate, "IsFlagReplicaTypeForJobStatus") - return true -} - -// IsJobSucceeded checks if this generic training job succeeded -func (r *JobReconciler) IsJobSucceeded(status kubeflowv1.JobStatus) bool { - return commonutil.IsSucceeded(status) -} - -// IsJobFailed checks if this generic training job failed -func (r *JobReconciler) IsJobFailed(status kubeflowv1.JobStatus) bool { - return commonutil.IsFailed(status) -} - -// ShouldCleanUp checks if resources associated with this generic training job should be cleaned up -func (r *JobReconciler) ShouldCleanUp(status kubeflowv1.JobStatus) bool { - return r.IsJobSucceeded(status) || r.IsJobFailed(status) -} - -// PastBackoffLimit checks if this generic training job has past backoff limit -func (r *JobReconciler) PastBackoffLimit(jobName string, runPolicy *kubeflowv1.RunPolicy, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, pods []*corev1.Pod) (bool, error) { - return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, r.FilterPodsForReplicaType) -} - -// PastActiveDeadline checks if this generic training job has ActiveDeadlineSeconds field set and if it is exceeded. -func (r *JobReconciler) PastActiveDeadline(runPolicy *kubeflowv1.RunPolicy, jobStatus *kubeflowv1.JobStatus) bool { - return core.PastActiveDeadline(runPolicy, *jobStatus) -} - -func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) { - panic("implement me") -} - -func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, error) { - panic("implement me") -} - -func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*kubeflowv1.RunPolicy, error) { - panic("implement me") -} - -func (r *JobReconciler) ExtractJobStatus(job client.Object) (*kubeflowv1.JobStatus, error) { - panic("implement me") -} - -func (r *JobReconciler) IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, rtype kubeflowv1.ReplicaType, index int) bool { - panic("implement me") -} diff --git a/pkg/reconciler.v1/common/pod.go b/pkg/reconciler.v1/common/pod.go deleted file mode 100644 index 1a4c3d63f2..0000000000 --- a/pkg/reconciler.v1/common/pod.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - "strconv" - "strings" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/core" - commonutil "github.com/kubeflow/training-operator/pkg/util" - trainutil "github.com/kubeflow/training-operator/pkg/util/train" -) - -// DefaultContainerName defines the default name for container in Pod -const DefaultContainerName = "kubeflow" - -var ( - // Prometheus metrics - createdPodsCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "reconciler_created_pods_total", - Help: "The total number of created pods", - }) - deletedPodsCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "reconciler_deleted_pods_total", - Help: "The total number of deleted pods", - }) - failedPodsCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "reconciler_failed_pods_total", - Help: "The total number of failed pods", - }) -) - -// PodReconciler defines a Pod Reconciler for generic training job -type PodReconciler struct { - client.Client - ReconcilerUtilInterface - GangSchedulingInterface - JobInterface -} - -// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for PodReconciler -func (r *PodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) { - if ui != nil { - r.ReconcilerUtilInterface = ui - } - if ji != nil { - r.JobInterface = ji - } - if gi != nil { - r.GangSchedulingInterface = gi - } -} - -// BarePodReconciler returns a pointer of BarePodReconciler with minimal implementation -func BarePodReconciler(client client.Client) *PodReconciler { - return &PodReconciler{Client: client} -} - -// GenPodName returns the name of the Pod based on jobName, replicaType and its index -func (r *PodReconciler) GenPodName(jobName string, rtype string, index string) string { - return core.GenGeneralName(jobName, rtype, index) -} - -// GetDefaultContainerName returns the default name of the container -func (r *PodReconciler) GetDefaultContainerName() string { - return DefaultContainerName -} - -// GetPodsForJob returns all Pods associated with this job -func (r *PodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) { - podList := &corev1.PodList{} - err := r.List(ctx, podList, client.MatchingLabels(r.GenLabels(job.GetName()))) - if err != nil { - return nil, err - } - - var pods []*corev1.Pod - for idx := range podList.Items { - pods = append(pods, &podList.Items[idx]) - } - - return pods, nil - // TODO: (zw0610) adding Claiming Pods -} - -// GetPodSlices generates podSlice from all Pods listed for this job -func (r *PodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod { - return core.GetPodSlices(pods, replicas, logger) -} - -// FilterPodsForReplicaType filters out Pods for this replicaType -func (r *PodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) { - return core.FilterPodsForReplicaType(pods, replicaType) -} - -// ReconcilePods reconciles Pods for this job -func (r *PodReconciler) ReconcilePods( - ctx context.Context, - job client.Object, - jobStatus *kubeflowv1.JobStatus, - pods []*corev1.Pod, - rType kubeflowv1.ReplicaType, - spec *kubeflowv1.ReplicaSpec, - replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error { - - rt := strings.ToLower(string(rType)) - // Convert ReplicaType to lower string. - logger := commonutil.LoggerForReplica(job, rt) - // Get all pods for the type rt. - pods, err := r.FilterPodsForReplicaType(pods, rt) - if err != nil { - return err - } - numReplicas := int(*spec.Replicas) - var masterRole bool - - core.InitializeReplicaStatuses(jobStatus, rType) - - // GetPodSlices will return enough information here to make decision to add/remove/update resources. - // - // For example, let's assume we have pods with replica-index 0, 1, 2 - // If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a pod with replica-index 3 will be created. - // - // If replica is 1, return a slice with size 3. [[0],[1],[2]], pod with replica-index 1 and 2 are out of range and will be deleted. - podSlices := r.GetPodSlices(pods, numReplicas, logger) - for index, podSlice := range podSlices { - if len(podSlice) > 1 { - logger.Warningf("We have too many pods for %s %d", rt, index) - } else if len(podSlice) == 0 { - logger.Infof("Need to create new pod: %s-%d", rt, index) - - // check if this replica is the master role - masterRole = r.IsMasterRole(replicas, kubeflowv1.ReplicaType(rt), index) - err = r.CreateNewPod(job, rt, strconv.Itoa(index), spec, masterRole, replicas) - if err != nil { - return err - } - } else { - // Check the status of the current pod. - pod := podSlice[0] - - // check if the index is in the valid range, if not, we should kill the pod - if index < 0 || index >= numReplicas { - err = r.DeletePod(ctx, pod.Namespace, pod.Name) - if err != nil { - return err - } - } - - // Get the exit code of the container. - var exitCode int32 = 0xbeef // magic number - for _, status := range pod.Status.ContainerStatuses { - state := status.State - if status.Name == r.GetDefaultContainerName() && state.Terminated != nil { - exitCode = state.Terminated.ExitCode - logger.Infof("Pod: %v.%v exited with code %v", pod.Namespace, pod.Name, exitCode) - r.GetRecorder().Eventf(job, corev1.EventTypeNormal, "ExitedWithCode", "Pod: %v.%v exited with code %v", pod.Namespace, pod.Name, exitCode) - } - } - // Check if the pod is retryable. - if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { - if pod.Status.Phase == corev1.PodFailed && trainutil.IsRetryableExitCode(exitCode) { - failedPodsCount.Inc() - logger.Infof("Need to restart the pod: %v.%v", pod.Namespace, pod.Name) - if err = r.DeletePod(ctx, pod.Namespace, pod.Name); err != nil { - return err - } - } - } - - core.UpdateJobReplicaStatuses(jobStatus, rType, pod) - } - } - return nil - -} - -// CreateNewPod generate Pods for this job and submits creation request to APIServer -func (r *PodReconciler) CreateNewPod(job client.Object, rt string, index string, - spec *kubeflowv1.ReplicaSpec, masterRole bool, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec) error { - - logger := commonutil.LoggerForReplica(job, rt) - - podLabels := r.GenLabels(job.GetName()) - podLabels[kubeflowv1.ReplicaTypeLabel] = rt - podLabels[kubeflowv1.ReplicaIndexLabel] = index - if masterRole { - podLabels[kubeflowv1.JobRoleLabel] = "master" - } - - podTemplate := spec.Template.DeepCopy() - - podTemplate.Name = r.GenPodName(job.GetName(), rt, index) - podTemplate.Namespace = job.GetNamespace() - if podTemplate.Labels == nil { - podTemplate.Labels = make(map[string]string) - } - - for key, value := range podLabels { - podTemplate.Labels[key] = value - } - - if podTemplate.Spec.RestartPolicy != corev1.RestartPolicy("") { - errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec" - logger.Warning(errMsg) - r.GetRecorder().Event(job, corev1.EventTypeWarning, "SettedPodTemplateRestartPolicy", errMsg) - } - if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { - podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever - } else { - podTemplate.Spec.RestartPolicy = corev1.RestartPolicy(spec.RestartPolicy) - } - - if r.GangSchedulingEnabled() { - r.DecoratePodForGangScheduling(rt, podTemplate, job) - } - - r.DecoratePod(rt, podTemplate, job) - - pod := &corev1.Pod{ - ObjectMeta: podTemplate.ObjectMeta, - Spec: podTemplate.Spec, - } - - err := controllerutil.SetControllerReference(job, pod, r.GetScheme()) - if err != nil { - return err - } - - err = r.Create(context.Background(), pod) - if err != nil && errors.IsTimeout(err) { - return nil - } else if err != nil { - return err - } - createdPodsCount.Inc() - return nil -} - -// DeletePod delete a Pod specified by name and namespace -func (r *PodReconciler) DeletePod(ctx context.Context, ns string, name string) error { - pod := &corev1.Pod{} - pod.Name = name - pod.Namespace = ns - err := r.Delete(ctx, pod) - if err == nil { - deletedPodsCount.Inc() - } - return err -} - -// DecoratePod decorates podTemplate before a Pod is submitted to the APIServer -func (r *PodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) { - // Default implementation applies nothing to podTemplate - // return -} diff --git a/pkg/reconciler.v1/common/pod_test.go b/pkg/reconciler.v1/common/pod_test.go deleted file mode 100644 index e4a9d02e35..0000000000 --- a/pkg/reconciler.v1/common/pod_test.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common_test - -import ( - "testing" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - testjobv1 "github.com/kubeflow/training-operator/test_job/apis/test_job/v1" - "github.com/kubeflow/training-operator/test_job/reconciler.v1/test_job" - testutilv1 "github.com/kubeflow/training-operator/test_job/test_util/v1" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestGenPodName(t *testing.T) { - type tc struct { - testJob *testjobv1.TestJob - testRType string - testIndex string - expectedName string - } - testCase := []tc{ - func() tc { - tj := testutilv1.NewTestJob(1) - tj.SetName("hello-world") - return tc{ - testJob: tj, - testRType: string(testjobv1.TestReplicaTypeWorker), - testIndex: "1", - expectedName: "hello-world-worker-1", - } - }(), - } - - testReconciler := test_job.NewTestReconciler() - - for _, c := range testCase { - na := testReconciler.GenPodName(c.testJob.GetName(), c.testRType, c.testIndex) - if na != c.expectedName { - t.Errorf("Expected %s, got %s", c.expectedName, na) - } - } -} - -func PodInSlice(pod *corev1.Pod, pods []*corev1.Pod) bool { - for _, p := range pods { - if p.GetNamespace() == pod.GetNamespace() && p.GetName() == pod.GetName() { - return true - } - } - return false -} - -func TestFilterPodsForReplicaType(t *testing.T) { - type tc struct { - testPods []*corev1.Pod - testRType string - expectedPods []*corev1.Pod - } - testCase := []tc{ - func() tc { - tj := testutilv1.NewTestJob(3) - tj.SetName("hello-world") - - pod0 := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod0", - Namespace: "default", - Labels: map[string]string{ - kubeflowv1.ReplicaTypeLabel: string(testjobv1.TestReplicaTypeMaster), - }, - }, - Spec: corev1.PodSpec{}, - Status: corev1.PodStatus{}, - } - - pod1 := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", - Labels: map[string]string{ - kubeflowv1.ReplicaTypeLabel: string(testjobv1.TestReplicaTypeWorker), - }, - }, - Spec: corev1.PodSpec{}, - Status: corev1.PodStatus{}, - } - - pod2 := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod2", - Namespace: "default", - Labels: map[string]string{ - kubeflowv1.ReplicaTypeLabel: string(testjobv1.TestReplicaTypeWorker), - }, - }, - Spec: corev1.PodSpec{}, - Status: corev1.PodStatus{}, - } - - allPods := []*corev1.Pod{pod0, pod1, pod2} - filteredPods := []*corev1.Pod{pod1, pod2} - - return tc{ - testPods: allPods, - testRType: string(testjobv1.TestReplicaTypeWorker), - expectedPods: filteredPods, - } - }(), - } - - testReconciler := test_job.NewTestReconciler() - - for _, c := range testCase { - filtered, err := testReconciler.FilterPodsForReplicaType(c.testPods, c.testRType) - if err != nil { - t.Errorf("FilterPodsForReplicaType returns error %v", err) - } - for _, ep := range c.expectedPods { - if !PodInSlice(ep, filtered) { - t.Errorf("Cannot found expected pod %s", ep.GetName()) - } - } - - } -} diff --git a/pkg/reconciler.v1/common/service.go b/pkg/reconciler.v1/common/service.go deleted file mode 100644 index 7aad4351bb..0000000000 --- a/pkg/reconciler.v1/common/service.go +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "context" - "strconv" - "strings" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - "github.com/kubeflow/training-operator/pkg/core" - commonutil "github.com/kubeflow/training-operator/pkg/util" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -var ( - succeededServiceCreationCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "reconciler_succeeded_service_creation_total", - Help: "The total number of succeeded service creation", - }) - failedServiceCreationCount = promauto.NewCounter(prometheus.CounterOpts{ - Name: "reconciler_failed_service_creation_total", - Help: "The total number of failed service creation", - }) -) - -// ServiceReconciler defines a Service Reconciler for generic training job -type ServiceReconciler struct { - client.Client - ReconcilerUtilInterface - PodInterface - JobInterface -} - -// BareServiceReconciler returns a pointer of ServiceReconciler with minimal implementation -func BareServiceReconciler(client client.Client) *ServiceReconciler { - return &ServiceReconciler{ - Client: client, - } -} - -// OverrideForServiceInterface resets ReconcilerUtilInterface, PodInterface, JobInterface for ServiceReconciler -func (r *ServiceReconciler) OverrideForServiceInterface(ui ReconcilerUtilInterface, pi PodInterface, ji JobInterface) { - if ui != nil { - r.ReconcilerUtilInterface = ui - } - if pi != nil { - r.PodInterface = pi - } - if ji != nil { - r.JobInterface = ji - } -} - -// GetPortsFromJob gets the ports of job container. Port could be nil, if distributed communication strategy doesn't need and no other ports that need to be exposed. -func (r *ServiceReconciler) GetPortsFromJob(spec *kubeflowv1.ReplicaSpec) (map[string]int32, error) { - defaultContainerName := r.GetDefaultContainerName() - return core.GetPortsFromJob(spec, defaultContainerName) -} - -// GetServicesForJob returns all services associated with this job -func (r *ServiceReconciler) GetServicesForJob(ctx context.Context, job client.Object) ([]*corev1.Service, error) { - svcList := &corev1.ServiceList{} - err := r.List(ctx, svcList, client.MatchingLabels(r.GenLabels(job.GetName()))) - if err != nil { - return nil, err - } - - var svcs []*corev1.Service - for idx := range svcList.Items { - svcs = append(svcs, &svcList.Items[idx]) - } - - return svcs, nil -} - -// FilterServicesForReplicaType returns service belong to a replicaType. -func (r *ServiceReconciler) FilterServicesForReplicaType(services []*corev1.Service, - replicaType string) ([]*corev1.Service, error) { - return core.FilterServicesForReplicaType(services, replicaType) -} - -// GetServiceSlices returns the serviceSlice based on all Services listed for this job -func (r *ServiceReconciler) GetServiceSlices(services []*corev1.Service, replicas int, logger *log.Entry) [][]*corev1.Service { - return core.GetServiceSlices(services, replicas, logger) -} - -// ReconcileServices reconciles the Services for this job -func (r *ServiceReconciler) ReconcileServices( - job client.Object, - services []*corev1.Service, - rtype kubeflowv1.ReplicaType, - spec *kubeflowv1.ReplicaSpec) error { - - // Convert ReplicaType to lower string. - rt := strings.ToLower(string(rtype)) - replicas := int(*spec.Replicas) - // Get all services for the type rt. - services, err := r.FilterServicesForReplicaType(services, rt) - if err != nil { - return err - } - - // GetServiceSlices will return enough information here to make decision to add/remove/update resources. - // - // For example, let's assume we have services with replica-index 0, 1, 2 - // If replica is 4, return a slice with size 4. [[0],[1],[2],[]], a svc with replica-index 3 will be created. - // - // If replica is 1, return a slice with size 3. [[0],[1],[2]], svc with replica-index 1 and 2 are out of range and will be deleted. - serviceSlices := r.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rt)) - - for index, serviceSlice := range serviceSlices { - if len(serviceSlice) > 1 { - commonutil.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rtype, index) - } else if len(serviceSlice) == 0 { - commonutil.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rtype, index) - err = r.CreateNewService(job, rtype, spec, strconv.Itoa(index)) - if err != nil { - return err - } - } else { - // Check the status of the current svc. - svc := serviceSlice[0] - - // check if the index is in the valid range, if not, we should kill the svc - if index < 0 || index >= replicas { - err = r.DeleteService(svc.Namespace, svc.Name, job) - if err != nil { - return err - } - } - } - } - return nil - -} - -// CreateNewService generates Service based the job, replica info. and index and submits it to APIServer -func (r *ServiceReconciler) CreateNewService(job client.Object, rtype kubeflowv1.ReplicaType, - spec *kubeflowv1.ReplicaSpec, index string) error { - - // Convert ReplicaType to lower string. - rt := strings.ToLower(string(rtype)) - // Append ReplicaTypeLabel and ReplicaIndexLabel labels. - labels := r.GenLabels(job.GetName()) - labels[kubeflowv1.ReplicaTypeLabel] = rt - labels[kubeflowv1.ReplicaIndexLabel] = index - - ports, err := r.GetPortsFromJob(spec) - if err != nil { - return err - } - - service := &corev1.Service{ - Spec: corev1.ServiceSpec{ - ClusterIP: "None", - Selector: labels, - Ports: []corev1.ServicePort{}, - }, - } - - // Add service ports to headless service - for name, port := range ports { - svcPort := corev1.ServicePort{Name: name, Port: port} - service.Spec.Ports = append(service.Spec.Ports, svcPort) - } - - service.Name = core.GenGeneralName(job.GetName(), rt, index) - service.Namespace = job.GetNamespace() - service.Labels = labels - // Create OwnerReference. - err = controllerutil.SetControllerReference(job, service, r.GetScheme()) - if err != nil { - return err - } - - r.DecorateService(rt, service, job) - - err = r.Create(context.Background(), service) - if err != nil && errors.IsTimeout(err) { - succeededServiceCreationCount.Inc() - return nil - } else if err != nil { - failedServiceCreationCount.Inc() - return err - } - succeededServiceCreationCount.Inc() - return nil -} - -// DeleteService deletes a Service specified by its name and namespace from APIServer -func (r *ServiceReconciler) DeleteService(ns string, name string, job client.Object) error { - svc := &corev1.Service{} - svc.Name = name - svc.Namespace = ns - err := r.Delete(context.Background(), svc) - if err == nil { - deletedPodsCount.Inc() - } - return err -} - -// DecorateService decorates the Service before it's submitted to APIServer -func (r *ServiceReconciler) DecorateService(rtype string, svc *corev1.Service, job client.Object) { - // Default implementation applies nothing to podTemplate - // return -} diff --git a/pkg/reconciler.v1/common/service_test.go b/pkg/reconciler.v1/common/service_test.go deleted file mode 100644 index 1c34d196a0..0000000000 --- a/pkg/reconciler.v1/common/service_test.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common_test - -import ( - "context" - "reflect" - "strings" - "testing" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - testjobv1 "github.com/kubeflow/training-operator/test_job/apis/test_job/v1" - "github.com/kubeflow/training-operator/test_job/reconciler.v1/test_job" - testutilv1 "github.com/kubeflow/training-operator/test_job/test_util/v1" - - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -func TestCreateNewService(t *testing.T) { - type tc struct { - testJob *testjobv1.TestJob - testRType kubeflowv1.ReplicaType - testSpec *kubeflowv1.ReplicaSpec - testIndex string - expectedService *corev1.Service - } - testCase := []tc{ - func() tc { - tj := testutilv1.NewTestJob(3) - jobName := "testjob1" - tj.SetName(jobName) - idx := "0" - svc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: jobName + "-worker-" + idx, - Namespace: corev1.NamespaceDefault, - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - corev1.ServicePort{ - Name: testjobv1.DefaultPortName, - Port: testjobv1.DefaultPort, - }, - }, - ClusterIP: corev1.ClusterIPNone, - Selector: map[string]string{ - kubeflowv1.OperatorNameLabel: "Test Reconciler", - kubeflowv1.JobNameLabel: jobName, - kubeflowv1.ReplicaTypeLabel: strings.ToLower(string(testjobv1.TestReplicaTypeWorker)), - kubeflowv1.ReplicaIndexLabel: idx, - }, - }, - } - return tc{ - testJob: tj, - testRType: kubeflowv1.ReplicaType(testjobv1.TestReplicaTypeWorker), - testSpec: tj.Spec.TestReplicaSpecs[testjobv1.TestReplicaTypeWorker], - testIndex: idx, - expectedService: svc, - } - }(), - } - testReconciler := test_job.NewTestReconciler() - - for _, c := range testCase { - err := testReconciler.CreateNewService(c.testJob, c.testRType, c.testSpec, c.testIndex) - if err != nil { - t.Errorf("Got error when CreateNewService: %v", err) - continue - } - var got corev1.Service - if err = testReconciler.FC.Get(context.Background(), client.ObjectKeyFromObject(c.expectedService), &got, &client.GetOptions{}); err != nil { - if apierrors.IsNotFound(err) { - t.Errorf("Cannot find Service %s/%s created", c.expectedService.GetNamespace(), c.expectedService.GetName()) - } - t.Errorf("Got error when Get service: %v", err) - } - if !reflect.DeepEqual(c.expectedService.Spec, got.Spec) { - t.Errorf("Spec mismatch for service %s/%s", c.expectedService.GetNamespace(), c.expectedService.GetName()) - } - } -} diff --git a/pkg/reconciler.v1/common/utils.go b/pkg/reconciler.v1/common/utils.go deleted file mode 100644 index 2a9f68256c..0000000000 --- a/pkg/reconciler.v1/common/utils.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -const ReconcilerName = "common-reconciler" - -// GetReconcilerName returns the name of this reconciler, which is "common-reconciler" -func (r *ReconcilerUtil) GetReconcilerName() string { - return ReconcilerName -} - -// ReconcilerUtil defines a reconciler with utility features -type ReconcilerUtil struct { - Recorder record.EventRecorder - Log logr.Logger - Scheme *runtime.Scheme -} - -// BareUtilReconciler returns a pointer of ReconcilerUtil with default implementation -func BareUtilReconciler( - recorder record.EventRecorder, - log logr.Logger, - scheme *runtime.Scheme) *ReconcilerUtil { - return &ReconcilerUtil{ - Recorder: recorder, - Log: log, - Scheme: scheme, - } -} - -// GetRecorder returns a record.EventRecorder -func (r *ReconcilerUtil) GetRecorder() record.EventRecorder { - return r.Recorder -} - -// GetLogger returns a logr.Logger -func (r *ReconcilerUtil) GetLogger(job client.Object) logr.Logger { - return r.Log.WithValues( - job.GetObjectKind().GroupVersionKind().Kind, - types.NamespacedName{Name: job.GetName(), Namespace: job.GetNamespace()}.String()) -} - -// GetScheme returns the pointer of runtime.Schemes that is used in this reconciler -func (r *ReconcilerUtil) GetScheme() *runtime.Scheme { - return r.Scheme -} diff --git a/pkg/reconciler.v1/common/utils_test.go b/pkg/reconciler.v1/common/utils_test.go deleted file mode 100644 index b8744904e4..0000000000 --- a/pkg/reconciler.v1/common/utils_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package common_test - -import ( - "testing" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - - "github.com/kubeflow/training-operator/test_job/reconciler.v1/test_job" -) - -func TestGenLabels(t *testing.T) { - type tc struct { - testJobName string - expectedLabel map[string]string - } - testCase := []tc{ - func() tc { - return tc{ - testJobName: "test/job1", - expectedLabel: map[string]string{ - kubeflowv1.JobNameLabel: "test-job1", - kubeflowv1.OperatorNameLabel: "Test Reconciler", - }, - } - }(), - } - - testReconciler := test_job.NewTestReconciler() - - for _, c := range testCase { - labels := testReconciler.GenLabels(c.testJobName) - if len(labels) != len(c.expectedLabel) { - t.Errorf("Expected to get %d labels, got %d labels", len(c.expectedLabel), len(labels)) - continue - } - for ek, ev := range c.expectedLabel { - if v, ok := labels[ek]; !ok { - t.Errorf("Cannot found expected key %s", ek) - } else { - if ev != v { - t.Errorf("Expected to get %s for %s, got %s", ev, ek, v) - } - } - } - } -} diff --git a/test_job/reconciler.v1/test_job/test_job_reconciler.go b/test_job/reconciler.v1/test_job/test_job_reconciler.go deleted file mode 100644 index b6a139d060..0000000000 --- a/test_job/reconciler.v1/test_job/test_job_reconciler.go +++ /dev/null @@ -1,164 +0,0 @@ -package test_job - -import ( - "context" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - commonreconciler "github.com/kubeflow/training-operator/pkg/reconciler.v1/common" - v1 "github.com/kubeflow/training-operator/test_job/apis/test_job/v1" - "github.com/kubeflow/training-operator/test_job/client/clientset/versioned/scheme" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/log" -) - -type TestReconciler struct { - commonreconciler.ReconcilerUtil - commonreconciler.ServiceReconciler - commonreconciler.PodReconciler - commonreconciler.VolcanoReconciler - commonreconciler.JobReconciler - - FC client.Client - Job *v1.TestJob - Pods []*corev1.Pod - Services []*corev1.Service - PodGroup client.Object -} - -func NewTestReconciler() *TestReconciler { - scm := runtime.NewScheme() - utilruntime.Must(clientgoscheme.AddToScheme(scm)) - utilruntime.Must(v1.AddToScheme(scm)) - - fakeClient := fake.NewClientBuilder().WithScheme(scm).Build() - - r := &TestReconciler{ - FC: fakeClient, - } - - // Generate Bare Components - jobR := commonreconciler.BareJobReconciler(fakeClient) - jobR.OverrideForJobInterface(r, r, r, r) - - podR := commonreconciler.BarePodReconciler(fakeClient) - podR.OverrideForPodInterface(r, r, r) - - svcR := commonreconciler.BareServiceReconciler(fakeClient) - svcR.OverrideForServiceInterface(r, r, r) - - gangR := commonreconciler.BareVolcanoReconciler(fakeClient, nil, false) - gangR.OverrideForGangSchedulingInterface(r) - - Log := log.Log - utilR := commonreconciler.BareUtilReconciler(nil, Log, scm) - //kubeflowReconciler := commonreconciler.BareKubeflowReconciler() - - r.JobReconciler = *jobR - r.PodReconciler = *podR - r.ServiceReconciler = *svcR - r.VolcanoReconciler = *gangR - r.ReconcilerUtil = *utilR - - return r -} - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -func (r *TestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) - - job, err := r.GetJob(ctx, req) - if err != nil { - return ctrl.Result{}, err - } - - logger := r.GetLogger(job) - - if job.GetDeletionTimestamp() != nil { - return ctrl.Result{}, nil - } - - scheme.Scheme.Default(job) - - // Get rid of SatisfiedExpectation - replicasSpec, err := r.ExtractReplicasSpec(job) - if err != nil { - return ctrl.Result{}, err - } - - runPolicy, err := r.ExtractRunPolicy(job) - if err != nil { - return ctrl.Result{}, err - } - - status, err := r.ExtractJobStatus(job) - if err != nil { - return ctrl.Result{}, err - } - - err = r.ReconcileJob(ctx, job, replicasSpec, status, runPolicy) - if err != nil { - logger.Info("Reconcile Test Job error %v", err) - return ctrl.Result{}, err - } - - return ctrl.Result{}, nil -} - -func (r *TestReconciler) GetReconcilerName() string { - return "Test Reconciler" -} - -func (r *TestReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) { - return r.Job, nil -} - -func (r *TestReconciler) GetDefaultContainerName() string { - return v1.DefaultContainerName -} - -func (r *TestReconciler) GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) { - return r.PodGroup, nil -} - -func (r *TestReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) { - return r.Pods, nil -} - -func (r *TestReconciler) GetServicesForJob(ctx context.Context, job client.Object) ([]*corev1.Service, error) { - return r.Services, nil -} - -func (r *TestReconciler) ExtractReplicasSpec(job client.Object) (map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, error) { - tj := job.(*v1.TestJob) - - rs := map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{} - for k, v := range tj.Spec.TestReplicaSpecs { - rs[kubeflowv1.ReplicaType(k)] = v - } - - return rs, nil -} - -func (r *TestReconciler) ExtractRunPolicy(job client.Object) (*kubeflowv1.RunPolicy, error) { - tj := job.(*v1.TestJob) - - return tj.Spec.RunPolicy, nil -} - -func (r *TestReconciler) ExtractJobStatus(job client.Object) (*kubeflowv1.JobStatus, error) { - tj := job.(*v1.TestJob) - - return &tj.Status, nil -} - -func (r *TestReconciler) IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, rtype kubeflowv1.ReplicaType, index int) bool { - return string(rtype) == string(v1.TestReplicaTypeMaster) -} From 64ede6be22f6eb80932508ae6a23a3058bc43c2e Mon Sep 17 00:00:00 2001 From: Johnu George Date: Sun, 6 Aug 2023 00:24:02 +0530 Subject: [PATCH 2/8] Update monitoring guide --- docs/monitoring/README.md | 82 +++------------------------------------ 1 file changed, 6 insertions(+), 76 deletions(-) diff --git a/docs/monitoring/README.md b/docs/monitoring/README.md index bc5e4ac9d3..08a95cb53a 100644 --- a/docs/monitoring/README.md +++ b/docs/monitoring/README.md @@ -1,106 +1,36 @@ -# Prometheus Monitoring for TFJob +# Prometheus Monitoring for Training Job ## Available Metrics Currently available metrics to monitor are listed below. -### Metrics for Each Component Container for TFJob - -Component Containers: - -- tf-operator -- tf-chief -- tf-ps -- tf-worker - -#### Each Container Reports on its: - -Use prometheus graph to run the following example commands to visualize metrics. - -_Note_: These metrics are derived from [cAdvisor](https://github.com/google/cadvisor) kubelet integration which reports to Prometheus through our prometheus-operator installation. You may see a complete list of metrics available in `\metrics` page of your Prometheus web UI which you can further use to compose your own queries. - -**CPU usage** - -``` -sum (rate (container_cpu_usage_seconds_total{pod_name=~"tfjob-name-.*"}[1m])) by (pod_name) -``` - -**GPU Usage** - -``` -sum (rate (container_accelerator_memory_used_bytes{pod_name=~"tfjob-name-.*"}[1m])) by (pod_name) -``` - -**Memory Usage** - -``` -sum (rate (container_memory_usage_bytes{pod_name=~"tfjob-name-.*"}[1m])) by (pod_name) -``` - -**Network Usage** - -``` -sum (rate (container_network_transmit_bytes_total{pod_name=~"tfjob-name-.*"}[1m])) by (pod_name) -``` - -**I/O Usage** - -``` -sum (rate (container_fs_write_seconds_total{pod_name=~"tfjob-name-.*"}[1m])) by (pod_name) -``` - -**Keep-Alive check** - -``` -up -``` - -This is maintained by Prometheus on its own with its `up` metric detailed in the documentation [here](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series). - -**Is Leader check** - -``` -tf_operator_is_leader -``` - -_Note_: Replace `tfjob-name` with your own TF Job name you want to monitor for the example queries above. - -### Report TFJob metrics: - -_Note_: If you are using release v1 tf-operator, these TFJob metrics don't have suffix `total`. So you have to use metric name like `tf_operator_jobs_created` to get your metrics. See [PR](https://github.com/kubeflow/training-operator/pull/1055) to get more information. - -**Job Creation** - -``` -tf_operator_jobs_created_total -``` **Job Creation** ``` -sum (rate (tf_operator_jobs_created_total[60m])) +training_operator_jobs_created_total ``` **Job Deletion** ``` -tf_operator_jobs_deleted_total +training_operator_jobs_deleted_total ``` **Successful Job Completions** ``` -tf_operator_jobs_successful_total +training_operator_jobs_successful_total ``` **Failed Jobs** ``` -tf_operator_jobs_failed_total +training_operator_jobs_failed_total ``` **Restarted Jobs** ``` -tf_operator_jobs_restarted_total +training_operator_jobs_restarted_total ``` From db5960856df7a9edbfaf3b3e67a503798c04f801 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Wed, 23 Aug 2023 17:20:00 +0530 Subject: [PATCH 3/8] Changelog changes --- CHANGELOG.md | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a4e2c63bc..9e15aedfe1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,43 @@ # Changelog + +## [v1.7.0-rc.0](https://github.com/kubeflow/training-operator/tree/v1.7.0-rc.0) (2023-07-07) + +**Breaking Changes** +- Make scheduler-plugins the default gang scheduler. [\#1747](https://github.com/kubeflow/training-operator/pull/1747) ([Syulin7](https://github.com/Syulin7)) +- Upgrade the kubernetes dependencies to v1.27 https://github.com/kubeflow/training-operator/pull/1834 ([tenzen-y](https://github.com/tenzen-y)) + +**New features** +- Make scheduler-plugins the default gang scheduler. [\#1747](https://github.com/kubeflow/training-operator/pull/1747) ([Syulin7](https://github.com/Syulin7)) +- Merge kubeflow/common to training-operator [\#1813](https://github.com/kubeflow/training-operator/pull/1813) ([johnugeorge](https://github.com/johnugeorge)) +- Auto-generate RBAC manifests by the controller-gen [\#1815](https://github.com/kubeflow/training-operator/pull/1815) ([Syulin7](https://github.com/Syulin7)) +- Implement suspend semantics [\#1859](https://github.com/kubeflow/training-operator/pull/1859) ([tenzen-y](https://github.com/tenzen-y)) +- Set up controllers using goroutines to start the manager quickly [\#1869](https://github.com/kubeflow/training-operator/pull/1869) ([tenzen-y](https://github.com/tenzen-y)) +- Set correct ENV for PytorchJob to support torchrun [\#1840](https://github.com/kubeflow/training-operator/pull/1840) ([kuizhiqing](https://github.com/kuizhiqing)) + +**Bug fixes** +- Fix a bug that XGBoostJob's running condition isn't updated when the job is resumed [\#1866](https://github.com/kubeflow/training-operator/pull/1866) ([tenzen-y](https://github.com/tenzen-y)) +- Set a Running condition when the XGBoostJob is completed and doesn't have a Running condition [\#1789](https://github.com/kubeflow/training-operator/pull/1789) ([tenzen-y](https://github.com/tenzen-y)) +- Avoid to depend on local env when installing the code-generators [\#1810](https://github.com/kubeflow/training-operator/pull/1810) ([tenzen-y](https://github.com/tenzen-y)) + + +**Misc** +- Removing reconciler code [\#1879](https://github.com/kubeflow/training-operator/pull/1879) ([johnugeorge](https://github.com/johnugeorge)) +- Make Condition and ReplicaStatus optional [\#1862](https://github.com/kubeflow/training-operator/pull/1862) ([tenzen-y](https://github.com/tenzen-y)) +- Use the same reasons for Condition and Event [\#1854](https://github.com/kubeflow/training-operator/pull/1854) ([tenzen-y](https://github.com/tenzen-y)) +- Fully consolidate tfjob-operator to training-operator [\#1850](https://github.com/kubeflow/training-operator/pull/1850) ([tenzen-y](https://github.com/tenzen-y)) +- Clean up /pkg/common/util/v1 [\#1845](https://github.com/kubeflow/training-operator/pull/1845) ([tenzen-y](https://github.com/tenzen-y)) +- Refactoring tests in common/controller.v1 [\#1843](https://github.com/kubeflow/training-operator/pull/1843) ([tenzen-y](https://github.com/tenzen-y)) +- remove duplicate code of add task spec annotation [\#1839](https://github.com/kubeflow/training-operator/pull/1839) ([lowang-bh](https://github.com/lowang-bh)) +- fetch volcano log when e2e failed [\#1837](https://github.com/kubeflow/training-operator/pull/1837) ([lowang-bh](https://github.com/lowang-bh)) +- Add check pods are not scheduled when testing gang-scheduler integrations in e2e [\#1835](https://github.com/kubeflow/training-operator/pull/1835) ([tenzen-y](https://github.com/tenzen-y)) +- Replace dummy client with fake client [\#1818](https://github.com/kubeflow/training-operator/pull/1818) ([tenzen-y](https://github.com/tenzen-y)) +- Add default Intel MPI env variables to MPIJob [\#1804](https://github.com/kubeflow/training-operator/pull/1804) ([tkatila](https://github.com/tkatila)) +- Improve E2E tests for the gang-scheduling [\#1801](https://github.com/kubeflow/training-operator/pull/1801) ([tenzen-y](https://github.com/tenzen-y)) +- xgb yaml container name should be consistent with xgb job default container name [\#1794](https://github.com/kubeflow/training-operator/pull/1794) ([Crisescode](https://github.com/Crisescode)) +- make timeout configurable from e2e tests [\#1787](https://github.com/kubeflow/training-operator/pull/1787) ([nagar-ajay](https://github.com/nagar-ajay)) + + ## [v1.6.0](https://github.com/kubeflow/training-operator/tree/v1.6.0) (2023-03-21) Note: Since scheduler-plugins has changed API from `sigs.k8s.io` with the `x-k8s.io`, future releases of training operator(v1.7+) will not support scheduler-plugins v0.24.x or lower. Related: [\#1769](https://github.com/kubeflow/training-operator/issues/1769) From 6d3c345402f1900ea2c0a22f6c7dea20a368a42d Mon Sep 17 00:00:00 2001 From: Johnu George Date: Tue, 5 Sep 2023 16:17:58 +0530 Subject: [PATCH 4/8] Adding tenzen to Approvers list --- OWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/OWNERS b/OWNERS index de6663b065..f04322a386 100644 --- a/OWNERS +++ b/OWNERS @@ -4,7 +4,7 @@ approvers: - johnugeorge - terrytangyuan - zw0610 + - tenzen-y reviewers: - jinchihe - kuizhiqing - - tenzen-y From 42e7e967c4952e9a3b9237612fcf36502e31dfd9 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Tue, 5 Sep 2023 16:19:52 +0530 Subject: [PATCH 5/8] Merge changes --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e617478c28..43c2b2a372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,6 @@ ## [v1.7.0-rc.0](https://github.com/kubeflow/training-operator/tree/v1.7.0-rc.0) (2023-07-07) **Breaking Changes** -- Make scheduler-plugins the default gang scheduler. [\#1747](https://github.com/kubeflow/training-operator/pull/1747) ([Syulin7](https://github.com/Syulin7)) [Full Changelog](https://github.com/kubeflow/training-operator/compare/v1.6.0...v1.7.0-rc.0) **Breaking Changes** @@ -15,7 +14,6 @@ - Make scheduler-plugins the default gang scheduler. [\#1747](https://github.com/kubeflow/training-operator/pull/1747) ([Syulin7](https://github.com/Syulin7)) - Merge kubeflow/common to training-operator [\#1813](https://github.com/kubeflow/training-operator/pull/1813) ([johnugeorge](https://github.com/johnugeorge)) - Auto-generate RBAC manifests by the controller-gen [\#1815](https://github.com/kubeflow/training-operator/pull/1815) ([Syulin7](https://github.com/Syulin7)) -- Implement suspend semantics [\#1859](https://github.com/kubeflow/training-operator/pull/1859) ([tenzen-y](https://github.com/tenzen-y)) - Implement suspend semantics [\#1859](https://github.com/kubeflow/training-operator/pull/1859) ([tenzen-y](https://github.com/tenzen-y)) - Set up controllers using goroutines to start the manager quickly [\#1869](https://github.com/kubeflow/training-operator/pull/1869) ([tenzen-y](https://github.com/tenzen-y)) - Set correct ENV for PytorchJob to support torchrun [\#1840](https://github.com/kubeflow/training-operator/pull/1840) ([kuizhiqing](https://github.com/kuizhiqing)) From ad4a6873239edc905cb58916268548945b6ea58d Mon Sep 17 00:00:00 2001 From: Johnu George Date: Tue, 5 Sep 2023 16:20:19 +0530 Subject: [PATCH 6/8] Merge changes --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 43c2b2a372..84f7b62348 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,6 @@ ## [v1.7.0-rc.0](https://github.com/kubeflow/training-operator/tree/v1.7.0-rc.0) (2023-07-07) -**Breaking Changes** [Full Changelog](https://github.com/kubeflow/training-operator/compare/v1.6.0...v1.7.0-rc.0) **Breaking Changes** From 6765c358a997cd0e1a11caa64c7dbf39be4e1647 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Tue, 5 Sep 2023 21:24:22 +0530 Subject: [PATCH 7/8] Sort alphabetically --- OWNERS | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/OWNERS b/OWNERS index f04322a386..6860ce1ac4 100644 --- a/OWNERS +++ b/OWNERS @@ -1,10 +1,10 @@ approvers: - gaocegege - - Jeffwan - johnugeorge + - Jeffwan + - tenzen-y - terrytangyuan - zw0610 - - tenzen-y reviewers: - jinchihe - kuizhiqing From 7deb9b4f282819385285bee724bf416aac4ce697 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Tue, 5 Sep 2023 21:30:46 +0530 Subject: [PATCH 8/8] Sort alphabetically --- OWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/OWNERS b/OWNERS index 6860ce1ac4..b8d1ea5f1d 100644 --- a/OWNERS +++ b/OWNERS @@ -1,7 +1,7 @@ approvers: - gaocegege - - johnugeorge - Jeffwan + - johnugeorge - tenzen-y - terrytangyuan - zw0610