diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs
index 5ca7a9c..05033ee 100644
--- a/src/ingester/fetchers/grpc.rs
+++ b/src/ingester/fetchers/grpc.rs
@@ -28,7 +28,7 @@ use yellowstone_grpc_proto::solana::storage::confirmed_block::InnerInstructions;
 use crate::api::method::get_indexer_health::HEALTH_CHECK_SLOT_DISTANCE;
 use crate::common::typedefs::hash::Hash;
 use crate::common::typedefs::rpc_client_with_uri::RpcClientWithUri;
-use crate::ingester::fetchers::poller::get_poller_block_stream;
+use crate::ingester::fetchers::poller::get_block_poller_stream;
 use crate::ingester::typedefs::block_info::{
     BlockInfo, BlockMetadata, Instruction, InstructionGroup, TransactionInfo,
 };
@@ -48,7 +48,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
     let grpc_stream = get_grpc_block_stream(endpoint, auth_header);
     pin_mut!(grpc_stream);
     let mut rpc_poll_stream: Option<Pin<Box<dyn Stream<Item = Vec<BlockInfo>> + Send>>> = Some(
-        Box::pin(get_poller_block_stream(
+        Box::pin(get_block_poller_stream(
             rpc_client.clone(),
             last_indexed_slot,
             max_concurrent_block_fetches,
@@ -103,7 +103,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
                         statsd_count!("grpc_timeout", 1);
                     }
                     info!("gRPC stream timed out, enabling RPC block fetching");
-                    rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
+                    rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
                         rpc_client.clone(),
                         last_indexed_slot,
                         max_concurrent_block_fetches,
@@ -120,7 +120,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
                         statsd_count!("grpc_out_of_order", 1);
                     }
                     info!("Switching to RPC block fetching");
-                    rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
+                    rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
                         rpc_client.clone(),
                         last_indexed_slot,
                         max_concurrent_block_fetches,
@@ -132,7 +132,7 @@
                     metric! {
{ statsd_count!("grpc_stale", 1); } - rpc_poll_stream = Some(Box::pin(get_poller_block_stream( + rpc_poll_stream = Some(Box::pin(get_block_poller_stream( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, diff --git a/src/ingester/fetchers/mod.rs b/src/ingester/fetchers/mod.rs index fe4a4eb..dccb8b8 100644 --- a/src/ingester/fetchers/mod.rs +++ b/src/ingester/fetchers/mod.rs @@ -11,7 +11,7 @@ pub mod grpc; pub mod poller; use grpc::get_grpc_stream_with_rpc_fallback; -use poller::get_poller_block_stream; +use poller::get_block_poller_stream; pub struct BlockStreamConfig { pub rpc_client: Arc, @@ -34,7 +34,7 @@ impl BlockStreamConfig { }); let poller_stream = if self.geyser_url.is_none() { - Some(get_poller_block_stream( + Some(get_block_poller_stream( self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, diff --git a/src/ingester/fetchers/poller.rs b/src/ingester/fetchers/poller.rs index 31891be..5bdc538 100644 --- a/src/ingester/fetchers/poller.rs +++ b/src/ingester/fetchers/poller.rs @@ -1,18 +1,14 @@ use std::{ - cmp::max, - collections::{BTreeMap, HashSet}, - num::NonZeroUsize, + collections::BTreeMap, sync::{atomic::Ordering, Arc}, time::Duration, }; use async_stream::stream; use cadence_macros::statsd_count; -use futures::{stream::FuturesUnordered, StreamExt}; -use lru::LruCache; +use futures::{pin_mut, Stream, StreamExt}; use solana_client::{ - client_error, nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, - rpc_request::RpcError, + nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError, }; use solana_sdk::commitment_config::CommitmentConfig; @@ -26,148 +22,58 @@ use crate::{ }; const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009]; -const RETRIES: u64 = 3; -const INFINITY: u64 = u64::MAX; -/// This function creates a stream that continuously fetches and emits blocks from a Solana blockchain. -/// It implements a concurrent block fetching algorithm with the following key features: -/// -/// 1. Concurrent block fetching: It can fetch multiple blocks simultaneously up to a specified limit. -/// 2. Block caching: Fetched blocks are cached if they can't be immediately processed. -/// 3. Skipped slot handling: It keeps track of skipped slots to avoid unnecessary fetching attempts. -/// 4. Ordered block emission: It ensures blocks are emitted in the correct order, even if fetched out of order. -/// -/// Algorithm overview: -/// - Initialize data structures for block fetching, caching, and tracking. -/// - Enter a loop that continues indefinitely: -/// a. Fetch the current latest slot. -/// b. Initiate new block fetches up to the concurrent limit. -/// c. Process completed block fetches: -/// - If the block is the next in sequence, emit it along with any cached blocks that follow. -/// - If not, cache the block and fetch its parent if necessary. -/// d. Refill the block fetching queue. -/// e. Brief sleep to allow other threads to update the latest slot. -/// -/// This approach allows for efficient block fetching while maintaining the correct order of block processing. -pub fn get_poller_block_stream( - client: Arc, +fn get_slot_stream(rpc_client: Arc, start_slot: u64) -> impl Stream { + stream! 
+        start_latest_slot_updater(rpc_client.clone()).await;
+        let mut next_slot_to_fetch = start_slot;
+        loop {
+            if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) {
+                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+                continue;
+            }
+            yield next_slot_to_fetch;
+            next_slot_to_fetch += 1;
+        }
+    }
+}
+
+pub fn get_block_poller_stream(
+    rpc_client: Arc<RpcClientWithUri>,
     mut last_indexed_slot: u64,
     max_concurrent_block_fetches: usize,
-) -> impl futures::Stream<Item = Vec<BlockInfo>> {
+) -> impl Stream<Item = Vec<BlockInfo>> {
     stream! {
-        start_latest_slot_updater(client.clone()).await;
-        let mut block_fetching_futures = FuturesUnordered::new();
-        let mut block_cache: BTreeMap<u64, BlockInfo> = BTreeMap::new();
-        let mut in_process_slots = HashSet::new();
-        let mut next_slot_to_fetch = match last_indexed_slot {
+        let start_slot = match last_indexed_slot {
             0 => 0,
             last_indexed_slot => last_indexed_slot + 1
         };
-        let mut skipped_slot_cache = LruCache::new(NonZeroUsize::new(1000).unwrap());
-
-
-        loop {
-            let current_slot = LATEST_SLOT.load(Ordering::SeqCst);
-
-            // Refill the block fetching futures with new slots to fetch
-            //
-            // If we just continued from the last indexed slot + 1, we might get into an infinite retry loop when
-            // we encounter a large number of skipped slots. To avoid that, when we refill the block fetching
-            // futures, we continue from the max of the block fetched in the previous outer loop iteration and
-            // the last indexed slot + 1.
-            next_slot_to_fetch = max(next_slot_to_fetch, last_indexed_slot + 1);
-            for _ in 0..max_concurrent_block_fetches {
-                if next_slot_to_fetch > current_slot {
-                    break;
-                }
-                if !skipped_slot_cache.contains(&next_slot_to_fetch) && is_slot_unprocessed(next_slot_to_fetch, &in_process_slots, &block_cache, last_indexed_slot) {
-                    block_fetching_futures.push(fetch_block(
-                        client.uri.clone(),
-                        next_slot_to_fetch,
-                        RETRIES
-                    ));
-                    in_process_slots.insert(next_slot_to_fetch);
-                }
-                next_slot_to_fetch += 1;
+        let slot_stream = get_slot_stream(rpc_client.clone(), start_slot);
+        pin_mut!(slot_stream);
+        let block_stream = slot_stream
+            .map(|slot| {
+                let rpc_client = rpc_client.clone();
+                async move { fetch_block_with_infinite_retries(rpc_client.uri.clone(), slot).await }
+            })
+            .buffer_unordered(max_concurrent_block_fetches);
+        pin_mut!(block_stream);
+        let mut block_cache: BTreeMap<u64, BlockInfo> = BTreeMap::new();
+        while let Some(block) = block_stream.next().await {
+            if let Some(block) = block {
+                block_cache.insert(block.metadata.slot, block);
             }
-
-            while let Some(block) = block_fetching_futures.next().await {
-                let (block_result, slot) = block;
-                in_process_slots.remove(&slot);
-                if let Ok(block) = block_result {
-                    if let None = block {
-                        skipped_slot_cache.push(slot, true);
-                        continue;
-                    }
-                    let block = block.unwrap();
-
-                    let current_slot = LATEST_SLOT.load(Ordering::SeqCst);
-
-                    // If the block is the next block to index, emit it and the consecutive blocks in the block cache
-                    if block.metadata.slot == 0 || block.metadata.parent_slot == last_indexed_slot {
-                        last_indexed_slot = block.metadata.slot;
-                        let mut blocks_to_index = vec![block];
-                        let (cached_blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot);
-                        last_indexed_slot = last_indexed_slot_from_cache;
-                        blocks_to_index.extend(cached_blocks_to_index);
-                        let blocks_to_index_len = blocks_to_index.len();
-                        metric! {
{ - statsd_count!("rpc_block_emitted", blocks_to_index_len as i64); - } - yield blocks_to_index; - } - else { - let parent_slot = block.metadata.parent_slot; - - // If the parent block is not processed, fetch it - if is_slot_unprocessed(parent_slot, &in_process_slots, &block_cache, last_indexed_slot) { - block_fetching_futures.push(fetch_block( - client.uri.clone(), - parent_slot, - INFINITY - )); - in_process_slots.insert(parent_slot); - } - block_cache.insert(block.metadata.slot, block.clone()); - } - - // Refill the block fetching futures with new slots to fetch - for next_slot in (last_indexed_slot + 1)..(slot + 1 + max_concurrent_block_fetches as u64) { - if in_process_slots.len() >= max_concurrent_block_fetches { - break; - } - if next_slot > current_slot { - break; - } - if !skipped_slot_cache.contains(&next_slot) && is_slot_unprocessed(next_slot, &in_process_slots, &block_cache, last_indexed_slot) { - block_fetching_futures.push(fetch_block( - client.uri.clone(), - next_slot, - RETRIES - )); - in_process_slots.insert(next_slot); - } - } - } + let (blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot); + last_indexed_slot = last_indexed_slot_from_cache; + metric! { + statsd_count!("rpc_block_emitted", blocks_to_index.len() as i64); + } + if !blocks_to_index.is_empty() { + yield blocks_to_index; } - // Sleep to give the chance to other thread to update LATEST_SLOT - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - } } } -fn is_slot_unprocessed( - slot: u64, - in_process_slots: &HashSet, - block_cache: &BTreeMap, - last_indexed_slot: u64, -) -> bool { - !in_process_slots.contains(&slot) - && !block_cache.contains_key(&slot) - && slot > last_indexed_slot -} - fn pop_cached_blocks_to_index( block_cache: &mut BTreeMap, mut last_indexed_slot: u64, @@ -196,14 +102,11 @@ fn pop_cached_blocks_to_index( (blocks, last_indexed_slot) } -pub async fn fetch_block( - rpc_uri: String, - slot: u64, - retries: u64, -) -> (Result, client_error::ClientError>, u64) { +pub async fn fetch_block_with_infinite_retries(rpc_uri: String, slot: u64) -> Option { let mut attempt_counter = 0; loop { let timeout_sec = if attempt_counter <= 1 { 5 } else { 30 }; + attempt_counter += 1; let client = RpcClient::new_with_timeout_and_commitment( rpc_uri.clone(), Duration::from_secs(timeout_sec), @@ -226,10 +129,7 @@ pub async fn fetch_block( metric! { statsd_count!("rpc_block_fetched", 1); } - return ( - Ok(Some(parse_ui_confirmed_blocked(block, slot).unwrap())), - slot, - ); + return Some(parse_ui_confirmed_blocked(block, slot).unwrap()); } Err(e) => { if let solana_client::client_error::ClientErrorKind::RpcError( @@ -237,27 +137,16 @@ pub async fn fetch_block( ) = e.kind { if SKIPPED_BLOCK_ERRORS.contains(&code) { - if retries == INFINITY { - log::error!( - "Got skipped block error for block that is supposed to exist: {}", - slot - ); - } else { - metric! { - statsd_count!("rpc_skipped_block", 1); - } - log::info!("Skipped block: {}", slot); + metric! { + statsd_count!("rpc_skipped_block", 1); } - return (Ok(None), slot); + log::info!("Skipped block: {}", slot); + return None; } } log::debug!("Failed to fetch block: {}. {}", slot, e.to_string()); - attempt_counter += 1; - if attempt_counter >= retries { - metric! { - statsd_count!("rpc_block_fetch_failed", 1); - } - return (Err(e), slot); + metric! 
{ + statsd_count!("rpc_block_fetch_failed", 1); } } } diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 074e5f4..24bef8f 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use cadence_macros::{statsd_count, statsd_gauge}; -use log::error; +use log::{error, info}; use once_cell::sync::Lazy; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use solana_client::nonblocking::rpc_client::RpcClient; @@ -28,13 +28,10 @@ use light_concurrent_merkle_tree::copy::ConcurrentMerkleTreeCopy; use light_concurrent_merkle_tree::light_hasher::Poseidon; use light_sdk::state::MerkleTreeMetadata; - use crate::common::typedefs::hash::Hash; - use solana_sdk::account::Account as SolanaAccount; - use solana_sdk::pubkey::Pubkey; use std::mem; const CHUNK_SIZE: usize = 100; @@ -73,6 +70,7 @@ pub fn continously_monitor_photon( if lag < HEALTH_CHECK_SLOT_DISTANCE as u64 { has_been_healthy = true; } + info!("Indexing lag: {}", lag); if has_been_healthy && lag > HEALTH_CHECK_SLOT_DISTANCE as u64 { error!("Indexing lag is too high: {}", lag); continue;