Implement resumability for revindex
luizirber committed Aug 6, 2024
1 parent 5219fa9 commit e4ed6e4
Showing 3 changed files with 97 additions and 17 deletions.
105 changes: 90 additions & 15 deletions src/core/src/index/revindex/disk_revindex.rs
@@ -1,7 +1,7 @@
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use byteorder::{LittleEndian, WriteBytesExt};
use log::{info, trace};
@@ -12,15 +12,15 @@ use crate::collection::{Collection, CollectionSet};
use crate::encodings::{Color, Idx};
use crate::index::revindex::{
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps,
MANIFEST, STORAGE_SPEC, VERSION,
MANIFEST, PROCESSED, STORAGE_SPEC, VERSION,
};
use crate::index::{calculate_gather_stats, GatherResult, SigCounter};
use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{
rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA},
rocksdb::{cf_descriptors, db_options, ALL_CFS, DB, HASHES, METADATA},
InnerStorage, RocksDBStorage, Storage,
};
use crate::Result;
@@ -38,6 +38,7 @@ fn compute_color(idxs: &Datasets) -> Color {
pub struct RevIndex {
db: Arc<DB>,
collection: Arc<CollectionSet>,
processed: Arc<RwLock<Datasets>>,
}

pub(crate) fn merge_datasets(
@@ -78,9 +79,19 @@ impl RevIndex {

let processed_sigs = AtomicUsize::new(0);

let collection = Arc::new(collection);
let processed = Arc::new(RwLock::new(Self::load_processed(
db.clone(),
collection.clone(),
true,
)?));

dbg!(processed.read().unwrap().len());

let index = Self {
db,
collection: Arc::new(collection),
collection,
processed: processed.clone(),
};

index.collection.par_iter().for_each(|(dataset_id, _)| {
@@ -89,7 +100,16 @@
info!("Processed {} reference sigs", i);
}

index.map_hashes_colors(dataset_id as Idx);
// check if this dataset_id was processed already
// call map_hashes_colors only if not already processed
if !processed.read().unwrap().contains(&dataset_id) {
index.map_hashes_colors(dataset_id as Idx);

// the processed set is also cached in a new field in the RevIndex,
// so update the in-memory copy too

processed.write().unwrap().extend([dataset_id]);
}
});

index.save_collection().expect("Error saving collection");
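The loop above checks a shared `processed` set before mapping each dataset and marks the id as done afterwards. A minimal standalone sketch of that check-then-mark pattern, with `HashSet` standing in for `Datasets` (toy code, not from sourmash):

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

use rayon::prelude::*;

fn main() {
    // ids 1 and 3 were finished by an earlier, interrupted run
    let processed = Arc::new(RwLock::new(HashSet::from([1u32, 3])));

    (0u32..5).into_par_iter().for_each(|dataset_id| {
        if !processed.read().unwrap().contains(&dataset_id) {
            // the expensive per-dataset work would happen here
            processed.write().unwrap().insert(dataset_id);
        }
    });

    assert_eq!(processed.read().unwrap().len(), 5);
}
```

The read check and the write are separate lock acquisitions, so two workers can race and map the same dataset twice; because the RocksDB merges are set unions, that only duplicates work, it cannot corrupt the index.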
@@ -127,7 +147,37 @@
storage_spec,
)?);

Ok(module::RevIndex::Plain(Self { db, collection }))
let processed = Arc::new(RwLock::new(Self::load_processed(
db.clone(),
collection.clone(),
false,
)?));

dbg!(processed.read().unwrap().len());

Ok(module::RevIndex::Plain(Self {
db,
collection,
processed,
}))
}

fn load_processed(
db: Arc<DB>,
collection: Arc<CollectionSet>,
assume_empty: bool,
) -> Result<Datasets> {
let cf_metadata = db.cf_handle(METADATA).unwrap();
if let Some(rdr) = db.get_pinned_cf(&cf_metadata, PROCESSED)? {
// convert rdr to Datasets
Datasets::from_slice(&rdr)
.ok_or_else(|| todo!("throw error from deserializing Datasets"))
} else if assume_empty {
Ok(Datasets::default())
} else {
let all_datasets: Vec<_> = (0..collection.manifest().len()).map(|v| v as Idx).collect();
Ok(Datasets::new(&all_datasets))
}
}

fn load_collection_from_rocksdb(
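`load_processed` above covers three cases: a stored PROCESSED key is authoritative; without one, a freshly created index starts empty (`assume_empty`), while an index built before this commit is assumed fully processed. A toy restatement of that decision table (hypothetical, with `HashSet` in place of `Datasets`):

```rust
use std::collections::HashSet;

fn load_processed_sketch(
    stored: Option<Vec<u32>>, // decoded value of the PROCESSED key, if present
    n_datasets: u32,          // manifest length
    assume_empty: bool,       // true when creating a new index
) -> HashSet<u32> {
    match stored {
        Some(ids) => ids.into_iter().collect(),
        None if assume_empty => HashSet::new(), // fresh index: nothing done yet
        None => (0..n_datasets).collect(),      // pre-resumability index: all done
    }
}

fn main() {
    assert_eq!(load_processed_sketch(None, 4, false).len(), 4);
    assert!(load_processed_sketch(None, 4, true).is_empty());
    assert_eq!(load_processed_sketch(Some(vec![0, 2]), 4, true).len(), 2);
}
```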
@@ -211,6 +261,14 @@ impl RevIndex {
.merge_cf(&cf_hashes, &hash_bytes[..], colors.as_slice())
.expect("error merging");
}

// finished processing this dataset: merge its id into the
// PROCESSED key in the METADATA column family to record that
let cf_metadata = self.db.cf_handle(METADATA).unwrap();
self.db
.merge_cf(&cf_metadata, PROCESSED, colors.as_slice())
.expect("error merging");
}
}
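The PROCESSED merge above runs only after every hash of the dataset has been merged into HASHES, and it reuses `colors`, the serialized single-dataset `Datasets`, so the same union operator adds this id to the processed set. A crash mid-dataset therefore leaves the id out of PROCESSED and the dataset is replayed on the next run, which is safe because a set union is idempotent; a toy illustration (not sourmash code):

```rust
use std::collections::BTreeSet;

fn main() {
    // existing color entry for some hash, already containing dataset 7
    let mut color = BTreeSet::from([7u32]);

    // run 1 crashes after this merge but before PROCESSED is updated;
    // the resumed run repeats the same union for dataset 42
    for _run in 0..2 {
        color.extend([42u32]);
    }

    assert_eq!(color, BTreeSet::from([7, 42])); // same state as a single run
}
```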

@@ -400,24 +458,41 @@ impl RevIndexOps for RevIndex {
fn update(mut self, collection: CollectionSet) -> Result<module::RevIndex> {
// TODO: verify new collection manifest is a superset of current one,
// and the initial chunk is the same
let to_skip = self.collection.check_superset(&collection)?;
self.collection.check_superset(&collection)?;
info!("sigs in the original index: {}", self.collection.len());

self.collection = Arc::new(collection);
info!(
"sigs in the new index once finished: {}",
self.collection.len()
);

let processed = self.processed.clone();
info!(
"sigs left to process: {}",
self.collection.len() - processed.read().unwrap().len()
);

// process the remainder
let processed_sigs = AtomicUsize::new(0);

self.collection = Arc::new(collection);

self.collection
.par_iter()
.skip(to_skip)
.for_each(|(dataset_id, _)| {
self.collection.par_iter().for_each(|(dataset_id, _)| {
// check if this dataset_id was processed already
// call map_hashes_colors only if not already processed
if !processed.read().unwrap().contains(&dataset_id) {
let i = processed_sigs.fetch_add(1, Ordering::SeqCst);
if i % 1000 == 0 {
info!("Processed {} reference sigs", i);
}

self.map_hashes_colors(dataset_id as Idx);
});

// the processed set is also cached in a new field in the RevIndex,
// so update the in-memory copy too

processed.write().unwrap().extend([dataset_id]);
}
});

self.save_collection().expect("Error saving collection");
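Dropping `skip(to_skip)` is more than cleanup: the parallel loop finishes datasets out of order, so after an interruption the completed set need not be a prefix of the manifest. A positional skip can then silently drop unfinished datasets, while the membership test resumes exactly the missing ones (toy illustration):

```rust
use std::collections::HashSet;

fn main() {
    // an interrupted parallel run finished three datasets, but not the first three
    let processed = HashSet::from([0u32, 1, 4]);

    // positional skip of `processed.len()` entries loses id 2 forever
    let skipped: Vec<u32> = (0u32..6).skip(processed.len()).collect();
    assert_eq!(skipped, vec![3, 4, 5]);

    // membership test picks up exactly the unfinished ids
    let todo: Vec<u32> = (0u32..6).filter(|id| !processed.contains(id)).collect();
    assert_eq!(todo, vec![2, 3, 5]);
}
```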

@@ -437,7 +512,7 @@
}

fn compact(&self) {
for cf_name in [HASHES, METADATA] {
for cf_name in ALL_CFS {
let cf = self.db.cf_handle(cf_name).unwrap();
self.db.compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>)
}
3 changes: 1 addition & 2 deletions src/core/src/index/revindex/mod.rs
@@ -28,6 +28,7 @@ use crate::Result;
const MANIFEST: &str = "manifest";
const STORAGE_SPEC: &str = "storage_spec";
const VERSION: &str = "version";
const PROCESSED: &str = "processed";

type QueryColors = HashMap<Color, Datasets>;
type HashToColorT = HashMap<HashIntoType, Color, BuildNoHashHasher<HashIntoType>>;
@@ -351,15 +352,13 @@ impl Datasets {
}
}

/*
fn contains(&self, value: &Idx) -> bool {
match self {
Self::Empty => false,
Self::Unique(v) => v == value,
Self::Many(ref v) => v.contains(*value),
}
}
*/
}

#[derive(Getters, Setters, Debug)]
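The previously commented-out `contains` is restored here to support the processed-set checks. As its match arms suggest, `Datasets` is a three-state set: empty, a single id, or a bitmap of many ids. A toy mirror of that shape, with `BTreeSet` standing in for the bitmap (hypothetical):

```rust
use std::collections::BTreeSet;

enum TinySet {
    Empty,
    Unique(u32),
    Many(BTreeSet<u32>),
}

impl TinySet {
    fn contains(&self, value: &u32) -> bool {
        match self {
            Self::Empty => false,
            Self::Unique(v) => v == value,
            Self::Many(v) => v.contains(value),
        }
    }
}

fn main() {
    assert!(!TinySet::Empty.contains(&1));
    assert!(TinySet::Unique(1).contains(&1));
    assert!(TinySet::Many(BTreeSet::from([1, 2])).contains(&2));
}
```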
6 changes: 6 additions & 0 deletions src/core/src/storage/rocksdb.rs
@@ -13,6 +13,8 @@ pub(crate) const METADATA: &str = "metadata";
// Column family for using rocksdb as a Storage
pub(crate) const STORAGE: &str = "storage";

pub(crate) const ALL_CFS: [&str; 3] = [HASHES, METADATA, STORAGE];

pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;

/// Store data in RocksDB
@@ -81,6 +83,10 @@ pub(crate) fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {

let mut cfopts = Options::default();
cfopts.set_max_write_buffer_number(16);
cfopts.set_merge_operator_associative(
"datasets operator",
crate::index::revindex::disk_revindex::merge_datasets,
);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);

