From aae52c0a5655c5040ec0a6c710b6ee991b116e33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos=20Bezerra?= Date: Wed, 22 May 2024 17:15:29 -0300 Subject: [PATCH] rocksDB Column Families revamp and refac --- Cargo.lock | 9 + Cargo.toml | 11 +- src/eth/storage/rocks/mod.rs | 11 +- src/eth/storage/rocks/rocks_cf.rs | 328 +++++++++++++ src/eth/storage/rocks/rocks_config.rs | 161 +++++++ src/eth/storage/rocks/rocks_db.rs | 563 +---------------------- src/eth/storage/rocks/rocks_permanent.rs | 8 +- src/eth/storage/rocks/rocks_state.rs | 232 +++++++--- src/infra/metrics/metrics_types.rs | 4 + 9 files changed, 707 insertions(+), 620 deletions(-) create mode 100644 src/eth/storage/rocks/rocks_cf.rs create mode 100644 src/eth/storage/rocks/rocks_config.rs diff --git a/Cargo.lock b/Cargo.lock index 493b77159..270b93cbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5299,6 +5299,7 @@ dependencies = [ "binary_macros", "bincode", "byte-unit", + "cfg-if", "chrono", "clap", "console-subscriber", @@ -5328,6 +5329,7 @@ dependencies = [ "k8s-openapi", "keccak-hasher", "kube", + "lazy_static", "metrics", "metrics-exporter-prometheus", "nom", @@ -5358,6 +5360,7 @@ dependencies = [ "sqlx", "stringreader", "strum", + "sugars", "testcontainers", "testcontainers-modules", "thiserror", @@ -5442,6 +5445,12 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "sugars" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc0db74f9ee706e039d031a560bd7d110c7022f016051b3d33eeff9583e3e67a" + [[package]] name = "syn" version = "1.0.109" diff --git a/Cargo.toml b/Cargo.toml index ac4da9ea3..07bd0ccb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ default-run = "stratus" anyhow = "=1.0.82" async-trait = "=0.1.80" byte-unit = "=5.1.4" +cfg-if = "=1.0.0" chrono = "=0.4.38" const_format = "=0.2.32" const-hex = "=1.10.0" @@ -24,6 +25,7 @@ hex-literal = "=0.4.1" humantime = "=2.1.0" indexmap = { version = "=2.2.6", features = ["serde"] } itertools = "=0.12.1" +lazy_static = "=1.4.0" nonempty = { version = "=0.10.0", features = ["serialize"] } once_cell = "=1.19.0" paste = "=1.0.14" @@ -32,6 +34,7 @@ pin-project = "=1.1.5" quote = "=1.0.36" rand = "=0.8.5" strum = "=0.26.2" +sugars = "=3.0.1" thiserror = "=1.0.59" url = "=2.5.0" uuid = { version = "=1.8.0", features = ["v4", "fast-rng" ] } @@ -45,7 +48,7 @@ clap = { version = "=4.5.4", features = ["derive", "env"] } dotenvy = "=0.15.7" # serialization -display_json = "0.2.1" +display_json = "=0.2.1" serde = "=1.0.199" serde_json = "=1.0.116" serde_with = "=3.8.1" @@ -87,9 +90,9 @@ sqlx = { version = "=0.7.4", features = ["runtime-tokio", "postgres", "bigdecima num-traits = "=0.2.18" rocksdb = { version = "=0.22.0", features = ["multi-threaded-cf"], optional = true } raft = { version = "=0.7.0", optional = true } -slog = "2.7" -slog-term = "2.8" -slog-async = "2.7" +slog = "=2.7.0" +slog-term = "=2.9.1" +slog-async = "=2.8.0" kube = { version = "=0.90.0", optional = true, features = ["runtime", "derive"] } k8s-openapi = { version = "=0.21.1", optional = true, features = ["v1_27"] } diff --git a/src/eth/storage/rocks/mod.rs b/src/eth/storage/rocks/mod.rs index fffd41eb0..8246a7b42 100644 --- a/src/eth/storage/rocks/mod.rs +++ b/src/eth/storage/rocks/mod.rs @@ -1,4 +1,11 @@ -pub mod rocks_db; +/// Data manipulation for column families. +mod rocks_cf; +/// Settings and tweaks for the database and column families. +mod rocks_config; +/// Functionalities related to the whole database. +mod rocks_db; +/// Exposed API. pub mod rocks_permanent; -pub mod rocks_state; +/// State handler for DB and column families. +mod rocks_state; mod types; diff --git a/src/eth/storage/rocks/rocks_cf.rs b/src/eth/storage/rocks/rocks_cf.rs new file mode 100644 index 000000000..b752b49f1 --- /dev/null +++ b/src/eth/storage/rocks/rocks_cf.rs @@ -0,0 +1,328 @@ +//! RocksDB handling of column families. + +use std::marker::PhantomData; +use std::sync::Arc; + +use anyhow::Result; +use rocksdb::DBIteratorWithThreadMode; +use rocksdb::IteratorMode; +use rocksdb::Options; +use rocksdb::WriteBatch; +use rocksdb::DB; +use serde::Deserialize; +use serde::Serialize; + +/// A Column Family in RocksDB. +/// +/// Exposes an API for key-value pair storage. +#[derive(Clone)] +pub struct RocksCf { + pub db: Arc, + // TODO: check if we can gather metrics from a Column Family, if not, remove this field + _opts: Options, + column_family: String, + _marker: PhantomData<(K, V)>, +} + +impl RocksCf +where + K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, + V: Serialize + for<'de> Deserialize<'de> + Clone, +{ + /// Create Column Family for given DB if it doesn't exist. + pub fn new_cf(db: Arc, column_family: &str, opts: Options) -> Self { + Self { + db, + column_family: column_family.to_owned(), + _opts: opts, + _marker: PhantomData, + } + } + + // Clears the database + pub fn clear(&self) -> Result<()> { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + // try clearing everything + let first = self.db.iterator_cf(&cf, IteratorMode::Start).next(); + let last = self.db.iterator_cf(&cf, IteratorMode::End).next(); + if let (Some(Ok((first_key, _))), Some(Ok((last_key, _)))) = (first, last) { + self.db.delete_range_cf(&cf, first_key, last_key)?; + } + + // clear left-overs + let mut batch = WriteBatch::default(); + for item in self.db.iterator_cf(&cf, IteratorMode::Start) { + let (key, _) = item?; // Handle or unwrap the Result + batch.delete_cf(&cf, key); + } + self.db.write(batch)?; + Ok(()) + } + + pub fn get(&self, key: &K) -> Option { + let Ok(serialized_key) = bincode::serialize(key) else { return None }; + let cf = self.db.cf_handle(&self.column_family).unwrap(); + let Ok(Some(value_bytes)) = self.db.get_cf(&cf, serialized_key) else { + return None; + }; + + bincode::deserialize(&value_bytes).ok() + } + + pub fn multi_get(&self, keys: I) -> anyhow::Result> + where + I: IntoIterator + Clone, + { + let serialized_keys = keys.clone().into_iter().map(|k| bincode::serialize(&k)).collect::, _>>()?; + Ok(self + .db + .multi_get(serialized_keys) + .into_iter() + .zip(keys) + .filter_map(|(value, key)| { + if let Ok(Some(value)) = value { + let Ok(value) = bincode::deserialize::(&value) else { return None }; // XXX: Maybe we should fail on a failed conversion instead of ignoring; + Some((key, value)) + } else { + None + } + }) + .collect()) + } + + pub fn get_current_block_number(&self) -> u64 { + let Ok(serialized_key) = bincode::serialize(&"current_block") else { + return 0; + }; + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let Ok(Some(value_bytes)) = self.db.get_cf(&cf, serialized_key) else { + return 0; + }; + + bincode::deserialize(&value_bytes).ok().unwrap_or(0) + } + + pub fn get_index_block_number(&self) -> u64 { + self.last_index().map(|(block_number, _)| block_number).unwrap_or(0) + } + + // Mimics the 'insert' functionality of a HashMap + pub fn insert(&self, key: K, value: V) { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let serialized_key = bincode::serialize(&key).unwrap(); + let serialized_value = bincode::serialize(&value).unwrap(); + self.db.put_cf(&cf, serialized_key, serialized_value).unwrap(); + } + + pub fn prepare_batch_insertion(&self, changes: Vec<(K, V)>, current_block: Option, batch: &mut WriteBatch) { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + for (key, value) in changes { + let serialized_key = bincode::serialize(&key).unwrap(); + let serialized_value = bincode::serialize(&value).unwrap(); + // Add each serialized key-value pair to the batch + batch.put_cf(&cf, serialized_key, serialized_value); + } + + if let Some(current_block) = current_block { + let serialized_block_key = bincode::serialize(&"current_block").unwrap(); + let serialized_block_value = bincode::serialize(¤t_block).unwrap(); + batch.put_cf(&cf, serialized_block_key, serialized_block_value); + } + } + + /// inserts data but keep a block as key pointing to the keys inserted in a given block + /// this makes for faster search based on block_number, ergo index + pub fn prepare_batch_insertion_indexed(&self, changes: Vec<(K, V)>, current_block: u64, batch: &mut WriteBatch) { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let mut keys = vec![]; + + for (key, value) in changes { + let serialized_key = bincode::serialize(&key).unwrap(); + let serialized_value = bincode::serialize(&value).unwrap(); + + keys.push(key); + + // Add each serialized key-value pair to the batch + batch.put_cf(&cf, serialized_key, serialized_value); + } + + let serialized_block_value = bincode::serialize(¤t_block).unwrap(); + let serialized_keys = bincode::serialize(&keys).unwrap(); + batch.put_cf(&cf, serialized_block_value, serialized_keys); + } + + // Deletes an entry from the database by key + pub fn delete(&self, key: &K) -> Result<()> { + let serialized_key = bincode::serialize(key)?; + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + self.db.delete_cf(&cf, serialized_key)?; + Ok(()) + } + + // Deletes an entry from the database by key + pub fn delete_index(&self, key: u64) -> Result<()> { + let serialized_key = bincode::serialize(&key)?; + let cf = self.db.cf_handle(&self.column_family).unwrap(); + //XXX check if value is a vec that can be deserialized as a safety measure + self.db.delete_cf(&cf, serialized_key)?; + Ok(()) + } + + // Custom method that combines entry and or_insert_with from a HashMap + pub fn entry_or_insert_with(&self, key: K, default: F) -> V + where + F: FnOnce() -> V, + { + match self.get(&key) { + Some(value) => value, + None => { + let new_value = default(); + self.insert(key, new_value.clone()); + new_value + } + } + } + + pub fn iter_start(&self) -> RocksDBIterator { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let iter = self.db.iterator_cf(&cf, IteratorMode::Start); + RocksDBIterator::::new(iter) + } + + pub fn iter_end(&self) -> RocksDBIterator { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let iter = self.db.iterator_cf(&cf, IteratorMode::End); + RocksDBIterator::::new(iter) + } + + pub fn indexed_iter_end(&self) -> IndexedRocksDBIterator { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let iter = self.db.iterator_cf(&cf, IteratorMode::End); + IndexedRocksDBIterator::::new(iter) + } + + pub fn iter_from Deserialize<'de> + std::hash::Hash + Eq>( + &self, + key_prefix: P, + direction: rocksdb::Direction, + ) -> RocksDBIterator { + let serialized_key = bincode::serialize(&key_prefix).unwrap(); + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let iter = self.db.iterator_cf(&cf, IteratorMode::From(&serialized_key, direction)); + RocksDBIterator::::new(iter) + } + + pub fn last_index(&self) -> Option<(u64, Vec)> { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let iter = self.db.iterator_cf(&cf, IteratorMode::End); + IndexedRocksDBIterator::::new(iter).next() + } + + pub fn last(&self) -> Option<(K, V)> { + let cf = self.db.cf_handle(&self.column_family).unwrap(); + + let mut iter = self.db.iterator_cf(&cf, IteratorMode::End); + if let Some(Ok((k, v))) = iter.next() { + let key = bincode::deserialize(&k).unwrap(); + let value = bincode::deserialize(&v).unwrap(); + Some((key, value)) + } else { + None + } + } +} + +pub struct RocksDBIterator<'a, K, V> { + iter: DBIteratorWithThreadMode<'a, DB>, + _marker: PhantomData<(K, V)>, +} + +impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDBIterator<'a, K, V> { + pub fn new(iter: DBIteratorWithThreadMode<'a, DB>) -> Self { + Self { iter, _marker: PhantomData } + } +} + +/// Custom iterator for navigating RocksDB entries. +/// +/// This iterator is designed to skip over specific keys used for internal control purposes, such as: +/// - `"current_block"`: Used to indicate the current block number in the database. +/// - Keys representing index keys (if deserialized as `u64`): Used for various indexing purposes. +/// +/// The iterator will: +/// - Ignore any entries where the key is `"current_block"`. +/// - Attempt to deserialize all other keys to the generic type `K`. If deserialization fails, it assumes +/// the key might be an index key or improperly formatted, and skips it. +impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> Iterator + for RocksDBIterator<'a, K, V> +{ + type Item = (K, V); + + /// Retrieves the next key-value pair from the database, skipping over special control keys and + /// potentially misformatted keys. + /// + /// Returns: + /// - `Some((K, V))` if a valid key-value pair is found. + /// - `None` if there are no more items to process, or if only special/control keys remain. + fn next(&mut self) -> Option { + for key_value_result in self.iter.by_ref() { + let Ok((key, value)) = key_value_result else { continue }; + + // Check if the key is a special 'current_block' key and skip it + if key == bincode::serialize(&"current_block").unwrap().into_boxed_slice() { + continue; // Move to the next key-value pair + } + + // Attempt to deserialize the key to type `K` + if let Ok(deserialized_key) = bincode::deserialize::(&key) { + // Attempt to deserialize the value to type `V` + if let Ok(deserialized_value) = bincode::deserialize::(&value) { + // Return the deserialized key-value pair if both are successful + return Some((deserialized_key, deserialized_value)); + } + } + // If deserialization fails, continue to the next item + } + // Return None if all items are processed or if all remaining items fail conditions + None + } +} + +impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq> IndexedRocksDBIterator<'a, K> { + pub fn new(iter: DBIteratorWithThreadMode<'a, DB>) -> Self { + Self { iter, _marker: PhantomData } + } +} + +pub struct IndexedRocksDBIterator<'a, K> { + iter: DBIteratorWithThreadMode<'a, DB>, + _marker: PhantomData>, +} + +impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq> Iterator for IndexedRocksDBIterator<'a, K> { + type Item = (u64, Vec); + + fn next(&mut self) -> Option { + for key_value_result in self.iter.by_ref() { + let Ok((key, value)) = key_value_result else { continue }; + + if let Ok(index_key) = bincode::deserialize::(&key) { + if let Ok(index_values) = bincode::deserialize::>(&value) { + return Some((index_key, index_values)); + } + } + } + None + } +} diff --git a/src/eth/storage/rocks/rocks_config.rs b/src/eth/storage/rocks/rocks_config.rs new file mode 100644 index 000000000..0df5ab27c --- /dev/null +++ b/src/eth/storage/rocks/rocks_config.rs @@ -0,0 +1,161 @@ +use rocksdb::BlockBasedOptions; +use rocksdb::Cache; +use rocksdb::Options; + +const GIGABYTE: usize = 1024 * 1024 * 1024; +const CACHE_SIZE: usize = 30 * GIGABYTE; + +#[derive(Debug, Clone, Copy)] +pub enum CacheSetting { + Enabled, + Disabled, +} + +#[derive(Debug, Clone, Copy)] +pub enum DbConfig { + LargeSSTFiles, + FastWriteSST, + Default, +} + +impl Default for DbConfig { + fn default() -> Self { + Self::Default + } +} + +impl DbConfig { + pub fn to_options(self, cache_setting: CacheSetting) -> Options { + let mut opts = Options::default(); + let mut block_based_options = BlockBasedOptions::default(); + + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.increase_parallelism(16); + + // NOTE: As per the rocks db wiki: "The overhead of statistics is usually small but non-negligible. We usually observe an overhead of 5%-10%." + #[cfg(feature = "metrics")] + opts.enable_statistics(); + #[cfg(feature = "metrics")] + opts.set_statistics_level(rocksdb::statistics::StatsLevel::ExceptTimeForMutex); + + match self { + DbConfig::LargeSSTFiles => { + // Set the compaction style to Level Compaction + opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + + // Configure the size of SST files at each level + opts.set_target_file_size_base(512 * 1024 * 1024); + + // Increase the file size multiplier to expand file size at upper levels + opts.set_target_file_size_multiplier(2); // Each level grows in file size quicker + + // Reduce the number of L0 files that trigger compaction, increasing frequency + opts.set_level_zero_file_num_compaction_trigger(2); + + // Reduce thresholds for slowing and stopping writes, which forces more frequent compaction + opts.set_level_zero_slowdown_writes_trigger(10); + opts.set_level_zero_stop_writes_trigger(20); + + // Increase the max bytes for L1 to allow more data before triggering compaction + opts.set_max_bytes_for_level_base(2048 * 1024 * 1024); + + // Increase the level multiplier to aggressively increase space at each level + opts.set_max_bytes_for_level_multiplier(8.0); // Exponential growth of levels is more pronounced + + // Configure block size to optimize for larger blocks, improving sequential read performance + block_based_options.set_block_size(128 * 1024); // 128KB blocks + + // Increase the number of write buffers to delay flushing, optimizing CPU usage for compaction + opts.set_max_write_buffer_number(5); + opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB per write buffer + + // Keep a higher number of open files to accommodate more files being produced by aggressive compaction + opts.set_max_open_files(20000); + + // Apply more aggressive compression settings, if I/O and CPU permit + opts.set_compression_per_level(&[ + rocksdb::DBCompressionType::Lz4, + rocksdb::DBCompressionType::Zstd, // Use Zstd for higher compression from L1 onwards + ]); + } + DbConfig::FastWriteSST => { + // Continue using Level Compaction due to its effective use of I/O and CPU for writes + opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + + // Increase initial SST file sizes to reduce the frequency of writes to disk + opts.set_target_file_size_base(512 * 1024 * 1024); // Starting at 512MB for L1 + + // Minimize the file size multiplier to control the growth of file sizes at upper levels + opts.set_target_file_size_multiplier(1); // Minimal increase in file size at upper levels + + // Increase triggers for write slowdown and stop to maximize buffer before I/O actions + opts.set_level_zero_file_num_compaction_trigger(100); // Slow down writes at 100 L0 files + opts.set_level_zero_stop_writes_trigger(200); // Stop writes at 200 L0 files + + // Expand the maximum bytes for base level to further delay the need for compaction-related I/O + opts.set_max_bytes_for_level_base(2048 * 1024 * 1024); + + // Use a higher level multiplier to increase space exponentially at higher levels + opts.set_max_bytes_for_level_multiplier(10.0); + + // Opt for larger block sizes to decrease the number of read and write operations to disk + block_based_options.set_block_size(512 * 1024); // 512KB blocks + + // Maximize the use of write buffers to extend the time data stays in memory before flushing + opts.set_max_write_buffer_number(16); + opts.set_write_buffer_size(1024 * 1024 * 1024); // 1GB per write buffer + + // Allow a very high number of open files to minimize the overhead of opening and closing files + opts.set_max_open_files(20000); + + // Choose compression that balances CPU use and effective storage reduction + opts.set_compression_per_level(&[rocksdb::DBCompressionType::Lz4, rocksdb::DBCompressionType::Zstd]); + + // Enable settings that make full use of CPU to handle more data in memory and process compaction + opts.set_allow_concurrent_memtable_write(true); + opts.set_enable_write_thread_adaptive_yield(true); + } + DbConfig::Default => { + block_based_options.set_ribbon_filter(15.5); // https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter + + opts.set_allow_concurrent_memtable_write(true); + opts.set_enable_write_thread_adaptive_yield(true); + + let transform = rocksdb::SliceTransform::create_fixed_prefix(10); + opts.set_prefix_extractor(transform); + opts.set_memtable_prefix_bloom_ratio(0.2); + + // Enable a size-tiered compaction style, which is good for workloads with a high rate of updates and overwrites + opts.set_compaction_style(rocksdb::DBCompactionStyle::Universal); + + let mut universal_compact_options = rocksdb::UniversalCompactOptions::default(); + universal_compact_options.set_size_ratio(10); + universal_compact_options.set_min_merge_width(2); + universal_compact_options.set_max_merge_width(6); + universal_compact_options.set_max_size_amplification_percent(50); + universal_compact_options.set_compression_size_percent(-1); + universal_compact_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Total); + opts.set_universal_compaction_options(&universal_compact_options); + + let pt_opts = rocksdb::PlainTableFactoryOptions { + user_key_length: 0, + bloom_bits_per_key: 10, + hash_table_ratio: 0.75, + index_sparseness: 8, + encoding_type: rocksdb::KeyEncodingType::Plain, // Default encoding + full_scan_mode: false, // Optimized for point lookups rather than full scans + huge_page_tlb_size: 0, // Not using huge pages + store_index_in_file: false, // Store index in memory for faster access + }; + opts.set_plain_table_factory(&pt_opts); + } + } + if let CacheSetting::Enabled = cache_setting { + let cache = Cache::new_lru_cache(CACHE_SIZE); + block_based_options.set_block_cache(&cache); + } + opts.set_block_based_table_factory(&block_based_options); + opts + } +} diff --git a/src/eth/storage/rocks/rocks_db.rs b/src/eth/storage/rocks/rocks_db.rs index 731ce500b..523aaafbf 100644 --- a/src/eth/storage/rocks/rocks_db.rs +++ b/src/eth/storage/rocks/rocks_db.rs @@ -1,558 +1,45 @@ -#[cfg(feature = "metrics")] -use std::collections::HashMap; -use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; -#[cfg(feature = "metrics")] -use std::sync::Mutex; use anyhow::anyhow; -use anyhow::Result; use rocksdb::backup::BackupEngine; use rocksdb::backup::BackupEngineOptions; -#[cfg(feature = "metrics")] -use rocksdb::statistics::Histogram; -#[cfg(feature = "metrics")] -use rocksdb::statistics::Ticker; -use rocksdb::BlockBasedOptions; -use rocksdb::Cache; -use rocksdb::DBIteratorWithThreadMode; use rocksdb::Env; -use rocksdb::IteratorMode; use rocksdb::Options; -use rocksdb::WriteBatch; use rocksdb::DB; -use serde::Deserialize; -use serde::Serialize; -#[cfg(feature = "metrics")] -type HistogramInt = u32; -#[cfg(feature = "metrics")] -type Sum = u64; -#[cfg(feature = "metrics")] -type Count = u64; - -#[cfg(feature = "metrics")] -use crate::infra::metrics; - -const GIGABYTE: usize = 1024 * 1024 * 1024; - -pub enum DbConfig { - LargeSSTFiles, - FastWriteSST, - Default, -} - -// A generic struct that abstracts over key-value pairs stored in RocksDB. -pub struct RocksDb { - pub db: Arc, - pub opts: Options, - _marker: PhantomData<(K, V)>, - // Last collected stats for a histogram - #[cfg(feature = "metrics")] - pub prev_stats: Mutex>, - column_family: String, -} - -impl Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDb { - pub fn new(cf_name: &str, db: Arc, config: DbConfig) -> anyhow::Result> { - let enable_cache = cf_name == "accounts" || cf_name == "account_slots"; - let opts = Self::get_options(config, enable_cache); - - db.create_cf(cf_name, &opts)?; - - Ok(Arc::new(Self { - db, - opts, - _marker: PhantomData, - #[cfg(feature = "metrics")] - prev_stats: Mutex::new(HashMap::new()), - column_family: cf_name.to_owned(), - })) - } - - pub fn new_db(db_path: &Path, config: DbConfig) -> anyhow::Result<(Arc, Options)> { - let opts = Self::get_options(config, false); - let db = match DB::open(&opts, db_path) { - Ok(db) => db, - Err(e) => { - tracing::error!("Failed to open RocksDB: {}", e); - DB::repair(&opts, db_path)?; - DB::open(&opts, db_path)? - } - }; //XXX in case of corruption, use DB - - Ok((Arc::new(db), opts)) - } - - fn get_options(config: DbConfig, cache: bool) -> Options { - let mut opts = Options::default(); - let mut block_based_options = BlockBasedOptions::default(); - - opts.create_if_missing(true); - opts.increase_parallelism(16); - - // NOTE: As per the rocks db wiki: "The overhead of statistics is usually small but non-negligible. We usually observe an overhead of 5%-10%." - #[cfg(feature = "metrics")] - opts.enable_statistics(); - #[cfg(feature = "metrics")] - opts.set_statistics_level(rocksdb::statistics::StatsLevel::ExceptTimeForMutex); - - match config { - DbConfig::LargeSSTFiles => { - // Set the compaction style to Level Compaction - opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - - // Configure the size of SST files at each level - opts.set_target_file_size_base(512 * 1024 * 1024); - - // Increase the file size multiplier to expand file size at upper levels - opts.set_target_file_size_multiplier(2); // Each level grows in file size quicker - - // Reduce the number of L0 files that trigger compaction, increasing frequency - opts.set_level_zero_file_num_compaction_trigger(2); - - // Reduce thresholds for slowing and stopping writes, which forces more frequent compaction - opts.set_level_zero_slowdown_writes_trigger(10); - opts.set_level_zero_stop_writes_trigger(20); - - // Increase the max bytes for L1 to allow more data before triggering compaction - opts.set_max_bytes_for_level_base(2048 * 1024 * 1024); - - // Increase the level multiplier to aggressively increase space at each level - opts.set_max_bytes_for_level_multiplier(8.0); // Exponential growth of levels is more pronounced - - // Configure block size to optimize for larger blocks, improving sequential read performance - block_based_options.set_block_size(128 * 1024); // 128KB blocks - - // Increase the number of write buffers to delay flushing, optimizing CPU usage for compaction - opts.set_max_write_buffer_number(5); - opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB per write buffer - - // Keep a higher number of open files to accommodate more files being produced by aggressive compaction - opts.set_max_open_files(20000); - - // Apply more aggressive compression settings, if I/O and CPU permit - opts.set_compression_per_level(&[ - rocksdb::DBCompressionType::Lz4, - rocksdb::DBCompressionType::Zstd, // Use Zstd for higher compression from L1 onwards - ]); - } - DbConfig::FastWriteSST => { - // Continue using Level Compaction due to its effective use of I/O and CPU for writes - opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - - // Increase initial SST file sizes to reduce the frequency of writes to disk - opts.set_target_file_size_base(512 * 1024 * 1024); // Starting at 512MB for L1 - - // Minimize the file size multiplier to control the growth of file sizes at upper levels - opts.set_target_file_size_multiplier(1); // Minimal increase in file size at upper levels - - // Increase triggers for write slowdown and stop to maximize buffer before I/O actions - opts.set_level_zero_file_num_compaction_trigger(100); // Slow down writes at 100 L0 files - opts.set_level_zero_stop_writes_trigger(200); // Stop writes at 200 L0 files - - // Expand the maximum bytes for base level to further delay the need for compaction-related I/O - opts.set_max_bytes_for_level_base(2048 * 1024 * 1024); - - // Use a higher level multiplier to increase space exponentially at higher levels - opts.set_max_bytes_for_level_multiplier(10.0); - - // Opt for larger block sizes to decrease the number of read and write operations to disk - block_based_options.set_block_size(512 * 1024); // 512KB blocks - - // Maximize the use of write buffers to extend the time data stays in memory before flushing - opts.set_max_write_buffer_number(16); - opts.set_write_buffer_size(1024 * 1024 * 1024); // 1GB per write buffer - - // Allow a very high number of open files to minimize the overhead of opening and closing files - opts.set_max_open_files(20000); - - // Choose compression that balances CPU use and effective storage reduction - opts.set_compression_per_level(&[rocksdb::DBCompressionType::Lz4, rocksdb::DBCompressionType::Zstd]); - - // Enable settings that make full use of CPU to handle more data in memory and process compaction - opts.set_allow_concurrent_memtable_write(true); - opts.set_enable_write_thread_adaptive_yield(true); - } - DbConfig::Default => { - block_based_options.set_ribbon_filter(15.5); // https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter - - opts.set_allow_concurrent_memtable_write(true); - opts.set_enable_write_thread_adaptive_yield(true); - - let transform = rocksdb::SliceTransform::create_fixed_prefix(10); - opts.set_prefix_extractor(transform); - opts.set_memtable_prefix_bloom_ratio(0.2); - - // Enable a size-tiered compaction style, which is good for workloads with a high rate of updates and overwrites - opts.set_compaction_style(rocksdb::DBCompactionStyle::Universal); - - let mut universal_compact_options = rocksdb::UniversalCompactOptions::default(); - universal_compact_options.set_size_ratio(10); - universal_compact_options.set_min_merge_width(2); - universal_compact_options.set_max_merge_width(6); - universal_compact_options.set_max_size_amplification_percent(50); - universal_compact_options.set_compression_size_percent(-1); - universal_compact_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Total); - opts.set_universal_compaction_options(&universal_compact_options); - - let pt_opts = rocksdb::PlainTableFactoryOptions { - user_key_length: 0, - bloom_bits_per_key: 10, - hash_table_ratio: 0.75, - index_sparseness: 8, - encoding_type: rocksdb::KeyEncodingType::Plain, // Default encoding - full_scan_mode: false, // Optimized for point lookups rather than full scans - huge_page_tlb_size: 0, // Not using huge pages - store_index_in_file: false, // Store index in memory for faster access - }; - opts.set_plain_table_factory(&pt_opts); - } - } - if cache { - let cache = Cache::new_lru_cache(GIGABYTE * 30); - block_based_options.set_block_cache(&cache); - } - opts.set_block_based_table_factory(&block_based_options); - opts - } - - pub fn backup_path(&self) -> anyhow::Result { - Ok(format!("{}backup", self.db.path().to_str().ok_or(anyhow!("Invalid path"))?)) - } - - fn backup_engine(&self) -> anyhow::Result { - let backup_opts = BackupEngineOptions::new(self.backup_path()?)?; - let backup_env = Env::new()?; - Ok(BackupEngine::open(&backup_opts, &backup_env)?) - } - - pub fn backup(&self) -> anyhow::Result<()> { - let mut backup_engine = self.backup_engine()?; - backup_engine.create_new_backup(&self.db)?; - backup_engine.purge_old_backups(2)?; - Ok(()) - } - - // Clears the database - pub fn clear(&self) -> Result<()> { - let mut batch = WriteBatch::default(); - let cf = self.db.cf_handle(&self.column_family).unwrap(); - for item in self.db.iterator_cf(&cf, IteratorMode::Start) { - let (key, _) = item?; // Handle or unwrap the Result - batch.delete_cf(&cf, key); - } - self.db.write(batch)?; - Ok(()) - } - - pub fn get(&self, key: &K) -> Option { - let Ok(serialized_key) = bincode::serialize(key) else { return None }; - let cf = self.db.cf_handle(&self.column_family).unwrap(); - let Ok(Some(value_bytes)) = self.db.get_cf(&cf, serialized_key) else { - return None; - }; - - bincode::deserialize(&value_bytes).ok() - } - - pub fn multi_get(&self, keys: I) -> anyhow::Result> - where - I: IntoIterator + Clone, - { - let serialized_keys = keys.clone().into_iter().map(|k| bincode::serialize(&k)).collect::, _>>()?; - Ok(self - .db - .multi_get(serialized_keys) - .into_iter() - .zip(keys) - .filter_map(|(value, key)| { - if let Ok(Some(value)) = value { - let Ok(value) = bincode::deserialize::(&value) else { return None }; // XXX: Maybe we should fail on a failed conversion instead of ignoring; - Some((key, value)) - } else { - None - } - }) - .collect()) - } - - pub fn get_current_block_number(&self) -> u64 { - let Ok(serialized_key) = bincode::serialize(&"current_block") else { - return 0; - }; - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let Ok(Some(value_bytes)) = self.db.get_cf(&cf, serialized_key) else { - return 0; - }; - - bincode::deserialize(&value_bytes).ok().unwrap_or(0) - } - - pub fn get_index_block_number(&self) -> u64 { - self.last_index().map(|(block_number, _)| block_number).unwrap_or(0) - } - - // Mimics the 'insert' functionality of a HashMap - pub fn insert(&self, key: K, value: V) { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let serialized_key = bincode::serialize(&key).unwrap(); - let serialized_value = bincode::serialize(&value).unwrap(); - self.db.put_cf(&cf, serialized_key, serialized_value).unwrap(); - } - - pub fn prepare_batch_insertion(&self, changes: Vec<(K, V)>, current_block: Option, batch: &mut WriteBatch) { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - for (key, value) in changes { - let serialized_key = bincode::serialize(&key).unwrap(); - let serialized_value = bincode::serialize(&value).unwrap(); - // Add each serialized key-value pair to the batch - batch.put_cf(&cf, serialized_key, serialized_value); - } - - if let Some(current_block) = current_block { - let serialized_block_key = bincode::serialize(&"current_block").unwrap(); - let serialized_block_value = bincode::serialize(¤t_block).unwrap(); - batch.put_cf(&cf, serialized_block_key, serialized_block_value); - } - } - - /// inserts data but keep a block as key pointing to the keys inserted in a given block - /// this makes for faster search based on block_number, ergo index - pub fn prepare_batch_insertion_indexed(&self, changes: Vec<(K, V)>, current_block: u64, batch: &mut WriteBatch) { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let mut keys = vec![]; - - for (key, value) in changes { - let serialized_key = bincode::serialize(&key).unwrap(); - let serialized_value = bincode::serialize(&value).unwrap(); - - keys.push(key); - - // Add each serialized key-value pair to the batch - batch.put_cf(&cf, serialized_key, serialized_value); - } - - let serialized_block_value = bincode::serialize(¤t_block).unwrap(); - let serialized_keys = bincode::serialize(&keys).unwrap(); - batch.put_cf(&cf, serialized_block_value, serialized_keys); - } - - // Deletes an entry from the database by key - pub fn delete(&self, key: &K) -> Result<()> { - let serialized_key = bincode::serialize(key)?; - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - self.db.delete_cf(&cf, serialized_key)?; - Ok(()) - } - - // Deletes an entry from the database by key - pub fn delete_index(&self, key: u64) -> Result<()> { - let serialized_key = bincode::serialize(&key)?; - let cf = self.db.cf_handle(&self.column_family).unwrap(); - //XXX check if value is a vec that can be deserialized as a safety measure - self.db.delete_cf(&cf, serialized_key)?; - Ok(()) - } - - // Custom method that combines entry and or_insert_with from a HashMap - pub fn entry_or_insert_with(&self, key: K, default: F) -> V - where - F: FnOnce() -> V, - { - match self.get(&key) { - Some(value) => value, - None => { - let new_value = default(); - self.insert(key, new_value.clone()); - new_value - } - } - } - - pub fn iter_start(&self) -> RocksDBIterator { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let iter = self.db.iterator_cf(&cf, IteratorMode::Start); - RocksDBIterator::::new(iter) - } - - pub fn iter_end(&self) -> RocksDBIterator { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let iter = self.db.iterator_cf(&cf, IteratorMode::End); - RocksDBIterator::::new(iter) - } - - pub fn indexed_iter_end(&self) -> IndexedRocksDBIterator { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let iter = self.db.iterator_cf(&cf, IteratorMode::End); - IndexedRocksDBIterator::::new(iter) - } - - pub fn iter_from Deserialize<'de> + std::hash::Hash + Eq>( - &self, - key_prefix: P, - direction: rocksdb::Direction, - ) -> RocksDBIterator { - let serialized_key = bincode::serialize(&key_prefix).unwrap(); - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let iter = self.db.iterator_cf(&cf, IteratorMode::From(&serialized_key, direction)); - RocksDBIterator::::new(iter) - } - - pub fn last_index(&self) -> Option<(u64, Vec)> { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let iter = self.db.iterator_cf(&cf, IteratorMode::End); - IndexedRocksDBIterator::::new(iter).next() - } - - pub fn last(&self) -> Option<(K, V)> { - let cf = self.db.cf_handle(&self.column_family).unwrap(); - - let mut iter = self.db.iterator_cf(&cf, IteratorMode::End); - if let Some(Ok((k, v))) = iter.next() { - let key = bincode::deserialize(&k).unwrap(); - let value = bincode::deserialize(&v).unwrap(); - Some((key, value)) - } else { - None - } - } - - #[cfg(feature = "metrics")] - pub fn get_histogram_average_in_interval(&self, hist: Histogram) -> u64 { - // The stats are cumulative since opening the db - // we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count) - - let mut prev_values = self.prev_stats.lock().unwrap(); - let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0)); - let data = self.opts.get_histogram_data(hist); - let data_count = data.count(); - let data_sum = data.sum(); - - let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else { - return 0; - }; - - prev_values.insert(hist as u32, (data_sum, data_count)); - avg - } - - #[cfg(feature = "metrics")] - pub fn export_metrics(&self) { - let db_get = self.opts.get_histogram_data(Histogram::DbGet); - let db_write = self.opts.get_histogram_data(Histogram::DbWrite); - - let block_cache_miss = self.opts.get_ticker_count(Ticker::BlockCacheMiss); - let block_cache_hit = self.opts.get_ticker_count(Ticker::BlockCacheHit); - let bytes_written = self.opts.get_ticker_count(Ticker::BytesWritten); - let bytes_read = self.opts.get_ticker_count(Ticker::BytesRead); - - let db_name = self.db.path().file_name().unwrap().to_str(); - - metrics::set_rocks_db_get(db_get.count(), db_name); - metrics::set_rocks_db_write(db_write.count(), db_name); - metrics::set_rocks_block_cache_miss(block_cache_miss, db_name); - metrics::set_rocks_block_cache_hit(block_cache_hit, db_name); - metrics::set_rocks_bytes_written(bytes_written, db_name); - metrics::set_rocks_bytes_read(bytes_read, db_name); - - metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime), db_name); - metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime), db_name); - metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime), db_name); - } -} - -pub struct RocksDBIterator<'a, K, V> { - iter: DBIteratorWithThreadMode<'a, DB>, - _marker: PhantomData<(K, V)>, -} - -impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDBIterator<'a, K, V> { - pub fn new(iter: DBIteratorWithThreadMode<'a, DB>) -> Self { - Self { iter, _marker: PhantomData } - } -} - -/// Custom iterator for navigating RocksDB entries. -/// -/// This iterator is designed to skip over specific keys used for internal control purposes, such as: -/// - `"current_block"`: Used to indicate the current block number in the database. -/// - Keys representing index keys (if deserialized as `u64`): Used for various indexing purposes. -/// -/// The iterator will: -/// - Ignore any entries where the key is `"current_block"`. -/// - Attempt to deserialize all other keys to the generic type `K`. If deserialization fails, it assumes -/// the key might be an index key or improperly formatted, and skips it. -impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> Iterator - for RocksDBIterator<'a, K, V> +/// Create or open the Database with the configs applied to all column families +pub fn create_or_open_db(path: &Path, db_opts: &Options, cf_descriptor_iter: CfDescriptorIter) -> anyhow::Result> +where + CfDescriptorIter: Iterator + Clone, + CfName: AsRef, { - type Item = (K, V); - - /// Retrieves the next key-value pair from the database, skipping over special control keys and - /// potentially misformatted keys. - /// - /// Returns: - /// - `Some((K, V))` if a valid key-value pair is found. - /// - `None` if there are no more items to process, or if only special/control keys remain. - fn next(&mut self) -> Option { - for key_value_result in self.iter.by_ref() { - let Ok((key, value)) = key_value_result else { continue }; - - // Check if the key is a special 'current_block' key and skip it - if key == bincode::serialize(&"current_block").unwrap().into_boxed_slice() { - continue; // Move to the next key-value pair - } + let open_db = || DB::open_cf_with_opts(db_opts, path, cf_descriptor_iter.clone()); - // Attempt to deserialize the key to type `K` - if let Ok(deserialized_key) = bincode::deserialize::(&key) { - // Attempt to deserialize the value to type `V` - if let Ok(deserialized_value) = bincode::deserialize::(&value) { - // Return the deserialized key-value pair if both are successful - return Some((deserialized_key, deserialized_value)); - } - } - // If deserialization fails, continue to the next item + let db = match open_db() { + Ok(db) => db, + Err(e) => { + tracing::error!("Failed to open RocksDB: {}", e); + DB::repair(db_opts, path)?; + open_db()? } - // Return None if all items are processed or if all remaining items fail conditions - None - } -} + }; // XXX in case of corruption, use DB -impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq> IndexedRocksDBIterator<'a, K> { - pub fn new(iter: DBIteratorWithThreadMode<'a, DB>) -> Self { - Self { iter, _marker: PhantomData } - } + Ok(Arc::new(db)) } -pub struct IndexedRocksDBIterator<'a, K> { - iter: DBIteratorWithThreadMode<'a, DB>, - _marker: PhantomData>, +pub fn create_new_backup(db: &DB) -> anyhow::Result<()> { + let mut backup_engine = backup_engine(db)?; + backup_engine.create_new_backup(db)?; + backup_engine.purge_old_backups(2)?; + Ok(()) } -impl<'a, K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq> Iterator for IndexedRocksDBIterator<'a, K> { - type Item = (u64, Vec); - - fn next(&mut self) -> Option { - for key_value_result in self.iter.by_ref() { - let Ok((key, value)) = key_value_result else { continue }; +fn backup_engine(db: &DB) -> anyhow::Result { + let db_path = db.path().to_str().ok_or(anyhow!("Invalid path"))?; + let backup_path = format!("{db_path}backup"); + let backup_opts = BackupEngineOptions::new(backup_path)?; - if let Ok(index_key) = bincode::deserialize::(&key) { - if let Ok(index_values) = bincode::deserialize::>(&value) { - return Some((index_key, index_values)); - } - } - } - None - } + let backup_env = Env::new()?; + Ok(BackupEngine::open(&backup_opts, &backup_env)?) } diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index 8a47863e0..6c8346fe3 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -152,8 +152,8 @@ impl PermanentStorage for RocksPermanentStorage { // save block let number = *block.number(); - let txs_rocks = Arc::clone(&self.state.transactions); - let logs_rocks = Arc::clone(&self.state.logs); + let txs_rocks = self.state.transactions.clone(); + let logs_rocks = self.state.logs.clone(); txs_rocks.prepare_batch_insertion_indexed(txs_batch, number.as_u64(), &mut batch); @@ -161,8 +161,8 @@ impl PermanentStorage for RocksPermanentStorage { let hash = *block.hash(); - let blocks_by_number = Arc::clone(&self.state.blocks_by_number); - let blocks_by_hash = Arc::clone(&self.state.blocks_by_hash); + let blocks_by_number = self.state.blocks_by_number.clone(); + let blocks_by_hash = self.state.blocks_by_hash.clone(); let mut block_without_changes = block.clone(); for transaction in &mut block_without_changes.transactions { // checks if it has a contract address to keep, later this will be used to gather deployed_contract_address diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index 14120e615..704321307 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -1,21 +1,29 @@ use core::fmt; -use std::collections::HashMap; use std::path::Path; +use std::path::PathBuf; use std::sync::atomic::AtomicU64; use std::sync::Arc; use anyhow::anyhow; use futures::future::join_all; use itertools::Itertools; +use lazy_static::lazy_static; use num_traits::cast::ToPrimitive; +use rocksdb::Options; use rocksdb::WriteBatch; use rocksdb::DB; +use serde::Deserialize; +use serde::Serialize; +use sugars::hmap; use tokio::sync::mpsc; use tokio::task; use tracing::info; -use super::rocks_db::DbConfig; -use super::rocks_db::RocksDb; +use super::rocks_cf::RocksCf; +use super::rocks_config::CacheSetting; +use super::rocks_config::DbConfig; +use super::rocks_db::create_new_backup; +use super::rocks_db::create_or_open_db; use super::types::AccountRocksdb; use super::types::AddressRocksdb; use super::types::BlockNumberRocksdb; @@ -40,68 +48,105 @@ use crate::eth::primitives::TransactionMined; use crate::ext::OptionExt; use crate::log_and_err; +cfg_if::cfg_if! { + if #[cfg(feature = "metrics")] { + use std::collections::HashMap; + use std::sync::Mutex; + + use rocksdb::statistics::Histogram; + use rocksdb::statistics::Ticker; + + use crate::infra::metrics::{self, Count, HistogramInt, Sum}; + } +} + +lazy_static! { + static ref CF_OPTIONS_MAP: HashMap<&'static str, Options> = hmap! { + "accounts" => DbConfig::Default.to_options(CacheSetting::Enabled), + "accounts_history" => DbConfig::FastWriteSST.to_options(CacheSetting::Disabled), + "account_slots" => DbConfig::Default.to_options(CacheSetting::Enabled), + "account_slots_history" => DbConfig::FastWriteSST.to_options(CacheSetting::Disabled), + "transactions" => DbConfig::LargeSSTFiles.to_options(CacheSetting::Disabled), + "blocks_by_number" => DbConfig::LargeSSTFiles.to_options(CacheSetting::Disabled), + "blocks_by_hash" => DbConfig::LargeSSTFiles.to_options(CacheSetting::Disabled), + "logs" => DbConfig::LargeSSTFiles.to_options(CacheSetting::Disabled), + }; +} + +/// State handler for our RocksDB storage, separating "tables" by column families. +/// +/// With data separated by column families, writing and reading should be done via the `RocksCf` fields, +/// while operations that include the whole database (e.g. backup) should refer to the inner `DB` directly. +#[derive(Clone)] pub struct RocksStorageState { - pub db_ref: Arc, - pub accounts: Arc>, - pub accounts_history: Arc>, - pub account_slots: Arc>, - pub account_slots_history: Arc>, - pub transactions: Arc>, - pub blocks_by_number: Arc>, - pub blocks_by_hash: Arc>, - pub logs: Arc>, + pub db: Arc, + pub db_path: PathBuf, + pub accounts: RocksCf, + pub accounts_history: RocksCf<(AddressRocksdb, BlockNumberRocksdb), AccountRocksdb>, + pub account_slots: RocksCf<(AddressRocksdb, SlotIndexRocksdb), SlotValueRocksdb>, + pub account_slots_history: RocksCf<(AddressRocksdb, SlotIndexRocksdb, BlockNumberRocksdb), SlotValueRocksdb>, + pub transactions: RocksCf, + pub blocks_by_number: RocksCf, + pub blocks_by_hash: RocksCf, + pub logs: RocksCf<(HashRocksdb, IndexRocksdb), BlockNumberRocksdb>, pub backup_trigger: Arc>, + /// Last collected stats for a histogram + #[cfg(feature = "metrics")] + pub prev_stats: Arc>>, + /// Options passed at DB creation, stored for metrics + /// + /// a newly created `rocksdb::Options` object is unique, with an underlying pointer identifier inside of it, and is used to access + /// the DB metrics, `Options` can be cloned, but two equal `Options` might not retrieve the same metrics + #[cfg(feature = "metrics")] + pub db_options: Options, } impl RocksStorageState { pub fn new(path: impl AsRef) -> Self { - let (tx, rx) = mpsc::channel::<()>(1); - let (db, _opts) = RocksDb::::new_db(path.as_ref(), DbConfig::Default).unwrap(); + let db_path = path.as_ref().to_path_buf(); + let (backup_trigger_tx, backup_trigger_rx) = mpsc::channel::<()>(1); - let db = || Arc::clone(&db); + // granular settings for each Column Family to be created + let cf_options_iter = CF_OPTIONS_MAP.iter().map(|(name, opts)| (*name, opts.clone())); + + let db_options = DbConfig::Default.to_options(CacheSetting::Disabled); + + let db = create_or_open_db(&db_path, &db_options, cf_options_iter).unwrap(); //XXX TODO while repair/restore from backup, make sure to sync online and only when its in sync with other nodes, receive requests let state = Self { - db_ref: db(), - accounts: RocksDb::new("accounts", db(), DbConfig::Default).unwrap(), - accounts_history: RocksDb::new("accounts_history", db(), DbConfig::FastWriteSST).unwrap(), - account_slots: RocksDb::new("account_slots", db(), DbConfig::Default).unwrap(), - account_slots_history: RocksDb::new("account_slots_history", db(), DbConfig::FastWriteSST).unwrap(), - transactions: RocksDb::new("transactions", db(), DbConfig::LargeSSTFiles).unwrap(), - blocks_by_number: RocksDb::new("blocks_by_number", db(), DbConfig::LargeSSTFiles).unwrap(), - blocks_by_hash: RocksDb::new("blocks_by_hash", db(), DbConfig::LargeSSTFiles).unwrap(), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number - logs: RocksDb::new("logs", db(), DbConfig::LargeSSTFiles).unwrap(), - backup_trigger: Arc::new(tx), + db_path, + accounts: new_cf(&db, "accounts"), + accounts_history: new_cf(&db, "accounts_history"), + account_slots: new_cf(&db, "account_slots"), + account_slots_history: new_cf(&db, "account_slots_history"), + transactions: new_cf(&db, "transactions"), + blocks_by_number: new_cf(&db, "blocks_by_number"), + blocks_by_hash: new_cf(&db, "blocks_by_hash"), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number + logs: new_cf(&db, "logs"), + backup_trigger: Arc::new(backup_trigger_tx), + #[cfg(feature = "metrics")] + prev_stats: Default::default(), + #[cfg(feature = "metrics")] + db_options, + db, }; - state.listen_for_backup_trigger(rx).unwrap(); + state.listen_for_backup_trigger(backup_trigger_rx).unwrap(); state } - pub fn listen_for_backup_trigger(&self, rx: mpsc::Receiver<()>) -> anyhow::Result<()> { - tracing::info!("starting backup trigger listener"); - let accounts = Arc::>::clone(&self.accounts); - let accounts_history = Arc::>::clone(&self.accounts_history); - let account_slots = Arc::>::clone(&self.account_slots); - let account_slots_history = - Arc::>::clone(&self.account_slots_history); - let blocks_by_hash = Arc::>::clone(&self.blocks_by_hash); - let blocks_by_number = Arc::>::clone(&self.blocks_by_number); - let transactions = Arc::>::clone(&self.transactions); - let logs = Arc::>::clone(&self.logs); - - tokio::spawn(async move { - let mut rx = rx; - while rx.recv().await.is_some() { - accounts.backup().unwrap(); - accounts_history.backup().unwrap(); - account_slots.backup().unwrap(); - account_slots_history.backup().unwrap(); - transactions.backup().unwrap(); - blocks_by_number.backup().unwrap(); - blocks_by_hash.backup().unwrap(); - logs.backup().unwrap(); + pub fn listen_for_backup_trigger(&self, mut rx: mpsc::Receiver<()>) -> anyhow::Result<()> { + tracing::info!("starting rocksdb backup trigger listener"); + + tokio::spawn({ + let db = Arc::clone(&self.db); + + async move { + while rx.recv().await.is_some() { + create_new_backup(&db).expect("failed to backup DB"); + } } }); @@ -117,7 +162,7 @@ impl RocksStorageState { pub async fn reset_at(&self, block_number: BlockNumber) -> anyhow::Result<()> { let tasks = vec![ { - let self_blocks_by_hash_clone = Arc::clone(&self.blocks_by_hash); + let self_blocks_by_hash_clone = self.blocks_by_hash.clone(); task::spawn_blocking(move || { for (block_num, block_hash_vec) in self_blocks_by_hash_clone.indexed_iter_end() { if block_num <= block_number.as_u64() { @@ -136,7 +181,7 @@ impl RocksStorageState { }) }, { - let self_blocks_by_number_clone = Arc::clone(&self.blocks_by_number); + let self_blocks_by_number_clone = self.blocks_by_number.clone(); task::spawn_blocking(move || { let blocks_by_number = self_blocks_by_number_clone.iter_end(); for (num, _) in blocks_by_number { @@ -152,7 +197,7 @@ impl RocksStorageState { }) }, { - let self_transactions_clone = Arc::clone(&self.transactions); + let self_transactions_clone = self.transactions.clone(); task::spawn_blocking(move || { let transactions = self_transactions_clone.indexed_iter_end(); for (index_block_number, hash_vec) in transactions { @@ -171,7 +216,7 @@ impl RocksStorageState { }) }, { - let self_logs_clone = Arc::clone(&self.logs); + let self_logs_clone = self.logs.clone(); task::spawn_blocking(move || { let logs = self_logs_clone.indexed_iter_end(); for (index_block_number, logs_vec) in logs { @@ -190,7 +235,7 @@ impl RocksStorageState { }) }, { - let self_accounts_history_clone = Arc::clone(&self.accounts_history); + let self_accounts_history_clone = self.accounts_history.clone(); task::spawn_blocking(move || { let accounts_history = self_accounts_history_clone.indexed_iter_end(); for (index_block_number, accounts_history_vec) in accounts_history { @@ -209,7 +254,7 @@ impl RocksStorageState { }) }, { - let self_account_slots_history_clone = Arc::clone(&self.account_slots_history); + let self_account_slots_history_clone = self.account_slots_history.clone(); task::spawn_blocking(move || { let account_slots_history = self_account_slots_history_clone.indexed_iter_end(); for (index_block_number, account_slots_history_vec) in account_slots_history { @@ -238,8 +283,8 @@ impl RocksStorageState { // Spawn task for handling accounts let accounts_task = task::spawn_blocking({ - let self_accounts_history_clone = Arc::clone(&self.accounts_history); - let self_accounts_clone = Arc::clone(&self.accounts); + let self_accounts_history_clone = self.accounts_history.clone(); + let self_accounts_clone = self.accounts.clone(); move || { let mut latest_accounts: HashMap = std::collections::HashMap::new(); let account_histories = self_accounts_history_clone.iter_start(); @@ -268,8 +313,8 @@ impl RocksStorageState { // Spawn task for handling slots let slots_task = task::spawn_blocking({ - let self_account_slots_history_clone = Arc::clone(&self.account_slots_history); - let self_account_slots_clone = Arc::clone(&self.account_slots); + let self_account_slots_history_clone = self.account_slots_history.clone(); + let self_account_slots_clone = self.account_slots.clone(); move || { let mut latest_slots: HashMap<(AddressRocksdb, SlotIndexRocksdb), (BlockNumberRocksdb, SlotValueRocksdb)> = std::collections::HashMap::new(); let slot_histories = self_account_slots_history_clone.iter_start(); @@ -308,10 +353,10 @@ impl RocksStorageState { /// Updates the in-memory state with changes from transaction execution pub fn update_state_with_execution_changes(&self, changes: &[ExecutionAccountChanges], block_number: BlockNumber, batch: &mut WriteBatch) { // Directly capture the fields needed by each future from `self` - let accounts = Arc::clone(&self.accounts); - let accounts_history = Arc::clone(&self.accounts_history); - let account_slots = Arc::clone(&self.account_slots); - let account_slots_history = Arc::clone(&self.account_slots_history); + let accounts = self.accounts.clone(); + let accounts_history = self.accounts_history.clone(); + let account_slots = self.account_slots.clone(); + let account_slots_history = self.account_slots_history.clone(); let changes_clone_for_accounts = changes.to_vec(); // Clone changes for accounts future let changes_clone_for_slots = changes.to_vec(); // Clone changes for slots future @@ -511,7 +556,7 @@ impl RocksStorageState { /// Write to all DBs in a batch pub fn write_batch(&self, batch: WriteBatch) -> anyhow::Result<()> { let batch_len = batch.len(); - let result = self.db_ref.write(batch); + let result = self.db.write(batch); if let Err(err) = &result { tracing::error!(?err, batch_len, "failed to write batch to DB"); @@ -531,22 +576,65 @@ impl RocksStorageState { self.logs.clear()?; Ok(()) } +} - #[cfg(feature = "metrics")] +#[cfg(feature = "metrics")] +impl RocksStorageState { pub fn export_metrics(&self) { - self.account_slots.export_metrics(); - self.account_slots_history.export_metrics(); + let db_get = self.db_options.get_histogram_data(Histogram::DbGet); + let db_write = self.db_options.get_histogram_data(Histogram::DbWrite); + + let block_cache_miss = self.db_options.get_ticker_count(Ticker::BlockCacheMiss); + let block_cache_hit = self.db_options.get_ticker_count(Ticker::BlockCacheHit); + let bytes_written = self.db_options.get_ticker_count(Ticker::BytesWritten); + let bytes_read = self.db_options.get_ticker_count(Ticker::BytesRead); + + let db_name = self.db.path().file_name().unwrap().to_str(); + + metrics::set_rocks_db_get(db_get.count(), db_name); + metrics::set_rocks_db_write(db_write.count(), db_name); + metrics::set_rocks_block_cache_miss(block_cache_miss, db_name); + metrics::set_rocks_block_cache_hit(block_cache_hit, db_name); + metrics::set_rocks_bytes_written(bytes_written, db_name); + metrics::set_rocks_bytes_read(bytes_read, db_name); + + metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime), db_name); + metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime), db_name); + metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime), db_name); + } - self.accounts.export_metrics(); - self.accounts_history.export_metrics(); + fn get_histogram_average_in_interval(&self, hist: Histogram) -> u64 { + // The stats are cumulative since opening the db + // we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count) - self.blocks_by_hash.export_metrics(); - self.blocks_by_number.export_metrics(); + let mut prev_values = self.prev_stats.lock().unwrap(); + let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0)); + let data = self.db_options.get_histogram_data(hist); + let data_count = data.count(); + let data_sum = data.sum(); + + let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else { + return 0; + }; + + prev_values.insert(hist as u32, (data_sum, data_count)); + avg } } impl fmt::Debug for RocksStorageState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RocksDb").field("db", &"Arc").finish() + f.debug_struct("RocksStorageState").field("db_path", &self.db_path).finish() } } + +fn new_cf(db: &Arc, column_family: &str) -> RocksCf +where + K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, + V: Serialize + for<'de> Deserialize<'de> + Clone, +{ + let options = CF_OPTIONS_MAP + .get(&column_family) + .unwrap_or_else(|| panic!("column_family `{column_family}` given to `new_cf` not found in options map")); + RocksCf::new_cf(Arc::clone(db), column_family, options.clone()) +} diff --git a/src/infra/metrics/metrics_types.rs b/src/infra/metrics/metrics_types.rs index 77e5579d9..4ffeff251 100644 --- a/src/infra/metrics/metrics_types.rs +++ b/src/infra/metrics/metrics_types.rs @@ -7,6 +7,10 @@ use metrics::Label; use crate::ext::not; +pub type HistogramInt = u32; +pub type Sum = u64; +pub type Count = u64; + /// Metric definition. pub(super) struct Metric { pub(super) kind: &'static str,