From 476621cd43da900d6bd9b48092a662989918305b Mon Sep 17 00:00:00 2001
From: Jonathan <94441036+zeapoz@users.noreply.github.com>
Date: Wed, 3 Apr 2024 09:31:37 +0200
Subject: [PATCH] feat: snapshot imports (#79)

* ref(snapshot): move exporter out of `mod.rs`

* feat(snapshot): import cli arguments

* deps: remove `deflate` in favor of `flate2`

* feat(snapshot): decode header and factory deps

* feat(snapshot): restore state tree from snapshot

* ref(snapshot): make `snapshot` a cli argument instead

* feat(snapshot): read filepaths from header

* ref(snapshot): modify file names to match upstream

* fixup!

* tweak(snapshot): log a message on successful export

* chore: cargo fmt

* chore: remove `#!allow[warnings]` attribute
---
 Cargo.lock                                  |  27 +--
 Cargo.toml                                  |   2 +-
 src/cli.rs                                  |   4 +
 src/main.rs                                 |  21 ++-
 src/processor/snapshot/database.rs          |   2 +-
 src/processor/snapshot/exporter.rs          | 187 +++++++++++++++++++
 src/processor/snapshot/importer.rs          | 101 +++++++++++
 src/processor/snapshot/mod.rs               | 189 +-------------------
 src/processor/tree/mod.rs                   |   2 +-
 src/processor/tree/tree_wrapper.rs          |  38 +++-
 state-reconstruct-fetcher/src/l1_fetcher.rs |   4 +-
 11 files changed, 361 insertions(+), 216 deletions(-)
 create mode 100644 src/processor/snapshot/exporter.rs
 create mode 100644 src/processor/snapshot/importer.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1bdc4af..184f43a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -27,12 +27,6 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
 
-[[package]]
-name = "adler32"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
-
 [[package]]
 name = "aes"
 version = "0.8.4"
@@ -1129,16 +1123,6 @@ dependencies = [
  "uuid 1.7.0",
 ]
 
-[[package]]
-name = "deflate"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c86f7e25f518f4b81808a2cf1c50996a61f5c2eb394b2393bd87f2a4780a432f"
-dependencies = [
- "adler32",
- "gzip-header",
-]
-
 [[package]]
 name = "der"
 version = "0.6.1"
@@ -2046,15 +2030,6 @@ dependencies = [
  "subtle",
 ]
 
-[[package]]
-name = "gzip-header"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95cc527b92e6029a62960ad99aa8a6660faa4555fe5f731aab13aa6a921795a2"
-dependencies = [
- "crc32fast",
-]
-
 [[package]]
 name = "h2"
 version = "0.3.25"
@@ -4473,9 +4448,9 @@ dependencies = [
  "bytes",
  "chrono",
  "clap",
- "deflate",
  "ethers",
  "eyre",
+ "flate2",
  "hex",
  "indexmap",
  "primitive-types",
diff --git a/Cargo.toml b/Cargo.toml
index e0eab2c..1c6ffda 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,9 +15,9 @@ blobscan-client = { path = "./state-reconstruct-fetcher/blobscan-client" }
 bytes = "1.5"
 chrono = "0.4.31"
 clap = { version = "4.4.7", features = ["derive", "env"] }
-deflate = { version = "1.0.0", features = ["gzip"] }
 ethers = "1.0.2"
 eyre = "0.6.8"
+flate2 = "1.0.28"
 hex = "0.4.3"
 indexmap = { version = "2.0.2" }
 primitive-types = "0.12.2"
diff --git a/src/cli.rs b/src/cli.rs
index b3fca59..f5ca19f 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -75,6 +75,10 @@ pub enum Command {
         /// The path to the storage solution.
         #[arg(short, long, env = "ZK_SYNC_DB_PATH")]
         db_path: Option<String>,
+        /// If present, try to restore state from snapshot files contained in the specified
+        /// directory. Note that this will only work when supplied with a fresh database.
+        #[arg(long)]
+        snapshot: Option<String>,
     },
 
     /// Query the local storage, and optionally, return a JSON-payload of the data.
diff --git a/src/main.rs b/src/main.rs
index 07c6e7f..12657f7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,7 +15,9 @@ use std::{
 use clap::Parser;
 use cli::{Cli, Command, ReconstructSource};
 use eyre::Result;
-use processor::snapshot::{SnapshotBuilder, SnapshotExporter};
+use processor::snapshot::{
+    exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder,
+};
 use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
 use tikv_jemallocator::Jemalloc;
 use tokio::sync::mpsc;
@@ -59,12 +61,23 @@ async fn main() -> Result<()> {
     let cli = Cli::parse();
 
     match cli.subcommand {
-        Command::Reconstruct { source, db_path } => {
+        Command::Reconstruct {
+            source,
+            db_path,
+            snapshot,
+        } => {
             let db_path = match db_path {
                 Some(path) => PathBuf::from(path),
                 None => env::current_dir()?.join(storage::DEFAULT_DB_NAME),
             };
 
+            if let Some(directory) = snapshot {
+                tracing::info!("Trying to restore state from snapshot...");
+                let importer =
+                    SnapshotImporter::new(PathBuf::from(directory), &db_path.clone()).await?;
+                importer.run().await?;
+            }
+
             match source {
                 ReconstructSource::L1 { l1_fetcher_options } => {
                     let fetcher_options = l1_fetcher_options.into();
@@ -159,8 +172,10 @@ async fn main() -> Result<()> {
         } => {
             let export_path = Path::new(&directory);
             std::fs::create_dir_all(export_path)?;
-            let exporter = SnapshotExporter::new(export_path, db_path);
+            let exporter = SnapshotExporter::new(export_path, db_path)?;
             exporter.export_snapshot(chunk_size)?;
+
+            tracing::info!("Successfully exported snapshot files to \"{directory}\"!");
         }
     }
 
diff --git a/src/processor/snapshot/database.rs b/src/processor/snapshot/database.rs
index afd61ad..e45ac56 100644
--- a/src/processor/snapshot/database.rs
+++ b/src/processor/snapshot/database.rs
@@ -104,7 +104,7 @@ impl SnapshotDB {
                     idx_bytes[7],
                 ])
             } else {
-                self.put_cf(metadata, LAST_REPEATED_KEY_INDEX, u64::to_be_bytes(1))?;
+                self.put_cf(metadata, LAST_REPEATED_KEY_INDEX, u64::to_be_bytes(0))?;
                 0
             },
         )
diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs
new file mode 100644
index 0000000..68cfe37
--- /dev/null
+++ b/src/processor/snapshot/exporter.rs
@@ -0,0 +1,187 @@
+use std::{
+    io::Write,
+    path::{Path, PathBuf},
+};
+
+use bytes::BytesMut;
+use eyre::Result;
+use flate2::{write::GzEncoder, Compression};
+use prost::Message;
+
+use super::{
+    database::{self, SnapshotDB},
+    types::{self, SnapshotFactoryDependency, SnapshotHeader},
+    DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
+};
+
+pub mod protobuf {
+    include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
+}
+
+pub struct SnapshotExporter {
+    basedir: PathBuf,
+    database: SnapshotDB,
+}
+
+impl SnapshotExporter {
+    pub fn new(basedir: &Path, db_path: Option<String>) -> Result<Self> {
+        let db_path = match db_path {
+            Some(p) => PathBuf::from(p),
+            None => PathBuf::from(DEFAULT_DB_PATH),
+        };
+
+        let database = SnapshotDB::new_read_only(db_path)?;
+        Ok(Self {
+            basedir: basedir.to_path_buf(),
+            database,
+        })
+    }
+
+    pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
+        let mut header = SnapshotHeader::default();
+        self.export_storage_logs(chunk_size, &mut header)?;
+        self.export_factory_deps(&mut header)?;
+
+        let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME);
+        let outfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(false)
+            .open(path)?;
+
+        serde_json::to_writer(outfile, &header)?;
+
+        Ok(())
+    }
+
+    fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
+        let mut buf = BytesMut::new();
+
+        let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
+        let mut iterator = self
+            .database
+            .iterator_cf(storage_logs, rocksdb::IteratorMode::Start);
+
+        let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
+        while let Some(Ok((_, bs))) = iterator.next() {
+            let factory_dep: SnapshotFactoryDependency = bincode::deserialize(&bs)?;
+            factory_deps
+                .factory_deps
+                .push(protobuf::SnapshotFactoryDependency {
+                    bytecode: Some(factory_dep.bytecode),
+                });
+        }
+
+        let fd_len = factory_deps.encoded_len();
+        if buf.capacity() < fd_len {
+            buf.reserve(fd_len - buf.capacity());
+        }
+
+        let path = self.basedir.join(format!(
+            "snapshot_l1_batch_{}_{}",
+            header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
+        ));
+        header.factory_deps_filepath = path
+            .clone()
+            .into_os_string()
+            .into_string()
+            .expect("path to string");
+
+        // Serialize chunk.
+        factory_deps.encode(&mut buf)?;
+
+        let outfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(false)
+            .open(path)?;
+
+        // Wrap in gzip compression before writing.
+        let mut encoder = GzEncoder::new(outfile, Compression::default());
+        encoder.write_all(&buf)?;
+        encoder.finish()?;
+
+        Ok(())
+    }
+
+    fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
+        let mut buf = BytesMut::new();
+        let mut chunk_id = 0;
+
+        let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
+        let mut iterator = self
+            .database
+            .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);
+
+        let mut has_more = true;
+
+        while has_more {
+            let mut chunk = protobuf::SnapshotStorageLogsChunk {
+                storage_logs: vec![],
+            };
+
+            for _ in 0..chunk_size {
+                if let Some(Ok((_, key))) = iterator.next() {
+                    if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
+                        let pb = protobuf::SnapshotStorageLog {
+                            account_address: None,
+                            storage_key: Some(key.to_vec()),
+                            storage_value: Some(entry.value.0.to_vec()),
+                            l1_batch_number_of_initial_write: Some(
+                                entry.l1_batch_number_of_initial_write.as_u32(),
+                            ),
+                            enumeration_index: Some(entry.enumeration_index),
+                        };
+
+                        chunk.storage_logs.push(pb);
+                        header.l1_batch_number = entry.l1_batch_number_of_initial_write;
+                    }
+                } else {
+                    has_more = false;
+                }
+            }
+
+            // Ensure that write buffer has enough capacity.
+            let chunk_len = chunk.encoded_len();
+            if buf.capacity() < chunk_len {
+                buf.reserve(chunk_len - buf.capacity());
+            }
+
+            let path = PathBuf::new().join(&self.basedir).join(format!(
+                "snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
+                header.l1_batch_number, chunk_id
+            ));
+            chunk_id += 1;
+
+            header
+                .storage_logs_chunks
+                .push(types::SnapshotStorageLogsChunkMetadata {
+                    chunk_id,
+                    filepath: path
+                        .clone()
+                        .into_os_string()
+                        .into_string()
+                        .expect("path to string"),
+                });
+
+            // Serialize chunk.
+            chunk.encode(&mut buf)?;
+
+            let outfile = std::fs::OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(false)
+                .open(path)?;
+
+            // Wrap in gzip compression before writing.
+            let mut encoder = GzEncoder::new(outfile, Compression::default());
+            encoder.write_all(&buf)?;
+            encoder.finish()?;
+
+            // Clear $tmp buffer.
+            buf.truncate(0);
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs
new file mode 100644
index 0000000..e351e05
--- /dev/null
+++ b/src/processor/snapshot/importer.rs
@@ -0,0 +1,101 @@
+use std::{
+    fs,
+    io::Read,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use eyre::Result;
+use flate2::read::GzDecoder;
+use prost::Message;
+use state_reconstruct_fetcher::{constants::storage::INNER_DB_NAME, database::InnerDB};
+use tokio::sync::Mutex;
+
+use super::{
+    exporter::protobuf::{SnapshotFactoryDependencies, SnapshotStorageLogsChunk},
+    types::SnapshotHeader,
+    SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
+};
+use crate::processor::tree::tree_wrapper::TreeWrapper;
+
+pub struct SnapshotImporter {
+    // The path of the directory where snapshot chunks are stored.
+    directory: PathBuf,
+    // The tree to import state to.
+    tree: TreeWrapper,
+}
+
+impl SnapshotImporter {
+    pub async fn new(directory: PathBuf, db_path: &Path) -> Result<Self> {
+        let inner_db_path = db_path.join(INNER_DB_NAME);
+        let new_state = InnerDB::new(inner_db_path.clone())?;
+        let snapshot = Arc::new(Mutex::new(new_state));
+        let tree = TreeWrapper::new(db_path, snapshot.clone(), true).await?;
+
+        Ok(Self { directory, tree })
+    }
+
+    pub async fn run(mut self) -> Result<()> {
+        let header = self.read_header()?;
+        let _factory_deps = self.read_factory_deps(&header)?;
+        let storage_logs_chunk = self.read_storage_logs_chunks(&header)?;
+
+        self.tree
+            .restore_from_snapshot(storage_logs_chunk, header.l1_batch_number)
+            .await
+    }
+
+    fn read_header(&self) -> Result<SnapshotHeader> {
+        let header_path = self.directory.join(SNAPSHOT_HEADER_FILE_NAME);
+        let header_string = fs::read_to_string(header_path)?;
+        let header: SnapshotHeader = serde_json::from_str(&header_string)?;
+
+        Ok(header)
+    }
+
+    fn read_factory_deps(&self, header: &SnapshotHeader) -> Result<SnapshotFactoryDependencies> {
+        let factory_deps_path = self.directory.join(format!(
+            "snapshot_l1_batch_{}_{}",
+            header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
+        ));
+        let bytes = fs::read(factory_deps_path)?;
+        let mut decoder = GzDecoder::new(&bytes[..]);
+
+        let mut decompressed_bytes = Vec::new();
+        decoder.read_to_end(&mut decompressed_bytes)?;
+
+        let factory_deps = SnapshotFactoryDependencies::decode(&decompressed_bytes[..])?;
+        Ok(factory_deps)
+    }
+
+    fn read_storage_logs_chunks(
+        &self,
+        header: &SnapshotHeader,
+    ) -> Result<Vec<SnapshotStorageLogsChunk>> {
+        // NOTE: I think these are sorted by default, but if not, we need to sort them
+        // before extracting the filepaths.
+        let filepaths = header
+            .storage_logs_chunks
+            .iter()
+            .map(|meta| PathBuf::from(&meta.filepath));
+
+        let mut chunks = Vec::with_capacity(filepaths.len());
+        for path in filepaths {
+            let factory_deps_path = self
+                .directory
+                .join(path.file_name().expect("path has no file name"));
+            let bytes = fs::read(factory_deps_path)?;
+            let mut decoder = GzDecoder::new(&bytes[..]);
+
+            let mut decompressed_bytes = Vec::new();
+            decoder.read_to_end(&mut decompressed_bytes)?;
+
+            // TODO: It would be nice to avoid the intermediary step of decoding. Something like
+            // implementing a method on the types::* that does it automatically. Will improve
+            // readability for the export code too as a bonus.
+            let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&decompressed_bytes[..])?;
+            chunks.push(storage_logs_chunk);
+        }
+        Ok(chunks)
+    }
+}
diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs
index 897e214..9f21c75 100644
--- a/src/processor/snapshot/mod.rs
+++ b/src/processor/snapshot/mod.rs
@@ -1,21 +1,16 @@
-use std::{
-    fs,
-    io::Write,
-    path::{Path, PathBuf},
-    str::FromStr,
-};
+use std::{fs, path::PathBuf, str::FromStr};
+
+pub mod database;
+pub mod exporter;
+pub mod importer;
 
 mod bytecode;
-mod database;
 mod types;
 
 use async_trait::async_trait;
 use blake2::{Blake2s256, Digest};
-use bytes::BytesMut;
-use deflate::deflate_bytes_gzip;
 use ethers::types::{Address, H256, U256, U64};
 use eyre::Result;
-use prost::Message;
 use state_reconstruct_fetcher::{
     constants::{ethereum, storage},
     types::CommitBlock,
@@ -24,16 +19,14 @@ use tokio::sync::mpsc;
 
 use self::{
     database::SnapshotDB,
-    types::{SnapshotFactoryDependency, SnapshotHeader, SnapshotStorageLog},
+    types::{SnapshotFactoryDependency, SnapshotStorageLog},
 };
 use super::Processor;
 use crate::processor::snapshot::types::MiniblockNumber;
 
-pub mod protobuf {
-    include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
-}
-
 pub const DEFAULT_DB_PATH: &str = "snapshot_db";
+pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
+pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";
 
 pub struct SnapshotBuilder {
     database: SnapshotDB,
@@ -228,169 +221,3 @@ fn derive_final_address_for_params(address: &Address, key: &U256) -> [u8; 32] {
 
     result
 }
-
-pub struct SnapshotExporter {
-    basedir: PathBuf,
-    database: SnapshotDB,
-}
-
-impl SnapshotExporter {
-    pub fn new(basedir: &Path, db_path: Option<String>) -> Self {
-        let db_path = match db_path {
-            Some(p) => PathBuf::from(p),
-            None => PathBuf::from(DEFAULT_DB_PATH),
-        };
-
-        let database = SnapshotDB::new_read_only(db_path).unwrap();
-        Self {
-            basedir: basedir.to_path_buf(),
-            database,
-        }
-    }
-
-    pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
-        let mut header = SnapshotHeader::default();
-        self.export_storage_logs(chunk_size, &mut header)?;
-        self.export_factory_deps(&mut header)?;
-
-        let path = PathBuf::new()
-            .join(&self.basedir)
-            .join("snapshot-header.json");
-
-        let outfile = std::fs::OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(false)
-            .open(path)?;
-
-        serde_json::to_writer(outfile, &header)?;
-
-        Ok(())
-    }
-
-    fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
-        let mut buf = BytesMut::new();
-
-        let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
-        let mut iterator = self
-            .database
-            .iterator_cf(storage_logs, rocksdb::IteratorMode::Start);
-
-        let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
-        while let Some(Ok((_, bs))) = iterator.next() {
-            let factory_dep: SnapshotFactoryDependency = bincode::deserialize(bs.as_ref())?;
-            factory_deps
-                .factory_deps
-                .push(protobuf::SnapshotFactoryDependency {
-                    bytecode: Some(factory_dep.bytecode),
-                });
-        }
-
-        let fd_len = factory_deps.encoded_len();
-        if buf.capacity() < fd_len {
-            buf.reserve(fd_len - buf.capacity());
-        }
-
-        let path = PathBuf::new().join(&self.basedir).join("factory_deps.dat");
-        header.factory_deps_filepath = path
-            .clone()
-            .into_os_string()
-            .into_string()
-            .expect("path to string");
-
-        let mut outfile = std::fs::OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(false)
-            .open(path)?;
-
-        // Serialize chunk.
-        factory_deps.encode(&mut buf)?;
-
-        // Wrap in gzip compression before writing.
-        let compressed_buf = deflate_bytes_gzip(&buf);
-        outfile.write_all(&compressed_buf)?;
-        outfile.flush()?;
-
-        Ok(())
-    }
-
-    fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
-        let mut buf = BytesMut::new();
-        let mut chunk_index = 0;
-
-        let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
-        let mut iterator = self
-            .database
-            .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);
-
-        let mut has_more = true;
-
-        while has_more {
-            let mut chunk = protobuf::SnapshotStorageLogsChunk {
-                storage_logs: vec![],
-            };
-
-            for _ in 0..chunk_size {
-                if let Some(Ok((_, key))) = iterator.next() {
-                    if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
-                        let pb = protobuf::SnapshotStorageLog {
-                            account_address: None,
-                            storage_key: Some(key.to_vec()),
-                            storage_value: Some(entry.value.0.to_vec()),
-                            l1_batch_number_of_initial_write: Some(
-                                entry.l1_batch_number_of_initial_write.as_u32(),
-                            ),
-                            enumeration_index: Some(entry.enumeration_index),
-                        };
-
-                        chunk.storage_logs.push(pb);
-                    }
-                } else {
-                    has_more = false;
-                }
-            }
-
-            // Ensure that write buffer has enough capacity.
-            let chunk_len = chunk.encoded_len();
-            if buf.capacity() < chunk_len {
-                buf.reserve(chunk_len - buf.capacity());
-            }
-
-            chunk_index += 1;
-            let path = PathBuf::new()
-                .join(&self.basedir)
-                .join(format!("{chunk_index}.gz"));
-
-            header
-                .storage_logs_chunks
-                .push(types::SnapshotStorageLogsChunkMetadata {
-                    chunk_id: chunk_index,
-                    filepath: path
-                        .clone()
-                        .into_os_string()
-                        .into_string()
-                        .expect("path to string"),
-                });
-
-            let mut outfile = std::fs::OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(false)
-                .open(path)?;
-
-            // Serialize chunk.
-            chunk.encode(&mut buf)?;
-
-            // Wrap in gzip compression before writing.
-            let compressed_buf = deflate_bytes_gzip(&buf);
-            outfile.write_all(&compressed_buf)?;
-            outfile.flush()?;
-
-            // Clear $tmp buffer.
-            buf.truncate(0);
-        }
-
-        Ok(())
-    }
-}
diff --git a/src/processor/tree/mod.rs b/src/processor/tree/mod.rs
index cacf745..0a85c1b 100644
--- a/src/processor/tree/mod.rs
+++ b/src/processor/tree/mod.rs
@@ -1,5 +1,5 @@
 pub mod query_tree;
-mod tree_wrapper;
+pub mod tree_wrapper;
 
 use std::{path::PathBuf, sync::Arc};
 
diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs
index cf23e22..fe0336c 100644
--- a/src/processor/tree/tree_wrapper.rs
+++ b/src/processor/tree/tree_wrapper.rs
@@ -1,7 +1,7 @@
 use std::{collections::HashMap, fs, num::NonZeroU32, path::Path, str::FromStr, sync::Arc};
 
 use blake2::{Blake2s256, Digest};
-use ethers::types::{Address, H256, U256};
+use ethers::types::{Address, H256, U256, U64};
 use eyre::Result;
 use state_reconstruct_fetcher::{
     constants::storage::INITAL_STATE_PATH,
@@ -14,6 +14,7 @@ use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry};
 use zksync_storage::{RocksDB, RocksDBOptions};
 
 use super::RootHash;
+use crate::processor::snapshot::exporter::protobuf::SnapshotStorageLogsChunk;
 
 #[derive(Error, Debug)]
 pub enum TreeError {
@@ -123,6 +124,41 @@ impl TreeWrapper {
         }
     }
 
+    pub async fn restore_from_snapshot(
+        &mut self,
+        chunks: Vec<SnapshotStorageLogsChunk>,
+        l1_batch_number: U64,
+    ) -> Result<()> {
+        let mut tree_entries = Vec::new();
+
+        for chunk in &chunks {
+            for log in &chunk.storage_logs {
+                let key = U256::from_big_endian(log.storage_key());
+                let index = log.enumeration_index();
+
+                let value_bytes: [u8; 32] = log.storage_value().try_into()?;
+                let value = H256::from(&value_bytes);
+
+                tree_entries.push(TreeEntry::new(key, index, value));
+                self.snapshot
+                    .lock()
+                    .await
+                    .add_key(&key)
+                    .expect("cannot add key");
+            }
+        }
+
+        let num_tree_entries = tree_entries.len();
+        self.tree.extend(tree_entries);
+
+        tracing::info!("Successfully imported snapshot containing {num_tree_entries} storage logs!",);
+
+        let snapshot = self.snapshot.lock().await;
+        snapshot.set_latest_l1_block_number(l1_batch_number.as_u64())?;
+
+        Ok(())
+    }
+
     fn process_value(&mut self, key: U256, value: PackingType) -> H256 {
         let version = self.tree.latest_version().unwrap_or_default();
         if let Ok(leaf) = self.tree.entries(version, &[key]) {
diff --git a/state-reconstruct-fetcher/src/l1_fetcher.rs b/state-reconstruct-fetcher/src/l1_fetcher.rs
index 791ca5f..e691d27 100644
--- a/state-reconstruct-fetcher/src/l1_fetcher.rs
+++ b/state-reconstruct-fetcher/src/l1_fetcher.rs
@@ -101,8 +101,8 @@ impl L1Fetcher {
         // use of the snapshot value.
         if current_l1_block_number == GENESIS_BLOCK.into() {
             if let Some(snapshot) = &self.snapshot {
-                let snapshot = snapshot.lock().await;
-                let snapshot_latest_l1_block_number = snapshot.get_latest_l1_block_number()?;
+                let snapshot_latest_l1_block_number =
+                    snapshot.lock().await.get_latest_l1_block_number()?;
                 if snapshot_latest_l1_block_number > current_l1_block_number {
                     current_l1_block_number = snapshot_latest_l1_block_number;
                     tracing::info!(