Skip to content

Commit

Permalink
Fix OOM issue and "http2: stream closed" issue by only returning one …
Browse files Browse the repository at this point in the history
…item for ListCustomMetrics
  • Loading branch information
CatherineF-dev committed Jan 11, 2024
1 parent 755ef2f commit a983c73
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 21 deletions.
2 changes: 1 addition & 1 deletion custom-metrics-stackdriver-adapter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ GOOS?=linux
OUT_DIR?=build
PACKAGE=github.com/GoogleCloudPlatform/k8s-stackdriver/custom-metrics-stackdriver-adapter
PREFIX?=staging-k8s.gcr.io
TAG = v0.13.1
TAG = v0.14.0
PKG := $(shell find pkg/* -type f)

.PHONY: build docker push test clean
Expand Down
24 changes: 23 additions & 1 deletion custom-metrics-stackdriver-adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type stackdriverAdapterServerOptions struct {
// EnableDistributionSupport is a flag that indicates whether or not distributions can
// be used (with special reducer labels) in the adapter.
EnableDistributionSupport bool
// ListFullCustomMetrics is a flag that indicates whether to list all pod custom metrics during API discovery.
// Default = false, which lists only 1 metric. Enabling it increases memory usage.
ListFullCustomMetrics bool
}

func (sa *StackdriverAdapter) makeProviderOrDie(o *stackdriverAdapterServerOptions, rateInterval time.Duration, alignmentPeriod time.Duration) (provider.MetricsProvider, *translator.Translator) {
Expand Down Expand Up @@ -110,7 +113,19 @@ func (sa *StackdriverAdapter) makeProviderOrDie(o *stackdriverAdapterServerOptio
conf.GenericConfig.EnableMetrics = true

translator := translator.NewTranslator(stackdriverService, gceConf, rateInterval, alignmentPeriod, mapper, o.UseNewResourceModel, o.EnableDistributionSupport)
return adapter.NewStackdriverProvider(client, mapper, gceConf, stackdriverService, translator, rateInterval, o.UseNewResourceModel, o.FallbackForContainerMetrics), translator

// If ListFullCustomMetrics is false, only one metric is returned during API discovery (`kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta2"`) to reduce memory usage.
stackdriverRequest := translator.ListMetricDescriptors(o.FallbackForContainerMetrics)
response, err := stackdriverRequest.Do()
if err != nil {
klog.Fatalf("Failed request to stackdriver api: %s", err)
}
customMetricsListCache := translator.GetMetricsFromSDDescriptorsResp(response)
if !o.ListFullCustomMetrics && len(customMetricsListCache) > 0 {
customMetricsListCache = customMetricsListCache[0:1]
}

return adapter.NewStackdriverProvider(client, mapper, gceConf, stackdriverService, translator, rateInterval, o.UseNewResourceModel, o.FallbackForContainerMetrics, customMetricsListCache), translator
}

func (sa *StackdriverAdapter) withCoreMetrics(translator *translator.Translator) error {
Expand Down Expand Up @@ -154,6 +169,7 @@ func main() {
FallbackForContainerMetrics: false,
EnableCoreMetricsAPI: false,
EnableDistributionSupport: false,
ListFullCustomMetrics: false,
}

flags.BoolVar(&serverOptions.UseNewResourceModel, "use-new-resource-model", serverOptions.UseNewResourceModel,
Expand All @@ -166,6 +182,8 @@ func main() {
"If true, fallbacks to k8s_container resource when given metric is not present on k8s_pod. At most one container with given metric is allowed for each pod.")
flags.BoolVar(&serverOptions.EnableCoreMetricsAPI, "enable-core-metrics-api", serverOptions.EnableCoreMetricsAPI,
"Experimental, do not use. Whether to enable Core Metrics API.")
flags.BoolVar(&serverOptions.ListFullCustomMetrics, "list-full-custom-metrics", serverOptions.ListFullCustomMetrics,
"whether to supporting list full custom metrics. This is a featuragate to list full custom metrics back, which should keep as false to return only 1 metric. Otherwise, it would have high memory usage issue.")
flags.StringVar(&serverOptions.MetricsAddress, "metrics-address", "",
"Endpoint with port on which Prometheus metrics server should be enabled. Example: localhost:8080. If there is no flag, Prometheus metric server is disabled and monitoring metrics are not collected.")
flags.StringVar(&serverOptions.StackdriverEndpoint, "stackdriver-endpoint", "",
Expand All @@ -175,12 +193,16 @@ func main() {

flags.Parse(os.Args)

klog.Info("serverOptions: ", serverOptions)
if !serverOptions.UseNewResourceModel && serverOptions.FallbackForContainerMetrics {
klog.Fatalf("Container metrics work only with new resource model")
}
if !serverOptions.UseNewResourceModel && serverOptions.EnableCoreMetricsAPI {
klog.Fatalf("Core metrics work only with new resource model")
}
if serverOptions.ListFullCustomMetrics {
klog.Infof("ListFullCustomMetrics is enabled, which would increase memory usage a lot. Please keep it as false, unless have to.")
}

// TODO(holubwicz): move duration config to server options
metricsProvider, translator := cmd.makeProviderOrDie(&serverOptions, 5*time.Minute, 1*time.Minute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/GoogleCloudPlatform/k8s-stackdriver/custom-metrics-stackdriver-adapter/pkg/adapter/translator"
Expand Down Expand Up @@ -55,23 +54,24 @@ type StackdriverProvider struct {
rateInterval time.Duration
translator *translator.Translator
useNewResourceModel bool
mu sync.Mutex
metricsCacheSet bool
metricsCache []provider.CustomMetricInfo
fallbackForContainerMetrics bool
}

// NewStackdriverProvider creates a StackdriverProvider
func NewStackdriverProvider(kubeClient *corev1.CoreV1Client, mapper apimeta.RESTMapper, gceConf *config.GceConfig, stackdriverService *stackdriver.Service, translator *translator.Translator, rateInterval time.Duration, useNewResourceModel bool, fallbackForContainerMetrics bool) provider.MetricsProvider {
return &StackdriverProvider{
func NewStackdriverProvider(kubeClient *corev1.CoreV1Client, mapper apimeta.RESTMapper, gceConf *config.GceConfig, stackdriverService *stackdriver.Service, translator *translator.Translator, rateInterval time.Duration, useNewResourceModel bool, fallbackForContainerMetrics bool, customMetricsListCache []provider.CustomMetricInfo) provider.MetricsProvider {
p := &StackdriverProvider{
kubeClient: kubeClient,
stackdriverService: stackdriverService,
config: gceConf,
rateInterval: rateInterval,
translator: translator,
useNewResourceModel: useNewResourceModel,
fallbackForContainerMetrics: fallbackForContainerMetrics,
metricsCache: customMetricsListCache,
}

return p
}

// GetMetricByName fetches a particular metric for a particular object.
Expand Down Expand Up @@ -308,21 +308,9 @@ func (p *StackdriverProvider) getNamespacedMetricBySelector(groupResource schema
return &custom_metrics.MetricValueList{Items: result}, nil
}

// ListAllMetrics returns all custom metrics available from Stackdriver.
// List only pod metrics
// ListAllMetrics returns one custom metric, to reduce memory usage, when ListFullCustomMetrics is false (the default).
// Otherwise, it returns all custom metrics available from Stackdriver.
func (p *StackdriverProvider) ListAllMetrics() []provider.CustomMetricInfo {
p.mu.Lock()
defer p.mu.Unlock()
if !p.metricsCacheSet {
stackdriverRequest := p.translator.ListMetricDescriptors(p.fallbackForContainerMetrics)
response, err := stackdriverRequest.Do()
if err != nil {
klog.Errorf("Failed request to stackdriver api: %s", err)
return []provider.CustomMetricInfo{}
}
p.metricsCacheSet = true
p.metricsCache = p.translator.GetMetricsFromSDDescriptorsResp(response)
}
return p.metricsCache
}

Expand Down

0 comments on commit a983c73

Please sign in to comment.