From 0e75fefe6109709cb461a032be3f48fa7471693c Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Tue, 9 Apr 2024 11:11:26 -0300 Subject: [PATCH] feat: run-with-importer bin (#547) --- Cargo.toml | 6 +- src/bin/run_with_importer.rs | 135 +++++++++++++++++++++++++++++++++++ src/config.rs | 57 +++++++++++++-- 3 files changed, 192 insertions(+), 6 deletions(-) create mode 100644 src/bin/run_with_importer.rs diff --git a/Cargo.toml b/Cargo.toml index dab728ba8..ac9989131 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,10 @@ path = "src/bin/importer_online.rs" name = "state-validator" path = "src/bin/state_validator.rs" +[[bin]] +name = "run-with-importer" +path = "src/bin/run_with_importer.rs" + # ------------------------------------------------------------------------------ # Features # ------------------------------------------------------------------------------ @@ -143,4 +147,4 @@ rocks = ["rocksdb", "bincode"] clone_on_ref_ptr = "warn" disallowed_names = "warn" manual_let_else = "warn" -semicolon_if_nothing_returned = "warn" \ No newline at end of file +semicolon_if_nothing_returned = "warn" diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs new file mode 100644 index 000000000..d3f3bf6b1 --- /dev/null +++ b/src/bin/run_with_importer.rs @@ -0,0 +1,135 @@ +use std::sync::Arc; + +use futures::StreamExt; +use futures::TryStreamExt; +use stratus::config::ImporterOnlineConfig; +use stratus::config::RunWithImporterConfig; +use stratus::eth::primitives::BlockNumber; +use stratus::eth::primitives::ExternalBlock; +use stratus::eth::primitives::ExternalReceipt; +use stratus::eth::primitives::ExternalReceipts; +use stratus::eth::primitives::Hash; +use stratus::eth::rpc::serve_rpc; +use stratus::eth::storage::StratusStorage; +#[cfg(feature = "metrics")] +use stratus::infra::metrics; +use stratus::infra::BlockchainClient; +use stratus::init_global_services; +use stratus::log_and_err; +use tokio::try_join; + +const RECEIPTS_PARALELLISM: usize = 10; + +fn main() -> anyhow::Result<()> { + let config: RunWithImporterConfig = init_global_services(); + let runtime = config.init_runtime(); + runtime.block_on(run(config)) +} + +async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { + let stratus_config = config.as_stratus(); + let importer_config = config.as_importer(); + + let storage = stratus_config.stratus_storage.init().await?; + let executor = stratus_config.executor.init(Arc::clone(&storage)); + + let rpc_task = tokio::spawn(serve_rpc(executor, Arc::clone(&storage), stratus_config)); + let importer_task = tokio::spawn(run_importer(importer_config, storage)); + + let join_result = try_join!(rpc_task, importer_task)?; + join_result.0?; + join_result.1?; + + Ok(()) +} + +// TODO: I extracted this from importer_online.rs, in the future it'd make sense to extract this function into a separate file. +async fn run_importer(config: ImporterOnlineConfig, storage: Arc) -> anyhow::Result<()> { + // init services + let chain = BlockchainClient::new(&config.external_rpc).await?; + let executor = config.executor.init(Arc::clone(&storage)); + + // start from last imported block + let mut number = storage.read_mined_block_number().await?; + + // keep importing forever + loop { + #[cfg(feature = "metrics")] + let start = metrics::now(); + + number = number.next(); + + // fetch block and receipts + let block = fetch_block(&chain, number).await?; + + // fetch receipts in parallel + let mut receipts = Vec::with_capacity(block.transactions.len()); + for tx in &block.transactions { + receipts.push(fetch_receipt(&chain, tx.hash())); + } + let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::>().await?; + + // import block + let receipts: ExternalReceipts = receipts.into(); + executor.import_external_to_perm(block, &receipts).await?; + + #[cfg(feature = "metrics")] + metrics::inc_import_online(start.elapsed()); + } +} + +async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::Result { + let block = loop { + tracing::info!(%number, "fetching block"); + let block = match chain.get_block_by_number(number).await { + Ok(json) => json, + Err(e) => { + tracing::warn!(reason = ?e, "retrying block download because error"); + continue; + } + }; + + if block.is_null() { + #[cfg(not(feature = "perf"))] + { + tracing::warn!(reason = %"null", "retrying block download because block is not mined yet"); + continue; + } + + #[cfg(feature = "perf")] + std::process::exit(0); + } + + break block; + }; + + match serde_json::from_value(block.clone()) { + Ok(block) => Ok(block), + Err(e) => log_and_err!(reason = e, payload = block, "failed to deserialize external block"), + } +} + +async fn fetch_receipt(chain: &BlockchainClient, hash: Hash) -> anyhow::Result { + let receipt = loop { + tracing::info!(%hash, "fetching receipt"); + let receipt = match chain.get_transaction_receipt(&hash).await { + Ok(json) => json, + Err(e) => { + tracing::warn!(reason = ?e, "retrying receipt download because error"); + continue; + } + }; + + if receipt.is_null() { + tracing::warn!(reason = %"null", "retrying receipt download because block is not mined yet"); + continue; + } + + break receipt; + }; + + match serde_json::from_value(receipt.clone()) { + Ok(receipt) => Ok(receipt), + Err(e) => log_and_err!(reason = e, payload = receipt, "failed to deserialize external receipt"), + } +} diff --git a/src/config.rs b/src/config.rs index c91492728..78c0522b1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -62,7 +62,7 @@ pub trait WithCommonConfig { } /// Configuration that can be used by any binary. -#[derive(Parser, Debug)] +#[derive(Clone, Parser, Debug)] #[command(author, version, about, long_about = None)] pub struct CommonConfig { /// Number of threads to execute global async tasks. @@ -114,7 +114,7 @@ impl CommonConfig { // ----------------------------------------------------------------------------- /// Configuration that can be used by any binary that interacts with Stratus storage. -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] pub struct StratusStorageConfig { #[clap(flatten)] pub temp_storage: TemporaryStorageConfig, @@ -171,7 +171,7 @@ impl StratusStorageConfig { // Config: Executor // ----------------------------------------------------------------------------- -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone, Copy)] pub struct ExecutorConfig { /// Chain ID of the network. #[arg(long = "chain-id", env = "CHAIN_ID")] @@ -365,6 +365,53 @@ impl WithCommonConfig for ImporterOnlineConfig { } } +#[derive(Parser, Debug, derive_more::Deref)] +pub struct RunWithImporterConfig { + /// JSON-RPC binding address. + #[arg(short = 'a', long = "address", env = "ADDRESS", default_value = "0.0.0.0:3000")] + pub address: SocketAddr, + + #[clap(flatten)] + pub stratus_storage: StratusStorageConfig, + + #[clap(flatten)] + pub executor: ExecutorConfig, + + /// External RPC endpoint to sync blocks with Stratus. + #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")] + pub external_rpc: String, + + #[deref] + #[clap(flatten)] + pub common: CommonConfig, +} + +impl RunWithImporterConfig { + pub fn as_importer(&self) -> ImporterOnlineConfig { + ImporterOnlineConfig { + external_rpc: self.external_rpc.clone(), + executor: self.executor, + stratus_storage: self.stratus_storage.clone(), + common: self.common.clone(), + } + } + + pub fn as_stratus(&self) -> StratusConfig { + StratusConfig { + address: self.address, + executor: self.executor, + stratus_storage: self.stratus_storage.clone(), + common: self.common.clone(), + } + } +} + +impl WithCommonConfig for RunWithImporterConfig { + fn common(&self) -> &CommonConfig { + &self.common + } +} + // ----------------------------------------------------------------------------- // Config: StateValidator // ----------------------------------------------------------------------------- @@ -490,7 +537,7 @@ impl FromStr for ExternalRpcStorageKind { // ----------------------------------------------------------------------------- /// Temporary storage configuration. -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] pub struct TemporaryStorageConfig { /// Temporary storage implementation. #[arg(long = "temp-storage", env = "TEMP_STORAGE")] @@ -530,7 +577,7 @@ impl FromStr for TemporaryStorageKind { // ----------------------------------------------------------------------------- /// Permanent storage configuration. -#[derive(Parser, Debug)] +#[derive(Clone, Parser, Debug)] pub struct PermanentStorageConfig { /// Permamenent storage implementation. #[arg(long = "perm-storage", env = "PERM_STORAGE")]