Skip to content

Commit

Permalink
feat: support tls for kafka connection
Browse files Browse the repository at this point in the history
  • Loading branch information
aldy505 committed Nov 17, 2024
1 parent f2ccadb commit edb9f7e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
3 changes: 3 additions & 0 deletions cmd/vroom/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type (
KafkaSaslMechanism string `env:"SENTRY_KAFKA_SASL_MECHANISM"`
KafkaSaslUsername string `env:"SENTRY_KAFKA_SASL_USERNAME"`
KafkaSaslPassword string `env:"SENTRY_KAFKA_SASL_PASSWORD"`
KafkaSslCaPath string `env:"SENTRY_KAFKA_SSL_CA_PATH"`
KafkaSslCertPath string `env:"SENTRY_KAFKA_SSL_CERT_PATH"`
KafkaSslKeyPath string `env:"SENTRY_KAFKA_SSL_KEY_PATH"`

OccurrencesKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_OCCURRENCES" env-default:"localhost:9092"`
ProfilingKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_PROFILING" env-default:"localhost:9092"`
Expand Down
62 changes: 48 additions & 14 deletions cmd/vroom/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"log"
"math"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"

Expand Down Expand Up @@ -125,35 +129,65 @@ func extractMetricsFromSampleChunkFunctions(c *chunk.SampleChunk, functions []no
}

func createKafkaRoundTripper(e ServiceConfig) kafka.RoundTripper {
var saslMechanism sasl.Mechanism = nil

Check failure on line 132 in cmd/vroom/utils.go

View workflow job for this annotation

GitHub Actions / lint

var-declaration: should drop = nil from declaration of var saslMechanism; it is the zero value (revive)
var tlsConfig *tls.Config = nil

Check failure on line 133 in cmd/vroom/utils.go

View workflow job for this annotation

GitHub Actions / lint

var-declaration: should drop = nil from declaration of var tlsConfig; it is the zero value (revive)

switch strings.ToUpper(e.KafkaSaslMechanism) {
case "PLAIN":
return &kafka.Transport{
SASL: plain.Mechanism{
Username: e.KafkaSaslUsername,
Password: e.KafkaSaslPassword,
},
saslMechanism = plain.Mechanism{
Username: e.KafkaSaslUsername,
Password: e.KafkaSaslPassword,
}
case "SCRAM-SHA-256":
mechanism, err := scram.Mechanism(scram.SHA256, e.KafkaSaslUsername, e.KafkaSaslPassword)
if err != nil {
log.Fatal("error creating scram mechanism", err)
log.Fatal("unable to create scram-sha-256 mechanism", err)
return nil
}

return &kafka.Transport{
SASL: mechanism,
}
saslMechanism = mechanism
case "SCRAM-SHA-512":
mechanism, err := scram.Mechanism(scram.SHA512, e.KafkaSaslUsername, e.KafkaSaslPassword)
if err != nil {
log.Fatal("error creating scram mechanism", err)
log.Fatal("unable to create scram-sha-512 mechanism", err)
return nil
}

return &kafka.Transport{
SASL: mechanism,
saslMechanism = mechanism
}

if e.KafkaSslCertPath != "" && e.KafkaSslKeyPath != "" {
certs, err := tls.LoadX509KeyPair(e.KafkaSslCertPath, e.KafkaSslKeyPath)
if err != nil {
log.Fatal("unable to load certificate key pair", err)
return nil
}
default:
return nil

caCertificatePool, err := x509.SystemCertPool()
if err != nil {
caCertificatePool = x509.NewCertPool()
}
if e.KafkaSslCaPath != "" {
caFile, err := os.ReadFile(e.KafkaSslCaPath)
if err != nil {
log.Fatal("unable to read ca file", err)
return nil
}

if ok := caCertificatePool.AppendCertsFromPEM(caFile); !ok {
log.Fatal("unable to append ca certificate to pool")
return nil
}
}

tlsConfig = &tls.Config{
RootCAs: caCertificatePool,
Certificates: []tls.Certificate{certs},
}
}

return &kafka.Transport{
SASL: saslMechanism,
TLS: tlsConfig,
}
}

0 comments on commit edb9f7e

Please sign in to comment.