From 7346e84117ef9c3908b82c4e684b384764e62500 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 5 Dec 2023 15:21:15 +0100 Subject: [PATCH] refactor create_slot_task --- cluster-endpoints/src/grpc_subscription.rs | 127 +++++++++++---------- 1 file changed, 64 insertions(+), 63 deletions(-) diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index ecf5e389..0f3740e6 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -329,71 +329,12 @@ pub fn create_grpc_subscription( let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); - let mut slots = HashMap::new(); - slots.insert( - "client".to_string(), - SubscribeRequestFilterSlots { - filter_by_commitment: Some(true), - }, + let slot_task: AnyhowJoinHandle = create_slot_task( + grpc_addr.clone(), + grpc_x_token.clone(), + slot_sx, ); - let slot_task: AnyhowJoinHandle = { - let grpc_x_token = grpc_x_token.clone(); - let grpc_addr = grpc_addr.clone(); - tokio::spawn(async move { - // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?; - - let version = client.get_version().await?.version; - if version != expected_grpc_version { - log::warn!( - "Expected grpc version {:?}, got {:?}, continue", - expected_grpc_version, - version - ); - } - let mut stream = client - .subscribe_once( - slots, - Default::default(), - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - Some(CommitmentLevel::Processed), - Default::default(), - None, - ) - .await?; - - while let Some(message) = stream.next().await { - let message = message?; - - let Some(update) = message.update_oneof else { - continue; - }; - - match update { - UpdateOneof::Slot(slot) => { - slot_sx - .send(SlotNotification { - estimated_processed_slot: slot.slot, - processed_slot: slot.slot, - }) - .context("Error sending slot notification")?; - } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - k => { - bail!("Unexpected update: {k:?}"); - } - }; - } - bail!("geyser slot stream ended"); - }) - }; - let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task( grpc_addr.clone(), grpc_x_token.clone(), @@ -425,3 +366,63 @@ pub fn create_grpc_subscription( ]; Ok((streamers, endpoint_tasks)) } + + +// note: cannot add confirmation level as parameter because the data produced is declared as "processed" +pub fn create_slot_task( + grpc_addr: String, + grpc_x_token: Option, + slot_sx: Sender, +) -> AnyhowJoinHandle { + let mut slots = HashMap::new(); + slots.insert( + "client".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, + ); + + tokio::spawn(async move { + let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?; + + let mut stream = client + .subscribe_once( + slots, + Default::default(), + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + Some(CommitmentLevel::Processed), + Default::default(), + None, + ) + .await?; + + while let Some(message) = stream.next().await { + let message = message?; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::Slot(slot) => { + slot_sx + .send(SlotNotification { + estimated_processed_slot: slot.slot, + processed_slot: slot.slot, + }) + .context("Error sending slot notification")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + k => { + bail!("Unexpected update: {k:?}"); + } + }; + } + bail!("geyser slot stream ended"); + }) +} \ No newline at end of file