Skip to content

Commit

Permalink
fix send bound
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Oct 20, 2024
1 parent b44abd8 commit 04a613d
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion foundations/src/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ impl<T: BatchOperation + 'static + Send + Sync> BatcherInner<T> {
let mut waiters = vec![];
let mut batch = self.active_batch.write().await;
let max_documents = self.max_batch_size.load(std::sync::atomic::Ordering::Relaxed);
let mut batches = vec![];

for document in T::Mode::filter_item_iter(documents) {
if batch
Expand All @@ -338,7 +339,7 @@ impl<T: BatchOperation + 'static + Send + Sync> BatcherInner<T> {
.unwrap_or(true)
{
if let Some(b) = batch.take() {
self.queued_batches.send(b).await.ok();
batches.push(b);
}

*batch = Some(self.new_batch());
Expand All @@ -362,6 +363,10 @@ impl<T: BatchOperation + 'static + Send + Sync> BatcherInner<T> {
T::Mode::input_add(&mut b.ops, tracker, document);
}

for batch in batches {
self.queued_batches.send(batch).await.ok();
}

waiters
}
}
Expand Down

0 comments on commit 04a613d

Please sign in to comment.