From c8e62bb41add4cbfce84a57bc7bd3832515d94ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?And=C5=BEej=20Maciusovi=C4=8D?= Date: Wed, 18 Sep 2024 13:30:36 +0300 Subject: [PATCH] Add collector fetch timeout (#118) --- exporter/config/config.go | 16 ++++++++++++---- exporter/config/config_test.go | 8 +++++--- exporter/exporter.go | 12 +++++++----- exporter/exporter_test.go | 4 +++- exporter/sinks/prom_test.go | 2 +- 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/exporter/config/config.go b/exporter/config/config.go index 71bbf53..77c472d 100644 --- a/exporter/config/config.go +++ b/exporter/config/config.go @@ -10,10 +10,12 @@ import ( ) type Config struct { - PodIP string `envconfig:"POD_IP" yaml:"podIP"` - PodNamespace string `envconfig:"POD_NAMESPACE" yaml:"podNamespace"` - ExportInterval time.Duration `envconfig:"EXPORT_INTERVAL" yaml:"exportInterval"` - Sinks map[string]Sink `yaml:"sinks"` + PodIP string `envconfig:"POD_IP" yaml:"podIP"` + PodNamespace string `envconfig:"POD_NAMESPACE" yaml:"podNamespace"` + ExportInterval time.Duration `envconfig:"EXPORT_INTERVAL" yaml:"exportInterval"` + CollectorsConcurrentFetchCount int `envconfig:"COLLECTORS_CONCURRENT_FETCH_COUNT" yaml:"collectorsConcurrentFetchCount"` + CollectorFetchTimeout time.Duration `envconfig:"COLLECTOR_FETCH_TIMEOUT" yaml:"collectorFetchTimeout"` + Sinks map[string]Sink `yaml:"sinks"` } type SinkType string @@ -76,6 +78,12 @@ func Load(configPath string) (Config, error) { if cfg.ExportInterval == 0 { cfg.ExportInterval = 60 * time.Second } + if cfg.CollectorsConcurrentFetchCount == 0 { + cfg.CollectorsConcurrentFetchCount = 20 + } + if cfg.CollectorFetchTimeout == 0 { + cfg.CollectorFetchTimeout = 3 * time.Second + } if len(cfg.Sinks) == 0 { return Config{}, errors.New("at least one sink config is required") diff --git a/exporter/config/config_test.go b/exporter/config/config_test.go index 5c9c20f..722901d 100644 --- a/exporter/config/config_test.go +++ b/exporter/config/config_test.go @@ -66,9 +66,11 @@ sinks: func newTestConfig() Config { return Config{ - PodIP: "10.10.1.15", - PodNamespace: "egressd", - ExportInterval: 60 * time.Second, + PodIP: "10.10.1.15", + PodNamespace: "egressd", + ExportInterval: 60 * time.Second, + CollectorsConcurrentFetchCount: 20, + CollectorFetchTimeout: 3 * time.Second, Sinks: map[string]Sink{ "castai": { HTTPConfig: &SinkHTTPConfig{ diff --git a/exporter/exporter.go b/exporter/exporter.go index 9c9730f..219759d 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -27,8 +27,7 @@ import ( ) const ( - collectorsConcurrentFetch = 20 - sinkConcurrentPush = 5 + sinkConcurrentPush = 5 ) func New( @@ -104,11 +103,11 @@ func (e *Exporter) export(ctx context.Context) error { // Fetch network metrics from all collectors. e.log.Debugf("fetching network metrics from %d collector(s)", collectorsCount) - pulledBatch := make(chan *pb.RawNetworkMetricBatch, collectorsConcurrentFetch) + pulledBatch := make(chan *pb.RawNetworkMetricBatch, e.cfg.CollectorsConcurrentFetchCount) go func() { var fetchGroup errgroup.Group - fetchGroup.SetLimit(collectorsConcurrentFetch) + fetchGroup.SetLimit(e.cfg.CollectorsConcurrentFetchCount) for _, pod := range collectorPodsList.Items { pod := pod @@ -127,6 +126,9 @@ func (e *Exporter) export(ctx context.Context) error { fetchGroup.Go(func() error { if err := func() error { + ctx, cancel := context.WithTimeout(ctx, e.cfg.CollectorFetchTimeout) + defer cancel() + batch, err := e.fetchRawNetworkMetricsBatch(ctx, url) if err != nil { return fmt.Errorf("fetching metrics from collector, url=%s: %w", url, err) @@ -138,7 +140,7 @@ func (e *Exporter) export(ctx context.Context) error { pulledBatch <- batch return nil }(); err != nil { - e.log.Error(err) + e.log.Warn(err) } return nil }) diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index 79781f8..9cfc7d0 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -145,7 +145,9 @@ func TestExporter(t *testing.T) { sink := &mockSink{batch: make(chan *pb.PodNetworkMetricBatch)} cfg := config.Config{ - ExportInterval: 100 * time.Millisecond, + ExportInterval: 100 * time.Millisecond, + CollectorFetchTimeout: 3 * time.Second, + CollectorsConcurrentFetchCount: 20, Sinks: map[string]config.Sink{ "castai": { HTTPConfig: &config.SinkHTTPConfig{}, diff --git a/exporter/sinks/prom_test.go b/exporter/sinks/prom_test.go index 89e66cc..f6098aa 100644 --- a/exporter/sinks/prom_test.go +++ b/exporter/sinks/prom_test.go @@ -133,7 +133,7 @@ func TestPromSink(t *testing.T) { r.Len(client.reqs, 2) slices.SortFunc(client.reqs, func(a, b *promwrite.WriteRequest) int { - return strings.Compare(a.TimeSeries[0].Labels[0].Name, b.TimeSeries[0].Labels[0].Name) + return strings.Compare(a.TimeSeries[0].Labels[0].Value, b.TimeSeries[0].Labels[0].Value) }) r.Equal([]promwrite.TimeSeries{