From c68efc4169ff92a51d286cae4f259b8b33eb9232 Mon Sep 17 00:00:00 2001 From: Eray Arslan Date: Sat, 20 Jul 2024 13:27:18 +0300 Subject: [PATCH] feat: better size calc of message --- kafka/producer/producer_batch.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index 67b0606..aa48a05 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -2,7 +2,6 @@ package producer import ( "context" - "encoding/binary" "errors" "fmt" "io" @@ -93,7 +92,7 @@ func (b *Batch) AddMessages(ctx *models.ListenerContext, messages []kafka.Messag return } b.messages = append(b.messages, messages...) - b.currentMessageBytes += int64(binary.Size(messages)) + b.currentMessageBytes += totalSizeOfMessages(messages) if isLastChunk { ctx.Ack() } @@ -214,3 +213,16 @@ func convertKafkaMessage(src kafka.Message) *message.KafkaMessage { Value: src.Value, } } + +func totalSizeOfMessages(messages []kafka.Message) int64 { + var size int + for _, m := range messages { + headerSize := 0 + for _, header := range m.Headers { + headerSize += 2 + len(header.Key) + headerSize += len(header.Value) + } + size += 14 + (4 + len(m.Key)) + (4 + len(m.Value)) + headerSize + } + return int64(size) +}