Skip to content

Commit

Permalink
Add a new client implementation targeting TPU (anza-xyz#2905)
Browse files Browse the repository at this point in the history
Although tpu-client, component which is currently used for sending transactions over TPU, suited for bulk transaction sent, it was not designed for handling the stream of transactions. Additionally, the call stack for sending transactions using tpu-client has grown too deep, making the code difficult to maintain. This motivated us to create a new client implementation that is optimized for handling transaction streams and is built using Tokio from the start.
  • Loading branch information
KirillLykov authored Oct 14, 2024
1 parent 6c264d7 commit f03bce5
Show file tree
Hide file tree
Showing 15 changed files with 1,930 additions and 0 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ members = [
"tokens",
"tps-client",
"tpu-client",
"tpu-client-next",
"transaction-dos",
"transaction-metrics-tracker",
"transaction-status",
Expand Down
34 changes: 34 additions & 0 deletions tpu-client-next/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "solana-tpu-client-next"
description = "Client code to send transaction to TPU."
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
log = { workspace = true }
lru = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }
solana-connection-cache = { workspace = true }
solana-logger = { workspace = true }
solana-measure = { workspace = true }
solana-rpc-client = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }

[dev-dependencies]
crossbeam-channel = { workspace = true }
futures = { workspace = true }
solana-cli-config = { workspace = true }

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
258 changes: 258 additions & 0 deletions tpu-client-next/src/connection_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
//! This module defines [`ConnectionWorker`] which encapsulates the functionality
//! needed to handle one connection within the scope of task.
use {
super::SendTransactionStats,
crate::{
quic_networking::send_data_over_stream, send_transaction_stats::record_error,
transaction_batch::TransactionBatch,
},
log::*,
quinn::{ConnectError, Connection, Endpoint},
solana_measure::measure::Measure,
solana_sdk::{
clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
timing::timestamp,
},
std::net::SocketAddr,
tokio::{
sync::mpsc,
time::{sleep, Duration},
},
tokio_util::sync::CancellationToken,
};

/// Interval between retry attempts for creating a new connection. This value is
/// a best-effort estimate, based on current network conditions.
const RETRY_SLEEP_INTERVAL: Duration =
Duration::from_millis(NUM_CONSECUTIVE_LEADER_SLOTS * DEFAULT_MS_PER_SLOT);

/// Maximum age (in milliseconds) of a blockhash, beyond which transaction
/// batches are dropped.
const MAX_PROCESSING_AGE_MS: u64 = MAX_PROCESSING_AGE as u64 * DEFAULT_MS_PER_SLOT;

/// [`ConnectionState`] represents the current state of a quic connection.
///
/// It tracks the lifecycle of connection from initial setup to closing phase.
/// The transition function between states is defined in `ConnectionWorker`
/// implementation.
enum ConnectionState {
NotSetup,
Active(Connection),
Retry(usize),
Closing,
}

impl Drop for ConnectionState {
/// When [`ConnectionState`] is dropped, underlying connection is closed
/// which means that there is no guarantee that the open streams will
/// finish.
fn drop(&mut self) {
if let Self::Active(connection) = self {
debug!(
"Close connection with {:?}, stats: {:?}. All pending streams will be dropped.",
connection.remote_address(),
connection.stats()
);
connection.close(0u32.into(), b"done");
}
}
}

/// [`ConnectionWorker`] holds connection to the validator with address `peer`.
///
/// If connection has been closed, [`ConnectionWorker`] tries to reconnect
/// `max_reconnect_attempts` times. If connection is in `Active` state, it sends
/// transactions received from `transactions_receiver`. Additionally, it
/// accumulates statistics about connections and streams failures.
pub(crate) struct ConnectionWorker {
endpoint: Endpoint,
peer: SocketAddr,
transactions_receiver: mpsc::Receiver<TransactionBatch>,
connection: ConnectionState,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: SendTransactionStats,
cancel: CancellationToken,
}

impl ConnectionWorker {
/// Constructs a [`ConnectionWorker`].
///
/// [`ConnectionWorker`] maintains a connection to a `peer` and processes
/// transactions from `transactions_receiver`. If
/// `skip_check_transaction_age` is set to `true`, the worker skips checking
/// for transaction blockhash expiration. The `max_reconnect_attempts`
/// parameter controls how many times the worker will attempt to reconnect
/// in case of connection failure. Returns the created `ConnectionWorker`
/// along with a cancellation token that can be used by the caller to stop
/// the worker.
pub fn new(
endpoint: Endpoint,
peer: SocketAddr,
transactions_receiver: mpsc::Receiver<TransactionBatch>,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
) -> (Self, CancellationToken) {
let cancel = CancellationToken::new();

let this = Self {
endpoint,
peer,
transactions_receiver,
connection: ConnectionState::NotSetup,
skip_check_transaction_age,
max_reconnect_attempts,
send_txs_stats: SendTransactionStats::default(),
cancel: cancel.clone(),
};

(this, cancel)
}

/// Starts the main loop of the [`ConnectionWorker`].
///
/// This method manages the connection to the peer and handles state
/// transitions. It runs indefinitely until the connection is closed or an
/// unrecoverable error occurs.
pub async fn run(&mut self) {
let cancel = self.cancel.clone();

let main_loop = async move {
loop {
match &self.connection {
ConnectionState::Closing => {
break;
}
ConnectionState::NotSetup => {
self.create_connection(0).await;
}
ConnectionState::Active(connection) => {
let Some(transactions) = self.transactions_receiver.recv().await else {
debug!("Transactions sender has been dropped.");
self.connection = ConnectionState::Closing;
continue;
};
self.send_transactions(connection.clone(), transactions)
.await;
}
ConnectionState::Retry(num_reconnects) => {
if *num_reconnects > self.max_reconnect_attempts {
error!("Failed to establish connection: reach max reconnect attempts.");
self.connection = ConnectionState::Closing;
continue;
}
sleep(RETRY_SLEEP_INTERVAL).await;
self.reconnect(*num_reconnects).await;
}
}
}
};

tokio::select! {
() = main_loop => (),
() = cancel.cancelled() => (),
}
}

/// Retrieves the statistics for transactions sent by this worker.
pub fn transaction_stats(&self) -> &SendTransactionStats {
&self.send_txs_stats
}

/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
/// time, which prevents traffic fragmentation and shows better TPS in
/// comparison with multistream send. If the batch is determined to be
/// outdated and flag `skip_check_transaction_age` is unset, it will be
/// dropped without being sent.
///
/// In case of error, it doesn't retry to send the same transactions again.
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
let now = timestamp();
if !self.skip_check_transaction_age
&& now.saturating_sub(transactions.timestamp()) > MAX_PROCESSING_AGE_MS
{
debug!("Drop outdated transaction batch.");
return;
}
let mut measure_send = Measure::start("send transaction batch");
for data in transactions.into_iter() {
let result = send_data_over_stream(&connection, &data).await;

if let Err(error) = result {
trace!("Failed to send transaction over stream with error: {error}.");
record_error(error, &mut self.send_txs_stats);
self.connection = ConnectionState::Retry(0);
} else {
self.send_txs_stats.successfully_sent =
self.send_txs_stats.successfully_sent.saturating_add(1);
}
}
measure_send.stop();
debug!(
"Time to send transactions batch: {} us",
measure_send.as_us()
);
}

/// Attempts to create a new connection to the specified `peer` address.
///
/// If the connection is successful, the state is updated to `Active`.
///
/// If an error occurs, the state may transition to `Retry` or `Closing`,
/// depending on the nature of the error.
async fn create_connection(&mut self, max_retries_attempt: usize) {
let connecting = self.endpoint.connect(self.peer, "connect");
match connecting {
Ok(connecting) => {
let mut measure_connection = Measure::start("establish connection");
let res = connecting.await;
measure_connection.stop();
debug!(
"Establishing connection with {} took: {} us",
self.peer,
measure_connection.as_us()
);
match res {
Ok(connection) => {
self.connection = ConnectionState::Active(connection);
}
Err(err) => {
warn!("Connection error {}: {}", self.peer, err);
record_error(err.into(), &mut self.send_txs_stats);
self.connection =
ConnectionState::Retry(max_retries_attempt.saturating_add(1));
}
}
}
Err(connecting_error) => {
record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
match connecting_error {
ConnectError::EndpointStopping => {
debug!("Endpoint stopping, exit connection worker.");
self.connection = ConnectionState::Closing;
}
ConnectError::InvalidRemoteAddress(_) => {
warn!("Invalid remote address.");
self.connection = ConnectionState::Closing;
}
e => {
error!("Unexpected error has happen while trying to create connection {e}");
self.connection = ConnectionState::Closing;
}
}
}
}
}

/// Attempts to reconnect to the peer after a connection failure.
async fn reconnect(&mut self, num_reconnects: usize) {
debug!("Trying to reconnect. Reopen connection, 0rtt is not implemented yet.");
// We can reconnect using 0rtt, but not a priority for now. Check if we
// need to call config.enable_0rtt() on the client side and where
// session tickets are stored.
self.create_connection(num_reconnects).await;
}
}
Loading

0 comments on commit f03bce5

Please sign in to comment.