From 435818ec34a1c39665c86ced99b643b416c5e847 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Thu, 23 May 2024 19:03:13 -0300 Subject: [PATCH] fix: unify how to detemrine the block to start import job --- src/bin/importer_offline.rs | 23 +--------------------- src/bin/importer_online.rs | 6 +----- src/eth/storage/stratus_storage.rs | 31 ++++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 8d2baa1d1..7aa0065da 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -10,7 +10,6 @@ use itertools::Itertools; use stratus::config::ImporterOfflineConfig; use stratus::eth::primitives::Block; use stratus::eth::primitives::BlockNumber; -use stratus::eth::primitives::BlockSelection; use stratus::eth::primitives::ExternalBlock; use stratus::eth::primitives::ExternalReceipt; use stratus::eth::primitives::ExternalReceipts; @@ -20,7 +19,6 @@ use stratus::eth::storage::InMemoryPermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; -use stratus::ext::not; use stratus::GlobalServices; use stratus::GlobalState; use tokio::runtime::Handle; @@ -55,7 +53,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // init block range let block_start = match config.block_start { Some(start) => BlockNumber::from(start), - None => block_number_to_start(&storage).await?, + None => storage.read_block_number_to_resume_import().await?, }; let block_end = match config.block_end { Some(end) => BlockNumber::from(end), @@ -238,25 +236,6 @@ async fn load_blocks_and_receipts(rpc_storage: Arc, star try_join!(blocks_task, receipts_task) } -// Finds the block number to start the import job. -async fn block_number_to_start(storage: &StratusStorage) -> anyhow::Result { - // when has an active number, resume from it because it was not imported yet. - let active_number = storage.read_active_block_number().await?; - if let Some(active_number) = active_number { - return Ok(active_number); - } - - // fallback to last mined block - // if mined is zero, we need to check if we have the zero block or not to decide if we start from zero or the next. - // if mined is not zero, then can assume it is the next number after it. - let mut mined_number = storage.read_mined_block_number().await?; - let zero_block = storage.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?; - if not(mined_number.is_zero()) || zero_block.is_some() { - mined_number = mined_number.next(); - } - Ok(mined_number) -} - // Finds the block number to stop the import job. async fn block_number_to_stop(rpc_storage: &Arc) -> anyhow::Result { match rpc_storage.read_max_block_number_in_range(BlockNumber::ZERO, BlockNumber::MAX).await { diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index cecd72149..bcfe2497f 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -88,11 +88,7 @@ pub async fn run_importer_online( chain: Arc, sync_interval: Duration, ) -> anyhow::Result<()> { - // start from last imported block - let mut number = storage.read_mined_block_number().await?; - if number != BlockNumber::from(0) { - number = number.next(); - } + let number = storage.read_block_number_to_resume_import().await?; let (backlog_tx, backlog_rx) = mpsc::unbounded_channel(); diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index d6627b05b..47608d217 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -69,6 +69,37 @@ impl StratusStorage { // Block number // ------------------------------------------------------------------------- + #[tracing::instrument(skip_all)] + pub async fn read_block_number_to_resume_import(&self) -> anyhow::Result { + // if does not have the zero block present, should resume from zero + let zero = self.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?; + if zero.is_none() { + tracing::info!(number = %0, "block number to resume is ZERO because it does not exist in the storage"); + return Ok(BlockNumber::ZERO); + } + + // try to resume from active block number + let active_number = self.read_active_block_number().await?; + if let Some(active_number) = active_number { + tracing::info!(number = %active_number, "block number to resume is ACTIVE because it set in the storage"); + return Ok(active_number); + } + + // fallback to last mined block number + let mined_number = self.read_mined_block_number().await?; + let mined_block = self.read_block(&BlockSelection::Number(mined_number)).await?; + match mined_block { + Some(_) => { + tracing::info!(number = %mined_number, "block number to resume is MINED + 1 because it set in the storage and the block exist"); + Ok(mined_number.next()) + } + None => { + tracing::info!(number = %mined_number, "block number to resume is MINED because it is set in the storage but the block does not exist"); + Ok(mined_number) + } + } + } + #[tracing::instrument(skip_all)] pub async fn read_active_block_number(&self) -> anyhow::Result> { #[cfg(feature = "metrics")]