Skip to content

Commit

Permalink
enha: additional checks in importer-offline
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed Jul 25, 2024
1 parent 42ae1ce commit b42dde7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
1 change: 1 addition & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
allow-unwrap-in-tests = true
disallowed-names = ["lock"]
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,5 @@ semicolon_if_nothing_returned = "warn"
unused_async = "warn"
unused_self = "warn"
used_underscore_binding = "warn"
wildcard_imports = "warn"
wildcard_imports = "warn"
unwrap_used = "allow"
35 changes: 24 additions & 11 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,32 @@ fn execute_block_importer(
return Ok(());
};

// receive new tasks to execute, or exit
// receive blocks to execute
let Some((blocks, receipts)) = backlog_rx.blocking_recv() else {
tracing::info!("{} has no more blocks to process", TASK_NAME);
tracing::info!("{} has no more blocks to reexecute", TASK_NAME);
return Ok(());
};

// imports block transactions
let block_start = blocks.first().unwrap().number();
let block_end = blocks.last().unwrap().number();
// ensure range is not empty
let (Some(block_start), Some(block_end)) = (blocks.first(), blocks.last()) else {
let message = GlobalState::shutdown_from(TASK_NAME, "received empty block range to reexecute");
return log_and_err!(message);
};

// track operation
let block_start = block_start.number();
let block_end = block_end.number();
let blocks_len = blocks.len();
let receipts = ExternalReceipts::from(receipts);
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "reexecuting blocks");

tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "reexecuting (and importing) blocks");
// ensure block range have no gaps
if block_start.count_to(&block_end) != blocks_len as u64 {
let message = GlobalState::shutdown_from(TASK_NAME, "received block range with gaps to reexecute");
return log_and_err!(message);
}

// imports block transactions
let receipts = ExternalReceipts::from(receipts);
let mut transaction_count = 0;
let instant_before_execution = Instant::now();

Expand Down Expand Up @@ -230,10 +243,10 @@ async fn execute_external_rpc_storage_loader(
}
}

async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, start: BlockNumber, end: BlockNumber) -> anyhow::Result<BacklogTask> {
tracing::info!(%start, %end, "loading blocks and receipts");
let blocks_task = rpc_storage.read_blocks_in_range(start, end);
let receipts_task = rpc_storage.read_receipts_in_range(start, end);
async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result<BacklogTask> {
tracing::info!(%block_start, %block_end, "loading blocks and receipts");
let blocks_task = rpc_storage.read_blocks_in_range(block_start, block_end);
let receipts_task = rpc_storage.read_receipts_in_range(block_start, block_end);
try_join!(blocks_task, receipts_task)
}

Expand Down

0 comments on commit b42dde7

Please sign in to comment.