Skip to content

Commit

Permalink
Fix empty object TypeMeta in predicates
Browse files Browse the repository at this point in the history
Signed-off-by: Antonin Stefanutti <[email protected]>
  • Loading branch information
astefanutti committed Nov 27, 2024
1 parent aaa79c0 commit bc31553
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 311 deletions.
114 changes: 1 addition & 113 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
package util

import (
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/training-operator/pkg/util"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// SatisfiedExpectations returns true if the required adds/dels for the given job have been observed.
Expand All @@ -45,82 +38,6 @@ func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobK
return satisfied
}

// OnDependentCreateFunc returns a create-event predicate that records the
// observed creation of a dependent pod or service in exp.
//
// The predicate returns false when the object has no replica-type label, or
// when it has a controller reference but is neither a Pod nor a Service.
// It returns true when the object has no controller reference, or once the
// creation has been recorded against the matching expectation key.
func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
	return func(e event.CreateEvent) bool {
		replicaType := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
		if replicaType == "" {
			return false
		}

		owner := metav1.GetControllerOf(e.Object)
		if owner == nil {
			// No controller reference: nothing to record, but the event is let through.
			return true
		}

		// The expectation key is derived from "<namespace>/<controller name>".
		jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), owner.Name)
		switch e.Object.(type) {
		case *corev1.Pod:
			exp.CreationObserved(expectation.GenExpectationPodsKey(jobKey, replicaType))
		case *corev1.Service:
			exp.CreationObserved(expectation.GenExpectationServicesKey(jobKey, replicaType))
		default:
			return false
		}
		return true
	}
}

// OnDependentUpdateFunc returns an update-event predicate for dependent pods
// and services of the job handled by jc.
//
// The predicate returns false when the old and new objects share a resource
// version, when the new object is neither a Pod nor a Service, or when no
// controller reference resolves to a job. It returns true when the previous
// controller reference changed and still resolves to a job, or when the
// current controller reference resolves to a job.
func OnDependentUpdateFunc(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
	return func(e event.UpdateEvent) bool {
		current, previous := e.ObjectNew, e.ObjectOld
		if current.GetResourceVersion() == previous.GetResourceVersion() {
			// Periodic resync will send update events for all known pods.
			// Two different versions of the same pod will always have different RVs.
			return false
		}

		jobKind := jc.Controller.GetAPIGroupVersionKind().Kind
		entry := LoggerForGenericKind(current, jobKind)

		// Only pods and services are handled; each gets its dedicated logger.
		switch dependent := current.(type) {
		case *corev1.Pod:
			entry = commonutil.LoggerForPod(dependent, jc.Controller.GetAPIGroupVersionKind().Kind)
		case *corev1.Service:
			entry = commonutil.LoggerForService(dependent, jc.Controller.GetAPIGroupVersionKind().Kind)
		default:
			return false
		}

		currentRef := metav1.GetControllerOf(current)
		previousRef := metav1.GetControllerOf(previous)

		// The ControllerRef was changed. Sync the old controller, if any.
		if previousRef != nil && !reflect.DeepEqual(currentRef, previousRef) {
			if resolveControllerRef(jc, previous.GetNamespace(), previousRef) != nil {
				entry.Infof("pod/service controller ref updated: %v, %v", current, previous)
				return true
			}
		}

		// If it has a controller ref, that's all that matters.
		if currentRef == nil {
			return false
		}
		if resolveControllerRef(jc, current.GetNamespace(), currentRef) == nil {
			return false
		}
		entry.Debugf("pod/service has a controller ref: %v, %v", current, previous)
		return true
	}
}

// resolveControllerRef returns the job referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching job
// of the correct Kind.
Expand All @@ -141,32 +58,3 @@ func resolveControllerRef(jc *common.JobController, namespace string, controller
}
return job
}

// OnDependentDeleteFunc returns a delete-event predicate that records the
// observed deletion of a dependent pod or service in exp.
//
// The predicate returns false when the object has no replica-type label, or
// when it has a controller reference but is neither a Pod nor a Service.
// It returns true when the object has no controller reference, or once the
// deletion has been recorded against the matching expectation key.
func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
	return func(e event.DeleteEvent) bool {
		replicaType := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
		if replicaType == "" {
			return false
		}

		owner := metav1.GetControllerOf(e.Object)
		if owner == nil {
			// No controller reference: nothing to record, but the event is let through.
			return true
		}

		// The expectation key is derived from "<namespace>/<controller name>".
		jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), owner.Name)
		switch e.Object.(type) {
		case *corev1.Pod:
			exp.DeletionObserved(expectation.GenExpectationPodsKey(jobKey, replicaType))
		case *corev1.Service:
			exp.DeletionObserved(expectation.GenExpectationServicesKey(jobKey, replicaType))
		default:
			return false
		}
		return true
	}
}
60 changes: 43 additions & 17 deletions pkg/common/util/reconciler_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ import (
"reflect"
"strings"

log "github.com/sirupsen/logrus"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
"github.com/kubeflow/training-operator/pkg/controller.v1/expectation"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// GenExpectationGenericKey generates an expectation key for {Kind} of a job
Expand All @@ -50,19 +53,40 @@ func LoggerForGenericKind(obj metav1.Object, kind string) *log.Entry {
})
}

// objectKind asks scheme s for the kinds of obj and returns the first one.
// If the scheme lookup fails, the failure is logged and the zero
// GroupVersionKind is returned.
func objectKind(s *runtime.Scheme, obj client.Object) schema.GroupVersionKind {
	kinds, _, err := s.ObjectKinds(obj)
	if err == nil {
		return kinds[0]
	}
	LoggerForGenericKind(obj, "").Errorf("unknown kind for %v", obj)
	return schema.GroupVersionKind{}
}

// OnDependentFuncs assembles the typed create, update and delete predicates
// for dependent objects of type T into a single predicate.TypedFuncs value.
// The scheme and expectations are passed to the create and delete predicates;
// the scheme and job controller are passed to the update predicate.
func OnDependentFuncs[T client.Object](s *runtime.Scheme, expectations expectation.ControllerExpectationsInterface, jobController *common.JobController) predicate.TypedFuncs[T] {
	var funcs predicate.TypedFuncs[T]
	funcs.CreateFunc = OnDependentCreateFuncGeneric[T](s, expectations)
	funcs.UpdateFunc = OnDependentUpdateFuncGeneric[T](s, jobController)
	funcs.DeleteFunc = OnDependentDeleteFuncGeneric[T](s, expectations)
	return funcs
}

// OnDependentCreateFuncGeneric modify expectations when dependent (pod/service) creation observed.
func OnDependentCreateFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
func OnDependentCreateFuncGeneric[T client.Object](s *runtime.Scheme, exp expectation.ControllerExpectationsInterface) func(createEvent event.TypedCreateEvent[T]) bool {
return func(e event.TypedCreateEvent[T]) bool {
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}

if controllerRef := metav1.GetControllerOf(e.Object); controllerRef != nil {
jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
var expectKey string
pl := strings.ToLower(e.Object.GetObjectKind().GroupVersionKind().Kind) + "s"
expectKey = GenExpectationGenericKey(jobKey, rtype, pl)
kind := e.Object.GetObjectKind().GroupVersionKind().Kind
if kind == "" {
kind = objectKind(s, e.Object).Kind
}
pl := strings.ToLower(kind) + "s"
expectKey := GenExpectationGenericKey(jobKey, rtype, pl)
exp.CreationObserved(expectKey)
return true
}
Expand All @@ -71,9 +95,9 @@ func OnDependentCreateFuncGeneric(exp expectation.ControllerExpectationsInterfac
}
}

// OnDependentUpdateFuncGeneric modify expectations when dependent (pod/service) update observed.
func OnDependentUpdateFuncGeneric(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
return func(e event.UpdateEvent) bool {
// OnDependentUpdateFuncGeneric modifies expectations when a dependent update is observed.
func OnDependentUpdateFuncGeneric[T client.Object](_ *runtime.Scheme, jc *common.JobController) func(updateEvent event.TypedUpdateEvent[T]) bool {
return func(e event.TypedUpdateEvent[T]) bool {
newObj := e.ObjectNew
oldObj := e.ObjectOld
if newObj.GetResourceVersion() == oldObj.GetResourceVersion() {
Expand Down Expand Up @@ -110,20 +134,22 @@ func OnDependentUpdateFuncGeneric(jc *common.JobController) func(updateEvent eve
}
}

// OnDependentDeleteFuncGeneric modify expectations when dependent (pod/service) deletion observed.
func OnDependentDeleteFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {

// OnDependentDeleteFuncGeneric modifies expectations when a dependent deletion is observed.
func OnDependentDeleteFuncGeneric[T client.Object](s *runtime.Scheme, exp expectation.ControllerExpectationsInterface) func(event.TypedDeleteEvent[T]) bool {
return func(e event.TypedDeleteEvent[T]) bool {
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}

if controllerRef := metav1.GetControllerOf(e.Object); controllerRef != nil {
jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
pl := strings.ToLower(e.Object.GetObjectKind().GroupVersionKind().Kind) + "s"
var expectKey = GenExpectationGenericKey(jobKey, rtype, pl)

kind := e.Object.GetObjectKind().GroupVersionKind().Kind
if kind == "" {
kind = objectKind(s, e.Object).Kind
}
pl := strings.ToLower(kind) + "s"
expectKey := GenExpectationGenericKey(jobKey, rtype, pl)
exp.DeletionObserved(expectKey)
return true
}
Expand Down
59 changes: 0 additions & 59 deletions pkg/common/util/reconciler_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions pkg/controller.v1/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.Replica
}

service := &v1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: v1.SchemeGroupVersion.String(),
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: labels,
Expand Down
34 changes: 14 additions & 20 deletions pkg/controller.v1/jax/jaxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,48 +175,42 @@ func (r *JAXJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads
if err != nil {
return err
}

// using onOwnerCreateFunc is easier to set defaults
if err = c.Watch(source.Kind[*kubeflowv1.JAXJob](mgr.GetCache(), &kubeflowv1.JAXJob{}, &handler.TypedEnqueueRequestForObject[*kubeflowv1.JAXJob]{},
if err = c.Watch(source.Kind[*kubeflowv1.JAXJob](mgr.GetCache(), &kubeflowv1.JAXJob{},
&handler.TypedEnqueueRequestForObject[*kubeflowv1.JAXJob]{},
predicate.TypedFuncs[*kubeflowv1.JAXJob]{CreateFunc: r.onOwnerCreateFunc()}),
); err != nil {
return err
}

// eventHandler for owned object
eventHandler := handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &kubeflowv1.JAXJob{}, handler.OnlyControllerOwner())
predicates := predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
}
// Create generic predicates
genericPredicates := predicate.Funcs{
CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations),
UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController),
DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations),
}
// inject watching for job related pod
if err = c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.Pod{}, eventHandler, predicates)); err != nil {
if err = c.Watch(source.Kind[*corev1.Pod](mgr.GetCache(), &corev1.Pod{},
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &kubeflowv1.JAXJob{}, handler.OnlyControllerOwner()),
util.OnDependentFuncs[*corev1.Pod](r.scheme, r.Expectations, &r.JobController))); err != nil {
return err
}
// inject watching for job related service
if err = c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.Service{}, eventHandler, predicates)); err != nil {
if err = c.Watch(source.Kind[*corev1.Service](mgr.GetCache(), &corev1.Service{},
handler.TypedEnqueueRequestForOwner[*corev1.Service](mgr.GetScheme(), mgr.GetRESTMapper(), &kubeflowv1.JAXJob{}, handler.OnlyControllerOwner()),
util.OnDependentFuncs[*corev1.Service](r.scheme, r.Expectations, &r.JobController))); err != nil {
return err
}
// skip watching volcano PodGroup if volcano PodGroup is not installed
if _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.GroupName, Kind: "PodGroup"},
v1beta1.SchemeGroupVersion.Version); err == nil {
// inject watching for job related volcano PodGroup
if err = c.Watch(source.Kind[client.Object](mgr.GetCache(), &v1beta1.PodGroup{}, eventHandler, genericPredicates)); err != nil {
if err = c.Watch(source.Kind[*v1beta1.PodGroup](mgr.GetCache(), &v1beta1.PodGroup{},
handler.TypedEnqueueRequestForOwner[*v1beta1.PodGroup](mgr.GetScheme(), mgr.GetRESTMapper(), &kubeflowv1.JAXJob{}, handler.OnlyControllerOwner()),
util.OnDependentFuncs[*v1beta1.PodGroup](r.scheme, r.Expectations, &r.JobController))); err != nil {
return err
}
}
// skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed
if _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"},
schedulerpluginsv1alpha1.SchemeGroupVersion.Version); err == nil {
// inject watching for job related scheduler-plugins PodGroup
if err = c.Watch(source.Kind[client.Object](mgr.GetCache(), &schedulerpluginsv1alpha1.PodGroup{}, eventHandler, genericPredicates)); err != nil {
if err = c.Watch(source.Kind[*schedulerpluginsv1alpha1.PodGroup](mgr.GetCache(), &schedulerpluginsv1alpha1.PodGroup{},
handler.TypedEnqueueRequestForOwner[*schedulerpluginsv1alpha1.PodGroup](mgr.GetScheme(), mgr.GetRESTMapper(), &kubeflowv1.JAXJob{}, handler.OnlyControllerOwner()),
util.OnDependentFuncs[*schedulerpluginsv1alpha1.PodGroup](r.scheme, r.Expectations, &r.JobController))); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit bc31553

Please sign in to comment.