feat: Make compression encoder pool size controlled by MaxBufferedCompressionEncoders a configurable parameter

This PR addresses issue IBM#2965.

For Zstd compression, the encoder pool size (controlled by the MaxBufferedEncoders parameter) is currently hard-coded to 1. Under high concurrency this causes an excessive number of large encoder objects to be created on the fly, which in turn causes GC performance issues.

Introduce a new config parameter, MaxBufferedCompressionEncoders (default value 1), and modify the calling paths to pass it through message/record_batch and down to the compressor/encoder.
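
For illustration only (not part of this commit), a minimal sketch of how a producer could opt into a larger encoder pool once this change lands; the broker address, topic name, and pool size of 8 are arbitrary placeholders:

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // zstd requires Kafka 2.1.0 or newer
	config.Producer.Compression = sarama.CompressionZSTD
	config.Producer.Return.Successes = true // required by SyncProducer
	// Retain up to 8 idle zstd encoders instead of the previous
	// hard-coded 1, reducing allocations under high concurrency.
	config.Producer.MaxBufferedCompressionEncoders = 8

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "events",
		Value: sarama.StringEncoder("hello"),
	}); err != nil {
		log.Fatal(err)
	}
}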

Signed-off-by: Henry Cai <[email protected]>
HenryCaiHaiying committed Aug 21, 2024
1 parent c6b288a commit 10f3aa4
Showing 8 changed files with 112 additions and 44 deletions.
4 changes: 2 additions & 2 deletions compress.go
@@ -105,7 +105,7 @@ var (
 	}
 )
 
-func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
+func compress(cc CompressionCodec, level int, maxEncoders int, data []byte) ([]byte, error) {
 	switch cc {
 	case CompressionNone:
 		return data, nil
@@ -187,7 +187,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
 		}
 		return buf.Bytes(), nil
 	case CompressionZSTD:
-		return zstdCompress(ZstdEncoderParams{level}, nil, data)
+		return zstdCompress(ZstdEncoderParams{Level: level, MaxBufferedEncoders: maxEncoders}, nil, data)
 	default:
 		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
 	}
3 changes: 3 additions & 0 deletions config.go
@@ -192,6 +192,8 @@ type Config struct {
 		// on the actual compression type used and defaults to default compression
 		// level for the codec.
 		CompressionLevel int
+		// The pool size of encoders used during compression
+		MaxBufferedCompressionEncoders int
 		// Generates partitioners for choosing the partition to send messages to
 		// (defaults to hashing the message key). Similar to the `partitioner.class`
 		// setting for the JVM producer.
@@ -531,6 +533,7 @@ func NewConfig() *Config {
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
 	c.Producer.Return.Errors = true
 	c.Producer.CompressionLevel = CompressionLevelDefault
+	c.Producer.MaxBufferedCompressionEncoders = 1
 
 	c.Producer.Transaction.Timeout = 1 * time.Minute
 	c.Producer.Transaction.Retry.Max = 50
3 changes: 3 additions & 0 deletions config_test.go
@@ -515,6 +515,9 @@ func TestLZ4ConfigValidation(t *testing.T) {
 
 func TestZstdConfigValidation(t *testing.T) {
 	config := NewTestConfig()
+	if config.Producer.MaxBufferedCompressionEncoders != 1 {
+		t.Error("Expects MaxBufferedCompressionEncoders to be 1, got ", config.Producer.MaxBufferedCompressionEncoders)
+	}
 	config.Producer.Compression = CompressionZSTD
 	err := config.Validate()
 	var target ConfigurationError
19 changes: 10 additions & 9 deletions message.go
@@ -66,14 +66,15 @@ func (cc CompressionCodec) MarshalText() ([]byte, error) {
 
 // Message is a kafka message type
 type Message struct {
-	Codec            CompressionCodec // codec used to compress the message contents
-	CompressionLevel int              // compression level
-	LogAppendTime    bool             // the used timestamp is LogAppendTime
-	Key              []byte           // the message key, may be nil
-	Value            []byte           // the message contents
-	Set              *MessageSet      // the message set a message might wrap
-	Version          int8             // v1 requires Kafka 0.10
-	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
+	Codec                          CompressionCodec // codec used to compress the message contents
+	CompressionLevel               int              // compression level
+	MaxBufferedCompressionEncoders int              // pool size of encoders
+	LogAppendTime                  bool             // the used timestamp is LogAppendTime
+	Key                            []byte           // the message key, may be nil
+	Value                          []byte           // the message contents
+	Set                            *MessageSet      // the message set a message might wrap
+	Version                        int8             // v1 requires Kafka 0.10
+	Timestamp                      time.Time        // the timestamp of the message (version 1+ only)
 
 	compressedCache []byte
 	compressedSize  int // used for computing the compression ratio metrics
@@ -107,7 +108,7 @@ func (m *Message) encode(pe packetEncoder) error {
 		payload = m.compressedCache
 		m.compressedCache = nil
 	} else if m.Value != nil {
-		payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
+		payload, err = compress(m.Codec, m.CompressionLevel, m.MaxBufferedCompressionEncoders, m.Value)
 		if err != nil {
 			return err
 		}
24 changes: 13 additions & 11 deletions produce_set.go
@@ -66,12 +66,13 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	if set == nil {
 		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 			batch := &RecordBatch{
-				FirstTimestamp:   timestamp,
-				Version:          2,
-				Codec:            ps.parent.conf.Producer.Compression,
-				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
-				ProducerID:       ps.producerID,
-				ProducerEpoch:    ps.producerEpoch,
+				FirstTimestamp:                 timestamp,
+				Version:                        2,
+				Codec:                          ps.parent.conf.Producer.Compression,
+				CompressionLevel:               ps.parent.conf.Producer.CompressionLevel,
+				MaxBufferedCompressionEncoders: ps.parent.conf.Producer.MaxBufferedCompressionEncoders,
+				ProducerID:                     ps.producerID,
+				ProducerEpoch:                  ps.producerEpoch,
 			}
 			if ps.parent.conf.Producer.Idempotent {
 				batch.FirstSequence = msg.sequenceNumber
@@ -199,11 +200,12 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 				panic(err)
 			}
 			compMsg := &Message{
-				Codec:            ps.parent.conf.Producer.Compression,
-				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
-				Key:              nil,
-				Value:            payload,
-				Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
+				Codec:                          ps.parent.conf.Producer.Compression,
+				CompressionLevel:               ps.parent.conf.Producer.CompressionLevel,
+				MaxBufferedCompressionEncoders: ps.parent.conf.Producer.MaxBufferedCompressionEncoders,
+				Key:                            nil,
+				Value:                          payload,
+				Set:                            set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
 			}
 			if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 				compMsg.Version = 1
35 changes: 18 additions & 17 deletions record_batch.go
@@ -31,22 +31,23 @@ func (e recordsArray) decode(pd packetDecoder) error {
 }
 
 type RecordBatch struct {
-	FirstOffset           int64
-	PartitionLeaderEpoch  int32
-	Version               int8
-	Codec                 CompressionCodec
-	CompressionLevel      int
-	Control               bool
-	LogAppendTime         bool
-	LastOffsetDelta       int32
-	FirstTimestamp        time.Time
-	MaxTimestamp          time.Time
-	ProducerID            int64
-	ProducerEpoch         int16
-	FirstSequence         int32
-	Records               []*Record
-	PartialTrailingRecord bool
-	IsTransactional       bool
+	FirstOffset                    int64
+	PartitionLeaderEpoch           int32
+	Version                        int8
+	Codec                          CompressionCodec
+	CompressionLevel               int
+	MaxBufferedCompressionEncoders int
+	Control                        bool
+	LogAppendTime                  bool
+	LastOffsetDelta                int32
+	FirstTimestamp                 time.Time
+	MaxTimestamp                   time.Time
+	ProducerID                     int64
+	ProducerEpoch                  int16
+	FirstSequence                  int32
+	Records                        []*Record
+	PartialTrailingRecord          bool
+	IsTransactional                bool
 
 	compressedRecords []byte
 	recordsLen        int // uncompressed records size
@@ -203,7 +204,7 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 	}
 	b.recordsLen = len(raw)
 
-	b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
+	b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, b.MaxBufferedCompressionEncoders, raw)
 	return err
 }
17 changes: 13 additions & 4 deletions zstd.go
@@ -8,23 +8,32 @@ import (
 
 // zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders
 // If the pool of encoders is exhausted then new encoders will be created on the fly
-const zstdMaxBufferedEncoders = 1
+var zstdMaxBufferedEncoders int
 
 type ZstdEncoderParams struct {
-	Level int
+	Level               int
+	MaxBufferedEncoders int
 }
 
 type ZstdDecoderParams struct {
 }
 
 var zstdDecMap sync.Map
 
 var zstdAvailableEncoders sync.Map
 
+func getZstdMaxBufferedEncoders(params ZstdEncoderParams) int {
+	if params.MaxBufferedEncoders > 0 {
+		return params.MaxBufferedEncoders
+	}
+	return 1
+}
+
 func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder {
-	if c, ok := zstdAvailableEncoders.Load(params); ok {
+	if c, ok := zstdAvailableEncoders.Load(params.Level); ok {
 		return c.(chan *zstd.Encoder)
 	}
-	c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, zstdMaxBufferedEncoders))
+	c, _ := zstdAvailableEncoders.LoadOrStore(params.Level, make(chan *zstd.Encoder, getZstdMaxBufferedEncoders(params)))
 	return c.(chan *zstd.Encoder)
 }
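
The heart of the change is in getZstdEncoderChannel: the sync.Map is now keyed by params.Level rather than by the whole params struct, so callers share a single pool per compression level even when they pass different MaxBufferedEncoders values, and the channel capacity — the number of idle encoders retained — now comes from the new parameter instead of the old hard-coded constant. The underlying idiom is a buffered channel used as a free list: receive with a default case to reuse-or-create, send with a default case to retain-or-drop. A self-contained sketch of that idiom under illustrative names (worker, acquire, and release are placeholders, not Sarama's API):

package main

import (
	"fmt"
	"sync"
)

type worker struct{}

// pools maps a key (a compression level, say) to a buffered channel
// of idle workers; the channel capacity bounds the pool size.
var pools sync.Map

func poolFor(key int, size int) chan *worker {
	if c, ok := pools.Load(key); ok {
		return c.(chan *worker)
	}
	c, _ := pools.LoadOrStore(key, make(chan *worker, size))
	return c.(chan *worker)
}

func acquire(key, size int) *worker {
	select {
	case w := <-poolFor(key, size):
		return w // reuse an idle worker from the pool
	default:
		return &worker{} // pool empty: create one on the fly
	}
}

func release(key, size int, w *worker) {
	select {
	case poolFor(key, size) <- w: // retain up to `size` idle workers
	default: // pool already full: drop it and let the GC reclaim it
	}
}

func main() {
	w := acquire(3, 8)
	release(3, 8, w)
	fmt.Println(len(poolFor(3, 8))) // prints 1: one idle worker pooled
}

One consequence of keying by level alone, visible in the diff, is that the first caller to touch a given level fixes that pool's capacity; later callers passing a different MaxBufferedEncoders reuse the existing channel.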
51 changes: 50 additions & 1 deletion zstd_test.go
@@ -3,10 +3,12 @@ package sarama
 import (
 	"runtime"
 	"testing"
+
+	"github.com/klauspost/compress/zstd"
 )
 
 func BenchmarkZstdMemoryConsumption(b *testing.B) {
-	params := ZstdEncoderParams{Level: 9}
+	params := ZstdEncoderParams{Level: 9, MaxBufferedEncoders: 1}
 	buf := make([]byte, 1024*1024)
 	for i := 0; i < len(buf); i++ {
 		buf[i] = byte((i / 256) + (i * 257))
@@ -27,3 +29,50 @@
 	}
 	runtime.GOMAXPROCS(gomaxprocsBackup)
 }
+
+func TestMaxBufferedEncodersCapacity(t *testing.T) {
+	num := 10
+	params := ZstdEncoderParams{Level: 3, MaxBufferedEncoders: num}
+	buf := make([]byte, 1024*1024)
+	for i := 0; i < len(buf); i++ {
+		buf[i] = byte((i / 256) + (i * 257))
+	}
+
+	var encoders []*zstd.Encoder = make([]*zstd.Encoder, num)
+	var ch chan *zstd.Encoder
+	for i := 0; i < num; i++ {
+		encoders[i] = getZstdEncoder(params)
+	}
+	ch = getZstdEncoderChannel(params)
+	// channel should be empty at the moment
+	if len(ch) != 0 {
+		t.Error("Expects channel len to be ", 0, ", got ", len(ch))
+	}
+	if cap(ch) != num {
+		t.Error("Expects channel cap to be ", num, ", got ", cap(ch))
+	}
+	// this adds the encoders to the channel
+	for i := 0; i < num; i++ {
+		releaseEncoder(params, encoders[i])
+	}
+	if len(ch) != num {
+		t.Error("Expects channel len to be ", num, ", got ", len(ch))
+	}
+	// Drain the channel
+	for i := 0; i < num; i++ {
+		encoders[i] = getZstdEncoder(params)
+	}
+	ch = getZstdEncoderChannel(params)
+	// channel should be empty at the moment
+	if len(ch) != 0 {
+		t.Error("Expects channel len to be ", 0, ", got ", len(ch))
+	}
+}
+
+func TestMaxBufferedEncodersDefault(t *testing.T) {
+	params := ZstdEncoderParams{}
+	encoders := getZstdMaxBufferedEncoders(params)
+	if encoders != 1 {
+		t.Error("Expects encoders to be ", 1, ", got ", encoders)
+	}
+}
