feat: snapshot imports (#79)
* ref(snapshot): move exporter out of `mod.rs`

* feat(snapshot): import cli arguments

* deps: remove `deflate` in favor of `flate2`

* feat(snapshot): decode header and factory deps

* feat(snapshot): restore state tree from snapshot

* ref(snapshot): make `snapshot` a cli argument instead

* feat(snapshot): read filepaths from header

* ref(snapshot): modify file names to match upstream

* fixup!

* tweak(snapshot): log a message on successful export

* chore: cargo fmt

* chore: remove `#![allow(warnings)]` attribute
zeapoz authored Apr 3, 2024
1 parent 92c219c commit 476621c
Showing 11 changed files with 361 additions and 216 deletions.
27 changes: 1 addition & 26 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -15,9 +15,9 @@ blobscan-client = { path = "./state-reconstruct-fetcher/blobscan-client" }
bytes = "1.5"
chrono = "0.4.31"
clap = { version = "4.4.7", features = ["derive", "env"] }
-deflate = { version = "1.0.0", features = ["gzip"] }
ethers = "1.0.2"
eyre = "0.6.8"
+flate2 = "1.0.28"
hex = "0.4.3"
indexmap = { version = "2.0.2" }
primitive-types = "0.12.2"
4 changes: 4 additions & 0 deletions src/cli.rs
@@ -75,6 +75,10 @@ pub enum Command {
        /// The path to the storage solution.
        #[arg(short, long, env = "ZK_SYNC_DB_PATH")]
        db_path: Option<String>,
+        /// If present, try to restore state from snapshot files contained in the specified
+        /// directory. Note that this will only work when supplied with a fresh database.
+        #[arg(long)]
+        snapshot: Option<String>,
    },

    /// Query the local storage, and optionally, return a JSON-payload of the data.
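For context, a minimal self-contained sketch of how the new `--snapshot` flag parses under clap's derive API. `DemoArgs` is an illustrative stand-in for the real `Cli`/`Command` types, not code from this commit:

use clap::Parser;

/// Hypothetical, stripped-down stand-in for the `Reconstruct` subcommand's arguments.
#[derive(Parser, Debug)]
struct DemoArgs {
    /// If present, try to restore state from snapshot files in this directory.
    #[arg(long)]
    snapshot: Option<String>,
}

fn main() {
    // Equivalent to running: demo --snapshot ./snapshot-dir
    let args = DemoArgs::parse_from(["demo", "--snapshot", "./snapshot-dir"]);
    assert_eq!(args.snapshot.as_deref(), Some("./snapshot-dir"));
}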
21 changes: 18 additions & 3 deletions src/main.rs
@@ -15,7 +15,9 @@ use std::{
use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
-use processor::snapshot::{SnapshotBuilder, SnapshotExporter};
+use processor::snapshot::{
+    exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder,
+};
use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
use tikv_jemallocator::Jemalloc;
use tokio::sync::mpsc;
@@ -59,12 +61,23 @@ async fn main() -> Result<()> {
    let cli = Cli::parse();

    match cli.subcommand {
-        Command::Reconstruct { source, db_path } => {
+        Command::Reconstruct {
+            source,
+            db_path,
+            snapshot,
+        } => {
            let db_path = match db_path {
                Some(path) => PathBuf::from(path),
                None => env::current_dir()?.join(storage::DEFAULT_DB_NAME),
            };

+            if let Some(directory) = snapshot {
+                tracing::info!("Trying to restore state from snapshot...");
+                let importer =
+                    SnapshotImporter::new(PathBuf::from(directory), &db_path.clone()).await?;
+                importer.run().await?;
+            }
+
            match source {
                ReconstructSource::L1 { l1_fetcher_options } => {
                    let fetcher_options = l1_fetcher_options.into();
@@ -159,8 +172,10 @@ async fn main() -> Result<()> {
        } => {
            let export_path = Path::new(&directory);
            std::fs::create_dir_all(export_path)?;
-            let exporter = SnapshotExporter::new(export_path, db_path);
+            let exporter = SnapshotExporter::new(export_path, db_path)?;
            exporter.export_snapshot(chunk_size)?;
+
+            tracing::info!("Successfully exported snapshot files to \"{directory}\"!");
        }
    }

2 changes: 1 addition & 1 deletion src/processor/snapshot/database.rs
@@ -104,7 +104,7 @@ impl SnapshotDB {
                idx_bytes[7],
            ])
        } else {
-            self.put_cf(metadata, LAST_REPEATED_KEY_INDEX, u64::to_be_bytes(1))?;
+            self.put_cf(metadata, LAST_REPEATED_KEY_INDEX, u64::to_be_bytes(0))?;
            0
        },
    )
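The one-line change above fixes an initialization mismatch: the metadata column family was seeded with 1 while the caller was handed 0, so the first read-back disagreed with the value just returned. A toy in-memory sketch of the same get-or-init pattern, with a HashMap standing in for RocksDB and only the key name mirroring the real code:

use std::collections::HashMap;

/// Toy stand-in for the RocksDB metadata column family; everything except the
/// key name is illustrative.
fn last_repeated_key_index(meta: &mut HashMap<&'static str, [u8; 8]>) -> u64 {
    const KEY: &str = "LAST_REPEATED_KEY_INDEX";
    if let Some(bytes) = meta.get(KEY).copied() {
        u64::from_be_bytes(bytes)
    } else {
        // Pre-fix behavior seeded the stored counter with 1 while returning 0.
        meta.insert(KEY, u64::to_be_bytes(0));
        0
    }
}

fn main() {
    let mut meta = HashMap::new();
    assert_eq!(last_repeated_key_index(&mut meta), 0); // seeds the entry
    assert_eq!(last_repeated_key_index(&mut meta), 0); // consistent on re-read
}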
187 changes: 187 additions & 0 deletions src/processor/snapshot/exporter.rs
@@ -0,0 +1,187 @@
use std::{
    io::Write,
    path::{Path, PathBuf},
};

use bytes::BytesMut;
use eyre::Result;
use flate2::{write::GzEncoder, Compression};
use prost::Message;

use super::{
    database::{self, SnapshotDB},
    types::{self, SnapshotFactoryDependency, SnapshotHeader},
    DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};

pub mod protobuf {
    include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
}

pub struct SnapshotExporter {
    basedir: PathBuf,
    database: SnapshotDB,
}

impl SnapshotExporter {
    pub fn new(basedir: &Path, db_path: Option<String>) -> Result<Self> {
        let db_path = match db_path {
            Some(p) => PathBuf::from(p),
            None => PathBuf::from(DEFAULT_DB_PATH),
        };

        let database = SnapshotDB::new_read_only(db_path)?;
        Ok(Self {
            basedir: basedir.to_path_buf(),
            database,
        })
    }

    pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
        let mut header = SnapshotHeader::default();
        self.export_storage_logs(chunk_size, &mut header)?;
        self.export_factory_deps(&mut header)?;

        let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME);
        let outfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        serde_json::to_writer(outfile, &header)?;

        Ok(())
    }

    fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
        let mut buf = BytesMut::new();

        let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
        let mut iterator = self
            .database
            .iterator_cf(storage_logs, rocksdb::IteratorMode::Start);

        let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
        while let Some(Ok((_, bs))) = iterator.next() {
            let factory_dep: SnapshotFactoryDependency = bincode::deserialize(&bs)?;
            factory_deps
                .factory_deps
                .push(protobuf::SnapshotFactoryDependency {
                    bytecode: Some(factory_dep.bytecode),
                });
        }

        let fd_len = factory_deps.encoded_len();
        if buf.capacity() < fd_len {
            buf.reserve(fd_len - buf.capacity());
        }

        let path = self.basedir.join(format!(
            "snapshot_l1_batch_{}_{}",
            header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
        ));
        header.factory_deps_filepath = path
            .clone()
            .into_os_string()
            .into_string()
            .expect("path to string");

        // Serialize chunk.
        factory_deps.encode(&mut buf)?;

        let outfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;

        // Wrap in gzip compression before writing.
        let mut encoder = GzEncoder::new(outfile, Compression::default());
        encoder.write_all(&buf)?;
        encoder.finish()?;

        Ok(())
    }

    fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
        let mut buf = BytesMut::new();
        let mut chunk_id = 0;

        let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
        let mut iterator = self
            .database
            .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

        let mut has_more = true;

        while has_more {
            let mut chunk = protobuf::SnapshotStorageLogsChunk {
                storage_logs: vec![],
            };

            for _ in 0..chunk_size {
                if let Some(Ok((_, key))) = iterator.next() {
                    if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
                        let pb = protobuf::SnapshotStorageLog {
                            account_address: None,
                            storage_key: Some(key.to_vec()),
                            storage_value: Some(entry.value.0.to_vec()),
                            l1_batch_number_of_initial_write: Some(
                                entry.l1_batch_number_of_initial_write.as_u32(),
                            ),
                            enumeration_index: Some(entry.enumeration_index),
                        };

                        chunk.storage_logs.push(pb);
                        header.l1_batch_number = entry.l1_batch_number_of_initial_write;
                    }
                } else {
                    has_more = false;
                }
            }

            // Ensure that write buffer has enough capacity.
            let chunk_len = chunk.encoded_len();
            if buf.capacity() < chunk_len {
                buf.reserve(chunk_len - buf.capacity());
            }

            let path = PathBuf::new().join(&self.basedir).join(format!(
                "snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
                header.l1_batch_number, chunk_id
            ));
            chunk_id += 1;

            header
                .storage_logs_chunks
                .push(types::SnapshotStorageLogsChunkMetadata {
                    chunk_id,
                    filepath: path
                        .clone()
                        .into_os_string()
                        .into_string()
                        .expect("path to string"),
                });

            // Serialize chunk.
            chunk.encode(&mut buf)?;

            let outfile = std::fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(false)
                .open(path)?;

            // Wrap in gzip compression before writing.
            let mut encoder = GzEncoder::new(outfile, Compression::default());
            encoder.write_all(&buf)?;
            encoder.finish()?;
            // Clear the temporary write buffer before the next chunk.
            buf.truncate(0);
        }

        Ok(())
    }
}
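`importer.rs` is among the 11 changed files but isn't rendered in this excerpt. As a rough sketch of the read side, assuming the importer reuses the generated protobuf types above, decoding one exported chunk reverses the exporter's encode-then-compress order (gunzip, then prost-decode); the module path and function name here are illustrative only:

use std::{fs::File, io::Read, path::Path};

use eyre::Result;
use flate2::read::GzDecoder;
use prost::Message;

// Hypothetical import: the generated types live in the `protobuf` module that
// exporter.rs includes from OUT_DIR; adjust to however importer.rs reaches them.
use super::exporter::protobuf;

/// Gunzip one `snapshot_l1_batch_*_storage_logs_part_*.proto.gzip` file and
/// prost-decode it back into a storage-logs chunk.
pub fn read_storage_logs_chunk(path: &Path) -> Result<protobuf::SnapshotStorageLogsChunk> {
    let mut decoder = GzDecoder::new(File::open(path)?);
    let mut bytes = Vec::new();
    decoder.read_to_end(&mut bytes)?;
    Ok(protobuf::SnapshotStorageLogsChunk::decode(bytes.as_slice())?)
}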
