diff --git a/.gitignore b/.gitignore
index a072a6c..7ea9733 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,10 @@ Cargo.lock
 # MSVC Windows builds of rustc generate these, which store debugging information
 *.pdb
 
+# Local state files.
+db/
+StateSnapshot.json
+
 # Direnv files.
 .direnv/
 .envrc
@@ -20,6 +24,3 @@ Cargo.lock
 # Nix files.
 flake.nix
 flake.lock
-
-# db
-db/
diff --git a/Cargo.toml b/Cargo.toml
index c13032a..57f8eb3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,7 +9,8 @@ clap = { version = "4.4.0", features = ["string"] }
 ethers = "1"
 eyre = "0.6.8"
 hex = "0.4.3"
-indexmap = "2.0.1"
+indexmap = { version = "2.0.1", features = ["serde"] }
+serde = { version = "1.0.188", features = ["derive"] }
 serde_json = "1.0.107"
 thiserror = "1.0"
 tokio = { version = "1.32.0", features = ["macros"] }
diff --git a/src/lib.rs b/src/lib.rs
index 46b7675..8b9fbe8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,9 +1,11 @@
 #![feature(array_chunks)]
 // #![warn(clippy::pedantic)]
-mod state;
+mod snapshot;
 mod tree;
 
-use crate::state::CommitBlockInfoV1;
+mod types;
+
+use crate::types::CommitBlockInfoV1;
 
 use ethers::{
     abi::{Contract, Function},
@@ -13,6 +15,7 @@ use ethers::{
 use eyre::Result;
 
 pub const INITAL_STATE_PATH: &str = "InitialState.csv";
+pub const STATE_FILE_PATH: &str = "StateSnapshot.json";
 pub const ZK_SYNC_ADDR: &str = "0x32400084C286CF3E17e7B677ea9583e60a000324";
 pub const GENESIS_BLOCK: u64 = 16_627_460;
 pub const BLOCK_STEP: u64 = 128;
@@ -33,10 +36,10 @@ pub fn parse_calldata(
 ) -> Result<Vec<CommitBlockInfoV1>> {
     let mut parsed_input = commit_blocks_fn
         .decode_input(&calldata[4..])
-        .map_err(|e| state::ParseError::InvalidCalldata(e.to_string()))?;
+        .map_err(|e| types::ParseError::InvalidCalldata(e.to_string()))?;
 
     if parsed_input.len() != 2 {
-        return Err(state::ParseError::InvalidCalldata(format!(
+        return Err(types::ParseError::InvalidCalldata(format!(
             "invalid number of parameters (got {}, expected 2) for commitBlocks function",
             parsed_input.len()
         ))
@@ -45,26 +48,26 @@
 
     let new_blocks_data = parsed_input
         .pop()
-        .ok_or_else(|| state::ParseError::InvalidCalldata("new blocks data".to_string()))?;
+        .ok_or_else(|| types::ParseError::InvalidCalldata("new blocks data".to_string()))?;
     let stored_block_info = parsed_input
         .pop()
-        .ok_or_else(|| state::ParseError::InvalidCalldata("stored block info".to_string()))?;
+        .ok_or_else(|| types::ParseError::InvalidCalldata("stored block info".to_string()))?;
 
     let abi::Token::Tuple(stored_block_info) = stored_block_info else {
         return Err(
-            state::ParseError::InvalidCalldata("invalid StoredBlockInfo".to_string()).into(),
+            types::ParseError::InvalidCalldata("invalid StoredBlockInfo".to_string()).into(),
         );
     };
 
     let abi::Token::Uint(_previous_l2_block_number) = stored_block_info[0].clone() else {
-        return Err(state::ParseError::InvalidStoredBlockInfo(
+        return Err(types::ParseError::InvalidStoredBlockInfo(
             "cannot parse previous L2 block number".to_string(),
         )
         .into());
     };
 
     let abi::Token::Uint(_previous_enumeration_index) = stored_block_info[2].clone() else {
-        return Err(state::ParseError::InvalidStoredBlockInfo(
+        return Err(types::ParseError::InvalidStoredBlockInfo(
             "cannot parse previous enumeration index".to_string(),
         )
         .into());
@@ -81,7 +84,7 @@ fn parse_commit_block_info(data: &abi::Token) -> Result<Vec<CommitBlockInfoV1>>
     let mut res = vec![];
 
     let abi::Token::Array(data) = data else {
-        return Err(state::ParseError::InvalidCommitBlockInfo(
+        return Err(types::ParseError::InvalidCommitBlockInfo(
             "cannot convert newBlocksData to array".to_string(),
         )
         .into());
@@ -108,7 +111,7 @@ mod tests {
 
     use eyre::Result;
 
-    use crate::tree::TreeWrapper;
+    use crate::{snapshot::StateSnapshot, tree::TreeWrapper};
 
     use super::*;
 
@@ -117,14 +120,31 @@ mod tests {
     async fn it_works() -> Result<()> {
         // TODO: This should be an env variable / CLI argument.
         let db_dir = env::current_dir()?.join("db");
-        // TODO: Save / Load from existing db.
-        if db_dir.exists() {
-            std::fs::remove_dir_all(&db_dir)?;
-        }
-        let mut tree = TreeWrapper::new(db_dir.as_path())?;
+
+        // TODO: Implement graceful shutdown.
+        // If the database directory already exists, we try to restore the latest state.
+        // The state contains the last processed block and a mapping of index to key
+        // values. If a state file does not exist, we simply use the defaults instead.
+        let should_restore_state = db_dir.exists();
+        let mut state_snapshot = if should_restore_state {
+            println!("Loading previous state file...");
+            StateSnapshot::read(STATE_FILE_PATH).expect("state file is malformed")
+        } else {
+            println!("No existing database found, starting from genesis...");
+            StateSnapshot::default()
+        };
+
+        // Extract fields from state snapshot.
+        let StateSnapshot {
+            mut current_l1_block_number,
+            mut latest_l2_block_number,
+            ref index_to_key_map,
+        } = state_snapshot;
+
+        let mut tree = TreeWrapper::new(db_dir.as_path(), index_to_key_map.clone())?;
 
         let (provider, contract) = init_eth_adapter("https://eth.llamarpc.com").await;
-        let latest_l1_block = provider
+        let latest_l1_block_number = provider
             .get_block(BlockNumber::Latest)
             .await?
             .unwrap()
@@ -134,26 +154,23 @@ mod tests {
         let event = contract.events_by_name("BlockCommit")?[0].clone();
         let function = contract.functions_by_name("commitBlocks")?[0].clone();
 
-        let mut current_block = GENESIS_BLOCK;
-        let mut latest_l2_block_number = U256::default();
-        while current_block <= latest_l1_block.0[0] {
+        println!("Starting from l1 block: {}", current_l1_block_number);
+        while current_l1_block_number <= latest_l1_block_number.0[0] {
             // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
             // TODO: Filter by executed blocks too.
             let filter = Filter::new()
                 .address(ZK_SYNC_ADDR.parse::<Address>()?)
                 .topic0(event.signature())
-                .from_block(current_block)
-                .to_block(current_block + BLOCK_STEP);
+                .from_block(current_l1_block_number)
+                .to_block(current_l1_block_number + BLOCK_STEP);
 
             // Grab all relevant logs.
             let logs = provider.get_logs(&filter).await?;
             for log in logs {
-                println!("{:?}", log);
                 // log.topics:
                 // topics[1]: L2 block number.
                 // topics[2]: L2 block hash.
                 // topics[3]: L2 commitment.
-                // TODO: Check for already processed blocks.
                 let new_l2_block_number = U256::from_big_endian(log.topics[1].as_fixed_bytes());
 
                 if new_l2_block_number <= latest_l2_block_number {
@@ -169,13 +186,20 @@ mod tests {
                     println!("Parsed {} new blocks", num_blocks);
 
                     for block in blocks {
-                        latest_l2_block_number = tree.insert_block(block);
+                        latest_l2_block_number = tree.insert_block(&block);
+
+                        // Update snapshot values.
+                        state_snapshot.latest_l2_block_number = latest_l2_block_number;
+                        state_snapshot.index_to_key_map = tree.index_to_key.clone();
                     }
                 }
             }
 
-            // Increment current block index.
-            current_block += BLOCK_STEP;
+            current_l1_block_number += BLOCK_STEP;
+
+            // Update snapshot values and write the current state to a file.
+            state_snapshot.current_l1_block_number = current_l1_block_number;
+            state_snapshot.write(STATE_FILE_PATH)?;
         }
 
         Ok(())
diff --git a/src/snapshot.rs b/src/snapshot.rs
new file mode 100644
index 0000000..771ac2e
--- /dev/null
+++ b/src/snapshot.rs
@@ -0,0 +1,57 @@
+#![allow(dead_code)]
+use std::{fs, io};
+
+use ethers::types::U256;
+use indexmap::IndexSet;
+use serde::{Deserialize, Serialize};
+
+use crate::GENESIS_BLOCK;
+
+/// Struct containing the fields used for restoring the tree state.
+#[derive(Serialize, Deserialize)]
+pub struct StateSnapshot {
+    /// The latest l1 block number that was processed.
+    pub current_l1_block_number: u64,
+    /// The latest l2 block number that was processed.
+    pub latest_l2_block_number: U256,
+    /// The mappings of index to key values.
+    pub index_to_key_map: IndexSet<U256>,
+}
+
+impl Default for StateSnapshot {
+    fn default() -> Self {
+        Self {
+            current_l1_block_number: GENESIS_BLOCK,
+            latest_l2_block_number: U256::default(),
+            index_to_key_map: IndexSet::default(),
+        }
+    }
+}
+
+impl StateSnapshot {
+    pub fn new(
+        latest_l1_block_number: u64,
+        latest_l2_block_number: U256,
+        index_to_key_map: IndexSet<U256>,
+    ) -> Self {
+        Self {
+            current_l1_block_number: latest_l1_block_number,
+            latest_l2_block_number,
+            index_to_key_map,
+        }
+    }
+
+    /// Reads and deserializes the JSON file containing the index to key mappings.
+    pub fn read(path: &str) -> Result<StateSnapshot, io::Error> {
+        let contents = fs::read_to_string(path)?;
+        let state: StateSnapshot = serde_json::from_str(&contents)?;
+
+        Ok(state)
+    }
+
+    /// Writes the JSON file containing the index to key mappings.
+    pub fn write(&self, path: &str) -> Result<(), io::Error> {
+        let content = serde_json::to_string_pretty(&self)?;
+        fs::write(path, content)
+    }
+}
diff --git a/src/tree.rs b/src/tree.rs
index 645a14a..fd6758d 100644
--- a/src/tree.rs
+++ b/src/tree.rs
@@ -3,30 +3,37 @@
 use std::{fs, path::Path, str::FromStr};
 
 use ethers::types::{Address, H256, U256};
+use indexmap::IndexSet;
 use zk_evm::aux_structures::LogQuery;
 use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper};
 
 use eyre::Result;
 
-use crate::{state::CommitBlockInfoV1, INITAL_STATE_PATH};
+use crate::{types::CommitBlockInfoV1, INITAL_STATE_PATH};
 
 pub struct TreeWrapper<'a> {
     pub tree: MerkleTree<'a, RocksDBWrapper>,
-    // FIXME: How to save this for persistant storage?
-    pub index_to_key: Vec<U256>,
+    pub index_to_key: IndexSet<U256>,
 }
 
 impl TreeWrapper<'static> {
-    pub fn new(db_dir: &Path) -> Result<Self> {
+    /// Attempts to create a new [`TreeWrapper`].
+    pub fn new(db_dir: &Path, mut index_to_key: IndexSet<U256>) -> Result<Self> {
         let db = RocksDBWrapper::new(db_dir);
         let mut tree = MerkleTree::new(db);
-        let index_to_key = reconstruct_genesis_state(&mut tree, INITAL_STATE_PATH)?;
+
+        // If an existing `index_to_key` mapping was supplied, use that.
+        // Otherwise, reconstruct the genesis state.
+        if index_to_key.is_empty() {
+            println!("Reconstructing genesis state...");
+            reconstruct_genesis_state(&mut tree, &mut index_to_key, INITAL_STATE_PATH)?;
+        }
 
         Ok(Self { tree, index_to_key })
     }
 
     /// Inserts a block into the tree and returns the new block number.
-    pub fn insert_block(&mut self, block: CommitBlockInfoV1) -> U256 {
+    pub fn insert_block(&mut self, block: &CommitBlockInfoV1) -> U256 {
         let new_l2_block_number = block.block_number;
         // INITIAL CALLDATA.
         let mut key_value_pairs: Vec<(U256, H256)> =
@@ -36,14 +43,14 @@ impl TreeWrapper<'static> {
             let value = H256::from(value);
 
             key_value_pairs.push((key, value));
-            self.index_to_key.push(key);
+            self.index_to_key.insert(key);
         }
 
         // REPEATED CALLDATA.
         for (index, value) in &block.repeated_storage_changes {
             let index = *index as usize;
             // Index is 1-based so we subtract 1.
-            let key = *self.index_to_key.get(index - 1).unwrap();
+            let key = *self.index_to_key.get_index(index - 1).unwrap();
             let value = H256::from(value);
 
             key_value_pairs.push((key, value));
@@ -66,8 +73,9 @@ impl TreeWrapper<'static> {
 /// Attempts to reconstruct the genesis state from a CSV file.
 fn reconstruct_genesis_state<D: Database>(
     tree: &mut MerkleTree<D>,
+    index_to_key: &mut IndexSet<U256>,
     path: &str,
-) -> Result<Vec<U256>> {
+) -> Result<()> {
     fn cleanup_encoding(input: &'_ str) -> &'_ str {
         input
             .strip_prefix("E'\\\\x")
@@ -139,7 +147,6 @@ fn reconstruct_genesis_state(
 
     println!("Have {} unique keys in the tree", key_set.len());
 
-    let mut index_to_key = Vec::with_capacity(batched.len());
     let mut key_value_pairs: Vec<(U256, H256)> = Vec::with_capacity(batched.len());
     for (address, key, value) in batched {
         let derived_key = LogQuery::derive_final_address_for_params(&address, &key);
@@ -159,12 +166,12 @@ fn reconstruct_genesis_state(
         let key = U256::from_little_endian(&derived_key);
         let value = H256::from(tmp);
         key_value_pairs.push((key, value));
-        index_to_key.push(key);
+        index_to_key.insert(key);
     }
 
     let output = tree.extend(key_value_pairs);
     dbg!(tree.latest_version());
     println!("Initial state root = {}", hex::encode(output.root_hash));
 
-    Ok(index_to_key)
+    Ok(())
 }
diff --git a/src/state.rs b/src/types.rs
similarity index 100%
rename from src/state.rs
rename to src/types.rs
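
Below is a minimal usage sketch (not part of the patch) of the snapshot round trip that src/lib.rs now performs; the helper names load_state and persist_state are illustrative only, the StateSnapshot API is the one added in src/snapshot.rs above:

use crate::{snapshot::StateSnapshot, STATE_FILE_PATH};

// Illustrative helper: restore the previous state when a database already exists,
// otherwise fall back to the defaults (GENESIS_BLOCK, empty index-to-key set).
fn load_state(db_exists: bool) -> StateSnapshot {
    if db_exists {
        StateSnapshot::read(STATE_FILE_PATH).expect("state file is malformed")
    } else {
        StateSnapshot::default()
    }
}

// Illustrative helper: persist progress after each BLOCK_STEP batch so a restart
// can resume from current_l1_block_number instead of reprocessing from genesis.
fn persist_state(state_snapshot: &StateSnapshot) -> std::io::Result<()> {
    state_snapshot.write(STATE_FILE_PATH)
}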