Skip to content

Commit

Permalink
fmt code
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Dec 8, 2024
1 parent 4c4c018 commit cdaeac9
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 30 deletions.
93 changes: 78 additions & 15 deletions crates/batching/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ where
#[cfg_attr(all(coverage_nightly, test), coverage(off))]
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::atomic::AtomicUsize};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;

use super::*;

Expand Down Expand Up @@ -376,7 +377,9 @@ mod tests {
let loader = Batcher::builder().batch_size(2).concurrency(10).build(fetcher);

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await;
let ab = loader
.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await;
assert_eq!(ab, vec![Some(1), Some(2), Some(3), None, None, None, None, None, None, None]);
assert!(start.elapsed() < std::time::Duration::from_millis(15));
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 5);
Expand All @@ -393,10 +396,16 @@ mod tests {
capacity: 2,
};

let loader = Batcher::builder().batch_size(2).concurrency(1).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = Batcher::builder()
.batch_size(2)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await;
let ab = loader
.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await;
assert_eq!(ab, vec![Some(1), Some(2), Some(3), None, None, None, None, None, None, None]);
assert!(start.elapsed() < std::time::Duration::from_millis(35));
assert!(start.elapsed() >= std::time::Duration::from_millis(25));
Expand All @@ -414,10 +423,16 @@ mod tests {
capacity: 100,
};

let loader = BatcherBuilder::default().batch_size(100).concurrency(1).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = BatcherBuilder::default()
.batch_size(100)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await;
let ab = loader
.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await;
assert_eq!(ab, vec![Some(1), Some(2), Some(3), None, None, None, None, None, None, None]);
assert!(start.elapsed() >= std::time::Duration::from_millis(10));
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 1);
Expand All @@ -434,7 +449,11 @@ mod tests {
capacity: 100,
};

let loader = BatcherBuilder::default().batch_size(100).concurrency(10).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = BatcherBuilder::default()
.batch_size(100)
.concurrency(10)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.execute_many(0..1134).await;
Expand All @@ -455,12 +474,18 @@ mod tests {
capacity: 2,
};

let loader = BatcherBuilder::default().batch_size(2).concurrency(100).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = BatcherBuilder::default()
.batch_size(2)
.concurrency(100)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

tokio::time::sleep(std::time::Duration::from_millis(20)).await;

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await;
let ab = loader
.execute_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await;
assert_eq!(ab, vec![Some(1), Some(2), Some(3), None, None, None, None, None, None, None]);
assert!(start.elapsed() >= std::time::Duration::from_millis(5));
assert!(start.elapsed() < std::time::Duration::from_millis(25));
Expand All @@ -477,7 +502,11 @@ mod tests {
capacity: 2,
};

let loader = BatcherBuilder::default().batch_size(2).concurrency(100).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = BatcherBuilder::default()
.batch_size(2)
.concurrency(100)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

tokio::time::sleep(std::time::Duration::from_millis(5)).await;

Expand All @@ -499,7 +528,11 @@ mod tests {
capacity: 4,
};

let loader = BatcherBuilder::default().batch_size(4).concurrency(1).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = BatcherBuilder::default()
.batch_size(4)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["a", "a", "b", "b", "c", "c"]).await;
Expand Down Expand Up @@ -530,11 +563,26 @@ mod tests {
}
}

let loader = BatcherBuilder::default().batch_size(4).concurrency(1).delay(std::time::Duration::from_millis(10)).build(TestExecutor(requests.clone()));
let loader = BatcherBuilder::default()
.batch_size(4)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(TestExecutor(requests.clone()));

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["1", "1", "2", "2", "3", "3", "hello"]).await;
assert_eq!(ab, vec![Some(Ok(1)), Some(Ok(1)), Some(Ok(2)), Some(Ok(2)), Some(Ok(3)), Some(Ok(3)), Some(Err(()))]);
assert_eq!(
ab,
vec![
Some(Ok(1)),
Some(Ok(1)),
Some(Ok(2)),
Some(Ok(2)),
Some(Ok(3)),
Some(Ok(3)),
Some(Err(()))
]
);
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 2);
assert!(start.elapsed() >= std::time::Duration::from_millis(5));
assert!(start.elapsed() < std::time::Duration::from_millis(20));
Expand All @@ -561,11 +609,26 @@ mod tests {
}
}

let loader = BatcherBuilder::default().batch_size(4).concurrency(1).delay(std::time::Duration::from_millis(10)).build(TestExecutor(requests.clone()));
let loader = BatcherBuilder::default()
.batch_size(4)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(TestExecutor(requests.clone()));

let start = std::time::Instant::now();
let ab = loader.execute_many(vec!["1", "1", "2", "2", "3", "3", "hello"]).await;
assert_eq!(ab, vec![Some(Some(1)), Some(Some(1)), Some(Some(2)), Some(Some(2)), Some(Some(3)), Some(Some(3)), Some(None)]);
assert_eq!(
ab,
vec![
Some(Some(1)),
Some(Some(1)),
Some(Some(2)),
Some(Some(2)),
Some(Some(3)),
Some(Some(3)),
Some(None)
]
);
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 2);
assert!(start.elapsed() >= std::time::Duration::from_millis(5));
assert!(start.elapsed() < std::time::Duration::from_millis(20));
Expand Down
70 changes: 55 additions & 15 deletions crates/batching/src/dataloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
let mut waiters = Vec::<BatchWaiting<E::Key, E::Value>>::new();

let mut count = 0;

{
let mut new_batch = false;
let mut batch = self.current_batch.lock().await;
Expand Down Expand Up @@ -319,10 +319,14 @@ mod tests {
assert!(keys.len() <= self.capacity);
tokio::time::sleep(self.delay).await;
self.requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Some(keys.into_iter().filter_map(|k| {
let value = self.values.get(&k)?.clone();
Some((k, value))
}).collect())
Some(
keys.into_iter()
.filter_map(|k| {
let value = self.values.get(&k)?.clone();
Some((k, value))
})
.collect(),
)
}
}

Expand Down Expand Up @@ -383,7 +387,10 @@ mod tests {
let loader = DataLoader::builder().batch_size(2).concurrency(10).build(fetcher);

let start = std::time::Instant::now();
let ab = loader.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await.unwrap();
let ab = loader
.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await
.unwrap();
assert_eq!(ab, HashMap::from_iter(vec![("a", 1), ("b", 2), ("c", 3)]));
assert!(start.elapsed() < std::time::Duration::from_millis(15));
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 5);
Expand All @@ -400,10 +407,17 @@ mod tests {
capacity: 2,
};

let loader = DataLoader::builder().batch_size(2).concurrency(1).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = DataLoader::builder()
.batch_size(2)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await.unwrap();
let ab = loader
.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await
.unwrap();
assert_eq!(ab, HashMap::from_iter(vec![("a", 1), ("b", 2), ("c", 3)]));
assert!(start.elapsed() < std::time::Duration::from_millis(35));
assert!(start.elapsed() >= std::time::Duration::from_millis(25));
Expand All @@ -421,10 +435,17 @@ mod tests {
capacity: 100,
};

let loader = DataLoaderBuilder::default().batch_size(100).concurrency(1).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = DataLoaderBuilder::default()
.batch_size(100)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await.unwrap();
let ab = loader
.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await
.unwrap();
assert_eq!(ab, HashMap::from_iter(vec![("a", 1), ("b", 2), ("c", 3)]));
assert!(start.elapsed() >= std::time::Duration::from_millis(10));
assert_eq!(requests.load(std::sync::atomic::Ordering::Relaxed), 1);
Expand All @@ -441,7 +462,11 @@ mod tests {
capacity: 100,
};

let loader = DataLoaderBuilder::default().batch_size(100).concurrency(10).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = DataLoaderBuilder::default()
.batch_size(100)
.concurrency(10)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.load_many(0..1134).await.unwrap();
Expand All @@ -462,12 +487,19 @@ mod tests {
capacity: 2,
};

let loader = DataLoader::builder().batch_size(2).concurrency(100).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = DataLoader::builder()
.batch_size(2)
.concurrency(100)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

tokio::time::sleep(std::time::Duration::from_millis(20)).await;

let start = std::time::Instant::now();
let ab = loader.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]).await.unwrap();
let ab = loader
.load_many(vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
.await
.unwrap();
assert_eq!(ab, HashMap::from_iter(vec![("a", 1), ("b", 2), ("c", 3)]));
assert!(start.elapsed() >= std::time::Duration::from_millis(5));
assert!(start.elapsed() < std::time::Duration::from_millis(25));
Expand All @@ -484,7 +516,11 @@ mod tests {
capacity: 2,
};

let loader = DataLoader::builder().batch_size(2).concurrency(100).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = DataLoader::builder()
.batch_size(2)
.concurrency(100)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

tokio::time::sleep(std::time::Duration::from_millis(5)).await;

Expand All @@ -506,7 +542,11 @@ mod tests {
capacity: 4,
};

let loader = DataLoader::builder().batch_size(4).concurrency(1).delay(std::time::Duration::from_millis(10)).build(fetcher);
let loader = DataLoader::builder()
.batch_size(4)
.concurrency(1)
.delay(std::time::Duration::from_millis(10))
.build(fetcher);

let start = std::time::Instant::now();
let ab = loader.load_many(vec!["a", "a", "b", "b", "c", "c"]).await.unwrap();
Expand Down

0 comments on commit cdaeac9

Please sign in to comment.