Skip to content

Commit

Permalink
Making block subscription processed and moving confirmed block subscr… (
Browse files Browse the repository at this point in the history
#291)

* Making block subscription processed and moving confirmed block subscription to meta

* Sending both processed and confirmed blocks, if block has already been confirmed
  • Loading branch information
godmodegalactus committed Jan 22, 2024
1 parent 6cbccd0 commit 3139970
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 79 deletions.
205 changes: 126 additions & 79 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::grpc_subscription::{
create_block_processing_task, create_slot_stream_task, map_block_update,
};
use anyhow::Context;
use futures::StreamExt;
use futures::{Stream, StreamExt};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig,
};
Expand All @@ -15,9 +15,10 @@ use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedSender;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

Expand Down Expand Up @@ -50,6 +51,67 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor {
}
}

fn create_grpc_multiplex_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
confirmed_block_sender: UnboundedSender<ProducedBlock>,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::confirmed();

let mut tasks = Vec::new();
let mut streams = vec![];
for grpc_source in grpc_sources {
let (block_sender, block_reciever) = async_channel::unbounded();
tasks.push(create_block_processing_task(
grpc_source.grpc_addr.clone(),
grpc_source.grpc_x_token.clone(),
block_sender,
yellowstone_grpc_proto::geyser::CommitmentLevel::Confirmed,
));
streams.push(block_reciever)
}
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
let mut slots_processed = BTreeSet::<u64>::new();
loop {
let block_message = futures::stream::select_all(streams.clone()).next().await;
const MAX_SIZE: usize = 1024;
if let Some(block) = block_message {
let slot = block.slot;
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
// it means that the slot is too old to process
if !slots_processed.contains(&slot)
&& (slots_processed.len() < MAX_SIZE / 2
|| slot > slots_processed.first().cloned().unwrap_or_default())
{
confirmed_block_sender
.send(map_block_update(block, commitment_config))
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {
slots_processed.pop_first();
}
}
}
}
});
tasks.push(merging_streams);
tasks
}

fn create_grpc_multiplex_block_meta_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
commitment_config: CommitmentConfig,
) -> impl Stream<Item = String> {
let mut streams = Vec::new();
for grpc_source in grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
);
streams.push(stream);
}
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
}

/// connect to multiple grpc sources to consume confirmed blocks and block status update
pub fn create_grpc_multiplex_blocks_subscription(
grpc_sources: Vec<GrpcSourceConfig>,
Expand All @@ -69,120 +131,105 @@ pub fn create_grpc_multiplex_blocks_subscription(
let jh_block_emitter_task = {
tokio::task::spawn(async move {
loop {
let (confirmed_block_sender, mut confirmed_block_reciever) =
let (processed_block_sender, mut processed_block_reciever) =
tokio::sync::mpsc::unbounded_channel::<ProducedBlock>();
let confirmed_blocks_tasks = {
let commitment_config = CommitmentConfig::confirmed();

let mut tasks = Vec::new();
let mut streams = vec![];
for grpc_source in &grpc_sources {
let (block_sender, block_reciever) = async_channel::unbounded();
tasks.push(create_block_processing_task(
grpc_source.grpc_addr.clone(),
grpc_source.grpc_x_token.clone(),
block_sender,
yellowstone_grpc_proto::geyser::CommitmentLevel::Confirmed,
));
streams.push(block_reciever)
}
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
let mut slots_processed = BTreeSet::<u64>::new();
loop {
let block_message =
futures::stream::select_all(streams.clone()).next().await;
const MAX_SIZE: usize = 1024;
if let Some(block) = block_message {
let slot = block.slot;
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
// it means that the slot is too old to process
if !slots_processed.contains(&slot)
&& (slots_processed.len() < MAX_SIZE / 2
|| slot
> slots_processed.first().cloned().unwrap_or_default())
{
confirmed_block_sender
.send(map_block_update(block, commitment_config))
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {
slots_processed.pop_first();
}
}
}
}
});
tasks.push(merging_streams);
tasks
};

let finalized_blockmeta_stream = {
let commitment_config = CommitmentConfig::finalized();
let confirmed_blocks_tasks =
create_grpc_multiplex_block_stream(&grpc_sources, processed_block_sender);

let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
);
streams.push(stream);
}
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
};
let confirmed_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
&grpc_sources,
CommitmentConfig::confirmed(),
);
let finalized_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
&grpc_sources,
CommitmentConfig::finalized(),
);

// by blockhash
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
let mut recent_processed_blocks = HashMap::<String, ProducedBlock>::new();
let mut confirmed_blockmeta_stream = std::pin::pin!(confirmed_blockmeta_stream);
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);

let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
let mut last_finalized_slot: Slot = 0;
let mut cleanup_without_recv_blocks: u8 = 0;
let mut cleanup_without_recv_blocks_meta: u8 = 0;
let mut cleanup_without_confirmed_recv_blocks_meta: u8 = 0;
let mut cleanup_without_finalized_recv_blocks_meta: u8 = 0;
let mut confirmed_block_not_yet_processed = HashSet::<String>::new();

// start logging errors when we recieve first finalized block
let mut finalized_block_recieved = false;
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data
loop {
tokio::select! {
confirmed_block = confirmed_block_reciever.recv() => {
processed_block = processed_block_reciever.recv() => {
cleanup_without_recv_blocks = 0;

let confirmed_block = confirmed_block.expect("confirmed block from stream");
trace!("got confirmed block {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(confirmed_block.clone()) {
warn!("Confirmed block channel has no receivers {e:?}");
let processed_block = processed_block.expect("processed block from stream");
trace!("got processed block {} with blockhash {}",
processed_block.slot, processed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(processed_block.clone()) {
warn!("produced block channel has no receivers {e:?}");
continue
}
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
if confirmed_block_not_yet_processed.remove(&processed_block.blockhash) {
if let Err(e) = producedblock_sender.send(processed_block.to_confirmed_block()) {
warn!("produced block channel has no receivers {e:?}");
continue
}
}
recent_processed_blocks.insert(processed_block.blockhash.clone(), processed_block);
},
meta_confirmed = confirmed_blockmeta_stream.next() => {
cleanup_without_confirmed_recv_blocks_meta = 0;
let blockhash = meta_confirmed.expect("confirmed block meta from stream");
if let Some(cached_processed_block) = recent_processed_blocks.get(&blockhash) {
let confirmed_block = cached_processed_block.to_confirmed_block();
debug!("got confirmed blockmeta {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(confirmed_block) {
warn!("Finalized block channel has no receivers {e:?}");
continue;
}
} else {
confirmed_block_not_yet_processed.insert(blockhash.clone());
log::info!("confirmed blocks not found : {}", confirmed_block_not_yet_processed.len());
}
},
meta_finalized = finalized_blockmeta_stream.next() => {
cleanup_without_recv_blocks_meta = 0;
cleanup_without_finalized_recv_blocks_meta = 0;
let blockhash = meta_finalized.expect("finalized block meta from stream");
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
let finalized_block = cached_confirmed_block.to_finalized_block();
if let Some(cached_processed_block) = recent_processed_blocks.remove(&blockhash) {
let finalized_block = cached_processed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
finalized_block_recieved = true;
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
continue;
}
} else {
} else if finalized_block_recieved {
// this warning is ok for first few blocks when we start lrpc
warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
log::error!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
_ = cleanup_tick.tick() => {
if cleanup_without_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
if cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
log::error!("block or block meta stream stopped restaring blocks");
break;
}
cleanup_without_recv_blocks += 1;
cleanup_without_recv_blocks_meta += 1;
let size_before = recent_confirmed_blocks.len();
recent_confirmed_blocks.retain(|_blockhash, block| {
cleanup_without_finalized_recv_blocks_meta += 1;
cleanup_without_confirmed_recv_blocks_meta += 1;
let size_before = recent_processed_blocks.len();
recent_processed_blocks.retain(|_blockhash, block| {
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
});
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
let cnt_cleaned = size_before - recent_processed_blocks.len();
if cnt_cleaned > 0 {
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
}
Expand Down
8 changes: 8 additions & 0 deletions core/src/structures/produced_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ impl ProducedBlock {
..self.clone()
}
}

/// moving commitment level to finalized
pub fn to_confirmed_block(&self) -> Self {
ProducedBlock {
commitment_config: CommitmentConfig::confirmed(),
..self.clone()
}
}
}

#[inline]
Expand Down

0 comments on commit 3139970

Please sign in to comment.