From 487e9d341f20e3b4f942959a0a155ae268536a4c Mon Sep 17 00:00:00 2001 From: Gregor Peach Date: Tue, 26 May 2020 12:42:59 -0700 Subject: [PATCH 1/3] Initial `rayon` support: ParallelExtend + FromParallelIterator This was a straightforward step towards #14 --- .gitignore | 8 ++- Cargo.toml | 1 + src/lib.rs | 3 ++ src/rayon_impls.rs | 119 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 src/rayon_impls.rs diff --git a/.gitignore b/.gitignore index ecb95b05..1be244ee 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,10 @@ +# Don't include build output /target -**/*.rs.bk +# We're a library--don't include our lockfile Cargo.lock +# Don't include rustfmt backup files +**/*.rs.bk + +# Don't include IDE configuration files +.idea **/.vscode/* diff --git a/Cargo.toml b/Cargo.toml index 77336482..f0c83922 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ sanitize = ['crossbeam-epoch/sanitize'] crossbeam-epoch = "0.8.2" parking_lot = "0.10" num_cpus = "1.12.0" +rayon = {version = "1.3", optional = true} serde = {version = "1.0.105", optional = true} [dependencies.ahash] diff --git a/src/lib.rs b/src/lib.rs index 528e2806..4d673e98 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -252,6 +252,9 @@ mod raw; mod set; mod set_ref; +#[cfg(feature = "rayon")] +mod rayon_impls; + #[cfg(feature = "serde")] mod serde_impls; diff --git a/src/rayon_impls.rs b/src/rayon_impls.rs new file mode 100644 index 00000000..0d45e762 --- /dev/null +++ b/src/rayon_impls.rs @@ -0,0 +1,119 @@ +use crate::HashMap; +use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; +use std::hash::{BuildHasher, Hash}; + +impl ParallelExtend<(K, V)> for HashMap +where + K: Clone + Hash + Ord + Send + Sync + 'static, + V: Send + Sync + 'static, + S: BuildHasher + Sync, +{ + // This is of limited use due to the `&mut self` parameter. See `par_extend_sync` + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + self.par_extend_sync(par_iter); + } +} + +impl HashMap +where + K: Clone + Hash + Ord + Send + Sync + 'static, + V: Send + Sync + 'static, + S: BuildHasher + Sync, +{ + // FIXME: Terrible name, just didn't want to shadow the rayon name + fn par_extend_sync(&self, par_iter: I) + where + I: IntoParallelIterator, + { + par_iter.into_par_iter().for_each(|(k, v)| { + // Unfortunate that we need to create a guard for each insert operation + // Ideally we'd create a guard for each `rayon` worker thread instead + // Perhaps this could be done with a thread local? + let guard = self.guard(); + self.insert(k, v, &guard); + }); + } +} + +impl FromParallelIterator<(K, V)> for HashMap +where + K: Clone + Hash + Ord + Send + Sync + 'static, + V: Send + Sync + 'static, +{ + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + let mut created_map = HashMap::new(); + created_map.par_extend(par_iter); + created_map + } +} + +#[cfg(test)] +mod test { + use crate::HashMap; + use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend}; + + #[test] + fn parallel_extend_by_nothing() { + let to_extend_with = Vec::new(); + + let mut map = HashMap::new(); + let guard = map.guard(); + map.insert(1, 2, &guard); + map.insert(3, 4, &guard); + + map.par_extend(to_extend_with.into_par_iter()); + + assert_eq!(map.len(), 2); + + assert_eq!(map.get(&1, &guard), Some(&2)); + assert_eq!(map.get(&3, &guard), Some(&4)); + } + + #[test] + fn parallel_extend_by_a_bunch() { + let mut to_extend_with = Vec::new(); + for i in 0..100 { + to_extend_with.push((i + 100, i * 10)); + } + + let mut map = HashMap::new(); + let guard = map.guard(); + map.insert(1, 2, &guard); + map.insert(3, 4, &guard); + + map.par_extend(to_extend_with.into_par_iter()); + assert_eq!(map.len(), 102); + + assert_eq!(map.get(&1, &guard), Some(&2)); + assert_eq!(map.get(&3, &guard), Some(&4)); + assert_eq!(map.get(&100, &guard), Some(&0)); + assert_eq!(map.get(&199, &guard), Some(&990)); + } + + #[test] + fn from_empty_parallel_iter() { + let to_create_from: Vec<(i32, i32)> = Vec::new(); + let created_map: HashMap = HashMap::from_par_iter(to_create_from.into_par_iter()); + assert_eq!(created_map.len(), 0); + } + + #[test] + fn from_large_parallel_iter() { + let mut to_create_from: Vec<(i32, i32)> = Vec::new(); + for i in 0..100 { + to_create_from.push((i + 100, i * 10)); + } + let created_map: HashMap = HashMap::from_par_iter(to_create_from.into_par_iter()); + assert_eq!(created_map.len(), 100); + + let guard = created_map.guard(); + assert_eq!(created_map.get(&100, &guard), Some(&0)); + assert_eq!(created_map.get(&199, &guard), Some(&990)); + } +} From cd9b5ddd67fc5f13f38a6162d5a36cb7a924d0b3 Mon Sep 17 00:00:00 2001 From: Gregor Peach Date: Tue, 26 May 2020 19:26:54 -0700 Subject: [PATCH 2/3] Improve rayon_impls, run rayon tests through sanitizers --- azure-pipelines.yml | 4 ++-- src/rayon_impls.rs | 26 ++++++++++++-------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 792290f9..47d51fc3 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -73,7 +73,7 @@ jobs: displayName: Enable debug symbols # only --lib --tests b/c of https://github.com/rust-lang/rust/issues/53945 - script: | - env ASAN_OPTIONS="detect_odr_violation=0" RUSTFLAGS="-Z sanitizer=address" cargo test --lib --tests --features sanitize --target x86_64-unknown-linux-gnu + env ASAN_OPTIONS="detect_odr_violation=0" RUSTFLAGS="-Z sanitizer=address" cargo test --lib --tests --all-features --target x86_64-unknown-linux-gnu displayName: cargo -Z sanitizer=address test - job: lsan dependsOn: deny @@ -91,7 +91,7 @@ jobs: cat Cargo.toml displayName: Enable debug symbols - script: | - env RUSTFLAGS="-Z sanitizer=leak" cargo test --features sanitize --target x86_64-unknown-linux-gnu + env RUSTFLAGS="-Z sanitizer=leak" cargo test --all-features --target x86_64-unknown-linux-gnu displayName: cargo -Z sanitizer=leak test - template: coverage.yml@templates parameters: diff --git a/src/rayon_impls.rs b/src/rayon_impls.rs index 0d45e762..bc84682d 100644 --- a/src/rayon_impls.rs +++ b/src/rayon_impls.rs @@ -8,46 +8,44 @@ where V: Send + Sync + 'static, S: BuildHasher + Sync, { - // This is of limited use due to the `&mut self` parameter. See `par_extend_sync` fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { - self.par_extend_sync(par_iter); + (&*self).par_extend(par_iter); } } -impl HashMap +impl ParallelExtend<(K, V)> for &HashMap where K: Clone + Hash + Ord + Send + Sync + 'static, V: Send + Sync + 'static, S: BuildHasher + Sync, { - // FIXME: Terrible name, just didn't want to shadow the rayon name - fn par_extend_sync(&self, par_iter: I) + fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { - par_iter.into_par_iter().for_each(|(k, v)| { - // Unfortunate that we need to create a guard for each insert operation - // Ideally we'd create a guard for each `rayon` worker thread instead - // Perhaps this could be done with a thread local? - let guard = self.guard(); - self.insert(k, v, &guard); - }); + par_iter.into_par_iter().for_each_init( + || self.guard(), + |guard, (k, v)| { + self.insert(k, v, &guard); + }, + ); } } -impl FromParallelIterator<(K, V)> for HashMap +impl FromParallelIterator<(K, V)> for HashMap where K: Clone + Hash + Ord + Send + Sync + 'static, V: Send + Sync + 'static, + S: BuildHasher + Default + Sync, { fn from_par_iter(par_iter: I) -> Self where I: IntoParallelIterator, { - let mut created_map = HashMap::new(); + let mut created_map = HashMap::with_hasher(S::default()); created_map.par_extend(par_iter); created_map } From 209c6e3b015076d123c634e1a6624f5e62f6e18e Mon Sep 17 00:00:00 2001 From: Gregor Peach Date: Sun, 14 Jun 2020 18:11:50 -0700 Subject: [PATCH 3/3] Add more rayon implementations, including for HashSet --- src/map_ref.rs | 2 +- src/rayon_impls.rs | 245 +++++++++++++++++++++++++++++++++++++++++---- src/set.rs | 2 +- src/set_ref.rs | 2 +- 4 files changed, 227 insertions(+), 24 deletions(-) diff --git a/src/map_ref.rs b/src/map_ref.rs index 673f35a6..ef9b429f 100644 --- a/src/map_ref.rs +++ b/src/map_ref.rs @@ -11,7 +11,7 @@ use std::ops::Index; /// The current thread will be pinned for the duration of this reference. /// Keep in mind that this prevents the collection of garbage generated by the map. pub struct HashMapRef<'map, K, V, S = crate::DefaultHashBuilder> { - map: &'map HashMap, + pub(crate) map: &'map HashMap, guard: GuardRef<'map>, } diff --git a/src/rayon_impls.rs b/src/rayon_impls.rs index bc84682d..609cfb48 100644 --- a/src/rayon_impls.rs +++ b/src/rayon_impls.rs @@ -1,7 +1,23 @@ -use crate::HashMap; +use crate::{HashMap, HashMapRef, HashSet, HashSetRef}; use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; use std::hash::{BuildHasher, Hash}; +impl FromParallelIterator<(K, V)> for HashMap +where + K: Clone + Hash + Ord + Send + Sync + 'static, + V: Send + Sync + 'static, + S: BuildHasher + Default + Sync, +{ + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + let mut created_map = HashMap::with_hasher(S::default()); + created_map.par_extend(par_iter); + created_map + } +} + impl ParallelExtend<(K, V)> for HashMap where K: Clone + Hash + Ord + Send + Sync + 'static, @@ -35,29 +51,125 @@ where } } -impl FromParallelIterator<(K, V)> for HashMap +impl<'map, K, V, S> ParallelExtend<(K, V)> for HashMapRef<'map, K, V, S> where K: Clone + Hash + Ord + Send + Sync + 'static, V: Send + Sync + 'static, + S: BuildHasher + Sync, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + self.map.par_extend(par_iter); + } +} + +impl FromParallelIterator for HashSet +where + K: Clone + Hash + Ord + Send + Sync + 'static, S: BuildHasher + Default + Sync, { fn from_par_iter(par_iter: I) -> Self where - I: IntoParallelIterator, + I: IntoParallelIterator, { - let mut created_map = HashMap::with_hasher(S::default()); - created_map.par_extend(par_iter); - created_map + let mut created_set = HashSet::with_hasher(S::default()); + created_set.par_extend(par_iter); + created_set + } +} + +impl ParallelExtend for HashSet +where + K: Clone + Hash + Ord + Send + Sync + 'static, + S: BuildHasher + Sync, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + (&*self).par_extend(par_iter); + } +} + +impl ParallelExtend for &HashSet +where + K: Clone + Hash + Ord + Send + Sync + 'static, + S: BuildHasher + Sync, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + let tuple_iter = par_iter.into_par_iter().map(|k| (k, ())); + (&self.map).par_extend(tuple_iter); + } +} + +impl<'set, K, S> ParallelExtend for HashSetRef<'set, K, S> +where + K: Clone + Hash + Ord + Send + Sync + 'static, + S: BuildHasher + Sync, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + self.set.par_extend(par_iter); } } #[cfg(test)] mod test { - use crate::HashMap; + use crate::{HashMap, HashSet}; use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend}; #[test] - fn parallel_extend_by_nothing() { + fn hm_from_empty_parallel_iter() { + let to_create_from: Vec<(i32, i32)> = Vec::new(); + let created_map: HashMap = HashMap::from_par_iter(to_create_from.into_par_iter()); + assert_eq!(created_map.len(), 0); + } + + #[test] + fn hm_from_large_parallel_iter() { + let mut to_create_from: Vec<(i32, i32)> = Vec::new(); + for i in 0..100 { + to_create_from.push((i + 100, i * 10)); + } + let created_map: HashMap = HashMap::from_par_iter(to_create_from.into_par_iter()); + assert_eq!(created_map.len(), 100); + + let guard = created_map.guard(); + assert_eq!(created_map.get(&100, &guard), Some(&0)); + assert_eq!(created_map.get(&199, &guard), Some(&990)); + } + + #[test] + fn hs_from_empty_parallel_iter() { + let to_create_from: Vec = Vec::new(); + let created_set: HashSet = HashSet::from_par_iter(to_create_from.into_par_iter()); + assert_eq!(created_set.len(), 0); + } + + #[test] + fn hs_from_large_parallel_iter() { + let mut to_create_from: Vec<(i32, i32)> = Vec::new(); + for i in 0..100 { + to_create_from.push((i + 100, i * 10)); + } + let created_map: HashSet<(i32, i32)> = + HashSet::from_par_iter(to_create_from.into_par_iter()); + assert_eq!(created_map.len(), 100); + + let guard = created_map.guard(); + assert!(created_map.contains(&(100, 0), &guard)); + assert!(!created_map.contains(&(100, 10000), &guard)); + } + + #[test] + fn hm_parallel_extend_by_nothing() { let to_extend_with = Vec::new(); let mut map = HashMap::new(); @@ -74,7 +186,7 @@ mod test { } #[test] - fn parallel_extend_by_a_bunch() { + fn hm_parallel_extend_by_a_bunch() { let mut to_extend_with = Vec::new(); for i in 0..100 { to_extend_with.push((i + 100, i * 10)); @@ -95,23 +207,114 @@ mod test { } #[test] - fn from_empty_parallel_iter() { - let to_create_from: Vec<(i32, i32)> = Vec::new(); - let created_map: HashMap = HashMap::from_par_iter(to_create_from.into_par_iter()); - assert_eq!(created_map.len(), 0); + fn hm_ref_parallel_extend_by_nothing() { + let to_extend_with = Vec::new(); + + let map = HashMap::new(); + let guard = map.guard(); + map.insert(1, 2, &guard); + map.insert(3, 4, &guard); + + map.pin().par_extend(to_extend_with.into_par_iter()); + + assert_eq!(map.len(), 2); + + assert_eq!(map.get(&1, &guard), Some(&2)); + assert_eq!(map.get(&3, &guard), Some(&4)); } #[test] - fn from_large_parallel_iter() { - let mut to_create_from: Vec<(i32, i32)> = Vec::new(); + fn hm_ref_parallel_extend_by_a_bunch() { + let mut to_extend_with = Vec::new(); for i in 0..100 { - to_create_from.push((i + 100, i * 10)); + to_extend_with.push((i + 100, i * 10)); } - let created_map: HashMap = HashMap::from_par_iter(to_create_from.into_par_iter()); - assert_eq!(created_map.len(), 100); - let guard = created_map.guard(); - assert_eq!(created_map.get(&100, &guard), Some(&0)); - assert_eq!(created_map.get(&199, &guard), Some(&990)); + let map = HashMap::new(); + let guard = map.guard(); + map.insert(1, 2, &guard); + map.insert(3, 4, &guard); + + map.pin().par_extend(to_extend_with.into_par_iter()); + assert_eq!(map.len(), 102); + + assert_eq!(map.get(&1, &guard), Some(&2)); + assert_eq!(map.get(&3, &guard), Some(&4)); + assert_eq!(map.get(&100, &guard), Some(&0)); + assert_eq!(map.get(&199, &guard), Some(&990)); + } + + #[test] + fn hs_parallel_extend_by_nothing() { + let to_extend_with = Vec::new(); + + let mut set = HashSet::new(); + let guard = set.guard(); + set.insert(1, &guard); + set.insert(3, &guard); + + set.par_extend(to_extend_with.into_par_iter()); + + assert_eq!(set.len(), 2); + + assert!(set.contains(&1, &guard)); + assert!(!set.contains(&17, &guard)); + } + + #[test] + fn hs_parallel_extend_by_a_bunch() { + let mut to_extend_with = Vec::new(); + for i in 0..100 { + to_extend_with.push((i + 100, i * 10)); + } + + let mut set = HashSet::new(); + let guard = set.guard(); + set.insert((1, 2), &guard); + set.insert((3, 4), &guard); + + set.par_extend(to_extend_with.into_par_iter()); + assert_eq!(set.len(), 102); + + assert!(set.contains(&(1, 2), &guard)); + assert!(set.contains(&(199, 990), &guard)); + assert!(!set.contains(&(199, 167), &guard)); + } + + #[test] + fn hs_ref_parallel_extend_by_nothing() { + let to_extend_with = Vec::new(); + + let mut set = HashSet::new(); + let guard = set.guard(); + set.insert((1, 2), &guard); + set.insert((3, 4), &guard); + + set.par_extend(to_extend_with.into_par_iter()); + assert_eq!(set.len(), 2); + + assert!(set.contains(&(1, 2), &guard)); + assert!(!set.contains(&(199, 990), &guard)); + assert!(!set.contains(&(199, 167), &guard)); + } + + #[test] + fn hs_ref_parallel_extend_by_a_bunch() { + let mut to_extend_with = Vec::new(); + for i in 0..100 { + to_extend_with.push((i + 100, i * 10)); + } + + let set = HashSet::new(); + let mut set_ref = set.pin(); + set_ref.insert((1, 2)); + set_ref.insert((3, 4)); + + set_ref.par_extend(to_extend_with.into_par_iter()); + assert_eq!(set.len(), 102); + + assert!(set_ref.contains(&(1, 2))); + assert!(set_ref.contains(&(199, 990))); + assert!(!set_ref.contains(&(199, 167))); } } diff --git a/src/set.rs b/src/set.rs index 8468b584..c1c5906e 100644 --- a/src/set.rs +++ b/src/set.rs @@ -41,7 +41,7 @@ use std::iter::FromIterator; /// } /// ``` pub struct HashSet { - map: HashMap, + pub(crate) map: HashMap, } impl HashSet { diff --git a/src/set_ref.rs b/src/set_ref.rs index 985a9f49..f0aa872d 100644 --- a/src/set_ref.rs +++ b/src/set_ref.rs @@ -10,7 +10,7 @@ use std::hash::{BuildHasher, Hash}; /// The current thread will be pinned for the duration of this reference. /// Keep in mind that this prevents the collection of garbage generated by the set. pub struct HashSetRef<'set, T, S = crate::DefaultHashBuilder> { - set: &'set HashSet, + pub(crate) set: &'set HashSet, guard: GuardRef<'set>, }