diff --git a/kafka/message/message.go b/kafka/message/message.go index 4455caa..95aba25 100644 --- a/kafka/message/message.go +++ b/kafka/message/message.go @@ -10,3 +10,13 @@ type KafkaMessage struct { SeqNo uint64 VbID uint16 } + +func (m *KafkaMessage) IsHeaderExist(key string) bool { + for _, header := range m.Headers { + if header.Key == key { + return true + } + } + + return false +} diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index fbc363a..b394913 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -1,6 +1,7 @@ package producer import ( + "bytes" "context" "errors" "fmt" @@ -136,10 +137,6 @@ func (b *Batch) FlushMessages() { b.metric.BatchProduceLatency = time.Since(startedTime).Milliseconds() - b.messages = b.messages[:0] - b.currentMessageBytes = 0 - b.batchTicker.Reset(b.batchTickerDuration) - if b.sinkResponseHandler != nil { switch e := err.(type) { case nil: @@ -155,6 +152,10 @@ func (b *Batch) FlushMessages() { return } } + + b.messages = b.messages[:0] + b.currentMessageBytes = 0 + b.batchTicker.Reset(b.batchTickerDuration) } b.dcpCheckpointCommit() } @@ -181,6 +182,8 @@ func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) { Message: convertKafkaMessage(b.messages[i], b.cbEvent), Err: writeErrors[i], }) + + markMessageAsRejected(&b.messages[i]) } else { b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{ Message: convertKafkaMessage(b.messages[i], b.cbEvent), @@ -191,11 +194,13 @@ func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) { } func (b *Batch) handleResponseError(err error) { - for _, msg := range b.messages { + for i, msg := range b.messages { b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{ Message: convertKafkaMessage(msg, b.cbEvent), Err: err, }) + + markMessageAsRejected(&b.messages[i]) } } @@ -213,6 +218,19 @@ func (b *Batch) handleMessageTooLargeError(mTooLargeError kafka.MessageTooLargeE Message: convertKafkaMessage(mTooLargeError.Message, b.cbEvent), Err: mTooLargeError, }) + + for i, msg := range b.messages { + if bytes.Equal(msg.Key, mTooLargeError.Message.Key) { + markMessageAsRejected(&b.messages[i]) + } + } +} + +func markMessageAsRejected(message *kafka.Message) { + if !kafkaHeaderExists(message.Headers, gKafka.MessageIsRejectedKey) { + message.Headers = append(message.Headers, kafka.Header{Key: gKafka.MessageIsRejectedKey, Value: []byte("true")}) + println(message) + } } func convertKafkaMessage(src kafka.Message, cbEvent *couchbase.Event) *message.KafkaMessage { @@ -238,3 +256,12 @@ func totalSizeOfMessages(messages []kafka.Message) int64 { } return int64(size) } + +func kafkaHeaderExists(headers []kafka.Header, key string) bool { + for _, header := range headers { + if header.Key == key { + return true + } + } + return false +} diff --git a/kafka/rejection_log_sink_response_handler.go b/kafka/rejection_log_sink_response_handler.go index 786bca4..2519902 100644 --- a/kafka/rejection_log_sink_response_handler.go +++ b/kafka/rejection_log_sink_response_handler.go @@ -5,11 +5,14 @@ import ( "fmt" "github.com/Trendyol/go-dcp-kafka/config" + "github.com/Trendyol/go-dcp-kafka/kafka/message" "github.com/Trendyol/go-dcp/logger" jsoniter "github.com/json-iterator/go" "github.com/segmentio/kafka-go" ) +const MessageIsRejectedKey = "isRejected" + type RejectionLogSinkResponseHandler struct { Config config.Kafka KafkaClient Client @@ -32,6 +35,10 @@ func (r *RejectionLogSinkResponseHandler) OnSuccess(_ *SinkResponseHandlerContex } func (r *RejectionLogSinkResponseHandler) OnError(ctx *SinkResponseHandlerContext) { + if isMessageRejected(ctx.Message) { + return + } + rejectionLog := r.buildRejectionLog(ctx) if err := r.publishToKafka(ctx, rejectionLog); err != nil { logger.Log.Error("failed to publish rejection log, err: %v", err) @@ -82,6 +89,10 @@ func (r *RejectionLogSinkResponseHandler) publishToKafka(ctx *SinkResponseHandle return nil } +func isMessageRejected(kafkaMessage *message.KafkaMessage) bool { + return kafkaMessage.IsHeaderExist(MessageIsRejectedKey) +} + func NewRejectionLogSinkResponseHandler() SinkResponseHandler { return &RejectionLogSinkResponseHandler{} }