Skip to content

Commit

Permalink
Troy/fix dataloader (#223)
Browse files Browse the repository at this point in the history
Fixes a bug found when deploying the dataloader that slipped through the
unit tests.
  • Loading branch information
TroyKomodo authored Dec 14, 2024
2 parents 96a6ddb + 949db92 commit c0580a6
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ scuffle-signal = { path = "crates/signal", version = "0.0.2" }
scuffle-http = { path = "crates/http", version = "0.0.4" }
scuffle-metrics = { path = "crates/metrics", version = "0.0.4" }
scuffle-pprof = { path = "crates/pprof", version = "0.0.2" }
scuffle-batching = { path = "crates/batching", version = "0.0.3" }
scuffle-batching = { path = "crates/batching", version = "0.0.4" }
scuffle-postcompile = { path = "crates/postcompile", version = "0.0.5" }
scuffle-ffmpeg = { path = "crates/ffmpeg", version = "0.0.2" }
scuffle-h3-webtransport = { path = "crates/h3-webtransport", version = "0.0.2" }
Expand Down
2 changes: 1 addition & 1 deletion crates/batching/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "scuffle-batching"
version = "0.0.3"
version = "0.0.4"
edition = "2021"
repository = "https://github.com/scufflecloud/scuffle"
authors = ["Scuffle <[email protected]>"]
Expand Down
23 changes: 22 additions & 1 deletion crates/batching/src/dataloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where
let mut count = 0;

{
let mut new_batch = false;
let mut new_batch = true;
let mut batch = self.current_batch.lock().await;

for item in items {
Expand Down Expand Up @@ -555,4 +555,25 @@ mod tests {
assert!(start.elapsed() >= std::time::Duration::from_millis(5));
assert!(start.elapsed() < std::time::Duration::from_millis(20));
}

#[tokio::test]
async fn already_batch() {
let requests = Arc::new(AtomicUsize::new(0));

let fetcher = TestFetcher {
values: HashMap::from_iter(vec![("a", 1), ("b", 2), ("c", 3)]),
delay: std::time::Duration::from_millis(5),
requests: requests.clone(),
capacity: 2,
};

let loader = DataLoader::builder().batch_size(10).concurrency(1).build(fetcher);

let start = std::time::Instant::now();
let (a, b) = tokio::join!(loader.load("a"), loader.load("b"));
assert_eq!(a, Ok(Some(1)));
assert_eq!(b, Ok(Some(2)));
assert!(start.elapsed() < std::time::Duration::from_millis(15));
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 1);
}
}

0 comments on commit c0580a6

Please sign in to comment.