Skip to content

Commit

Permalink
Export received bytes metric (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Sep 2, 2024
1 parent f95f117 commit 55382c3
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 12 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
args: --timeout=5m
skip-pkg-cache: true
skip-build-cache: true
version: v1.58.2

- name: Test
if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-test') }}
Expand Down
7 changes: 4 additions & 3 deletions exporter/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type SinkHTTPConfig struct {
}

type SinkPromRemoteWriteConfig struct {
URL string `yaml:"url"`
Headers map[string]string `yaml:"headers"`
Labels map[string]string `yaml:"labels"`
URL string `yaml:"url"`
Headers map[string]string `yaml:"headers"`
Labels map[string]string `yaml:"labels"`
SendReceivedBytesMetric bool `yaml:"sendReceivedBytesMetric"`
}

func Load(configPath string) (Config, error) {
Expand Down
27 changes: 23 additions & 4 deletions exporter/sinks/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"inet.af/netaddr"

"github.com/castai/egressd/exporter/config"
Expand Down Expand Up @@ -46,8 +47,26 @@ type PromRemoteWriteSink struct {
func (s *PromRemoteWriteSink) Push(ctx context.Context, batch *pb.PodNetworkMetricBatch) error {
now := s.timeGetter()

ts := make([]promwrite.TimeSeries, 0, len(batch.Items))
var errg errgroup.Group
errg.Go(func() error {
return s.pushMetric(ctx, batch, now, "egressd_transmit_bytes_total", func(v *pb.PodNetworkMetric) float64 {
return float64(v.TxBytes)
})
})

if s.cfg.SendReceivedBytesMetric {
errg.Go(func() error {
return s.pushMetric(ctx, batch, now, "egressd_received_bytes_total", func(v *pb.PodNetworkMetric) float64 {
return float64(v.RxBytes)
})
})
}

return errg.Wait()
}

func (s *PromRemoteWriteSink) pushMetric(ctx context.Context, batch *pb.PodNetworkMetricBatch, now time.Time, name string, valueFunc func(v *pb.PodNetworkMetric) float64) error {
ts := make([]promwrite.TimeSeries, 0, len(batch.Items))
for _, m := range batch.Items {
dstIP, _ := netaddr.ParseIP(m.DstIp)
dstIPType := "public"
Expand All @@ -56,7 +75,7 @@ func (s *PromRemoteWriteSink) Push(ctx context.Context, batch *pb.PodNetworkMetr
}
// Initial labels, sorted by label name asc.
labels := []promwrite.Label{
{Name: "__name__", Value: "egressd_transmit_bytes_total"},
{Name: "__name__", Value: name},
{Name: "cross_zone", Value: isCrossZoneValue(m)},
{Name: "dst_ip", Value: dstIP.String()},
{Name: "dst_ip_type", Value: dstIPType},
Expand Down Expand Up @@ -87,12 +106,12 @@ func (s *PromRemoteWriteSink) Push(ctx context.Context, batch *pb.PodNetworkMetr
Labels: labels,
Sample: promwrite.Sample{
Time: now,
Value: float64(m.TxBytes),
Value: valueFunc(m),
},
})
}

s.log.Infof("pushing metrics, timeseries=%d", len(ts))
s.log.Infof("pushing metrics %q, timeseries=%d", name, len(ts))

_, err := s.client.Write(ctx, &promwrite.WriteRequest{
TimeSeries: ts,
Expand Down
40 changes: 35 additions & 5 deletions exporter/sinks/prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sinks

import (
"context"
"slices"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -80,7 +82,7 @@ func TestPromSink(t *testing.T) {
Sample: promwrite.Sample{Time: ts, Value: 35},
},
},
client.req.TimeSeries)
client.reqs[0].TimeSeries)
})

t.Run("push with custom labels", func(t *testing.T) {
Expand All @@ -92,6 +94,7 @@ func TestPromSink(t *testing.T) {
Labels: map[string]string{
"xz_label_key": "xz_label_value",
},
SendReceivedBytesMetric: true,
}
client := &mockPromWriteClient{}
ts := time.Date(2023, time.April, 13, 13, 41, 48, 926278000, time.UTC)
Expand Down Expand Up @@ -125,6 +128,34 @@ func TestPromSink(t *testing.T) {
},
}
r.NoError(sink.Push(ctx, batch))
r.Len(client.reqs, 2)

slices.SortFunc(client.reqs, func(a, b *promwrite.WriteRequest) int {
return strings.Compare(a.TimeSeries[0].Labels[0].Name, b.TimeSeries[0].Labels[0].Name)
})

r.Equal([]promwrite.TimeSeries{
{
Labels: []promwrite.Label{
{Name: "__name__", Value: "egressd_received_bytes_total"},
{Name: "cross_zone", Value: "false"},
{Name: "dst_ip", Value: "10.14.7.5"},
{Name: "dst_ip_type", Value: "private"},
{Name: "dst_namespace", Value: "team2"},
{Name: "dst_node", Value: "n1"},
{Name: "dst_pod", Value: "p2"},
{Name: "dst_zone", Value: "us-east-1a"},
{Name: "proto", Value: "TCP"},
{Name: "src_ip", Value: "10.14.7.12"},
{Name: "src_namespace", Value: "team1"},
{Name: "src_node", Value: "n1"},
{Name: "src_pod", Value: "p1"},
{Name: "src_zone", Value: "us-east-1a"},
{Name: "xz_label_key", Value: "xz_label_value"},
},
Sample: promwrite.Sample{Time: ts, Value: 30},
},
}, client.reqs[0].TimeSeries)

r.Equal([]promwrite.TimeSeries{
{
Expand All @@ -147,17 +178,16 @@ func TestPromSink(t *testing.T) {
},
Sample: promwrite.Sample{Time: ts, Value: 35},
},
},
client.req.TimeSeries)
}, client.reqs[1].TimeSeries)
})

}

type mockPromWriteClient struct {
req *promwrite.WriteRequest
reqs []*promwrite.WriteRequest
}

func (m *mockPromWriteClient) Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error) {
m.req = req
m.reqs = append(m.reqs, req)
return nil, nil
}

0 comments on commit 55382c3

Please sign in to comment.