Automatic offset tracking for stream queues #661
Conversation
What happens when consumer_offset_capacity is reached? MFile doesn't auto-expand.
Fixed here.
Need to think about replication of the consumer offset file.
@carlhoerberg Performance seems pretty similar with both solutions, maybe even slightly quicker with append-only (I increased the file size to Config.instance.segment_size to make sure we don't have to constantly GC). So I think we can go with that solution 👍
Cool! Haven't read all the code yet, but it looks like the file is still expanded? Is that necessary? Shouldn't a GC be issued instead? The file size should be a minimum of segment size, yes, but larger if there are many consumers we are tracking, so calculate the size of the file after GC and make the capacity a multiple of that, say 1000-10000x larger or so, so that the offset of each consumer can be updated that many times without causing a new GC.
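The sizing rule suggested above can be sketched as a small calculation. This is a hypothetical illustration in Python, not the project's actual code; `offset_file_capacity`, `compacted_size`, and `headroom` are assumed names:

```python
def offset_file_capacity(compacted_size: int, segment_size: int, headroom: int = 1000) -> int:
    """Pick a capacity for the consumer-offsets file.

    After a GC/compaction pass the file holds one entry per tracked
    consumer tag (compacted_size bytes in total). Sizing the file to a
    multiple of that lets every consumer update its offset roughly
    `headroom` times before another compaction is needed, while never
    going below the configured minimum segment size.
    """
    return max(segment_size, compacted_size * headroom)

# 2 KiB of compacted entries with 1000x headroom is still below an
# 8 MiB minimum segment size, so the minimum wins:
print(offset_file_capacity(2048, 8 * 1024 * 1024))  # prints 8388608
```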
Yeah, it will try to compact the file first (only keeping the latest offset for each ctag). If the file is still full after that (i.e. there was nothing to compact), it will be expanded. Probably safer to set the capacity to a multiple of tracked ctags as you say, though. Will look at that tomorrow!
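The compaction step described above (keep only the latest offset for each consumer tag) can be sketched as follows. A rough Python illustration, not the actual broker implementation; the in-memory `(ctag, offset)` pair format is an assumption:

```python
def compact_offsets(entries):
    """Given an append-only log of (ctag, offset) pairs, keep only the
    most recent offset for each consumer tag, preserving the order in
    which tags first appeared (dicts are insertion-ordered in Python)."""
    latest = {}
    for ctag, offset in entries:
        latest[ctag] = offset  # later entries overwrite earlier ones
    return list(latest.items())

log = [("c1", 5), ("c2", 3), ("c1", 9), ("c3", 1), ("c2", 7)]
print(compact_offsets(log))  # [('c1', 9), ('c2', 7), ('c3', 1)]
```

If nothing was duplicated, the output is the same size as the input, which is the "still full after compaction" case where the file must grow instead.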
@carlhoerberg Added this. Do we need to set some form of max size for the offsets file? capacity could in theory be very large if there are many consumers (and/or if ctags are long).
@carlhoerberg added some replication for the consumer offsets file. Not really sure what the best approach is for when we replace the file during compaction though. Just …
Ah, well spotted, can implement replace file on the client side with write-to-tempfile/rename in another PR |
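The write-to-tempfile/rename approach mentioned above is the standard way to replace a file atomically, so a reader never observes a half-written offsets file. A minimal Python sketch under that assumption (names are illustrative):

```python
import os
import tempfile

def atomic_replace(path: str, data: bytes) -> None:
    """Write data to a temp file in the same directory, fsync it, then
    rename it over the target. rename is atomic on POSIX filesystems,
    so concurrent readers see either the old file or the new file,
    never a mix of the two."""
    dirname = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=dirname)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk first
        os.replace(tmp, path)  # atomic rename over the old file
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise
```

The temp file must live in the same directory as the target, since rename is only atomic within a single filesystem.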
Should revive this |
…g x-stream-use-automatic-offset
Co-authored-by: Carl Hörberg <[email protected]>
… full. Use config.instance.segment_size
WHAT is this pull request doing?
Adds broker tracking of consumer offsets in streams if no x-stream-offset is provided by the consumer. Does not track if the consumer tag is generated by the broker.

✅ When to run cleanup_consumer_offsets?
✅ IndexError when trying to clean up if msg_size >= segment_size
HOW can this pull request be tested?
Run specs