Skip to content

Commit

Permalink
feat: importer-online (#860)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored May 17, 2024
1 parent f7f8ac7 commit 1680c95
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 107 deletions.
284 changes: 179 additions & 105 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::collections::HashMap;
use std::cmp::min;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use futures::try_join;
use futures::StreamExt;
use serde::Deserialize;
use stratus::config::ImporterOnlineConfig;
use stratus::eth::primitives::BlockNumber;
Expand All @@ -15,14 +19,34 @@ use stratus::eth::Executor;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::BlockchainClient;
use stratus::log_and_err;
use stratus::utils::signal_handler;
use stratus::GlobalServices;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::task::yield_now;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

// -----------------------------------------------------------------------------
// Globals
// -----------------------------------------------------------------------------

/// Current block number used by the number fetcher and block fetcher.
///
/// It is a global to avoid unnecessary synchronization using a channel.
static RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);

// -----------------------------------------------------------------------------
// Constants
// -----------------------------------------------------------------------------
/// Number of blocks that are downloaded in parallel.
const PARALLEL_BLOCKS: usize = 3;

/// Number of receipts that are downloaded in parallel.
const PARALLEL_RECEIPTS: usize = 100;

// -----------------------------------------------------------------------------
// Execution
// -----------------------------------------------------------------------------
#[allow(dead_code)]
fn main() -> anyhow::Result<()> {
let global_services = GlobalServices::<ImporterOnlineConfig>::init();
Expand All @@ -34,7 +58,7 @@ async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init(Arc::clone(&storage)).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer).await;
let chain = BlockchainClient::new(&config.external_rpc).await?;
let chain = Arc::new(BlockchainClient::new(&config.external_rpc).await?);
let cancellation: CancellationToken = signal_handler();

let result = run_importer_online(executor, miner, storage, chain, cancellation, config.sync_interval).await;
Expand All @@ -48,170 +72,220 @@ pub async fn run_importer_online(
executor: Arc<Executor>,
miner: Arc<BlockMiner>,
storage: Arc<StratusStorage>,
chain: BlockchainClient,
chain: Arc<BlockchainClient>,
cancellation: CancellationToken,
sync_interval: Duration,
) -> anyhow::Result<()> {
// start from last imported block
let mut number = storage.read_mined_block_number().await?;
let (data_tx, mut data_rx) = mpsc::channel(10);

if number != BlockNumber::from(0) {
number = number.next();
}

let task_cancellation = cancellation.clone();
tokio::spawn(async move {
prefetch_blocks_and_receipts(number, chain, data_tx, task_cancellation, sync_interval).await;
});
let (backlog_tx, backlog_rx) = mpsc::unbounded_channel();

// spawn block executor:
// it executes and mines blocks and expects to receive them via channel in the correct order.
let executor_cancellation = cancellation.clone();
let task_executor = tokio::spawn(start_block_executor(executor, miner, backlog_rx, executor_cancellation));

// spawn block number:
// it keeps track of the blockchain current block number.
let number_fetcher_chain = Arc::clone(&chain);
let number_fetcher_cancellation = cancellation.clone();
let task_number_fetcher = tokio::spawn(start_number_fetcher(number_fetcher_chain, number_fetcher_cancellation, sync_interval));

// spawn block fetcher:
// it fetches blocks and receipts in parallel and sends them to the executor in the correct order.
// it uses the number fetcher current block to determine if should keep downloading more blocks or not.
let block_fetcher_chain = Arc::clone(&chain);
let block_fetcher_cancellation = cancellation.clone();
let task_block_fetcher = tokio::spawn(start_block_fetcher(block_fetcher_chain, block_fetcher_cancellation, backlog_tx, number));

while let Some((block, receipts)) = data_rx.recv().await {
// await all tasks
try_join!(task_executor, task_block_fetcher, task_number_fetcher)?;
Ok(())
}

// -----------------------------------------------------------------------------
// Executor
// -----------------------------------------------------------------------------

// Executes external blocks and persist them to storage.
async fn start_block_executor(
executor: Arc<Executor>,
miner: Arc<BlockMiner>,
mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec<ExternalReceipt>)>,
cancellation: CancellationToken,
) {
while let Some((block, receipts)) = backlog_rx.recv().await {
if cancellation.is_cancelled() {
tracing::info!("run_importer_online task cancelled, exiting");
tracing::warn!("exiting importer-online block-executor because cancellation");
break;
}

#[cfg(feature = "metrics")]
let start = metrics::now();

executor.reexecute_external(&block, &receipts).await?;

// mine block
miner.mine_external_mixed_and_commit().await?;
// execute and mine
let receipts = ExternalReceipts::from(receipts);
if let Err(e) = executor.reexecute_external(&block, &receipts).await {
tracing::error!(reason = ?e, number = %block.number(), "cancelling importer-online because failed to reexecute block");
cancellation.cancel();
break;
};
if let Err(e) = miner.mine_external_mixed_and_commit().await {
tracing::error!(reason = ?e, number = %block.number(), "cancelling importer-online because failed to mine external block");
cancellation.cancel();
break;
};

#[cfg(feature = "metrics")]
{
metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
metrics::inc_import_online_mined_block(start.elapsed());
}
}

Ok(())
tracing::warn!("exiting importer-online block-executor because backlog channel was closed by the other side");
}

async fn prefetch_blocks_and_receipts(
mut number: BlockNumber,
chain: BlockchainClient,
data_tx: mpsc::Sender<(ExternalBlock, ExternalReceipts)>,
cancellation: CancellationToken,
sync_interval: Duration,
) {
let buffered_data = Arc::new(RwLock::new(HashMap::new()));
let chain_clone = chain.clone();

// This task will handle the ordered sending of blocks and receipts
{
let buffered_data = Arc::clone(&buffered_data);
tokio::spawn(async move {
let mut next_block_number = number;
loop {
if cancellation.is_cancelled() {
tracing::info!("prefetch_blocks_and_receipts task cancelled, closing channel");
break;
}
let mut data = buffered_data.write().await;

//if it is close to the last block, use the sync interval
match chain_clone.get_current_block_number().await {
Ok(current_block_number) =>
if current_block_number < next_block_number.next() {
sleep(sync_interval).await;
},
Err(e) => {
tracing::error!("failed to get current block number {:?}", e);
sleep(sync_interval).await;
}
}

if let Some((block, receipts)) = data.remove(&next_block_number) {
data_tx.send((block, receipts)).await.expect("Failed to send block and receipts");
next_block_number = next_block_number.next();
}
// -----------------------------------------------------------------------------
// Number fetcher
// -----------------------------------------------------------------------------

/// Retrieves the blockchain current block number.
async fn start_number_fetcher(chain: Arc<BlockchainClient>, cancellation: CancellationToken, sync_interval: Duration) {
loop {
if cancellation.is_cancelled() {
tracing::warn!("exiting importer-online number-fetcher because cancellation");
break;
}

tracing::info!("fetching current block number");
match chain.get_current_block_number().await {
Ok(number) => {
tracing::info!(
%number,
sync_interval = %humantime::Duration::from(sync_interval),
"fetched current block number. awaiting sync interval to retrieve again."
);
RPC_CURRENT_BLOCK.store(number.as_u64(), Ordering::SeqCst);
sleep(sync_interval).await;
}
Err(e) => {
tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
}
});
}
}
}

// -----------------------------------------------------------------------------
// Block fetcher
// -----------------------------------------------------------------------------

/// Retrieves blocks and receipts.
async fn start_block_fetcher(
chain: Arc<BlockchainClient>,
cancellation: CancellationToken,
backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
mut number: BlockNumber,
) {
loop {
let mut handles = Vec::new();
// Spawn tasks for concurrent fetching
for _ in 0..2 {
// Number of concurrent fetch tasks
let chain = chain.clone();
let buffered_data = Arc::clone(&buffered_data);
handles.push(tokio::spawn(async move {
let block = fetch_block(&chain, number).await.unwrap();
let receipts = fetch_receipts_in_parallel(&chain, &block).await;

let mut data = buffered_data.write().await;
data.insert(number, (block, receipts.clone().into()));
}));
if cancellation.is_cancelled() {
tracing::warn!("exiting importer-online block-fetcher because cancellation");
break;
}

// if we are ahead of current block number, await until we are behind again
let rpc_current_number = RPC_CURRENT_BLOCK.load(Ordering::SeqCst);
if number.as_u64() > rpc_current_number {
yield_now().await;
continue;
}

// we are behind current, so we will fetch multiple blocks in parallel to catch up
let mut block_diff = rpc_current_number.saturating_sub(number.as_u64());
block_diff = min(block_diff, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe

let mut tasks = Vec::with_capacity(block_diff as usize);
while block_diff > 0 {
block_diff -= 1;
tasks.push(fetch_block_and_receipts(Arc::clone(&chain), number));
number = number.next();
}

futures::future::join_all(handles).await;
// keep fetching in order
let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
while let Some((block, receipts)) = tasks.next().await {
if backlog_tx.send((block, receipts)).is_err() {
tracing::error!("cancelling importer-online block-fetcher because backlog channel was closed by the other side");
cancellation.cancel();
break;
}
}
}
}

// This is an example helper function to fetch receipts in parallel
async fn fetch_receipts_in_parallel(chain: &BlockchainClient, block: &ExternalBlock) -> Vec<ExternalReceipt> {
let receipts_futures = block.transactions.iter().map(|tx| fetch_receipt(chain, tx.hash()));
futures::future::join_all(receipts_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.unwrap()
#[tracing::instrument(skip_all)]
async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
    // Download the block itself first; its transaction list drives the receipt fetches.
    let block = fetch_block(Arc::clone(&chain), number).await;

    // Kick off one receipt download per transaction, allowing at most
    // PARALLEL_RECEIPTS of them to be in flight at the same time.
    // NOTE: buffer_unordered yields results as they complete, so the
    // receipts Vec is not guaranteed to follow transaction order.
    let receipt_futures: Vec<_> = block
        .transactions
        .iter()
        .map(|tx| fetch_receipt(Arc::clone(&chain), number, tx.hash()))
        .collect();
    let receipts: Vec<ExternalReceipt> = futures::stream::iter(receipt_futures)
        .buffer_unordered(PARALLEL_RECEIPTS)
        .collect()
        .await;

    (block, receipts)
}

#[tracing::instrument(skip_all)]
async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::Result<ExternalBlock> {
let mut delay = 10;
let block = loop {
async fn fetch_block(chain: Arc<BlockchainClient>, number: BlockNumber) -> ExternalBlock {
let mut backoff = 10;
loop {
tracing::info!(%number, "fetching block");
let block = match chain.get_block_by_number(number).await {
Ok(json) => json,
Err(e) => {
tracing::warn!(reason = ?e, "retrying block download because error");
sleep(Duration::from_millis(delay)).await;
delay *= 2;
backoff *= 2;
backoff = min(backoff, 1000); // no more than 1000ms of backoff
tracing::warn!(reason = ?e, %number, %backoff, "failed to retrieve block. retrying with backoff.");
sleep(Duration::from_millis(backoff)).await;
continue;
}
};

if block.is_null() {
#[cfg(not(feature = "perf"))]
{
tracing::warn!(reason = %"null", "retrying block download because block is not mined yet");
tracing::warn!(%number, "block not available yet because block is not mined. retrying now.");
continue;
}

#[cfg(feature = "perf")]
std::process::exit(0);
}

break block;
};

match ExternalBlock::deserialize(&block) {
Ok(block) => Ok(block),
Err(e) => log_and_err!(reason = e, payload = block, "failed to deserialize external block"),
return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block");
}
}

#[tracing::instrument(skip_all)]
async fn fetch_receipt(chain: &BlockchainClient, hash: Hash) -> anyhow::Result<ExternalReceipt> {
let receipt = loop {
tracing::info!(%hash, "fetching receipt");
let receipt = chain.get_transaction_receipt(hash).await?;

match receipt {
Some(receipt) => break receipt,
None => {
tracing::warn!(reason = %"null", "retrying receipt download because block is not mined yet");
async fn fetch_receipt(chain: Arc<BlockchainClient>, number: BlockNumber, hash: Hash) -> ExternalReceipt {
loop {
tracing::info!(%number, %hash, "fetching receipt");

match chain.get_transaction_receipt(hash).await {
Ok(Some(receipt)) => return receipt,
Ok(None) => {
tracing::warn!(%number, %hash, "receipt not available yet because block is not mined. retrying now.");
continue;
}
Err(e) => {
tracing::error!(reason = ?e, %number, %hash, "failed to fetch receipt. retrying now.");
}
}
};

Ok(receipt)
}
}
2 changes: 1 addition & 1 deletion src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let miner = config.miner.init(Arc::clone(&storage)).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer).await;
let chain_url = get_chain_url(config.clone());
let chain = BlockchainClient::new(&chain_url).await?;
let chain = Arc::new(BlockchainClient::new(&chain_url).await?);
let rpc_storage = Arc::clone(&storage);
let rpc_executor = Arc::clone(&executor);
let rpc_miner = Arc::clone(&miner);
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ pub struct ImporterOnlineConfig {
#[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
pub external_rpc: String,

#[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "600ms")]
#[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")]
pub sync_interval: Duration,

#[clap(flatten)]
Expand Down

0 comments on commit 1680c95

Please sign in to comment.