Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: authentication support for kafka connection #530

Merged
merged 10 commits into from
Dec 23, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Add more metadata fields to Chunk Kafka message. ([#518](https://github.com/getsentry/vroom/pull/518))
- Ingest Android profile chunks. ([#521](https://github.com/getsentry/vroom/pull/521))
- Handle profile chunks in regressed endpoint. ([#527](https://github.com/getsentry/vroom/pull/527))
- Authentication support for Kafka connection. ([#530](https://github.com/getsentry/vroom/pull/530))
- Add support for android chunks. ([#540](https://github.com/getsentry/vroom/pull/540))

**Bug Fixes**:
Expand Down
7 changes: 7 additions & 0 deletions cmd/vroom/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ type (

SentryDSN string `env:"SENTRY_DSN"`

KafkaSaslMechanism string `env:"SENTRY_KAFKA_SASL_MECHANISM"`
KafkaSaslUsername string `env:"SENTRY_KAFKA_SASL_USERNAME"`
KafkaSaslPassword string `env:"SENTRY_KAFKA_SASL_PASSWORD"`
KafkaSslCaPath string `env:"SENTRY_KAFKA_SSL_CA_PATH"`
KafkaSslCertPath string `env:"SENTRY_KAFKA_SSL_CERT_PATH"`
KafkaSslKeyPath string `env:"SENTRY_KAFKA_SSL_KEY_PATH"`

OccurrencesKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_OCCURRENCES" env-default:"localhost:9092"`
ProfilingKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_PROFILING" env-default:"localhost:9092"`
SpansKafkaBrokers []string `env:"SENTRY_KAFKA_BROKERS_SPANS" env-default:"localhost:9092"`
Expand Down
2 changes: 2 additions & 0 deletions cmd/vroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func newEnvironment() (*environment, error) {
ReadTimeout: 3 * time.Second,
Topic: e.config.OccurrencesKafkaTopic,
WriteTimeout: 3 * time.Second,
Transport: createKafkaRoundTripper(e.config),
}
e.profilingWriter = &kafka.Writer{
Addr: kafka.TCP(e.config.ProfilingKafkaBrokers...),
Expand All @@ -79,6 +80,7 @@ func newEnvironment() (*environment, error) {
Compression: kafka.Lz4,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
Transport: createKafkaRoundTripper(e.config),
}
return &e, nil
}
Expand Down
83 changes: 83 additions & 0 deletions cmd/vroom/utils.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,91 @@
package main

import (
"crypto/tls"
"crypto/x509"
"log"
"net"
"os"
"strings"
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)

type MetricSummary struct {
Min float64
Max float64
Sum float64
Count uint64
}

func createKafkaRoundTripper(e ServiceConfig) kafka.RoundTripper {
var saslMechanism sasl.Mechanism
var tlsConfig *tls.Config

switch strings.ToUpper(e.KafkaSaslMechanism) {
case "PLAIN":
saslMechanism = plain.Mechanism{
Username: e.KafkaSaslUsername,
Password: e.KafkaSaslPassword,
}
case "SCRAM-SHA-256":
mechanism, err := scram.Mechanism(scram.SHA256, e.KafkaSaslUsername, e.KafkaSaslPassword)
if err != nil {
log.Fatal("unable to create scram-sha-256 mechanism", err)
return nil
}

saslMechanism = mechanism
case "SCRAM-SHA-512":
mechanism, err := scram.Mechanism(scram.SHA512, e.KafkaSaslUsername, e.KafkaSaslPassword)
if err != nil {
log.Fatal("unable to create scram-sha-512 mechanism", err)
return nil
}

saslMechanism = mechanism
}

if e.KafkaSslCertPath != "" && e.KafkaSslKeyPath != "" {
certs, err := tls.LoadX509KeyPair(e.KafkaSslCertPath, e.KafkaSslKeyPath)
if err != nil {
log.Fatal("unable to load certificate key pair", err)
return nil
}

caCertificatePool, err := x509.SystemCertPool()
if err != nil {
caCertificatePool = x509.NewCertPool()
}
if e.KafkaSslCaPath != "" {
caFile, err := os.ReadFile(e.KafkaSslCaPath)
if err != nil {
log.Fatal("unable to read ca file", err)
return nil
}

if ok := caCertificatePool.AppendCertsFromPEM(caFile); !ok {
log.Fatal("unable to append ca certificate to pool")
return nil
}
}

tlsConfig = &tls.Config{
RootCAs: caCertificatePool,
Certificates: []tls.Certificate{certs},
}
}

return &kafka.Transport{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to set a Dialer as well here?

If writer Transport is nil DefaultTransport is used, but in this case it won't as we're explicitly setting it with createKafkaRoundTripper.

I think here we should first create a Transport like here and then set the SASL and TLS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

SASL: saslMechanism,
TLS: tlsConfig,
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
DualStack: true,
}).DialContext,
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ require (
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/xattr v0.4.9 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.26.0 // indirect
Expand Down
Loading