fix: subscribe to server shutdown signal in insert task #553
Conversation
Previously we relied exclusively on the HTTP server shutdown closing the channel; now we listen to the shutdown signal internally.
Core change looks good, but the unit tests are failing to build.
api/src/server.rs
Outdated
loop {
    let should_exit = shutdown || event_rx.is_closed();
    // make sure the events queue doesn't get too deep when we're under heavy load
Nitpick: it confused me to have this code come in the middle of the shutdown logic. Can we put line 412 and the block starting on line 417 next to each other, ideally as the first thing in the loop?
Reworked the logic a bit so it no longer uses channel openness at all or shares anything between shutdown and normal processing. I also added draining the channel before shutting down. Let me know what you think.
This moves all shutdown logic into one branch, which now tries to pull any pending events from the channel before exiting rather than using channel openness as a proxy for shutdown. The select! branches no longer do any async logic and just set variables for the outer loop to use, which avoids any subtle cancel-safety issues with &mut self.
All processing now happens after the select!, so the loop is: 1) wait for something to happen (interval, new events, shutdown), 2) handle shutdown or process events as appropriate, 3) repeat.
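For reference, here is a minimal, self-contained sketch of that loop shape. The names (`insert_task`, `process_interval`, `shutdown_rx`), the broadcast-channel shutdown signal, and the batch size are illustrative assumptions, not the actual server code:

```rust
use tokio::sync::{broadcast, mpsc};
use tokio::time::{interval, Duration};

// assumed batch size, just for the example
const EVENT_INSERT_QUEUE_SIZE: usize = 4;

async fn insert_task(
    mut event_rx: mpsc::Receiver<u64>,
    mut shutdown_rx: broadcast::Receiver<()>,
) {
    let mut tick = interval(Duration::from_millis(100));
    let mut events: Vec<u64> = Vec::with_capacity(EVENT_INSERT_QUEUE_SIZE);

    loop {
        let mut shutdown = false;
        let mut process_interval = false;

        // 1) wait for something to happen; the branches only set flags,
        //    so there is no async work to get cancelled mid-flight
        tokio::select! {
            _ = tick.tick() => process_interval = true,
            n = event_rx.recv_many(&mut events, EVENT_INSERT_QUEUE_SIZE) => {
                // recv_many returns 0 only when the channel is closed and empty
                if n == 0 {
                    shutdown = true;
                }
            }
            _ = shutdown_rx.recv() => shutdown = true,
        }

        // 2a) shutdown: drain whatever is still queued, flush, and exit
        if shutdown {
            while let Ok(event) = event_rx.try_recv() {
                events.push(event);
            }
            if !events.is_empty() {
                println!("flushing final batch of {} events", events.len());
            }
            break;
        }

        // 2b) normal processing, done outside the select!
        if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_interval {
            if !events.is_empty() {
                println!("inserting batch of {} events", events.len());
                events.clear();
            }
        }
        // 3) repeat
    }
}

#[tokio::main]
async fn main() {
    let (event_tx, event_rx) = mpsc::channel(64);
    let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
    let task = tokio::spawn(insert_task(event_rx, shutdown_rx));

    for i in 0..10u64 {
        event_tx.send(i).await.unwrap();
    }
    let _ = shutdown_tx.send(());
    task.await.unwrap();
}
```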
There's enough Rust and tokio going on here that I think it would be good for @nathanielc to take a look rather than me just approving it.
tracing::info!("Shutting down insert task after processing current batch");
if !event_rx.is_empty() {
    let remaining_event_cnt = event_rx.len();
    let mut buf = Vec::with_capacity(remaining_event_cnt);
can we not just use the buf that was already allocated on line 412?
Yeah, makes sense. We can resize it and use it (or just use it, since recv_many might resize automatically; I'll verify). It's the last allocation before exiting, so I didn't think too closely about it.
Actually, we can't use it: it gets moved into the select! branch even though we don't follow that arm in this case. I added a comment to that effect. Expanding the macro might make it easier to see, but I didn't bother.
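For what it's worth, the drain-before-exit step in isolation looks roughly like this. A hypothetical, stand-alone sketch; the event type and names are made up:

```rust
use tokio::sync::mpsc;

/// Pull everything still sitting in the channel into a fresh buffer.
/// A new Vec is used here because the buffer from the main loop is
/// captured by the select! arm and isn't available at this point.
async fn drain_remaining(event_rx: &mut mpsc::Receiver<String>) -> Vec<String> {
    let remaining = event_rx.len();
    let mut buf = Vec::with_capacity(remaining);
    if remaining > 0 {
        // recv_many appends up to `remaining` messages; since they are
        // already queued, this resolves without waiting for new sends
        let received = event_rx.recv_many(&mut buf, remaining).await;
        debug_assert_eq!(received, buf.len());
    }
    buf
}
```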
api/src/server.rs
Outdated
// process events at the interval or when we're under heavy load.
// we do it outside the select! to avoid any cancel safety issues
// even though we should be okay since we're using tokio channels/intervals
if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_early {
What's "early" about the processing when process_early is true? Isn't the "early" processing really when events.len() gets too long, while the processing when that bool is true is actually the regularly scheduled processing due to the interval ticking?
Fair. process_interval is a better name; I'll rename it. It was "early" in my head because we want to accumulate N requests before we do work, so if we don't get enough in time we process them anyway before that point (i.e. early), but it's not very clear.
We previously relied on the HTTP server shutdown closing the channel to signal the task to stop, which could take at least one tick interval of the task to take effect. Now the task listens to the daemon shutdown signal directly, so it reacts faster.
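As a small illustration of why this is faster, the sketch below (assuming a broadcast-channel shutdown signal, not the project's actual wiring) shows a task that exits on the signal even while the event sender is still alive, instead of waiting for the channel to close:

```rust
use tokio::sync::{broadcast, mpsc};

#[tokio::main]
async fn main() {
    let (_event_tx, mut event_rx) = mpsc::channel::<u64>(16);
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    let task = tokio::spawn(async move {
        loop {
            tokio::select! {
                maybe_event = event_rx.recv() => match maybe_event {
                    Some(event) => println!("got event {event}"),
                    None => break, // old path: only exits once the sender is dropped
                },
                _ = shutdown_rx.recv() => break, // new path: exits on the daemon signal
            }
        }
    });

    // the event sender is still alive, but the task shuts down promptly
    let _ = shutdown_tx.send(());
    task.await.unwrap();
}
```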