Skip to content

Commit

Permalink
chore: remove duplicate code from run_with_importer.rs (#558)
Browse files Browse the repository at this point in the history
* chore: remove duplicate code from run_with_importer.rs

* doc
  • Loading branch information
carneiro-cw authored Apr 9, 2024
1 parent 2974441 commit ea5658b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 106 deletions.
8 changes: 7 additions & 1 deletion src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use stratus::eth::primitives::ExternalBlock;
use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
use stratus::eth::primitives::Hash;
use stratus::eth::storage::StratusStorage;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::BlockchainClient;
Expand All @@ -17,16 +18,21 @@ use stratus::log_and_err;
/// Number of transactions receipts that can be fetched in parallel.
const RECEIPTS_PARALELLISM: usize = 10;

#[allow(dead_code)]
fn main() -> anyhow::Result<()> {
let config: ImporterOnlineConfig = init_global_services();
let runtime = config.init_runtime();
runtime.block_on(run(config))
}

async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
let storage = config.stratus_storage.init().await?;
run_importer_online(config, storage).await
}

pub async fn run_importer_online(config: ImporterOnlineConfig, storage: Arc<StratusStorage>) -> anyhow::Result<()> {
// init services
let chain = BlockchainClient::new(&config.external_rpc).await?;
let storage = Arc::new(config.stratus_storage.init().await?);
let executor = config.executor.init(Arc::clone(&storage));

// start from last imported block
Expand Down
109 changes: 4 additions & 105 deletions src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
mod importer_online;

use std::sync::Arc;

use futures::StreamExt;
use futures::TryStreamExt;
use stratus::config::ImporterOnlineConfig;
use importer_online::run_importer_online;
use stratus::config::RunWithImporterConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::ExternalBlock;
use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
use stratus::eth::primitives::Hash;
use stratus::eth::rpc::serve_rpc;
use stratus::eth::storage::StratusStorage;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::BlockchainClient;
use stratus::init_global_services;
use stratus::log_and_err;
use tokio::try_join;

const RECEIPTS_PARALELLISM: usize = 10;

fn main() -> anyhow::Result<()> {
let config: RunWithImporterConfig = init_global_services();
let runtime = config.init_runtime();
Expand All @@ -34,102 +24,11 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let executor = stratus_config.executor.init(Arc::clone(&storage));

let rpc_task = tokio::spawn(serve_rpc(executor, Arc::clone(&storage), stratus_config));
let importer_task = tokio::spawn(run_importer(importer_config, storage));
let importer_task = tokio::spawn(run_importer_online(importer_config, storage));

let join_result = try_join!(rpc_task, importer_task)?;
join_result.0?;
join_result.1?;

Ok(())
}

// TODO: I extracted this from importer_online.rs, in the future it'd make sense to extract this function into a separate file.
async fn run_importer(config: ImporterOnlineConfig, storage: Arc<StratusStorage>) -> anyhow::Result<()> {
// init services
let chain = BlockchainClient::new(&config.external_rpc).await?;
let executor = config.executor.init(Arc::clone(&storage));

// start from last imported block
let mut number = storage.read_mined_block_number().await?;

// keep importing forever
loop {
#[cfg(feature = "metrics")]
let start = metrics::now();

number = number.next();

// fetch block and receipts
let block = fetch_block(&chain, number).await?;

// fetch receipts in parallel
let mut receipts = Vec::with_capacity(block.transactions.len());
for tx in &block.transactions {
receipts.push(fetch_receipt(&chain, tx.hash()));
}
let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::<Vec<_>>().await?;

// import block
let receipts: ExternalReceipts = receipts.into();
executor.import_external_to_perm(block, &receipts).await?;

#[cfg(feature = "metrics")]
metrics::inc_import_online(start.elapsed());
}
}

async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::Result<ExternalBlock> {
let block = loop {
tracing::info!(%number, "fetching block");
let block = match chain.get_block_by_number(number).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, "retrying block download because error");
continue;
}
};

if block.is_null() {
#[cfg(not(feature = "perf"))]
{
tracing::warn!(reason = %"null", "retrying block download because block is not mined yet");
continue;
}

#[cfg(feature = "perf")]
std::process::exit(0);
}

break block;
};

match serde_json::from_value(block.clone()) {
Ok(block) => Ok(block),
Err(e) => log_and_err!(reason = e, payload = block, "failed to deserialize external block"),
}
}

async fn fetch_receipt(chain: &BlockchainClient, hash: Hash) -> anyhow::Result<ExternalReceipt> {
let receipt = loop {
tracing::info!(%hash, "fetching receipt");
let receipt = match chain.get_transaction_receipt(&hash).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, "retrying receipt download because error");
continue;
}
};

if receipt.is_null() {
tracing::warn!(reason = %"null", "retrying receipt download because block is not mined yet");
continue;
}

break receipt;
};

match serde_json::from_value(receipt.clone()) {
Ok(receipt) => Ok(receipt),
Err(e) => log_and_err!(reason = e, payload = receipt, "failed to deserialize external receipt"),
}
}

0 comments on commit ea5658b

Please sign in to comment.