diff --git a/src/main.rs b/src/main.rs index 8ae1e0c..d0ca365 100644 --- a/src/main.rs +++ b/src/main.rs @@ -86,7 +86,7 @@ async fn main() -> Result<()> { ReconstructSource::L1 { l1_fetcher_options } => { let fetcher_options = l1_fetcher_options.into(); let processor = TreeProcessor::new(db_path.clone()).await?; - let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_snapshot()))?; + let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_inner_db()))?; let (tx, rx) = mpsc::channel::(5); let processor_handle = tokio::spawn(async move { @@ -160,7 +160,8 @@ async fn main() -> Result<()> { let processor = SnapshotBuilder::new(db_path); let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into(); - if let Ok(Some(batch_number)) = processor.get_last_l1_batch_number() { + if let Ok(batch_number) = processor.get_latest_l1_batch_number() { + let batch_number = batch_number.as_u64(); if batch_number > ethereum::GENESIS_BLOCK { tracing::info!( "Found a preexisting snapshot db, continuing from L1 block: {batch_number}" diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index fc5b3a6..a61b62c 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -1,17 +1,19 @@ use std::path::{Path, PathBuf}; -use ethers::types::{U256, U64}; +use ethers::types::U256; use eyre::Result; use state_reconstruct_storage::{ + snapshot_columns, types::{ Proto, SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotHeader, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata, }, - InnerDB, FACTORY_DEPS, INDEX_TO_KEY_MAP, + DBMode, InnerDB, INDEX_TO_KEY_MAP, }; -use super::{DEFAULT_DB_PATH, SNAPSHOT_HEADER_FILE_NAME}; -use crate::processor::snapshot::SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX; +use crate::processor::snapshot::{ + DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME, +}; pub struct SnapshotExporter { basedir: PathBuf, @@ -25,7 +27,7 @@ impl SnapshotExporter { None => PathBuf::from(DEFAULT_DB_PATH), }; - let database = InnerDB::new_read_only(db_path)?; + let database = InnerDB::new_read_only(db_path, DBMode::Snapshot)?; Ok(Self { basedir: basedir.to_path_buf(), database, @@ -33,12 +35,7 @@ impl SnapshotExporter { } pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> { - let l1_batch_number = U64::from( - self.database - .get_last_l1_batch_number()? - .expect("snapshot db contains no L1 batch number"), - ); - + let l1_batch_number = self.database.get_latest_l1_batch_number()?; let mut header = SnapshotHeader { l1_batch_number, ..Default::default() @@ -62,7 +59,10 @@ impl SnapshotExporter { fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> { tracing::info!("Exporting factory dependencies..."); - let storage_logs = self.database.cf_handle(FACTORY_DEPS).unwrap(); + let storage_logs = self + .database + .cf_handle(snapshot_columns::FACTORY_DEPS) + .unwrap(); let mut iterator = self .database .iterator_cf(storage_logs, rocksdb::IteratorMode::Start); diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs index d320443..0de0220 100644 --- a/src/processor/snapshot/importer.rs +++ b/src/processor/snapshot/importer.rs @@ -8,7 +8,7 @@ use eyre::Result; use state_reconstruct_fetcher::constants::storage::INNER_DB_NAME; use state_reconstruct_storage::{ types::{Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk}, - InnerDB, + DBMode, InnerDB, }; use tokio::sync::Mutex; @@ -25,7 +25,7 @@ pub struct SnapshotImporter { impl SnapshotImporter { pub async fn new(directory: PathBuf, db_path: &Path) -> Result { let inner_db_path = db_path.join(INNER_DB_NAME); - let new_state = InnerDB::new(inner_db_path.clone())?; + let new_state = InnerDB::new(inner_db_path.clone(), DBMode::Reconstruction)?; let snapshot = Arc::new(Mutex::new(new_state)); let tree = TreeWrapper::new(db_path, snapshot.clone(), true).await?; diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs index 9e7aede..eeb6c5b 100644 --- a/src/processor/snapshot/mod.rs +++ b/src/processor/snapshot/mod.rs @@ -14,7 +14,7 @@ use state_reconstruct_fetcher::{ use state_reconstruct_storage::{ bytecode, types::{MiniblockNumber, SnapshotFactoryDependency, SnapshotStorageLog}, - InnerDB, + DBMode, InnerDB, }; use tokio::sync::mpsc; @@ -35,7 +35,7 @@ impl SnapshotBuilder { None => PathBuf::from(DEFAULT_DB_PATH), }; - let mut database = InnerDB::new(db_path).unwrap(); + let mut database = InnerDB::new(db_path, DBMode::Snapshot).unwrap(); let idx = database .get_last_repeated_key_index() @@ -51,8 +51,8 @@ impl SnapshotBuilder { } // Gets the next L1 batch number to be processed for ues in state recovery. - pub fn get_last_l1_batch_number(&self) -> Result> { - self.database.get_last_l1_batch_number() + pub fn get_latest_l1_batch_number(&self) -> Result { + self.database.get_latest_l1_batch_number() } } @@ -120,7 +120,7 @@ impl Processor for SnapshotBuilder { } if let Some(number) = block.l1_block_number { - let _ = self.database.set_last_l1_batch_number(number); + let _ = self.database.set_latest_l1_batch_number(number); }; } } @@ -236,7 +236,7 @@ mod tests { use std::fs; use indexmap::IndexMap; - use state_reconstruct_storage::{InnerDB, PackingType}; + use state_reconstruct_storage::{DBMode, InnerDB, PackingType}; use super::*; @@ -271,7 +271,7 @@ mod tests { builder.run(rx).await; } - let db = InnerDB::new(PathBuf::from(db_dir.clone())).unwrap(); + let db = InnerDB::new(PathBuf::from(db_dir.clone()), DBMode::Snapshot).unwrap(); let key = U256::from_dec_str("1234").unwrap(); let Some(log) = db.get_storage_log(&key).unwrap() else { diff --git a/src/processor/tree/mod.rs b/src/processor/tree/mod.rs index 0ffdf0e..3bbb2f8 100644 --- a/src/processor/tree/mod.rs +++ b/src/processor/tree/mod.rs @@ -11,7 +11,7 @@ use state_reconstruct_fetcher::{ metrics::{PerfMetric, METRICS_TRACING_TARGET}, types::CommitBlock, }; -use state_reconstruct_storage::InnerDB; +use state_reconstruct_storage::{DBMode::Reconstruction, InnerDB}; use tokio::{ sync::{mpsc, Mutex}, time::Instant, @@ -26,7 +26,7 @@ pub struct TreeProcessor { /// The internal merkle tree. tree: TreeWrapper, /// The stored state snapshot. - snapshot: Arc>, + inner_db: Arc>, } impl TreeProcessor { @@ -46,15 +46,15 @@ impl TreeProcessor { ); } - let new_state = InnerDB::new(inner_db_path.clone())?; - let snapshot = Arc::new(Mutex::new(new_state)); - let tree = TreeWrapper::new(&db_path, snapshot.clone(), init).await?; + let new_state = InnerDB::new(inner_db_path.clone(), Reconstruction)?; + let inner_db = Arc::new(Mutex::new(new_state)); + let tree = TreeWrapper::new(&db_path, inner_db.clone(), init).await?; - Ok(Self { tree, snapshot }) + Ok(Self { tree, inner_db }) } - pub fn get_snapshot(&self) -> Arc> { - self.snapshot.clone() + pub fn get_inner_db(&self) -> Arc> { + self.inner_db.clone() } } @@ -66,10 +66,10 @@ impl Processor for TreeProcessor { while let Some(block) = rx.recv().await { // Check if we've already processed this block. let latest_l2 = self - .snapshot + .inner_db .lock() .await - .get_latest_l2_block_number() + .get_latest_l2_batch_number() .expect("value should default to 0"); if latest_l2 >= block.l2_block_number { tracing::debug!( @@ -89,10 +89,10 @@ impl Processor for TreeProcessor { // Update snapshot values. before = Instant::now(); - self.snapshot + self.inner_db .lock() .await - .set_latest_l2_block_number(block.l2_block_number) + .set_latest_l2_batch_number(block.l2_block_number) .expect("db failed"); if snapshot_metric.add(before.elapsed()) > 10 { diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index e533aae..bb8423e 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -22,14 +22,14 @@ pub struct TreeWrapper { index_to_key: HashMap, key_to_value: HashMap, tree: MerkleTree, - snapshot: Arc>, + inner_db: Arc>, } impl TreeWrapper { /// Attempts to create a new [`TreeWrapper`]. pub async fn new( db_path: &Path, - snapshot: Arc>, + inner_db: Arc>, reconstruct: bool, ) -> Result { let db_opt = RocksDBOptions { @@ -40,7 +40,7 @@ impl TreeWrapper { let mut tree = MerkleTree::new(db); if reconstruct { - let mut guard = snapshot.lock().await; + let mut guard = inner_db.lock().await; reconstruct_genesis_state(&mut tree, &mut guard, INITAL_STATE_PATH)?; } @@ -48,7 +48,7 @@ impl TreeWrapper { index_to_key: HashMap::new(), key_to_value: HashMap::new(), tree, - snapshot, + inner_db, }) } @@ -64,7 +64,7 @@ impl TreeWrapper { let value = self.process_value(*key, *value); tree_entries.push(TreeEntry::new(*key, index, value)); - self.snapshot + self.inner_db .lock() .await .add_key(key) @@ -77,7 +77,7 @@ impl TreeWrapper { let index = *index; // Index is 1-based so we subtract 1. let key = self - .snapshot + .inner_db .lock() .await .get_key(index - 1) @@ -131,7 +131,7 @@ impl TreeWrapper { for log in &chunk.storage_logs { tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value)); - self.snapshot + self.inner_db .lock() .await .add_key(&log.key) @@ -147,8 +147,8 @@ impl TreeWrapper { tracing::info!("Succesfully imported snapshot containing {num_tree_entries} storage logs!",); - let snapshot = self.snapshot.lock().await; - snapshot.set_latest_l1_block_number(l1_batch_number.as_u64() + 1)?; + let db = self.inner_db.lock().await; + db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?; Ok(()) } diff --git a/state-reconstruct-fetcher/src/l1_fetcher.rs b/state-reconstruct-fetcher/src/l1_fetcher.rs index fb7280b..024f85a 100644 --- a/state-reconstruct-fetcher/src/l1_fetcher.rs +++ b/state-reconstruct-fetcher/src/l1_fetcher.rs @@ -73,7 +73,7 @@ pub struct L1Fetcher { } impl L1Fetcher { - pub fn new(config: L1FetcherOptions, snapshot: Option>>) -> Result { + pub fn new(config: L1FetcherOptions, inner_db: Option>>) -> Result { let provider = Provider::::try_from(&config.http_url) .expect("could not instantiate HTTP Provider"); @@ -87,7 +87,7 @@ impl L1Fetcher { provider, contracts, config, - inner_db: snapshot, + inner_db, metrics, }) } @@ -102,7 +102,7 @@ impl L1Fetcher { if current_l1_block_number == GENESIS_BLOCK.into() { if let Some(snapshot) = &self.inner_db { let snapshot_latest_l1_block_number = - snapshot.lock().await.get_latest_l1_block_number()?; + snapshot.lock().await.get_latest_l1_batch_number()?; if snapshot_latest_l1_block_number > current_l1_block_number { current_l1_block_number = snapshot_latest_l1_block_number; tracing::info!( @@ -124,7 +124,7 @@ impl L1Fetcher { metrics.first_l1_block_num = current_l1_block_number.as_u64(); metrics.latest_l1_block_num = current_l1_block_number.as_u64(); if let Some(snapshot) = &self.inner_db { - metrics.latest_l2_block_num = snapshot.lock().await.get_latest_l2_block_number()?; + metrics.latest_l2_block_num = snapshot.lock().await.get_latest_l2_batch_number()?; metrics.first_l2_block_num = metrics.latest_l2_block_num; } } @@ -194,7 +194,7 @@ impl L1Fetcher { snapshot .lock() .await - .set_latest_l1_block_number(block_num)?; + .set_latest_l1_batch_number(block_num)?; } // Fetching is naturally ahead of parsing, but the data diff --git a/state-reconstruct-storage/src/lib.rs b/state-reconstruct-storage/src/lib.rs index eeb97b6..50ef104 100644 --- a/state-reconstruct-storage/src/lib.rs +++ b/state-reconstruct-storage/src/lib.rs @@ -12,17 +12,24 @@ use types::{SnapshotFactoryDependency, SnapshotStorageLog}; pub const INDEX_TO_KEY_MAP: &str = "index_to_key_map"; pub const KEY_TO_INDEX_MAP: &str = "key_to_index_map"; +pub const METADATA: &str = "metadata"; -pub const STORAGE_LOGS: &str = "storage_logs"; -pub const FACTORY_DEPS: &str = "factory_deps"; +pub mod reconstruction_columns { + pub const LAST_REPEATED_KEY_INDEX: &str = "RECONSTRUCTION_LAST_REPEATED_KEY_INDEX"; + /// The latest l1 block number that was processed. + pub const LATEST_L1_BATCH: &str = "RECONSTRUCTION_LATEST_L1_BATCH"; + /// The latest l2 block number that was processed. + pub const LATEST_L2_BATCH: &str = "RECONSTRUCTION_LATEST_L2_BATCH"; +} -pub const METADATA: &str = "metadata"; +pub mod snapshot_columns { + pub const STORAGE_LOGS: &str = "storage_logs"; + pub const FACTORY_DEPS: &str = "factory_deps"; -pub const LAST_REPEATED_KEY_INDEX: &str = "LAST_REPEATED_KEY_INDEX"; -/// The latest l1 block number that was processed. -pub const LATEST_L1_BLOCK_NUMBER: &str = "LATEST_L1_BLOCK_NUMBER"; -/// The latest l2 block number that was processed. -pub const LATEST_L2_BLOCK_NUMBER: &str = "LATEST_L2_BLOCK_NUMBER"; + pub const LAST_REPEATED_KEY_INDEX: &str = "SNAPSHOT_LAST_REPEATED_KEY_INDEX"; + /// The latest l1 block number that was processed. + pub const LATEST_L1_BATCH: &str = "SNAPSHOT_LATEST_L1_BATCH"; +} // NOTE: This is moved here as a temporary measure to resolve a cyclic dependency issue. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] @@ -39,18 +46,28 @@ pub enum DatabaseError { NoSuchKey, } -pub struct InnerDB(DB); +#[derive(Default)] +pub enum DBMode { + #[default] + Reconstruction, + Snapshot, +} + +pub struct InnerDB { + db: DB, + mode: DBMode, +} impl Deref for InnerDB { type Target = DB; fn deref(&self) -> &Self::Target { - &self.0 + &self.db } } impl InnerDB { - pub fn new(db_path: PathBuf) -> Result { + pub fn new(db_path: PathBuf, mode: DBMode) -> Result { let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); db_opts.create_if_missing(true); @@ -61,14 +78,35 @@ impl InnerDB { db_path, vec![ METADATA, - STORAGE_LOGS, - FACTORY_DEPS, INDEX_TO_KEY_MAP, KEY_TO_INDEX_MAP, + snapshot_columns::STORAGE_LOGS, + snapshot_columns::FACTORY_DEPS, + ], + )?; + + Ok(Self { db, mode }) + } + + pub fn new_read_only(db_path: PathBuf, mode: DBMode) -> Result { + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + let db = DB::open_cf_for_read_only( + &db_opts, + db_path, + vec![ + METADATA, + INDEX_TO_KEY_MAP, + KEY_TO_INDEX_MAP, + snapshot_columns::STORAGE_LOGS, + snapshot_columns::FACTORY_DEPS, ], + false, )?; - Ok(Self(db)) + Ok(Self { db, mode }) } pub fn process_value(&self, key: U256, value: PackingType) -> Result { @@ -95,46 +133,10 @@ impl InnerDB { Ok(H256::from(buffer)) } - pub fn new_read_only(db_path: PathBuf) -> Result { - let mut db_opts = Options::default(); - db_opts.create_missing_column_families(true); - db_opts.create_if_missing(true); - - let db = DB::open_cf_for_read_only( - &db_opts, - db_path, - vec![METADATA, STORAGE_LOGS, INDEX_TO_KEY_MAP, FACTORY_DEPS], - false, - )?; - - Ok(Self(db)) - } - - pub fn get_last_l1_batch_number(&self) -> Result> { - // Unwrapping column family handle here is safe because presence of - // those CFs is ensured in construction of this DB. - let metadata = self.cf_handle(METADATA).unwrap(); - let batch = self.get_cf(metadata, LATEST_L1_BLOCK_NUMBER)?.map(|bytes| { - u64::from_be_bytes([ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - ]) - }); - - Ok(batch) - } - - pub fn set_last_l1_batch_number(&self, batch_number: u64) -> Result<()> { - // Unwrapping column family handle here is safe because presence of - // those CFs is ensured in construction of this DB. - let metadata = self.cf_handle(METADATA).unwrap(); - self.put_cf(metadata, LATEST_L1_BLOCK_NUMBER, batch_number.to_be_bytes()) - .map_err(Into::into) - } - pub fn get_storage_log(&self, key: &U256) -> Result> { // Unwrapping column family handle here is safe because presence of // those CFs is ensured in construction of this DB. - let storage_logs = self.cf_handle(STORAGE_LOGS).unwrap(); + let storage_logs = self.cf_handle(snapshot_columns::STORAGE_LOGS).unwrap(); let mut byte_key = [0u8; 32]; key.to_big_endian(&mut byte_key); self.get_cf(storage_logs, byte_key) @@ -146,7 +148,7 @@ impl InnerDB { // Unwrapping column family handle here is safe because presence of // those CFs is ensured in construction of this DB. let index_to_key_map = self.cf_handle(INDEX_TO_KEY_MAP).unwrap(); - let storage_logs = self.cf_handle(STORAGE_LOGS).unwrap(); + let storage_logs = self.cf_handle(snapshot_columns::STORAGE_LOGS).unwrap(); let mut key: [u8; 32] = [0; 32]; storage_log_entry.key.to_big_endian(&mut key); @@ -175,7 +177,7 @@ impl InnerDB { pub fn update_storage_log_value(&self, key_idx: u64, value: &[u8]) -> Result<()> { // Unwrapping column family handle here is safe because presence of // those CFs is ensured in construction of this DB. - let storage_logs = self.cf_handle(STORAGE_LOGS).unwrap(); + let storage_logs = self.cf_handle(snapshot_columns::STORAGE_LOGS).unwrap(); let key = self.get_key_from_index(key_idx)?; // XXX: These should really be inside a transaction... @@ -189,34 +191,49 @@ impl InnerDB { pub fn insert_factory_dep(&self, fdep: &SnapshotFactoryDependency) -> Result<()> { // Unwrapping column family handle here is safe because presence of // those CFs is ensured in construction of this DB. - let factory_deps = self.cf_handle(FACTORY_DEPS).unwrap(); + let factory_deps = self.cf_handle(snapshot_columns::FACTORY_DEPS).unwrap(); self.put_cf(factory_deps, fdep.bytecode_hash, bincode::serialize(&fdep)?) .map_err(Into::into) } - pub fn get_latest_l1_block_number(&self) -> Result { - self.get_metadata_value(LATEST_L1_BLOCK_NUMBER) - .map(U64::from) + pub fn get_latest_l1_batch_number(&self) -> Result { + let column = match self.mode { + DBMode::Reconstruction => reconstruction_columns::LATEST_L1_BATCH, + DBMode::Snapshot => snapshot_columns::LATEST_L1_BATCH, + }; + self.get_metadata_value(column).map(U64::from) } - pub fn set_latest_l1_block_number(&self, number: u64) -> Result<()> { - self.set_metadata_value(LATEST_L1_BLOCK_NUMBER, number) + pub fn set_latest_l1_batch_number(&self, number: u64) -> Result<()> { + let column = match self.mode { + DBMode::Reconstruction => reconstruction_columns::LATEST_L1_BATCH, + DBMode::Snapshot => snapshot_columns::LATEST_L1_BATCH, + }; + self.set_metadata_value(column, number) } - pub fn get_latest_l2_block_number(&self) -> Result { - self.get_metadata_value(LATEST_L2_BLOCK_NUMBER) + pub fn get_latest_l2_batch_number(&self) -> Result { + self.get_metadata_value(reconstruction_columns::LATEST_L2_BATCH) } - pub fn set_latest_l2_block_number(&self, number: u64) -> Result<()> { - self.set_metadata_value(LATEST_L2_BLOCK_NUMBER, number) + pub fn set_latest_l2_batch_number(&self, number: u64) -> Result<()> { + self.set_metadata_value(reconstruction_columns::LATEST_L2_BATCH, number) } pub fn get_last_repeated_key_index(&self) -> Result { - self.get_metadata_value(LAST_REPEATED_KEY_INDEX) + let column = match self.mode { + DBMode::Reconstruction => reconstruction_columns::LAST_REPEATED_KEY_INDEX, + DBMode::Snapshot => snapshot_columns::LAST_REPEATED_KEY_INDEX, + }; + self.get_metadata_value(column) } pub fn set_last_repeated_key_index(&self, idx: u64) -> Result<()> { - self.set_metadata_value(LAST_REPEATED_KEY_INDEX, idx) + let column = match self.mode { + DBMode::Reconstruction => reconstruction_columns::LAST_REPEATED_KEY_INDEX, + DBMode::Snapshot => snapshot_columns::LAST_REPEATED_KEY_INDEX, + }; + self.set_metadata_value(column, idx) } fn get_metadata_value(&self, value_name: &str) -> Result { @@ -285,7 +302,7 @@ mod tests { fn basics() { let db_dir = PathBuf::from("./test_inner_db"); { - let db = InnerDB::new(db_dir.clone()).unwrap(); + let db = InnerDB::new(db_dir.clone(), DBMode::default()).unwrap(); let zero = db.get_last_repeated_key_index().unwrap(); assert_eq!(zero, 0); db.set_last_repeated_key_index(1).unwrap();