Skip to content

Commit

Permalink
extend exporter with dns
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanMelnyk113 committed Aug 11, 2023
1 parent 0266fdb commit 90793ce
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 108 deletions.
3 changes: 1 addition & 2 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func run(log logrus.FieldLogger) error {
return err
}

var ip2dns dns.LookuperStarter = &dns.Noop{}
var ip2dns dns.DNSCollector = &dns.Noop{}
if *ebpfDNSTracerEnabled && ebpf.IsKernelBTFAvailable() {
tracer := ebpf.NewTracer(log, ebpf.Config{
QueueSize: *ebpfDNSTracerQueueSize,
Expand Down Expand Up @@ -137,7 +137,6 @@ func run(log logrus.FieldLogger) error {
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/healthz", healthHandler)
mux.HandleFunc("/api/v1/raw-network-metrics", coll.GetRawNetworkMetricsHandler)
mux.HandleFunc("/api/v1/ip2dns", ip2dns.GetIp2DnsHandler)

srv := &http.Server{
Addr: fmt.Sprintf(":%d", *httpListenPort),
Expand Down
2 changes: 1 addition & 1 deletion cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func run(log logrus.FieldLogger) error {
return errors.New("not sinks configured")
}

ex := exporter.New(log, cfg, kw, clientset, sinksList)
ex := exporter.New(ctx, log, cfg, kw, clientset, sinksList)
go func() {
if err := ex.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("exporter start: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ type rawNetworkMetric struct {
lifetime time.Time
}

type ipLookup interface{ Lookup(netaddr.IP) string }
type dnsRecorder interface{ Records() []*pb.IP2Domain }

func New(
cfg Config,
log logrus.FieldLogger,
podsWatcher podsWatcher,
conntracker conntrack.Client,
ip2dns ipLookup,
ip2dns dnsRecorder,
currentTimeGetter func() time.Time,
) *Collector {
excludeNsMap := map[string]struct{}{}
Expand Down Expand Up @@ -95,7 +95,7 @@ type Collector struct {
log logrus.FieldLogger
podsWatcher podsWatcher
conntracker conntrack.Client
ip2dns ipLookup
ip2dns dnsRecorder
entriesCache map[uint64]*conntrack.Entry
podMetrics map[uint64]*rawNetworkMetric
excludeNsMap map[string]struct{}
Expand Down Expand Up @@ -131,7 +131,7 @@ func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http
items = append(items, m.RawNetworkMetric)
}

batch := &pb.RawNetworkMetricBatch{Items: items}
batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()}
batchBytes, err := proto.Marshal(batch)
if err != nil {
c.log.Errorf("marshal batch: %v", err)
Expand Down
16 changes: 4 additions & 12 deletions dns/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,19 @@ package dns

import (
"context"
"net/http"

"inet.af/netaddr"
"github.com/castai/egressd/pb"
)

type Noop struct{}

var _ LookuperStarter = (*Noop)(nil)
var _ DNSCollector = (*Noop)(nil)

func (t *Noop) Start(ctx context.Context) error {
<-ctx.Done()
return nil
}
func (t *Noop) Lookup(ip netaddr.IP) string {
return ""
}

func (d *Noop) GetIp2DnsHandler(w http.ResponseWriter, req *http.Request) {
var batchBytes []byte
w.WriteHeader(http.StatusNoContent)
if _, err := w.Write(batchBytes); err != nil {
return
}
func (d *Noop) Records() []*pb.IP2Domain {
return nil
}
48 changes: 11 additions & 37 deletions dns/resolutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,29 @@ package dns
import (
"context"
"fmt"
"net/http"
"time"

cache "github.com/Code-Hex/go-generics-cache"
"github.com/google/gopacket/layers"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"inet.af/netaddr"

"github.com/castai/egressd/ebpf"
"github.com/castai/egressd/pb"
)

type LookuperStarter interface {
type DNSCollector interface {
Start(ctx context.Context) error
Lookup(ip netaddr.IP) string
GetIp2DnsHandler(http.ResponseWriter, *http.Request)
Records() []*pb.IP2Domain
}

var _ LookuperStarter = (*IP2DNS)(nil)
var _ DNSCollector = (*IP2DNS)(nil)

type tracer interface {
Run(ctx context.Context) error
Events() <-chan ebpf.DNSEvent
}

var defaultDNSTTL = 5 * time.Minute

type IP2Domain struct {
IP string
Domain string
}
var defaultDNSTTL = 2 * time.Minute

type IP2DNS struct {
Tracer tracer
Expand All @@ -50,27 +41,6 @@ func NewIP2DNS(tracer tracer, log logrus.FieldLogger) *IP2DNS {
}
}

func (d *IP2DNS) GetIp2DnsHandler(w http.ResponseWriter, req *http.Request) {
items := make([]*pb.IP2Domain, 0, len(d.ipToName.Keys()))
for _, ip := range d.ipToName.Keys() {
domain, _ := d.ipToName.Get(ip)
items = append(items, &pb.IP2Domain{Ip: ip, Domain: domain})
}
batch := &pb.IP2DomainBatch{Items: items}
batchBytes, err := proto.Marshal(batch)
if err != nil {
d.log.Errorf("marshal batch: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if _, err := w.Write(batchBytes); err != nil {
d.log.Errorf("write batch: %v", err)
return
}
d.ipToName.DeleteExpired()
d.cnameToName.DeleteExpired()
}

func (d *IP2DNS) Start(ctx context.Context) error {

d.ipToName = cache.NewContext[string, string](ctx)
Expand Down Expand Up @@ -118,7 +88,11 @@ func (d *IP2DNS) Start(ctx context.Context) error {
}
}

func (d *IP2DNS) Lookup(ip netaddr.IP) string {
value, _ := d.ipToName.Get(ip.String())
return value
func (d *IP2DNS) Records() []*pb.IP2Domain {
items := make([]*pb.IP2Domain, 0, len(d.ipToName.Keys()))
for _, ip := range d.ipToName.Keys() {
domain, _ := d.ipToName.Get(ip)
items = append(items, &pb.IP2Domain{Ip: ip, Domain: domain})
}
return items
}
8 changes: 8 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
)

func New(
ctx context.Context,
log logrus.FieldLogger,
cfg config.Config,
kubeWatcher kubeWatcher,
Expand All @@ -44,6 +45,7 @@ func New(
kubeClient: kubeClient,
httpClient: newHTTPClient(),
sinks: sinks,
dnsStorage: newDNSStorage(ctx, log),
}
}

Expand All @@ -54,6 +56,7 @@ type Exporter struct {
kubeClient kubernetes.Interface
httpClient *http.Client
sinks []sinks.Sink
dnsStorage *ip2dns
}

func (e *Exporter) Start(ctx context.Context) error {
Expand Down Expand Up @@ -147,6 +150,8 @@ func (e *Exporter) export(ctx context.Context) error {
// Aggregate raw metrics into pod metrics.
var podsMetrics []*pb.PodNetworkMetric
for batch := range pulledBatch {
// TODO: ip2dns cache
e.dnsStorage.Fill(batch.Ip2Domain)
for _, rawMetrics := range batch.Items {
podMetrics, err := e.buildPodNetworkMetric(rawMetrics)
if err != nil {
Expand Down Expand Up @@ -200,6 +205,9 @@ func (e *Exporter) buildPodNetworkMetric(conn *pb.RawNetworkMetric) (*pb.PodNetw
RxPackets: conn.RxPackets,
Proto: conn.Proto,
}
if !dstIP.IsPrivate() {
metric.DstDnsName = e.dnsStorage.Lookup(dstIP)
}

srcNode, err := e.kubeWatcher.GetNodeByName(pod.Spec.NodeName)
if err != nil && !errors.Is(err, kube.ErrNotFound) {
Expand Down
39 changes: 39 additions & 0 deletions exporter/ip2dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package exporter

import (
"context"
"time"

cache "github.com/Code-Hex/go-generics-cache"
"github.com/castai/egressd/pb"
"github.com/sirupsen/logrus"
"inet.af/netaddr"
)

var defaultDNSTTL = 1 * time.Hour

type ip2dns struct {
log logrus.FieldLogger
ipToName *cache.Cache[string, string]
}

func newDNSStorage(ctx context.Context, log logrus.FieldLogger) *ip2dns {
return &ip2dns{
log: log,
ipToName: cache.NewContext[string, string](ctx),
}
}

func (d *ip2dns) Fill(items []*pb.IP2Domain) {
for i := range items {
d.ipToName.Set(items[i].Ip, items[i].Domain, cache.WithExpiration(defaultDNSTTL))
}
}

func (d *ip2dns) Lookup(ip netaddr.IP) string {
value, ok := d.ipToName.Get(ip.String())
if !ok {
d.log.Debugf("doman not found for IP %q", ip.String())
}
return value
}
Loading

0 comments on commit 90793ce

Please sign in to comment.