From 24536742a869a474358f9d97d5e0fe83266341b5 Mon Sep 17 00:00:00 2001 From: Jonathan <94441036+zeapoz@users.noreply.github.com> Date: Fri, 3 May 2024 12:37:42 +0200 Subject: [PATCH] tweak(snapshots): process and extend snapshot imports as separate tasks (#99) --- src/processor/tree/tree_wrapper.rs | 40 ++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index c68bcef..99503e2 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -11,7 +11,10 @@ use state_reconstruct_storage::{ reconstruction::ReconstructionDatabase, types::SnapshotStorageLogsChunk, PackingType, }; use thiserror::Error; -use tokio::sync::{mpsc::Receiver, Mutex}; +use tokio::sync::{ + mpsc::{self, Receiver}, + Mutex, +}; use zksync_merkle_tree::{Database, MerkleTree, RocksDBWrapper, TreeEntry}; use zksync_storage::{RocksDB, RocksDBOptions}; @@ -137,18 +140,32 @@ impl TreeWrapper { mut rx: Receiver, l1_batch_number: U64, ) -> Result<()> { - let mut inner_db = self.inner_db.lock().await; let mut total_tree_entries = 0; - let mut i = 0; - while let Some(chunk) = rx.recv().await { - let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE); - - for log in &chunk.storage_logs { - tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value)); - inner_db.add_key(&log.key).expect("cannot add key"); + let (tree_tx, mut tree_rx) = mpsc::channel(1); + tokio::spawn({ + let inner_db = self.inner_db.clone(); + async move { + let mut inner_db = inner_db.lock().await; + while let Some(chunk) = rx.recv().await { + let mut tree_entries = Vec::with_capacity(DEFAULT_CHUNK_SIZE); + + for log in &chunk.storage_logs { + tree_entries.push(TreeEntry::new( + log.key, + log.enumeration_index, + log.value, + )); + inner_db.add_key(&log.key).expect("cannot add key"); + } + + tree_tx.send(tree_entries).await.expect("receiver dropped"); + } } + }); + let mut i = 0; + while let Some(tree_entries) = tree_rx.recv().await { total_tree_entries += tree_entries.len(); self.tree.extend(tree_entries); @@ -160,7 +177,10 @@ impl TreeWrapper { "Succesfully imported snapshot containing {total_tree_entries} storage logs!", ); - inner_db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?; + self.inner_db + .lock() + .await + .set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?; Ok(()) }