Skip to content

Commit

Permalink
tweak(snapshots): process and extend snapshot imports as separate tas…
Browse files Browse the repository at this point in the history
…ks (#99)
  • Loading branch information
zeapoz authored May 3, 2024
1 parent 77d7bc9 commit 2453674
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use state_reconstruct_storage::{
reconstruction::ReconstructionDatabase, types::SnapshotStorageLogsChunk, PackingType,
};
use thiserror::Error;
use tokio::sync::{mpsc::Receiver, Mutex};
use tokio::sync::{
mpsc::{self, Receiver},
Mutex,
};
use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry};
use zksync_storage::{RocksDB, RocksDBOptions};

Expand Down Expand Up @@ -137,18 +140,32 @@ impl TreeWrapper {
mut rx: Receiver<SnapshotStorageLogsChunk>,
l1_batch_number: U64,
) -> Result<()> {
let mut inner_db = self.inner_db.lock().await;
let mut total_tree_entries = 0;

let mut i = 0;
while let Some(chunk) = rx.recv().await {
let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE);

for log in &chunk.storage_logs {
tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value));
inner_db.add_key(&log.key).expect("cannot add key");
let (tree_tx, mut tree_rx) = mpsc::channel(1);
tokio::spawn({
let inner_db = self.inner_db.clone();
async move {
let mut inner_db = inner_db.lock().await;
while let Some(chunk) = rx.recv().await {
let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE);

for log in &chunk.storage_logs {
tree_entries.push(TreeEntry::new(
log.key,
log.enumeration_index,
log.value,
));
inner_db.add_key(&log.key).expect("cannot add key");
}

tree_tx.send(tree_entries).await.expect("receiver dropped");
}
}
});

let mut i = 0;
while let Some(tree_entries) = tree_rx.recv().await {
total_tree_entries += tree_entries.len();
self.tree.extend(tree_entries);

Expand All @@ -160,7 +177,10 @@ impl TreeWrapper {
"Succesfully imported snapshot containing {total_tree_entries} storage logs!",
);

inner_db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;
self.inner_db
.lock()
.await
.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;

Ok(())
}
Expand Down

0 comments on commit 2453674

Please sign in to comment.