Skip to content

Commit

Permalink
Support secure ssl kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Jan 7, 2023
1 parent 8d445a7 commit 1dc1246
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 8 deletions.
7 changes: 6 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (
)

type Kafka struct {
Brokers []string `yaml:"brokers"`
Topic string `yaml:"topic"`
InterCAPath string `yaml:"interCAPath"`
ScramUsername string `yaml:"scramUsername"`
ScramPassword string `yaml:"scramPassword"`
RootCAPath string `yaml:"rootCAPath"`
Brokers []string `yaml:"brokers"`
ProducerBatchSize int `yaml:"producerBatchSize"`
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
RequiredAcks int `yaml:"requiredAcks"`
SecureConnection bool `yaml:"secureConnection"`
}

type Config struct {
Expand Down
21 changes: 14 additions & 7 deletions example/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,12 @@ metadataBucket: dcp-test-meta
connectTimeout: 10s
dcp:
connectTimeout: 10s
flowControlBuffer: 16
persistencePollingInterval: 100ms
group:
name: groupName
membership:
type: static
memberNumber: 1
totalMembers: 1
api:
port: 8080
metric:
enabled: true
path: /metrics
kafka:
topic: "topicname"
brokers:
Expand All @@ -35,3 +28,17 @@ kafka:
producerBatchSize: 50
producerBatchTickerDuration: 5s
requiredAcks: 1

#SSL configurations
secureConnection: true
#Config support env variable "$HOME/example/..."
rootCAPath: "example/stretch-kafka/rootCA.pem"
interCAPath: "example/stretch-kafka/interCA.pem"
scramUsername: "username"
scramPassword: "password"

checkpoint:
timeout: 100s

logging:
level: debug
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.40.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
Expand Down
50 changes: 50 additions & 0 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"math"
"os"
"sync"
"time"

"github.com/segmentio/kafka-go/sasl/scram"

"github.com/Trendyol/go-kafka-connect-couchbase/config"
"github.com/Trendyol/go-kafka-connect-couchbase/logger"

Expand Down Expand Up @@ -34,11 +39,56 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.
Logger: logger,
ErrorLogger: errorLogger,
}
if config.SecureConnection {
transport, err := createSecureKafkaTransport(config.ScramUsername, config.ScramPassword, config.RootCAPath,
config.InterCAPath, errorLogger)
if err != nil {
panic("Secure kafka couldn't connect " + err.Error())
}
writer.Transport = transport
}
return &producer{
producerBatch: newProducerBatch(config.ProducerBatchTickerDuration, writer, config.ProducerBatchSize, logger, errorLogger),
}
}

func createSecureKafkaTransport(
scramUsername,
scramPassword,
rootCAPath,
interCAPath string,
errorLogger logger.Logger,
) (*kafka.Transport, error) {
mechanism, err := scram.Mechanism(scram.SHA512, scramUsername, scramPassword)
if err != nil {
return nil, err
}

caCert, err := os.ReadFile(os.ExpandEnv(rootCAPath))
if err != nil {
errorLogger.Printf("An error occurred while reading ca.pem file! Error: %s", err.Error())
return nil, err
}

intCert, err := os.ReadFile(os.ExpandEnv(interCAPath))
if err != nil {
errorLogger.Printf("An error occurred while reading int.pem file! Error: %s", err.Error())
return nil, err
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
caCertPool.AppendCertsFromPEM(intCert)

return &kafka.Transport{
TLS: &tls.Config{
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
},
SASL: mechanism,
}, nil
}

var KafkaMessagePool = sync.Pool{
New: func() any {
return &kafka.Message{}
Expand Down

0 comments on commit 1dc1246

Please sign in to comment.