Skip to content

Commit

Permalink
feat: add state to prepare snapshot command (#83)
Browse files Browse the repository at this point in the history
* feat(snapshot): store state when preparing snapshot

This allows the prepare snapshot command to resume from the same point
after a restart.

* feat(snapshot): add progress tracing when importing

* chore: remove redundant line
  • Loading branch information
zeapoz authored Apr 10, 2024
1 parent 2635d3a commit 6f7f973
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
20 changes: 17 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ use eyre::Result;
use processor::snapshot::{
exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder,
};
use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
use state_reconstruct_fetcher::{
constants::{ethereum, storage},
l1_fetcher::{L1Fetcher, L1FetcherOptions},
types::CommitBlock,
};
use tikv_jemallocator::Jemalloc;
use tokio::sync::mpsc;
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
Expand Down Expand Up @@ -153,10 +157,20 @@ async fn main() -> Result<()> {
l1_fetcher_options,
db_path,
} => {
let fetcher_options = l1_fetcher_options.into();
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotBuilder::new(db_path);

let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into();
if let Ok(Some(batch_number)) = processor.get_last_l1_batch_number() {
if batch_number > ethereum::GENESIS_BLOCK {
tracing::info!(
"Found a preexisting snapshot db, continuing from L1 block: {batch_number}"
);
fetcher_options.start_block = batch_number + 1;
}
}

let fetcher = L1Fetcher::new(fetcher_options, None)?;

let (tx, rx) = mpsc::channel::<CommitBlock>(5);
let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
Expand Down
22 changes: 22 additions & 0 deletions src/processor/snapshot/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub const FACTORY_DEPS: &str = "factory_deps";
const METADATA: &str = "metadata";

const LAST_REPEATED_KEY_INDEX: &str = "LAST_REPEATED_KEY_INDEX";
const LAST_L1_BATCH_NUMBER: &str = "LAST_L1_BATCH_NUMBER";

#[allow(clippy::enum_variant_names)]
#[derive(Error, Debug)]
Expand Down Expand Up @@ -118,6 +119,27 @@ impl SnapshotDB {
.map_err(Into::into)
}

pub fn get_last_l1_batch_number(&self) -> Result<Option<u64>> {
// Unwrapping column family handle here is safe because presence of
// those CFs is ensured in construction of this DB.
let metadata = self.cf_handle(METADATA).unwrap();
let batch = self.get_cf(metadata, LAST_L1_BATCH_NUMBER)?.map(|bytes| {
u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
])
});

Ok(batch)
}

pub fn set_last_l1_batch_number(&self, batch_number: u64) -> Result<()> {
// Unwrapping column family handle here is safe because presence of
// those CFs is ensured in construction of this DB.
let metadata = self.cf_handle(METADATA).unwrap();
self.put_cf(metadata, LAST_L1_BATCH_NUMBER, batch_number.to_be_bytes())
.map_err(Into::into)
}

pub fn get_storage_log(&self, key: &[u8]) -> Result<Option<SnapshotStorageLog>> {
// Unwrapping column family handle here is safe because presence of
// those CFs is ensured in construction of this DB.
Expand Down
16 changes: 13 additions & 3 deletions src/processor/snapshot/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use bytes::BytesMut;
use ethers::types::U64;
use eyre::Result;
use flate2::{write::GzEncoder, Compression};
use prost::Message;
Expand Down Expand Up @@ -38,7 +39,17 @@ impl SnapshotExporter {
}

pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
let mut header = SnapshotHeader::default();
let l1_batch_number = U64::from(
self.database
.get_last_l1_batch_number()?
.expect("snapshot db contains no L1 batch number"),
);

let mut header = SnapshotHeader {
l1_batch_number,
..Default::default()
};

self.export_storage_logs(chunk_size, &mut header)?;
self.export_factory_deps(&mut header)?;

Expand Down Expand Up @@ -146,7 +157,6 @@ impl SnapshotExporter {
};

chunk.storage_logs.push(pb);
header.l1_batch_number = entry.l1_batch_number_of_initial_write;
}
} else {
has_more = false;
Expand All @@ -159,7 +169,7 @@ impl SnapshotExporter {
buf.reserve(chunk_len - buf.capacity());
}

let path = PathBuf::new().join(&self.basedir).join(format!(
let path = &self.basedir.join(format!(
"snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
header.l1_batch_number, chunk_id
));
Expand Down
9 changes: 9 additions & 0 deletions src/processor/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ impl SnapshotBuilder {

Self { database }
}

// Gets the next L1 batch number to be processed for ues in state recovery.
pub fn get_last_l1_batch_number(&self) -> Result<Option<u64>> {
self.database.get_last_l1_batch_number()
}
}

#[async_trait]
Expand Down Expand Up @@ -109,6 +114,10 @@ impl Processor for SnapshotBuilder {
})
.expect("failed to save factory dep");
}

if let Some(number) = block.l1_block_number {
let _ = self.database.set_last_l1_batch_number(number);
};
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ impl TreeWrapper {
) -> Result<()> {
let mut tree_entries = Vec::new();

for chunk in &chunks {
for (i, chunk) in chunks.iter().enumerate() {
tracing::info!("Importing chunk {}/{}...", i + 1, chunks.len());

for log in &chunk.storage_logs {
let key = U256::from_big_endian(log.storage_key());
let index = log.enumeration_index();
Expand All @@ -146,15 +148,18 @@ impl TreeWrapper {
.add_key(&key)
.expect("cannot add key");
}

tracing::info!("Chunk {} was succesfully imported!", i + 1);
}

tracing::info!("Extending merkle tree with imported storage logs...");
let num_tree_entries = tree_entries.len();
self.tree.extend(tree_entries);

tracing::info!("Succesfully imported snapshot containing {num_tree_entries} storage logs!",);

let snapshot = self.snapshot.lock().await;
snapshot.set_latest_l1_block_number(l1_batch_number.as_u64())?;
snapshot.set_latest_l1_block_number(l1_batch_number.as_u64() + 1)?;

Ok(())
}
Expand Down

0 comments on commit 6f7f973

Please sign in to comment.