Skip to content

Commit

Permalink
fix: unify how to detemrine the block to start import job
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed May 23, 2024
1 parent 50491bf commit 435818e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
23 changes: 1 addition & 22 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use itertools::Itertools;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::primitives::Block;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::BlockSelection;
use stratus::eth::primitives::ExternalBlock;
use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
Expand All @@ -20,7 +19,6 @@ use stratus::eth::storage::InMemoryPermanentStorage;
use stratus::eth::storage::StratusStorage;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::not;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -55,7 +53,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init block range
let block_start = match config.block_start {
Some(start) => BlockNumber::from(start),
None => block_number_to_start(&storage).await?,
None => storage.read_block_number_to_resume_import().await?,
};
let block_end = match config.block_end {
Some(end) => BlockNumber::from(end),
Expand Down Expand Up @@ -238,25 +236,6 @@ async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, star
try_join!(blocks_task, receipts_task)
}

// Finds the block number to start the import job.
async fn block_number_to_start(storage: &StratusStorage) -> anyhow::Result<BlockNumber> {
// when has an active number, resume from it because it was not imported yet.
let active_number = storage.read_active_block_number().await?;
if let Some(active_number) = active_number {
return Ok(active_number);
}

// fallback to last mined block
// if mined is zero, we need to check if we have the zero block or not to decide if we start from zero or the next.
// if mined is not zero, then can assume it is the next number after it.
let mut mined_number = storage.read_mined_block_number().await?;
let zero_block = storage.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?;
if not(mined_number.is_zero()) || zero_block.is_some() {
mined_number = mined_number.next();
}
Ok(mined_number)
}

// Finds the block number to stop the import job.
async fn block_number_to_stop(rpc_storage: &Arc<dyn ExternalRpcStorage>) -> anyhow::Result<BlockNumber> {
match rpc_storage.read_max_block_number_in_range(BlockNumber::ZERO, BlockNumber::MAX).await {
Expand Down
6 changes: 1 addition & 5 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ pub async fn run_importer_online(
chain: Arc<BlockchainClient>,
sync_interval: Duration,
) -> anyhow::Result<()> {
// start from last imported block
let mut number = storage.read_mined_block_number().await?;
if number != BlockNumber::from(0) {
number = number.next();
}
let number = storage.read_block_number_to_resume_import().await?;

let (backlog_tx, backlog_rx) = mpsc::unbounded_channel();

Expand Down
31 changes: 31 additions & 0 deletions src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,37 @@ impl StratusStorage {
// Block number
// -------------------------------------------------------------------------

#[tracing::instrument(skip_all)]
pub async fn read_block_number_to_resume_import(&self) -> anyhow::Result<BlockNumber> {
// if does not have the zero block present, should resume from zero
let zero = self.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?;
if zero.is_none() {
tracing::info!(number = %0, "block number to resume is ZERO because it does not exist in the storage");
return Ok(BlockNumber::ZERO);
}

// try to resume from active block number
let active_number = self.read_active_block_number().await?;
if let Some(active_number) = active_number {
tracing::info!(number = %active_number, "block number to resume is ACTIVE because it set in the storage");
return Ok(active_number);
}

// fallback to last mined block number
let mined_number = self.read_mined_block_number().await?;
let mined_block = self.read_block(&BlockSelection::Number(mined_number)).await?;
match mined_block {
Some(_) => {
tracing::info!(number = %mined_number, "block number to resume is MINED + 1 because it set in the storage and the block exist");
Ok(mined_number.next())
}
None => {
tracing::info!(number = %mined_number, "block number to resume is MINED because it is set in the storage but the block does not exist");
Ok(mined_number)
}
}
}

#[tracing::instrument(skip_all)]
pub async fn read_active_block_number(&self) -> anyhow::Result<Option<BlockNumber>> {
#[cfg(feature = "metrics")]
Expand Down

0 comments on commit 435818e

Please sign in to comment.