Skip to content

Commit

Permalink
Limit the maximum number of in-use zstd encoders
Browse files Browse the repository at this point in the history
A recent issue reported by Henry Haiying Cai showed that the current
zstd reuse logic has 2 major flaws
- There is no upper bound on created zstd encoders
- The reuse of encoders is low if many goroutines hit `zstdCompress`
  simultaniously

This is fixed by changing the original behavior in 2 ways
- There are never more than GOMAXPROCs encoders in use
- The maximum number of encoders per compression level is GOMAXPRCOS at
  some early point in time

This means we have finally an upper bound on in-use encoders and with
that a worst case memory consumption. Caching that amount of encoders
does not worsen the worst case behavior (unless many compression levels
are in use at the same time).

This should be a significant performance improvement for codebases that
generate many messages in parallel, fanning out to many partitions.

Signed-off-by: René Treffer <[email protected]>
  • Loading branch information
rtreffer committed Sep 7, 2024
1 parent 4db4716 commit 8e68941
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 18 deletions.
53 changes: 40 additions & 13 deletions zstd.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package sarama

import (
"runtime"
"sync"

"github.com/klauspost/compress/zstd"
)

// zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders
// If the pool of encoders is exhausted then new encoders will be created on the fly
const zstdMaxBufferedEncoders = 1

type ZstdEncoderParams struct {
Level int
}
Expand All @@ -20,35 +17,65 @@ var zstdDecMap sync.Map

var zstdAvailableEncoders sync.Map

var zstdCheckedOutEncoders int
var zstdMutex = &sync.Mutex{}
var zstdEncoderReturned = sync.NewCond(zstdMutex)
var zstdTestingDisableConcurrencyLimit bool

func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder {
if c, ok := zstdAvailableEncoders.Load(params); ok {
return c.(chan *zstd.Encoder)
}
c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, zstdMaxBufferedEncoders))
limit := runtime.GOMAXPROCS(0)
c, _ := zstdAvailableEncoders.LoadOrStore(params, make(chan *zstd.Encoder, limit))
return c.(chan *zstd.Encoder)
}

func newZstdEncoder(params ZstdEncoderParams) *zstd.Encoder {
encoderLevel := zstd.SpeedDefault
if params.Level != CompressionLevelDefault {
encoderLevel = zstd.EncoderLevelFromZstd(params.Level)
}
zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true),
zstd.WithEncoderLevel(encoderLevel),
zstd.WithEncoderConcurrency(1))
return zstdEnc
}

func getZstdEncoder(params ZstdEncoderParams) *zstd.Encoder {

Check failure on line 45 in zstd.go

View workflow job for this annotation

GitHub Actions / Linting with Go 1.23.x

unnecessary leading newline (whitespace)

zstdMutex.Lock()
defer zstdMutex.Unlock()

limit := runtime.GOMAXPROCS(0)
for zstdCheckedOutEncoders >= limit && !zstdTestingDisableConcurrencyLimit {
zstdEncoderReturned.Wait()
limit = runtime.GOMAXPROCS(0)
}

zstdCheckedOutEncoders += 1

select {
case enc := <-getZstdEncoderChannel(params):
return enc
default:
encoderLevel := zstd.SpeedDefault
if params.Level != CompressionLevelDefault {
encoderLevel = zstd.EncoderLevelFromZstd(params.Level)
}
zstdEnc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true),
zstd.WithEncoderLevel(encoderLevel),
zstd.WithEncoderConcurrency(1))
return zstdEnc
return newZstdEncoder(params)
}
}

func releaseEncoder(params ZstdEncoderParams, enc *zstd.Encoder) {
zstdMutex.Lock()

zstdCheckedOutEncoders -= 1

select {
case getZstdEncoderChannel(params) <- enc:
default:
}

zstdEncoderReturned.Signal()

zstdMutex.Unlock()
}

func getDecoder(params ZstdDecoderParams) *zstd.Decoder {
Expand Down
110 changes: 105 additions & 5 deletions zstd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package sarama

import (
"runtime"
"sync"
"testing"
)

// BenchmarkZstdMemoryConsumption benchmarks the memory consumption of the zstd encoder under the following constraints
// 1. The encoder is created with a high compression level
// 2. The encoder is used to compress a 1MB buffer
// 3. We emulate a 96 core system
// In other words: we test the compression memory and cpu efficiency under minimal parallelism
func BenchmarkZstdMemoryConsumption(b *testing.B) {
params := ZstdEncoderParams{Level: 9}
buf := make([]byte, 1024*1024)
Expand All @@ -15,15 +21,109 @@ func BenchmarkZstdMemoryConsumption(b *testing.B) {
cpus := 96

gomaxprocsBackup := runtime.GOMAXPROCS(cpus)
defer runtime.GOMAXPROCS(gomaxprocsBackup)

b.SetBytes(int64(len(buf) * 2 * cpus))
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for j := 0; j < 2*cpus; j++ {
_, _ = zstdCompress(params, nil, buf)
}
// drain the buffered encoder
getZstdEncoder(params)
// previously this would be achieved with
// zstdEncMap.Delete(params)
// drain the buffered encoder so that we get a fresh one for the next run
zstdAvailableEncoders.Delete(params)
}

b.ReportMetric(float64(cpus), "(gomaxprocs)")
b.ReportMetric(float64(1), "(goroutines)")
}

// BenchmarkZstdMemoryConsumptionConcurrency benchmarks the memory consumption of the zstd encoder under the following constraints
// 1. The encoder is created with a high compression level
// 2. The encoder is used to compress a 1MB buffer
// 3. We emulate a 2 core system
// 4. We create 1000 goroutines that compress the buffer 2 times each
// In summary: we test the compression memory and cpu efficiency under extreme concurrency
func BenchmarkZstdMemoryConsumptionConcurrency(b *testing.B) {
params := ZstdEncoderParams{Level: 9}
buf := make([]byte, 1024*1024)
for i := 0; i < len(buf); i++ {
buf[i] = byte((i / 256) + (i * 257))
}

cpus := 4
goroutines := 256

gomaxprocsBackup := runtime.GOMAXPROCS(cpus)
defer runtime.GOMAXPROCS(gomaxprocsBackup)

b.ReportMetric(float64(cpus), "(gomaxprocs)")
b.ResetTimer()
b.SetBytes(int64(len(buf) * goroutines))
b.ReportAllocs()
for i := 0; i < b.N; i++ {
// create n goroutines, wait until all start and then signal them to start compressing
var start sync.WaitGroup
var done sync.WaitGroup
start.Add(goroutines)
done.Add(goroutines)
for j := 0; j < goroutines; j++ {
go func() {
start.Done()
start.Wait()
_, _ = zstdCompress(params, nil, buf)
done.Done()
}()
zstdAvailableEncoders.Delete(params)
}
done.Wait()
}
runtime.GOMAXPROCS(gomaxprocsBackup)

b.ReportMetric(float64(cpus), "(gomaxprocs)")
b.ReportMetric(float64(goroutines), "(goroutines)")
}

// BenchmarkZstdMemoryNoConcurrencyLimit benchmarks the encoder behavior when the concurrency limit is disabled.
func BenchmarkZstdMemoryNoConcurrencyLimit(b *testing.B) {
zstdTestingDisableConcurrencyLimit = true
defer func() {
zstdTestingDisableConcurrencyLimit = false
}()

params := ZstdEncoderParams{Level: 9}
buf := make([]byte, 1024*1024)
for i := 0; i < len(buf); i++ {
buf[i] = byte((i / 256) + (i * 257))
}

cpus := 4
goroutines := 256

gomaxprocsBackup := runtime.GOMAXPROCS(cpus)
defer runtime.GOMAXPROCS(gomaxprocsBackup)

b.ReportMetric(float64(cpus), "(gomaxprocs)")
b.ResetTimer()
b.SetBytes(int64(len(buf) * goroutines))
b.ReportAllocs()
for i := 0; i < b.N; i++ {
// create n goroutines, wait until all start and then signal them to start compressing
var start sync.WaitGroup
var done sync.WaitGroup
start.Add(goroutines)
done.Add(goroutines)
for j := 0; j < goroutines; j++ {
go func() {
start.Done()
start.Wait()
_, _ = zstdCompress(params, nil, buf)
done.Done()
}()
zstdAvailableEncoders.Delete(params)
}
done.Wait()
}

b.ReportMetric(float64(cpus), "(gomaxprocs)")
b.ReportMetric(float64(goroutines), "(goroutines)")
}

0 comments on commit 8e68941

Please sign in to comment.