Skip to content

Commit

Permalink
Use informer factory to reuse listers
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Lin <[email protected]>
  • Loading branch information
linxiulei committed Sep 14, 2023
1 parent 43fd1fa commit 9a0dc3c
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 105 deletions.
6 changes: 3 additions & 3 deletions cluster-autoscaler/context/autoscaling_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -127,9 +128,8 @@ func NewAutoscalingContext(
}

// NewAutoscalingKubeClients builds AutoscalingKubeClients out of basic client.
func NewAutoscalingKubeClients(opts config.AutoscalingOptions, kubeClient, eventsKubeClient kube_client.Interface) *AutoscalingKubeClients {
listerRegistryStopChannel := make(chan struct{})
listerRegistry := kube_util.NewListerRegistryWithDefaultListers(kubeClient, listerRegistryStopChannel)
func NewAutoscalingKubeClients(opts config.AutoscalingOptions, kubeClient, eventsKubeClient kube_client.Interface, informerFactory informers.SharedInformerFactory) *AutoscalingKubeClients {
listerRegistry := kube_util.NewListerRegistryWithDefaultListers(informerFactory)
kubeEventRecorder := kube_util.CreateEventRecorder(eventsKubeClient, opts.RecordDuplicatedEvents)
logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap, opts.StatusConfigMapName)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
)

Expand All @@ -44,6 +45,7 @@ type AutoscalerOptions struct {
config.AutoscalingOptions
KubeClient kube_client.Interface
EventsKubeClient kube_client.Interface
InformerFactory informers.SharedInformerFactory
AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
PredicateChecker predicatechecker.PredicateChecker
Expand Down Expand Up @@ -97,7 +99,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions)
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient)
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient, opts.InformerFactory)
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = clustersnapshot.NewBasicClusterSnapshot()
Expand Down
9 changes: 7 additions & 2 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/autoscaler/cluster-autoscaler/version"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -437,11 +438,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
kubeClientConfig.Burst = autoscalingOptions.KubeClientBurst
kubeClientConfig.QPS = float32(autoscalingOptions.KubeClientQPS)
kubeClient := createKubeClient(kubeClientConfig)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

eventsKubeClient := createKubeClient(getKubeConfig())

predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(kubeClient,
autoscalingOptions.SchedulerConfig, make(chan struct{}))
predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
if err != nil {
return nil, err
}
Expand All @@ -451,6 +452,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
KubeClient: kubeClient,
InformerFactory: informerFactory,
EventsKubeClient: eventsKubeClient,
DebuggingSnapshotter: debuggingSnapshotter,
PredicateChecker: predicateChecker,
Expand Down Expand Up @@ -493,6 +495,9 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
Comparator: nodeInfoComparator,
}

stop := make(chan struct{})
informerFactory.Start(stop)

// These metrics should be published only once.
metrics.UpdateNapEnabled(autoscalingOptions.NodeAutoprovisioningEnabled)
metrics.UpdateCPULimitsCores(autoscalingOptions.MinCoresTotal, autoscalingOptions.MaxCoresTotal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
v1listers "k8s.io/client-go/listers/core/v1"
klog "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand All @@ -45,9 +44,7 @@ type SchedulerBasedPredicateChecker struct {
}

// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker.
func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, schedConfig *config.KubeSchedulerConfiguration, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

func NewSchedulerBasedPredicateChecker(informerFactory informers.SharedInformerFactory, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerBasedPredicateChecker, error) {
if schedConfig == nil {
var err error
schedConfig, err = scheduler_config.Default()
Expand Down Expand Up @@ -78,10 +75,6 @@ func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, schedCo
delegatingSharedLister: sharedLister,
}

// this MUST be called after all the informers/listers are acquired via the
// informerFactory....Lister()/informerFactory....Informer() methods
informerFactory.Start(stop)

return checker, nil
}

Expand Down
5 changes: 3 additions & 2 deletions cluster-autoscaler/simulator/predicatechecker/testchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package predicatechecker

import (
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
scheduler_config_latest "k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
Expand All @@ -30,14 +31,14 @@ func NewTestPredicateChecker() (PredicateChecker, error) {
}

// just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient
return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), schedConfig, make(chan struct{}))
return NewSchedulerBasedPredicateChecker(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig)
}

// NewTestPredicateCheckerWithCustomConfig builds test version of PredicateChecker with custom scheduler config.
func NewTestPredicateCheckerWithCustomConfig(schedConfig *config.KubeSchedulerConfiguration) (PredicateChecker, error) {
if schedConfig != nil {
// just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient
return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), schedConfig, make(chan struct{}))
return NewSchedulerBasedPredicateChecker(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig)
}

return NewTestPredicateChecker()
Expand Down
129 changes: 40 additions & 89 deletions cluster-autoscaler/utils/kubernetes/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package kubernetes
import (
"time"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
client "k8s.io/client-go/kubernetes"
v1appslister "k8s.io/client-go/listers/apps/v1"
v1batchlister "k8s.io/client-go/listers/batch/v1"
Expand Down Expand Up @@ -78,16 +77,17 @@ func NewListerRegistry(allNode NodeLister, readyNode NodeLister, allPodLister Po
}

// NewListerRegistryWithDefaultListers returns a registry filled with listers of the default implementations
func NewListerRegistryWithDefaultListers(kubeClient client.Interface, stopChannel <-chan struct{}) ListerRegistry {
allPodLister := NewAllPodLister(kubeClient, stopChannel)
readyNodeLister := NewReadyNodeLister(kubeClient, stopChannel)
allNodeLister := NewAllNodeLister(kubeClient, stopChannel)
podDisruptionBudgetLister := NewPodDisruptionBudgetLister(kubeClient, stopChannel)
daemonSetLister := NewDaemonSetLister(kubeClient, stopChannel)
replicationControllerLister := NewReplicationControllerLister(kubeClient, stopChannel)
jobLister := NewJobLister(kubeClient, stopChannel)
replicaSetLister := NewReplicaSetLister(kubeClient, stopChannel)
statefulSetLister := NewStatefulSetLister(kubeClient, stopChannel)
func NewListerRegistryWithDefaultListers(informerFactory informers.SharedInformerFactory) ListerRegistry {
allPodLister := NewScheduledPodLister(informerFactory.Core().V1().Pods().Lister())
readyNodeLister := NewReadyNodeLister(informerFactory.Core().V1().Nodes().Lister())
allNodeLister := NewAllNodeLister(informerFactory.Core().V1().Nodes().Lister())

podDisruptionBudgetLister := NewPodDisruptionBudgetLister(informerFactory.Policy().V1().PodDisruptionBudgets().Lister())
daemonSetLister := informerFactory.Apps().V1().DaemonSets().Lister()
replicationControllerLister := informerFactory.Core().V1().ReplicationControllers().Lister()
jobLister := informerFactory.Batch().V1().Jobs().Lister()
replicaSetLister := informerFactory.Apps().V1().ReplicaSets().Lister()
statefulSetLister := informerFactory.Apps().V1().StatefulSets().Lister()
return NewListerRegistry(allNodeLister, readyNodeLister, allPodLister,
podDisruptionBudgetLister, daemonSetLister, replicationControllerLister,
jobLister, replicaSetLister, statefulSetLister)
Expand Down Expand Up @@ -172,27 +172,31 @@ func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
return unschedulablePods
}

// AllPodLister lists all pods.
type AllPodLister struct {
// ScheduledPodLister lists all scheduled pods.
type ScheduledPodLister struct {
podLister v1lister.PodLister
}

// List returns all scheduled pods.
func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
return lister.podLister.List(labels.Everything())
}
func (lister *ScheduledPodLister) List() ([]*apiv1.Pod, error) {
var pods []*apiv1.Pod

// NewAllPodLister builds AllPodLister
func NewAllPodLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodLister {
selector := fields.ParseSelectorOrDie("status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
allPods, err := lister.podLister.List(labels.Everything())
if err != nil {
return pods, err
}
for _, p := range allPods {
if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
pods = append(pods, p)
}
}
return pods, nil
}

return &AllPodLister{
podLister: podLister,
// NewScheduledPodLister builds ScheduledPodLister
func NewScheduledPodLister(pl v1lister.PodLister) PodLister {
return &ScheduledPodLister{
podLister: pl,
}
}

Expand All @@ -208,24 +212,20 @@ type nodeListerImpl struct {
filter func(*apiv1.Node) bool
}

// NewReadyNodeLister builds a node lister that returns only ready nodes.
func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister {
return NewNodeLister(kubeClient, IsNodeReadyAndSchedulable, stopChannel)
// NewAllNodeLister builds a node lister that returns all nodes (ready and unready).
func NewAllNodeLister(nl v1lister.NodeLister) NodeLister {
return NewNodeLister(nl, nil)
}

// NewAllNodeLister builds a node lister that returns all nodes (ready and unready).
func NewAllNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister {
return NewNodeLister(kubeClient, nil, stopChannel)
// NewReadyNodeLister builds a node lister that returns only ready nodes.
func NewReadyNodeLister(nl v1lister.NodeLister) NodeLister {
return NewNodeLister(nl, IsNodeReadyAndSchedulable)
}

// NewNodeLister builds a node lister.
func NewNodeLister(kubeClient client.Interface, filter func(*apiv1.Node) bool, stopChannel <-chan struct{}) NodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.Node{}, time.Hour)
nodeLister := v1lister.NewNodeLister(store)
go reflector.Run(stopChannel)
func NewNodeLister(nl v1lister.NodeLister, filter func(*apiv1.Node) bool) NodeLister {
return &nodeListerImpl{
nodeLister: nodeLister,
nodeLister: nl,
filter: filter,
}
}
Expand Down Expand Up @@ -282,61 +282,12 @@ func (lister *PodDisruptionBudgetListerImpl) List() ([]*policyv1.PodDisruptionBu
}

// NewPodDisruptionBudgetLister builds a pod disruption budget lister.
func NewPodDisruptionBudgetLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodDisruptionBudgetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.PolicyV1().RESTClient(), "poddisruptionbudgets", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &policyv1.PodDisruptionBudget{}, time.Hour)
pdbLister := v1policylister.NewPodDisruptionBudgetLister(store)
go reflector.Run(stopchannel)
func NewPodDisruptionBudgetLister(pdbLister v1policylister.PodDisruptionBudgetLister) PodDisruptionBudgetLister {
return &PodDisruptionBudgetListerImpl{
pdbLister: pdbLister,
}
}

// NewDaemonSetLister builds a daemonset lister.
func NewDaemonSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.DaemonSetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "daemonsets", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.DaemonSet{}, time.Hour)
lister := v1appslister.NewDaemonSetLister(store)
go reflector.Run(stopchannel)
return lister
}

// NewReplicationControllerLister builds a replicationcontroller lister.
func NewReplicationControllerLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.ReplicationControllerLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.ReplicationController{}, time.Hour)
lister := v1lister.NewReplicationControllerLister(store)
go reflector.Run(stopchannel)
return lister
}

// NewJobLister builds a job lister.
func NewJobLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1batchlister.JobLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.BatchV1().RESTClient(), "jobs", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &batchv1.Job{}, time.Hour)
lister := v1batchlister.NewJobLister(store)
go reflector.Run(stopchannel)
return lister
}

// NewReplicaSetLister builds a replicaset lister.
func NewReplicaSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.ReplicaSetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "replicasets", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.ReplicaSet{}, time.Hour)
lister := v1appslister.NewReplicaSetLister(store)
go reflector.Run(stopchannel)
return lister
}

// NewStatefulSetLister builds a statefulset lister.
func NewStatefulSetLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1appslister.StatefulSetLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.AppsV1().RESTClient(), "statefulsets", apiv1.NamespaceAll, fields.Everything())
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &appsv1.StatefulSet{}, time.Hour)
lister := v1appslister.NewStatefulSetLister(store)
go reflector.Run(stopchannel)
return lister
}

// NewConfigMapListerForNamespace builds a configmap lister for the passed namespace (including all).
func NewConfigMapListerForNamespace(kubeClient client.Interface, stopchannel <-chan struct{},
namespace string) v1lister.ConfigMapLister {
Expand Down

0 comments on commit 9a0dc3c

Please sign in to comment.