diff --git a/cluster-autoscaler/context/autoscaling_context.go b/cluster-autoscaler/context/autoscaling_context.go index 040d0db8261e..3e7a0e119632 100644 --- a/cluster-autoscaler/context/autoscaling_context.go +++ b/cluster-autoscaler/context/autoscaling_context.go @@ -30,6 +30,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" kube_record "k8s.io/client-go/tools/record" klog "k8s.io/klog/v2" @@ -127,9 +128,8 @@ func NewAutoscalingContext( } // NewAutoscalingKubeClients builds AutoscalingKubeClients out of basic client. -func NewAutoscalingKubeClients(opts config.AutoscalingOptions, kubeClient, eventsKubeClient kube_client.Interface) *AutoscalingKubeClients { - listerRegistryStopChannel := make(chan struct{}) - listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel) +func NewAutoscalingKubeClients(opts config.AutoscalingOptions, kubeClient, eventsKubeClient kube_client.Interface, informerFactory informers.SharedInformerFactory) *AutoscalingKubeClients { + listerRegistry := kube_util.NewListerRegistryWithDefaultListers(informerFactory) kubeEventRecorder := kube_util.CreateEventRecorder(eventsKubeClient, opts.RecordDuplicatedEvents) logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap, opts.StatusConfigMapName) if err != nil { diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index c8f7075ac448..4fd81f3e7436 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -36,6 +36,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" ) @@ -44,6 +45,7 @@ type AutoscalerOptions struct { config.AutoscalingOptions KubeClient kube_client.Interface EventsKubeClient kube_client.Interface + InformerFactory informers.SharedInformerFactory AutoscalingKubeClients *context.AutoscalingKubeClients CloudProvider cloudprovider.CloudProvider PredicateChecker predicatechecker.PredicateChecker @@ -97,7 +99,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions) } if opts.AutoscalingKubeClients == nil { - opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient) + opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient, opts.InformerFactory) } if opts.ClusterSnapshot == nil { opts.ClusterSnapshot = clustersnapshot.NewBasicClusterSnapshot() diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 7bcece19690b..975094311b44 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -59,6 +59,7 @@ import ( scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/units" "k8s.io/autoscaler/cluster-autoscaler/version" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -437,11 +438,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter kubeClientConfig.Burst = autoscalingOptions.KubeClientBurst kubeClientConfig.QPS = float32(autoscalingOptions.KubeClientQPS) kubeClient := createKubeClient(kubeClientConfig) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) eventsKubeClient := createKubeClient(getKubeConfig()) - predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(kubeClient, - autoscalingOptions.SchedulerConfig, make(chan struct{})) + predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig) if err != nil { return nil, err } @@ -451,6 +452,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter AutoscalingOptions: autoscalingOptions, ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(), KubeClient: kubeClient, + InformerFactory: informerFactory, EventsKubeClient: eventsKubeClient, DebuggingSnapshotter: debuggingSnapshotter, PredicateChecker: predicateChecker, @@ -493,6 +495,9 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter Comparator: nodeInfoComparator, } + stop := make(chan struct{}) + informerFactory.Start(stop) + // These metrics should be published only once. metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled) metrics.UpdateCPULimitsCores(autoscalingOptions.MinCoresTotal, autoscalingOptions.MaxCoresTotal) diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go index 9b7075323c14..7a0552e0a2ad 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go @@ -24,7 +24,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" - kube_client "k8s.io/client-go/kubernetes" v1listers "k8s.io/client-go/listers/core/v1" klog "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -45,9 +44,7 @@ type SchedulerBasedPredicateChecker struct { } // NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker. -func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, schedConfig *config.KubeSchedulerConfiguration, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) { - informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) - +func NewSchedulerBasedPredicateChecker(informerFactory informers.SharedInformerFactory, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerBasedPredicateChecker, error) { if schedConfig == nil { var err error schedConfig, err = scheduler_config.Default() @@ -78,10 +75,6 @@ func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, schedCo delegatingSharedLister: sharedLister, } - // this MUST be called after all the informers/listers are acquired via the - // informerFactory....Lister()/informerFactory....Informer() methods - informerFactory.Start(stop) - return checker, nil } diff --git a/cluster-autoscaler/simulator/predicatechecker/testchecker.go b/cluster-autoscaler/simulator/predicatechecker/testchecker.go index 81c3459c88ba..dd9e1745acac 100644 --- a/cluster-autoscaler/simulator/predicatechecker/testchecker.go +++ b/cluster-autoscaler/simulator/predicatechecker/testchecker.go @@ -17,6 +17,7 @@ limitations under the License. package predicatechecker import ( + "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/apis/config" scheduler_config_latest "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" @@ -30,14 +31,14 @@ func NewTestPredicateChecker() (PredicateChecker, error) { } // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient - return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), schedConfig, make(chan struct{})) + return NewSchedulerBasedPredicateChecker(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig) } // NewTestPredicateCheckerWithCustomConfig builds test version of PredicateChecker with custom scheduler config. func NewTestPredicateCheckerWithCustomConfig(schedConfig *config.KubeSchedulerConfiguration) (PredicateChecker, error) { if schedConfig != nil { // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient - return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), schedConfig, make(chan struct{})) + return NewSchedulerBasedPredicateChecker(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig) } return NewTestPredicateChecker() diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index 5311fd2fc89a..7ec21240f773 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -19,12 +19,11 @@ package kubernetes import ( "time" - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" client "k8s.io/client-go/kubernetes" v1appslister "k8s.io/client-go/listers/apps/v1" v1batchlister "k8s.io/client-go/listers/batch/v1" @@ -78,16 +77,17 @@ func NewListerRegistry(allNode NodeLister, readyNode NodeLister, allPodLister Po } // NewListerRegistryWithDefaultListers returns a registry filled with listers of the default implementations -func NewListerRegistryWithDefaultListers(kubeClient client.Interface, stopChannel <-chan struct{}) ListerRegistry { - allPodLister := NewAllPodLister(kubeClient, stopChannel) - readyNodeLister := NewReadyNodeLister(kubeClient, stopChannel) - allNodeLister := NewAllNodeLister(kubeClient, stopChannel) - podDisruptionBudgetLister := NewPodDisruptionBudgetLister(kubeClient, stopChannel) - daemonSetLister := NewDaemonSetLister(kubeClient, stopChannel) - replicationControllerLister := NewReplicationControllerLister(kubeClient, stopChannel) - jobLister := NewJobLister(kubeClient, stopChannel) - replicaSetLister := NewReplicaSetLister(kubeClient, stopChannel) - statefulSetLister := NewStatefulSetLister(kubeClient, stopChannel) +func NewListerRegistryWithDefaultListers(informerFactory informers.SharedInformerFactory) ListerRegistry { + allPodLister := NewScheduledPodLister(informerFactory.Core().V1().Pods().Lister()) + readyNodeLister := NewReadyNodeLister(informerFactory.Core().V1().Nodes().Lister()) + allNodeLister := NewAllNodeLister(informerFactory.Core().V1().Nodes().Lister()) + + podDisruptionBudgetLister := NewPodDisruptionBudgetLister(informerFactory.Policy().V1().PodDisruptionBudgets().Lister()) + daemonSetLister := informerFactory.Apps().V1().DaemonSets().Lister() + replicationControllerLister := informerFactory.Core().V1().ReplicationControllers().Lister() + jobLister := informerFactory.Batch().V1().Jobs().Lister() + replicaSetLister := informerFactory.Apps().V1().ReplicaSets().Lister() + statefulSetLister := informerFactory.Apps().V1().StatefulSets().Lister() return NewListerRegistry(allNodeLister, readyNodeLister, allPodLister, podDisruptionBudgetLister, daemonSetLister, replicationControllerLister, jobLister, replicaSetLister, statefulSetLister) @@ -172,27 +172,31 @@ func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod { return unschedulablePods } -// AllPodLister lists all pods. -type AllPodLister struct { +// ScheduledPodLister lists all scheduled pods. +type ScheduledPodLister struct { podLister v1lister.PodLister } // List returns all scheduled pods. -func (lister *AllPodLister) List() ([]*apiv1.Pod, error) { - return lister.podLister.List(labels.Everything()) -} +func (lister *ScheduledPodLister) List() ([]*apiv1.Pod, error) { + var pods []*apiv1.Pod -// NewAllPodLister builds AllPodLister -func NewAllPodLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodLister { - selector := fields.ParseSelectorOrDie("status.phase!=" + - string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) - podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour) - podLister := v1lister.NewPodLister(store) - go reflector.Run(stopchannel) + allPods, err := lister.podLister.List(labels.Everything()) + if err != nil { + return pods, err + } + for _, p := range allPods { + if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed { + pods = append(pods, p) + } + } + return pods, nil +} - return &AllPodLister{ - podLister: podLister, +// NewScheduledPodLister builds ScheduledPodLister +func NewScheduledPodLister(pl v1lister.PodLister) PodLister { + return &ScheduledPodLister{ + podLister: pl, } } @@ -208,24 +212,20 @@ type nodeListerImpl struct { filter func(*apiv1.Node) bool } -// NewReadyNodeLister builds a node lister that returns only ready nodes. -func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister { - return NewNodeLister(kubeClient, IsNodeReadyAndSchedulable, stopChannel) +// NewAllNodeLister builds a node lister that returns all nodes (ready and unready). +func NewAllNodeLister(nl v1lister.NodeLister) NodeLister { + return NewNodeLister(nl, nil) } -// NewAllNodeLister builds a node lister that returns all nodes (ready and unready). -func NewAllNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister { - return NewNodeLister(kubeClient, nil, stopChannel) +// NewReadyNodeLister builds a node lister that returns only ready nodes. +func NewReadyNodeLister(nl v1lister.NodeLister) NodeLister { + return NewNodeLister(nl, IsNodeReadyAndSchedulable) } // NewNodeLister builds a node lister. -func NewNodeLister(kubeClient client.Interface, filter func(*apiv1.Node) bool, stopChannel <-chan struct{}) NodeLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.Node{}, time.Hour) - nodeLister := v1lister.NewNodeLister(store) - go reflector.Run(stopChannel) +func NewNodeLister(nl v1lister.NodeLister, filter func(*apiv1.Node) bool) NodeLister { return &nodeListerImpl{ - nodeLister: nodeLister, + nodeLister: nl, filter: filter, } } @@ -282,61 +282,12 @@ func (lister *PodDisruptionBudgetListerImpl) List() ([]*policyv1.PodDisruptionBu } // NewPodDisruptionBudgetLister builds a pod disruption budget lister. -func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodDisruptionBudgetLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.PolicyV1().RESTClient(), "poddisruptionbudgets", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &policyv1.PodDisruptionBudget{}, time.Hour) - pdbLister := v1policylister.NewPodDisruptionBudgetLister(store) - go reflector.Run(stopchannel) +func NewPodDisruptionBudgetLister(pdbLister v1policylister.PodDisruptionBudgetLister) PodDisruptionBudgetLister { return &PodDisruptionBudgetListerImpl{ pdbLister: pdbLister, } } -// NewDaemonSetLister builds a daemonset lister. -func NewDaemonSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.DaemonSetLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "daemonsets", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.DaemonSet{}, time.Hour) - lister := v1appslister.NewDaemonSetLister(store) - go reflector.Run(stopchannel) - return lister -} - -// NewReplicationControllerLister builds a replicationcontroller lister. -func NewReplicationControllerLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.ReplicationControllerLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.ReplicationController{}, time.Hour) - lister := v1lister.NewReplicationControllerLister(store) - go reflector.Run(stopchannel) - return lister -} - -// NewJobLister builds a job lister. -func NewJobLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1batchlister.JobLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.BatchV1().RESTClient(), "jobs", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &batchv1.Job{}, time.Hour) - lister := v1batchlister.NewJobLister(store) - go reflector.Run(stopchannel) - return lister -} - -// NewReplicaSetLister builds a replicaset lister. -func NewReplicaSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.ReplicaSetLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "replicasets", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.ReplicaSet{}, time.Hour) - lister := v1appslister.NewReplicaSetLister(store) - go reflector.Run(stopchannel) - return lister -} - -// NewStatefulSetLister builds a statefulset lister. -func NewStatefulSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.StatefulSetLister { - listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "statefulsets", apiv1.NamespaceAll, fields.Everything()) - store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.StatefulSet{}, time.Hour) - lister := v1appslister.NewStatefulSetLister(store) - go reflector.Run(stopchannel) - return lister -} - // NewConfigMapListerForNamespace builds a configmap lister for the passed namespace (including all). func NewConfigMapListerForNamespace(kubeClient client.Interface, stopchannel <-chan struct{}, namespace string) v1lister.ConfigMapLister {