Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
use SyncProducer to guarantee at least once delivery on the producer …
Browse files Browse the repository at this point in the history
…side (#181) (#1668)
  • Loading branch information
lionelvillard authored Nov 16, 2020
1 parent 1c66947 commit 3de76f3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 41 deletions.
37 changes: 14 additions & 23 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type KafkaDispatcher struct {
receiver *eventingchannels.MessageReceiver
dispatcher *eventingchannels.MessageDispatcherImpl

kafkaAsyncProducer sarama.AsyncProducer
kafkaSyncProducer sarama.SyncProducer
channelSubscriptions map[eventingchannels.ChannelReference][]types.UID
subsConsumerGroups map[types.UID]sarama.ConsumerGroup
subscriptions map[types.UID]Subscription
Expand Down Expand Up @@ -86,9 +86,10 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
conf.ClientID = args.ClientID
conf.Consumer.Return.Errors = true // Returns the errors in ConsumerGroup#Errors() https://godoc.org/github.com/Shopify/sarama#ConsumerGroup
conf.Consumer.Return.Errors = true // Returns the errors in ConsumerGroup#Errors() https://godoc.org/github.com/Shopify/sarama#ConsumerGroup
conf.Producer.Return.Successes = true // Must be enabled for sync producer

producer, err := sarama.NewAsyncProducer(args.Brokers, conf)
producer, err := sarama.NewSyncProducer(args.Brokers, conf)
if err != nil {
return nil, fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", args.Brokers, err)
}
Expand All @@ -99,7 +100,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
kafkaAsyncProducer: producer,
kafkaSyncProducer: producer,
logger: args.Logger,
topicFunc: args.TopicFunc,
}
Expand All @@ -117,8 +118,15 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

kafkaProducerMessage.Headers = append(kafkaProducerMessage.Headers, serializeTrace(trace.FromContext(ctx).SpanContext())...)

dispatcher.kafkaAsyncProducer.Input() <- &kafkaProducerMessage
return nil
partition, offset, err := dispatcher.kafkaSyncProducer.SendMessage(&kafkaProducerMessage)

if err == nil {
dispatcher.logger.Debugw("message sent", zap.Int32("partition", partition), zap.Int64("offset", offset))
} else {
dispatcher.logger.Warnw("message not sent", zap.Error(err))
}

return err
},
args.Logger.Desugar(),
eventingchannels.ResolveMessageChannelFromHostHeader(dispatcher.getChannelReferenceFromHost))
Expand Down Expand Up @@ -299,23 +307,6 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error {
return fmt.Errorf("message receiver is not set")
}

if d.kafkaAsyncProducer == nil {
return fmt.Errorf("kafkaAsyncProducer is not set")
}

go func() {
for {
select {
case e := <-d.kafkaAsyncProducer.Errors():
d.logger.Warn("Got", zap.Error(e))
case s := <-d.kafkaAsyncProducer.Successes():
d.logger.Info("Sent", zap.Any("success", s))
case <-ctx.Done():
return
}
}
}()

return d.receiver.Start(ctx)
}

Expand Down
18 changes: 0 additions & 18 deletions kafka/channel/pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package dispatcher
import (
"context"
"errors"
"net/http"
"net/url"
"testing"

"knative.dev/eventing/pkg/channel/fanout"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"
Expand Down Expand Up @@ -425,26 +423,10 @@ func TestUnsubscribeUnknownSub(t *testing.T) {

func TestKafkaDispatcher_Start(t *testing.T) {
d := &KafkaDispatcher{}

err := d.Start(context.TODO())
if err == nil {
t.Errorf("Expected error want %s, got %s", "message receiver is not set", err)
}

receiver, err := eventingchannels.NewMessageReceiver(
func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, _ []binding.Transformer, _ http.Header) error {
return nil
},
zap.NewNop(),
eventingchannels.ResolveMessageChannelFromHostHeader(d.getChannelReferenceFromHost))
if err != nil {
t.Fatalf("Error creating new message receiver. Error:%s", err)
}
d.receiver = receiver
err = d.Start(context.TODO())
if err == nil {
t.Errorf("Expected error want %s, got %s", "kafkaAsyncProducer is not set", err)
}
}

func TestNewDispatcher(t *testing.T) {
Expand Down

0 comments on commit 3de76f3

Please sign in to comment.