Skip to content

Commit

Permalink
Add collector fetch timeout (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Sep 18, 2024
1 parent a403361 commit c8e62bb
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 14 deletions.
16 changes: 12 additions & 4 deletions exporter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
)

type Config struct {
PodIP string `envconfig:"POD_IP" yaml:"podIP"`
PodNamespace string `envconfig:"POD_NAMESPACE" yaml:"podNamespace"`
ExportInterval time.Duration `envconfig:"EXPORT_INTERVAL" yaml:"exportInterval"`
Sinks map[string]Sink `yaml:"sinks"`
PodIP string `envconfig:"POD_IP" yaml:"podIP"`
PodNamespace string `envconfig:"POD_NAMESPACE" yaml:"podNamespace"`
ExportInterval time.Duration `envconfig:"EXPORT_INTERVAL" yaml:"exportInterval"`
CollectorsConcurrentFetchCount int `envconfig:"COLLECTORS_CONCURRENT_FETCH_COUNT" yaml:"collectorsConcurrentFetchCount"`
CollectorFetchTimeout time.Duration `envconfig:"COLLECTOR_FETCH_TIMEOUT" yaml:"collectorFetchTimeout"`
Sinks map[string]Sink `yaml:"sinks"`
}

type SinkType string
Expand Down Expand Up @@ -76,6 +78,12 @@ func Load(configPath string) (Config, error) {
if cfg.ExportInterval == 0 {
cfg.ExportInterval = 60 * time.Second
}
if cfg.CollectorsConcurrentFetchCount == 0 {
cfg.CollectorsConcurrentFetchCount = 20
}
if cfg.CollectorFetchTimeout == 0 {
cfg.CollectorFetchTimeout = 3 * time.Second
}

if len(cfg.Sinks) == 0 {
return Config{}, errors.New("at least one sink config is required")
Expand Down
8 changes: 5 additions & 3 deletions exporter/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ sinks:

func newTestConfig() Config {
return Config{
PodIP: "10.10.1.15",
PodNamespace: "egressd",
ExportInterval: 60 * time.Second,
PodIP: "10.10.1.15",
PodNamespace: "egressd",
ExportInterval: 60 * time.Second,
CollectorsConcurrentFetchCount: 20,
CollectorFetchTimeout: 3 * time.Second,
Sinks: map[string]Sink{
"castai": {
HTTPConfig: &SinkHTTPConfig{
Expand Down
12 changes: 7 additions & 5 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
)

const (
collectorsConcurrentFetch = 20
sinkConcurrentPush = 5
sinkConcurrentPush = 5
)

func New(
Expand Down Expand Up @@ -104,11 +103,11 @@ func (e *Exporter) export(ctx context.Context) error {
// Fetch network metrics from all collectors.
e.log.Debugf("fetching network metrics from %d collector(s)", collectorsCount)

pulledBatch := make(chan *pb.RawNetworkMetricBatch, collectorsConcurrentFetch)
pulledBatch := make(chan *pb.RawNetworkMetricBatch, e.cfg.CollectorsConcurrentFetchCount)

go func() {
var fetchGroup errgroup.Group
fetchGroup.SetLimit(collectorsConcurrentFetch)
fetchGroup.SetLimit(e.cfg.CollectorsConcurrentFetchCount)

for _, pod := range collectorPodsList.Items {
pod := pod
Expand All @@ -127,6 +126,9 @@ func (e *Exporter) export(ctx context.Context) error {

fetchGroup.Go(func() error {
if err := func() error {
ctx, cancel := context.WithTimeout(ctx, e.cfg.CollectorFetchTimeout)
defer cancel()

batch, err := e.fetchRawNetworkMetricsBatch(ctx, url)
if err != nil {
return fmt.Errorf("fetching metrics from collector, url=%s: %w", url, err)
Expand All @@ -138,7 +140,7 @@ func (e *Exporter) export(ctx context.Context) error {
pulledBatch <- batch
return nil
}(); err != nil {
e.log.Error(err)
e.log.Warn(err)
}
return nil
})
Expand Down
4 changes: 3 additions & 1 deletion exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ func TestExporter(t *testing.T) {
sink := &mockSink{batch: make(chan *pb.PodNetworkMetricBatch)}

cfg := config.Config{
ExportInterval: 100 * time.Millisecond,
ExportInterval: 100 * time.Millisecond,
CollectorFetchTimeout: 3 * time.Second,
CollectorsConcurrentFetchCount: 20,
Sinks: map[string]config.Sink{
"castai": {
HTTPConfig: &config.SinkHTTPConfig{},
Expand Down
2 changes: 1 addition & 1 deletion exporter/sinks/prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestPromSink(t *testing.T) {
r.Len(client.reqs, 2)

slices.SortFunc(client.reqs, func(a, b *promwrite.WriteRequest) int {
return strings.Compare(a.TimeSeries[0].Labels[0].Name, b.TimeSeries[0].Labels[0].Name)
return strings.Compare(a.TimeSeries[0].Labels[0].Value, b.TimeSeries[0].Labels[0].Value)
})

r.Equal([]promwrite.TimeSeries{
Expand Down

0 comments on commit c8e62bb

Please sign in to comment.