Skip to content

Commit

Permalink
feat: GlobalState::shutdown instead of CancellationToken everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed May 22, 2024
1 parent 76808a9 commit 5555d0f
Show file tree
Hide file tree
Showing 18 changed files with 309 additions and 349 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ humantime = "=2.1.0"
indexmap = { version = "=2.2.6", features = ["serde"] }
itertools = "=0.12.1"
nonempty = { version = "=0.10.0", features = ["serialize"] }
once_cell = "=1.19.0"
paste = "=1.0.14"
phf = "=0.11.2"
pin-project = "=1.1.5"
Expand Down Expand Up @@ -61,12 +62,9 @@ ethereum-types = "=0.14.1"
ethers-core = "=2.0.14"
evm-disassembler = "=0.5.0"
keccak-hasher = "=0.15.3" # this version must be compatible with triehash


rlp = "=0.5.2"
triehash = "=0.8.4"


# network
jsonrpsee = { version = "=0.22.4", features = ["server", "client"] }
reqwest = { version = "=0.12.4", features = ["json"] }
Expand Down
98 changes: 37 additions & 61 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ use stratus::eth::storage::StratusStorage;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::not;
use stratus::log_and_err;
use stratus::utils::signal_handler;
use stratus::infra::tracing::warn_task_cancellation;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them.
const BACKLOG_SIZE: usize = 50;
Expand All @@ -40,18 +39,15 @@ const CSV_CHUNKING_BLOCKS_INTERVAL: u64 = 2_000_000;
type BacklogTask = (Vec<ExternalBlock>, Vec<ExternalReceipt>);

fn main() -> anyhow::Result<()> {
let global_services = GlobalServices::<ImporterOfflineConfig>::init();
let global_services = GlobalServices::<ImporterOfflineConfig>::init()?;
global_services.runtime.block_on(run(global_services.config))
}

async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let rpc_storage = config.rpc_storage.init().await?;
let storage = config.storage.init().await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let miner = config.miner.init(Arc::clone(&storage), None).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), None, None).await;

// init block snapshots to export
Expand Down Expand Up @@ -85,13 +81,11 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// execute thread: external rpc storage loader
let storage_thread = thread::Builder::new().name("storage-loader".into());
let storage_tokio = Handle::current();
let storage_cancellation = cancellation.clone();
let _ = storage_thread.spawn(move || {
let _tokio_guard = storage_tokio.enter();

let result = storage_tokio.block_on(execute_external_rpc_storage_loader(
rpc_storage,
storage_cancellation,
config.blocks_by_fetch,
config.paralellism,
block_start,
Expand All @@ -106,18 +100,9 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// execute thread: block importer
let importer_thread = thread::Builder::new().name("block-importer".into());
let importer_tokio = Handle::current();
let importer_cancellation = cancellation.clone();
let importer_join = importer_thread.spawn(move || {
let _tokio_guard = importer_tokio.enter();
let result = importer_tokio.block_on(execute_block_importer(
executor,
miner,
storage,
csv,
importer_cancellation,
backlog_rx,
block_snapshots,
));
let result = importer_tokio.block_on(execute_block_importer(executor, miner, storage, csv, backlog_rx, block_snapshots));
if let Err(e) = result {
tracing::error!(reason = ?e, "block-importer failed");
}
Expand All @@ -138,31 +123,32 @@ async fn execute_block_importer(
miner: Arc<BlockMiner>,
storage: Arc<StratusStorage>,
mut csv: Option<CsvExporter>,
cancellation: CancellationToken,
// data
mut backlog_rx: mpsc::Receiver<BacklogTask>,
blocks_to_export_snapshot: Vec<BlockNumber>,
) -> anyhow::Result<()> {
tracing::info!("block importer starting");

// import blocks and transactions in foreground
let reason = loop {
// retrieve new tasks to execute
let Some((blocks, receipts)) = backlog_rx.recv().await else {
cancellation.cancel();
break "block loader finished or failed";
const TASK_NAME: &str = "external-block-executor";
tracing::info!("starting {}", TASK_NAME);

loop {
// check cancellation
if GlobalState::is_shutdown() {
warn_task_cancellation(TASK_NAME);
return Ok(());
};

if cancellation.is_cancelled() {
break "exiting block importer";
// retrieve new tasks to execute or exit
let Some((blocks, receipts)) = backlog_rx.recv().await else {
tracing::info!("{} has no more blocks to process", TASK_NAME);
return Ok(());
};

// imports block transactions
let block_start = blocks.first().unwrap().number();
let block_end = blocks.last().unwrap().number();
let block_last_index = blocks.len() - 1;
let receipts = ExternalReceipts::from(receipts);

// imports block transactions
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
for (block_index, block) in blocks.into_iter().enumerate() {
async {
Expand All @@ -187,10 +173,7 @@ async fn execute_block_importer(
}
.await?;
}
};

tracing::info!(%reason, "block importer finished");
Ok(())
}
}

// -----------------------------------------------------------------------------
Expand All @@ -200,69 +183,62 @@ async fn execute_block_importer(
async fn execute_external_rpc_storage_loader(
// services
rpc_storage: Arc<dyn ExternalRpcStorage>,
cancellation: CancellationToken,
// data
blocks_by_fetch: usize,
paralellism: usize,
mut start: BlockNumber,
end: BlockNumber,
backlog: mpsc::Sender<BacklogTask>,
) -> anyhow::Result<()> {
tracing::info!(%start, %end, "external rpc storage loader starting");
const TASK_NAME: &str = "external-block-loader";
tracing::info!(%start, %end, "starting {}", TASK_NAME);

// prepare loads to be executed in parallel
let mut tasks = Vec::new();
while start <= end {
let end = min(start + (blocks_by_fetch - 1), end);

let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end);
let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), start, end);
tasks.push(task);

start += blocks_by_fetch;
}

// execute loads in parallel
let mut tasks = futures::stream::iter(tasks).buffered(paralellism);
let reason = loop {
loop {
// check cancellation
if GlobalState::is_shutdown() {
warn_task_cancellation(TASK_NAME);
return Ok(());
};

// retrieve next batch of loaded blocks
// if finished, do not cancel, it is expected to finish
let Some(result) = tasks.next().await else {
break "no more blocks to process";
tracing::info!("{} has no more blocks to process", TASK_NAME);
return Ok(());
};

// check if executed correctly
let Ok((blocks, receipts)) = result else {
cancellation.cancel();
break "block or receipt fetch failed";
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt")));
};

// check blocks were really loaded
if blocks.is_empty() {
cancellation.cancel();
return log_and_err!("no blocks returned when they were expected");
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected")));
}

// send to backlog
if backlog.send((blocks, receipts)).await.is_err() {
tracing::error!("failed to send task to importer");
cancellation.cancel();
return log_and_err!("failed to send blocks and receipts to importer");
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
};
};

tracing::info!(%reason, "external rpc storage loader finished");
Ok(())
}
}

async fn load_blocks_and_receipts(
rpc_storage: Arc<dyn ExternalRpcStorage>,
cancellation: CancellationToken,
start: BlockNumber,
end: BlockNumber,
) -> anyhow::Result<BacklogTask> {
async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, start: BlockNumber, end: BlockNumber) -> anyhow::Result<BacklogTask> {
tracing::info!(%start, %end, "retrieving blocks and receipts");
if cancellation.is_cancelled() {
return Err(anyhow!("cancelled"));
}
let blocks_task = rpc_storage.read_blocks_in_range(start, end);
let receipts_task = rpc_storage.read_receipts_in_range(start, end);
try_join!(blocks_task, receipts_task)
Expand Down
Loading

0 comments on commit 5555d0f

Please sign in to comment.