move cluster feeder initialisation from main to cluster_feeder
Evedel committed Oct 28, 2023
1 parent c837778 commit a4fd9dd
Showing 3 changed files with 103 additions and 67 deletions.
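In outline, the change replaces the storage-mode branching in main.go (the usePrometheus/useCheckpoints/useNoStorage flags) with a HistorySource enum and a single Init() method on the cluster state feeder. Below is a minimal sketch of the resulting wiring, assuming the repository's package paths for input and history and klog/v2 for logging; it illustrates the new flow and is not the literal main.go.

// Sketch of the new initialization flow; most factory fields elided for brevity.
package main

import (
	"flag"

	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input"
	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/history"
	"k8s.io/klog/v2"
)

var storage = flag.String("storage", "checkpoint", `Specifies storage mode. Supported values: prometheus, none, checkpoint`)

func main() {
	flag.Parse()

	// Map the flag string onto the new HistorySource enum; unknown values fail fast.
	historySource, err := input.GetHistorySourceFromArg(*storage)
	if err != nil {
		klog.Fatalf("Could not initialize history source: %v", err)
	}

	// The factory now carries the history source and the Prometheus configuration,
	// so the storage-mode branching no longer lives in main.
	feeder := input.ClusterStateFeederFactory{
		HistorySource:     historySource,
		PromHistoryConfig: history.PrometheusHistoryProviderConfig{ /* populated from flags */ },
		// ...listers, clients, and fetchers elided...
	}.Make()

	// Provider selection and history loading happen behind one call.
	if err := feeder.Init(); err != nil {
		klog.Fatalf("Could not initialize cluster state feeder: %v", err)
	}
}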
72 changes: 64 additions & 8 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
@@ -53,13 +53,19 @@ const (
DefaultRecommenderName = "default"
)

type HistorySource int

const (
Checkpoints HistorySource = iota
Prometheus
None
Undefined
)

// ClusterStateFeeder can update state of ClusterState object.
type ClusterStateFeeder interface {
// InitFromHistoryProvider loads historical pod spec into clusterState.
InitFromHistoryProvider(historyProvider history.HistoryProvider) (historyInitError error)

// InitFromCheckpoints loads historical checkpoints into clusterState.
InitFromCheckpoints()
// Init initializes ClusterStateFeeder
Init() error

// LoadVPAs updates clusterState with current state of VPAs.
LoadVPAs()
@@ -87,6 +93,8 @@ type ClusterStateFeederFactory struct {
MemorySaveMode bool
ControllerFetcher controllerfetcher.ControllerFetcher
RecommenderName string
HistorySource HistorySource
PromHistoryConfig history.PrometheusHistoryProviderConfig
}

// Make creates new ClusterStateFeeder with internal data providers, based on kube client.
@@ -103,6 +111,8 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
memorySaveMode: m.MemorySaveMode,
controllerFetcher: m.ControllerFetcher,
recommenderName: m.RecommenderName,
historySource: m.HistorySource,
promHistoryConfig: m.PromHistoryConfig,
}
}

@@ -192,15 +202,61 @@ type clusterStateFeeder struct {
memorySaveMode bool
controllerFetcher controllerfetcher.ControllerFetcher
recommenderName string
historySource HistorySource
promHistoryConfig history.PrometheusHistoryProviderConfig
}

func (feeder *clusterStateFeeder) Init() error {
switch feeder.historySource {
case Checkpoints:
klog.Infof("Using checkpoints as a history provider")
feeder.initFromCheckpoints()
case Prometheus:
klog.Infof("Using prometheus as a history provider")

provider, promInitErr := history.NewPrometheusHistoryProvider(feeder.promHistoryConfig)
if promInitErr != nil {
klog.Errorf("Could not initialize history provider")
return promInitErr
}

historyInitErr := feeder.initFromHistoryProvider(provider)
if historyInitErr != nil {
klog.Errorf("Failed to load prometheus history")
return historyInitErr
}
case None:
klog.Infof("Running without a history provider")
default:
klog.Errorf("Wrong storage provider option")
return fmt.Errorf("storage provider option is not set. Supported values: prometheus, none, checkpoint")
}
return nil
}

func (feeder *clusterStateFeeder) InitFromHistoryProvider(historyProvider history.HistoryProvider) (historyInitError error) {
func GetHistorySourceFromArg(historySource string) (HistorySource, error) {
switch historySource {
case "checkpoint":
return Checkpoints, nil
case "prometheus":
return Prometheus, nil
case "none":
return None, nil
default:
klog.Errorf("Storage option '%s' is not supported. Supported values: prometheus, none, checkpoint", historySource)
return Undefined, fmt.Errorf("storage option '%s' is not supported. Supported values: prometheus, none, checkpoint", historySource)
}
}

func (feeder *clusterStateFeeder) initFromHistoryProvider(historyProvider history.HistoryProvider) (historyInitError error) {
historyInitError = nil
klog.V(3).Info("Initializing VPA from history provider")
clusterHistory, err := historyProvider.GetClusterHistory()
if err != nil {
historyInitError = err
klog.Errorf("Cannot get cluster history: %v", err)
}
if len(clusterHistory) == 0 {
klog.Warningf("history provider returned no pods")
}
for podID, podHistory := range clusterHistory {
klog.V(4).Infof("Adding pod %v with labels %v", podID, podHistory.LastLabels)
@@ -244,7 +300,7 @@ func (feeder *clusterStateFeeder) setVpaCheckpoint(checkpoint *vpa_types.Vertica
return nil
}

func (feeder *clusterStateFeeder) InitFromCheckpoints() {
func (feeder *clusterStateFeeder) initFromCheckpoints() {
klog.V(3).Info("Initializing VPA from checkpoints")
feeder.LoadVPAs()

2 changes: 1 addition & 1 deletion vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go
@@ -500,7 +500,7 @@ func TestClusterStateFeeder_InitFromHistoryProvider(t *testing.T) {
feeder := clusterStateFeeder{
clusterState: clusterState,
}
feeder.InitFromHistoryProvider(&provider)
feeder.initFromHistoryProvider(&provider)
if !assert.Contains(t, feeder.clusterState.Pods, pod1) {
return
}
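The single-line test change above just tracks the rename to the unexported initFromHistoryProvider. Since GetHistorySourceFromArg is now a pure, exported mapping, it would also be straightforward to cover with a table-driven test; a hypothetical sketch, not part of this commit, that could sit in the same test file:

// Hypothetical table-driven test for the new flag mapping (not in this commit).
package input

import "testing"

func TestGetHistorySourceFromArg(t *testing.T) {
	cases := []struct {
		arg     string
		want    HistorySource
		wantErr bool
	}{
		{"checkpoint", Checkpoints, false},
		{"prometheus", Prometheus, false},
		{"none", None, false},
		{"bogus", Undefined, true},
	}
	for _, c := range cases {
		got, err := GetHistorySourceFromArg(c.arg)
		if got != c.want || (err != nil) != c.wantErr {
			t.Errorf("GetHistorySourceFromArg(%q) = %v, err = %v; want %v, wantErr = %v",
				c.arg, got, err, c.want, c.wantErr)
		}
	}
}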
96 changes: 38 additions & 58 deletions vertical-pod-autoscaler/pkg/recommender/main.go
@@ -19,9 +19,10 @@ package main
import (
"context"
"flag"
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
"time"

resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"

apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
@@ -56,8 +57,8 @@ var (
kubeApiQps = flag.Float64("kube-api-qps", 5.0, `QPS limit when making requests to Kubernetes apiserver`)
kubeApiBurst = flag.Float64("kube-api-burst", 10.0, `QPS burst limit when making requests to Kubernetes apiserver`)

storage = flag.String("storage", "checkpoint", `Specifies storage mode. Supported values: prometheus, none, checkpoint`)
// prometheus history provider configs
storage = flag.String("storage", "checkpoint", `Specifies storage mode. Supported values: prometheus, none, checkpoint`)
historyLength = flag.String("history-length", "8d", `How far back Prometheus should be queried for historical metrics`)
historyResolution = flag.String("history-resolution", "1h", `Resolution at which Prometheus is queried for historical metrics`)
queryTimeout = flag.String("prometheus-query-timeout", "5m", `How long to wait before killing long queries`)
@@ -123,18 +124,6 @@ func main() {
metrics_recommender.Register()
metrics_quality.Register()

var usePrometheus, useCheckpoints, useNoStorage bool
switch *storage {
case "checkpoint":
useCheckpoints = true
case "prometheus":
usePrometheus = true
case "none":
useNoStorage = true
default:
klog.Fatalf("Storage option '%s' is not supported. Supported values: prometheus, none, checkpoint", *storage)
}

var postProcessors []routines.RecommendationPostProcessor
if *postProcessorCPUasInteger {
postProcessors = append(postProcessors, &routines.IntegerCPUPostProcessor{})
@@ -159,6 +148,35 @@
source = input_metrics.NewPodMetricsesSource(resourceclient.NewForConfigOrDie(config))
}

promQueryTimeout, err := time.ParseDuration(*queryTimeout)
if err != nil {
klog.Fatalf("Could not parse --prometheus-query-timeout as a time.Duration: %v", err)
}
promHistoryConfig := history.PrometheusHistoryProviderConfig{
Address: *prometheusAddress,
QueryTimeout: promQueryTimeout,
HistoryLength: *historyLength,
HistoryResolution: *historyResolution,
PodLabelPrefix: *podLabelPrefix,
PodLabelsMetricName: *podLabelsMetricName,
PodNamespaceLabel: *podNamespaceLabel,
PodNameLabel: *podNameLabel,
CtrNamespaceLabel: *ctrNamespaceLabel,
CtrPodNameLabel: *ctrPodNameLabel,
CtrNameLabel: *ctrNameLabel,
CadvisorMetricsJobName: *prometheusJobName,
Namespace: *vpaObjectNamespace,
PrometheusBasicAuthTransport: history.PrometheusBasicAuthTransport{
Username: *username,
Password: *password,
},
}

historySource, err := input.GetHistorySourceFromArg(*storage)
if err != nil {
klog.Fatalf("Could not initialize history source: %v", err)
}

clusterStateFeeder := input.ClusterStateFeederFactory{
PodLister: podLister,
OOMObserver: oomObserver,
@@ -171,6 +189,8 @@
MemorySaveMode: *memorySaver,
ControllerFetcher: controllerFetcher,
RecommenderName: *recommenderName,
HistorySource: historySource,
PromHistoryConfig: promHistoryConfig,
}.Make()
controllerFetcher.Start(context.Background(), scaleCacheLoopPeriod)

@@ -183,52 +203,12 @@
PodResourceRecommender: logic.CreatePodResourceRecommender(),
RecommendationPostProcessors: postProcessors,
CheckpointsGCInterval: *checkpointsGCInterval,
UseCheckpoints: useCheckpoints,
UseCheckpoints: historySource == input.Checkpoints,
}.Make()

promQueryTimeout, err := time.ParseDuration(*queryTimeout)
if err != nil {
klog.Fatalf("Could not parse --prometheus-query-timeout as a time.Duration: %v", err)
}

if useCheckpoints {
klog.Infof("Using checkpoints as a history provider")
recommender.GetClusterStateFeeder().InitFromCheckpoints()
} else if usePrometheus {
klog.Infof("Using prometheus as a history provider")
config := history.PrometheusHistoryProviderConfig{
Address: *prometheusAddress,
QueryTimeout: promQueryTimeout,
HistoryLength: *historyLength,
HistoryResolution: *historyResolution,
PodLabelPrefix: *podLabelPrefix,
PodLabelsMetricName: *podLabelsMetricName,
PodNamespaceLabel: *podNamespaceLabel,
PodNameLabel: *podNameLabel,
CtrNamespaceLabel: *ctrNamespaceLabel,
CtrPodNameLabel: *ctrPodNameLabel,
CtrNameLabel: *ctrNameLabel,
CadvisorMetricsJobName: *prometheusJobName,
Namespace: *vpaObjectNamespace,
PrometheusBasicAuthTransport: history.PrometheusBasicAuthTransport{
Username: *username,
Password: *password,
},
}

provider, err := history.NewPrometheusHistoryProvider(config)
if err != nil {
klog.Fatalf("Could not initialize history provider: %v", err)
}

historyInitErr := recommender.GetClusterStateFeeder().InitFromHistoryProvider(provider)
if historyInitErr != nil {
klog.Fatalf("Failed to load prometheus history")
}
} else if useNoStorage {
klog.Infof("Running without a history provider")
} else {
klog.Fatalf("Storage provider option is not set. Supported values: prometheus, none, checkpoint")
stateFeederInitErr := recommender.GetClusterStateFeeder().Init()
if stateFeederInitErr != nil {
klog.Fatalf("Could not initialize cluster state feeder: %v", stateFeederInitErr)
}

ticker := time.Tick(*metricsFetcherInterval)
