Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pmantica11 committed Dec 13, 2024
1 parent b1c7beb commit ceb621a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 20 deletions.
13 changes: 13 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ pub struct ConfigGrpc {
with = "humantime_serde"
)]
pub filter_names_cleanup_interval: Duration,
/// Disconnect if node is lagging behind
#[serde(default)]
pub force_disconnect_if_node_is_unhealthy: bool,
/// RPC port to use for health monitoring
pub rpc_port: Option<u16>,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigHealthMonitor {
/// Recommended to set as same threshold as the RPC
pub max_slot_behind_threshold: u64,
pub rpc_port: u16,
}

impl ConfigGrpc {
Expand Down
16 changes: 12 additions & 4 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters},
filters::Filter,
metrics::{self, DebugClientMessage},
monitor::{HEALTH_CHECK_SLOT_DISTANCE, NUM_SLOTS_BEHIND},
monitor::SHOULD_DISCONNECT,
version::GrpcVersionInfo,
},
anyhow::Context,
Expand Down Expand Up @@ -839,10 +839,12 @@ impl GrpcService {
}
}
message = messages_rx.recv() => {
let num_slots_behind = NUM_SLOTS_BEHIND.load(Ordering::SeqCst);
if num_slots_behind > HEALTH_CHECK_SLOT_DISTANCE {
if SHOULD_DISCONNECT.load(Ordering::SeqCst) {
error!("gRPC node is lagging behind. Disconnecting client #{id}");
stream_tx.send(Err(Status::internal("Node is significantly behind the chain tip. Disconnecting to maintain service quality. Please reconnect - you will be automatically routed to a healthy node if using a load balancer."))).await.unwrap();
stream_tx
.send(Err(Status::internal("Disconnecting since node is lagging behind. If you are connected through a load balancer, please try reconnecting. You might be automatically routed to a healthy node.")))
.await
.unwrap();
break 'outer;
}

Expand Down Expand Up @@ -972,6 +974,12 @@ impl Geyser for GrpcService {
&self,
mut request: Request<Streaming<SubscribeRequest>>,
) -> TonicResult<Response<Self::SubscribeStream>> {
if SHOULD_DISCONNECT.load(Ordering::SeqCst) {
error!("gRPC node is lagging behind. Preventing client from connecting.");
return Err(Status::internal(
"Node is lagging behind. If you are connected through a load balancer, please try reconnecting. You might be automatically routed to a healthy node.",
));
}
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);

let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");
Expand Down
22 changes: 10 additions & 12 deletions yellowstone-grpc-geyser/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
Expand All @@ -11,23 +11,21 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{client_error, request};
use tokio::time::interval;

pub const HEALTH_CHECK_SLOT_DISTANCE: u64 = 100;
pub static NUM_SLOTS_BEHIND: Lazy<Arc<AtomicU64>> = Lazy::new(|| Arc::new(AtomicU64::new(0)));
pub static SHOULD_DISCONNECT: Lazy<Arc<AtomicBool>> =
Lazy::new(|| Arc::new(AtomicBool::new(false)));

pub async fn fetch_node_blocks_behind_with_infinite_retry(client: &RpcClient) -> u64 {
pub async fn is_node_healthy(client: &RpcClient) -> bool {
loop {
match client.get_health().await {
Ok(()) => {
return 0;
}
Ok(()) => return true,
Err(err) => {
if let client_error::ErrorKind::RpcError(request::RpcError::RpcResponseError {
code: _,
message: _,
data: request::RpcResponseErrorData::NodeUnhealthy { num_slots_behind },
data: request::RpcResponseErrorData::NodeUnhealthy { .. },
}) = &err.kind
{
return num_slots_behind.unwrap_or(2000);
return false;
} else {
log::error!("Failed to get health: {}", err);
tokio::time::sleep(Duration::from_secs(5)).await;
Expand All @@ -38,11 +36,11 @@ pub async fn fetch_node_blocks_behind_with_infinite_retry(client: &RpcClient) ->
}
}

pub async fn keep_track_of_node_health(rpc_client: RpcClient) {
pub async fn run_forced_disconnection_monitor(rpc_client: RpcClient) {
let mut interval = interval(Duration::from_millis(100));
loop {
interval.tick().await;
let blocks_behind = fetch_node_blocks_behind_with_infinite_retry(&rpc_client).await;
NUM_SLOTS_BEHIND.store(blocks_behind, Ordering::SeqCst);
let is_healthy = !is_node_healthy(&rpc_client).await;
SHOULD_DISCONNECT.store(is_healthy, Ordering::SeqCst);
}
}
14 changes: 10 additions & 4 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
config::Config,
grpc::GrpcService,
metrics::{self, PrometheusService},
monitor::keep_track_of_node_health,
monitor::run_forced_disconnection_monitor,
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
Expand Down Expand Up @@ -77,9 +77,15 @@ impl GeyserPlugin for Plugin {
.build()
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;

// Monitor node health
let rpc_client = RpcClient::new("http://127.0.0.1:8899".to_string());
runtime.spawn(keep_track_of_node_health(rpc_client));
if config.grpc.force_disconnect_if_node_is_unhealthy {
let rpc_client = RpcClient::new(format!(
"http://127.0.0.1:{}",
config.grpc.rpc_port.expect(
"RPC port is required when force_disconnect_if_node_is_unhealthy is true"
)
));
runtime.spawn(run_forced_disconnection_monitor(rpc_client));
}

let (snapshot_channel, grpc_channel, grpc_shutdown, prometheus) =
runtime.block_on(async move {
Expand Down

0 comments on commit ceb621a

Please sign in to comment.