Merge branch 'external_relayer' of https://github.com/cloudwalk/stratus into external_relayer
gabriel-aranha-cw committed May 28, 2024
2 parents 7ef66a5 + 50b1806 commit 0403f82
Showing 27 changed files with 347 additions and 171 deletions.
106 changes: 64 additions & 42 deletions src/bin/importer_offline.rs
@@ -19,6 +19,10 @@ use stratus::eth::storage::InMemoryPermanentStorage;
use stratus::eth::storage::StratusStorage;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::ResultExt;
use stratus::infra::tracing::info_task_spawn;
use stratus::log_and_err;
use stratus::utils::calculate_tps_and_bpm;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::runtime::Handle;
@@ -79,34 +83,48 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// execute thread: external rpc storage loader
let storage_thread = thread::Builder::new().name("storage-loader".into());
let storage_tokio = Handle::current();
let _ = storage_thread.spawn(move || {
let _tokio_guard = storage_tokio.enter();

let result = storage_tokio.block_on(execute_external_rpc_storage_loader(
rpc_storage,
config.blocks_by_fetch,
config.paralellism,
block_start,
block_end,
backlog_tx,
));
if let Err(e) = result {
tracing::error!(reason = ?e, "storage-loader failed");
}
});

info_task_spawn("storage-loader");
let storage_loader_thread = storage_thread
.spawn(move || {
let _tokio_guard = storage_tokio.enter();

let result = storage_tokio.block_on(execute_external_rpc_storage_loader(
rpc_storage,
config.blocks_by_fetch,
config.paralellism,
block_start,
block_end,
backlog_tx,
));
if let Err(e) = result {
tracing::error!(reason = ?e, "storage-loader failed");
}
})
.expect("spawning storage-loader thread should not fail");

// execute thread: block importer
let importer_thread = thread::Builder::new().name("block-importer".into());
let importer_tokio = Handle::current();
let importer_join = importer_thread.spawn(move || {
let _tokio_guard = importer_tokio.enter();
let result = importer_tokio.block_on(execute_block_importer(executor, miner, storage, csv, backlog_rx, block_snapshots));
if let Err(e) = result {
tracing::error!(reason = ?e, "block-importer failed");
}
})?;

let _ = importer_join.join();
info_task_spawn("block-importer");
let block_importer_thread = importer_thread
.spawn(move || {
let _tokio_guard = importer_tokio.enter();
let result = importer_tokio.block_on(execute_block_importer(executor, miner, storage, csv, backlog_rx, block_snapshots));
if let Err(e) = result {
tracing::error!(reason = ?e, "block-importer failed");
}
})
.expect("spawning block-importer thread should not fail");

// await tasks
if let Err(e) = block_importer_thread.join() {
tracing::error!(reason = ?e, "block-importer thread failed");
}
if let Err(e) = storage_loader_thread.join() {
tracing::error!(reason = ?e, "storage-loader thread failed");
}

Ok(())
}
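
The spawning pattern in the hunk above (a dedicated OS thread that drives async work through a Tokio runtime handle captured on the async side) can be reduced to the standalone sketch below. The function name, thread name, and the tokio/anyhow dependencies are illustrative assumptions, not code from the repository.

```rust
use std::thread;

use tokio::runtime::Handle;

// Placeholder for the real async work (fetching blocks, importing them, etc.).
async fn load_blocks() -> anyhow::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Capture the runtime handle while still inside the async context.
    let handle = Handle::current();

    let loader = thread::Builder::new()
        .name("example-loader".into())
        .spawn(move || {
            // Enter the runtime context, then block this OS thread on the async task.
            let _tokio_guard = handle.enter();
            if let Err(e) = handle.block_on(load_blocks()) {
                eprintln!("example-loader failed: {e:?}");
            }
        })
        .expect("spawning example-loader thread should not fail");

    // Join the thread; a panic inside the closure surfaces here as Err.
    if let Err(e) = loader.join() {
        eprintln!("example-loader thread panicked: {e:?}");
    }
    Ok(())
}
```

Calling `Handle::block_on` from a thread the runtime does not own is what makes this safe; doing the same from inside a runtime worker thread would panic.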
@@ -126,7 +144,6 @@ async fn execute_block_importer(
blocks_to_export_snapshot: Vec<BlockNumber>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-executor";
tracing::info!("starting {}", TASK_NAME);

// receives blocks and receipts from the backlog to reexecute and import
loop {
@@ -175,21 +192,15 @@ }
}
}

let seconds_elapsed = match instant_before_execution.elapsed().as_secs() as usize {
// avoid division by zero
0 => 1,
non_zero => non_zero,
};
let tps = transaction_count.checked_div(seconds_elapsed).unwrap_or(transaction_count);
let minutes_elapsed = seconds_elapsed as f64 / 60.0;
let blocks_per_minute = blocks_len as f64 / minutes_elapsed;
let duration = instant_before_execution.elapsed();
let (tps, bpm) = calculate_tps_and_bpm(duration, transaction_count, blocks_len);

tracing::info!(
tps,
blocks_per_minute = format_args!("{blocks_per_minute:.2}"),
seconds_elapsed,
blocks_per_minute = format_args!("{bpm:.2}"),
?duration,
%block_start,
%block_end,
transaction_count,
receipts = receipts.len(),
"reexecuted blocks batch",
);
@@ -211,7 +222,7 @@ async fn execute_external_rpc_storage_loader(
backlog: mpsc::Sender<BacklogTask>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-loader";
tracing::info!(%start, %end, "starting {}", TASK_NAME);
tracing::info!(%start, %end, "creating task {}", TASK_NAME);

// prepare loads to be executed in parallel
let mut tasks = Vec::new();
@@ -239,13 +250,18 @@ };
};

// check if executed correctly
let Ok((blocks, receipts)) = result else {
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt")));
let (blocks, receipts) = match result {
Ok((blocks, receipts)) => (blocks, receipts),
Err(e) => {
let message = GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt");
return log_and_err!(reason = e, message);
}
};

// check blocks were really loaded
if blocks.is_empty() {
return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected")));
let message = GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected");
return log_and_err!(message);
}

// send to backlog
@@ -284,9 +300,15 @@ fn export_snapshot(external_block: &ExternalBlock, external_receipts: &ExternalR
fs::create_dir_all(&dir)?;

// write json
fs::write(format!("{}/block.json", dir), serde_json::to_string_pretty(external_block)?)?;
fs::write(format!("{}/receipts.json", dir), serde_json::to_string_pretty(&receipts_snapshot)?)?;
fs::write(format!("{}/snapshot.json", dir), serde_json::to_string_pretty(&state_snapshot)?)?;
fs::write(format!("{}/block.json", dir), serde_json::to_string_pretty(external_block).expect_infallible())?;
fs::write(
format!("{}/receipts.json", dir),
serde_json::to_string_pretty(&receipts_snapshot).expect_infallible(),
)?;
fs::write(
format!("{}/snapshot.json", dir),
serde_json::to_string_pretty(&state_snapshot).expect_infallible(),
)?;

Ok(())
}
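The inline TPS math removed above is replaced by helpers from `stratus::utils` (`calculate_tps_and_bpm` here, `calculate_tps` in `importer_online.rs` below). Their implementation is not shown in this diff; a plausible sketch, reconstructed from the removed arithmetic and assuming `f64` return values, could look like this:

```rust
use std::time::Duration;

/// Transactions per second, guarding against a zero-length duration.
pub fn calculate_tps(duration: Duration, transaction_count: usize) -> f64 {
    let seconds = duration.as_secs_f64().max(f64::EPSILON);
    transaction_count as f64 / seconds
}

/// Transactions per second and blocks per minute for a batch of blocks.
pub fn calculate_tps_and_bpm(duration: Duration, transaction_count: usize, block_count: usize) -> (f64, f64) {
    let seconds = duration.as_secs_f64().max(f64::EPSILON);
    let tps = transaction_count as f64 / seconds;
    let bpm = block_count as f64 * 60.0 / seconds;
    (tps, bpm)
}
```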
67 changes: 47 additions & 20 deletions src/bin/importer_online.rs
@@ -22,6 +22,8 @@ use stratus::infra::metrics;
use stratus::infra::tracing::warn_task_rx_closed;
use stratus::infra::tracing::warn_task_tx_closed;
use stratus::infra::BlockchainClient;
use stratus::log_and_err;
use stratus::utils::calculate_tps;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::sync::mpsc;
@@ -111,7 +113,9 @@ pub async fn run_importer_online(
let task_block_fetcher = tokio::spawn(start_block_fetcher(block_fetcher_chain, backlog_tx, number));

// await all tasks
try_join!(task_executor, task_block_fetcher, task_number_fetcher)?;
if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) {
tracing::error!(reason = ?e, "importer-online failed");
}
Ok(())
}
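
The hunk above wires the fetcher, number fetcher, and executor tasks together and now logs a join failure instead of propagating it. A self-contained sketch of the same fan-out (an unbounded mpsc channel between a fetcher and an executor, awaited with `try_join!`) is shown below; the `u64` payload stands in for the `(ExternalBlock, Vec<ExternalReceipt>)` pairs, and the tokio/anyhow dependencies are assumed.

```rust
use tokio::sync::mpsc;
use tokio::try_join;

// Stand-in for the fetcher: pushes items into the backlog until the receiver closes.
async fn fetcher(tx: mpsc::UnboundedSender<u64>) -> anyhow::Result<()> {
    for number in 0..10 {
        if tx.send(number).is_err() {
            break; // receiver dropped, stop fetching
        }
    }
    Ok(())
}

// Stand-in for the executor: drains the backlog until the sender closes.
async fn executor(mut rx: mpsc::UnboundedReceiver<u64>) -> anyhow::Result<()> {
    while let Some(number) = rx.recv().await {
        println!("processed block {number}");
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = mpsc::unbounded_channel();
    let task_fetcher = tokio::spawn(fetcher(tx));
    let task_executor = tokio::spawn(executor(rx));

    // Each JoinHandle resolves to Result<anyhow::Result<()>, JoinError>;
    // try_join! surfaces the first JoinError (panic or cancellation).
    if let Err(e) = try_join!(task_fetcher, task_executor) {
        eprintln!("pipeline failed: {e:?}");
    }
    Ok(())
}
```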

@@ -120,28 +124,46 @@ pub async fn run_importer_online(
// -----------------------------------------------------------------------------

// Executes external blocks and persist them to storage.
async fn start_block_executor(executor: Arc<Executor>, miner: Arc<BlockMiner>, mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>) {
async fn start_block_executor(
executor: Arc<Executor>,
miner: Arc<BlockMiner>,
mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "block-executor";

while let Some((block, receipts)) = backlog_rx.recv().await {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return;
return Ok(());
}

#[cfg(feature = "metrics")]
let start = metrics::now();

// execute and mine
let receipts = ExternalReceipts::from(receipts);

tracing::info!(number = %block.number(), txs_len = block.transactions.len(), "reexecuting external block");
if executor.reexecute_external(&block, &receipts).await.is_err() {
GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block");
return;
if let Err(e) = executor.reexecute_external(&block, &receipts).await {
let message = GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block");
return log_and_err!(reason = e, message);
};
if miner.mine_external_mixed_and_commit().await.is_err() {
GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
return;

// statistics
#[cfg(feature = "metrics")]
{
let duration = start.elapsed();
let tps = calculate_tps(duration, block.transactions.len());

tracing::info!(
tps,
duration = %duration.to_string_ext(),
block_number = ?block.number(),
receipts = receipts.len(),
"reexecuted external block",
);
}

if let Err(e) = miner.mine_external_mixed_and_commit().await {
let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
return log_and_err!(reason = e, message);
};

#[cfg(feature = "metrics")]
@@ -152,14 +174,15 @@ async fn start_block_executor(executor: Arc<Executor>, miner: Arc<BlockMiner>, m
}

warn_task_tx_closed(TASK_NAME);
Ok(())
}

// -----------------------------------------------------------------------------
// Number fetcher
// -----------------------------------------------------------------------------

/// Retrieves the blockchain current block number.
async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) {
async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-number-fetcher";

// subscribe to newHeads event if WS is enabled
@@ -168,9 +191,9 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
tracing::info!("subscribing {} to newHeads event", TASK_NAME);
match chain.subscribe_new_heads().await {
Ok(sub) => Some(sub),
Err(_) => {
GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
return;
Err(e) => {
let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
return log_and_err!(reason = e, message);
}
}
}
@@ -182,7 +205,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat

loop {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return;
return Ok(());
}

// if we have a subscription, try to read from subscription.
Expand Down Expand Up @@ -223,7 +246,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
}

// fallback to polling
tracing::warn!("number-fetcher falling back to http polling because subscription failed or it not enabled");
tracing::warn!("number-fetcher falling back to http polling because subscription failed or it is not enabled");
match chain.fetch_block_number().await {
Ok(number) => {
tracing::info!(
@@ -246,12 +269,16 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
// -----------------------------------------------------------------------------

/// Retrieves blocks and receipts.
async fn start_block_fetcher(chain: Arc<BlockchainClient>, backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>, mut number: BlockNumber) {
async fn start_block_fetcher(
chain: Arc<BlockchainClient>,
backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
mut number: BlockNumber,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-fetcher";

loop {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return;
return Ok(());
}

// if we are ahead of current block number, await until we are behind again
Expand All @@ -277,7 +304,7 @@ async fn start_block_fetcher(chain: Arc<BlockchainClient>, backlog_tx: mpsc::Unb
while let Some((block, receipts)) = tasks.next().await {
if backlog_tx.send((block, receipts)).is_err() {
warn_task_rx_closed(TASK_NAME);
return;
return Ok(());
}
}
}
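
Most of the error paths above now go through the stratus `log_and_err!` macro, whose definition is not part of this diff. As a rough illustration only (the real macro in the crate may be defined differently), a macro supporting the two call shapes used here, `log_and_err!(reason = e, message)` and `log_and_err!(message)`, could look like:

```rust
// Hypothetical sketch, not the actual stratus implementation: log the failure
// and produce an Err(anyhow::Error) in a single expression.
#[macro_export]
macro_rules! log_and_err {
    // Variant with an underlying error: log it and wrap it with context.
    (reason = $error:ident, $msg:expr) => {{
        let msg = $msg;
        tracing::error!(reason = ?$error, message = %msg);
        Err(anyhow::Error::from($error).context(msg))
    }};
    // Variant without an underlying error: log and build a fresh error.
    ($msg:expr) => {{
        let msg = $msg;
        tracing::error!(message = %msg);
        Err(anyhow::anyhow!(msg))
    }};
}
```

Returning the `Err` from the macro keeps the `shutdown_from` message and the original error attached to the failure that the caller propagates.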
