Skip to content

Commit

Permalink
feat: better size calc of message
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Jul 20, 2024
1 parent 63ccb4a commit c68efc4
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package producer

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -93,7 +92,7 @@ func (b *Batch) AddMessages(ctx *models.ListenerContext, messages []kafka.Messag
return
}
b.messages = append(b.messages, messages...)
b.currentMessageBytes += int64(binary.Size(messages))
b.currentMessageBytes += totalSizeOfMessages(messages)
if isLastChunk {
ctx.Ack()
}
Expand Down Expand Up @@ -214,3 +213,16 @@ func convertKafkaMessage(src kafka.Message) *message.KafkaMessage {
Value: src.Value,
}
}

func totalSizeOfMessages(messages []kafka.Message) int64 {
var size int
for _, m := range messages {
headerSize := 0
for _, header := range m.Headers {
headerSize += 2 + len(header.Key)
headerSize += len(header.Value)
}
size += 14 + (4 + len(m.Key)) + (4 + len(m.Value)) + headerSize
}
return int64(size)
}

0 comments on commit c68efc4

Please sign in to comment.