Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ip2domain resolving to exporter #50

Merged
merged 13 commits into from
Aug 16, 2023
4 changes: 2 additions & 2 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func run(log logrus.FieldLogger) error {
return err
}

var ip2dns dns.LookuperStarter = &dns.Noop{}
var ip2dns dns.DNSCollector = &dns.Noop{}
if *ebpfDNSTracerEnabled && ebpf.IsKernelBTFAvailable() {
tracer := ebpf.NewTracer(log, ebpf.Config{
QueueSize: *ebpfDNSTracerQueueSize,
})
ip2dns = &dns.IP2DNS{Tracer: tracer}
ip2dns = dns.NewIP2DNS(tracer, log)
}

cfg := collector.Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func run(log logrus.FieldLogger) error {
return errors.New("not sinks configured")
}

ex := exporter.New(log, cfg, kw, clientset, sinksList)
ex := exporter.New(ctx, log, cfg, kw, clientset, sinksList)
go func() {
if err := ex.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("exporter start: %v", err)
Expand Down
25 changes: 7 additions & 18 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
corev1 "k8s.io/api/core/v1"

"github.com/castai/egressd/conntrack"
"github.com/castai/egressd/dns"
"github.com/castai/egressd/metrics"
"github.com/castai/egressd/pb"
)
Expand Down Expand Up @@ -52,14 +53,14 @@ type rawNetworkMetric struct {
lifetime time.Time
}

type ipLookup interface{ Lookup(netaddr.IP) string }
type dnsRecorder interface{ Records() []*pb.IP2Domain }

func New(
cfg Config,
log logrus.FieldLogger,
podsWatcher podsWatcher,
conntracker conntrack.Client,
ip2dns ipLookup,
ip2dns dnsRecorder,
currentTimeGetter func() time.Time,
) *Collector {
excludeNsMap := map[string]struct{}{}
Expand Down Expand Up @@ -95,7 +96,7 @@ type Collector struct {
log logrus.FieldLogger
podsWatcher podsWatcher
conntracker conntrack.Client
ip2dns ipLookup
ip2dns dnsRecorder
entriesCache map[uint64]*conntrack.Entry
podMetrics map[uint64]*rawNetworkMetric
excludeNsMap map[string]struct{}
Expand Down Expand Up @@ -131,7 +132,7 @@ func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http
items = append(items, m.RawNetworkMetric)
}

batch := &pb.RawNetworkMetricBatch{Items: items}
batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()}
batchBytes, err := proto.Marshal(batch)
if err != nil {
c.log.Errorf("marshal batch: %v", err)
Expand Down Expand Up @@ -190,12 +191,7 @@ func (c *Collector) collect() error {
c.entriesCache[connKey] = conn

groupKey := entryGroupKey(conn)
srcName := c.ip2dns.Lookup(conn.Src.IP())
dstName := c.ip2dns.Lookup(conn.Dst.IP())
if pm, found := c.podMetrics[groupKey]; found {
pm.SrcDnsName = srcName
pm.DstDnsName = dstName

pm.TxBytes += int64(txBytes)
pm.TxPackets += int64(txPackets)
pm.RxBytes += int64(rxBytes)
Expand All @@ -206,10 +202,8 @@ func (c *Collector) collect() error {
} else {
c.podMetrics[groupKey] = &rawNetworkMetric{
RawNetworkMetric: &pb.RawNetworkMetric{
SrcDnsName: srcName,
DstDnsName: dstName,
SrcIp: toIPint32(conn.Src.IP()),
DstIp: toIPint32(conn.Dst.IP()),
SrcIp: dns.ToIPint32(conn.Src.IP()),
DstIp: dns.ToIPint32(conn.Dst.IP()),
TxBytes: int64(conn.TxBytes),
TxPackets: int64(conn.TxPackets),
RxBytes: int64(conn.RxBytes),
Expand Down Expand Up @@ -317,8 +311,3 @@ func conntrackEntryKey(conn *conntrack.Entry) uint64 {
conntrackEntryHash.Reset()
return res
}

func toIPint32(ip netaddr.IP) int32 {
b := ip.As4()
return int32(binary.BigEndian.Uint32([]byte{b[0], b[1], b[2], b[3]}))
}
31 changes: 20 additions & 11 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/castai/egressd/conntrack"
"github.com/castai/egressd/dns"
"github.com/castai/egressd/pb"
)

Expand Down Expand Up @@ -358,7 +359,7 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
},
}

newCollector := func(connTracker conntrack.Client, ip2dns ipLookup) *Collector {
newCollector := func(connTracker conntrack.Client, ip2dns dnsRecorder) *Collector {
return New(Config{
ReadInterval: time.Millisecond,
CleanupInterval: 3 * time.Millisecond,
Expand Down Expand Up @@ -389,7 +390,7 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {

connTracker := &mockConntrack{entries: initialEntries}
dstIp := initialEntries[0].Dst.IP()
ip2dns := mockIP2DNS{dstIp: "first-destination.example.com"}
ip2dns := mockIP2DNS{dns.ToIPint32(dstIp): "first-destination.example.com"}

coll := newCollector(connTracker, ip2dns)
coll.cfg.SendTrafficDelta = false
Expand All @@ -400,12 +401,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
key1 := entryGroupKey(&initialEntries[0])
r.EqualValues(20, coll.podMetrics[key1].TxBytes)
r.EqualValues(3, coll.podMetrics[key1].TxPackets)
r.Equal("first-destination.example.com", coll.podMetrics[key1].DstDnsName)

key2 := entryGroupKey(&initialEntries[1])
r.EqualValues(10, coll.podMetrics[key2].RxBytes)
r.EqualValues(2, coll.podMetrics[key2].RxPackets)
r.Equal("", coll.podMetrics[key2].DstDnsName)

initialEntries[0].TxBytes += 10
initialEntries[0].TxPackets += 2
Expand All @@ -429,6 +428,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
r.NoError(err)
r.Len(batch.Items, 2)

r.Len(batch.Ip2Domain, 1)
r.Equal("first-destination.example.com", batch.Ip2Domain[0].Domain)
r.Equal(dns.ToIPint32(dstIp), batch.Ip2Domain[0].Ip)

// Check values are the same as on the last collecting action
r.EqualValues(30, batch.Items[0].TxBytes)
r.EqualValues(5, batch.Items[0].TxPackets)
Expand Down Expand Up @@ -471,7 +474,7 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {

connTracker := &mockConntrack{entries: initialEntries}
dstIp := initialEntries[0].Dst.IP()
ip2dns := mockIP2DNS{dstIp: "first-destination.example.com"}
ip2dns := mockIP2DNS{dns.ToIPint32(dstIp): "first-destination.example.com"}

coll := newCollector(connTracker, ip2dns)
coll.cfg.SendTrafficDelta = true
Expand All @@ -482,12 +485,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
key1 := entryGroupKey(&initialEntries[0])
r.EqualValues(20, coll.podMetrics[key1].TxBytes)
r.EqualValues(3, coll.podMetrics[key1].TxPackets)
r.Equal("first-destination.example.com", coll.podMetrics[key1].DstDnsName)

key2 := entryGroupKey(&initialEntries[1])
r.EqualValues(10, coll.podMetrics[key2].RxBytes)
r.EqualValues(2, coll.podMetrics[key2].RxPackets)
r.Equal("", coll.podMetrics[key2].DstDnsName)

initialEntries[0].TxBytes += 10
initialEntries[0].TxPackets += 2
Expand All @@ -511,6 +512,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
r.NoError(err)
r.Len(batch.Items, 2)

r.Len(batch.Ip2Domain, 1)
r.Equal("first-destination.example.com", batch.Ip2Domain[0].Domain)
r.Equal(dns.ToIPint32(dstIp), batch.Ip2Domain[0].Ip)

// Check values are the same as on the last collecting action
r.EqualValues(30, batch.Items[0].TxBytes)
r.EqualValues(5, batch.Items[0].TxPackets)
Expand Down Expand Up @@ -636,10 +641,14 @@ func (m *mockKubeWatcher) Get(nodeName string) ([]*corev1.Pod, error) {
return res, nil
}

type mockIP2DNS map[netaddr.IP]string
type mockIP2DNS map[int32]string

var _ ipLookup = (mockIP2DNS)(nil)
var _ dnsRecorder = (mockIP2DNS)(nil)

func (m mockIP2DNS) Lookup(ip netaddr.IP) string {
return m[ip]
func (m mockIP2DNS) Records() []*pb.IP2Domain {
items := make([]*pb.IP2Domain, 0, len(m))
for ip, domain := range m {
items = append(items, &pb.IP2Domain{Ip: ip, Domain: domain})
}
return items
}
9 changes: 5 additions & 4 deletions dns/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package dns
import (
"context"

"inet.af/netaddr"
"github.com/castai/egressd/pb"
)

type Noop struct{}

var _ LookuperStarter = (*Noop)(nil)
var _ DNSCollector = (*Noop)(nil)

func (t *Noop) Start(ctx context.Context) error {
<-ctx.Done()
return nil
}
func (t *Noop) Lookup(ip netaddr.IP) string {
return ""

func (d *Noop) Records() []*pb.IP2Domain {
return nil
}
78 changes: 43 additions & 35 deletions dns/resolutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,50 @@ package dns

import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"

cache "github.com/Code-Hex/go-generics-cache"
"github.com/google/gopacket/layers"
"github.com/sirupsen/logrus"
"inet.af/netaddr"

"github.com/castai/egressd/ebpf"
"github.com/castai/egressd/pb"
)

type LookuperStarter interface {
type DNSCollector interface {
Start(ctx context.Context) error
Lookup(ip netaddr.IP) string
Records() []*pb.IP2Domain
}

var _ LookuperStarter = (*IP2DNS)(nil)
var _ DNSCollector = (*IP2DNS)(nil)

type tracer interface {
Run(ctx context.Context) error
Events() <-chan ebpf.DNSEvent
}

var defaultDNSTTL = 5 * time.Minute
var defaultDNSTTL = 2 * time.Minute

type IP2DNS struct {
Tracer tracer
ipToName *cache.Cache[string, string]
log logrus.FieldLogger
ipToName *cache.Cache[int32, string]
cnameToName *cache.Cache[string, string]
mu sync.RWMutex
}

func NewIP2DNS(tracer tracer, log logrus.FieldLogger) *IP2DNS {
return &IP2DNS{
Tracer: tracer,
log: log,
}
}

func (d *IP2DNS) Start(ctx context.Context) error {

d.ipToName = cache.NewContext[string, string](ctx)
d.ipToName = cache.NewContext[int32, string](ctx)
d.cnameToName = cache.NewContext[string, string](ctx)

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -61,37 +70,36 @@ func (d *IP2DNS) Start(ctx context.Context) error {
if !ok {
return nil
}
func() {
d.mu.Lock()
defer d.mu.Unlock()
for _, answer := range ev.Answers {
name := string(answer.Name)
ttl := time.Duration(answer.TTL) * time.Second
if ttl == 0 {
ttl = defaultDNSTTL
}
switch answer.Type { //nolint:exhaustive
case layers.DNSTypeA:
if cname, found := d.cnameToName.Get(name); found {
name = cname
}
ip := answer.IP.To4()
d.ipToName.Set(ip.String(), name, cache.WithExpiration(ttl))
case layers.DNSTypeCNAME:
cname := string(answer.CNAME)
d.cnameToName.Set(cname, name, cache.WithExpiration(ttl))
default:
continue
for _, answer := range ev.Answers {
name := string(answer.Name)
switch answer.Type { //nolint:exhaustive
case layers.DNSTypeA:
if cname, found := d.cnameToName.Get(name); found {
name = cname
}
ip, _ := netaddr.FromStdIP(answer.IP)
d.ipToName.Set(ToIPint32(ip), name, cache.WithExpiration(defaultDNSTTL))
case layers.DNSTypeCNAME:
cname := string(answer.CNAME)
d.cnameToName.Set(cname, name, cache.WithExpiration(defaultDNSTTL))
default:
continue
}
}()
}
}
}
}

func (d *IP2DNS) Lookup(ip netaddr.IP) string {
d.mu.RLock()
defer d.mu.RUnlock()
value, _ := d.ipToName.Get(ip.String())
return value
func (d *IP2DNS) Records() []*pb.IP2Domain {
items := make([]*pb.IP2Domain, 0, len(d.ipToName.Keys()))
for _, ip := range d.ipToName.Keys() {
domain, _ := d.ipToName.Get(ip)
items = append(items, &pb.IP2Domain{Ip: ip, Domain: domain})
}
return items
}

func ToIPint32(ip netaddr.IP) int32 {
b := ip.As4()
return int32(binary.BigEndian.Uint32([]byte{b[0], b[1], b[2], b[3]}))
}
Loading