Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make compression encoder pool size controlled by MaxBufferedCompressionEncoders a configurable parameter #2968

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var (
}
)

func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
func compress(cc CompressionCodec, level int, maxEncoders int, data []byte) ([]byte, error) {
switch cc {
case CompressionNone:
return data, nil
Expand Down Expand Up @@ -187,7 +187,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
}
return buf.Bytes(), nil
case CompressionZSTD:
return zstdCompress(ZstdEncoderParams{level}, nil, data)
return zstdCompress(ZstdEncoderParams{Level: level, MaxBufferedEncoders: maxEncoders}, nil, data)
default:
return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
}
Expand Down
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ type Config struct {
// on the actual compression type used and defaults to default compression
// level for the codec.
CompressionLevel int
// The pool size of encoders used during compression
MaxBufferedCompressionEncoders int
// Generates partitioners for choosing the partition to send messages to
// (defaults to hashing the message key). Similar to the `partitioner.class`
// setting for the JVM producer.
Expand Down Expand Up @@ -531,6 +533,7 @@ func NewConfig() *Config {
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.Return.Errors = true
c.Producer.CompressionLevel = CompressionLevelDefault
c.Producer.MaxBufferedCompressionEncoders = 1

c.Producer.Transaction.Timeout = 1 * time.Minute
c.Producer.Transaction.Retry.Max = 50
Expand Down
3 changes: 3 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ func TestLZ4ConfigValidation(t *testing.T) {

func TestZstdConfigValidation(t *testing.T) {
config := NewTestConfig()
if config.Producer.MaxBufferedCompressionEncoders != 1 {
t.Error("Expects MaxBufferedCompressionEncoders to be 1, got ", config.Producer.MaxBufferedCompressionEncoders)
}
config.Producer.Compression = CompressionZSTD
err := config.Validate()
var target ConfigurationError
Expand Down
19 changes: 10 additions & 9 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ func (cc CompressionCodec) MarshalText() ([]byte, error) {

// Message is a kafka message type
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
LogAppendTime bool // the used timestamp is LogAppendTime
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Version int8 // v1 requires Kafka 0.10
Timestamp time.Time // the timestamp of the message (version 1+ only)
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
MaxBufferedCompressionEncoders int // pool size of encoders
LogAppendTime bool // the used timestamp is LogAppendTime
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Version int8 // v1 requires Kafka 0.10
Timestamp time.Time // the timestamp of the message (version 1+ only)

compressedCache []byte
compressedSize int // used for computing the compression ratio metrics
Expand Down Expand Up @@ -107,7 +108,7 @@ func (m *Message) encode(pe packetEncoder) error {
payload = m.compressedCache
m.compressedCache = nil
} else if m.Value != nil {
payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
payload, err = compress(m.Codec, m.CompressionLevel, m.MaxBufferedCompressionEncoders, m.Value)
if err != nil {
return err
}
Expand Down
24 changes: 13 additions & 11 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
if set == nil {
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
batch := &RecordBatch{
FirstTimestamp: timestamp,
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
ProducerID: ps.producerID,
ProducerEpoch: ps.producerEpoch,
FirstTimestamp: timestamp,
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
MaxBufferedCompressionEncoders: ps.parent.conf.Producer.MaxBufferedCompressionEncoders,
ProducerID: ps.producerID,
ProducerEpoch: ps.producerEpoch,
}
if ps.parent.conf.Producer.Idempotent {
batch.FirstSequence = msg.sequenceNumber
Expand Down Expand Up @@ -199,11 +200,12 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
panic(err)
}
compMsg := &Message{
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
Key: nil,
Value: payload,
Set: set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
MaxBufferedCompressionEncoders: ps.parent.conf.Producer.MaxBufferedCompressionEncoders,
Key: nil,
Value: payload,
Set: set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
}
if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
compMsg.Version = 1
Expand Down
35 changes: 18 additions & 17 deletions record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ func (e recordsArray) decode(pd packetDecoder) error {
}

type RecordBatch struct {
FirstOffset int64
PartitionLeaderEpoch int32
Version int8
Codec CompressionCodec
CompressionLevel int
Control bool
LogAppendTime bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
ProducerID int64
ProducerEpoch int16
FirstSequence int32
Records []*Record
PartialTrailingRecord bool
IsTransactional bool
FirstOffset int64
PartitionLeaderEpoch int32
Version int8
Codec CompressionCodec
CompressionLevel int
MaxBufferedCompressionEncoders int
Control bool
LogAppendTime bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
ProducerID int64
ProducerEpoch int16
FirstSequence int32
Records []*Record
PartialTrailingRecord bool
IsTransactional bool

compressedRecords []byte
recordsLen int // uncompressed records size
Expand Down Expand Up @@ -203,7 +204,7 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
}
b.recordsLen = len(raw)

b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, b.MaxBufferedCompressionEncoders, raw)
return err
}

Expand Down
17 changes: 13 additions & 4 deletions zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,32 @@ import (

// zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders
// If the pool of encoders is exhausted then new encoders will be created on the fly
const zstdMaxBufferedEncoders = 1
var zstdMaxBufferedEncoders int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this from a const to a var feels like a code smell.

Ah, yep. This PR also removes the only use. I propose instead of this change, move getZstdMaxBufferedEncoders to here, and renaming it to zstdMaxBufferedEncoders. That way we replace the functionality that the const was providing, and also we can reuse some of the godoc as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do the movement and function rename, but getZstdMaxBufferedEncoders also takes in an argument so it won't be exactly the same as before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it won’t be exactly the same, that’s ok.

There’s a kind of common Go style recommendation to avoid Get and get prefixes. (example: https://google.github.io/styleguide/go/decisions#getters ) The idea is the verb is kind of so generic, that it doesn’t increase the information in the name.


type ZstdEncoderParams struct {
Level int
Level int
MaxBufferedEncoders int
}

// ZstdDecoderParams is the set of parameters used to configure a zstd
// decoder. It is currently empty, so all decoders are interchangeable.
type ZstdDecoderParams struct {
}

// zstdDecMap presumably caches zstd decoders keyed by ZstdDecoderParams —
// usage is outside this view; confirm against the rest of zstd.go.
var zstdDecMap sync.Map

// zstdAvailableEncoders maps an encoder configuration to a buffered channel
// of idle *zstd.Encoder values that acts as a reuse pool (see
// getZstdEncoderChannel below).
var zstdAvailableEncoders sync.Map

// getZstdMaxBufferedEncoders returns the configured size of the idle-encoder
// pool, falling back to 1 when the caller did not set a positive value.
func getZstdMaxBufferedEncoders(params ZstdEncoderParams) int {
	if n := params.MaxBufferedEncoders; n > 0 {
		return n
	}
	return 1
}

func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder {
if c, ok := zstdAvailableEncoders.Load(params); ok {
if c, ok := zstdAvailableEncoders.Load(params.Level); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Hm, not sure we want to clobber all MaxBufferedEncoders of the same Level together…

Maybe, we could do something like:

params.MaxBufferedEncoders = zstdMaxBufferedEncoders(params)
if c, ok := zstdAvailableEncoders.Load(params); ok {
…

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I should move MaxBufferedEncoders out of the ZstdEncoderParams struct. The zstdAvailableEncoders map was keyed on encoder parameters: encoders created with the same parameters are the same kind of object. MaxBufferedEncoders is only used to control the pool size — it is not actually a parameter for creating an individual encoder object — so using MaxBufferedEncoders as part of the map key would be confusing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so the idea is that this would be set generically for all encoders in a process? And you just implemented that in a “stateless” sort of way, where everyone is expected to pass in the value?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the 'maxBufferedEncoders' value needs to be passed in explicitly through the function call. ZstdEncoderParams is a struct holding the parameters needed to create an encoder; maxBufferedEncoders is not a parameter of an individual encoder, so it should not be stored in that param struct.

return c.(chan *zstd.Encoder)
}
c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, zstdMaxBufferedEncoders))
c, _ := zstdAvailableEncoders.LoadOrStore(params.Level, make(chan *zstd.Encoder, getZstdMaxBufferedEncoders(params)))
return c.(chan *zstd.Encoder)
}

Expand Down
51 changes: 50 additions & 1 deletion zstd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package sarama
import (
"runtime"
"testing"

"github.com/klauspost/compress/zstd"
)

func BenchmarkZstdMemoryConsumption(b *testing.B) {
params := ZstdEncoderParams{Level: 9}
params := ZstdEncoderParams{Level: 9, MaxBufferedEncoders: 1}
buf := make([]byte, 1024*1024)
for i := 0; i < len(buf); i++ {
buf[i] = byte((i / 256) + (i * 257))
Expand All @@ -27,3 +29,50 @@ func BenchmarkZstdMemoryConsumption(b *testing.B) {
}
runtime.GOMAXPROCS(gomaxprocsBackup)
}

// TestMaxBufferedEncodersCapacity verifies that the encoder pool channel is
// sized by ZstdEncoderParams.MaxBufferedEncoders and that acquiring and
// releasing encoders drains and refills it as expected.
func TestMaxBufferedEncodersCapacity(t *testing.T) {
	const num = 10
	params := ZstdEncoderParams{Level: 3, MaxBufferedEncoders: num}

	// Check out num encoders; the pool starts empty, so these are created
	// on the fly rather than drawn from the channel.
	encoders := make([]*zstd.Encoder, num)
	for i := 0; i < num; i++ {
		encoders[i] = getZstdEncoder(params)
	}

	ch := getZstdEncoderChannel(params)
	// All encoders are checked out, so the channel should hold none...
	if len(ch) != 0 {
		t.Errorf("expected channel len 0, got %d", len(ch))
	}
	// ...but its capacity must match the configured pool size.
	if cap(ch) != num {
		t.Errorf("expected channel cap %d, got %d", num, cap(ch))
	}

	// Releasing the encoders should return all of them to the pool.
	for i := 0; i < num; i++ {
		releaseEncoder(params, encoders[i])
	}
	if len(ch) != num {
		t.Errorf("expected channel len %d, got %d", num, len(ch))
	}

	// Re-acquiring num encoders should drain the pool again.
	for i := 0; i < num; i++ {
		encoders[i] = getZstdEncoder(params)
	}
	ch = getZstdEncoderChannel(params)
	if len(ch) != 0 {
		t.Errorf("expected channel len 0, got %d", len(ch))
	}
}

func TestMaxBufferedEncodersDefault(t *testing.T) {
params := ZstdEncoderParams{}
encoders := getZstdMaxBufferedEncoders(params)
if encoders != 1 {
t.Error("Expects encoders to be ", 1, ", got ", encoders)
}
}