Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use timeout instead of yield #960

Merged
merged 4 commits into from
May 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use stratus::eth::storage::StratusStorage;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::DisplayExt;
use stratus::if_else;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::tracing::warn_task_rx_closed;
Expand All @@ -29,18 +30,23 @@ use stratus::utils::calculate_tps;
use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::sync::mpsc;
use tokio::task::yield_now;
use tokio::time::sleep;
use tokio::time::timeout;

// -----------------------------------------------------------------------------
// Globals
// -----------------------------------------------------------------------------

/// Current block number used by the number fetcher and block fetcher.
///
/// It is a global to avoid unnecessary synchronization using a channel.
static RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);
/// Current block number of the external RPC blockchain.
static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);

/// Only sets the external RPC current block number if it is equals or greater than the current one.
fn set_external_rpc_current_block(new_number: BlockNumber) {
let new_number_u64 = new_number.as_u64();
let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
if_else!(new_number_u64 >= current_number, Some(new_number_u64), None)
});
}

// -----------------------------------------------------------------------------
// Constants
Expand All @@ -51,11 +57,14 @@ const PARALLEL_BLOCKS: usize = 3;
/// Number of receipts that are downloaded in parallel.
const PARALLEL_RECEIPTS: usize = 100;

/// Timeout for new newHeads event before fallback to polling.
/// Timeout awaiting for newHeads event before fallback to polling.
const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);

/// Time to wait before we starting retrieving receipts because they are not immediatly available after the block is retrieved.
const BACKOFF_RECEIPTS: Duration = Duration::from_millis(45);
/// Interval before we check again if we are behind the external rpc current block number.
const INTERVAL_CATCH_UP: Duration = Duration::from_millis(20);

/// Interval before we starting retrieving receipts because they are not immediately available after the block is retrieved.
const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50);

// -----------------------------------------------------------------------------
// Execution
Expand Down Expand Up @@ -217,7 +226,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
let resubscribe = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
Ok(Some(Ok(block))) => {
tracing::info!(number = %block.number(), "newHeads event received");
RPC_CURRENT_BLOCK.store(block.number().as_u64(), Ordering::SeqCst);
set_external_rpc_current_block(block.number());
continue;
}
Ok(None) => {
Expand Down Expand Up @@ -256,7 +265,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
sync_interval = %sync_interval.to_string_ext(),
"fetched current block number via http. awaiting sync interval to retrieve again."
);
RPC_CURRENT_BLOCK.store(number.as_u64(), Ordering::SeqCst);
set_external_rpc_current_block(number);
sleep(sync_interval).await;
}
Err(e) => {
Expand All @@ -274,7 +283,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
async fn start_block_fetcher(
chain: Arc<BlockchainClient>,
backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec<ExternalReceipt>)>,
mut number: BlockNumber,
mut importer_block_number: BlockNumber,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "external-block-fetcher";

Expand All @@ -284,21 +293,22 @@ async fn start_block_fetcher(
}

// if we are ahead of current block number, await until we are behind again
let rpc_current_number = RPC_CURRENT_BLOCK.load(Ordering::SeqCst);
if number.as_u64() > rpc_current_number {
yield_now().await;
let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
if importer_block_number.as_u64() > external_rpc_current_block {
sleep(INTERVAL_CATCH_UP).await;
continue;
}

// we are behind current, so we will fetch multiple blocks in parallel to catch up
let mut blocks_to_fetch = rpc_current_number.saturating_sub(number.as_u64()) + 1;
blocks_to_fetch = min(blocks_to_fetch, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1;
let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");

let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
while blocks_to_fetch > 0 {
blocks_to_fetch -= 1;
tasks.push(fetch_block_and_receipts(Arc::clone(&chain), number));
number = number.next();
tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
importer_block_number = importer_block_number.next();
}

// keep fetching in order
Expand All @@ -318,7 +328,7 @@ async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, number: BlockNum
let block = fetch_block(Arc::clone(&chain), number).await;

// wait some time until receipts are available
let _ = tokio::time::sleep(BACKOFF_RECEIPTS).await;
let _ = sleep(INTERVAL_FETCH_RECEIPTS).await;

// fetch receipts in parallel
let mut receipts_tasks = Vec::with_capacity(block.transactions.len());
Expand Down
Loading