From b42dde79b78eeeebca8cdabe033a7106170a9ca9 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Thu, 25 Jul 2024 17:20:24 -0300 Subject: [PATCH] enha: additional checks in importer-offline --- .clippy.toml | 1 + Cargo.toml | 3 ++- src/bin/importer_offline.rs | 35 ++++++++++++++++++++++++----------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/.clippy.toml b/.clippy.toml index 354c5df4f..83ea73519 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1 +1,2 @@ +allow-unwrap-in-tests = true disallowed-names = ["lock"] \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 86ef41241..325fb6655 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -194,4 +194,5 @@ semicolon_if_nothing_returned = "warn" unused_async = "warn" unused_self = "warn" used_underscore_binding = "warn" -wildcard_imports = "warn" \ No newline at end of file +wildcard_imports = "warn" +unwrap_used = "allow" \ No newline at end of file diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 1ccbbb8a7..e5e99d3c6 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -119,19 +119,32 @@ fn execute_block_importer( return Ok(()); }; - // receive new tasks to execute, or exit + // receive blocks to execute let Some((blocks, receipts)) = backlog_rx.blocking_recv() else { - tracing::info!("{} has no more blocks to process", TASK_NAME); + tracing::info!("{} has no more blocks to reexecute", TASK_NAME); return Ok(()); }; - // imports block transactions - let block_start = blocks.first().unwrap().number(); - let block_end = blocks.last().unwrap().number(); + // ensure range is not empty + let (Some(block_start), Some(block_end)) = (blocks.first(), blocks.last()) else { + let message = GlobalState::shutdown_from(TASK_NAME, "received empty block range to reexecute"); + return log_and_err!(message); + }; + + // track operation + let block_start = block_start.number(); + let block_end = block_end.number(); let blocks_len = blocks.len(); - let receipts = ExternalReceipts::from(receipts); + tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "reexecuting blocks"); - tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "reexecuting (and importing) blocks"); + // ensure block range have no gaps + if block_start.count_to(&block_end) != blocks_len as u64 { + let message = GlobalState::shutdown_from(TASK_NAME, "received block range with gaps to reexecute"); + return log_and_err!(message); + } + + // imports block transactions + let receipts = ExternalReceipts::from(receipts); let mut transaction_count = 0; let instant_before_execution = Instant::now(); @@ -230,10 +243,10 @@ async fn execute_external_rpc_storage_loader( } } -async fn load_blocks_and_receipts(rpc_storage: Arc, start: BlockNumber, end: BlockNumber) -> anyhow::Result { - tracing::info!(%start, %end, "loading blocks and receipts"); - let blocks_task = rpc_storage.read_blocks_in_range(start, end); - let receipts_task = rpc_storage.read_receipts_in_range(start, end); +async fn load_blocks_and_receipts(rpc_storage: Arc, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result { + tracing::info!(%block_start, %block_end, "loading blocks and receipts"); + let blocks_task = rpc_storage.read_blocks_in_range(block_start, block_end); + let receipts_task = rpc_storage.read_receipts_in_range(block_start, block_end); try_join!(blocks_task, receipts_task) }