From 722e0836b200f77161d66624ed823e71d216cf09 Mon Sep 17 00:00:00 2001 From: buffalu <85544055+buffalu@users.noreply.github.com> Date: Thu, 7 Nov 2024 21:00:11 -0600 Subject: [PATCH] Separate out broadcast + retransmit shredstream (#703) --- core/src/admin_rpc_post_init.rs | 1 + core/src/validator.rs | 4 ++++ local-cluster/src/validator_configs.rs | 1 + validator/src/admin_rpc_service.rs | 31 ++++++++++++++++++++++++++ validator/src/cli.rs | 21 ++++++++++++++++- validator/src/main.rs | 23 +++++++++++++++++++ 6 files changed, 80 insertions(+), 1 deletion(-) diff --git a/core/src/admin_rpc_post_init.rs b/core/src/admin_rpc_post_init.rs index 425a4375c1..c58ba0db9c 100644 --- a/core/src/admin_rpc_post_init.rs +++ b/core/src/admin_rpc_post_init.rs @@ -27,4 +27,5 @@ pub struct AdminRpcRequestMetadataPostInit { pub block_engine_config: Arc>, pub relayer_config: Arc>, pub shred_receiver_address: Arc>>, + pub shred_retransmit_receiver_address: Arc>>, } diff --git a/core/src/validator.rs b/core/src/validator.rs index 7977b9aff8..0072756a51 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -290,6 +290,7 @@ pub struct ValidatorConfig { pub block_engine_config: Arc>, // Using Option inside RwLock is ugly, but only convenient way to allow toggle on/off pub shred_receiver_address: Arc>>, + pub shred_retransmit_receiver_address: Arc>>, pub tip_manager_config: TipManagerConfig, pub preallocated_bundle_cost: u64, } @@ -367,6 +368,7 @@ impl Default for ValidatorConfig { relayer_config: Arc::new(Mutex::new(RelayerConfig::default())), block_engine_config: Arc::new(Mutex::new(BlockEngineConfig::default())), shred_receiver_address: Arc::new(RwLock::new(None)), + shred_retransmit_receiver_address: Arc::new(RwLock::new(None)), tip_manager_config: TipManagerConfig::default(), preallocated_bundle_cost: u64::default(), } @@ -1409,6 +1411,7 @@ impl Validator { cluster_slots.clone(), wen_restart_repair_slots.clone(), config.shred_receiver_address.clone(), + config.shred_retransmit_receiver_address.clone(), )?; if in_wen_restart { @@ -1509,6 +1512,7 @@ impl Validator { block_engine_config: config.block_engine_config.clone(), relayer_config: config.relayer_config.clone(), shred_receiver_address: config.shred_receiver_address.clone(), + shred_retransmit_receiver_address: config.shred_retransmit_receiver_address.clone(), }); Ok(Self { diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 05971e1f02..48c97b655b 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -76,6 +76,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { relayer_config: config.relayer_config.clone(), block_engine_config: config.block_engine_config.clone(), shred_receiver_address: config.shred_receiver_address.clone(), + shred_retransmit_receiver_address: config.shred_retransmit_receiver_address.clone(), tip_manager_config: config.tip_manager_config.clone(), preallocated_bundle_cost: config.preallocated_bundle_cost, } diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 4cd7b44c93..0fe758bfe9 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -269,6 +269,13 @@ pub trait AdminRpc { #[rpc(meta, name = "setShredReceiverAddress")] fn set_shred_receiver_address(&self, meta: Self::Metadata, addr: String) -> Result<()>; + + #[rpc(meta, name = "setShredRetransmitReceiverAddress")] + fn set_shred_retransmit_receiver_address( + &self, + meta: Self::Metadata, + addr: String, + ) -> Result<()>; } pub struct AdminRpcImpl; @@ -574,6 +581,28 @@ impl AdminRpc for AdminRpcImpl { }) } + fn set_shred_retransmit_receiver_address( + &self, + meta: Self::Metadata, + addr: String, + ) -> Result<()> { + let shred_receiver_address = if addr.is_empty() { + None + } else { + Some(SocketAddr::from_str(&addr).map_err(|_| { + jsonrpc_core::error::Error::invalid_params(format!( + "invalid shred receiver address: {}", + addr + )) + })?) + }; + + meta.with_post_init(|post_init| { + *post_init.shred_retransmit_receiver_address.write().unwrap() = shred_receiver_address; + Ok(()) + }) + } + fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> { let loaded_config = load_staked_nodes_overrides(&path) .map_err(|err| { @@ -1023,6 +1052,7 @@ mod tests { let block_engine_config = Arc::new(Mutex::new(BlockEngineConfig::default())); let relayer_config = Arc::new(Mutex::new(RelayerConfig::default())); let shred_receiver_address = Arc::new(RwLock::new(None)); + let shred_retransmit_receiver_address = Arc::new(RwLock::new(None)); let meta = AdminRpcRequestMetadata { rpc_addr: None, start_time: SystemTime::now(), @@ -1046,6 +1076,7 @@ mod tests { block_engine_config, relayer_config, shred_receiver_address, + shred_retransmit_receiver_address, }))), staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())), rpc_to_plugin_manager_sender: None, diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 8d8c4767a6..d7af738af0 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -164,7 +164,14 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .long("shred-receiver-address") .value_name("SHRED_RECEIVER_ADDRESS") .takes_value(true) - .help("Validator will forward all shreds to this address in addition to normal turbine operation. Set to empty string to disable.") + .help("Validator will forward all leader shreds to this address in addition to normal turbine operation. Set to empty string to disable.") + ) + .arg( + Arg::with_name("shred_retransmit_receiver_address") + .long("shred-retransmit-receiver-address") + .value_name("SHRED_RETRANSMIT_RECEIVER_ADDRESS") + .takes_value(true) + .help("Validator will forward all retransmit shreds to this address in addition to normal turbine operation. Set to empty string to disable.") ) .arg( Arg::with_name("identity") @@ -1728,6 +1735,18 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .required(true) ) ) + .subcommand( + SubCommand::with_name("set-shred-retransmit-receiver-address") + .about("Changes shred retransmit receiver address") + .arg( + Arg::with_name("shred_receiver_address") + .long("shred-receiver-address") + .value_name("SHRED_RECEIVER_ADDRESS") + .takes_value(true) + .help("Validator will forward all retransmit shreds to this address in addition to normal turbine operation. Set to empty string to disable.") + .required(true) + ) + ) .subcommand( SubCommand::with_name("exit") .about("Send an exit request to the validator") diff --git a/validator/src/main.rs b/validator/src/main.rs index 9ce6a23b79..083ed464ff 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -536,6 +536,22 @@ pub fn main() { }); return; } + ("set-shred-retransmit-receiver-address", Some(subcommand_matches)) => { + let addr = value_t_or_exit!(subcommand_matches, "shred_receiver_address", String); + let admin_client = admin_rpc_service::connect(&ledger_path); + admin_rpc_service::runtime() + .block_on(async move { + admin_client + .await? + .set_shred_retransmit_receiver_address(addr) + .await + }) + .unwrap_or_else(|err| { + println!("set shred receiver address failed: {}", err); + exit(1); + }); + return; + } ("authorized-voter", Some(authorized_voter_subcommand_matches)) => { match authorized_voter_subcommand_matches.subcommand() { ("add", Some(subcommand_matches)) => { @@ -1706,6 +1722,13 @@ pub fn main() { .value_of("shred_receiver_address") .map(|addr| SocketAddr::from_str(addr).expect("shred_receiver_address invalid")), )), + shred_retransmit_receiver_address: Arc::new(RwLock::new( + matches + .value_of("shred_retransmit_receiver_address") + .map(|addr| { + SocketAddr::from_str(addr).expect("shred_retransmit_receiver_address invalid") + }), + )), staked_nodes_overrides: staked_nodes_overrides.clone(), use_snapshot_archives_at_startup: value_t_or_exit!( matches,