From f62e43c3efbc47fdf44e0456818689d32a41eb8b Mon Sep 17 00:00:00 2001 From: Jonathan <94441036+zeapoz@users.noreply.github.com> Date: Thu, 5 Oct 2023 11:23:26 +0200 Subject: [PATCH] Port all `lib.rs` code into binary (#11) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: tree processor * ref: move lib.rs code into bin * Update src/constants.rs Co-authored-by: Tuomas Mäkinen <1947505+tuommaki@users.noreply.github.com> * ref: async trait functions * chore: update imports --------- Co-authored-by: Tuomas Mäkinen <1947505+tuommaki@users.noreply.github.com> --- Cargo.toml | 1 + src/constants.rs | 9 + src/l1_fetcher.rs | 75 ++++++- src/lib.rs | 205 ------------------ src/main.rs | 25 ++- src/processor/mod.rs | 11 + src/processor/tree/mod.rs | 74 +++++++ src/{ => processor/tree}/snapshot.rs | 31 +-- .../tree/tree_wrapper.rs} | 34 +-- src/types.rs | 3 + 10 files changed, 209 insertions(+), 259 deletions(-) delete mode 100644 src/lib.rs create mode 100644 src/processor/mod.rs create mode 100644 src/processor/tree/mod.rs rename src/{ => processor/tree}/snapshot.rs (53%) rename src/{tree.rs => processor/tree/tree_wrapper.rs} (87%) diff --git a/Cargo.toml b/Cargo.toml index 5d725a9..64de80e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.73" clap = { version = "4.4.0", features = ["string"] } ethers = "1" eyre = "0.6.8" diff --git a/src/constants.rs b/src/constants.rs index f666eba..84a0b79 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -8,3 +8,12 @@ pub mod ethereum { /// zkSync smart contract address. pub const ZK_SYNC_ADDR: &str = "0x32400084C286CF3E17e7B677ea9583e60a000324"; } + +pub mod storage { + /// The path to the initial state file. + pub const INITAL_STATE_PATH: &str = "InitialState.csv"; + + // TODO: Configure via env / CLI. + /// The path of the state file. + pub const STATE_FILE_PATH: &str = "StateSnapshot.json"; +} diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index cfbbabb..5b64bef 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -1,12 +1,13 @@ +use ethers::abi::Function; use ethers::{abi::Contract, prelude::*, providers::Provider}; use eyre::Result; use rand::random; -use state_reconstruct::constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR}; -use state_reconstruct::parse_calldata; -use state_reconstruct::CommitBlockInfoV1; use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; +use crate::constants::ethereum::{BLOCK_STEP, GENESIS_BLOCK, ZK_SYNC_ADDR}; +use crate::types::{CommitBlockInfoV1, ParseError}; + pub struct L1Fetcher { provider: Provider, contract: Contract, @@ -131,3 +132,71 @@ impl L1Fetcher { Ok(()) } } + +pub fn parse_calldata( + commit_blocks_fn: &Function, + calldata: &[u8], +) -> Result> { + let mut parsed_input = commit_blocks_fn + .decode_input(&calldata[4..]) + .map_err(|e| ParseError::InvalidCalldata(e.to_string()))?; + + if parsed_input.len() != 2 { + return Err(ParseError::InvalidCalldata(format!( + "invalid number of parameters (got {}, expected 2) for commitBlocks function", + parsed_input.len() + )) + .into()); + } + + let new_blocks_data = parsed_input + .pop() + .ok_or_else(|| ParseError::InvalidCalldata("new blocks data".to_string()))?; + let stored_block_info = parsed_input + .pop() + .ok_or_else(|| ParseError::InvalidCalldata("stored block info".to_string()))?; + + let abi::Token::Tuple(stored_block_info) = stored_block_info else { + return Err(ParseError::InvalidCalldata("invalid StoredBlockInfo".to_string()).into()); + }; + + let abi::Token::Uint(_previous_l2_block_number) = stored_block_info[0].clone() else { + return Err(ParseError::InvalidStoredBlockInfo( + "cannot parse previous L2 block number".to_string(), + ) + .into()); + }; + + let abi::Token::Uint(_previous_enumeration_index) = stored_block_info[2].clone() else { + return Err(ParseError::InvalidStoredBlockInfo( + "cannot parse previous enumeration index".to_string(), + ) + .into()); + }; + + //let previous_enumeration_index = previous_enumeration_index.0[0]; + // TODO: What to do here? + // assert_eq!(previous_enumeration_index, tree.next_enumeration_index()); + + parse_commit_block_info(&new_blocks_data) +} + +fn parse_commit_block_info(data: &abi::Token) -> Result> { + let mut res = vec![]; + + let abi::Token::Array(data) = data else { + return Err(ParseError::InvalidCommitBlockInfo( + "cannot convert newBlocksData to array".to_string(), + ) + .into()); + }; + + for data in data.iter() { + match CommitBlockInfoV1::try_from(data) { + Ok(blk) => res.push(blk), + Err(e) => println!("failed to parse commit block info: {}", e), + } + } + + Ok(res) +} diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 1a12f0a..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,205 +0,0 @@ -#![feature(array_chunks)] -// #![warn(clippy::pedantic)] - -pub mod constants; -mod snapshot; -mod tree; -mod types; - -pub use crate::types::CommitBlockInfoV1; - -use ethers::{ - abi::{Contract, Function}, - prelude::*, - providers::Provider, -}; -use eyre::Result; - -pub const INITAL_STATE_PATH: &str = "InitialState.csv"; -pub const STATE_FILE_PATH: &str = "StateSnapshot.json"; - -pub async fn init_eth_adapter(http_url: &str) -> (Provider, Contract) { - let provider = - Provider::::try_from(http_url).expect("could not instantiate HTTP Provider"); - - let abi_file = std::fs::File::open("./IZkSync.json").unwrap(); - let contract = Contract::load(abi_file).unwrap(); - - (provider, contract) -} - -pub fn parse_calldata( - commit_blocks_fn: &Function, - calldata: &[u8], -) -> Result> { - let mut parsed_input = commit_blocks_fn - .decode_input(&calldata[4..]) - .map_err(|e| types::ParseError::InvalidCalldata(e.to_string()))?; - - if parsed_input.len() != 2 { - return Err(types::ParseError::InvalidCalldata(format!( - "invalid number of parameters (got {}, expected 2) for commitBlocks function", - parsed_input.len() - )) - .into()); - } - - let new_blocks_data = parsed_input - .pop() - .ok_or_else(|| types::ParseError::InvalidCalldata("new blocks data".to_string()))?; - let stored_block_info = parsed_input - .pop() - .ok_or_else(|| types::ParseError::InvalidCalldata("stored block info".to_string()))?; - - let abi::Token::Tuple(stored_block_info) = stored_block_info else { - return Err( - types::ParseError::InvalidCalldata("invalid StoredBlockInfo".to_string()).into(), - ); - }; - - let abi::Token::Uint(_previous_l2_block_number) = stored_block_info[0].clone() else { - return Err(types::ParseError::InvalidStoredBlockInfo( - "cannot parse previous L2 block number".to_string(), - ) - .into()); - }; - - let abi::Token::Uint(_previous_enumeration_index) = stored_block_info[2].clone() else { - return Err(types::ParseError::InvalidStoredBlockInfo( - "cannot parse previous enumeration index".to_string(), - ) - .into()); - }; - - //let previous_enumeration_index = previous_enumeration_index.0[0]; - // TODO: What to do here? - // assert_eq!(previous_enumeration_index, tree.next_enumeration_index()); - - parse_commit_block_info(&new_blocks_data) -} - -fn parse_commit_block_info(data: &abi::Token) -> Result> { - let mut res = vec![]; - - let abi::Token::Array(data) = data else { - return Err(types::ParseError::InvalidCommitBlockInfo( - "cannot convert newBlocksData to array".to_string(), - ) - .into()); - }; - - for data in data.iter() { - match CommitBlockInfoV1::try_from(data) { - Ok(blk) => res.push(blk), - Err(e) => println!("failed to parse commit block info: {}", e), - } - } - - Ok(res) -} - -#[cfg(test)] -mod tests { - use std::env; - - use ethers::{ - providers::Middleware, - types::{Address, BlockNumber, Filter}, - }; - - use eyre::Result; - - use crate::{constants::ethereum, snapshot::StateSnapshot, tree::TreeWrapper}; - - use super::*; - - #[ignore] - #[tokio::test] - async fn it_works() -> Result<()> { - // TODO: This should be an env variable / CLI argument. - let db_dir = env::current_dir()?.join("db"); - - // TODO: Implement graceful shutdown. - // If database directory already exists, we try to restore the latest state. - // The state contains the last processed block and a mapping of index to key - // values, if a state file does not exist, we simply use the defaults instead. - let should_restore_state = db_dir.exists(); - let mut state_snapshot = if should_restore_state { - println!("Loading previous state file..."); - StateSnapshot::read(STATE_FILE_PATH).expect("state file is malformed") - } else { - println!("No existing database found, starting from genesis..."); - StateSnapshot::default() - }; - - // Extract fields from state snapshot. - let StateSnapshot { - mut current_l1_block_number, - mut latest_l2_block_number, - ref index_to_key_map, - } = state_snapshot; - - let mut tree = TreeWrapper::new(db_dir.as_path(), index_to_key_map.clone())?; - - let (provider, contract) = init_eth_adapter("https://eth.llamarpc.com").await; - let latest_l1_block_number = provider - .get_block(BlockNumber::Latest) - .await? - .unwrap() - .number - .unwrap(); - - let event = contract.events_by_name("BlockCommit")?[0].clone(); - let function = contract.functions_by_name("commitBlocks")?[0].clone(); - - println!("Starting from l1 block: {}", current_l1_block_number); - while current_l1_block_number <= latest_l1_block_number.0[0] { - // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`]. - // TODO: Filter by executed blocks too. - let filter = Filter::new() - .address(ethereum::ZK_SYNC_ADDR.parse::
()?) - .topic0(event.signature()) - .from_block(current_l1_block_number) - .to_block(current_l1_block_number + ethereum::BLOCK_STEP); - - // Grab all relevant logs. - let logs = provider.get_logs(&filter).await?; - for log in logs { - // log.topics: - // topics[1]: L2 block number. - // topics[2]: L2 block hash. - // topics[3]: L2 commitment. - - let new_l2_block_number = U256::from_big_endian(log.topics[1].as_fixed_bytes()); - if new_l2_block_number <= latest_l2_block_number { - continue; - } - - if let Some(tx_hash) = log.transaction_hash { - let tx = provider.get_transaction(tx_hash).await?.unwrap(); - let calldata = tx.input; - let blocks = parse_calldata(&function, &calldata)?; - - let num_blocks = blocks.len(); - println!("Parsed {} new blocks", num_blocks); - - for block in blocks { - latest_l2_block_number = tree.insert_block(&block); - - // Update snapshot values. - state_snapshot.latest_l2_block_number = latest_l2_block_number; - state_snapshot.index_to_key_map = tree.index_to_key.clone(); - } - } - } - // Increment current block index. - current_l1_block_number += ethereum::BLOCK_STEP; - - // Update snapshot values and write the current state to a file. - state_snapshot.current_l1_block_number = current_l1_block_number; - state_snapshot.write(STATE_FILE_PATH)?; - } - - Ok(()) - } -} diff --git a/src/main.rs b/src/main.rs index 3dea5f1..7fdd9ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,23 @@ +#![feature(array_chunks)] + mod constants; mod l1_fetcher; +mod processor; +mod types; + +use std::env; use clap::{arg, value_parser, Command}; +use constants::ethereum; use ethers::types::U64; use eyre::Result; use l1_fetcher::L1Fetcher; -use state_reconstruct::CommitBlockInfoV1; use tokio::sync::mpsc; -use constants::ethereum; +use crate::{ + processor::{tree::TreeProcessor, Processor}, + types::CommitBlockInfoV1, +}; fn cli() -> Command { Command::new("state-reconstruct") @@ -52,17 +61,21 @@ async fn main() -> Result<()> { match matches.subcommand() { Some(("reconstruct", sub_matches)) => match sub_matches.subcommand() { Some(("l1", args)) => { + // TODO: Use start_block from snapshot. let start_block = args.get_one::("start-block").expect("required"); let block_step = args.get_one::("block-step").expect("required"); let http_url = args.get_one::("http-url").expect("required"); println!("reconstruct from L1, starting from block number {}, processing {} blocks at a time", start_block, block_step); + // TODO: This should be an env variable / CLI argument. + let db_dir = env::current_dir()?.join("db"); + let fetcher = L1Fetcher::new(http_url)?; - let (tx, mut rx) = mpsc::channel::>(5); + let processor = TreeProcessor::new(&db_dir)?; + let (tx, rx) = mpsc::channel::>(5); + tokio::spawn(async move { - while let Some(blks) = rx.recv().await { - blks.iter().for_each(|x| println!("{:?}", x)); - } + processor.run(rx).await; }); fetcher.fetch(tx, Some(U64([*start_block])), None).await?; diff --git a/src/processor/mod.rs b/src/processor/mod.rs new file mode 100644 index 0000000..aa3c42d --- /dev/null +++ b/src/processor/mod.rs @@ -0,0 +1,11 @@ +use async_trait::async_trait; +use tokio::sync::mpsc; + +use crate::types::CommitBlockInfoV1; + +pub mod tree; + +#[async_trait] +pub trait Processor { + async fn run(self, rx: mpsc::Receiver>); +} diff --git a/src/processor/tree/mod.rs b/src/processor/tree/mod.rs new file mode 100644 index 0000000..d511a60 --- /dev/null +++ b/src/processor/tree/mod.rs @@ -0,0 +1,74 @@ +mod snapshot; +mod tree_wrapper; + +use std::path::Path; + +use async_trait::async_trait; +use eyre::Result; +use tokio::sync::mpsc; + +use crate::constants::storage::STATE_FILE_PATH; +use crate::types::CommitBlockInfoV1; + +use self::{snapshot::StateSnapshot, tree_wrapper::TreeWrapper}; + +use super::Processor; + +pub struct TreeProcessor<'a> { + tree: TreeWrapper<'a>, + snapshot: StateSnapshot, +} + +impl TreeProcessor<'static> { + pub fn new(db_dir: &Path) -> Result { + // TODO: Implement graceful shutdown. + // If database directory already exists, we try to restore the latest state. + // The state contains the last processed block and a mapping of index to key + // values, if a state file does not exist, we simply use the defaults instead. + let should_restore_state = db_dir.exists(); + let snapshot = if should_restore_state { + println!("Loading previous state file..."); + StateSnapshot::read(STATE_FILE_PATH).expect("state file is malformed") + } else { + println!("No existing database found, starting from genesis..."); + StateSnapshot::default() + }; + + // Extract `index_to_key_map` from state snapshot. + let StateSnapshot { + ref index_to_key_map, + .. // Ignore the rest of the fields. + } = snapshot; + + let tree = TreeWrapper::new(db_dir, index_to_key_map.clone())?; + + Ok(Self { tree, snapshot }) + } +} + +#[async_trait] +impl Processor for TreeProcessor<'static> { + async fn run(mut self, mut rx: mpsc::Receiver>) { + while let Some(blocks) = rx.recv().await { + for block in blocks { + // Check if we've already processed this block. + if self.snapshot.latest_l2_block_number >= block.block_number { + println!( + "Block {} has already been processed, skipping.", + block.block_number + ); + continue; + } + + self.tree.insert_block(&block); + + // Update snapshot values. + self.snapshot.latest_l2_block_number = block.block_number; + self.snapshot.index_to_key_map = self.tree.index_to_key_map.clone(); + } + + // Write the current state to a file. + self.snapshot.write(STATE_FILE_PATH).unwrap(); + } + } +} diff --git a/src/snapshot.rs b/src/processor/tree/snapshot.rs similarity index 53% rename from src/snapshot.rs rename to src/processor/tree/snapshot.rs index 2804a55..f6f06e8 100644 --- a/src/snapshot.rs +++ b/src/processor/tree/snapshot.rs @@ -1,46 +1,19 @@ -#![allow(dead_code)] use std::{fs, io}; use ethers::types::U256; use indexmap::IndexSet; use serde::{Deserialize, Serialize}; -use crate::constants::ethereum; - /// Struct containing the fields used for restoring the tree state. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Default)] pub struct StateSnapshot { - /// The latest l1 block number that was processed. - pub current_l1_block_number: u64, /// The latest l2 block number that was processed. - pub latest_l2_block_number: U256, + pub latest_l2_block_number: u64, /// The mappings of index to key values. pub index_to_key_map: IndexSet, } -impl Default for StateSnapshot { - fn default() -> Self { - Self { - current_l1_block_number: ethereum::GENESIS_BLOCK, - latest_l2_block_number: U256::default(), - index_to_key_map: IndexSet::default(), - } - } -} - impl StateSnapshot { - pub fn new( - latest_l1_block_number: u64, - latest_l2_block_number: U256, - index_to_key_map: IndexSet, - ) -> Self { - Self { - current_l1_block_number: latest_l1_block_number, - latest_l2_block_number, - index_to_key_map, - } - } - /// Reads and serializes the json file containing the index to key mappings. pub fn read(path: &str) -> Result { let contents = fs::read_to_string(path)?; diff --git a/src/tree.rs b/src/processor/tree/tree_wrapper.rs similarity index 87% rename from src/tree.rs rename to src/processor/tree/tree_wrapper.rs index fd6758d..85d9772 100644 --- a/src/tree.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -1,5 +1,3 @@ -// FIXME: Remove once we have a binary in place. -#![allow(dead_code)] use std::{fs, path::Path, str::FromStr}; use ethers::types::{Address, H256, U256}; @@ -9,32 +7,36 @@ use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper}; use eyre::Result; -use crate::{types::CommitBlockInfoV1, INITAL_STATE_PATH}; +use crate::{constants::storage::INITAL_STATE_PATH, CommitBlockInfoV1}; + +pub type RootHash = H256; pub struct TreeWrapper<'a> { - pub tree: MerkleTree<'a, RocksDBWrapper>, - pub index_to_key: IndexSet, + tree: MerkleTree<'a, RocksDBWrapper>, + pub index_to_key_map: IndexSet, } impl TreeWrapper<'static> { /// Attempts to create a new [`TreeWrapper`]. - pub fn new(db_dir: &Path, mut index_to_key: IndexSet) -> Result { + pub fn new(db_dir: &Path, mut index_to_key_map: IndexSet) -> Result { let db = RocksDBWrapper::new(db_dir); let mut tree = MerkleTree::new(db); // If an existing `index_to_key` mapping was supplied, use that. // Otherwise, reconstruct the genesis state. - if index_to_key.is_empty() { + if index_to_key_map.is_empty() { println!("was empty!"); - reconstruct_genesis_state(&mut tree, &mut index_to_key, INITAL_STATE_PATH)?; + reconstruct_genesis_state(&mut tree, &mut index_to_key_map, INITAL_STATE_PATH)?; } - Ok(Self { tree, index_to_key }) + Ok(Self { + tree, + index_to_key_map, + }) } - /// Inserts a block into the tree and returns the new block number. - pub fn insert_block(&mut self, block: &CommitBlockInfoV1) -> U256 { - let new_l2_block_number = block.block_number; + /// Inserts a block into the tree and returns the root hash of the resulting state tree. + pub fn insert_block(&mut self, block: &CommitBlockInfoV1) -> RootHash { // INITIAL CALLDATA. let mut key_value_pairs: Vec<(U256, H256)> = Vec::with_capacity(block.initial_storage_changes.len()); @@ -43,14 +45,14 @@ impl TreeWrapper<'static> { let value = H256::from(value); key_value_pairs.push((key, value)); - self.index_to_key.insert(key); + self.index_to_key_map.insert(key); } // REPEATED CALLDATA. for (index, value) in &block.repeated_storage_changes { let index = *index as usize; // Index is 1-based so we subtract 1. - let key = *self.index_to_key.get_index(index - 1).unwrap(); + let key = *self.index_to_key_map.get_index(index - 1).unwrap(); let value = H256::from(value); key_value_pairs.push((key, value)); @@ -62,11 +64,11 @@ impl TreeWrapper<'static> { assert_eq!(root_hash.as_bytes(), block.new_state_root); println!( "Root hash of block {} = {}", - new_l2_block_number, + block.block_number, hex::encode(root_hash) ); - U256::from(new_l2_block_number) + root_hash } } diff --git a/src/types.rs b/src/types.rs index f805060..aa89dd1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -269,6 +269,7 @@ pub fn decompress_bytecode(data: Vec) -> Result> { Ok(bytecode) } +#[allow(dead_code)] pub enum L2ToL1Pubdata { L2ToL1Log, L2ToL2Message, @@ -277,6 +278,7 @@ pub enum L2ToL1Pubdata { } /// Data needed to commit new block +#[allow(dead_code)] pub struct CommitBlockInfoV2 { /// L2 block number. pub block_number: u64, @@ -297,6 +299,7 @@ pub struct CommitBlockInfoV2 { } impl CommitBlockInfoV1 { + #[allow(dead_code)] pub fn as_v2(&self) -> CommitBlockInfoV2 { CommitBlockInfoV2 { block_number: self.block_number,