Skip to content

Commit

Permalink
fix batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Oct 20, 2024
1 parent 04a613d commit 49aadf4
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions foundations/src/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,22 +393,33 @@ impl<T: BatchOperation + 'static + Send + Sync> Batcher<T> {
inner: inner.clone(),
_auto_loader_abort: CancelOnDrop(
tokio::task::spawn(async move {
let mut next_wakeup = None;
loop {
tokio::select! {
Some(batch) = rx.recv() => {
let ticket = inner.semaphore.clone().acquire_owned().await.unwrap();
inner.spawn_batch(batch, ticket);
},
_ = async {
if let Some(expires_at) = next_wakeup {
tokio::time::sleep_until(expires_at).await;
} else {
inner.notify.notified().await;
}
} => {},
_ = inner.notify.notified() => {},
}
inner.notify.notified().await;

let Some((id, expires_at)) = inner.active_batch.read().await.as_ref().map(|b| (b.id, b.expires_at))
else {
continue;
};

if expires_at > tokio::time::Instant::now() {
tokio::time::sleep_until(expires_at).await;
next_wakeup = Some(expires_at);
continue;
} else {
next_wakeup = None;
}

let mut batch = inner.active_batch.write().await;
Expand Down

0 comments on commit 49aadf4

Please sign in to comment.