Skip to content

Commit

Permalink
Disable webgraph checksum (#173)
Browse files Browse the repository at this point in the history
* Disable checksum verification in rocksdb.
~80% of the time seems to be spent in the xxh3 hash. This seems to be primarily used to verify checksums when the block cache gets a cache miss and needs to read from disk. We don't gracefully handle corruption either way, so let's just disable the verification and see how it impacts performance.

* also disable verification in 'RocksDbStore'

* allow rocksdb to prefetch blocks during iteration

* increase block cache for 'Id2NodeDb' and 'RocksDbStore'
  • Loading branch information
mikkeldenker authored Mar 5, 2024
1 parent 7630ae4 commit 13b8e7b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 20 deletions.
13 changes: 10 additions & 3 deletions crates/core/src/kv/rocksdb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
options.set_level_compaction_dynamic_level_bytes(true);
options.set_bytes_per_sync(1048576);
let mut block_options = BlockBasedOptions::default();
block_options.set_block_size(16 * 1024);
block_options.set_block_size(1024 * 1024 * 1024); // 1 GB
block_options.set_format_version(5);
block_options.set_cache_index_and_filter_blocks(true);
block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
Expand Down Expand Up @@ -117,7 +117,10 @@ where
V: Serialize + DeserializeOwned + 'static + Send + Sync,
{
fn get_raw(&self, key: &[u8]) -> Option<Vec<u8>> {
self.db.get(key).expect("failed to retrieve key")
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);

self.db.get_opt(key, &opts).expect("failed to retrieve key")
}

fn insert_raw(&self, key: Vec<u8>, value: Vec<u8>) {
Expand All @@ -139,7 +142,11 @@ where
}

fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (K, V)> + 'a> {
let iter = self.db.iterator(IteratorMode::Start);
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);
opts.set_async_io(true);

let iter = self.db.iterator_opt(IteratorMode::Start, opts);

Box::new(IntoIter {
inner: iter,
Expand Down
28 changes: 21 additions & 7 deletions crates/core/src/webgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ impl Meta {

struct Id2NodeDb {
db: rocksdb::DB,
_cache: rocksdb::Cache, // needs to be kept alive for as long as the db is alive
}

impl Id2NodeDb {
Expand All @@ -484,22 +485,25 @@ impl Id2NodeDb {
opts.set_target_file_size_base(512 * 1024 * 1024); // 512 MB
opts.set_target_file_size_multiplier(10);

opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

let mut block_opts = rocksdb::BlockBasedOptions::default();
let cache = rocksdb::Cache::new_lru_cache(8 * 1024 * 1024 * 1024); // 8 gb
opts.set_block_based_table_factory(&block_opts);

// some recommended settings (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning)
opts.set_level_compaction_dynamic_level_bytes(true);
opts.set_bytes_per_sync(1048576);

block_opts.set_block_size(16 * 1024);
block_opts.set_format_version(5);
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);

opts.set_block_based_table_factory(&block_opts);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
block_opts.set_block_cache(&cache);

let db = rocksdb::DB::open(&opts, path).unwrap();

Self { db }
Self { db, _cache: cache }
}

fn put(&mut self, id: &NodeID, node: &Node) {
Expand All @@ -516,15 +520,22 @@ impl Id2NodeDb {
}

fn get(&self, id: &NodeID) -> Option<Node> {
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);

self.db
.get(id.as_u64().to_le_bytes())
.get_opt(id.as_u64().to_le_bytes(), &opts)
.unwrap()
.map(|bytes| bincode::deserialize(&bytes).unwrap())
}

fn keys(&self) -> impl Iterator<Item = NodeID> + '_ {
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);
opts.set_async_io(true);

self.db
.iterator(rocksdb::IteratorMode::Start)
.iterator_opt(rocksdb::IteratorMode::Start, opts)
.filter_map(|r| {
let (key, _) = r.ok()?;
Some(NodeID(u64::from_le_bytes((*key).try_into().unwrap())))
Expand All @@ -540,8 +551,11 @@ impl Id2NodeDb {
}

fn iter(&self) -> impl Iterator<Item = (NodeID, Node)> + '_ {
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);

self.db
.iterator(rocksdb::IteratorMode::Start)
.iterator_opt(rocksdb::IteratorMode::Start, opts)
.filter_map(|r| {
let (key, value) = r.ok()?;

Expand Down
36 changes: 26 additions & 10 deletions crates/core/src/webgraph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ impl EdgeStoreWriter {
}

pub fn iter<L: EdgeLabel>(&self) -> impl Iterator<Item = InnerEdge<L>> + '_ + Send + Sync {
let read_opts = rocksdb::ReadOptions::default();
let mut read_opts = rocksdb::ReadOptions::default();
read_opts.set_verify_checksums(false);
read_opts.set_async_io(true);

self.db
.iterator_opt(rocksdb::IteratorMode::Start, read_opts)
Expand Down Expand Up @@ -200,6 +202,7 @@ impl PrefixDb {

options.set_level_zero_slowdown_writes_trigger(-1);
options.set_level_zero_stop_writes_trigger(-1);
options.set_compression_type(rocksdb::DBCompressionType::None);

// some recommended settings (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning)
options.set_level_compaction_dynamic_level_bytes(true);
Expand All @@ -211,7 +214,6 @@ impl PrefixDb {
block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);

options.set_block_based_table_factory(&block_options);
options.set_compression_type(rocksdb::DBCompressionType::Lz4);

let db = rocksdb::DB::open(&options, path).unwrap();

Expand Down Expand Up @@ -241,10 +243,14 @@ impl PrefixDb {

let mut res = Vec::new();

let iter = self.db.iterator(rocksdb::IteratorMode::From(
&start,
rocksdb::Direction::Forward,
));
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);
opts.set_async_io(true);

let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&start, rocksdb::Direction::Forward),
opts,
);

for item in iter {
let (key, _) = item.unwrap();
Expand Down Expand Up @@ -514,9 +520,12 @@ impl EdgeStore {
let node_cf = self.ranges.cf_handle("nodes").unwrap();
let edge_cf = self.ranges.cf_handle("labels").unwrap();

let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);

match (
self.ranges.get_cf(node_cf, node_bytes).unwrap(),
self.ranges.get_cf(edge_cf, node_bytes).unwrap(),
self.ranges.get_cf_opt(node_cf, node_bytes, &opts).unwrap(),
self.ranges.get_cf_opt(edge_cf, node_bytes, &opts).unwrap(),
) {
(Some(node_range_bytes), Some(edge_range_bytes)) => {
let node_range = bincode::deserialize::<Range<usize>>(&node_range_bytes).unwrap();
Expand Down Expand Up @@ -559,7 +568,10 @@ impl EdgeStore {

let node_cf = self.ranges.cf_handle("nodes").unwrap();

match self.ranges.get_cf(node_cf, node_bytes).unwrap() {
let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);

match self.ranges.get_cf_opt(node_cf, node_bytes, &opts).unwrap() {
Some(node_range_bytes) => {
let node_range = bincode::deserialize::<Range<usize>>(&node_range_bytes).unwrap();

Expand Down Expand Up @@ -597,8 +609,12 @@ impl EdgeStore {
pub fn iter_without_label(&self) -> impl Iterator<Item = Edge<()>> + '_ + Send + Sync {
let node_cf = self.ranges.cf_handle("nodes").unwrap();

let mut opts = rocksdb::ReadOptions::default();
opts.set_verify_checksums(false);
opts.set_async_io(true);

self.ranges
.iterator_cf(node_cf, rocksdb::IteratorMode::Start)
.iterator_cf_opt(node_cf, opts, rocksdb::IteratorMode::Start)
.flat_map(move |res| {
let (key, val) = res.unwrap();

Expand Down

0 comments on commit 13b8e7b

Please sign in to comment.