From 25300fa5d173f6eba098b4f6e9a58f36d5eeb878 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 16 Sep 2023 17:38:18 -0700 Subject: [PATCH] make storage and manifest private in collection --- src/core/src/collection.rs | 43 +++++++++- src/core/src/index/linear.rs | 85 ++++++-------------- src/core/src/index/revindex/disk_revindex.rs | 30 ++++--- src/core/src/index/revindex/mem_revindex.rs | 36 +++------ src/core/src/index/revindex/mod.rs | 4 +- 5 files changed, 94 insertions(+), 104 deletions(-) diff --git a/src/core/src/collection.rs b/src/core/src/collection.rs index 8f3b049313..eaed3fbffa 100644 --- a/src/core/src/collection.rs +++ b/src/core/src/collection.rs @@ -9,9 +9,12 @@ use crate::signature::Signature; use crate::storage::{FSStorage, InnerStorage, MemStorage, SigStore, Storage, ZipStorage}; use crate::Result; +#[cfg(feature = "parallel")] +use rayon::prelude::*; + pub struct Collection { - pub(crate) manifest: Manifest, - pub(crate) storage: InnerStorage, + manifest: Manifest, + storage: InnerStorage, } pub struct CollectionSet { @@ -60,6 +63,38 @@ impl CollectionSet { } impl Collection { + pub fn new(manifest: Manifest, storage: InnerStorage) -> Self { + Self { manifest, storage } + } + + pub fn iter(&self) -> impl Iterator { + self.manifest.iter().enumerate().map(|(i, r)| (i as Idx, r)) + } + + #[cfg(feature = "parallel")] + pub fn par_iter(&self) -> impl IndexedParallelIterator { + self.manifest + .par_iter() + .enumerate() + .map(|(i, r)| (i as Idx, r)) + } + + pub fn len(&self) -> usize { + self.manifest.len() + } + + pub fn is_empty(&self) -> bool { + self.manifest.len() == 0 + } + + pub fn manifest(&self) -> &Manifest { + &self.manifest + } + + pub fn storage(&self) -> &InnerStorage { + &self.storage + } + pub fn from_zipfile>(zipfile: P) -> Result { let storage = ZipStorage::from_file(zipfile)?; // Load manifest from standard location in zipstorage @@ -119,6 +154,10 @@ impl Collection { }) } + pub fn record_for_dataset(&self, dataset_id: Idx) -> Result<&Record> { + Ok(&self.manifest[dataset_id as usize]) + } + pub fn sig_for_dataset(&self, dataset_id: Idx) -> Result { let match_path = if self.manifest.is_empty() { "" diff --git a/src/core/src/index/linear.rs b/src/core/src/index/linear.rs index 1b4cd2f8ec..ff919b6f57 100644 --- a/src/core/src/index/linear.rs +++ b/src/core/src/index/linear.rs @@ -10,12 +10,11 @@ use rayon::prelude::*; use crate::collection::CollectionSet; use crate::encodings::Idx; use crate::index::{GatherResult, Index, Selection, SigCounter}; -use crate::manifest::Manifest; use crate::selection::Select; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; -use crate::storage::{InnerStorage, SigStore, Storage}; +use crate::storage::SigStore; use crate::Result; pub struct LinearIndex { @@ -46,53 +45,32 @@ impl LinearIndex { } pub fn location(&self) -> Option { - if let Some(_storage) = &self.storage() { - // storage.path() - unimplemented!() - } else { - None - } - } - - pub fn storage(&self) -> Option { - Some(self.collection.storage.clone()) + unimplemented!() } pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter { let processed_sigs = AtomicUsize::new(0); - let search_sigs: Vec<_> = self - .collection - .manifest - .internal_locations() - .map(PathBuf::from) - .collect(); - let template = self.template(); #[cfg(feature = "parallel")] - let sig_iter = search_sigs.par_iter(); + let sig_iter = self.collection.par_iter(); #[cfg(not(feature = "parallel"))] - let sig_iter = search_sigs.iter(); + let sig_iter = self.collection.iter(); + + let counters = sig_iter.filter_map(|(dataset_id, record)| { + let filename = record.internal_location(); - let counters = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { let i = processed_sigs.fetch_add(1, Ordering::SeqCst); if i % 1000 == 0 { info!("Processed {} reference sigs", i); } - let search_sig = if let Some(storage) = &self.storage() { - let sig_data = storage - .load(filename.as_str()) - .unwrap_or_else(|_| panic!("error loading {:?}", filename)); - - Signature::from_reader(sig_data.as_slice()) - } else { - Signature::from_path(filename) - } - .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) - .swap_remove(0); + let search_sig = self + .collection + .sig_for_dataset(dataset_id) + .unwrap_or_else(|_| panic!("error loading {:?}", filename)); let mut search_mh = None; if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { @@ -147,7 +125,8 @@ impl LinearIndex { for (dataset_id, size) in counter.most_common() { if size >= threshold { matches.push( - self.collection.manifest[dataset_id as usize] + self.collection + .record_for_dataset(dataset_id)? .internal_location() .to_string(), ); @@ -165,14 +144,11 @@ impl LinearIndex { query: &KmerMinHash, round: usize, ) -> Result { - let match_path = if self.collection.manifest.is_empty() { - "" - } else { - self.collection.manifest[dataset_id as usize] - .internal_location() - .as_str() - } - .into(); + let match_path = self + .collection + .record_for_dataset(dataset_id)? + .internal_location() + .into(); let match_sig = self.collection.sig_for_dataset(dataset_id)?; let result = self.stats_for_match(&match_sig, query, match_size, match_path, round)?; Ok(result) @@ -289,18 +265,8 @@ impl LinearIndex { Ok(matches) } - pub fn manifest(&self) -> Manifest { - self.collection.manifest.clone() - } - - pub fn set_manifest(&mut self, new_manifest: Manifest) -> Result<()> { - self.collection.manifest = new_manifest; - Ok(()) - } - pub fn signatures_iter(&self) -> impl Iterator + '_ { - // FIXME temp solution, must find better one! - (0..self.collection.manifest.len()).map(move |dataset_id| { + (0..self.collection.len()).map(move |dataset_id| { self.collection .sig_for_dataset(dataset_id as Idx) .expect("error loading sig") @@ -339,19 +305,16 @@ impl<'a> Index<'a> for LinearIndex { } fn len(&self) -> usize { - self.collection.manifest.len() + self.collection.len() } fn signatures(&self) -> Vec { self.collection() - .manifest - .internal_locations() - .map(PathBuf::from) - .map(|p| { + .iter() + .map(|(i, p)| { self.collection() - .storage - .load_sig(p.as_str()) - .unwrap_or_else(|_| panic!("Error processing {:?}", p)) + .sig_for_dataset(i as Idx) + .unwrap_or_else(|_| panic!("Error processing {}", p.internal_location())) }) .collect() } diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs index 4c4064acc4..e8b440d745 100644 --- a/src/core/src/index/revindex/disk_revindex.rs +++ b/src/core/src/index/revindex/disk_revindex.rs @@ -80,19 +80,14 @@ impl RevIndex { collection: Arc::new(collection), }; - index - .collection - .manifest - .par_iter() - .enumerate() - .for_each(|(dataset_id, _)| { - let i = processed_sigs.fetch_add(1, Ordering::SeqCst); - if i % 1000 == 0 { - info!("Processed {} reference sigs", i); - } + index.collection.par_iter().for_each(|(dataset_id, _)| { + let i = processed_sigs.fetch_add(1, Ordering::SeqCst); + if i % 1000 == 0 { + info!("Processed {} reference sigs", i); + } - index.map_hashes_colors(dataset_id as Idx); - }); + index.map_hashes_colors(dataset_id as Idx); + }); index.save_collection().expect("Error saving collection"); @@ -143,7 +138,7 @@ impl RevIndex { InnerStorage::from_spec(spec)? }; - Collection { manifest, storage }.try_into() + Collection::new(manifest, storage).try_into() } fn save_collection(&self) -> Result<()> { @@ -152,12 +147,12 @@ impl RevIndex { // write manifest let mut wtr = vec![]; { - self.collection.manifest.to_writer(&mut wtr)?; + self.collection.manifest().to_writer(&mut wtr)?; } self.db.put_cf(&cf_metadata, MANIFEST, &wtr[..])?; // write storage spec - let spec = self.collection.storage.spec(); + let spec = self.collection.storage().spec(); // TODO: check if spec if memstorage, would probably have to // save into rocksdb in that case! @@ -269,7 +264,10 @@ impl RevIndexOps for RevIndex { .into_iter() .filter_map(|(dataset_id, size)| { if size >= threshold { - let row = &self.collection.manifest[dataset_id as usize]; + let row = &self + .collection + .record_for_dataset(dataset_id) + .expect("dataset not found"); Some((row.name().into(), size)) } else { None diff --git a/src/core/src/index/revindex/mem_revindex.rs b/src/core/src/index/revindex/mem_revindex.rs index b951d9513d..e3efb7146a 100644 --- a/src/core/src/index/revindex/mem_revindex.rs +++ b/src/core/src/index/revindex/mem_revindex.rs @@ -1,17 +1,14 @@ -use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use camino::Utf8Path as Path; use camino::Utf8PathBuf as PathBuf; use log::{debug, info}; -use nohash_hasher::BuildNoHashHasher; -use serde::{Deserialize, Serialize}; #[cfg(feature = "parallel")] use rayon::prelude::*; use crate::collection::Collection; -use crate::encodings::{Color, Colors, Idx}; +use crate::encodings::{Colors, Idx}; use crate::index::linear::LinearIndex; use crate::index::revindex::HashToColor; use crate::index::{GatherResult, Index, SigCounter}; @@ -19,8 +16,6 @@ use crate::prelude::*; use crate::signature::{Signature, SigsTrait}; use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; -use crate::storage::Storage; -use crate::HashIntoType; use crate::Result; pub struct RevIndex { @@ -38,20 +33,13 @@ impl LinearIndex { ) -> RevIndex { let processed_sigs = AtomicUsize::new(0); - let search_sigs: Vec<_> = self - .collection() - .manifest - .internal_locations() - .map(PathBuf::from) - .collect(); - #[cfg(feature = "parallel")] - let sig_iter = search_sigs.par_iter(); + let sig_iter = self.collection().par_iter(); #[cfg(not(feature = "parallel"))] - let sig_iter = search_sigs.iter(); + let sig_iter = self.collection().iter(); - let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| { + let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, _)| { let i = processed_sigs.fetch_add(1, Ordering::SeqCst); if i % 1000 == 0 { info!("Processed {} reference sigs", i); @@ -59,13 +47,12 @@ impl LinearIndex { let search_sig = self .collection() - .storage - .load_sig(filename.as_str()) - .unwrap_or_else(|_| panic!("Error processing {:?}", filename)) + .sig_for_dataset(dataset_id as Idx) + .expect("Error loading sig") .into(); RevIndex::map_hashes_colors( - dataset_id, + dataset_id as Idx, &search_sig, queries, &merged_query, @@ -160,7 +147,7 @@ impl RevIndex { } fn map_hashes_colors( - dataset_id: usize, + dataset_id: Idx, search_sig: &Signature, queries: Option<&[KmerMinHash]>, merged_query: &Option, @@ -275,8 +262,11 @@ impl RevIndex { let match_size = if size >= threshold { size } else { break }; let match_sig = self.linear.sig_for_dataset(dataset_id)?; - let match_path = - self.linear.collection().manifest[dataset_id as usize].internal_location(); + let match_path = self + .linear + .collection() + .record_for_dataset(dataset_id)? + .internal_location(); let mut match_mh = None; if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(self.linear.template()) { diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs index 42a9837d13..ec7c6a00ad 100644 --- a/src/core/src/index/revindex/mod.rs +++ b/src/core/src/index/revindex/mod.rs @@ -98,11 +98,11 @@ impl HashToColor { self.0.is_empty() } - fn add_to(&mut self, colors: &mut Colors, dataset_id: usize, matched_hashes: Vec) { + fn add_to(&mut self, colors: &mut Colors, dataset_id: Idx, matched_hashes: Vec) { let mut color = None; matched_hashes.into_iter().for_each(|hash| { - color = Some(colors.update(color, &[dataset_id as Idx]).unwrap()); + color = Some(colors.update(color, &[dataset_id]).unwrap()); self.0.insert(hash, color.unwrap()); }); }