Migrate to controller-runtime logger in mpi job controller #2177

Open · wants to merge 2 commits into base: master
86 changes: 47 additions & 39 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -25,7 +25,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"

Member:
Did you get a chance to remove logrus from the other parts of the Training Operator? That would allow us to remove that dependency from go.mod.

Member:
I found this comment from you @champon1020: #2048 (comment).
Do you want to work on the migration for the other files in a separate PR?

Contributor Author:
Yes, I'm going to create another PR to do the migration in the other controllers.

corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -37,7 +36,6 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -74,7 +72,7 @@ func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSched
Scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(controllerName),
apiReader: mgr.GetAPIReader(),
Log: log.Log,
Log: log.Log.WithName(controllerName),

Contributor Author:
Note: this attaches controllerName as the logger name.
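
For readers unfamiliar with logr naming, here is a minimal, self-contained sketch of what WithName does (not code from this PR; the controller name string and the message are illustrative):

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	// Use controller-runtime's zap-backed logger as the global logr sink.
	ctrl.SetLogger(zap.New())

	// WithName attaches the given name to every record emitted through this
	// logger, and names of derived loggers are chained onto it.
	logger := ctrl.Log.WithName("mpijob-controller")
	logger.Info("starting reconciler")
}
```

With the default zap encoder the name appears as the logger name on each record, which is what makes per-controller filtering possible.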

}

cfg := mgr.GetConfig()
@@ -125,8 +123,7 @@ type MPIJobReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
logger := jc.Log.WithValues(kubeflowv1.MPIJobSingular, req.NamespacedName)
logger := jc.Log.WithValues("namespace", req.NamespacedName)

Member:
Why did you change it?

Contributor @varshaprasad96 (Oct 20, 2024):
We can get the logger from the context and augment it with additional info from the reconciler's request with labels:

log := ctrl.LoggerFrom(ctx).WithValues(....)
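
To make the suggestion concrete, here is a rough, self-contained sketch (not code from this PR; the function body, the request values, and the setup in main are assumptions for illustration):

```go
package main

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// reconcile mirrors the Reconcile signature; the body only shows the logging pattern.
func reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// controller-runtime stores a request-scoped logger in ctx before calling
	// Reconcile; WithValues only adds extra key/value pairs to it.
	logger := ctrl.LoggerFrom(ctx).WithValues("mpijob", req.NamespacedName)
	logger.Info("Reconciling MPIJob")
	return ctrl.Result{}, nil
}

func main() {
	ctrl.SetLogger(zap.New())
	// Outside a manager we have to seed the context logger ourselves.
	ctx := ctrl.LoggerInto(context.Background(), ctrl.Log.WithName("mpijob-controller"))
	_, _ = reconcile(ctx, ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "demo"}})
}
```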

Contributor Author @champon1020 (Oct 29, 2024):

> Why did you change it?

I think MPIJobSingular, whose value is "mpijob", is not a clear key for the NamespacedName. Moreover, I attached controllerName as the logger name here (#2177 (comment)).

Are there any reasons that MPIJobSingular was used here?

Contributor Author:

> We can get the logger from the context and augment it with additional info from the reconciler's request with labels:

To use the LoggerFrom method, we would need to modify several methods to receive a context.Context. Since that requires many more changes, I think it is better to handle it in a separate issue.


mpijob := &kubeflowv1.MPIJob{}
err := jc.Get(ctx, req.NamespacedName, mpijob)
@@ -164,13 +161,13 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// MPIJob needs not service
err = jc.ReconcileJobs(mpijob, mpijob.Spec.MPIReplicaSpecs, mpijob.Status, &mpijob.Spec.RunPolicy)
if err != nil {
logrus.Warnf("Reconcile MPIJob error %v", err)
logger.Info("Reconcile MPIJob error", "error", err)

Contributor:
Do we want to set verbosity?

Contributor Author:
I've replaced logger.Info with logger.Error as mentioned here (#2177 (comment)), but using verbosity might be a good approach.
Are there any rules for setting verbosity? (I don't have a good sense of what level is appropriate here.)
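
For reference, logr expresses verbosity with V(n) rather than named warn levels; a small sketch of how that could look (this is not code from the PR, and the thresholds are assumptions, since the project has no agreed verbosity convention yet):

```go
package main

import (
	"errors"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	// With the default zap options only V(0) messages are emitted; higher V
	// levels require raising the verbosity through the zap options or flags.
	ctrl.SetLogger(zap.New())
	logger := ctrl.Log.WithName("mpijob-controller")

	err := errors.New("reconcile failed")
	logger.Error(err, "Reconcile MPIJob error")            // always emitted, at error level
	logger.V(1).Info("will retry reconcile", "error", err) // emitted only when verbosity >= 1
}
```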

return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&mpijob.Spec.RunPolicy, mpijob.Status)
if err != nil {
logrus.Warnf("Reconcile MPIJob Job error %v", err)
logger.Error(err, "Reconcile MPIJob Job error")
return ctrl.Result{}, err
}
if t >= 0 {
@@ -316,9 +313,11 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
return true
}

logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

Contributor @varshaprasad96 (Oct 20, 2024):
Is this namespace the same as the reconciler request namespace, or different? If it's the same, can we insert the labelled logger into the context that we set while initialising (in line 126) and use it as is? If not, do we want to change the key name to something more specific?
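
If it is the same object, one way to do what is being asked (a sketch with invented helper names; nothing here is code from this PR) is to label the logger once, push it into the context, and have downstream code that receives the context pull it back out:

```go
package main

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// handleCreate stands in for any helper that receives the request context; it
// recovers the already-labelled logger instead of rebuilding it from jc.Log.
func handleCreate(ctx context.Context) {
	logger := ctrl.LoggerFrom(ctx)
	logger.Info("MPIJob is created")
}

func main() {
	ctrl.SetLogger(zap.New())
	req := types.NamespacedName{Namespace: "default", Name: "demo"}

	// In Reconcile: decorate the context logger once with the request fields...
	ctx := ctrl.LoggerInto(context.Background(),
		ctrl.Log.WithName("mpijob-controller").WithValues("namespace", req.Namespace, "name", req.Name))

	// ...so every callee that takes ctx logs with the same labels.
	handleCreate(ctx)
}
```

The complication in this thread is that the event filter (onOwnerCreateFunc) only receives the event, not the request context, which is presumably why the PR rebuilds the logger from jc.Log there.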


jc.Scheme.Default(mpiJob)
msg := fmt.Sprintf("MPIJob %s is created.", e.Object.GetName())
logrus.Info(msg)
logger.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
return true
@@ -411,6 +410,8 @@ func (jc *MPIJobReconciler) ReconcilePods(
}

func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error {
logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

if launcher != nil {
initializeMPIJobStatuses(mpiJob, kubeflowv1.MPIJobReplicaTypeLauncher)
if isPodSucceeded(launcher) {
@@ -441,7 +442,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
}
err := updateMPIJobConditions(mpiJob, kubeflowv1.JobFailed, reason, msg)
if err != nil {
klog.Errorf("Append mpiJob(%s/%s) condition error: %v", mpiJob.Namespace, mpiJob.Name, err)
logger.Error(err, "Append mpiJob(%s/%s) condition error")
return err
}

@@ -491,13 +492,14 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch

func (jc *MPIJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) {
job := &kubeflowv1.MPIJob{}
logger := jc.Log.WithValues("namespace", namespace, "name", name)

err := jc.apiReader.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job)
if err != nil {
if errors.IsNotFound(err) {
logrus.Error(err, "MPIJob not found", "namespace", namespace, "name", name)
logger.Error(err, "MPIJob not found")
} else {
logrus.Error(err, "failed to get job from api-server", "namespace", namespace, "name", name)
logger.Error(err, "failed to get job from api-server")
}
return nil, err
}
@@ -539,15 +541,16 @@ func (jc *MPIJobReconciler) DeleteJob(job interface{}) error {
return fmt.Errorf("%v is not a type of MPIJob", mpiJob)
}

log := commonutil.LoggerForJob(mpiJob)
logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

if err := jc.Delete(context.Background(), mpiJob); err != nil {
logger.Error(err, "failed to delete job")

Member:
Suggested change:
- logger.Error(err, "failed to delete job")
+ logger.Error(err, "failed to delete MPIJob")

jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err)
log.Errorf("failed to delete job %s/%s, %v", mpiJob.Namespace, mpiJob.Name, err)
return err
}

jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", mpiJob.Name)
log.Infof("job %s/%s has been deleted", mpiJob.Namespace, mpiJob.Name)
logger.Info("job has been deleted")

Member:
Suggested change:
- logger.Info("job has been deleted")
+ logger.Info("MPIJob has been deleted")

trainingoperatorcommon.DeletedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
return nil
}
@@ -565,6 +568,8 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
return fmt.Errorf("%+v is not a type of MPIJob", job)
}

logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

for rtype, spec := range replicas {
status := jobStatus.ReplicaStatuses[rtype]

@@ -573,8 +578,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
running := status.Active
failed := status.Failed

logrus.Infof("MPIJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
mpiJob.Name, rtype, expected, running, succeeded, failed)
logger.Info("replica status",

Member:
Suggested change:
- logger.Info("replica status",
+ logger.Info("MPIJob replica status",

"replica_type", rtype,
"expected", expected,
"running", running,
"succeeded", succeeded,
"failed", failed)

if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
if running > 0 {
@@ -584,7 +593,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
// when launcher is succeed, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name)
logrus.Info(msg)
logger.Info(msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
@@ -629,26 +638,16 @@ func (jc *MPIJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatu
}

startTime := time.Now()
logger := commonutil.LoggerForJob(mpiJob)
logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
defer func() {
logger.Infof("Finished updating MpiJobs Status %q (%v)",
mpiJob.Name, time.Since(startTime))
logger.Info("Finished updating MpiJobs Status", "duration", time.Since(startTime))
}()

mpiJob = mpiJob.DeepCopy()
mpiJob.Status = *jobStatus.DeepCopy()

result := jc.Status().Update(context.Background(), mpiJob)

if result != nil {

Contributor Author:
Note: I think this if statement is no longer needed.

Member:
I think so. WDYT @kubeflow/wg-training-leads?

Member:
Yes, I think we can remove it.
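
For context, if that block is removed, the tail of UpdateJobStatusInApiServer would collapse to roughly the following (a sketch of the simplification being discussed, not a suggested change in this PR):

```go
// Status().Update already returns nil on success, so the nil check and the
// extra logger built inside the removed block add nothing.
mpiJob = mpiJob.DeepCopy()
mpiJob.Status = *jobStatus.DeepCopy()
return jc.Status().Update(context.Background(), mpiJob)
```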

jc.Log.WithValues("mpijob", types.NamespacedName{
Namespace: mpiJob.GetNamespace(),
Name: mpiJob.GetName(),
})
return result
}

return nil
return result
}

// getLauncherJob gets the launcher Job controlled by this MPIJob.
@@ -918,6 +917,11 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*cor
// sets the appropriate OwnerReferences on the resource so handleObject can
// discover the MPIJob resource that 'owns' it.
func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *corev1.Pod {
logger := jc.Log.WithValues(
"namespace", mpiJob.Namespace,
"name", mpiJob.Name,
"replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))

genericLabels := jc.GenLabels(mpiJob.GetName())
labels := defaultWorkerLabels(genericLabels)

@@ -932,9 +936,9 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
podSpec.Labels[key] = value
}
setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker])
logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))

if len(podSpec.Spec.Containers) == 0 {
klog.Errorln("Worker pod does not have any containers in its spec")
logger.Error(nil, "Worker pod does not have any containers in its spec")
return nil
}
container := podSpec.Spec.Containers[0]
@@ -976,7 +980,7 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
if jc.Config.EnableGangScheduling() {
if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) {
errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
logger.Warning(errMsg)
logger.Info(errMsg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
}

@@ -1002,6 +1006,11 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
// the appropriate OwnerReferences on the resource so handleObject can discover
// the MPIJob resource that 'owns' it.
func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDeliveryImage string, isGPULauncher bool) *corev1.Pod {
logger := jc.Log.WithValues(
"namespace", mpiJob.Namespace,
"name", mpiJob.Name,
"replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))

launcherName := mpiJob.Name + launcherSuffix

genericLabels := jc.GenLabels(mpiJob.GetName())
@@ -1020,12 +1029,11 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
podSpec.Labels[key] = value
}

logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))
// add SchedulerName to podSpec
if jc.Config.EnableGangScheduling() {
if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) {
errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
logger.Warning(errMsg)
logger.Info(errMsg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
}

@@ -1075,9 +1083,9 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
},
})
if len(podSpec.Spec.Containers) == 0 {
klog.Errorln("Launcher pod does not have any containers in its spec")
msg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg)
errMsg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
logger.Error(nil, "Launcher pod does not have any containers in its spec")
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, errMsg)
return nil
}
container := podSpec.Spec.Containers[0]
@@ -1141,7 +1149,7 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
// the pod template. We recommend to set it from the replica level.
if podSpec.Spec.RestartPolicy != corev1.RestartPolicy("") {
errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec"
klog.Warning(errMsg)
logger.Info(errMsg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
}
setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher])