Skip to content

Commit

Permalink
Kafka watcher component in network mapper (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
amitlicht authored Mar 22, 2023
1 parent 28f3077 commit 82bd55b
Show file tree
Hide file tree
Showing 46 changed files with 2,114 additions and 289 deletions.
3 changes: 1 addition & 2 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
*_gen.go linguist-generated=true
generated.go linguist-generated=true
go.mod linguist-generated=true
go.sum linguist-generated=true
schema.graphql linguist-generated=true
go.sum linguist-generated=true
4 changes: 3 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
service:
- mapper
- sniffer
- kafka-watcher

steps:
- name: Checkout
Expand Down Expand Up @@ -61,7 +62,7 @@ jobs:
uses: docker/build-push-action@v2
with:
context: src/
file: src/${{ matrix.service }}.Dockerfile
file: build/${{ matrix.service }}.Dockerfile
tags: ${{ env.REGISTRY }}:${{ matrix.service }}-${{ github.sha }}
push: true
network: host
Expand Down Expand Up @@ -100,3 +101,4 @@ jobs:
retag_image_as_latest() { MANIFEST=$(aws ecr batch-get-image --repository-name ${{ env.REPOSITORY_NAME }} --image-ids imageTag="$1-${{ github.sha }}" --query "images[].imageManifest" --output text); if [ -z "$MANIFEST" ]; then echo Manifest not found; exit 1; fi; OUTPUT=$(aws ecr put-image --repository-name ${{ env.REPOSITORY_NAME }} --image-tag "$1-latest" --image-manifest "$MANIFEST" 2>&1 || true); if echo $OUTPUT | grep 'An error occurred' >/dev/null && ! echo $OUTPUT | grep ImageAlreadyExistsException >/dev/null; then echo $OUTPUT; exit 1; fi; }
retag_image_as_latest mapper
retag_image_as_latest sniffer
retag_image_as_latest kafka-watcher
11 changes: 11 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ jobs:
- uses: actions/checkout@v3
- name: Install dependencies
run: sudo apt update && sudo apt install libpcap-dev # required for the linter to be able to lint github.com/google/gopacket
- name: go get
run: go get .
working-directory: src
- name: go generate
run: go generate ./...
working-directory: src
- name: go vet
run: go vet ./...
working-directory: src/
- name: check git diff
run: git diff --exit-code
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
Expand Down
14 changes: 13 additions & 1 deletion .github/workflows/release-tag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,16 @@ jobs:
network: host
platforms: linux/amd64,linux/arm64
build-args:
SOURCE_IMAGE=${{ env.REGISTRY }}:sniffer-${{ github.sha }}
SOURCE_IMAGE=${{ env.REGISTRY }}:sniffer-${{ github.sha }}

- name: Push to Docker Hub - Kafka Watcher
uses: docker/build-push-action@v2
with:
context: .github/workflows
file: .github/workflows/release.Dockerfile
tags: otterize/network-mapper-kafka-watcher:latest,otterize/network-mapper-kafka-watcher:${{ github.ref_name }}
push: true
network: host
platforms: linux/amd64,linux/arm64
build-args:
SOURCE_IMAGE=${{ env.REGISTRY }}:kafka-watcher-${{ github.sha }}
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ For more platforms, see [the installation guide](https://docs.otterize.com/k8s-i
## How does the network mapper work?
The Otterize network mapper creates a map of in-cluster traffic by capturing DNS traffic and inspecting active connections, then resolving the IP addresses participating in connections to their pods, and crawling up the ownership of each pod until it reaches the root object. The network mapper continues building the network map as long as it's deployed.
### Components
- Sniffer: the sniffer is deployed to each node, and is responsible for capturing node-local DNS traffic and inspecting open connections.
- Mapper: the mapper is deployed once, and resolves service names using the Kubernetes API with traffic information reported by the sniffers.
- Sniffer: the sniffer is deployed to each node, and is responsible for capturing node-local DNS traffic and inspecting open connections.
- Kafka watcher (experimental): deployed once to your cluster, and is responsible for capturing Kafka server authorizer logs and reporting them to the network mapper.

### Service name resolution
Service names are resolved in one of two ways:
Expand Down
26 changes: 26 additions & 0 deletions build/kafka-watcher.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM --platform=linux/amd64 golang:1.19-alpine as buildenv
RUN apk add --no-cache ca-certificates git protoc
RUN apk add build-base libpcap-dev
WORKDIR /src

# restore dependencies
COPY go.mod go.sum ./
RUN go mod download

COPY . .

FROM buildenv as test
RUN go test ./exp/kafka-watcher/...

FROM test as builder
ARG TARGETOS
ARG TARGETARCH
RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /main ./exp/kafka-watcher/cmd

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
COPY --from=builder /main /main
USER 65532:65532

ENTRYPOINT ["/main"]
1 change: 0 additions & 1 deletion src/mapper.Dockerfile → build/mapper.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go generate ./mapper/...

FROM buildenv as test
# install dependencies for "envtest" package
Expand Down
1 change: 0 additions & 1 deletion src/sniffer.Dockerfile → build/sniffer.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go generate ./sniffer/...

FROM buildenv as test
RUN go test ./sniffer/... && echo dep > /dep
Expand Down
52 changes: 52 additions & 0 deletions src/exp/kafka-watcher/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"context"
"fmt"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/config"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/logwatcher"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/mapperclient"
"k8s.io/apimachinery/pkg/types"
"strings"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

// parseKafkaServers converts a list of "name.namespace" strings into
// Kubernetes NamespacedName values. An entry that does not contain
// exactly one dot separator causes the whole parse to fail.
func parseKafkaServers(serverNames []string) ([]types.NamespacedName, error) {
	var parsed []types.NamespacedName
	for _, raw := range serverNames {
		parts := strings.Split(raw, ".")
		if len(parts) != 2 {
			return nil, fmt.Errorf("error parsing server pod name %s - should be formatted as 'name.namespace'", raw)
		}
		parsed = append(parsed, types.NamespacedName{
			Name:      parts[0],
			Namespace: parts[1],
		})
	}
	return parsed, nil
}

// main wires up and starts the kafka watcher: it parses the configured
// kafka server pod names, builds a client for the mapper API, and runs
// the log watcher until the process terminates.
func main() {
	if viper.GetBool(config.DebugKey) {
		logrus.SetLevel(logrus.DebugLevel)
	}

	servers, err := parseKafkaServers(viper.GetStringSlice(config.KafkaServersKey))
	if err != nil {
		panic(err)
	}

	client := mapperclient.NewMapperClient(viper.GetString(config.MapperApiUrlKey))
	watcher, err := logwatcher.NewWatcher(client, servers)
	if err != nil {
		panic(err)
	}

	if err := watcher.RunForever(context.Background()); err != nil {
		panic(err)
	}
}
35 changes: 35 additions & 0 deletions src/exp/kafka-watcher/pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package config

import (
"github.com/spf13/viper"
"strings"
"time"
)

// Configuration keys and their default values. Any key can be overridden
// via an environment variable with the OTTERIZE_ prefix, with dashes
// replaced by underscores (e.g. OTTERIZE_MAPPER_API_URL).
const (
	// EnvPrefix is the prefix used for environment-variable overrides.
	EnvPrefix = "OTTERIZE"
	// MapperApiUrlKey configures the GraphQL endpoint of the network mapper.
	MapperApiUrlKey     = "mapper-api-url"
	MapperApiUrlDefault = "http://mapper:9090/query"
	// ReportIntervalKey configures how often accumulated records are reported.
	ReportIntervalKey     = "report-interval"
	ReportIntervalDefault = 10 * time.Second
	// CallsTimeoutKey configures the timeout for outbound calls.
	CallsTimeoutKey     = "calls-timeout"
	CallsTimeoutDefault = 5 * time.Second
	// CooldownIntervalKey configures the wait between log-watch retries.
	CooldownIntervalKey     = "cooldown-interval"
	CooldownIntervalDefault = 10 * time.Second
	// DebugKey toggles debug-level logging.
	DebugKey     = "debug"
	DebugDefault = false

	// KafkaServersKey lists kafka server pods as "name.namespace" strings.
	KafkaServersKey = "kafka-servers"
)

func init() {
viper.SetDefault(MapperApiUrlKey, MapperApiUrlDefault)
viper.SetDefault(ReportIntervalKey, ReportIntervalDefault)
viper.SetDefault(CallsTimeoutKey, CallsTimeoutDefault)
viper.SetDefault(CooldownIntervalKey, CooldownIntervalDefault)
viper.SetDefault(DebugKey, DebugDefault)
viper.SetDefault(KafkaServersKey, []string{})
viper.SetEnvPrefix(EnvPrefix)
viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
viper.AutomaticEnv()
}
158 changes: 158 additions & 0 deletions src/exp/kafka-watcher/pkg/logwatcher/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package logwatcher

import (
"bufio"
"context"
"errors"
"github.com/oriser/regroup"
"github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/config"
mapperclient2 "github.com/otterize/network-mapper/src/exp/kafka-watcher/pkg/mapperclient"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"math"
"sync"
"time"
)

// AclAuthorizerRegex matches & decodes AclAuthorizer log records.
// Sample log record for reference:
// [2023-03-12 13:51:55,904] INFO Principal = User:2.5.4.45=#13206331373734376636373865323137613636346130653335393130326638303662,CN=myclient.otterize-tutorial-kafka-mtls,O=SPIRE,C=US is Denied Operation = Describe from host = 10.244.0.27 on resource = Topic:LITERAL:mytopic for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
var AclAuthorizerRegex = regroup.MustCompile(
	`^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+\] [A-Z]+ Principal = \S+ is (?P<access>\S+) Operation = (?P<operation>\S+) from host = (?P<host>\S+) on resource = Topic:LITERAL:(?P<topic>.+) for request = \S+ with resourceRefCount = \d+ \(kafka\.authorizer\.logger\)$`,
)

// AuthorizerRecord is a single decoded ACL authorizer log record, together
// with the kafka server pod it was read from. The regroup tags bind each
// field to the matching named group in AclAuthorizerRegex.
type AuthorizerRecord struct {
	Server    types.NamespacedName // kafka server pod the record was read from
	Access    string               `regroup:"access"`    // e.g. "Allowed" / "Denied"
	Operation string               `regroup:"operation"` // kafka operation, e.g. "Describe"
	Host      string               `regroup:"host"`      // client IP as logged by the broker
	Topic     string               `regroup:"topic"`     // topic name from the resource field
}

// SeenRecordsStore maps each distinct authorizer record to the last time
// it was observed in the logs.
type SeenRecordsStore map[AuthorizerRecord]time.Time

// Watcher tails the logs of the configured kafka server pods, collects
// ACL authorizer records, and periodically reports them to the mapper.
type Watcher struct {
	clientset    *kubernetes.Clientset      // in-cluster client used to stream pod logs
	mu           sync.Mutex                 // guards seen, which is written by multiple watch goroutines
	seen         SeenRecordsStore           // records accumulated since the last Flush
	mapperClient mapperclient2.MapperClient // destination for reported results
	kafkaServers []types.NamespacedName     // pods whose logs are watched
}

// NewWatcher constructs a Watcher backed by the in-cluster Kubernetes
// client. It returns an error when not running inside a cluster or when
// the client cannot be created.
func NewWatcher(mapperClient mapperclient2.MapperClient, kafkaServers []types.NamespacedName) (*Watcher, error) {
	conf, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	clientset, err := kubernetes.NewForConfig(conf)
	if err != nil {
		return nil, err
	}

	// mu needs no explicit initialization: the zero-value mutex is ready to use.
	return &Watcher{
		clientset:    clientset,
		seen:         SeenRecordsStore{},
		mapperClient: mapperClient,
		kafkaServers: kafkaServers,
	}, nil
}

// processLogRecord parses a single log line. Lines that are ACL authorizer
// records are stored in the seen-records map under the current timestamp;
// any other line is ignored silently, and parse failures are only logged.
func (w *Watcher) processLogRecord(kafkaServer types.NamespacedName, record string) {
	authRecord := AuthorizerRecord{Server: kafkaServer}

	err := AclAuthorizerRegex.MatchToTarget(record, &authRecord)
	switch {
	case errors.Is(err, &regroup.NoMatchFoundError{}):
		// Not an authorizer record - nothing to do.
		return
	case err != nil:
		logrus.Errorf("Error matching authorizer regex: %s", err)
		return
	}

	w.mu.Lock()
	defer w.mu.Unlock()
	w.seen[authRecord] = time.Now()
}

// WatchOnce opens a follow-mode log stream for the given kafka server pod
// and feeds every line to processLogRecord until the stream ends or fails.
// It returns any error from opening the stream or from reading it.
func (w *Watcher) WatchOnce(ctx context.Context, kafkaServer types.NamespacedName) error {
	podLogOpts := corev1.PodLogOptions{
		Follow: true,
		// Re-read the cooldown window on (re)connect so records logged while
		// the stream was down are not missed.
		SinceSeconds: lo.ToPtr(int64(math.Ceil(viper.GetDuration(config.CooldownIntervalKey).Seconds()))),
	}
	req := w.clientset.CoreV1().Pods(kafkaServer.Namespace).GetLogs(kafkaServer.Name, &podLogOpts)
	reader, err := req.Stream(ctx)
	if err != nil {
		return err
	}
	defer reader.Close()

	s := bufio.NewScanner(reader)
	s.Split(bufio.ScanLines)
	for s.Scan() {
		w.processLogRecord(kafkaServer, s.Text())
	}

	// bufio.Scanner swallows read errors (Scan just returns false); surface
	// them so the caller can log the failure and retry instead of treating a
	// broken stream as a clean end-of-log.
	return s.Err()
}

// WatchForever repeatedly watches the given kafka server pod's logs,
// waiting a configurable cooldown period between attempts. It returns
// when ctx is cancelled; previously the loop slept unconditionally and
// could never exit, leaking the goroutine on shutdown.
func (w *Watcher) WatchForever(ctx context.Context, kafkaServer types.NamespacedName) {
	log := logrus.WithField("pod", kafkaServer)
	cooldownPeriod := viper.GetDuration(config.CooldownIntervalKey)
	for {
		log.Info("Watching logs")
		if err := w.WatchOnce(ctx, kafkaServer); err != nil {
			log.WithError(err).Error("Error watching logs")
		}

		log.Infof("Watcher stopped, will retry after cooldown period (%s)...", cooldownPeriod)
		// Honor cancellation during the cooldown instead of sleeping blindly.
		select {
		case <-ctx.Done():
			log.Info("Context cancelled, stopping log watcher")
			return
		case <-time.After(cooldownPeriod):
		}
	}
}

// Flush atomically swaps out the accumulated records and returns them,
// leaving an empty store for records seen afterwards.
func (w *Watcher) Flush() SeenRecordsStore {
	w.mu.Lock()
	defer w.mu.Unlock()

	flushed := w.seen
	w.seen = make(SeenRecordsStore)
	return flushed
}

// ReportResults drains the seen-records store and reports every drained
// record to the mapper. Records observed while the report is in flight
// are kept for the next cycle.
func (w *Watcher) ReportResults(ctx context.Context) error {
	records := w.Flush()
	logrus.Infof("Reporting %d records", len(records))

	results := make([]mapperclient2.KafkaMapperResult, 0, len(records))
	for record, lastSeen := range records {
		results = append(results, mapperclient2.KafkaMapperResult{
			SrcIp:           record.Host,
			ServerPodName:   record.Server.Name,
			ServerNamespace: record.Server.Namespace,
			Topic:           record.Topic,
			Operation:       record.Operation,
			LastSeen:        lastSeen,
		})
	}

	return w.mapperClient.ReportKafkaMapperResults(ctx, mapperclient2.KafkaMapperResults{Results: results})
}

// RunForever starts a log-watching goroutine per configured kafka server
// and then reports accumulated results to the mapper at the configured
// interval. It returns ctx.Err() once ctx is cancelled; previously the
// report loop slept unconditionally and could never terminate.
func (w *Watcher) RunForever(ctx context.Context) error {
	for _, kafkaServer := range w.kafkaServers {
		go w.WatchForever(ctx, kafkaServer)
	}

	ticker := time.NewTicker(viper.GetDuration(config.ReportIntervalKey))
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := w.ReportResults(ctx); err != nil {
				logrus.WithError(err).Errorf("Failed reporting watcher results to mapper")
			}
		}
	}
}
28 changes: 28 additions & 0 deletions src/exp/kafka-watcher/pkg/mapperclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package mapperclient

import (
"context"
"github.com/Khan/genqlient/graphql"
"net/http"
)

// MapperClient reports kafka watcher findings to the network mapper.
type MapperClient interface {
	// ReportKafkaMapperResults sends the given results to the mapper's
	// GraphQL endpoint.
	ReportKafkaMapperResults(ctx context.Context, results KafkaMapperResults) error
}

// mapperClientImpl implements MapperClient over a genqlient GraphQL client.
type mapperClientImpl struct {
	mapperAddress string         // mapper GraphQL endpoint URL
	gqlClient     graphql.Client // client used to execute GraphQL operations
}

// NewMapperClient returns a MapperClient that sends GraphQL requests to
// the mapper at the given address using the default HTTP client.
func NewMapperClient(mapperAddress string) MapperClient {
	client := &mapperClientImpl{mapperAddress: mapperAddress}
	client.gqlClient = graphql.NewClient(mapperAddress, http.DefaultClient)
	return client
}

// ReportKafkaMapperResults sends the given results to the mapper via the
// generated GraphQL mutation, discarding the response payload.
func (c *mapperClientImpl) ReportKafkaMapperResults(ctx context.Context, results KafkaMapperResults) error {
	_, err := reportKafkaMapperResults(ctx, c.gqlClient, results)
	return err
}
Loading

0 comments on commit 82bd55b

Please sign in to comment.