Skip to content

Commit

Permalink
Move ip2domain resolving to exporter (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanMelnyk113 authored Aug 16, 2023
1 parent 92636cb commit 1a549ec
Show file tree
Hide file tree
Showing 13 changed files with 387 additions and 191 deletions.
4 changes: 2 additions & 2 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func run(log logrus.FieldLogger) error {
return err
}

var ip2dns dns.LookuperStarter = &dns.Noop{}
var ip2dns dns.DNSCollector = &dns.Noop{}
if *ebpfDNSTracerEnabled && ebpf.IsKernelBTFAvailable() {
tracer := ebpf.NewTracer(log, ebpf.Config{
QueueSize: *ebpfDNSTracerQueueSize,
})
ip2dns = &dns.IP2DNS{Tracer: tracer}
ip2dns = dns.NewIP2DNS(tracer, log)
}

cfg := collector.Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func run(log logrus.FieldLogger) error {
return errors.New("not sinks configured")
}

ex := exporter.New(log, cfg, kw, clientset, sinksList)
ex := exporter.New(ctx, log, cfg, kw, clientset, sinksList)
go func() {
if err := ex.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("exporter start: %v", err)
Expand Down
25 changes: 7 additions & 18 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
corev1 "k8s.io/api/core/v1"

"github.com/castai/egressd/conntrack"
"github.com/castai/egressd/dns"
"github.com/castai/egressd/metrics"
"github.com/castai/egressd/pb"
)
Expand Down Expand Up @@ -52,14 +53,14 @@ type rawNetworkMetric struct {
lifetime time.Time
}

type ipLookup interface{ Lookup(netaddr.IP) string }
type dnsRecorder interface{ Records() []*pb.IP2Domain }

func New(
cfg Config,
log logrus.FieldLogger,
podsWatcher podsWatcher,
conntracker conntrack.Client,
ip2dns ipLookup,
ip2dns dnsRecorder,
currentTimeGetter func() time.Time,
) *Collector {
excludeNsMap := map[string]struct{}{}
Expand Down Expand Up @@ -95,7 +96,7 @@ type Collector struct {
log logrus.FieldLogger
podsWatcher podsWatcher
conntracker conntrack.Client
ip2dns ipLookup
ip2dns dnsRecorder
entriesCache map[uint64]*conntrack.Entry
podMetrics map[uint64]*rawNetworkMetric
excludeNsMap map[string]struct{}
Expand Down Expand Up @@ -131,7 +132,7 @@ func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http
items = append(items, m.RawNetworkMetric)
}

batch := &pb.RawNetworkMetricBatch{Items: items}
batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()}
batchBytes, err := proto.Marshal(batch)
if err != nil {
c.log.Errorf("marshal batch: %v", err)
Expand Down Expand Up @@ -190,12 +191,7 @@ func (c *Collector) collect() error {
c.entriesCache[connKey] = conn

groupKey := entryGroupKey(conn)
srcName := c.ip2dns.Lookup(conn.Src.IP())
dstName := c.ip2dns.Lookup(conn.Dst.IP())
if pm, found := c.podMetrics[groupKey]; found {
pm.SrcDnsName = srcName
pm.DstDnsName = dstName

pm.TxBytes += int64(txBytes)
pm.TxPackets += int64(txPackets)
pm.RxBytes += int64(rxBytes)
Expand All @@ -206,10 +202,8 @@ func (c *Collector) collect() error {
} else {
c.podMetrics[groupKey] = &rawNetworkMetric{
RawNetworkMetric: &pb.RawNetworkMetric{
SrcDnsName: srcName,
DstDnsName: dstName,
SrcIp: toIPint32(conn.Src.IP()),
DstIp: toIPint32(conn.Dst.IP()),
SrcIp: dns.ToIPint32(conn.Src.IP()),
DstIp: dns.ToIPint32(conn.Dst.IP()),
TxBytes: int64(conn.TxBytes),
TxPackets: int64(conn.TxPackets),
RxBytes: int64(conn.RxBytes),
Expand Down Expand Up @@ -317,8 +311,3 @@ func conntrackEntryKey(conn *conntrack.Entry) uint64 {
conntrackEntryHash.Reset()
return res
}

func toIPint32(ip netaddr.IP) int32 {
b := ip.As4()
return int32(binary.BigEndian.Uint32([]byte{b[0], b[1], b[2], b[3]}))
}
31 changes: 20 additions & 11 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/castai/egressd/conntrack"
"github.com/castai/egressd/dns"
"github.com/castai/egressd/pb"
)

Expand Down Expand Up @@ -358,7 +359,7 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
},
}

newCollector := func(connTracker conntrack.Client, ip2dns ipLookup) *Collector {
newCollector := func(connTracker conntrack.Client, ip2dns dnsRecorder) *Collector {
return New(Config{
ReadInterval: time.Millisecond,
CleanupInterval: 3 * time.Millisecond,
Expand Down Expand Up @@ -389,7 +390,7 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {

connTracker := &mockConntrack{entries: initialEntries}
dstIp := initialEntries[0].Dst.IP()
ip2dns := mockIP2DNS{dstIp: "first-destination.example.com"}
ip2dns := mockIP2DNS{dns.ToIPint32(dstIp): "first-destination.example.com"}

coll := newCollector(connTracker, ip2dns)
coll.cfg.SendTrafficDelta = false
Expand All @@ -400,12 +401,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
key1 := entryGroupKey(&initialEntries[0])
r.EqualValues(20, coll.podMetrics[key1].TxBytes)
r.EqualValues(3, coll.podMetrics[key1].TxPackets)
r.Equal("first-destination.example.com", coll.podMetrics[key1].DstDnsName)

key2 := entryGroupKey(&initialEntries[1])
r.EqualValues(10, coll.podMetrics[key2].RxBytes)
r.EqualValues(2, coll.podMetrics[key2].RxPackets)
r.Equal("", coll.podMetrics[key2].DstDnsName)

initialEntries[0].TxBytes += 10
initialEntries[0].TxPackets += 2
Expand All @@ -429,6 +428,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
r.NoError(err)
r.Len(batch.Items, 2)

r.Len(batch.Ip2Domain, 1)
r.Equal("first-destination.example.com", batch.Ip2Domain[0].Domain)
r.Equal(dns.ToIPint32(dstIp), batch.Ip2Domain[0].Ip)

// Check values are the same as on the last collecting action
r.EqualValues(30, batch.Items[0].TxBytes)
r.EqualValues(5, batch.Items[0].TxPackets)
Expand Down Expand Up @@ -471,7 +474,7 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {

connTracker := &mockConntrack{entries: initialEntries}
dstIp := initialEntries[0].Dst.IP()
ip2dns := mockIP2DNS{dstIp: "first-destination.example.com"}
ip2dns := mockIP2DNS{dns.ToIPint32(dstIp): "first-destination.example.com"}

coll := newCollector(connTracker, ip2dns)
coll.cfg.SendTrafficDelta = true
Expand All @@ -482,12 +485,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
key1 := entryGroupKey(&initialEntries[0])
r.EqualValues(20, coll.podMetrics[key1].TxBytes)
r.EqualValues(3, coll.podMetrics[key1].TxPackets)
r.Equal("first-destination.example.com", coll.podMetrics[key1].DstDnsName)

key2 := entryGroupKey(&initialEntries[1])
r.EqualValues(10, coll.podMetrics[key2].RxBytes)
r.EqualValues(2, coll.podMetrics[key2].RxPackets)
r.Equal("", coll.podMetrics[key2].DstDnsName)

initialEntries[0].TxBytes += 10
initialEntries[0].TxPackets += 2
Expand All @@ -511,6 +512,10 @@ func TestCollector__GetRawNetworkMetricsHandler(t *testing.T) {
r.NoError(err)
r.Len(batch.Items, 2)

r.Len(batch.Ip2Domain, 1)
r.Equal("first-destination.example.com", batch.Ip2Domain[0].Domain)
r.Equal(dns.ToIPint32(dstIp), batch.Ip2Domain[0].Ip)

// Check values are the same as on the last collecting action
r.EqualValues(30, batch.Items[0].TxBytes)
r.EqualValues(5, batch.Items[0].TxPackets)
Expand Down Expand Up @@ -636,10 +641,14 @@ func (m *mockKubeWatcher) Get(nodeName string) ([]*corev1.Pod, error) {
return res, nil
}

type mockIP2DNS map[netaddr.IP]string
type mockIP2DNS map[int32]string

var _ ipLookup = (mockIP2DNS)(nil)
var _ dnsRecorder = (mockIP2DNS)(nil)

func (m mockIP2DNS) Lookup(ip netaddr.IP) string {
return m[ip]
func (m mockIP2DNS) Records() []*pb.IP2Domain {
items := make([]*pb.IP2Domain, 0, len(m))
for ip, domain := range m {
items = append(items, &pb.IP2Domain{Ip: ip, Domain: domain})
}
return items
}
9 changes: 5 additions & 4 deletions dns/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package dns
import (
"context"

"inet.af/netaddr"
"github.com/castai/egressd/pb"
)

type Noop struct{}

var _ LookuperStarter = (*Noop)(nil)
var _ DNSCollector = (*Noop)(nil)

func (t *Noop) Start(ctx context.Context) error {
<-ctx.Done()
return nil
}
func (t *Noop) Lookup(ip netaddr.IP) string {
return ""

func (d *Noop) Records() []*pb.IP2Domain {
return nil
}
78 changes: 43 additions & 35 deletions dns/resolutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,50 @@ package dns

import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"

cache "github.com/Code-Hex/go-generics-cache"
"github.com/google/gopacket/layers"
"github.com/sirupsen/logrus"
"inet.af/netaddr"

"github.com/castai/egressd/ebpf"
"github.com/castai/egressd/pb"
)

type LookuperStarter interface {
type DNSCollector interface {
Start(ctx context.Context) error
Lookup(ip netaddr.IP) string
Records() []*pb.IP2Domain
}

var _ LookuperStarter = (*IP2DNS)(nil)
var _ DNSCollector = (*IP2DNS)(nil)

type tracer interface {
Run(ctx context.Context) error
Events() <-chan ebpf.DNSEvent
}

var defaultDNSTTL = 5 * time.Minute
var defaultDNSTTL = 2 * time.Minute

type IP2DNS struct {
Tracer tracer
ipToName *cache.Cache[string, string]
log logrus.FieldLogger
ipToName *cache.Cache[int32, string]
cnameToName *cache.Cache[string, string]
mu sync.RWMutex
}

func NewIP2DNS(tracer tracer, log logrus.FieldLogger) *IP2DNS {
return &IP2DNS{
Tracer: tracer,
log: log,
}
}

func (d *IP2DNS) Start(ctx context.Context) error {

d.ipToName = cache.NewContext[string, string](ctx)
d.ipToName = cache.NewContext[int32, string](ctx)
d.cnameToName = cache.NewContext[string, string](ctx)

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -61,37 +70,36 @@ func (d *IP2DNS) Start(ctx context.Context) error {
if !ok {
return nil
}
func() {
d.mu.Lock()
defer d.mu.Unlock()
for _, answer := range ev.Answers {
name := string(answer.Name)
ttl := time.Duration(answer.TTL) * time.Second
if ttl == 0 {
ttl = defaultDNSTTL
}
switch answer.Type { //nolint:exhaustive
case layers.DNSTypeA:
if cname, found := d.cnameToName.Get(name); found {
name = cname
}
ip := answer.IP.To4()
d.ipToName.Set(ip.String(), name, cache.WithExpiration(ttl))
case layers.DNSTypeCNAME:
cname := string(answer.CNAME)
d.cnameToName.Set(cname, name, cache.WithExpiration(ttl))
default:
continue
for _, answer := range ev.Answers {
name := string(answer.Name)
switch answer.Type { //nolint:exhaustive
case layers.DNSTypeA:
if cname, found := d.cnameToName.Get(name); found {
name = cname
}
ip, _ := netaddr.FromStdIP(answer.IP)
d.ipToName.Set(ToIPint32(ip), name, cache.WithExpiration(defaultDNSTTL))
case layers.DNSTypeCNAME:
cname := string(answer.CNAME)
d.cnameToName.Set(cname, name, cache.WithExpiration(defaultDNSTTL))
default:
continue
}
}()
}
}
}
}

func (d *IP2DNS) Lookup(ip netaddr.IP) string {
d.mu.RLock()
defer d.mu.RUnlock()
value, _ := d.ipToName.Get(ip.String())
return value
func (d *IP2DNS) Records() []*pb.IP2Domain {
items := make([]*pb.IP2Domain, 0, len(d.ipToName.Keys()))
for _, ip := range d.ipToName.Keys() {
domain, _ := d.ipToName.Get(ip)
items = append(items, &pb.IP2Domain{Ip: ip, Domain: domain})
}
return items
}

func ToIPint32(ip netaddr.IP) int32 {
b := ip.As4()
return int32(binary.BigEndian.Uint32([]byte{b[0], b[1], b[2], b[3]}))
}
Loading

0 comments on commit 1a549ec

Please sign in to comment.