Skip to content

Commit

Permalink
feat: run-with-importer bin (#547)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Apr 9, 2024
1 parent fa70325 commit 0e75fef
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 6 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ path = "src/bin/importer_online.rs"
name = "state-validator"
path = "src/bin/state_validator.rs"

[[bin]]
name = "run-with-importer"
path = "src/bin/run_with_importer.rs"

# ------------------------------------------------------------------------------
# Features
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -143,4 +147,4 @@ rocks = ["rocksdb", "bincode"]
clone_on_ref_ptr = "warn"
disallowed_names = "warn"
manual_let_else = "warn"
semicolon_if_nothing_returned = "warn"
semicolon_if_nothing_returned = "warn"
135 changes: 135 additions & 0 deletions src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::sync::Arc;

use futures::StreamExt;
use futures::TryStreamExt;
use stratus::config::ImporterOnlineConfig;
use stratus::config::RunWithImporterConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::ExternalBlock;
use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
use stratus::eth::primitives::Hash;
use stratus::eth::rpc::serve_rpc;
use stratus::eth::storage::StratusStorage;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::BlockchainClient;
use stratus::init_global_services;
use stratus::log_and_err;
use tokio::try_join;

const RECEIPTS_PARALELLISM: usize = 10;

fn main() -> anyhow::Result<()> {
let config: RunWithImporterConfig = init_global_services();
let runtime = config.init_runtime();
runtime.block_on(run(config))
}

async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let stratus_config = config.as_stratus();
let importer_config = config.as_importer();

let storage = stratus_config.stratus_storage.init().await?;
let executor = stratus_config.executor.init(Arc::clone(&storage));

let rpc_task = tokio::spawn(serve_rpc(executor, Arc::clone(&storage), stratus_config));
let importer_task = tokio::spawn(run_importer(importer_config, storage));

let join_result = try_join!(rpc_task, importer_task)?;
join_result.0?;
join_result.1?;

Ok(())
}

// TODO: I extracted this from importer_online.rs, in the future it'd make sense to extract this function into a separate file.
async fn run_importer(config: ImporterOnlineConfig, storage: Arc<StratusStorage>) -> anyhow::Result<()> {
// init services
let chain = BlockchainClient::new(&config.external_rpc).await?;
let executor = config.executor.init(Arc::clone(&storage));

// start from last imported block
let mut number = storage.read_mined_block_number().await?;

// keep importing forever
loop {
#[cfg(feature = "metrics")]
let start = metrics::now();

number = number.next();

// fetch block and receipts
let block = fetch_block(&chain, number).await?;

// fetch receipts in parallel
let mut receipts = Vec::with_capacity(block.transactions.len());
for tx in &block.transactions {
receipts.push(fetch_receipt(&chain, tx.hash()));
}
let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::<Vec<_>>().await?;

// import block
let receipts: ExternalReceipts = receipts.into();
executor.import_external_to_perm(block, &receipts).await?;

#[cfg(feature = "metrics")]
metrics::inc_import_online(start.elapsed());
}
}

async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::Result<ExternalBlock> {
let block = loop {
tracing::info!(%number, "fetching block");
let block = match chain.get_block_by_number(number).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, "retrying block download because error");
continue;
}
};

if block.is_null() {
#[cfg(not(feature = "perf"))]
{
tracing::warn!(reason = %"null", "retrying block download because block is not mined yet");
continue;
}

#[cfg(feature = "perf")]
std::process::exit(0);
}

break block;
};

match serde_json::from_value(block.clone()) {
Ok(block) => Ok(block),
Err(e) => log_and_err!(reason = e, payload = block, "failed to deserialize external block"),
}
}

async fn fetch_receipt(chain: &BlockchainClient, hash: Hash) -> anyhow::Result<ExternalReceipt> {
let receipt = loop {
tracing::info!(%hash, "fetching receipt");
let receipt = match chain.get_transaction_receipt(&hash).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, "retrying receipt download because error");
continue;
}
};

if receipt.is_null() {
tracing::warn!(reason = %"null", "retrying receipt download because block is not mined yet");
continue;
}

break receipt;
};

match serde_json::from_value(receipt.clone()) {
Ok(receipt) => Ok(receipt),
Err(e) => log_and_err!(reason = e, payload = receipt, "failed to deserialize external receipt"),
}
}
57 changes: 52 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub trait WithCommonConfig {
}

/// Configuration that can be used by any binary.
#[derive(Parser, Debug)]
#[derive(Clone, Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct CommonConfig {
/// Number of threads to execute global async tasks.
Expand Down Expand Up @@ -114,7 +114,7 @@ impl CommonConfig {
// -----------------------------------------------------------------------------

/// Configuration that can be used by any binary that interacts with Stratus storage.
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
pub struct StratusStorageConfig {
#[clap(flatten)]
pub temp_storage: TemporaryStorageConfig,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl StratusStorageConfig {
// Config: Executor
// -----------------------------------------------------------------------------

#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone, Copy)]
pub struct ExecutorConfig {
/// Chain ID of the network.
#[arg(long = "chain-id", env = "CHAIN_ID")]
Expand Down Expand Up @@ -365,6 +365,53 @@ impl WithCommonConfig for ImporterOnlineConfig {
}
}

#[derive(Parser, Debug, derive_more::Deref)]
pub struct RunWithImporterConfig {
/// JSON-RPC binding address.
#[arg(short = 'a', long = "address", env = "ADDRESS", default_value = "0.0.0.0:3000")]
pub address: SocketAddr,

#[clap(flatten)]
pub stratus_storage: StratusStorageConfig,

#[clap(flatten)]
pub executor: ExecutorConfig,

/// External RPC endpoint to sync blocks with Stratus.
#[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
pub external_rpc: String,

#[deref]
#[clap(flatten)]
pub common: CommonConfig,
}

impl RunWithImporterConfig {
pub fn as_importer(&self) -> ImporterOnlineConfig {
ImporterOnlineConfig {
external_rpc: self.external_rpc.clone(),
executor: self.executor,
stratus_storage: self.stratus_storage.clone(),
common: self.common.clone(),
}
}

pub fn as_stratus(&self) -> StratusConfig {
StratusConfig {
address: self.address,
executor: self.executor,
stratus_storage: self.stratus_storage.clone(),
common: self.common.clone(),
}
}
}

impl WithCommonConfig for RunWithImporterConfig {
fn common(&self) -> &CommonConfig {
&self.common
}
}

// -----------------------------------------------------------------------------
// Config: StateValidator
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -490,7 +537,7 @@ impl FromStr for ExternalRpcStorageKind {
// -----------------------------------------------------------------------------

/// Temporary storage configuration.
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
pub struct TemporaryStorageConfig {
/// Temporary storage implementation.
#[arg(long = "temp-storage", env = "TEMP_STORAGE")]
Expand Down Expand Up @@ -530,7 +577,7 @@ impl FromStr for TemporaryStorageKind {
// -----------------------------------------------------------------------------

/// Permanent storage configuration.
#[derive(Parser, Debug)]
#[derive(Clone, Parser, Debug)]
pub struct PermanentStorageConfig {
/// Permamenent storage implementation.
#[arg(long = "perm-storage", env = "PERM_STORAGE")]
Expand Down

0 comments on commit 0e75fef

Please sign in to comment.