diff --git a/Cargo.lock b/Cargo.lock index fad7e44b6..cbffddffb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4015,6 +4015,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -4688,6 +4697,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index ac9989131..8cf121499 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ thiserror = "=1.0.56" url = "=2.5.0" # async -tokio = { version = "=1.36.0", features = ["rt-multi-thread", "macros"] } +tokio = { version = "=1.36.0", features = ["rt-multi-thread", "macros", "signal"] } tokio-util = "=0.7.10" # config diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index fec75bd02..1ac55cbab 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -68,6 +68,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // init shared data between importer and external rpc storage loader let (backlog_tx, backlog_rx) = mpsc::channel::(BACKLOG_SIZE); let cancellation = CancellationToken::new(); + signal_handler(cancellation.clone()); // load genesis accounts let initial_accounts = rpc_storage.read_initial_accounts().await?; @@ -79,7 +80,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { } // execute parallel tasks (external rpc storage loader and block importer) - tokio::spawn(execute_external_rpc_storage_loader( + let _loader_task = tokio::spawn(execute_external_rpc_storage_loader( rpc_storage, cancellation.clone(), config.paralellism, @@ -89,18 +90,40 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { )); let block_snapshots = config.export_snapshot.into_iter().map_into().collect(); - execute_block_importer(executor, &stratus_storage, csv, cancellation, backlog_rx, block_snapshots).await?; + let importer_task = tokio::spawn(execute_block_importer( + executor, + stratus_storage, + csv, + cancellation.clone(), + backlog_rx, + block_snapshots, + )); + + importer_task.await??; Ok(()) } +fn signal_handler(cancellation: CancellationToken) { + tokio::spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => { + tracing::info!("shutting down"); + cancellation.cancel(); + } + + Err(err) => tracing::error!("Unable to listen for shutdown signal: {}", err), + } + }); +} + // ----------------------------------------------------------------------------- // Block importer // ----------------------------------------------------------------------------- async fn execute_block_importer( // services executor: EthExecutor, - stratus_storage: &StratusStorage, + stratus_storage: Arc, mut csv: Option, cancellation: CancellationToken, // data @@ -117,6 +140,10 @@ async fn execute_block_importer( break "block loader finished or failed"; }; + if cancellation.is_cancelled() { + break "exiting block importer"; + }; + let block_start = blocks.first().unwrap().number(); let block_end = blocks.last().unwrap().number(); let block_last_index = blocks.len() - 1; @@ -134,7 +161,7 @@ async fn execute_block_importer( let mined_block = match csv { Some(ref mut csv) => { let mined_block = executor.import_external_to_temp(block.clone(), &receipts).await?; - import_external_to_csv(stratus_storage, csv, mined_block.clone(), block_index, block_last_index).await?; + import_external_to_csv(&stratus_storage, csv, mined_block.clone(), block_index, block_last_index).await?; mined_block } None => executor.import_external_to_perm(block.clone(), &receipts).await?,