Skip to content

Commit

Permalink
Adding some more metrics for quic tpu, and changing some parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 18, 2024
1 parent b51a9ab commit 85fcb60
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 110 deletions.
147 changes: 74 additions & 73 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ license = "AGPL"
edition = "2021"

[workspace.dependencies]
solana-sdk = "~1.17.15"
solana-rpc-client = "~1.17.15"
solana-rpc-client-api = "~1.17.15"
solana-transaction-status = "~1.17.15"
solana-version = "~1.17.15"
solana-client = "~1.17.15"
solana-net-utils = "~1.17.15"
solana-pubsub-client = "~1.17.15"
solana-streamer = "~1.17.15"
solana-account-decoder = "~1.17.15"
solana-ledger = "~1.17.15"
solana-program = "~1.17.15"
solana-sdk = "~1.17.25"
solana-rpc-client = "~1.17.25"
solana-rpc-client-api = "~1.17.25"
solana-transaction-status = "~1.17.25"
solana-version = "~1.17.25"
solana-client = "~1.17.25"
solana-net-utils = "~1.17.25"
solana-pubsub-client = "~1.17.25"
solana-streamer = "~1.17.25"
solana-account-decoder = "~1.17.25"
solana-ledger = "~1.17.25"
solana-program = "~1.17.25"
itertools = "0.10.5"
rangetools = "0.1.4"
serde = { version = "1.0.160", features = ["derive"] }
Expand Down Expand Up @@ -73,5 +73,5 @@ solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.4"}
solana-lite-rpc-block-priofees = {path = "block_priofees", version="0.2.4"}

async-trait = "0.1.68"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.13.0+solana.1.17.25" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.13.0+solana.1.17.25" }
2 changes: 1 addition & 1 deletion cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "AGPL"

[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v1.17.15", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git", branch = "v1.13.0+solana.1.17.25" }

solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }
Expand Down
5 changes: 2 additions & 3 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use crate::grpc_subscription::{
};
use anyhow::Context;
use futures::{Stream, StreamExt};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcSourceConfig};
use log::{debug, info, trace, warn};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
Expand Down
2 changes: 1 addition & 1 deletion cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig;
use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
Expand Down
2 changes: 1 addition & 1 deletion cluster-endpoints/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ pub mod json_rpc_leaders_getter;
pub mod json_rpc_subscription;
pub mod rpc_polling;

pub use geyser_grpc_connector::grpc_subscription_autoreconnect;
pub use geyser_grpc_connector;
pub use yellowstone_grpc_proto::geyser::CommitmentLevel;
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ chrono = { workspace = true }
rustls = { workspace = true }
async-trait = { workspace = true }
itertools = { workspace = true }
prometheus = { workspace = true }

[dev-dependencies]
rand = "0.8.5"
2 changes: 1 addition & 1 deletion core/src/structures/prioritization_fee_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ mod tests {
};
p_heap.insert(info).await;
}
tokio::time::sleep(Duration::from_millis(1)).await;
tokio::time::sleep(Duration::from_millis(10)).await;
p_heap.remove_expired_transactions(height).await;
height += 1;
}
Expand Down
51 changes: 49 additions & 2 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::env;
use std::{env, time::Duration};

use crate::{
DEFAULT_FANOUT_SIZE, DEFAULT_GRPC_ADDR, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR,
Expand All @@ -8,6 +8,7 @@ use anyhow::Context;
use clap::Parser;
use dotenv::dotenv;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -67,6 +68,9 @@ pub struct Config {
/// postgres config
#[serde(default)]
pub postgres: Option<PostgresSessionConfig>,

#[serde(default)]
pub quic_connection_parameters: Option<QuicConnectionParameters>,
}

impl Config {
Expand Down Expand Up @@ -174,7 +178,9 @@ impl Config {
.unwrap_or(config.grpc_x_token4);

config.postgres = PostgresSessionConfig::new_from_env()?.or(config.postgres);

config.quic_connection_parameters = config
.quic_connection_parameters
.or(quic_params_from_environment());
Ok(config)
}

Expand Down Expand Up @@ -256,3 +262,44 @@ pub struct GrpcSource {
pub addr: String,
pub x_token: Option<String>,
}

fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
let mut quic_connection_parameters = QuicConnectionParameters::default();

quic_connection_parameters.connection_timeout = env::var("QUIC_CONNECTION_TIMEOUT_MILLIS")
.map(|millis| Duration::from_millis(millis.parse().unwrap()))
.unwrap_or(quic_connection_parameters.connection_timeout);

quic_connection_parameters.unistream_timeout = env::var("QUIC_UNISTREAM_TIMEOUT_MILLIS")
.map(|millis| Duration::from_millis(millis.parse().unwrap()))
.unwrap_or(quic_connection_parameters.unistream_timeout);

quic_connection_parameters.write_timeout = env::var("QUIC_WRITE_TIMEOUT_MILLIS")
.map(|millis| Duration::from_millis(millis.parse().unwrap()))
.unwrap_or(quic_connection_parameters.write_timeout);

quic_connection_parameters.finalize_timeout = env::var("QUIC_FINALIZE_TIMEOUT_MILLIS")
.map(|millis| Duration::from_millis(millis.parse().unwrap()))
.unwrap_or(quic_connection_parameters.finalize_timeout);

quic_connection_parameters.connection_retry_count = env::var("QUIC_CONNECTION_RETRY_COUNT")
.map(|millis| millis.parse().unwrap())
.unwrap_or(quic_connection_parameters.connection_retry_count);

quic_connection_parameters.max_number_of_connections =
env::var("QUIC_MAX_NUMBER_OF_CONNECTIONS")
.map(|millis| millis.parse().unwrap())
.unwrap_or(quic_connection_parameters.max_number_of_connections);

quic_connection_parameters.number_of_transactions_per_unistream =
env::var("QUIC_NUMBER_OF_TRANSACTIONS_PER_TASK")
.map(|millis| millis.parse().unwrap())
.unwrap_or(quic_connection_parameters.number_of_transactions_per_unistream);

quic_connection_parameters.percentage_of_connection_limit_to_create_new =
env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION")
.map(|millis| millis.parse().unwrap())
.unwrap_or(quic_connection_parameters.percentage_of_connection_limit_to_create_new);

Some(quic_connection_parameters)
}
17 changes: 5 additions & 12 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use log::{debug, info};
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::{
GrpcConnectionTimeouts, GrpcSourceConfig,
};
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
use solana_lite_rpc_cluster_endpoints::rpc_polling::poll_blocks::NUM_PARALLEL_TASKS_DEFAULT;
Expand All @@ -36,7 +36,6 @@ use solana_lite_rpc_history::history::History;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
Expand Down Expand Up @@ -115,6 +114,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
transaction_retry_after_secs,
quic_proxy_addr,
use_grpc,
quic_connection_parameters,
..
} = args;

Expand All @@ -135,6 +135,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};

create_grpc_subscription(
Expand Down Expand Up @@ -210,15 +211,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let tpu_config = TpuServiceConfig {
fanout_slots: fanout_size,
maximum_transaction_in_queue: 20000,
quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(1000),
max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 1,
},
quic_connection_params: quic_connection_parameters.unwrap_or_default(),
tpu_connection_path,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
unistream_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
percentage_of_connection_limit_to_create_new: 10,
};

#[test]
Expand Down
5 changes: 4 additions & 1 deletion services/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ impl QuicConnectionPool {
v
},
permit_threshold: max_number_of_unistream_connection
.saturating_mul(90)
.saturating_mul(std::cmp::max(
connection_parameters.percentage_of_connection_limit_to_create_new,
100,
) as usize)
.saturating_div(100),
}
}
Expand Down
33 changes: 32 additions & 1 deletion services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig, VarInt,
};
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::pubkey::Pubkey;
use std::{
Expand All @@ -17,6 +18,15 @@ use std::{
use tokio::time::timeout;

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_0RTT_successful", "Number of times 0RTT attempted")).unwrap();
static ref NB_QUIC_CONN_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_0RTT_successful", "Number of times conn attempted")).unwrap();
static ref NB_QUIC_0RTT_SUCCESSFUL: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_0RTT_successful", "Number of times 0RTT successful")).unwrap();
static ref NB_QUIC_CONN_SUCCESSFUL: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_conn_successful", "Number of times conn successful")).unwrap();

static ref NB_QUIC_0RTT_TIMEOUT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_0RTT_timedout", "Number of times 0RTT timedout")).unwrap();
static ref NB_QUIC_CONNECTION_TIMEOUT: GenericGauge<prometheus::core::AtomicI64> =
Expand All @@ -40,7 +50,7 @@ pub enum QuicConnectionError {
ConnectionError { retry: bool },
}

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub struct QuicConnectionParameters {
pub connection_timeout: Duration,
pub unistream_timeout: Duration,
Expand All @@ -49,6 +59,22 @@ pub struct QuicConnectionParameters {
pub connection_retry_count: usize,
pub max_number_of_connections: usize,
pub number_of_transactions_per_unistream: usize,
pub percentage_of_connection_limit_to_create_new: u8,
}

impl Default for QuicConnectionParameters {
fn default() -> Self {
Self {
connection_timeout: Duration::from_millis(5000),
unistream_timeout: Duration::from_millis(5000),
write_timeout: Duration::from_millis(5000),
finalize_timeout: Duration::from_millis(5000),
connection_retry_count: 20,
max_number_of_connections: 8,
number_of_transactions_per_unistream: 1,
percentage_of_connection_limit_to_create_new: 50,
}
}
}

pub struct QuicConnectionUtils {}
Expand Down Expand Up @@ -133,6 +159,7 @@ impl QuicConnectionUtils {
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if (timeout(connection_timeout, zero_rtt).await).is_ok() {
NB_QUIC_0RTT_SUCCESSFUL.inc();
connection
} else {
NB_QUIC_0RTT_TIMEOUT.inc();
Expand All @@ -143,6 +170,8 @@ impl QuicConnectionUtils {
if let Ok(connecting_result) = timeout(connection_timeout, connecting).await {
if connecting_result.is_err() {
NB_QUIC_CONNECTION_ERRORED.inc();
} else {
NB_QUIC_CONN_SUCCESSFUL.inc();
}
connecting_result?
} else {
Expand All @@ -166,8 +195,10 @@ impl QuicConnectionUtils {
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
NB_QUIC_0RTT_ATTEMPTED.inc();
Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
} else {
NB_QUIC_CONN_ATTEMPTED.inc();
Self::make_connection(endpoint.clone(), addr, connection_timeout).await
};
match conn {
Expand Down

0 comments on commit 85fcb60

Please sign in to comment.