
feat: initial export snapshot skeleton #40

Merged: 2 commits, Nov 16, 2023

Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -10,6 +10,7 @@ members = ["state-reconstruct-fetcher"]
[dependencies]
async-trait = "0.1.74"
blake2 = "0.10.6"
chrono = "0.4.31"
clap = { version = "4.4.7", features = ["derive", "env"] }
ethers = "1.0.2"
eyre = "0.6.8"
8 changes: 8 additions & 0 deletions src/cli.rs
@@ -72,6 +72,14 @@ pub enum Command {
#[arg(short, long, env = "ZK_SYNC_DB_PATH")]
db_path: Option<String>,
},

/// Testing: export the reconstructed state as a snapshot.
ExportSnapshot {
#[command(flatten)]
l1_fetcher_options: L1FetcherOptions,
/// The path of the file to export the snapshot to.
file: Option<String>,
},
}

#[derive(Parser)]
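
For context (not part of the diff): clap's derive maps the `ExportSnapshot` variant to a kebab-case `export-snapshot` subcommand, with `file` as an optional positional argument. A hypothetical parse, assuming `Cli` derives `clap::Parser` as elsewhere in the crate and that `L1FetcherOptions` exposes its `http_url` field as `--http-url` (binary name and flag spelling are assumptions for illustration only):

use clap::Parser;

let cli = Cli::parse_from([
    "state-reconstruct",
    "export-snapshot",
    "--http-url",
    "http://127.0.0.1:8545",
    "snapshot_export",
]);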
25 changes: 25 additions & 0 deletions src/main.rs
@@ -16,6 +16,7 @@ use std::{
use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
use processor::snapshot::SnapshotExporter;
use state_reconstruct_fetcher::{
constants::storage,
l1_fetcher::{L1Fetcher, L1FetcherOptions},
@@ -52,6 +53,7 @@ fn start_logger(default_level: LevelFilter) {
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
async fn main() -> Result<()> {
start_logger(LevelFilter::INFO);

@@ -150,6 +152,29 @@ async fn main() -> Result<()> {
println!("{result}");
}
}
Command::ExportSnapshot {
l1_fetcher_options,
file,
} => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotExporter::new(file);

let (tx, rx) = mpsc::channel::<CommitBlockInfoV1>(5);
let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

fetcher.run(tx).await?;
processor_handle.await?;
}
}

Ok(())
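
The new handler reuses the fetcher/processor split used by the other commands: `L1Fetcher` pushes `CommitBlockInfoV1` values into a bounded mpsc channel (capacity 5), so a slow exporter applies backpressure to the fetcher instead of buffering without limit. A minimal, self-contained sketch of that pattern, with toy `u64` payloads standing in for the real block type:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 5, as in the diff: `send` suspends once 5 items are in flight.
    let (tx, mut rx) = mpsc::channel::<u64>(5);

    let processor = tokio::spawn(async move {
        while let Some(block) = rx.recv().await {
            println!("processing block {block}");
        }
    });

    for n in 0..20 {
        tx.send(n).await.expect("receiver dropped");
    }
    drop(tx); // close the channel so the processor's loop terminates

    processor.await.expect("processor task panicked");
}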
3 changes: 2 additions & 1 deletion src/processor/mod.rs
@@ -3,9 +3,10 @@ use state_reconstruct_fetcher::types::CommitBlockInfoV1;
use tokio::sync::mpsc;

pub mod json;
pub mod snapshot;
pub mod tree;

#[async_trait]
pub trait Processor {
async fn run(self, rx: mpsc::Receiver<CommitBlockInfoV1>);
async fn run(self, mut rx: mpsc::Receiver<CommitBlockInfoV1>);
}
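
The only change here is the `mut` on the `rx` parameter: `Receiver::recv` takes `&mut self`, so an implementation that drains the channel needs a mutable binding, and the trait declaration is updated to match. A hypothetical no-op implementor (`NullProcessor` does not exist in this PR), just to show the shape the trait now expects:

struct NullProcessor;

#[async_trait]
impl Processor for NullProcessor {
    async fn run(self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
        // Drain the channel; `recv` requires `&mut rx`, hence the `mut` binding.
        while rx.recv().await.is_some() {}
    }
}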
238 changes: 238 additions & 0 deletions src/processor/snapshot/mod.rs
@@ -0,0 +1,238 @@
use std::{collections::HashMap, fmt, fs, path::PathBuf, str::FromStr};

mod types;

use async_trait::async_trait;
use blake2::{Blake2s256, Digest};
use ethers::types::{Address, H256, U256, U64};
use eyre::Result;
use indexmap::IndexSet;
use state_reconstruct_fetcher::{
constants::{ethereum, storage},
types::CommitBlockInfoV1,
};
use tokio::sync::mpsc;

use self::types::{SnapshotStorageLog, StorageKey, StorageValue};
use super::Processor;
use crate::processor::snapshot::types::MiniblockNumber;

// NOTE: What file extension to use?
const DEFAULT_EXPORT_PATH: &str = "snapshot_export";

pub struct SnapshotExporter {
storage_log_entries: HashMap<StorageKey, SnapshotStorageLog>,
index_to_key_map: IndexSet<U256>,
path: PathBuf,
}

impl SnapshotExporter {
pub fn new(path: Option<String>) -> Self {
let path = match path {
Some(p) => PathBuf::from(p),
None => PathBuf::from(DEFAULT_EXPORT_PATH),
};

let mut index_to_key_map = IndexSet::new();
let mut storage_log_entries = HashMap::new();

reconstruct_genesis_state(
&mut storage_log_entries,
&mut index_to_key_map,
storage::INITAL_STATE_PATH,
)
.unwrap();

Self {
storage_log_entries,
index_to_key_map,
path,
}
}
}

impl fmt::Display for SnapshotExporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut s = String::new();

for entry in &self.storage_log_entries {
s.push_str(&entry.1.to_string());
s.push('\n');
}

write!(f, "{s}")
}
}

#[async_trait]
impl Processor for SnapshotExporter {
async fn run(mut self, mut rx: mpsc::Receiver<CommitBlockInfoV1>) {
// TODO: Send these from fetcher.
let miniblock_number = U64::from(0);
Collaborator review comment on this line (the attached suggestion removes it):

    This will be sort of "WONTFIX", because we don't have any information about miniblocks. AFAIU miniblocks are the L2 transaction bundles, and those aren't stored on L1; only the resulting state is.

let l1_block_number = U64::from(0);

while let Some(block) = rx.recv().await {
// Initial calldata.
for (key, value) in &block.initial_storage_changes {
let key = U256::from_little_endian(key);
let value = H256::from(value);
self.index_to_key_map.insert(key);

let log = self
.storage_log_entries
.entry(key)
.or_insert(SnapshotStorageLog {
key,
value: StorageValue::default(),
miniblock_number_of_initial_write: miniblock_number,
l1_batch_number_of_initial_write: l1_block_number,
enumeration_index: 0,
});
log.value = value;
}

// Repeated calldata.
for (index, value) in &block.repeated_storage_changes {
let index = usize::try_from(*index).expect("index overflows usize");
// Index is 1-based so we subtract 1.
let key = *self.index_to_key_map.get_index(index - 1).unwrap();
let value = H256::from(value);

self.storage_log_entries
.entry(key)
.and_modify(|log| log.value = value);
}

// TODO: We need to index these by hash.
// Factory dependencies.
// for dep in &block.factory_deps {}
}

fs::write(&self.path, self.to_string()).expect("failed to export snapshot");
tracing::info!("Successfully exported snapshot to {}", self.path.display());
}
}

// TODO: Can this be made somewhat generic?
/// Attempts to reconstruct the genesis state from a CSV file.
fn reconstruct_genesis_state(
storage_log_entries: &mut HashMap<U256, SnapshotStorageLog>,
index_to_key: &mut IndexSet<U256>,
path: &str,
) -> Result<()> {
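// Strips what looks like a Postgres escaped-bytea wrapper, E'\\x…', from a
// CSV field, leaving the bare hex payload.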
fn cleanup_encoding(input: &'_ str) -> &'_ str {
input
.strip_prefix("E'\\\\x")
.unwrap()
.strip_suffix('\'')
.unwrap()
}

let mut block_batched_accesses = vec![];

let input = fs::read_to_string(path)?;
for line in input.lines() {
let mut separated = line.split(',');
let _derived_key = separated.next().unwrap();
let address = separated.next().unwrap();
let key = separated.next().unwrap();
let value = separated.next().unwrap();
let op_number: u32 = separated.next().unwrap().parse()?;
let _ = separated.next().unwrap();
let miniblock_number: u32 = separated.next().unwrap().parse()?;

if miniblock_number != 0 {
break;
}

let address = Address::from_str(cleanup_encoding(address))?;
let key = U256::from_str_radix(cleanup_encoding(key), 16)?;
let value = U256::from_str_radix(cleanup_encoding(value), 16)?;

let record = (address, key, value, op_number, miniblock_number);
block_batched_accesses.push(record);
}

// Sort the batched accesses by (address, key, op_number).
block_batched_accesses.sort_by(|a, b| match a.0.cmp(&b.0) {
std::cmp::Ordering::Equal => match a.1.cmp(&b.1) {
std::cmp::Ordering::Equal => match a.3.cmp(&b.3) {
std::cmp::Ordering::Equal => {
panic!("must be unique")
}
a => a,
},
a => a,
},
a => a,
});

let mut key_set = std::collections::HashSet::new();

// Batch.
for el in &block_batched_accesses {
let derived_key = derive_final_address_for_params(&el.0, &el.1);
key_set.insert(derived_key);
}

let mut batched = vec![];
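// Collapse consecutive accesses to the same (address, key) slot, keeping only
// the last value written; the `unwrap` below assumes at least one record.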
let mut it = block_batched_accesses.into_iter();
let mut previous = it.next().unwrap();
for el in it {
if el.0 != previous.0 || el.1 != previous.1 {
batched.push((previous.0, previous.1, previous.2, previous.4));
}

previous = el;
}

// Finalize.
batched.push((previous.0, previous.1, previous.2, previous.4));

tracing::trace!("Have {} unique keys in the tree", key_set.len());

for (address, key, value, miniblock_number) in batched {
let derived_key = derive_final_address_for_params(&address, &key);
// TODO: what to do here?
// let version = tree.latest_version().unwrap_or_default();
// let _leaf = tree.read_leaves(version, &[key]);

// let existing_value = U256::from_big_endian(existing_leaf.leaf.value());
// if existing_value == value {
// // we downgrade to read
// // println!("Downgrading to read")
// } else {
// we write
let mut tmp = [0u8; 32];
value.to_big_endian(&mut tmp);

let key = U256::from_little_endian(&derived_key);
let value = H256::from(tmp);

let log = storage_log_entries
.entry(key)
.or_insert(SnapshotStorageLog {
key,
value: StorageValue::default(),
miniblock_number_of_initial_write: MiniblockNumber::from(miniblock_number),
l1_batch_number_of_initial_write: U64::from(ethereum::GENESIS_BLOCK),
enumeration_index: 0,
});

log.value = value;
index_to_key.insert(key);
}

Ok(())
}

fn derive_final_address_for_params(address: &Address, key: &U256) -> [u8; 32] {
let mut buffer = [0u8; 64];
buffer[12..32].copy_from_slice(&address.0);
key.to_big_endian(&mut buffer[32..64]);

let mut result = [0u8; 32];
result.copy_from_slice(Blake2s256::digest(buffer).as_slice());

result
}
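
So the derived key is the Blake2s-256 digest of the 20-byte address (left-padded with zeros to 32 bytes) concatenated with the big-endian storage slot, which the exporter later reinterprets as a little-endian U256. A hypothetical unit test (not in the diff) making that round trip concrete:

#[test]
fn derived_key_layout() {
    let address = Address::zero();
    let slot = U256::one();
    let derived = derive_final_address_for_params(&address, &slot);
    // Interpret the 32-byte digest the same way the exporter does.
    let _tree_key = U256::from_little_endian(&derived);
}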