Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 8, 2024
1 parent e42db51 commit 9ef773c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 38 deletions.
88 changes: 50 additions & 38 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::grpc_stream_utils::channelize_stream;
use crate::grpc_subscription::map_block_update;
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use log::info;
use log::{debug, info, trace};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
Expand Down Expand Up @@ -36,13 +36,12 @@ impl FromYellowstoneExtractor for BlockExtractor {
}
}

struct BlockHashExtractor(CommitmentConfig);
struct BlockMetaHashExtractor(CommitmentConfig);

impl FromYellowstoneExtractor for BlockHashExtractor {
impl FromYellowstoneExtractor for BlockMetaHashExtractor {
type Target = String;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, String)> {
match update.update_oneof {
Some(UpdateOneof::Block(block)) => Some((block.slot, block.blockhash)),
Some(UpdateOneof::BlockMeta(block_meta)) => {
Some((block_meta.slot, block_meta.blockhash))
}
Expand All @@ -51,6 +50,7 @@ impl FromYellowstoneExtractor for BlockHashExtractor {
}
}

/// connect to multiple grpc sources to consume confirmed blocks and block status update
pub fn create_grpc_multiplex_blocks_subscription(
grpc_sources: Vec<GrpcSourceConfig>,
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
Expand All @@ -62,13 +62,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
info!("- connection to {}", grpc_source);
}

let _timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
};

let multiplex_stream_confirmed = {
let confirmed_blocks_stream = {
let commitment_config = CommitmentConfig::confirmed();

let mut streams = Vec::new();
Expand All @@ -83,10 +77,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
};

let (sender, multiplexed_merged_blocks) =
tokio::sync::broadcast::channel::<ProducedBlock>(1000);

let meta_stream_finalized = {
let finalized_blockmeta_stream = {
let commitment_config = CommitmentConfig::finalized();

let mut streams = Vec::new();
Expand All @@ -97,43 +88,64 @@ pub fn create_grpc_multiplex_blocks_subscription(
);
streams.push(stream);
}
create_multiplexed_stream(streams, BlockHashExtractor(commitment_config))
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
};
let jh_channelizer = {
// spawn merged

// return value is the broadcast receiver
let (producedblock_sender, blocks_output_stream) =
tokio::sync::broadcast::channel::<ProducedBlock>(1000);

let jh_block_emitter_task = {
tokio::task::spawn(async move {
let mut map_of_confimed_blocks = HashMap::<String, ProducedBlock>::new();
let mut multiplex_stream_confirmed = std::pin::pin!(multiplex_stream_confirmed);
let mut meta_stream_finalized = std::pin::pin!(meta_stream_finalized);
let sender = sender;
// by blockhash
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);

let sender = producedblock_sender;
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
let mut current_slot: Slot = 0;
loop {
tokio::select! {
confirmed_block = multiplex_stream_confirmed.next() => {
if let Some(confirmed_block) = confirmed_block {
if let Err(e) = sender.send(confirmed_block.clone()) {
panic!("Confirmed block stream send gave error {e:?}");
confirmed_block = confirmed_blocks_stream.next() => {
let confirmed_block = confirmed_block.expect("confirmed block from stream");
current_slot = confirmed_block.slot;
trace!("got confirmed block {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = sender.send(confirmed_block.clone()) {
panic!("Confirmed block stream send gave error {e:?}");
}
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
},
meta_finalized = finalized_blockmeta_stream.next() => {
let blockhash = meta_finalized.expect("finalized block meta from stream");
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
let finalized_block = cached_confirmed_block.to_finalized_block();
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = sender.send(finalized_block) {
panic!("Finalized block stream send gave error {e:?}");
}
map_of_confimed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
} else {
panic!("Confirmed stream broke");
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
meta_finalized = meta_stream_finalized.next() => {
if let Some(blockhash) = meta_finalized {
if let Some(mut finalized_block) = map_of_confimed_blocks.remove(&blockhash) {
finalized_block.commitment_config = CommitmentConfig::finalized();
if let Err(e) = sender.send(finalized_block.clone()) {
panic!("Finalized block stream send gave error {e:?}");
}
}
_ = cleanup_tick.tick() => {
let size_before = recent_confirmed_blocks.len();
recent_confirmed_blocks.retain(|_blockhash, block| {
block.slot > current_slot - 100 // must be greater than finalized slot distance (31)
});
let cleaned = size_before - recent_confirmed_blocks.len();
if cleaned > 0 {
debug!("cleaned {} confirmed blocks from cache", cleaned);
}
}
}
}
})
};

(multiplexed_merged_blocks, jh_channelizer)
(blocks_output_stream, jh_block_emitter_task)
}

struct SlotExtractor {}
Expand Down
8 changes: 8 additions & 0 deletions core/src/structures/produced_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,12 @@ impl ProducedBlock {
rewards,
}
}

/// moving commitment level to finalized
pub fn to_finalized_block(&self) -> Self {
ProducedBlock {
commitment_config: CommitmentConfig::finalized(),
..self.clone()
}
}
}

0 comments on commit 9ef773c

Please sign in to comment.