
feat: Make compression encoder pool size controlled by MaxBufferedCompressionEncoders a configurable parameter #2968

Open — wants to merge 3 commits into base: main
Conversation

@HenryCaiHaiying (Author)

This PR is addressing issue: #2965

For Zstd compression, the pool size of encoders (controlled by the MaxBufferedEncoders param) is currently hard-coded to 1. This causes an excessive number of large encoder objects to be created under high concurrency, which leads to GC performance issues.

Introduce a new config parameter, MaxBufferedCompressionEncoders (default value 1), and modify the calling paths to pass this parameter through message/record_batch and down to compressor/encoder.

@HenryCaiHaiying (Author)

@dnwe Check to see if you like the idea of making the encoder pool size configurable.

zstd.go (outdated)

@@ -8,23 +8,32 @@ import (

 // zstdMaxBufferedEncoders maximum number of not-in-use zstd encoders
 // If the pool of encoders is exhausted then new encoders will be created on the fly
-const zstdMaxBufferedEncoders = 1
+var zstdMaxBufferedEncoders int
Contributor:

Changing this from a const to a var feels like a code smell.

Ah, yep. This PR also removes the only use. I propose, instead of this change, moving getZstdMaxBufferedEncoders here and renaming it to zstdMaxBufferedEncoders. That way we replace the functionality the const was providing, and we can also reuse some of the godoc.

Author:

I can do the move and the function rename, but getZstdMaxBufferedEncoders also takes an argument, so it won't be exactly the same as before.

Contributor:

Yeah, it won't be exactly the same; that's ok.

There's a common Go style recommendation to avoid Get and get prefixes (example: https://google.github.io/styleguide/go/decisions#getters). The idea is that the verb is so generic that it doesn't add information to the name.

zstd.go (outdated)

 func getZstdEncoderChannel(params ZstdEncoderParams) chan *zstd.Encoder {
-	if c, ok := zstdAvailableEncoders.Load(params); ok {
+	if c, ok := zstdAvailableEncoders.Load(params.Level); ok {
Contributor:

🤔 Hm, not sure we want to clobber all MaxBufferedEncoders of the same Level together…

Maybe, we could do something like:

params.MaxBufferedEncoders = zstdMaxBufferedEncoders(params)
if c, ok := zstdAvailableEncoders.Load(params); ok {
…

Author:

I think I should move MaxBufferedEncoders out of the ZstdEncoderParams struct. The zstdAvailableEncoders map is keyed on encoder parameters, and encoders created with the same parameters are the same kind of object. MaxBufferedEncoders controls the pool size; it is not an encoder parameter used to create an individual encoder object, so using it as part of the map key would be confusing.

Contributor:

OK, so the idea is that this would be set generically for all encoders in a process? And you just implemented that in a “stateless” sort of way, where everyone is expected to pass in the value?

Author:

Yes, maxBufferedEncoders needs to be passed in explicitly through the function call. ZstdEncoderParams is a struct of the parameters needed to create an encoder; maxBufferedEncoders is not a parameter of an individual encoder, so it should not be stored in that struct.

@HenryCaiHaiying (Author) left a comment:

Thanks for the review, I will update the code accordingly.


1. rename getZstdMaxBufferedEncoders to zstdMaxBufferedEncoders
2. move MaxBufferedEncoders out of ZstdEncoderParams struct

Signed-off-by: Henry Cai <[email protected]>
@dnwe (Collaborator)

dnwe commented Aug 21, 2024

@HenryCaiHaiying thanks for the PR. On a general note, I think we would want naming similar to the stdlib: MaxIdleEncoders rather than MaxBufferedEncoders. However, I also wonder whether @klauspost (or @rtreffer) had any thoughts on the proposed usage of zstd here?

We previously merged #2375, which tried to avoid the memory cost (on high-CPU-core machines) of zstd maintaining an encoder per core: it passed zstd.WithEncoderConcurrency(1) to zstd and internally kept a single reusable encoder around, allocating the rest on demand. Now with this PR we'd provide a config parameter to keep more than one warm encoder around in our custom pool, but should we instead just pass this value through to WithEncoderConcurrency and remove our custom code?

@rtreffer (Contributor)

I am sorry for the wall of text. I'll go over:

  • history of the original PR
  • the issue described
  • encoder concurrency
  • the solution space (IMHO)

History of the original PR

The problem was that we were constrained by storage and network transfer.
We were also scaling to larger machines (48 or 96 cores iirc).
Compression level was set to high (minimize network / storage). Zstd was used as a better gzip.

With that we got gigabytes of preallocated memory (one encoder per core on 96-core machines, while only 2-5 were in use, at 32MB of state each).
Coupled with a GOMEMLIMIT we ran into a GC death spiral.

Messages were small, but we used a custom partitioner that would send less than 1 MB to each partition in a round robin manner, batching up messages as efficiently as possible, keeping in flight requests low, while increasing compression ratios (even 1MB is rather small for a compression algorithm like zstd).
I can highly recommend such a partitioner if messages don't require ordering or predictable partition assignment. I was told that the Java libraries have a partitioner like this. I could probably contribute something like it.

It worked: these changes massively dropped network and storage requirements. And memory consumption as well as pprof results were good.

Reported issue

At 2-4kb of allocations per message the messages are likely to be 1kb or less. Or ~1000x worse than what the original PR used. Batching also reduces the frequency of requesting an encoder, and higher compression settings mean we were using encoders longer.

I think the raised issue is valid: this is the worst case scenario. It is the exact opposite side of the spectrum. Speed optimized compression, small messages, high concurrency.

Encoder concurrency

WithEncoderConcurrency makes a single encoder use multiple goroutines. There are 2 problems with this:

  1. We have very small payloads (1MB max), meaning the speedup will likely be low (might even be nonexistent)
  2. We have high concurrency before we hit the encoder

So running n encoders with no concurrency should provide the best results.
This is not backed by a benchmark though.

Solution space

Making the pool size configurable is one option.

Back in the day I was thinking of a pool with expiry. E.g. keep a []struct{Encoder, time.Time} (used as a FIFO queue) and on each enqueue/dequeue of an encoder check for old (e.g. >=1s or >=5s) encoders. This would likely remove the need for any configuration, and the cleanup work can be done inline before each enqueue and after each dequeue. It is unlikely to produce the described GC pressure if instances are kept for at least 1s, while still allowing the pool to shrink to 1.

My personal preference for this case would probably be

  1. Better batching (if possible) - this is a completely different approach though
  2. A configuration free approach
  3. Config setting

Complexity probably goes the opposite way 🙈

I hope this is somewhat helpful.

@HenryCaiHaiying (Author)

Thanks @rtreffer for the history flashback and detailed analysis. I think our case is a bit different from yours: we have 500 Kafka brokers downstream, so at any time during a process run there are probably hundreds of goroutines (one per broker) doing message compression and producing. And the producer batch size is very small in our case (only a few KB); we couldn't make our batch size bigger without significantly re-architecting our product. So we need either a configurable parameter for the pool size or a pool smart enough to shrink when objects go unused for a long time. I think the smart pool idea is good, but it would probably take some effort to get right, and it would probably still need a configurable parameter (e.g. the decay timeout).

And I agree with your opinion on not using WithEncoderConcurrency(n), because we only call EncodeAll() once per encoder object (unless we change Sarama's zstd code to create a single singleton encoder object).

@HenryCaiHaiying (Author)

@puellanivis Are you still expecting me to make some code changes? I think I've addressed all the review comments.

@puellanivis (Contributor)

> @puellanivis Are you still expecting me to make some code changes? I think I've addressed all the review comments.

Nope. I don’t see anything pressing to address. I just also cannot give an approval, or resolve the issues I already opened. 😂

@HenryCaiHaiying (Author)

So who can approve this PR?

@rtreffer (Contributor)

So... In the original PR I wrongly assumed that we would be tied to GOMAXPROCS as the upper bound for zstd encoders.

I just wrote a test case that shows this assumption is wrong; I assume it changed in 2020 when Go goroutines became preemptible.

See rtreffer@2bad38e

  1. Set GOMAXPROCS to 1
  2. Wait for 500 goroutines creating a 1kb message each
  3. Let each goroutine check out an encoder and keep track of the number of checked out encoders
  4. Encode the buffer
  5. Return the encoder, decrement the checked out encoders
  6. Return the results, find the maximum number of encoders

At GOMAXPROCS=1 there shouldn't be 15 or 16 encoders. Note that the test is a bit flaky. But there is a bug on the other side of the spectrum, too, and a good chance the original issue is also impacted by that.

@HenryCaiHaiying (Author)

@rtreffer Yes, the max number of encoders created corresponds to the max number of goroutines, not the number of CPU cores. On my laptop I have 10 CPU cores; if I create 30 goroutines, I can see the number of encoders created also go to 30.

@klauspost (Contributor)

klauspost commented Aug 28, 2024

Sorry. I didn't have time to go through the PR before now.

It seems you are vastly overcomplicating things. There is no need to buffer encoders, since you are only using them for EncodeAll. EncodeAll can be called concurrently, and the encoder will allow a predefined number of concurrent calls.

When creating the Encoder, simply use WithEncoderConcurrency to set the maximum number of concurrent EncodeAll calls to allow. So a simple params -> *Encoder mapping is all you need.

The encoder will then keep this number of internal encoder states around between calls.

@HenryCaiHaiying (Author)

@klauspost If we go with the encoder pool in your library, we would also need to change the Sarama client to use a singleton Encoder object, and we'd still need to make this concurrency number configurable in Sarama and pass it down to your library. @rtreffer has a different approach that tries to make the pool smarter (upper-bound the pool size by GOMAXPROCS; the pool can grow/shrink based on usage; no need to configure the pool size): rtreffer@23168b5

@klauspost (Contributor)

klauspost commented Sep 4, 2024

@HenryCaiHaiying You can do whatever you want. This just smells like over-engineering to me.

You never need more than 1 instance per config. You can use the same instance for all your encodes. Setting the concurrency limit does exactly what you are trying to do.

If you for whatever reason want to change the concurrency, you can just create a new instance, do an atomic replace and start using that.

@rtreffer (Contributor)

rtreffer commented Sep 7, 2024

@klauspost unfortunately the concurrency setting does not have the desired outcome. The main problem is the greedy allocation in zstd/encoder.go#L87-L90

The problem in the past was that on high core count machines (e.g. 96 core machines) the encoder would pre-allocate 96 instances, which can add up to gigabytes of memory. This is particularly problematic in multi-tenancy setups (kubernetes). 96 cores is currently the second largest general purpose size on latest generation AWS instances.

The main feature here is lazy allocation of encoders, to avoid issues on setups where GOMAXPROCS is high but the actual parallelism is low.
I would be very happy if you could show how to achieve that with less effort on our end. I do agree that in the case of a config setting we could just turn it into documented behavior.

However my design goals are:

  1. Low memory consumption for processes that use sarama with low parallelism (currently OK)
  2. Deterministic maximum memory consumption for processes that use high concurrency (currently broken)
  3. Reuse of encoders to avoid wasteful allocations (works only for low concurrency)

This would avoid any config tweaks. I finally opened an alternative PR for that: #2979 // CC @HenryCaiHaiying

@klauspost (Contributor)

> the encoder would pre-allocate 96 instances, which can add up to gigabytes of memory.

Let's get the numbers: WithEncoderConcurrency(96) and WithLowerEncoderMem(true):

Allocs are for creating the Encoder and running 96 encodes to warm up the encoders:

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 405MB, 3121 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 886MB, 3100 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 1157MB, 1373 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 4061MB, 1418 allocs

Pretty much all of this is allocating a window, based on the Window Size. For example setting WithWindowSize(1<<20):

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 117MB, 3115 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 214MB, 3100 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 485MB, 1368 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 3389MB, 1425 allocs

The compression loss would be small, and all payloads < 1MB will be unaffected.

EncodeAll with < 1 block (64K/128K) has a shortcut, so no history is allocated - example doing 50KB encodes:

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 26MB, 1294 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 123MB, 1279 allocs
    
(this is not implemented for better/best, so numbers are as above)

In principle this could be applied to all EncodeAll operations, but it will require a bit of "plumbing" to work.

Limiting the window size and capping the concurrency are easy mitigations. Here are the static allocs with WithEncoderConcurrency(32), WithLowerEncoderMem(true), WithWindowSize(1<<20) and encoding 10MB payloads:

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 25MB, 1067 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 58MB, 1045 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 151MB, 473 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 1139MB, 511 allocs

@HenryCaiHaiying (Author)

So for payloads < 1MB, are WithLowerEncoderMem(true) and WithWindowSize(1<<20) recommended? Any downsides to using those flags?

@klauspost (Contributor)

> Any downside of using those flags?

None. Even with payloads >1MB the difference will be minimal. It will also reduce memory usage on decompression as a bonus.

"LowerEncoderMem" will do more lazy allocations and over-allocate the window, so operations longer than the window are done with fewer copies; mostly a very minor speed difference.

@rtreffer (Contributor)

So 3GB-4GB on 96 cores without setting the concurrency is roughly in line with what I was seeing.
Setting the compression level to best cut our bills >50% with no downside after the memory consumption fix. Some vendors charge for bytes stored and transferred, so better compression and batching is straight up 💰

Capping the window size at 1MB should be done regardless of the implementation, as 1MB is the default max message size in Kafka, and going above that limit is annoying (it needs to be configured on the servers and for all clients), so most installations won't. That's a really useful call-out.

I see this discussion mostly as trade-offs between

  • code complexity / simplicity
  • configuration parameters
  • memory consumption

I am more on the side that the library should adapt to usage, but I am fine with any other direction, too.
I would love a decision on that front though, given that the bug report shows a real issue.

@HenryCaiHaiying (Author)

I think I am fine with either direction (smart pool, or a number of encoder instances per config), but it seems we would also need to update the default config for the encoder window size (and/or use the LowerEncoderMem setting). Even if we go with the simple number-of-encoders setting, we might also want to update the code to pass that setting to WithEncoderConcurrency().
