Skip to content

Commit

Permalink
make storage and manifest private in collection
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Sep 17, 2023
1 parent 84ce2bf commit 25300fa
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 104 deletions.
43 changes: 41 additions & 2 deletions src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ use crate::signature::Signature;
use crate::storage::{FSStorage, InnerStorage, MemStorage, SigStore, Storage, ZipStorage};
use crate::Result;

#[cfg(feature = "parallel")]
use rayon::prelude::*;

pub struct Collection {
pub(crate) manifest: Manifest,
pub(crate) storage: InnerStorage,
manifest: Manifest,
storage: InnerStorage,
}

pub struct CollectionSet {
Expand Down Expand Up @@ -60,6 +63,38 @@ impl CollectionSet {
}

impl Collection {
pub fn new(manifest: Manifest, storage: InnerStorage) -> Self {

Check warning on line 66 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L66

Added line #L66 was not covered by tests
Self { manifest, storage }
}

pub fn iter(&self) -> impl Iterator<Item = (Idx, &Record)> {
self.manifest.iter().enumerate().map(|(i, r)| (i as Idx, r))

Check warning on line 71 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L70-L71

Added lines #L70 - L71 were not covered by tests
}

#[cfg(feature = "parallel")]
pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = (Idx, &Record)> {

Check warning on line 75 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L75

Added line #L75 was not covered by tests
self.manifest
.par_iter()
.enumerate()
.map(|(i, r)| (i as Idx, r))

Check warning on line 79 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L79

Added line #L79 was not covered by tests
}

pub fn len(&self) -> usize {

Check warning on line 82 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L82

Added line #L82 was not covered by tests
self.manifest.len()
}

pub fn is_empty(&self) -> bool {
self.manifest.len() == 0

Check warning on line 87 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L86-L87

Added lines #L86 - L87 were not covered by tests
}

pub fn manifest(&self) -> &Manifest {

Check warning on line 90 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L90

Added line #L90 was not covered by tests
&self.manifest
}

pub fn storage(&self) -> &InnerStorage {
&self.storage

Check warning on line 95 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L94-L95

Added lines #L94 - L95 were not covered by tests
}

pub fn from_zipfile<P: AsRef<Path>>(zipfile: P) -> Result<Self> {
let storage = ZipStorage::from_file(zipfile)?;
// Load manifest from standard location in zipstorage
Expand Down Expand Up @@ -119,6 +154,10 @@ impl Collection {
})
}

pub fn record_for_dataset(&self, dataset_id: Idx) -> Result<&Record> {

Check warning on line 157 in src/core/src/collection.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/collection.rs#L157

Added line #L157 was not covered by tests
Ok(&self.manifest[dataset_id as usize])
}

pub fn sig_for_dataset(&self, dataset_id: Idx) -> Result<SigStore> {
let match_path = if self.manifest.is_empty() {
""
Expand Down
85 changes: 24 additions & 61 deletions src/core/src/index/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ use rayon::prelude::*;
use crate::collection::CollectionSet;
use crate::encodings::Idx;
use crate::index::{GatherResult, Index, Selection, SigCounter};
use crate::manifest::Manifest;
use crate::selection::Select;
use crate::signature::{Signature, SigsTrait};
use crate::sketch::minhash::KmerMinHash;
use crate::sketch::Sketch;
use crate::storage::{InnerStorage, SigStore, Storage};
use crate::storage::SigStore;
use crate::Result;

pub struct LinearIndex {
Expand Down Expand Up @@ -46,53 +45,32 @@ impl LinearIndex {
}

pub fn location(&self) -> Option<String> {

Check warning on line 47 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L47

Added line #L47 was not covered by tests
if let Some(_storage) = &self.storage() {
// storage.path()
unimplemented!()
} else {
None
}
}

pub fn storage(&self) -> Option<InnerStorage> {
Some(self.collection.storage.clone())
unimplemented!()
}

pub fn counter_for_query(&self, query: &KmerMinHash) -> SigCounter {

Check warning on line 51 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L51

Added line #L51 was not covered by tests
let processed_sigs = AtomicUsize::new(0);

let search_sigs: Vec<_> = self
.collection
.manifest
.internal_locations()
.map(PathBuf::from)
.collect();

let template = self.template();

#[cfg(feature = "parallel")]
let sig_iter = search_sigs.par_iter();
let sig_iter = self.collection.par_iter();

#[cfg(not(feature = "parallel"))]
let sig_iter = search_sigs.iter();
let sig_iter = self.collection.iter();

let counters = sig_iter.filter_map(|(dataset_id, record)| {
let filename = record.internal_location();

let counters = sig_iter.enumerate().filter_map(|(dataset_id, filename)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

let search_sig = if let Some(storage) = &self.storage() {
let sig_data = storage
.load(filename.as_str())
.unwrap_or_else(|_| panic!("error loading {:?}", filename));

Signature::from_reader(sig_data.as_slice())
} else {
Signature::from_path(filename)
}
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.swap_remove(0);
let search_sig = self
.collection
.sig_for_dataset(dataset_id)
.unwrap_or_else(|_| panic!("error loading {:?}", filename));

Check warning on line 73 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L73

Added line #L73 was not covered by tests

let mut search_mh = None;
if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) {
Expand Down Expand Up @@ -147,7 +125,8 @@ impl LinearIndex {
for (dataset_id, size) in counter.most_common() {
if size >= threshold {
matches.push(
self.collection.manifest[dataset_id as usize]
self.collection
.record_for_dataset(dataset_id)?
.internal_location()
.to_string(),
);
Expand All @@ -165,14 +144,11 @@ impl LinearIndex {
query: &KmerMinHash,
round: usize,
) -> Result<GatherResult> {
let match_path = if self.collection.manifest.is_empty() {
""
} else {
self.collection.manifest[dataset_id as usize]
.internal_location()
.as_str()
}
.into();
let match_path = self
.collection
.record_for_dataset(dataset_id)?
.internal_location()
.into();
let match_sig = self.collection.sig_for_dataset(dataset_id)?;
let result = self.stats_for_match(&match_sig, query, match_size, match_path, round)?;
Ok(result)
Expand Down Expand Up @@ -289,18 +265,8 @@ impl LinearIndex {
Ok(matches)
}

pub fn manifest(&self) -> Manifest {
self.collection.manifest.clone()
}

pub fn set_manifest(&mut self, new_manifest: Manifest) -> Result<()> {
self.collection.manifest = new_manifest;
Ok(())
}

pub fn signatures_iter(&self) -> impl Iterator<Item = SigStore> + '_ {

Check warning on line 268 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L268

Added line #L268 was not covered by tests
// FIXME temp solution, must find better one!
(0..self.collection.manifest.len()).map(move |dataset_id| {
(0..self.collection.len()).map(move |dataset_id| {
self.collection
.sig_for_dataset(dataset_id as Idx)
.expect("error loading sig")
Expand Down Expand Up @@ -339,19 +305,16 @@ impl<'a> Index<'a> for LinearIndex {
}

fn len(&self) -> usize {
self.collection.manifest.len()
self.collection.len()

Check warning on line 308 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L307-L308

Added lines #L307 - L308 were not covered by tests
}

fn signatures(&self) -> Vec<Self::Item> {
self.collection()

Check warning on line 312 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L311-L312

Added lines #L311 - L312 were not covered by tests
.manifest
.internal_locations()
.map(PathBuf::from)
.map(|p| {
.iter()
.map(|(i, p)| {
self.collection()
.storage
.load_sig(p.as_str())
.unwrap_or_else(|_| panic!("Error processing {:?}", p))
.sig_for_dataset(i as Idx)
.unwrap_or_else(|_| panic!("Error processing {}", p.internal_location()))

Check warning on line 317 in src/core/src/index/linear.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/linear.rs#L314-L317

Added lines #L314 - L317 were not covered by tests
})
.collect()
}
Expand Down
30 changes: 14 additions & 16 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,14 @@ impl RevIndex {
collection: Arc::new(collection),
};

index
.collection
.manifest
.par_iter()
.enumerate()
.for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}
index.collection.par_iter().for_each(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

index.map_hashes_colors(dataset_id as Idx);
});
index.map_hashes_colors(dataset_id as Idx);
});

index.save_collection().expect("Error saving collection");

Expand Down Expand Up @@ -143,7 +138,7 @@ impl RevIndex {
InnerStorage::from_spec(spec)?
};

Collection { manifest, storage }.try_into()
Collection::new(manifest, storage).try_into()
}

fn save_collection(&self) -> Result<()> {
Expand All @@ -152,12 +147,12 @@ impl RevIndex {
// write manifest
let mut wtr = vec![];
{
self.collection.manifest.to_writer(&mut wtr)?;
self.collection.manifest().to_writer(&mut wtr)?;
}
self.db.put_cf(&cf_metadata, MANIFEST, &wtr[..])?;

// write storage spec
let spec = self.collection.storage.spec();
let spec = self.collection.storage().spec();

// TODO: check if spec is memstorage, would probably have to
// save into rocksdb in that case!
Expand Down Expand Up @@ -269,7 +264,10 @@ impl RevIndexOps for RevIndex {
.into_iter()
.filter_map(|(dataset_id, size)| {

Check warning on line 265 in src/core/src/index/revindex/disk_revindex.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/disk_revindex.rs#L265

Added line #L265 was not covered by tests
if size >= threshold {
let row = &self.collection.manifest[dataset_id as usize];
let row = &self

Check warning on line 267 in src/core/src/index/revindex/disk_revindex.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/disk_revindex.rs#L267

Added line #L267 was not covered by tests
.collection
.record_for_dataset(dataset_id)
.expect("dataset not found");
Some((row.name().into(), size))
} else {
None

Check warning on line 273 in src/core/src/index/revindex/disk_revindex.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/disk_revindex.rs#L273

Added line #L273 was not covered by tests
Expand Down
36 changes: 13 additions & 23 deletions src/core/src/index/revindex/mem_revindex.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

use camino::Utf8Path as Path;
use camino::Utf8PathBuf as PathBuf;
use log::{debug, info};
use nohash_hasher::BuildNoHashHasher;
use serde::{Deserialize, Serialize};

#[cfg(feature = "parallel")]
use rayon::prelude::*;

use crate::collection::Collection;
use crate::encodings::{Color, Colors, Idx};
use crate::encodings::{Colors, Idx};
use crate::index::linear::LinearIndex;
use crate::index::revindex::HashToColor;
use crate::index::{GatherResult, Index, SigCounter};
use crate::prelude::*;
use crate::signature::{Signature, SigsTrait};
use crate::sketch::minhash::KmerMinHash;
use crate::sketch::Sketch;
use crate::storage::Storage;
use crate::HashIntoType;
use crate::Result;

pub struct RevIndex {
Expand All @@ -38,34 +33,26 @@ impl LinearIndex {
) -> RevIndex {
let processed_sigs = AtomicUsize::new(0);

let search_sigs: Vec<_> = self
.collection()
.manifest
.internal_locations()
.map(PathBuf::from)
.collect();

#[cfg(feature = "parallel")]
let sig_iter = search_sigs.par_iter();
let sig_iter = self.collection().par_iter();

#[cfg(not(feature = "parallel"))]
let sig_iter = search_sigs.iter();
let sig_iter = self.collection().iter();

let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, filename)| {
let filtered_sigs = sig_iter.enumerate().filter_map(|(dataset_id, _)| {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

let search_sig = self
.collection()
.storage
.load_sig(filename.as_str())
.unwrap_or_else(|_| panic!("Error processing {:?}", filename))
.sig_for_dataset(dataset_id as Idx)
.expect("Error loading sig")
.into();

RevIndex::map_hashes_colors(
dataset_id,
dataset_id as Idx,
&search_sig,
queries,
&merged_query,
Expand Down Expand Up @@ -160,7 +147,7 @@ impl RevIndex {
}

fn map_hashes_colors(
dataset_id: usize,
dataset_id: Idx,
search_sig: &Signature,
queries: Option<&[KmerMinHash]>,
merged_query: &Option<KmerMinHash>,
Expand Down Expand Up @@ -275,8 +262,11 @@ impl RevIndex {
let match_size = if size >= threshold { size } else { break };

let match_sig = self.linear.sig_for_dataset(dataset_id)?;

Check warning on line 264 in src/core/src/index/revindex/mem_revindex.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/mem_revindex.rs#L264

Added line #L264 was not covered by tests
let match_path =
self.linear.collection().manifest[dataset_id as usize].internal_location();
let match_path = self
.linear
.collection()
.record_for_dataset(dataset_id)?
.internal_location();

let mut match_mh = None;
if let Some(Sketch::MinHash(mh)) = match_sig.select_sketch(self.linear.template()) {

Check warning on line 272 in src/core/src/index/revindex/mem_revindex.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/mem_revindex.rs#L272

Added line #L272 was not covered by tests
Expand Down
4 changes: 2 additions & 2 deletions src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ impl HashToColor {
self.0.is_empty()
}

fn add_to(&mut self, colors: &mut Colors, dataset_id: usize, matched_hashes: Vec<u64>) {
fn add_to(&mut self, colors: &mut Colors, dataset_id: Idx, matched_hashes: Vec<u64>) {

Check warning on line 101 in src/core/src/index/revindex/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/index/revindex/mod.rs#L101

Added line #L101 was not covered by tests
let mut color = None;

matched_hashes.into_iter().for_each(|hash| {
color = Some(colors.update(color, &[dataset_id as Idx]).unwrap());
color = Some(colors.update(color, &[dataset_id]).unwrap());
self.0.insert(hash, color.unwrap());
});
}
Expand Down

0 comments on commit 25300fa

Please sign in to comment.