Skip to content

Commit

Permalink
feat: separate threads for importer-offline, so perf profiling is easier
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed May 3, 2024
1 parent b4c6168 commit 211a8fe
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ alias rpc-downloader := bin-rpc-downloader

# Bin: Import external RPC blocks from temporary storage to Stratus storage
bin-importer-offline *args="":
cargo run --bin importer-offline {{release_flag}} --features dev -- {{args}}
cargo run --bin importer-offline {{release_flag}} --features {{feature_flags}} -- {{args}}
alias importer-offline := bin-importer-offline

# Bin: Import external RPC blocks from temporary storage to Stratus storage - with rocksdb
Expand Down
57 changes: 37 additions & 20 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::min;
use std::fs;
use std::sync::Arc;
use std::thread;

use anyhow::anyhow;
use futures::try_join;
Expand All @@ -23,6 +24,7 @@ use stratus::ext::not;
use stratus::infra::metrics;
use stratus::log_and_err;
use stratus::GlobalServices;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

Expand All @@ -48,6 +50,9 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let stratus_storage = config.stratus_storage.init().await?;
let executor = config.executor.init(Arc::clone(&stratus_storage)).await;

// init block snapshots to export
let block_snapshots = config.export_snapshot.into_iter().map_into().collect();

// init block range
let block_start = match config.block_start {
Some(start) => BlockNumber::from(start),
Expand Down Expand Up @@ -76,27 +81,39 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
}

// execute parallel tasks (external rpc storage loader and block importer)
let _loader_task = tokio::spawn(execute_external_rpc_storage_loader(
rpc_storage,
cancellation.clone(),
config.blocks_by_fetch,
config.paralellism,
block_start,
block_end,
backlog_tx,
));
let storage_thread = thread::Builder::new().name("loader".into());
let storage_tokio = Handle::current();
let storage_cancellation = cancellation.clone();
let _ = storage_thread.spawn(move || {
let _tokio_guard = storage_tokio.enter();

storage_tokio.block_on(execute_external_rpc_storage_loader(
rpc_storage,
storage_cancellation,
config.blocks_by_fetch,
config.paralellism,
block_start,
block_end,
backlog_tx,
))
});

let block_snapshots = config.export_snapshot.into_iter().map_into().collect();
let importer_task = tokio::spawn(execute_block_importer(
executor,
stratus_storage,
csv,
cancellation.clone(),
backlog_rx,
block_snapshots,
));

importer_task.await??;
let importer_thread = thread::Builder::new().name("importer".into());
let importer_tokio = Handle::current();
let importer_cancellation = cancellation.clone();
let importer_join = importer_thread.spawn(move || {
let _tokio_guard = importer_tokio.enter();
importer_tokio.block_on(execute_block_importer(
executor,
stratus_storage,
csv,
importer_cancellation,
backlog_rx,
block_snapshots,
))
})?;

let _ = importer_join.join();

Ok(())
}
Expand Down

0 comments on commit 211a8fe

Please sign in to comment.