From 3de76f32aa3435be48810e0f239823de9c0fb8c8 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 16 Nov 2020 11:29:02 -0500 Subject: [PATCH] use SyncProducer to guarantee at least once delivery on the producer side (#181) (#1668) --- kafka/channel/pkg/dispatcher/dispatcher.go | 37 +++++++------------ .../channel/pkg/dispatcher/dispatcher_test.go | 18 --------- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/kafka/channel/pkg/dispatcher/dispatcher.go b/kafka/channel/pkg/dispatcher/dispatcher.go index 0e81285628..e4583e0f41 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher.go +++ b/kafka/channel/pkg/dispatcher/dispatcher.go @@ -46,7 +46,7 @@ type KafkaDispatcher struct { receiver *eventingchannels.MessageReceiver dispatcher *eventingchannels.MessageDispatcherImpl - kafkaAsyncProducer sarama.AsyncProducer + kafkaSyncProducer sarama.SyncProducer channelSubscriptions map[eventingchannels.ChannelReference][]types.UID subsConsumerGroups map[types.UID]sarama.ConsumerGroup subscriptions map[types.UID]Subscription @@ -86,9 +86,10 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat conf := sarama.NewConfig() conf.Version = sarama.V2_0_0_0 conf.ClientID = args.ClientID - conf.Consumer.Return.Errors = true // Returns the errors in ConsumerGroup#Errors() https://godoc.org/github.com/Shopify/sarama#ConsumerGroup + conf.Consumer.Return.Errors = true // Returns the errors in ConsumerGroup#Errors() https://godoc.org/github.com/Shopify/sarama#ConsumerGroup + conf.Producer.Return.Successes = true // Must be enabled for sync producer - producer, err := sarama.NewAsyncProducer(args.Brokers, conf) + producer, err := sarama.NewSyncProducer(args.Brokers, conf) if err != nil { return nil, fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", args.Brokers, err) } @@ -99,7 +100,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), subscriptions: make(map[types.UID]Subscription), - kafkaAsyncProducer: producer, + kafkaSyncProducer: producer, logger: args.Logger, topicFunc: args.TopicFunc, } @@ -117,8 +118,15 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat kafkaProducerMessage.Headers = append(kafkaProducerMessage.Headers, serializeTrace(trace.FromContext(ctx).SpanContext())...) - dispatcher.kafkaAsyncProducer.Input() <- &kafkaProducerMessage - return nil + partition, offset, err := dispatcher.kafkaSyncProducer.SendMessage(&kafkaProducerMessage) + + if err == nil { + dispatcher.logger.Debugw("message sent", zap.Int32("partition", partition), zap.Int64("offset", offset)) + } else { + dispatcher.logger.Warnw("message not sent", zap.Error(err)) + } + + return err }, args.Logger.Desugar(), eventingchannels.ResolveMessageChannelFromHostHeader(dispatcher.getChannelReferenceFromHost)) @@ -299,23 +307,6 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error { return fmt.Errorf("message receiver is not set") } - if d.kafkaAsyncProducer == nil { - return fmt.Errorf("kafkaAsyncProducer is not set") - } - - go func() { - for { - select { - case e := <-d.kafkaAsyncProducer.Errors(): - d.logger.Warn("Got", zap.Error(e)) - case s := <-d.kafkaAsyncProducer.Successes(): - d.logger.Info("Sent", zap.Any("success", s)) - case <-ctx.Done(): - return - } - } - }() - return d.receiver.Start(ctx) } diff --git a/kafka/channel/pkg/dispatcher/dispatcher_test.go b/kafka/channel/pkg/dispatcher/dispatcher_test.go index 2adc8ef631..8957617bcd 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher_test.go +++ b/kafka/channel/pkg/dispatcher/dispatcher_test.go @@ -19,14 +19,12 @@ package dispatcher import ( "context" "errors" - "net/http" "net/url" "testing" "knative.dev/eventing/pkg/channel/fanout" "github.com/Shopify/sarama" - "github.com/cloudevents/sdk-go/v2/binding" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" @@ -425,26 +423,10 @@ func TestUnsubscribeUnknownSub(t *testing.T) { func TestKafkaDispatcher_Start(t *testing.T) { d := &KafkaDispatcher{} - err := d.Start(context.TODO()) if err == nil { t.Errorf("Expected error want %s, got %s", "message receiver is not set", err) } - - receiver, err := eventingchannels.NewMessageReceiver( - func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, _ []binding.Transformer, _ http.Header) error { - return nil - }, - zap.NewNop(), - eventingchannels.ResolveMessageChannelFromHostHeader(d.getChannelReferenceFromHost)) - if err != nil { - t.Fatalf("Error creating new message receiver. Error:%s", err) - } - d.receiver = receiver - err = d.Start(context.TODO()) - if err == nil { - t.Errorf("Expected error want %s, got %s", "kafkaAsyncProducer is not set", err) - } } func TestNewDispatcher(t *testing.T) {