Skip to content

Commit

Permalink
feat: graceful shutdown for offline importer (#557)
Browse files Browse the repository at this point in the history
* feat: graceful shutdown for offline importer

* fmt

* handle signal in a new task

* fmt

* extract signal handler
  • Loading branch information
carneiro-cw authored Apr 9, 2024
1 parent ea5658b commit 28af439
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ thiserror = "=1.0.56"
url = "=2.5.0"

# async
tokio = { version = "=1.36.0", features = ["rt-multi-thread", "macros"] }
tokio = { version = "=1.36.0", features = ["rt-multi-thread", "macros", "signal"] }
tokio-util = "=0.7.10"

# config
Expand Down
35 changes: 31 additions & 4 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init shared data between importer and external rpc storage loader
let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
let cancellation = CancellationToken::new();
signal_handler(cancellation.clone());

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
Expand All @@ -79,7 +80,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
}

// execute parallel tasks (external rpc storage loader and block importer)
tokio::spawn(execute_external_rpc_storage_loader(
let _loader_task = tokio::spawn(execute_external_rpc_storage_loader(
rpc_storage,
cancellation.clone(),
config.paralellism,
Expand All @@ -89,18 +90,40 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
));

let block_snapshots = config.export_snapshot.into_iter().map_into().collect();
execute_block_importer(executor, &stratus_storage, csv, cancellation, backlog_rx, block_snapshots).await?;
let importer_task = tokio::spawn(execute_block_importer(
executor,
stratus_storage,
csv,
cancellation.clone(),
backlog_rx,
block_snapshots,
));

importer_task.await??;

Ok(())
}

fn signal_handler(cancellation: CancellationToken) {
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
tracing::info!("shutting down");
cancellation.cancel();
}

Err(err) => tracing::error!("Unable to listen for shutdown signal: {}", err),
}
});
}

// -----------------------------------------------------------------------------
// Block importer
// -----------------------------------------------------------------------------
async fn execute_block_importer(
// services
executor: EthExecutor,
stratus_storage: &StratusStorage,
stratus_storage: Arc<StratusStorage>,
mut csv: Option<CsvExporter>,
cancellation: CancellationToken,
// data
Expand All @@ -117,6 +140,10 @@ async fn execute_block_importer(
break "block loader finished or failed";
};

if cancellation.is_cancelled() {
break "exiting block importer";
};

let block_start = blocks.first().unwrap().number();
let block_end = blocks.last().unwrap().number();
let block_last_index = blocks.len() - 1;
Expand All @@ -134,7 +161,7 @@ async fn execute_block_importer(
let mined_block = match csv {
Some(ref mut csv) => {
let mined_block = executor.import_external_to_temp(block.clone(), &receipts).await?;
import_external_to_csv(stratus_storage, csv, mined_block.clone(), block_index, block_last_index).await?;
import_external_to_csv(&stratus_storage, csv, mined_block.clone(), block_index, block_last_index).await?;
mined_block
}
None => executor.import_external_to_perm(block.clone(), &receipts).await?,
Expand Down

0 comments on commit 28af439

Please sign in to comment.