Skip to content

Commit

Permalink
chore: snapshot format update (#108)
Browse files Browse the repository at this point in the history
* fix: protobuf

* chore: renaming fields

* chore: adjust snapshots to always chunk in 10s

* chore: serialize values as numbers not strings

* chore(snapshots): l1 batch number offset

* chore: serde rename all

* doc: update comment
  • Loading branch information
zeapoz authored Jul 16, 2024
1 parent ead5d4a commit 6be1288
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 79 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ StateSnapshot.json

# Markdown linting rules.
.markdownlint.json

.DS_Store
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]
members = [
"state-reconstruct-fetcher",
"state-reconstruct-storage",
]
members = ["state-reconstruct-fetcher", "state-reconstruct-storage"]

[dependencies]
async-trait = "0.1.74"
Expand Down
23 changes: 0 additions & 23 deletions proto/snapshot.proto

This file was deleted.

6 changes: 3 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ pub enum Command {
/// The path to the storage solution.
#[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)]
db_path: Option<String>,
/// Number of storage logs to stuff into one chunk.
#[arg(short, long, default_value_t = snapshot::DEFAULT_CHUNK_SIZE)]
chunk_size: usize,
/// Number of chunks to split the storage logs into.
#[arg(short, long, default_value_t = snapshot::DEFAULT_NUM_CHUNKS)]
num_chunks: usize,
/// The directory to export the snapshot files to.
directory: String,
},
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ async fn main() -> Result<()> {
}
Command::ExportSnapshot {
db_path,
chunk_size,
num_chunks,
directory,
} => {
let export_path = Path::new(&directory);
std::fs::create_dir_all(export_path)?;
let exporter = SnapshotExporter::new(export_path, db_path)?;
exporter.export_snapshot(chunk_size)?;
exporter.export_snapshot(num_chunks)?;

tracing::info!("Successfully exported snapshot files to \"{directory}\"!");
}
Expand Down
26 changes: 15 additions & 11 deletions src/processor/snapshot/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::path::{Path, PathBuf};

use chrono::offset::Utc;
use ethers::types::U256;
use eyre::Result;
use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK;
use state_reconstruct_storage::{
snapshot::SnapshotDatabase,
snapshot_columns,
Expand Down Expand Up @@ -36,15 +36,19 @@ impl SnapshotExporter {
})
}

pub fn export_snapshot(&self, chunk_size: usize) -> Result<()> {
let l1_batch_number = self.database.get_latest_l1_batch_number()?;
pub fn export_snapshot(&self, num_chunks: usize) -> Result<()> {
let latest_l1_batch_number = self.database.get_latest_l1_batch_number()?;
// L1 batch number is calculated from the batch number where the
// DiamondProxy contract was deployed (`GENESIS_BLOCK`).
let l1_batch_number = latest_l1_batch_number - GENESIS_BLOCK;
let l2_batch_number = self.database.get_latest_l2_batch_number()?;
let mut header = SnapshotHeader {
l1_batch_number,
generated_at: Utc::now(),
l1_batch_number: l1_batch_number.as_u64(),
miniblock_number: l2_batch_number.as_u64(),
..Default::default()
};

self.export_storage_logs(chunk_size, &mut header)?;
self.export_storage_logs(num_chunks, &mut header)?;
self.export_factory_deps(&mut header)?;

let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME);
Expand Down Expand Up @@ -91,7 +95,7 @@ impl SnapshotExporter {
Ok(())
}

fn export_storage_logs(&self, chunk_size: usize, header: &mut SnapshotHeader) -> Result<()> {
fn export_storage_logs(&self, num_chunks: usize, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting storage logs...");

let num_logs = self.database.get_last_repeated_key_index()?;
Expand All @@ -102,9 +106,9 @@ impl SnapshotExporter {
.database
.iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

let total_num_chunks = (num_logs / chunk_size as u64) + 1;
for chunk_id in 0..total_num_chunks {
tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks);
let chunk_size = num_logs / num_chunks as u64;
for chunk_id in 0..num_chunks {
tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks);

let mut chunk = SnapshotStorageLogsChunk::default();
for _ in 0..chunk_size {
Expand All @@ -125,7 +129,7 @@ impl SnapshotExporter {
header
.storage_logs_chunks
.push(SnapshotStorageLogsChunkMetadata {
chunk_id,
chunk_id: chunk_id as u64,
filepath: path
.clone()
.into_os_string()
Expand Down
28 changes: 12 additions & 16 deletions src/processor/snapshot/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::{
use ethers::types::U64;
use eyre::Result;
use regex::{Captures, Regex};
use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK;
use state_reconstruct_storage::types::{
Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk,
SnapshotStorageLogsChunkMetadata,
};
use tokio::sync::mpsc::{self, Sender};

use super::{SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME};
use super::SNAPSHOT_HEADER_FILE_NAME;
use crate::processor::tree::tree_wrapper::TreeWrapper;

const SNAPSHOT_CHUNK_REGEX: &str = r"snapshot_l1_batch_(\d*)_storage_logs_part_\d*.proto.gzip";
Expand All @@ -31,23 +32,25 @@ impl SnapshotImporter {
pub async fn run(self, db_path: &Path) -> Result<()> {
let (tx, rx) = mpsc::channel(1);

let header = self.read_header()?;
let _factory_deps = self.read_factory_deps(&header)?;
let header = self.read_header().expect("failed to read header filepath");
let _factory_deps =
Self::read_factory_deps(&header).expect("failed to read factory deps filepath");

// Read storage logs async sending each read one into the tree to process.
tokio::spawn({
let header = header.clone();
async move {
self.read_storage_logs_chunks_async(&header, tx)
Self::read_storage_logs_chunks_async(&header, tx)
.await
.expect("failed to read storage_logs_chunks");
}
});

let l1_batch_number = header.l1_batch_number + GENESIS_BLOCK;
let mut tree = TreeWrapper::new_snapshot_wrapper(db_path)
.await
.expect("can't create tree");
tree.restore_from_snapshot(rx, header.l1_batch_number)
tree.restore_from_snapshot(rx, U64::from(l1_batch_number))
.await?;

Ok(())
Expand All @@ -65,17 +68,13 @@ impl SnapshotImporter {
Ok(header)
}

fn read_factory_deps(&self, header: &SnapshotHeader) -> Result<SnapshotFactoryDependencies> {
let factory_deps_path = self.directory.join(format!(
"snapshot_l1_batch_{}_{}",
header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
));
fn read_factory_deps(header: &SnapshotHeader) -> Result<SnapshotFactoryDependencies> {
let factory_deps_path = header.factory_deps_filepath.clone();
let bytes = fs::read(factory_deps_path)?;
SnapshotFactoryDependencies::decode(&bytes)
}

async fn read_storage_logs_chunks_async(
&self,
header: &SnapshotHeader,
tx: Sender<SnapshotStorageLogsChunk>,
) -> Result<()> {
Expand All @@ -88,10 +87,7 @@ impl SnapshotImporter {

let total_chunks = filepaths.len();
for (i, path) in filepaths.into_iter().enumerate() {
let factory_deps_path = self
.directory
.join(path.file_name().expect("path has no file name"));
let bytes = fs::read(factory_deps_path)?;
let bytes = fs::read(path)?;
let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?;
tracing::info!("Read chunk {}/{}, processing...", i + 1, total_chunks);
tx.send(storage_logs_chunk).await?;
Expand Down Expand Up @@ -151,7 +147,7 @@ impl SnapshotImporter {
}

Ok(SnapshotHeader {
l1_batch_number: l1_batch_number.expect("no l1 batch number found"),
l1_batch_number: l1_batch_number.expect("no l1 batch number found").as_u64(),
storage_logs_chunks,
factory_deps_filepath,
..Default::default()
Expand Down
12 changes: 7 additions & 5 deletions src/processor/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use state_reconstruct_fetcher::{
};
use state_reconstruct_storage::{
bytecode,
types::{MiniblockNumber, SnapshotFactoryDependency, SnapshotStorageLog},
types::{SnapshotFactoryDependency, SnapshotStorageLog},
};
use tokio::sync::mpsc;

Expand All @@ -24,7 +24,7 @@ use super::Processor;
pub const DEFAULT_DB_PATH: &str = "snapshot_db";
pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";
pub const DEFAULT_CHUNK_SIZE: usize = 1_000_000;
pub const DEFAULT_NUM_CHUNKS: usize = 10;

pub struct SnapshotBuilder {
database: SnapshotDatabase,
Expand Down Expand Up @@ -73,7 +73,6 @@ impl Processor for SnapshotBuilder {
.insert_storage_log(&mut SnapshotStorageLog {
key: *key,
value,
miniblock_number_of_initial_write: U64::from(0),
l1_batch_number_of_initial_write: U64::from(
block.l1_block_number.unwrap_or(0),
),
Expand Down Expand Up @@ -121,6 +120,10 @@ impl Processor for SnapshotBuilder {
.expect("failed to save factory dep");
}

let _ = self
.database
.set_latest_l2_batch_number(block.l2_block_number);

if let Some(number) = block.l1_block_number {
let _ = self.database.set_latest_l1_batch_number(number);
};
Expand Down Expand Up @@ -202,7 +205,7 @@ fn reconstruct_genesis_state(database: &mut SnapshotDatabase, path: &str) -> Res

tracing::trace!("Have {} unique keys in the tree", key_set.len());

for (address, key, value, miniblock_number) in batched {
for (address, key, value, _miniblock_number) in batched {
let derived_key = derive_final_address_for_params(&address, &key);
let mut tmp = [0u8; 32];
value.to_big_endian(&mut tmp);
Expand All @@ -213,7 +216,6 @@ fn reconstruct_genesis_state(database: &mut SnapshotDatabase, path: &str) -> Res
database.insert_storage_log(&mut SnapshotStorageLog {
key,
value,
miniblock_number_of_initial_write: MiniblockNumber::from(miniblock_number),
l1_batch_number_of_initial_write: U64::from(ethereum::GENESIS_BLOCK),
enumeration_index: 0,
})?;
Expand Down
3 changes: 1 addition & 2 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry};
use zksync_storage::{RocksDB, RocksDBOptions};

use super::RootHash;
use crate::processor::snapshot::DEFAULT_CHUNK_SIZE;

#[derive(Error, Debug)]
pub enum TreeError {
Expand Down Expand Up @@ -148,7 +147,7 @@ impl TreeWrapper {
async move {
let mut inner_db = inner_db.lock().await;
while let Some(chunk) = rx.recv().await {
let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE);
let mut tree_entries = Vec::new();

for log in &chunk.storage_logs {
tree_entries.push(TreeEntry::new(
Expand Down
1 change: 1 addition & 0 deletions state-reconstruct-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ethers = "1.0.2"
eyre = "0.6.8"
flate2 = "1.0.28"
serde = { version = "1.0.189", features = ["derive"] }
serde_repr = "0.1.19"
prost = "0.12.4"
rocksdb = "0.21.0"
thiserror = "1.0.50"
Expand Down
10 changes: 7 additions & 3 deletions state-reconstruct-storage/proto/snapshot.proto
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
syntax = "proto3";

package protobuf;
package zksync.types;

message SnapshotStorageLogsChunk {
repeated SnapshotStorageLog storage_logs = 1;
}

message SnapshotStorageLog {
optional bytes account_address = 1; // required; H160
optional bytes storage_key = 2; // required; H256
// `account_address` and `storage_key` fields are obsolete and are not used in the new snapshot format;
// `hashed_key` is used instead. The fields are retained for now to support recovery from old snapshots.
optional bytes account_address = 1; // optional; H160
optional bytes storage_key = 2; // optional; H256
optional bytes hashed_key = 6; // optional; H256

optional bytes storage_value = 3; // required; H256
optional uint32 l1_batch_number_of_initial_write = 4; // required
optional uint64 enumeration_index = 5; // required
Expand Down
2 changes: 2 additions & 0 deletions state-reconstruct-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod snapshot_columns {
pub const LAST_REPEATED_KEY_INDEX: &str = "SNAPSHOT_LAST_REPEATED_KEY_INDEX";
/// The latest l1 block number that was processed.
pub const LATEST_L1_BATCH: &str = "SNAPSHOT_LATEST_L1_BATCH";
/// The latest l2 block number that was processed.
pub const LATEST_L2_BATCH: &str = "SNAPSHOT_LATEST_L2_BATCH";
}

// NOTE: This is moved here as a temporary measure to resolve a cyclic dependency issue.
Expand Down
13 changes: 13 additions & 0 deletions state-reconstruct-storage/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl SnapshotDatabase {
KEY_TO_INDEX_MAP,
snapshot_columns::STORAGE_LOGS,
snapshot_columns::FACTORY_DEPS,
snapshot_columns::LATEST_L1_BATCH,
snapshot_columns::LATEST_L2_BATCH,
],
)?;

Expand All @@ -56,6 +58,8 @@ impl SnapshotDatabase {
KEY_TO_INDEX_MAP,
snapshot_columns::STORAGE_LOGS,
snapshot_columns::FACTORY_DEPS,
snapshot_columns::LATEST_L1_BATCH,
snapshot_columns::LATEST_L2_BATCH,
],
false,
)?;
Expand Down Expand Up @@ -159,6 +163,15 @@ impl SnapshotDatabase {
self.set_metadata_value(snapshot_columns::LATEST_L1_BATCH, number)
}

pub fn get_latest_l2_batch_number(&self) -> Result<U64> {
self.get_metadata_value(snapshot_columns::LATEST_L2_BATCH)
.map(U64::from)
}

pub fn set_latest_l2_batch_number(&self, number: u64) -> Result<()> {
self.set_metadata_value(snapshot_columns::LATEST_L2_BATCH, number)
}

pub fn get_last_repeated_key_index(&self) -> Result<u64> {
self.get_metadata_value(snapshot_columns::LAST_REPEATED_KEY_INDEX)
}
Expand Down
Loading

0 comments on commit 6be1288

Please sign in to comment.