diff --git a/src/cli.rs b/src/cli.rs
index 7f819f8..a4bbea6 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -110,8 +110,8 @@ pub enum Command {
         #[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)]
         db_path: Option<String>,
         /// Number of storage logs to stuff into one chunk.
-        #[arg(short, long, default_value_t = 1_000_000)]
-        chunk_size: u64,
+        #[arg(short, long, default_value_t = snapshot::DEFAULT_CHUNK_SIZE)]
+        chunk_size: usize,
         /// The directory to export the snapshot files to.
         directory: String,
     },
diff --git a/src/main.rs b/src/main.rs
index d0ca365..3c7d1fc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -77,9 +77,8 @@ async fn main() -> Result<()> {
 
             if let Some(directory) = snapshot {
                 tracing::info!("Trying to restore state from snapshot...");
-                let importer =
-                    SnapshotImporter::new(PathBuf::from(directory), &db_path.clone()).await?;
-                importer.run().await?;
+                let importer = SnapshotImporter::new(PathBuf::from(directory));
+                importer.run(&db_path.clone()).await?;
             }
 
             match source {
diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs
index 9955ced..73df1ce 100644
--- a/src/processor/snapshot/exporter.rs
+++ b/src/processor/snapshot/exporter.rs
@@ -36,7 +36,7 @@ impl SnapshotExporter {
         })
     }
 
-    pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
+    pub fn export_snapshot(&self, chunk_size: usize) -> Result<()> {
         let l1_batch_number = self.database.get_latest_l1_batch_number()?;
         let mut header = SnapshotHeader {
             l1_batch_number,
@@ -91,7 +91,7 @@ impl SnapshotExporter {
         Ok(())
     }
 
-    fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
+    fn export_storage_logs(&self, chunk_size: usize, header: &mut SnapshotHeader) -> Result<()> {
         tracing::info!("Exporting storage logs...");
 
         let num_logs = self.database.get_last_repeated_key_index()?;
@@ -102,7 +102,7 @@ impl SnapshotExporter {
             .database
             .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);
 
-        let total_num_chunks = (num_logs / chunk_size) + 1;
+        let total_num_chunks = (num_logs / chunk_size as u64) + 1;
 
         for chunk_id in 0..total_num_chunks {
             tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks);
diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs
index c8478e5..a466bd3 100644
--- a/src/processor/snapshot/importer.rs
+++ b/src/processor/snapshot/importer.rs
@@ -1,21 +1,16 @@
 use std::{
     fs::{self, DirEntry},
     path::{Path, PathBuf},
-    sync::Arc,
 };
 
 use ethers::types::U64;
 use eyre::Result;
 use regex::{Captures, Regex};
-use state_reconstruct_fetcher::constants::storage::INNER_DB_NAME;
-use state_reconstruct_storage::{
-    reconstruction::ReconstructionDatabase,
-    types::{
-        Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk,
-        SnapshotStorageLogsChunkMetadata,
-    },
+use state_reconstruct_storage::types::{
+    Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk,
+    SnapshotStorageLogsChunkMetadata,
 };
-use tokio::sync::Mutex;
+use tokio::sync::mpsc::{self, Sender};
 
 use super::{SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME};
 use crate::processor::tree::tree_wrapper::TreeWrapper;
@@ -26,28 +21,36 @@ const FACTORY_DEPS_REGEX: &str = r"snapshot_l1_batch_(\d*)_factory_deps.proto.gz
 pub struct SnapshotImporter {
     // The path of the directory where snapshot chunks are stored.
     directory: PathBuf,
-    // The tree to import state to.
-    tree: TreeWrapper,
 }
 
 impl SnapshotImporter {
-    pub async fn new(directory: PathBuf, db_path: &Path) -> Result<Self> {
-        let inner_db_path = db_path.join(INNER_DB_NAME);
-        let new_state = ReconstructionDatabase::new(inner_db_path.clone())?;
-        let snapshot = Arc::new(Mutex::new(new_state));
-        let tree = TreeWrapper::new(db_path, snapshot.clone(), true).await?;
-
-        Ok(Self { directory, tree })
+    pub fn new(directory: PathBuf) -> Self {
+        Self { directory }
     }
 
-    pub async fn run(mut self) -> Result<()> {
+    pub async fn run(self, db_path: &Path) -> Result<()> {
+        let (tx, rx) = mpsc::channel(1);
+
         let header = self.read_header()?;
         let _factory_deps = self.read_factory_deps(&header)?;
 
-        let storage_logs_chunk = self.read_storage_logs_chunks(&header)?;
-        self.tree
-            .restore_from_snapshot(storage_logs_chunk, header.l1_batch_number)
+        // Read storage logs async sending each read one into the tree to process.
+        tokio::spawn({
+            let header = header.clone();
+            async move {
+                self.read_storage_logs_chunks_async(&header, tx)
+                    .await
+                    .expect("failed to read storage_logs_chunks");
+            }
+        });
+
+        let mut tree = TreeWrapper::new_snapshot_wrapper(db_path)
             .await
+            .expect("can't create tree");
+        tree.restore_from_snapshot(rx, header.l1_batch_number)
+            .await?;
+
+        Ok(())
     }
 
     fn read_header(&self) -> Result<SnapshotHeader> {
@@ -71,10 +74,11 @@ impl SnapshotImporter {
         SnapshotFactoryDependencies::decode(&bytes)
     }
 
-    fn read_storage_logs_chunks(
+    async fn read_storage_logs_chunks_async(
         &self,
         header: &SnapshotHeader,
-    ) -> Result<Vec<SnapshotStorageLogsChunk>> {
+        tx: Sender<SnapshotStorageLogsChunk>,
+    ) -> Result<()> {
         // NOTE: I think these are sorted by default, but if not, we need to sort them
         // before extracting the filepaths.
         let filepaths = header
@@ -82,16 +86,18 @@ impl SnapshotImporter {
             .iter()
             .map(|meta| PathBuf::from(&meta.filepath));
 
-        let mut chunks = Vec::with_capacity(filepaths.len());
-        for path in filepaths {
+        let total_chunks = filepaths.len();
+        for (i, path) in filepaths.into_iter().enumerate() {
             let factory_deps_path = self
                 .directory
                 .join(path.file_name().expect("path has no file name"));
             let bytes = fs::read(factory_deps_path)?;
             let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?;
-            chunks.push(storage_logs_chunk);
+            tracing::info!("Read chunk {}/{}, processing...", i + 1, total_chunks);
+            tx.send(storage_logs_chunk).await?;
         }
-        Ok(chunks)
+
+        Ok(())
     }
 
     fn infer_header_from_file_names(&self) -> Result<SnapshotHeader> {
diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs
index 87413d0..db2d346 100644
--- a/src/processor/snapshot/mod.rs
+++ b/src/processor/snapshot/mod.rs
@@ -24,6 +24,7 @@ use super::Processor;
 pub const DEFAULT_DB_PATH: &str = "snapshot_db";
 pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
 pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";
+pub const DEFAULT_CHUNK_SIZE: usize = 1_000_000;
 
 pub struct SnapshotBuilder {
     database: SnapshotDatabase,
diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs
index 545cd0b..c68bcef 100644
--- a/src/processor/tree/tree_wrapper.rs
+++ b/src/processor/tree/tree_wrapper.rs
@@ -3,16 +3,20 @@ use std::{collections::HashMap, fs, num::NonZeroU32, path::Path, str::FromStr, s
 use blake2::{Blake2s256, Digest};
 use ethers::types::{Address, H256, U256, U64};
 use eyre::Result;
-use state_reconstruct_fetcher::{constants::storage::INITAL_STATE_PATH, types::CommitBlock};
+use state_reconstruct_fetcher::{
+    constants::storage::{INITAL_STATE_PATH, INNER_DB_NAME},
+    types::CommitBlock,
+};
 use state_reconstruct_storage::{
     reconstruction::ReconstructionDatabase, types::SnapshotStorageLogsChunk, PackingType,
 };
 use thiserror::Error;
-use tokio::sync::Mutex;
+use tokio::sync::{mpsc::Receiver, Mutex};
 use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry};
 use zksync_storage::{RocksDB, RocksDBOptions};
 
 use super::RootHash;
+use crate::processor::snapshot::DEFAULT_CHUNK_SIZE;
 
 #[derive(Error, Debug)]
 pub enum TreeError {
@@ -54,6 +58,13 @@ impl TreeWrapper {
         })
     }
 
+    pub async fn new_snapshot_wrapper(db_path: &Path) -> Result<Self> {
+        let inner_db_path = db_path.join(INNER_DB_NAME);
+        let new_state = ReconstructionDatabase::new(inner_db_path.clone())?;
+        let snapshot = Arc::new(Mutex::new(new_state));
+        Self::new(db_path, snapshot.clone(), true).await
+    }
+
     /// Inserts a block into the tree and returns the root hash of the resulting state tree.
     pub async fn insert_block(&mut self, block: &CommitBlock) -> Result<RootHash> {
         self.clear_known_base();
@@ -123,36 +134,33 @@ impl TreeWrapper {
 
     pub async fn restore_from_snapshot(
         &mut self,
-        chunks: Vec<SnapshotStorageLogsChunk>,
+        mut rx: Receiver<SnapshotStorageLogsChunk>,
         l1_batch_number: U64,
     ) -> Result<()> {
+        let mut inner_db = self.inner_db.lock().await;
         let mut total_tree_entries = 0;
 
-        for (i, chunk) in chunks.iter().enumerate() {
-            let mut tree_entries = Vec::new();
-            tracing::info!("Importing chunk {}/{}...", i + 1, chunks.len());
+        let mut i = 0;
+        while let Some(chunk) = rx.recv().await {
+            let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE);
 
             for log in &chunk.storage_logs {
                 tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value));
-                self.inner_db
-                    .lock()
-                    .await
-                    .add_key(&log.key)
-                    .expect("cannot add key");
+                inner_db.add_key(&log.key).expect("cannot add key");
             }
 
             total_tree_entries += tree_entries.len();
             self.tree.extend(tree_entries);
 
             tracing::info!("Chunk {} was succesfully imported!", i + 1);
+            i += 1;
         }
 
         tracing::info!(
             "Succesfully imported snapshot containing {total_tree_entries} storage logs!",
         );
 
-        let db = self.inner_db.lock().await;
-        db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;
+        inner_db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;
 
         Ok(())
     }
diff --git a/state-reconstruct-storage/src/types.rs b/state-reconstruct-storage/src/types.rs
index c57427b..49c83af 100644
--- a/state-reconstruct-storage/src/types.rs
+++ b/state-reconstruct-storage/src/types.rs
@@ -73,7 +73,7 @@ pub trait Proto {
     }
 }
 
-#[derive(Default, Debug, Serialize, Deserialize)]
+#[derive(Clone, Default, Debug, Serialize, Deserialize)]
 pub struct SnapshotHeader {
     pub l1_batch_number: L1BatchNumber,
     pub miniblock_number: MiniblockNumber,
@@ -85,7 +85,7 @@ pub struct SnapshotHeader {
     pub generated_at: DateTime<Utc>,
 }
 
-#[derive(Default, Debug, Serialize, Deserialize)]
+#[derive(Clone, Default, Debug, Serialize, Deserialize)]
 pub struct SnapshotStorageLogsChunkMetadata {
     pub chunk_id: u64,
     // can be either a gs or filesystem path