tweak(snapshots): process each chunk separately #98

Merged · 1 commit · May 3, 2024
4 changes: 2 additions & 2 deletions src/cli.rs
@@ -110,8 +110,8 @@ pub enum Command {
#[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)]
db_path: Option<String>,
/// Number of storage logs to stuff into one chunk.
#[arg(short, long, default_value_t = 1_000_000)]
chunk_size: u64,
#[arg(short, long, default_value_t = snapshot::DEFAULT_CHUNK_SIZE)]
chunk_size: usize,
/// The directory to export the snapshot files to.
directory: String,
},
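For context on the `cli.rs` change: the chunk size default now comes from a shared constant rather than a literal. A minimal standalone sketch of that clap pattern (names here are illustrative, not the crate's actual module layout):

```rust
use clap::Parser;

// Hypothetical constant mirroring snapshot::DEFAULT_CHUNK_SIZE.
const DEFAULT_CHUNK_SIZE: usize = 1_000_000;

#[derive(Parser)]
struct ExportArgs {
    /// Number of storage logs to pack into one chunk.
    #[arg(short, long, default_value_t = DEFAULT_CHUNK_SIZE)]
    chunk_size: usize,
}

fn main() {
    let args = ExportArgs::parse();
    println!("chunk_size = {}", args.chunk_size);
}
```

Keeping the default in one constant lets the exporter, importer, and CLI agree on the same value without repeating the literal.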
5 changes: 2 additions & 3 deletions src/main.rs
@@ -77,9 +77,8 @@ async fn main() -> Result<()> {

if let Some(directory) = snapshot {
tracing::info!("Trying to restore state from snapshot...");
let importer =
SnapshotImporter::new(PathBuf::from(directory), &db_path.clone()).await?;
importer.run().await?;
let importer = SnapshotImporter::new(PathBuf::from(directory));
importer.run(&db_path.clone()).await?;
}

match source {
6 changes: 3 additions & 3 deletions src/processor/snapshot/exporter.rs
@@ -36,7 +36,7 @@ impl SnapshotExporter {
})
}

pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
pub fn export_snapshot(&self, chunk_size: usize) -> Result<()> {
let l1_batch_number = self.database.get_latest_l1_batch_number()?;
let mut header = SnapshotHeader {
l1_batch_number,
@@ -91,7 +91,7 @@ impl SnapshotExporter {
Ok(())
}

fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
fn export_storage_logs(&self, chunk_size: usize, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting storage logs...");

let num_logs = self.database.get_last_repeated_key_index()?;
@@ -102,7 +102,7 @@
.database
.iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

let total_num_chunks = (num_logs / chunk_size) + 1;
let total_num_chunks = (num_logs / chunk_size as u64) + 1;
for chunk_id in 0..total_num_chunks {
tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, total_num_chunks);

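A small standalone sketch of the chunk-count arithmetic used in `export_storage_logs` above (values are illustrative); it shows that the integer division plus one covers every log index, with an exact multiple producing one extra trailing chunk:

```rust
fn main() {
    let chunk_size: usize = 1_000_000;

    // Same arithmetic as the exporter: integer division plus one chunk,
    // so every log index in 0..num_logs falls into some chunk.
    for num_logs in [0u64, 999_999, 1_000_000, 2_500_000] {
        let total_num_chunks = (num_logs / chunk_size as u64) + 1;
        println!("{num_logs} logs -> {total_num_chunks} chunk(s)");
    }
}
```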
62 changes: 34 additions & 28 deletions src/processor/snapshot/importer.rs
@@ -1,21 +1,16 @@
use std::{
fs::{self, DirEntry},
path::{Path, PathBuf},
sync::Arc,
};

use ethers::types::U64;
use eyre::Result;
use regex::{Captures, Regex};
use state_reconstruct_fetcher::constants::storage::INNER_DB_NAME;
use state_reconstruct_storage::{
reconstruction::ReconstructionDatabase,
types::{
Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk,
SnapshotStorageLogsChunkMetadata,
},
use state_reconstruct_storage::types::{
Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk,
SnapshotStorageLogsChunkMetadata,
};
use tokio::sync::Mutex;
use tokio::sync::mpsc::{self, Sender};

use super::{SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME};
use crate::processor::tree::tree_wrapper::TreeWrapper;
@@ -26,28 +21,36 @@ const FACTORY_DEPS_REGEX: &str = r"snapshot_l1_batch_(\d*)_factory_deps.proto.gz
pub struct SnapshotImporter {
// The path of the directory where snapshot chunks are stored.
directory: PathBuf,
// The tree to import state to.
tree: TreeWrapper,
}

impl SnapshotImporter {
pub async fn new(directory: PathBuf, db_path: &Path) -> Result<Self> {
let inner_db_path = db_path.join(INNER_DB_NAME);
let new_state = ReconstructionDatabase::new(inner_db_path.clone())?;
let snapshot = Arc::new(Mutex::new(new_state));
let tree = TreeWrapper::new(db_path, snapshot.clone(), true).await?;

Ok(Self { directory, tree })
pub fn new(directory: PathBuf) -> Self {
Self { directory }
}

pub async fn run(mut self) -> Result<()> {
pub async fn run(self, db_path: &Path) -> Result<()> {
let (tx, rx) = mpsc::channel(1);

let header = self.read_header()?;
let _factory_deps = self.read_factory_deps(&header)?;
let storage_logs_chunk = self.read_storage_logs_chunks(&header)?;

self.tree
.restore_from_snapshot(storage_logs_chunk, header.l1_batch_number)
// Read storage logs async sending each read one into the tree to process.
tokio::spawn({
let header = header.clone();
async move {
self.read_storage_logs_chunks_async(&header, tx)
.await
.expect("failed to read storage_logs_chunks");
}
});

let mut tree = TreeWrapper::new_snapshot_wrapper(db_path)
.await
.expect("can't create tree");
tree.restore_from_snapshot(rx, header.l1_batch_number)
.await?;

Ok(())
}

fn read_header(&self) -> Result<SnapshotHeader> {
@@ -71,27 +74,30 @@ impl SnapshotImporter {
SnapshotFactoryDependencies::decode(&bytes)
}

fn read_storage_logs_chunks(
async fn read_storage_logs_chunks_async(
&self,
header: &SnapshotHeader,
) -> Result<Vec<SnapshotStorageLogsChunk>> {
tx: Sender<SnapshotStorageLogsChunk>,
) -> Result<()> {
// NOTE: I think these are sorted by default, but if not, we need to sort them
// before extracting the filepaths.
let filepaths = header
.storage_logs_chunks
.iter()
.map(|meta| PathBuf::from(&meta.filepath));

let mut chunks = Vec::with_capacity(filepaths.len());
for path in filepaths {
let total_chunks = filepaths.len();
for (i, path) in filepaths.into_iter().enumerate() {
let factory_deps_path = self
.directory
.join(path.file_name().expect("path has no file name"));
let bytes = fs::read(factory_deps_path)?;
let storage_logs_chunk = SnapshotStorageLogsChunk::decode(&bytes)?;
chunks.push(storage_logs_chunk);
tracing::info!("Read chunk {}/{}, processing...", i + 1, total_chunks);
tx.send(storage_logs_chunk).await?;
}
Ok(chunks)

Ok(())
}

fn infer_header_from_file_names(&self) -> Result<SnapshotHeader> {
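The restructured importer follows a standard bounded producer/consumer shape: a reader task is spawned onto the runtime and sends each decoded chunk through an `mpsc::channel(1)`, so at most one chunk waits in the channel while the tree imports the previous one. A minimal self-contained sketch of that shape (the `Chunk` type and loop bodies are stand-ins, not the crate's own):

```rust
use tokio::sync::mpsc;

// Stand-in for SnapshotStorageLogsChunk.
#[derive(Debug)]
struct Chunk(u32);

#[tokio::main]
async fn main() {
    // Capacity 1: the reader can stay at most one chunk ahead of the consumer.
    let (tx, mut rx) = mpsc::channel::<Chunk>(1);

    // Producer: reads and decodes chunks, sending each one as it is ready.
    tokio::spawn(async move {
        for id in 0..3 {
            // In the importer this step is fs::read + Proto::decode.
            tx.send(Chunk(id)).await.expect("receiver dropped");
        }
        // Dropping tx closes the channel, which ends the consumer loop below.
    });

    // Consumer: processes each chunk as it arrives (the tree-import side).
    while let Some(chunk) = rx.recv().await {
        println!("processing {chunk:?}");
    }
}
```

Because reading/decoding the next chunk overlaps with inserting the current one into the tree, the import no longer has to hold every chunk in memory at once.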
1 change: 1 addition & 0 deletions src/processor/snapshot/mod.rs
@@ -24,6 +24,7 @@ use super::Processor;
pub const DEFAULT_DB_PATH: &str = "snapshot_db";
pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";
pub const DEFAULT_CHUNK_SIZE: usize = 1_000_000;

pub struct SnapshotBuilder {
database: SnapshotDatabase,
34 changes: 21 additions & 13 deletions src/processor/tree/tree_wrapper.rs
@@ -3,16 +3,20 @@ use std::{collections::HashMap, fs, num::NonZeroU32, path::Path, str::FromStr, s
use blake2::{Blake2s256, Digest};
use ethers::types::{Address, H256, U256, U64};
use eyre::Result;
use state_reconstruct_fetcher::{constants::storage::INITAL_STATE_PATH, types::CommitBlock};
use state_reconstruct_fetcher::{
constants::storage::{INITAL_STATE_PATH, INNER_DB_NAME},
types::CommitBlock,
};
use state_reconstruct_storage::{
reconstruction::ReconstructionDatabase, types::SnapshotStorageLogsChunk, PackingType,
};
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::sync::{mpsc::Receiver, Mutex};
use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry};
use zksync_storage::{RocksDB, RocksDBOptions};

use super::RootHash;
use crate::processor::snapshot::DEFAULT_CHUNK_SIZE;

#[derive(Error, Debug)]
pub enum TreeError {
@@ -54,6 +58,13 @@ impl TreeWrapper {
})
}

pub async fn new_snapshot_wrapper(db_path: &Path) -> Result<Self> {
let inner_db_path = db_path.join(INNER_DB_NAME);
let new_state = ReconstructionDatabase::new(inner_db_path.clone())?;
let snapshot = Arc::new(Mutex::new(new_state));
Self::new(db_path, snapshot.clone(), true).await
}

/// Inserts a block into the tree and returns the root hash of the resulting state tree.
pub async fn insert_block(&mut self, block: &CommitBlock) -> Result<RootHash> {
self.clear_known_base();
@@ -123,36 +134,33 @@

pub async fn restore_from_snapshot(
&mut self,
chunks: Vec<SnapshotStorageLogsChunk>,
mut rx: Receiver<SnapshotStorageLogsChunk>,
l1_batch_number: U64,
) -> Result<()> {
let mut inner_db = self.inner_db.lock().await;
let mut total_tree_entries = 0;
for (i, chunk) in chunks.iter().enumerate() {
let mut tree_entries = Vec::new();

tracing::info!("Importing chunk {}/{}...", i + 1, chunks.len());
let mut i = 0;
while let Some(chunk) = rx.recv().await {
let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE);

for log in &chunk.storage_logs {
tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value));
self.inner_db
.lock()
.await
.add_key(&log.key)
.expect("cannot add key");
inner_db.add_key(&log.key).expect("cannot add key");
}

total_tree_entries += tree_entries.len();
self.tree.extend(tree_entries);

tracing::info!("Chunk {} was succesfully imported!", i + 1);
i += 1;
}

tracing::info!(
"Succesfully imported snapshot containing {total_tree_entries} storage logs!",
);

let db = self.inner_db.lock().await;
db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;
inner_db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;

Ok(())
}
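One detail of the `restore_from_snapshot` rewrite is that the `inner_db` mutex guard is now taken once before the receive loop instead of once per storage log. A reduced sketch of that locking pattern, assuming nothing else needs the database while the import loop runs (types here are placeholders):

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
    // Stand-in for Arc<Mutex<ReconstructionDatabase>>.
    let inner_db = Arc::new(Mutex::new(Vec::<u64>::new()));
    let (tx, mut rx) = mpsc::channel::<Vec<u64>>(1);

    tokio::spawn(async move {
        for chunk in [vec![1, 2], vec![3, 4]] {
            tx.send(chunk).await.unwrap();
        }
    });

    // Lock once for the whole import instead of re-locking per key;
    // this assumes no other task touches the database until the loop ends.
    let mut db = inner_db.lock().await;
    while let Some(chunk) = rx.recv().await {
        for key in &chunk {
            db.push(*key); // stand-in for inner_db.add_key(&log.key)
        }
    }
}
```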
4 changes: 2 additions & 2 deletions state-reconstruct-storage/src/types.rs
@@ -73,7 +73,7 @@ pub trait Proto {
}
}

#[derive(Default, Debug, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct SnapshotHeader {
pub l1_batch_number: L1BatchNumber,
pub miniblock_number: MiniblockNumber,
@@ -85,7 +85,7 @@ pub struct SnapshotHeader {
pub generated_at: DateTime<Utc>,
}

#[derive(Default, Debug, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct SnapshotStorageLogsChunkMetadata {
pub chunk_id: u64,
// can be either a gs or filesystem path
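`SnapshotHeader` and the chunk metadata gain `Clone` because `run` now clones the header into the spawned reader task while keeping a copy for the tree restore. A minimal illustration of why the derive is needed when moving data into `tokio::spawn` (hypothetical types, not the crate's own):

```rust
#[derive(Clone, Debug)]
struct Header {
    l1_batch_number: u64,
}

#[tokio::main]
async fn main() {
    let header = Header { l1_batch_number: 42 };

    tokio::spawn({
        // Without Clone, `header` would be moved into the task and would
        // no longer be available to the code that follows.
        let header = header.clone();
        async move {
            println!("reader task sees batch {}", header.l1_batch_number);
        }
    })
    .await
    .unwrap();

    println!("main still owns batch {}", header.l1_batch_number);
}
```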