Skip to content

Commit

Permalink
refactor create_slot_task
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 5, 2023
1 parent fe8743e commit 7346e84
Showing 1 changed file with 64 additions and 63 deletions.
127 changes: 64 additions & 63 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,71 +329,12 @@ pub fn create_grpc_subscription(
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);

Check warning on line 329 in cluster-endpoints/src/grpc_subscription.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_subscription.rs
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);

let mut slots = HashMap::new();
slots.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
let slot_task: AnyhowJoinHandle = create_slot_task(
grpc_addr.clone(),
grpc_x_token.clone(),
slot_sx,
);

let slot_task: AnyhowJoinHandle = {
let grpc_x_token = grpc_x_token.clone();
let grpc_addr = grpc_addr.clone();
tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?;

let version = client.get_version().await?.version;
if version != expected_grpc_version {
log::warn!(
"Expected grpc version {:?}, got {:?}, continue",
expected_grpc_version,
version
);
}
let mut stream = client
.subscribe_once(
slots,
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Some(CommitmentLevel::Processed),
Default::default(),
None,
)
.await?;

while let Some(message) = stream.next().await {
let message = message?;

let Some(update) = message.update_oneof else {
continue;
};

match update {
UpdateOneof::Slot(slot) => {
slot_sx
.send(SlotNotification {
estimated_processed_slot: slot.slot,
processed_slot: slot.slot,
})
.context("Error sending slot notification")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
k => {
bail!("Unexpected update: {k:?}");
}
};
}
bail!("geyser slot stream ended");
})
};

let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task(
grpc_addr.clone(),
grpc_x_token.clone(),
Expand Down Expand Up @@ -425,3 +366,63 @@ pub fn create_grpc_subscription(
];

Check warning on line 366 in cluster-endpoints/src/grpc_subscription.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_subscription.rs
Ok((streamers, endpoint_tasks))
}


// note: cannot add confirmation level as parameter because the data produced is declared as "processed"
pub fn create_slot_task(
grpc_addr: String,
grpc_x_token: Option<String>,
slot_sx: Sender<SlotNotification>,
) -> AnyhowJoinHandle {
let mut slots = HashMap::new();
slots.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);

tokio::spawn(async move {
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?;

let mut stream = client
.subscribe_once(
slots,
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Some(CommitmentLevel::Processed),
Default::default(),
None,
)
.await?;

while let Some(message) = stream.next().await {
let message = message?;

let Some(update) = message.update_oneof else {
continue;
};

match update {
UpdateOneof::Slot(slot) => {
slot_sx
.send(SlotNotification {
estimated_processed_slot: slot.slot,
processed_slot: slot.slot,
})
.context("Error sending slot notification")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
k => {
bail!("Unexpected update: {k:?}");
}
};
}
bail!("geyser slot stream ended");

Check warning on line 426 in cluster-endpoints/src/grpc_subscription.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/cluster-endpoints/src/grpc_subscription.rs
})
}

0 comments on commit 7346e84

Please sign in to comment.