Skip to content

Commit

Permalink
v1.17: rpc-sts: add config options for stake-weighted qos (backport of
Browse files Browse the repository at this point in the history
…#197) (#340)

rpc-sts: add config options for stake-weighted qos (#197)

* rpc-sts: plumb options for swqos config

* rpc-sts: send to specific tpu peers when configured

(cherry picked from commit f41fb84)

# Conflicts:
#	send-transaction-service/src/send_transaction_service.rs
#	validator/src/cli.rs
#	validator/src/main.rs

Co-authored-by: Trent Nelson <[email protected]>
  • Loading branch information
2 people authored and willhickey committed Mar 23, 2024
1 parent 4f3f939 commit 845f0d4
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
22 changes: 18 additions & 4 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub struct Config {
pub batch_size: usize,
/// How frequently batches are sent
pub batch_send_rate_ms: u64,
pub tpu_peers: Option<Vec<SocketAddr>>,
}

impl Default for Config {
Expand All @@ -125,6 +126,7 @@ impl Default for Config {
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
tpu_peers: None,
}
}
}
Expand Down Expand Up @@ -565,12 +567,18 @@ impl SendTransactionService {
stats: &SendTransactionServiceStats,
) {
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses_with_slots(
let mut addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().map(|a| (a, 0)).collect::<Vec<_>>())
.unwrap_or_default();
let leader_addresses = Self::get_tpu_addresses_with_slots(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
addresses.extend(leader_addresses);

let wire_transactions = transactions
.iter()
Expand All @@ -583,8 +591,8 @@ impl SendTransactionService {
})
.collect::<Vec<&[u8]>>();

for address in &addresses {
Self::send_transactions(address.0, &wire_transactions, connection_cache, stats);
for (address, _) in &addresses {
Self::send_transactions(address, &wire_transactions, connection_cache, stats);
}
}

Expand Down Expand Up @@ -701,14 +709,20 @@ impl SendTransactionService {

let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
let mut addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().collect::<Vec<_>>())
.unwrap_or_default();
let mut leader_info_provider = leader_info_provider.lock().unwrap();
let leader_info = leader_info_provider.get_leader_info();
let addresses = Self::get_tpu_addresses(
let leader_addresses = Self::get_tpu_addresses(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
addresses.extend(leader_addresses);

for address in &addresses {
Self::send_transactions(address, chunk, connection_cache, stats);
Expand Down
16 changes: 16 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_send_transaction_batch_size)
.help("The size of transactions to be sent in batch."),
)
.arg(
Arg::with_name("rpc_send_transaction_tpu_peer")
.long("rpc-send-transaction-tpu-peer")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.value_name("HOST:PORT")
.validator(solana_net_utils::is_host_port)
.help("Peer(s) to broadcast transactions to instead of the current leader")
)
.arg(
Arg::with_name("rpc_send_transaction_also_leader")
.long("rpc-send-transaction-also-leader")
.requires("rpc_send_transaction_tpu_peer")
.help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader")
)
.arg(
Arg::with_name("rpc_scan_and_fix_roots")
.long("rpc-scan-and-fix-roots")
Expand Down
28 changes: 23 additions & 5 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,27 @@ pub fn main() {
);
exit(1);
}
let rpc_send_transaction_tpu_peers = matches
.values_of("rpc_send_transaction_tpu_peer")
.map(|values| {
values
.map(solana_net_utils::parse_host_port)
.collect::<Result<Vec<SocketAddr>, String>>()
})
.transpose()
.unwrap_or_else(|e| {
eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}");
exit(1);
});
let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
let leader_forward_count =
if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
// rpc-sts is configured to send only to specific tpu peers. disable leader forwards
0
} else {
value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
};

let full_api = matches.is_present("full_rpc_api");

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1359,11 +1380,7 @@ pub fn main() {
contact_debug_interval,
send_transaction_service_config: send_transaction_service::Config {
retry_rate_ms: rpc_send_retry_rate_ms,
leader_forward_count: value_t_or_exit!(
matches,
"rpc_send_transaction_leader_forward_count",
u64
),
leader_forward_count,
default_max_retries: value_t!(
matches,
"rpc_send_transaction_default_max_retries",
Expand All @@ -1377,6 +1394,7 @@ pub fn main() {
),
batch_send_rate_ms: rpc_send_batch_send_rate_ms,
batch_size: rpc_send_batch_size,
tpu_peers: rpc_send_transaction_tpu_peers,
},
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
Expand Down

0 comments on commit 845f0d4

Please sign in to comment.