Skip to content

Commit

Permalink
feat: wrap kafka client with sync once for support singleton (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri authored Aug 13, 2024
1 parent 7491cce commit c27bd17
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions verify_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"fmt"
"sync"

"github.com/segmentio/kafka-go"
)
Expand All @@ -16,24 +17,39 @@ type client struct {
*kafka.Client
}

var (
kafkaClientInstance *client
kafkaClientOnce sync.Once
)

func newKafkaClient(cfg *ConsumerConfig) (kafkaClient, error) {
kc := client{
Client: &kafka.Client{
Addr: kafka.TCP(cfg.Reader.Brokers...),
},
}
var err error
kafkaClientOnce.Do(func() {
kc := &client{
Client: &kafka.Client{
Addr: kafka.TCP(cfg.Reader.Brokers...),
},
}

transport := &Transport{
Transport: &kafka.Transport{
MetadataTopics: cfg.getTopics(),
},
}
if err := fillLayer(transport, cfg.SASL, cfg.TLS); err != nil {
return nil, fmt.Errorf("error when initializing kafka client for verify topic purpose %w", err)
transport := &Transport{
Transport: &kafka.Transport{
MetadataTopics: cfg.getTopics(),
},
}
if err = fillLayer(transport, cfg.SASL, cfg.TLS); err != nil {
err = fmt.Errorf("error when initializing kafka client for verify topic purpose %w", err)
return
}

kc.Transport = transport
kafkaClientInstance = kc
})

if err != nil {
return nil, err
}

kc.Transport = transport
return &kc, nil
return kafkaClientInstance, nil
}

func (k *client) GetClient() *kafka.Client {
Expand Down

0 comments on commit c27bd17

Please sign in to comment.