Skip to content

Commit

Permalink
implement method to internalize storage
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Jul 24, 2024
1 parent 0c18037 commit 5a84039
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl CollectionSet {
/// Returns the [`Selection`] for this collection set.
///
/// # Panics
///
/// Not implemented yet: the body is a `todo!`, so every call panics.
/// The placeholder message indicates the intent is to derive the selection
/// from the first signature in the set.
pub fn selection(&self) -> Selection {
todo!("Extract selection from first sig")
}

/// Replaces the storage backing this collection set with `storage`.
///
/// The previous storage value is overwritten unconditionally.
///
/// # Safety
///
/// Nothing in this method validates `storage`. Presumably the caller must
/// guarantee that the new storage can serve the data this set already
/// refers to (the call site copies the data over before swapping) —
/// NOTE(review): confirm and spell out the exact invariant.
pub unsafe fn set_storage_unchecked(&mut self, storage: InnerStorage) {
self.storage = storage;
}
}

impl Collection {
Expand Down
30 changes: 29 additions & 1 deletion src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{
rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA},
InnerStorage, Storage,
InnerStorage, RocksDBStorage, Storage,
};
use crate::Result;

Expand Down Expand Up @@ -464,6 +464,34 @@ impl RevIndexOps for RevIndex {
Ok(())
}

/// Returns a shared reference to the collection set held by this index.
fn collection(&self) -> &CollectionSet {
&self.collection
}

fn internalize_storage(&mut self) -> Result<()> {
// TODO: check if collection is already internal, if so return

// build new rocksdb storage from db
let new_storage = RocksDBStorage::from_db(self.db.clone());

// use manifest to copy from current storage to new one
self.collection().par_iter().for_each(|(_, record)| {
let path = record.internal_location().as_str();
let sig_data = self.collection.storage().load(path).unwrap();
new_storage.save(path, &sig_data);
});

// Replace storage for collection.
// Using unchecked version because we just used the manifest
// above to make sure the storage is still consistent
unsafe {
Arc::get_mut(&mut self.collection)
.map(|v| v.set_storage_unchecked(InnerStorage::new(new_storage)));
}

Ok(())
}

fn convert(&self, _output_db: module::RevIndex) -> Result<()> {
todo!()
/*
Expand Down
63 changes: 63 additions & 0 deletions src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ pub trait RevIndexOps {
query: &KmerMinHash,
selection: Option<Selection>,
) -> Result<Vec<GatherResult>>;

/// Returns a reference to the [`CollectionSet`] associated with this index.
fn collection(&self) -> &CollectionSet;

/// Internalizes the storage used by this index's collection.
///
/// Takes `&mut self` and reports failure through the returned `Result`.
/// NOTE(review): the exact meaning of "internalize" is implementation
/// defined; presumably the data is moved into storage owned by the index
/// itself — confirm against the implementors.
fn internalize_storage(&mut self) -> Result<()>;
}

impl HashToColor {
Expand Down Expand Up @@ -869,4 +873,63 @@ mod test {

Ok(())
}

/// Gather results must be identical before and after the index internalizes
/// its storage.
///
/// The zip collection is copied into a temporary directory first, so the
/// index is built against a throwaway copy of the test data.
#[test]
fn revindex_internalize_storage() -> Result<()> {
    let zip_collection = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("../../tests/test-data/track_abund/track_abund.zip");

    let outdir = TempDir::new()?;

    // `join` already yields a `PathBuf`; no round trip through `String`
    // (which would panic on a non-UTF-8 temp directory) is needed.
    let zip_copy = outdir.path().join("sigs.zip");
    std::fs::copy(zip_collection, zip_copy.as_path())?;

    let selection = Selection::builder().ksize(31).scaled(10000).build();
    let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?;
    let output = outdir.path().join("index");

    let query = prepare_query(collection.sig_for_dataset(0)?.into(), &selection).unwrap();

    let index = RevIndex::create(output.as_path(), collection.try_into()?, false)?;

    // Baseline: gather while the collection still reads from the zip file.
    let (counter, query_colors, hash_to_color) = index.prepare_gather_counters(&query);
    let matches_external = index.gather(
        counter,
        query_colors,
        hash_to_color,
        0,
        &query,
        Some(selection.clone()),
    )?;

    // Switch the collection over to the index's own storage.
    let mut index = index;
    index
        .internalize_storage()
        .expect("Error internalizing storage");

    // Same gather again, now served from the internalized storage.
    let (counter, query_colors, hash_to_color) = index.prepare_gather_counters(&query);
    let matches_internal = index.gather(
        counter,
        query_colors,
        hash_to_color,
        0,
        &query,
        Some(selection),
    )?;

    assert_eq!(matches_external, matches_internal);

    Ok(())
}
}

0 comments on commit 5a84039

Please sign in to comment.