diff --git a/Cargo.lock b/Cargo.lock index 0e9569f47e6a82..8be26642cb791d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8407,6 +8407,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-tpu-client-next" +version = "2.1.0" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures 0.3.31", + "log", + "lru", + "quinn", + "rustls 0.23.14", + "solana-cli-config", + "solana-connection-cache", + "solana-logger", + "solana-measure", + "solana-rpc-client", + "solana-sdk", + "solana-streamer", + "solana-tpu-client", + "thiserror", + "tokio", + "tokio-util 0.7.12", +] + [[package]] name = "solana-transaction-dos" version = "2.1.0" diff --git a/Cargo.toml b/Cargo.toml index 925d3c01cb13e3..db33afda423e7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,6 +148,7 @@ members = [ "tokens", "tps-client", "tpu-client", + "tpu-client-next", "transaction-dos", "transaction-metrics-tracker", "transaction-status", diff --git a/tpu-client-next/Cargo.toml b/tpu-client-next/Cargo.toml new file mode 100644 index 00000000000000..13ceb0e584a10f --- /dev/null +++ b/tpu-client-next/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "solana-tpu-client-next" +description = "Client code to send transaction to TPU." +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +log = { workspace = true } +lru = { workspace = true } +quinn = { workspace = true } +rustls = { workspace = true } +solana-connection-cache = { workspace = true } +solana-logger = { workspace = true } +solana-measure = { workspace = true } +solana-rpc-client = { workspace = true } +solana-sdk = { workspace = true } +solana-streamer = { workspace = true } +solana-tpu-client = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } + +[dev-dependencies] +crossbeam-channel = { workspace = true } +futures = { workspace = true } +solana-cli-config = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/tpu-client-next/src/connection_worker.rs b/tpu-client-next/src/connection_worker.rs new file mode 100644 index 00000000000000..7d77bc3f6ed2a2 --- /dev/null +++ b/tpu-client-next/src/connection_worker.rs @@ -0,0 +1,258 @@ +//! This module defines [`ConnectionWorker`] which encapsulates the functionality +//! needed to handle one connection within the scope of task. + +use { + super::SendTransactionStats, + crate::{ + quic_networking::send_data_over_stream, send_transaction_stats::record_error, + transaction_batch::TransactionBatch, + }, + log::*, + quinn::{ConnectError, Connection, Endpoint}, + solana_measure::measure::Measure, + solana_sdk::{ + clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, + timing::timestamp, + }, + std::net::SocketAddr, + tokio::{ + sync::mpsc, + time::{sleep, Duration}, + }, + tokio_util::sync::CancellationToken, +}; + +/// Interval between retry attempts for creating a new connection. This value is +/// a best-effort estimate, based on current network conditions. +const RETRY_SLEEP_INTERVAL: Duration = + Duration::from_millis(NUM_CONSECUTIVE_LEADER_SLOTS * DEFAULT_MS_PER_SLOT); + +/// Maximum age (in milliseconds) of a blockhash, beyond which transaction +/// batches are dropped. 
+const MAX_PROCESSING_AGE_MS: u64 = MAX_PROCESSING_AGE as u64 * DEFAULT_MS_PER_SLOT; + +/// [`ConnectionState`] represents the current state of a quic connection. +/// +/// It tracks the lifecycle of connection from initial setup to closing phase. +/// The transition function between states is defined in `ConnectionWorker` +/// implementation. +enum ConnectionState { + NotSetup, + Active(Connection), + Retry(usize), + Closing, +} + +impl Drop for ConnectionState { + /// When [`ConnectionState`] is dropped, underlying connection is closed + /// which means that there is no guarantee that the open streams will + /// finish. + fn drop(&mut self) { + if let Self::Active(connection) = self { + debug!( + "Close connection with {:?}, stats: {:?}. All pending streams will be dropped.", + connection.remote_address(), + connection.stats() + ); + connection.close(0u32.into(), b"done"); + } + } +} + +/// [`ConnectionWorker`] holds connection to the validator with address `peer`. +/// +/// If connection has been closed, [`ConnectionWorker`] tries to reconnect +/// `max_reconnect_attempts` times. If connection is in `Active` state, it sends +/// transactions received from `transactions_receiver`. Additionally, it +/// accumulates statistics about connections and streams failures. +pub(crate) struct ConnectionWorker { + endpoint: Endpoint, + peer: SocketAddr, + transactions_receiver: mpsc::Receiver, + connection: ConnectionState, + skip_check_transaction_age: bool, + max_reconnect_attempts: usize, + send_txs_stats: SendTransactionStats, + cancel: CancellationToken, +} + +impl ConnectionWorker { + /// Constructs a [`ConnectionWorker`]. + /// + /// [`ConnectionWorker`] maintains a connection to a `peer` and processes + /// transactions from `transactions_receiver`. If + /// `skip_check_transaction_age` is set to `true`, the worker skips checking + /// for transaction blockhash expiration. The `max_reconnect_attempts` + /// parameter controls how many times the worker will attempt to reconnect + /// in case of connection failure. Returns the created `ConnectionWorker` + /// along with a cancellation token that can be used by the caller to stop + /// the worker. + pub fn new( + endpoint: Endpoint, + peer: SocketAddr, + transactions_receiver: mpsc::Receiver, + skip_check_transaction_age: bool, + max_reconnect_attempts: usize, + ) -> (Self, CancellationToken) { + let cancel = CancellationToken::new(); + + let this = Self { + endpoint, + peer, + transactions_receiver, + connection: ConnectionState::NotSetup, + skip_check_transaction_age, + max_reconnect_attempts, + send_txs_stats: SendTransactionStats::default(), + cancel: cancel.clone(), + }; + + (this, cancel) + } + + /// Starts the main loop of the [`ConnectionWorker`]. + /// + /// This method manages the connection to the peer and handles state + /// transitions. It runs indefinitely until the connection is closed or an + /// unrecoverable error occurs. 
+ pub async fn run(&mut self) { + let cancel = self.cancel.clone(); + + let main_loop = async move { + loop { + match &self.connection { + ConnectionState::Closing => { + break; + } + ConnectionState::NotSetup => { + self.create_connection(0).await; + } + ConnectionState::Active(connection) => { + let Some(transactions) = self.transactions_receiver.recv().await else { + debug!("Transactions sender has been dropped."); + self.connection = ConnectionState::Closing; + continue; + }; + self.send_transactions(connection.clone(), transactions) + .await; + } + ConnectionState::Retry(num_reconnects) => { + if *num_reconnects > self.max_reconnect_attempts { + error!("Failed to establish connection: reach max reconnect attempts."); + self.connection = ConnectionState::Closing; + continue; + } + sleep(RETRY_SLEEP_INTERVAL).await; + self.reconnect(*num_reconnects).await; + } + } + } + }; + + tokio::select! { + () = main_loop => (), + () = cancel.cancelled() => (), + } + } + + /// Retrieves the statistics for transactions sent by this worker. + pub fn transaction_stats(&self) -> &SendTransactionStats { + &self.send_txs_stats + } + + /// Sends a batch of transactions using the provided `connection`. + /// + /// Each transaction in the batch is sent over the QUIC streams one at the + /// time, which prevents traffic fragmentation and shows better TPS in + /// comparison with multistream send. If the batch is determined to be + /// outdated and flag `skip_check_transaction_age` is unset, it will be + /// dropped without being sent. + /// + /// In case of error, it doesn't retry to send the same transactions again. + async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) { + let now = timestamp(); + if !self.skip_check_transaction_age + && now.saturating_sub(transactions.timestamp()) > MAX_PROCESSING_AGE_MS + { + debug!("Drop outdated transaction batch."); + return; + } + let mut measure_send = Measure::start("send transaction batch"); + for data in transactions.into_iter() { + let result = send_data_over_stream(&connection, &data).await; + + if let Err(error) = result { + trace!("Failed to send transaction over stream with error: {error}."); + record_error(error, &mut self.send_txs_stats); + self.connection = ConnectionState::Retry(0); + } else { + self.send_txs_stats.successfully_sent = + self.send_txs_stats.successfully_sent.saturating_add(1); + } + } + measure_send.stop(); + debug!( + "Time to send transactions batch: {} us", + measure_send.as_us() + ); + } + + /// Attempts to create a new connection to the specified `peer` address. + /// + /// If the connection is successful, the state is updated to `Active`. + /// + /// If an error occurs, the state may transition to `Retry` or `Closing`, + /// depending on the nature of the error. 
+ async fn create_connection(&mut self, max_retries_attempt: usize) { + let connecting = self.endpoint.connect(self.peer, "connect"); + match connecting { + Ok(connecting) => { + let mut measure_connection = Measure::start("establish connection"); + let res = connecting.await; + measure_connection.stop(); + debug!( + "Establishing connection with {} took: {} us", + self.peer, + measure_connection.as_us() + ); + match res { + Ok(connection) => { + self.connection = ConnectionState::Active(connection); + } + Err(err) => { + warn!("Connection error {}: {}", self.peer, err); + record_error(err.into(), &mut self.send_txs_stats); + self.connection = + ConnectionState::Retry(max_retries_attempt.saturating_add(1)); + } + } + } + Err(connecting_error) => { + record_error(connecting_error.clone().into(), &mut self.send_txs_stats); + match connecting_error { + ConnectError::EndpointStopping => { + debug!("Endpoint stopping, exit connection worker."); + self.connection = ConnectionState::Closing; + } + ConnectError::InvalidRemoteAddress(_) => { + warn!("Invalid remote address."); + self.connection = ConnectionState::Closing; + } + e => { + error!("Unexpected error has happen while trying to create connection {e}"); + self.connection = ConnectionState::Closing; + } + } + } + } + } + + /// Attempts to reconnect to the peer after a connection failure. + async fn reconnect(&mut self, num_reconnects: usize) { + debug!("Trying to reconnect. Reopen connection, 0rtt is not implemented yet."); + // We can reconnect using 0rtt, but not a priority for now. Check if we + // need to call config.enable_0rtt() on the client side and where + // session tickets are stored. + self.create_connection(num_reconnects).await; + } +} diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs new file mode 100644 index 00000000000000..82b038827b48eb --- /dev/null +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -0,0 +1,213 @@ +//! This module defines [`ConnectionWorkersScheduler`] which sends transactions +//! to the upcoming leaders. + +use { + super::{leader_updater::LeaderUpdater, SendTransactionStatsPerAddr}, + crate::{ + connection_worker::ConnectionWorker, + quic_networking::{ + create_client_config, create_client_endpoint, QuicClientCertificate, QuicError, + }, + transaction_batch::TransactionBatch, + workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError}, + }, + log::*, + quinn::Endpoint, + solana_sdk::signature::Keypair, + std::{net::SocketAddr, sync::Arc}, + thiserror::Error, + tokio::sync::mpsc, + tokio_util::sync::CancellationToken, +}; + +/// The [`ConnectionWorkersScheduler`] sends transactions from the provided +/// receiver channel to upcoming leaders. It obtains information about future +/// leaders from the implementation of the [`LeaderUpdater`] trait. +/// +/// Internally, it enables the management and coordination of multiple network +/// connections, schedules and oversees connection workers. +pub struct ConnectionWorkersScheduler; + +/// Errors that arise from running [`ConnectionWorkersSchedulerError`]. +#[derive(Debug, Error, PartialEq)] +pub enum ConnectionWorkersSchedulerError { + #[error(transparent)] + QuicError(#[from] QuicError), + #[error(transparent)] + WorkersCacheError(#[from] WorkersCacheError), + #[error("Leader receiver unexpectedly dropped.")] + LeaderReceiverDropped, +} + +/// Configuration for the [`ConnectionWorkersScheduler`]. 
+/// +/// This struct holds the necessary settings to initialize and manage connection +/// workers, including network binding, identity, connection limits, and +/// behavior related to transaction handling. +pub struct ConnectionWorkersSchedulerConfig { + /// The local address to bind the scheduler to. + pub bind: SocketAddr, + + /// Optional stake identity keypair used in the endpoint certificate for + /// identifying the sender. + pub stake_identity: Option, + + /// The number of connections to be maintained by the scheduler. + pub num_connections: usize, + + /// Whether to skip checking the transaction blockhash expiration. + pub skip_check_transaction_age: bool, + + /// The size of the channel used to transmit transaction batches to the + /// worker tasks. + pub worker_channel_size: usize, + + /// The maximum number of reconnection attempts allowed in case of + /// connection failure. + pub max_reconnect_attempts: usize, + + /// The number of slots to look ahead during the leader estimation + /// procedure. Determines how far into the future leaders are estimated, + /// allowing connections to be established with those leaders in advance. + pub lookahead_slots: u64, +} + +impl ConnectionWorkersScheduler { + /// Starts the scheduler, which manages the distribution of transactions to + /// the network's upcoming leaders. + /// + /// Runs the main loop that handles worker scheduling and management for + /// connections. Returns the error quic statistics per connection address or + /// an error. + /// + /// Importantly, if some transactions were not delivered due to network + /// problems, they will not be retried when the problem is resolved. + pub async fn run( + ConnectionWorkersSchedulerConfig { + bind, + stake_identity: validator_identity, + num_connections, + skip_check_transaction_age, + worker_channel_size, + max_reconnect_attempts, + lookahead_slots, + }: ConnectionWorkersSchedulerConfig, + mut leader_updater: Box, + mut transaction_receiver: mpsc::Receiver, + cancel: CancellationToken, + ) -> Result { + let endpoint = Self::setup_endpoint(bind, validator_identity)?; + debug!("Client endpoint bind address: {:?}", endpoint.local_addr()); + let mut workers = WorkersCache::new(num_connections, cancel.clone()); + + loop { + let transaction_batch = tokio::select! { + recv_res = transaction_receiver.recv() => match recv_res { + Some(txs) => txs, + None => { + debug!("End of `transaction_receiver`: shutting down."); + break; + } + }, + () = cancel.cancelled() => { + debug!("Cancelled: Shutting down"); + break; + } + }; + let updated_leaders = leader_updater.next_leaders(lookahead_slots); + let new_leader = &updated_leaders[0]; + let future_leaders = &updated_leaders[1..]; + if !workers.contains(new_leader) { + debug!("No existing workers for {new_leader:?}, starting a new one."); + let worker = Self::spawn_worker( + &endpoint, + new_leader, + worker_channel_size, + skip_check_transaction_age, + max_reconnect_attempts, + ); + workers.push(*new_leader, worker).await; + } + + tokio::select! { + send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res { + Ok(()) => (), + Err(WorkersCacheError::ShutdownError) => { + debug!("Connection to {new_leader} was closed, worker cache shutdown"); + } + Err(err) => { + warn!("Connection to {new_leader} was closed, worker error: {err}"); + // If we has failed to send batch, it will be dropped. 
+ } + }, + () = cancel.cancelled() => { + debug!("Cancelled: Shutting down"); + break; + } + }; + + // Regardless of who is leader, add future leaders to the cache to + // hide the latency of opening the connection. + for peer in future_leaders { + if !workers.contains(peer) { + let worker = Self::spawn_worker( + &endpoint, + peer, + worker_channel_size, + skip_check_transaction_age, + max_reconnect_attempts, + ); + workers.push(*peer, worker).await; + } + } + } + + workers.shutdown().await; + + endpoint.close(0u32.into(), b"Closing connection"); + leader_updater.stop().await; + Ok(workers.transaction_stats().clone()) + } + + /// Sets up the QUIC endpoint for the scheduler to handle connections. + fn setup_endpoint( + bind: SocketAddr, + validator_identity: Option, + ) -> Result { + let client_certificate = if let Some(validator_identity) = validator_identity { + Arc::new(QuicClientCertificate::new(&validator_identity)) + } else { + Arc::new(QuicClientCertificate::new(&Keypair::new())) + }; + let client_config = create_client_config(client_certificate); + let endpoint = create_client_endpoint(bind, client_config)?; + Ok(endpoint) + } + + /// Spawns a worker to handle communication with a given peer. + fn spawn_worker( + endpoint: &Endpoint, + peer: &SocketAddr, + worker_channel_size: usize, + skip_check_transaction_age: bool, + max_reconnect_attempts: usize, + ) -> WorkerInfo { + let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size); + let endpoint = endpoint.clone(); + let peer = *peer; + + let (mut worker, cancel) = ConnectionWorker::new( + endpoint, + peer, + txs_receiver, + skip_check_transaction_age, + max_reconnect_attempts, + ); + let handle = tokio::spawn(async move { + worker.run().await; + worker.transaction_stats().clone() + }); + + WorkerInfo::new(txs_sender, handle, cancel) + } +} diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs new file mode 100644 index 00000000000000..5e07b9b0bfe612 --- /dev/null +++ b/tpu-client-next/src/leader_updater.rs @@ -0,0 +1,124 @@ +//! This module provides [`LeaderUpdater`] trait along with +//! `create_leader_updater` function to create an instance of this trait. +//! +//! Currently, the main purpose of [`LeaderUpdater`] is to abstract over leader +//! updates, hiding the details of how leaders are retrieved and which +//! structures are used. It contains trait implementations +//! `LeaderUpdaterService` and `PinnedLeaderUpdater`, where +//! `LeaderUpdaterService` keeps [`LeaderTpuService`] internal to this module. +//! Yet, it also allows to implement custom leader estimation. + +use { + async_trait::async_trait, + log::*, + solana_connection_cache::connection_cache::Protocol, + solana_rpc_client::nonblocking::rpc_client::RpcClient, + solana_tpu_client::nonblocking::tpu_client::LeaderTpuService, + std::{ + fmt, + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }, +}; + +/// [`LeaderUpdater`] trait abstracts out functionality required for the +/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to +/// identify next leaders to send transactions to. +#[async_trait] +pub trait LeaderUpdater: Send { + /// Returns next unique leaders for the next `lookahead_slots` starting from + /// current estimated slot. + /// + /// If the current leader estimation is incorrect and transactions are sent to + /// only one estimated leader, there is a risk of losing all the transactions, + /// depending on the forwarding policy. 
+ fn next_leaders(&self, lookahead_slots: u64) -> Vec; + + /// Stop [`LeaderUpdater`] and releases all associated resources. + async fn stop(&mut self); +} + +/// Error type for [`LeaderUpdater`]. +pub struct LeaderUpdaterError; + +impl fmt::Display for LeaderUpdaterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Leader updater encountered an error") + } +} + +impl fmt::Debug for LeaderUpdaterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LeaderUpdaterError") + } +} + +/// Creates a [`LeaderUpdater`] based on the configuration provided by the +/// caller. +/// +/// If `pinned_address` is provided, it returns a `PinnedLeaderUpdater` that +/// always returns the provided address instead of checking leader schedule. +/// Otherwise, it creates a `LeaderUpdaterService` which dynamically updates the +/// leaders by connecting to the network via the [`LeaderTpuService`]. +pub async fn create_leader_updater( + rpc_client: Arc, + websocket_url: String, + pinned_address: Option, +) -> Result, LeaderUpdaterError> { + if let Some(pinned_address) = pinned_address { + return Ok(Box::new(PinnedLeaderUpdater { + address: vec![pinned_address], + })); + } + + let exit = Arc::new(AtomicBool::new(false)); + let leader_tpu_service = + LeaderTpuService::new(rpc_client, &websocket_url, Protocol::QUIC, exit.clone()) + .await + .map_err(|error| { + error!("Failed to create a LeaderTpuService: {error}"); + LeaderUpdaterError + })?; + Ok(Box::new(LeaderUpdaterService { + leader_tpu_service, + exit, + })) +} + +/// `LeaderUpdaterService` is an implementation of the [`LeaderUpdater`] trait +/// that dynamically retrieves the current and upcoming leaders by communicating +/// with the Solana network using [`LeaderTpuService`]. +struct LeaderUpdaterService { + leader_tpu_service: LeaderTpuService, + exit: Arc, +} + +#[async_trait] +impl LeaderUpdater for LeaderUpdaterService { + fn next_leaders(&self, lookahead_slots: u64) -> Vec { + self.leader_tpu_service.leader_tpu_sockets(lookahead_slots) + } + + async fn stop(&mut self) { + self.exit.store(true, Ordering::Relaxed); + self.leader_tpu_service.join().await; + } +} + +/// `PinnedLeaderUpdater` is an implementation of [`LeaderUpdater`] that always +/// returns a fixed, "pinned" leader address. It is mainly used for testing. +struct PinnedLeaderUpdater { + address: Vec, +} + +#[async_trait] +impl LeaderUpdater for PinnedLeaderUpdater { + fn next_leaders(&self, _lookahead_slots: u64) -> Vec { + self.address.clone() + } + + async fn stop(&mut self) {} +} diff --git a/tpu-client-next/src/lib.rs b/tpu-client-next/src/lib.rs new file mode 100644 index 00000000000000..720b3876b47cb4 --- /dev/null +++ b/tpu-client-next/src/lib.rs @@ -0,0 +1,12 @@ +pub(crate) mod connection_worker; +pub mod connection_workers_scheduler; +pub mod send_transaction_stats; +pub(crate) mod workers_cache; +pub use crate::{ + connection_workers_scheduler::{ConnectionWorkersScheduler, ConnectionWorkersSchedulerError}, + send_transaction_stats::{SendTransactionStats, SendTransactionStatsPerAddr}, +}; +pub(crate) mod quic_networking; +pub(crate) use crate::quic_networking::QuicError; +pub mod leader_updater; +pub mod transaction_batch; diff --git a/tpu-client-next/src/quic_networking.rs b/tpu-client-next/src/quic_networking.rs new file mode 100644 index 00000000000000..b18fa469241da9 --- /dev/null +++ b/tpu-client-next/src/quic_networking.rs @@ -0,0 +1,70 @@ +//! Utility code to handle quic networking. 
+ +use { + quinn::{ + crypto::rustls::QuicClientConfig, ClientConfig, Connection, Endpoint, IdleTimeout, + TransportConfig, + }, + skip_server_verification::SkipServerVerification, + solana_sdk::quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, + solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID, + std::{net::SocketAddr, sync::Arc}, +}; + +pub mod error; +pub mod quic_client_certificate; +pub mod skip_server_verification; + +pub use { + error::{IoErrorWithPartialEq, QuicError}, + quic_client_certificate::QuicClientCertificate, +}; + +pub(crate) fn create_client_config(client_certificate: Arc) -> ClientConfig { + // adapted from QuicLazyInitializedEndpoint::create_endpoint + let mut crypto = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_client_auth_cert( + vec![client_certificate.certificate.clone()], + client_certificate.key.clone_key(), + ) + .expect("Failed to set QUIC client certificates"); + crypto.enable_early_data = true; + crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()]; + + let transport_config = { + let mut res = TransportConfig::default(); + + let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap(); + res.max_idle_timeout(Some(timeout)); + res.keep_alive_interval(Some(QUIC_KEEP_ALIVE)); + + res + }; + + let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap())); + config.transport_config(Arc::new(transport_config)); + + config +} + +pub(crate) fn create_client_endpoint( + bind_addr: SocketAddr, + client_config: ClientConfig, +) -> Result { + let mut endpoint = Endpoint::client(bind_addr).map_err(IoErrorWithPartialEq::from)?; + endpoint.set_default_client_config(client_config); + Ok(endpoint) +} + +pub(crate) async fn send_data_over_stream( + connection: &Connection, + data: &[u8], +) -> Result<(), QuicError> { + let mut send_stream = connection.open_uni().await?; + send_stream.write_all(data).await.map_err(QuicError::from)?; + + // Stream will be finished when dropped. Finishing here explicitly is a noop. + Ok(()) +} diff --git a/tpu-client-next/src/quic_networking/error.rs b/tpu-client-next/src/quic_networking/error.rs new file mode 100644 index 00000000000000..8fa79265cb69a4 --- /dev/null +++ b/tpu-client-next/src/quic_networking/error.rs @@ -0,0 +1,49 @@ +use { + quinn::{ConnectError, ConnectionError, WriteError}, + std::{ + fmt::{self, Formatter}, + io, + }, + thiserror::Error, +}; + +/// Wrapper for [`io::Error`] implementing [`PartialEq`] to simplify error +/// checking for the [`QuicError`] type. The reasons why [`io::Error`] doesn't +/// implement [`PartialEq`] are discusses in +/// . +#[derive(Debug, Error)] +pub struct IoErrorWithPartialEq(pub io::Error); + +impl PartialEq for IoErrorWithPartialEq { + fn eq(&self, other: &Self) -> bool { + let formatted_self = format!("{self:?}"); + let formatted_other = format!("{other:?}"); + formatted_self == formatted_other + } +} + +impl fmt::Display for IoErrorWithPartialEq { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl From for IoErrorWithPartialEq { + fn from(err: io::Error) -> Self { + IoErrorWithPartialEq(err) + } +} + +/// Error types that can occur when dealing with QUIC connections or +/// transmissions. 
+#[derive(Error, Debug, PartialEq)] +pub enum QuicError { + #[error(transparent)] + StreamWrite(#[from] WriteError), + #[error(transparent)] + Connection(#[from] ConnectionError), + #[error(transparent)] + Connect(#[from] ConnectError), + #[error(transparent)] + Endpoint(#[from] IoErrorWithPartialEq), +} diff --git a/tpu-client-next/src/quic_networking/quic_client_certificate.rs b/tpu-client-next/src/quic_networking/quic_client_certificate.rs new file mode 100644 index 00000000000000..b9f0c8d1cf27a6 --- /dev/null +++ b/tpu-client-next/src/quic_networking/quic_client_certificate.rs @@ -0,0 +1,17 @@ +use { + rustls::pki_types::{CertificateDer, PrivateKeyDer}, + solana_sdk::signature::Keypair, + solana_streamer::tls_certificates::new_dummy_x509_certificate, +}; + +pub struct QuicClientCertificate { + pub certificate: CertificateDer<'static>, + pub key: PrivateKeyDer<'static>, +} + +impl QuicClientCertificate { + pub fn new(keypair: &Keypair) -> Self { + let (certificate, key) = new_dummy_x509_certificate(keypair); + Self { certificate, key } + } +} diff --git a/tpu-client-next/src/quic_networking/skip_server_verification.rs b/tpu-client-next/src/quic_networking/skip_server_verification.rs new file mode 100644 index 00000000000000..70b17e0fcd5bab --- /dev/null +++ b/tpu-client-next/src/quic_networking/skip_server_verification.rs @@ -0,0 +1,74 @@ +use { + rustls::{ + client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, + crypto::{ring, verify_tls12_signature, verify_tls13_signature, CryptoProvider}, + pki_types::{CertificateDer, ServerName, UnixTime}, + DigitallySignedStruct, Error, SignatureScheme, + }, + std::{ + fmt::{self, Debug, Formatter}, + sync::Arc, + }, +}; + +/// Implementation of [`ServerCertVerifier`] that ignores the server +/// certificate. Yet still checks the TLS signatures. +pub(crate) struct SkipServerVerification(Arc); + +impl SkipServerVerification { + pub fn new() -> Arc { + Arc::new(Self(Arc::new(ring::default_provider()))) + } +} + +impl ServerCertVerifier for SkipServerVerification { + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> Result { + verify_tls12_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> Result { + verify_tls13_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + self.0.signature_verification_algorithms.supported_schemes() + } + + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } +} + +impl Debug for SkipServerVerification { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SkipServerVerification") + .finish_non_exhaustive() + } +} diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs new file mode 100644 index 00000000000000..abe68b8bf60213 --- /dev/null +++ b/tpu-client-next/src/send_transaction_stats.rs @@ -0,0 +1,166 @@ +//! This module defines [`SendTransactionStats`] which is used to collect per IP +//! statistics about relevant network errors. 
+ +use { + super::QuicError, + quinn::{ConnectError, ConnectionError, WriteError}, + std::{collections::HashMap, fmt, net::IpAddr}, +}; + +/// [`SendTransactionStats`] aggregates counters related to sending transactions. +#[derive(Debug, Default, Clone, PartialEq)] +pub struct SendTransactionStats { + pub successfully_sent: u64, + pub connect_error_cids_exhausted: u64, + pub connect_error_invalid_remote_address: u64, + pub connect_error_other: u64, + pub connection_error_application_closed: u64, + pub connection_error_cids_exhausted: u64, + pub connection_error_connection_closed: u64, + pub connection_error_locally_closed: u64, + pub connection_error_reset: u64, + pub connection_error_timed_out: u64, + pub connection_error_transport_error: u64, + pub connection_error_version_mismatch: u64, + pub write_error_closed_stream: u64, + pub write_error_connection_lost: u64, + pub write_error_stopped: u64, + pub write_error_zero_rtt_rejected: u64, +} + +#[allow(clippy::arithmetic_side_effects)] +pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) { + match err { + QuicError::Connect(ConnectError::EndpointStopping) => { + stats.connect_error_other += 1; + } + QuicError::Connect(ConnectError::CidsExhausted) => { + stats.connect_error_cids_exhausted += 1; + } + QuicError::Connect(ConnectError::InvalidServerName(_)) => { + stats.connect_error_other += 1; + } + QuicError::Connect(ConnectError::InvalidRemoteAddress(_)) => { + stats.connect_error_invalid_remote_address += 1; + } + QuicError::Connect(ConnectError::NoDefaultClientConfig) => { + stats.connect_error_other += 1; + } + QuicError::Connect(ConnectError::UnsupportedVersion) => { + stats.connect_error_other += 1; + } + QuicError::Connection(ConnectionError::VersionMismatch) => { + stats.connection_error_version_mismatch += 1; + } + QuicError::Connection(ConnectionError::TransportError(_)) => { + stats.connection_error_transport_error += 1; + } + QuicError::Connection(ConnectionError::ConnectionClosed(_)) => { + stats.connection_error_connection_closed += 1; + } + QuicError::Connection(ConnectionError::ApplicationClosed(_)) => { + stats.connection_error_application_closed += 1; + } + QuicError::Connection(ConnectionError::Reset) => { + stats.connection_error_reset += 1; + } + QuicError::Connection(ConnectionError::TimedOut) => { + stats.connection_error_timed_out += 1; + } + QuicError::Connection(ConnectionError::LocallyClosed) => { + stats.connection_error_locally_closed += 1; + } + QuicError::Connection(ConnectionError::CidsExhausted) => { + stats.connection_error_cids_exhausted += 1; + } + QuicError::StreamWrite(WriteError::Stopped(_)) => { + stats.write_error_stopped += 1; + } + QuicError::StreamWrite(WriteError::ConnectionLost(_)) => { + stats.write_error_connection_lost += 1; + } + QuicError::StreamWrite(WriteError::ClosedStream) => { + stats.write_error_closed_stream += 1; + } + QuicError::StreamWrite(WriteError::ZeroRttRejected) => { + stats.write_error_zero_rtt_rejected += 1; + } + // Endpoint is created on the scheduler level and handled separately + // No counters are used for this case. + QuicError::Endpoint(_) => (), + } +} + +pub type SendTransactionStatsPerAddr = HashMap; + +macro_rules! add_fields { + ($self:ident += $other:ident for: $( $field:ident ),* $(,)? 
) => { + $( + $self.$field = $self.$field.saturating_add($other.$field); + )* + }; +} + +impl SendTransactionStats { + pub fn add(&mut self, other: &SendTransactionStats) { + add_fields!( + self += other for: + successfully_sent, + connect_error_cids_exhausted, + connect_error_invalid_remote_address, + connect_error_other, + connection_error_application_closed, + connection_error_cids_exhausted, + connection_error_connection_closed, + connection_error_locally_closed, + connection_error_reset, + connection_error_timed_out, + connection_error_transport_error, + connection_error_version_mismatch, + write_error_closed_stream, + write_error_connection_lost, + write_error_stopped, + write_error_zero_rtt_rejected, + ); + } +} + +macro_rules! display_send_transaction_stats_body { + ($self:ident, $f:ident, $($field:ident),* $(,)?) => { + write!( + $f, + concat!( + "SendTransactionStats:\n", + $( + "\x20 ", stringify!($field), ": {},\n", + )* + ), + $($self.$field),* + ) + }; +} + +impl fmt::Display for SendTransactionStats { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + display_send_transaction_stats_body!( + self, + f, + successfully_sent, + connect_error_cids_exhausted, + connect_error_invalid_remote_address, + connect_error_other, + connection_error_application_closed, + connection_error_cids_exhausted, + connection_error_connection_closed, + connection_error_locally_closed, + connection_error_reset, + connection_error_timed_out, + connection_error_transport_error, + connection_error_version_mismatch, + write_error_closed_stream, + write_error_connection_lost, + write_error_stopped, + write_error_zero_rtt_rejected, + ) + } +} diff --git a/tpu-client-next/src/transaction_batch.rs b/tpu-client-next/src/transaction_batch.rs new file mode 100644 index 00000000000000..a3c2d92fc8386e --- /dev/null +++ b/tpu-client-next/src/transaction_batch.rs @@ -0,0 +1,33 @@ +//! This module holds [`TransactionBatch`] structure. + +use solana_sdk::timing::timestamp; + +/// Batch of generated transactions timestamp is used to discard batches which +/// are too old to have valid blockhash. +#[derive(Clone, PartialEq)] +pub struct TransactionBatch { + wired_transactions: Vec, + timestamp: u64, +} + +type WiredTransaction = Vec; + +impl IntoIterator for TransactionBatch { + type Item = Vec; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.wired_transactions.into_iter() + } +} + +impl TransactionBatch { + pub fn new(wired_transactions: Vec) -> Self { + Self { + wired_transactions, + timestamp: timestamp(), + } + } + pub fn timestamp(&self) -> u64 { + self.timestamp + } +} diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs new file mode 100644 index 00000000000000..90d2954b669d7f --- /dev/null +++ b/tpu-client-next/src/workers_cache.rs @@ -0,0 +1,184 @@ +//! This module defines `WorkersCache` along with aux struct `WorkerInfo`. These +//! structures provide mechanisms for caching workers, sending transaction +//! batches, and gathering send transaction statistics. + +use { + super::SendTransactionStats, + crate::transaction_batch::TransactionBatch, + log::*, + lru::LruCache, + std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + }, + thiserror::Error, + tokio::{sync::mpsc, task::JoinHandle}, + tokio_util::sync::CancellationToken, +}; + +/// [`WorkerInfo`] holds information about a worker responsible for sending +/// transaction batches. 
+pub(crate) struct WorkerInfo { + pub sender: mpsc::Sender, + pub handle: JoinHandle, + pub cancel: CancellationToken, +} + +impl WorkerInfo { + pub fn new( + sender: mpsc::Sender, + handle: JoinHandle, + cancel: CancellationToken, + ) -> Self { + Self { + sender, + handle, + cancel, + } + } + + async fn send_transactions( + &self, + txs_batch: TransactionBatch, + ) -> Result<(), WorkersCacheError> { + self.sender + .send(txs_batch) + .await + .map_err(|_| WorkersCacheError::ReceiverDropped)?; + Ok(()) + } + + /// Closes the worker by dropping the sender and awaiting the worker's + /// statistics. + async fn shutdown(self) -> Result { + self.cancel.cancel(); + drop(self.sender); + let stats = self + .handle + .await + .map_err(|_| WorkersCacheError::TaskJoinFailure)?; + Ok(stats) + } +} + +/// [`WorkersCache`] manages and caches workers. It uses an LRU cache to store and +/// manage workers. It also tracks transaction statistics for each peer. +pub(crate) struct WorkersCache { + workers: LruCache, + send_stats_per_addr: HashMap, + + /// Indicates that the `WorkersCache` is been `shutdown()`, interrupting any outstanding + /// `send_txs()` invocations. + cancel: CancellationToken, +} + +#[derive(Debug, Error, PartialEq)] +pub enum WorkersCacheError { + /// typically happens when the client could not establish the connection. + #[error("Work receiver has been dropped unexpectedly.")] + ReceiverDropped, + + #[error("Task failed to join.")] + TaskJoinFailure, + + #[error("The WorkersCache is being shutdown.")] + ShutdownError, +} + +impl WorkersCache { + pub fn new(capacity: usize, cancel: CancellationToken) -> Self { + Self { + workers: LruCache::new(capacity), + send_stats_per_addr: HashMap::new(), + cancel, + } + } + + pub fn contains(&self, peer: &SocketAddr) -> bool { + self.workers.contains(peer) + } + + pub async fn push(&mut self, peer: SocketAddr, peer_worker: WorkerInfo) { + // Although there might be concerns about the performance implications + // of waiting for the worker to be closed when trying to add a new + // worker, the idea is that these workers are almost always created in + // advance so the latency is hidden. + if let Some((leader, popped_worker)) = self.workers.push(peer, peer_worker) { + self.shutdown_worker(leader, popped_worker).await; + } + } + + /// Sends a batch of transactions to the worker for a given peer. If the + /// worker for the peer is disconnected or fails, it is removed from the + /// cache. + pub async fn send_transactions_to_address( + &mut self, + peer: &SocketAddr, + txs_batch: TransactionBatch, + ) -> Result<(), WorkersCacheError> { + let Self { + workers, cancel, .. + } = self; + + let body = async move { + let current_worker = workers.get(peer).expect( + "Failed to fetch worker for peer {peer}.\n\ + Peer existence must be checked before this call using `contains` method.", + ); + let send_res = current_worker.send_transactions(txs_batch).await; + + if let Err(WorkersCacheError::ReceiverDropped) = send_res { + // Remove the worker from the cache, if the peer has disconnected. + if let Some(current_worker) = workers.pop(peer) { + // To avoid obscuring the error from send, ignore a possible + // `TaskJoinFailure`. + let close_result = current_worker.shutdown().await; + if let Err(error) = close_result { + error!("Error while closing worker: {error}."); + } + } + } + + send_res + }; + + tokio::select! 
{ + send_res = body => send_res, + () = cancel.cancelled() => Err(WorkersCacheError::ShutdownError), + } + } + + pub fn transaction_stats(&self) -> &HashMap { + &self.send_stats_per_addr + } + + /// Closes and removes all workers in the cache. This is typically done when + /// shutting down the system. + pub async fn shutdown(&mut self) { + // Interrupt any outstanding `send_txs()` calls. + self.cancel.cancel(); + + while let Some((leader, worker)) = self.workers.pop_lru() { + self.shutdown_worker(leader, worker).await; + } + } + + /// Shuts down a worker for a given peer by closing the worker and gathering + /// its transaction statistics. + async fn shutdown_worker(&mut self, leader: SocketAddr, worker: WorkerInfo) { + let res = worker.shutdown().await; + + let stats = match res { + Ok(stats) => stats, + Err(err) => { + debug!("Error while shutting down worker for {leader}: {err}"); + return; + } + }; + + self.send_stats_per_addr + .entry(leader.ip()) + .and_modify(|e| e.add(&stats)) + .or_insert(stats); + } +} diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs new file mode 100644 index 00000000000000..0ffabb6640f7a3 --- /dev/null +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -0,0 +1,671 @@ +use { + crossbeam_channel::Receiver as CrossbeamReceiver, + futures::future::BoxFuture, + solana_cli_config::ConfigInput, + solana_rpc_client::nonblocking::rpc_client::RpcClient, + solana_sdk::{ + commitment_config::CommitmentConfig, + pubkey::Pubkey, + signer::{keypair::Keypair, Signer}, + }, + solana_streamer::{ + nonblocking::testing_utilities::{ + make_client_endpoint, setup_quic_server, SpawnTestServerResult, TestServerConfig, + }, + packet::PacketBatch, + streamer::StakedNodes, + }, + solana_tpu_client_next::{ + connection_workers_scheduler::ConnectionWorkersSchedulerConfig, + leader_updater::create_leader_updater, transaction_batch::TransactionBatch, + ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats, + SendTransactionStatsPerAddr, + }, + std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + num::Saturating, + str::FromStr, + sync::{atomic::Ordering, Arc}, + time::Duration, + }, + tokio::{ + sync::{ + mpsc::{channel, Receiver}, + oneshot, + }, + task::JoinHandle, + time::{sleep, Instant}, + }, + tokio_util::sync::CancellationToken, +}; + +fn test_config(validator_identity: Option) -> ConnectionWorkersSchedulerConfig { + ConnectionWorkersSchedulerConfig { + bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0), + stake_identity: validator_identity, + num_connections: 1, + skip_check_transaction_age: false, + worker_channel_size: 2, + max_reconnect_attempts: 4, + lookahead_slots: 1, + } +} + +async fn setup_connection_worker_scheduler( + tpu_address: SocketAddr, + transaction_receiver: Receiver, + validator_identity: Option, +) -> ( + JoinHandle>, + CancellationToken, +) { + let json_rpc_url = "http://127.0.0.1:8899"; + let (_, websocket_url) = ConfigInput::compute_websocket_url_setting("", "", json_rpc_url, ""); + + let rpc_client = Arc::new(RpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )); + + // Setup sending txs + let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(tpu_address)) + .await + .expect("Leader updates was successfully created"); + + let cancel = CancellationToken::new(); + let config = test_config(validator_identity); + let scheduler = 
tokio::spawn(ConnectionWorkersScheduler::run( + config, + leader_updater, + transaction_receiver, + cancel.clone(), + )); + + (scheduler, cancel) +} + +async fn join_scheduler( + scheduler_handle: JoinHandle< + Result, + >, +) -> SendTransactionStats { + let stats_per_ip = scheduler_handle + .await + .unwrap() + .expect("Scheduler should stop successfully."); + stats_per_ip + .get(&IpAddr::from_str("127.0.0.1").unwrap()) + .expect("setup_connection_worker_scheduler() connected to a leader at 127.0.0.1") + .clone() +} + +// Specify the pessimistic time to finish generation and result checks. +const TEST_MAX_TIME: Duration = Duration::from_millis(2500); + +struct SpawnTxGenerator { + tx_receiver: Receiver, + tx_sender_shutdown: BoxFuture<'static, ()>, + tx_sender_done: oneshot::Receiver<()>, +} + +/// Generates `num_tx_batches` batches of transactions, each holding a single transaction of +/// `tx_size` bytes. +/// +/// It will not close the returned `tx_receiver` until `tx_sender_shutdown` is invoked. Otherwise, +/// there is a race condition, that exists between the last transaction being scheduled for delivery +/// and the server connection being closed. +fn spawn_tx_sender( + tx_size: usize, + num_tx_batches: usize, + time_per_tx: Duration, +) -> SpawnTxGenerator { + let num_tx_batches: u32 = num_tx_batches + .try_into() + .expect("`num_tx_batches` fits into u32 for all the tests"); + let (tx_sender, tx_receiver) = channel(1); + let cancel = CancellationToken::new(); + let (done_sender, tx_sender_done) = oneshot::channel(); + + let sender = tokio::spawn({ + let start = Instant::now(); + + let tx_sender = tx_sender.clone(); + + let main_loop = async move { + for i in 0..num_tx_batches { + let txs = vec![vec![i as u8; tx_size]; 1]; + tx_sender + .send(TransactionBatch::new(txs)) + .await + .expect("Receiver should not close their side"); + + // Pretend the client runs at the specified TPS. + let sleep_time = time_per_tx + .saturating_mul(i) + .saturating_sub(start.elapsed()); + if !sleep_time.is_zero() { + sleep(sleep_time).await; + } + } + + // It is OK if the receiver has disconnected. + let _ = done_sender.send(()); + }; + + let cancel = cancel.clone(); + async move { + tokio::select! { + () = main_loop => (), + () = cancel.cancelled() => (), + } + } + }); + + let tx_sender_shutdown = Box::pin(async move { + cancel.cancel(); + // This makes sure that the sender exists up until the shutdown is invoked. + drop(tx_sender); + + sender.await.unwrap(); + }); + + SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + tx_sender_done, + } +} + +#[tokio::test] +async fn test_basic_transactions_sending() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server(None, TestServerConfig::default()); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 100; + // Pretend that we are running at ~100 TPS. + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(10)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Check results + let mut received_data = Vec::with_capacity(expected_num_txs); + let now = Instant::now(); + let mut actual_num_packets = 0; + while actual_num_packets < expected_num_txs { + { + let elapsed = now.elapsed(); + assert!( + elapsed < TEST_MAX_TIME, + "Failed to send {} transaction in {:?}. 
Only sent {}", + expected_num_txs, + elapsed, + actual_num_packets, + ); + } + + let Ok(packets) = receiver.try_recv() else { + sleep(Duration::from_millis(10)).await; + continue; + }; + + actual_num_packets += packets.len(); + for p in packets.iter() { + let packet_id = p.data(0).expect("Data should not be lost by server."); + received_data.push(*packet_id); + assert_eq!(p.meta().size, 1); + } + } + + received_data.sort_unstable(); + for i in 1..received_data.len() { + assert_eq!(received_data[i - 1] + 1, received_data[i]); + } + + // Stop sending + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + assert_eq!( + localhost_stats, + SendTransactionStats { + successfully_sent: expected_num_txs as u64, + ..Default::default() + } + ); + + // Stop server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +async fn count_received_packets_for( + receiver: CrossbeamReceiver, + expected_tx_size: usize, + receive_duration: Duration, +) -> usize { + let now = Instant::now(); + let mut num_packets_received = Saturating(0usize); + + while now.elapsed() < receive_duration { + if let Ok(packets) = receiver.try_recv() { + num_packets_received += packets.len(); + for p in packets.iter() { + assert_eq!(p.meta().size, expected_tx_size); + } + } else { + sleep(Duration::from_millis(100)).await; + } + } + + num_packets_received.0 +} + +// Check that client can create connection even if the first several attempts were unsuccessful. +#[tokio::test] +async fn test_connection_denied_until_allowed() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server(None, TestServerConfig::default()); + + // To prevent server from accepting a new connection, we use the following observation. + // Since max_connections_per_peer == 1 (< max_unstaked_connections == 500), if we create a first + // connection and later try another one, the second connection will be immediately closed. + // + // Since client is not retrying sending failed transactions, this leads to the packets loss. + // The connection has been created and closed when we already have sent the data. + let throttling_connection = make_client_endpoint(&server_address, None).await; + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 10; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Check results + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert!( + actual_num_packets < expected_num_txs, + "Expected to receive {expected_num_txs} packets in {TEST_MAX_TIME:?}\n\ + Got packets: {actual_num_packets}" + ); + + // Wait for the exchange to finish. + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + // in case of pruning, server closes the connection with code 1 and error + // message b"dropped". This might lead to connection error + // (ApplicationClosed::ApplicationClose) or to stream error + // (ConnectionLost::ApplicationClosed::ApplicationClose). 
+ assert_eq!( + localhost_stats.write_error_connection_lost + + localhost_stats.connection_error_application_closed, + 1 + ); + + drop(throttling_connection); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// Check that if the client connection has been pruned, client manages to +// reestablish it. Pruning will lead to 1 packet loss, because when we send the +// next packet we will reestablish connection. +#[tokio::test] +async fn test_connection_pruned_and_reopened() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + None, + TestServerConfig { + max_connections_per_peer: 100, + max_unstaked_connections: 1, + ..Default::default() + }, + ); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 16; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + sleep(Duration::from_millis(400)).await; + let _connection_to_prune_client = make_client_endpoint(&server_address, None).await; + + // Check results + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert!(actual_num_packets < expected_num_txs); + + // Wait for the exchange to finish. + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + // in case of pruning, server closes the connection with code 1 and error + // message b"dropped". This might lead to connection error + // (ApplicationClosed::ApplicationClose) or to stream error + // (ConnectionLost::ApplicationClosed::ApplicationClose). + assert_eq!( + localhost_stats.connection_error_application_closed + + localhost_stats.write_error_connection_lost, + 1, + ); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +/// Check that client creates staked connection. To do that prohibit unstaked +/// connection and verify that all the txs has been received. +#[tokio::test] +async fn test_staked_connection() { + let validator_identity = Keypair::new(); + let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]); + let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::::default()); + + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + Some(staked_nodes), + TestServerConfig { + // Must use at least the number of endpoints (10) because + // `max_staked_connections` and `max_unstaked_connections` are + // cumulative for all the endpoints. + max_staked_connections: 10, + max_unstaked_connections: 0, + ..Default::default() + }, + ); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 10; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity)) + .await; + + // Check results + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert_eq!(actual_num_packets, expected_num_txs); + + // Wait for the exchange to finish. 
+ tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + assert_eq!( + localhost_stats, + SendTransactionStats { + successfully_sent: expected_num_txs as u64, + ..Default::default() + } + ); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// Check that if client sends transactions at a reasonably high rate that is +// higher than what the server accepts, nevertheless all the transactions are +// delivered and there are no errors on the client side. +#[tokio::test] +async fn test_connection_throttling() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server(None, TestServerConfig::default()); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 50; + // Send at 1000 TPS - x10 more than the throttling interval of 10ms used in other tests allows. + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Check results + let actual_num_packets = + count_received_packets_for(receiver, tx_size, Duration::from_secs(1)).await; + assert_eq!(actual_num_packets, expected_num_txs); + + // Stop sending + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + assert_eq!( + localhost_stats, + SendTransactionStats { + successfully_sent: expected_num_txs as u64, + ..Default::default() + } + ); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// Check that when the host cannot be reached, the client exits gracefully. +#[tokio::test] +async fn test_no_host() { + // A "black hole" address for the TPU. + let server_ip = IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1)); + let server_address = SocketAddr::new(server_ip, 49151); + + // Setup sending side. + let tx_size = 1; + let max_send_attempts: usize = 10; + + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + tx_sender_done, + .. + } = spawn_tx_sender(tx_size, max_send_attempts, Duration::from_millis(10)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Wait for all the transactions to be sent, and some extra time for the delivery to be + // attempted. + tx_sender_done.await.unwrap(); + sleep(Duration::from_millis(100)).await; + + // Wait for the generator to finish. + tx_sender_shutdown.await; + + // While attempting to establish a connection with a nonexistent host, we fill the worker's + // channel. Transactions from this channel will never be sent and will eventually be dropped + // without increasing the `SendTransactionStats` counters. + let stats = scheduler_handle + .await + .expect("Scheduler should stop successfully") + .expect("Scheduler execution was successful"); + assert_eq!(stats, HashMap::new()); +} + +// Check that when the client is rate-limited by server, we update counters +// accordingly. To implement it we: +// * set the connection limit per minute to 1 +// * create a dummy connection to reach the limit and immediately close it +// * set up client which will try to create a new connection which it will be +// rate-limited. This test doesn't check what happens when the rate-limiting +// period ends because it too long for test (1min). 
+#[tokio::test] +async fn test_rate_limiting() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + None, + TestServerConfig { + max_connections_per_peer: 100, + max_connections_per_ipaddr_per_minute: 1, + ..Default::default() + }, + ); + + let connection_to_reach_limit = make_client_endpoint(&server_address, None).await; + drop(connection_to_reach_limit); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 16; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert_eq!(actual_num_packets, 0); + + // Stop the sender. + tx_sender_shutdown.await; + + // And the scheduler. + scheduler_cancel.cancel(); + let localhost_stats = join_scheduler(scheduler_handle).await; + + // We do not expect to see any errors, as the connection is in the pending state still, when we + // do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would + // start seeing a `connection_error_timed_out` incremented to 1. Potentially, we may want to + // accept both 0 and 1 as valid values for it. + assert_eq!(localhost_stats, SendTransactionStats::default()); + + // Stop the server. + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// The same as test_rate_limiting but here we wait for 1 min to check that the +// connection has been established. +#[tokio::test] +// TODO Provide an alternative testing interface for `streamer::nonblocking::quic::spawn_server` +// that would accept throttling at a granularity below 1 minute. +#[ignore = "takes 70s to complete"] +async fn test_rate_limiting_establish_connection() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + None, + TestServerConfig { + max_connections_per_peer: 100, + max_connections_per_ipaddr_per_minute: 1, + ..Default::default() + }, + ); + + let connection_to_reach_limit = make_client_endpoint(&server_address, None).await; + drop(connection_to_reach_limit); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 65; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1000)); + + let (scheduler_handle, scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + let actual_num_packets = + count_received_packets_for(receiver, tx_size, Duration::from_secs(70)).await; + assert!( + actual_num_packets > 0, + "As we wait longer than 1 minute, at least one transaction should be delivered. \ + After 1 minute the server is expected to accept our connection.\n\ + Actual packets delivered: {actual_num_packets}" + ); + + // Stop the sender. + tx_sender_shutdown.await; + + // And the scheduler. 
+ scheduler_cancel.cancel(); + let mut localhost_stats = join_scheduler(scheduler_handle).await; + assert!( + localhost_stats.connection_error_timed_out > 0, + "As the quinn timeout is below 1 minute, a few connections will fail to connect during \ + the 1 minute delay.\n\ + Actual connection_error_timed_out: {}", + localhost_stats.connection_error_timed_out + ); + assert!( + localhost_stats.successfully_sent > 0, + "As we run the test for longer than 1 minute, we expect a connection to be established, \ + and a number of transactions to be delivered.\n\ + Actual successfully_sent: {}", + localhost_stats.successfully_sent + ); + + // All the rest of the error counters should be 0. + localhost_stats.connection_error_timed_out = 0; + localhost_stats.successfully_sent = 0; + assert_eq!(localhost_stats, SendTransactionStats::default()); + + // Stop the server. + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +}
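The patch introduces a new public crate but no end-to-end example, so here is a minimal sketch of how a consumer might drive `ConnectionWorkersScheduler::run`, modeled on `setup_connection_worker_scheduler` in the integration tests above. The RPC/websocket URLs, channel capacity, and the numbers in the config are placeholders that assume a locally running test validator; they are not values recommended by this patch.

```rust
use {
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_sdk::commitment_config::CommitmentConfig,
    solana_tpu_client_next::{
        connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
        leader_updater::create_leader_updater, transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler,
    },
    std::{
        net::{Ipv4Addr, SocketAddr},
        sync::Arc,
    },
    tokio::sync::mpsc,
    tokio_util::sync::CancellationToken,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoints: assumes a local (test) validator.
    let rpc_client = Arc::new(RpcClient::new_with_commitment(
        "http://127.0.0.1:8899".to_string(),
        CommitmentConfig::confirmed(),
    ));
    let websocket_url = "ws://127.0.0.1:8900".to_string();

    // Follow the leader schedule; pass `Some(tpu_addr)` instead to pin one address.
    let leader_updater = create_leader_updater(rpc_client, websocket_url, None)
        .await
        .map_err(|err| format!("{err}"))?;

    let (tx_sender, tx_receiver) = mpsc::channel(64);
    let cancel = CancellationToken::new();

    let config = ConnectionWorkersSchedulerConfig {
        bind: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0),
        stake_identity: None, // provide a staked `Keypair` for staked connections
        num_connections: 16,
        skip_check_transaction_age: false,
        worker_channel_size: 2,
        max_reconnect_attempts: 4,
        lookahead_slots: 4,
    };

    let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
        config,
        leader_updater,
        tx_receiver,
        cancel.clone(),
    ));

    // A batch holds wire-serialized (signed) transactions as raw bytes.
    tx_sender
        .send(TransactionBatch::new(vec![vec![0u8; 1]]))
        .await
        .expect("scheduler should still be running");

    // Shut down and collect the per-IP statistics gathered by the workers.
    cancel.cancel();
    let stats = scheduler.await??;
    println!("{stats:?}");
    Ok(())
}
```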
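`leader_updater.rs` is explicitly meant to allow custom leader estimation besides the RPC-backed `LeaderUpdaterService` and the test-only `PinnedLeaderUpdater`. Below is a minimal sketch of such an implementation; `FixedLeaderUpdater` and its address list are invented for illustration, and the consumer is assumed to depend on the `async-trait` crate. Since the scheduler indexes the first returned address, `next_leaders` should always return at least one entry.

```rust
use {
    async_trait::async_trait,
    solana_tpu_client_next::leader_updater::LeaderUpdater,
    std::net::SocketAddr,
};

/// Hypothetical updater that serves a fixed set of TPU addresses, e.g. for a
/// private cluster or a benchmark setup.
struct FixedLeaderUpdater {
    addresses: Vec<SocketAddr>,
}

#[async_trait]
impl LeaderUpdater for FixedLeaderUpdater {
    fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr> {
        // The scheduler sends to the first address and pre-warms connections to
        // the rest, so return them in priority order.
        self.addresses
            .iter()
            .take(lookahead_slots.max(1) as usize)
            .copied()
            .collect()
    }

    async fn stop(&mut self) {
        // Nothing to release for this static implementation.
    }
}
```

A `Box::new(FixedLeaderUpdater { .. })` can then be passed to `ConnectionWorkersScheduler::run` in place of the updater produced by `create_leader_updater`.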
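A short sketch of the `TransactionBatch` lifecycle and the age check applied in `ConnectionWorker::send_transactions`. The 232-byte payload is an arbitrary stand-in for a serialized transaction, and the 60 s figure assumes the current SDK constants (`MAX_PROCESSING_AGE` = 150, `DEFAULT_MS_PER_SLOT` = 400).

```rust
use {
    solana_sdk::timing::timestamp,
    solana_tpu_client_next::transaction_batch::TransactionBatch,
};

fn main() {
    // Wire format: each entry is a fully serialized (signed) transaction.
    let wired: Vec<Vec<u8>> = vec![vec![0u8; 232]];
    let batch = TransactionBatch::new(wired);

    // `new()` stamps the batch with the current wall-clock time. Unless
    // `skip_check_transaction_age` is set, the worker drops the whole batch once
    // `now - batch.timestamp()` exceeds MAX_PROCESSING_AGE * DEFAULT_MS_PER_SLOT,
    // i.e. roughly 60 s with the current constants.
    let age_ms = timestamp().saturating_sub(batch.timestamp());
    assert!(age_ms < 60_000);

    // Consuming the batch yields the raw transaction bytes, sent one stream each.
    for wire_tx in batch {
        assert_eq!(wire_tx.len(), 232);
    }
}
```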
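The retry behavior in `connection_worker.rs` is easiest to see with numbers plugged in. The arithmetic below assumes the current SDK constants (4 consecutive leader slots at ~400 ms each) and the `max_reconnect_attempts = 4` used by the integration tests; it is an illustration of the patch's constants, not an API of the crate.

```rust
use solana_sdk::clock::{DEFAULT_MS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS};

fn main() {
    // Same formula as RETRY_SLEEP_INTERVAL: one full leader rotation,
    // 4 slots x ~400 ms/slot = 1.6 s between reconnect attempts.
    let retry_sleep_ms = NUM_CONSECUTIVE_LEADER_SLOTS * DEFAULT_MS_PER_SLOT;
    assert_eq!(retry_sleep_ms, 1_600);

    // The worker moves to `Closing` once its attempt counter exceeds
    // `max_reconnect_attempts`, so the back-off alone amounts to roughly
    // 4 * 1.6 s = 6.4 s before it gives up (time spent in the connection
    // attempts themselves excluded).
    let max_reconnect_attempts: u64 = 4;
    assert_eq!(retry_sleep_ms * max_reconnect_attempts, 6_400);
}
```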
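A sketch of how the per-address statistics returned by the scheduler might be aggregated and logged. The sample counter values are invented; note that the map is keyed by peer IP address, since `WorkersCache::shutdown_worker` folds all TPU ports of one validator into a single entry.

```rust
use {
    solana_tpu_client_next::{SendTransactionStats, SendTransactionStatsPerAddr},
    std::net::{IpAddr, Ipv4Addr},
};

fn main() {
    // Pretend this came back from `ConnectionWorkersScheduler::run`.
    let mut stats_per_addr = SendTransactionStatsPerAddr::new();
    stats_per_addr.insert(
        IpAddr::V4(Ipv4Addr::LOCALHOST),
        SendTransactionStats {
            successfully_sent: 42,
            connection_error_timed_out: 1,
            ..Default::default()
        },
    );

    // Counters are u64 and `add` saturates, so totals cannot overflow.
    let mut total = SendTransactionStats::default();
    for (addr, stats) in &stats_per_addr {
        // `Display` prints one line per counter, handy for periodic logging.
        println!("{addr}:\n{stats}");
        total.add(stats);
    }
    assert_eq!(total.successfully_sent, 42);
    assert_eq!(total.connection_error_timed_out, 1);
}
```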
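For completeness, a standalone illustration of the trick used by `IoErrorWithPartialEq` in `quic_networking/error.rs` (the type itself is crate-private, so this sketch re-defines it): equality is implemented by comparing `Debug` output, which is enough to assert on endpoint-creation failures even though `std::io::Error` does not implement `PartialEq`.

```rust
use std::io;

// Standalone mirror of the patch's `IoErrorWithPartialEq`, not exported by the crate.
#[derive(Debug)]
struct IoErrorWithPartialEq(io::Error);

impl PartialEq for IoErrorWithPartialEq {
    fn eq(&self, other: &Self) -> bool {
        // Two errors are considered equal if their Debug representations match.
        format!("{self:?}") == format!("{other:?}")
    }
}

fn main() {
    let a = IoErrorWithPartialEq(io::Error::new(io::ErrorKind::AddrInUse, "bind failed"));
    let b = IoErrorWithPartialEq(io::Error::new(io::ErrorKind::AddrInUse, "bind failed"));
    assert_eq!(a, b);
}
```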