Warn if validator appears to be repairing excessively
jbiseda committed Sep 20, 2023
1 parent 288e8a6 commit 18e6e57
Showing 36 changed files with 199 additions and 81 deletions.
2 changes: 1 addition & 1 deletion banking-bench/src/main.rs
@@ -436,7 +436,7 @@ fn main() {
     let cluster_info = {
         let keypair = Arc::new(Keypair::new());
         let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
+        ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified, None)
     };
     let cluster_info = Arc::new(cluster_info);
     let tpu_disable_quic = matches.is_present("tpu_disable_quic");

2 changes: 1 addition & 1 deletion bucket_map/src/bucket_storage.rs
@@ -181,7 +181,7 @@ impl<O: BucketOccupied> BucketStorage<O> {

     /// delete the backing file on disk
     fn delete(&self) {
-        _ = remove_file(&self.path);
+        _ = remove_file(&self.path);
     }

     pub fn max_search(&self) -> u64 {

2 changes: 1 addition & 1 deletion core/benches/banking_stage.rs
@@ -288,7 +288,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
     let cluster_info = {
         let keypair = Arc::new(Keypair::new());
         let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
+        ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified, None)
     };
     let cluster_info = Arc::new(cluster_info);
     let (s, _r) = unbounded();

2 changes: 1 addition & 1 deletion core/src/accounts_hash_verifier.rs
@@ -601,7 +601,7 @@ mod tests {
     fn new_test_cluster_info() -> ClusterInfo {
         let keypair = Arc::new(Keypair::new());
         let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
-        ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
+        ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified, None)
     }

     #[test]

8 changes: 6 additions & 2 deletions core/src/banking_stage.rs
@@ -648,8 +648,12 @@ mod tests {
     pub(crate) fn new_test_cluster_info(keypair: Option<Arc<Keypair>>) -> (Node, ClusterInfo) {
         let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new()));
         let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-        let cluster_info =
-            ClusterInfo::new(node.info.clone(), keypair, SocketAddrSpace::Unspecified);
+        let cluster_info = ClusterInfo::new(
+            node.info.clone(),
+            keypair,
+            SocketAddrSpace::Unspecified,
+            None,
+        );
         (node, cluster_info)
     }

3 changes: 2 additions & 1 deletion core/src/cluster_slots_service.rs
@@ -192,7 +192,8 @@ mod test {
         let keypair = Arc::new(Keypair::new());
         let pubkey = keypair.pubkey();
         let node_info = Node::new_localhost_with_pubkey(&pubkey);
-        let cluster_info = ClusterInfo::new(node_info.info, keypair, SocketAddrSpace::Unspecified);
+        let cluster_info =
+            ClusterInfo::new(node_info.info, keypair, SocketAddrSpace::Unspecified, None);
         ClusterSlotsService::update_lowest_slot(5, &cluster_info);
         cluster_info.flush_push_queue();
         let lowest = {

2 changes: 2 additions & 0 deletions core/src/repair/ancestor_hashes_service.rs
@@ -1256,6 +1256,7 @@ mod test {
             responder_node.info.clone(),
             Arc::new(keypair),
             SocketAddrSpace::Unspecified,
+            None,
         );
         let responder_serve_repair = ServeRepair::new(
             Arc::new(cluster_info),
@@ -1350,6 +1351,7 @@
             Node::new_localhost_with_pubkey(&keypair.pubkey()).info,
             Arc::new(keypair),
             SocketAddrSpace::Unspecified,
+            None,
         ));
         let repair_whitelist = Arc::new(RwLock::new(HashSet::default()));
         let requester_serve_repair = ServeRepair::new(

2 changes: 1 addition & 1 deletion core/src/repair/repair_service.rs
@@ -879,7 +879,7 @@ mod test {
     fn new_test_cluster_info() -> ClusterInfo {
         let keypair = Arc::new(Keypair::new());
         let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
-        ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
+        ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified, None)
     }

     #[test]

2 changes: 1 addition & 1 deletion core/src/repair/serve_repair.rs
@@ -1969,7 +1969,7 @@ mod tests {
     fn new_test_cluster_info() -> ClusterInfo {
         let keypair = Arc::new(Keypair::new());
         let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
-        ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
+        ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified, None)
     }

     #[test]

1 change: 1 addition & 0 deletions core/src/replay_stage.rs
@@ -4110,6 +4110,7 @@ pub(crate) mod tests {
             Node::new_localhost_with_pubkey(&my_pubkey).info,
             Arc::new(my_keypairs.node_keypair.insecure_clone()),
             SocketAddrSpace::Unspecified,
+            None,
         );
         assert_eq!(my_pubkey, cluster_info.id());

8 changes: 6 additions & 2 deletions core/src/tvu.rs
@@ -410,8 +410,12 @@ pub mod tests {
         let (repair_quic_endpoint_sender, _repair_quic_endpoint_receiver) =
             tokio::sync::mpsc::channel(/*buffer:*/ 128);
         //start cluster_info1
-        let cluster_info1 =
-            ClusterInfo::new(target1.info.clone(), keypair, SocketAddrSpace::Unspecified);
+        let cluster_info1 = ClusterInfo::new(
+            target1.info.clone(),
+            keypair,
+            SocketAddrSpace::Unspecified,
+            None,
+        );
         cluster_info1.insert_info(leader.info);
         let cref1 = Arc::new(cluster_info1);

3 changes: 2 additions & 1 deletion core/src/validator.rs
@@ -725,6 +725,7 @@ impl Validator {
             node.info.clone(),
             identity_keypair.clone(),
             socket_addr_space,
+            config.known_validators.clone(),
         );
         cluster_info.set_contact_debug_interval(config.contact_debug_interval);
         cluster_info.set_entrypoints(cluster_entrypoints);
@@ -971,7 +972,6 @@ impl Validator {
             ledger_path,
             config.validator_exit.clone(),
             exit.clone(),
-            config.known_validators.clone(),
             rpc_override_health_check.clone(),
             startup_verification_complete,
             optimistically_confirmed_bank.clone(),
@@ -2576,6 +2576,7 @@ mod tests {
             ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
             node_keypair,
             SocketAddrSpace::Unspecified,
+            None,
         );

         let (genesis_config, _mint_keypair) = create_genesis_config(1);

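The validator.rs hunks above move config.known_validators from the RPC health-check plumbing into ClusterInfo::new, so gossip itself can answer "what is the highest slot my known validators have advertised?" when the repair warning is evaluated. Below is a minimal, self-contained sketch of that idea; the GossipNode struct, its field names, and the observed_slots map are illustrative assumptions, not Solana's actual ClusterInfo/CRDS types.

// Illustrative sketch only: an optional known-validators set, supplied at
// construction, is used to compute a "known validators max slot".
use std::collections::{HashMap, HashSet};

struct GossipNode {
    // Trusted validator identities passed to the constructor (None = not configured).
    known_validators: Option<HashSet<&'static str>>,
    // Highest slot seen from each peer via gossip (placeholder for CRDS data).
    observed_slots: HashMap<&'static str, u64>,
}

impl GossipNode {
    // Highest slot advertised by any known validator, if a set was configured
    // and at least one of its members has been observed.
    fn known_validators_max_slot(&self) -> Option<u64> {
        let known = self.known_validators.as_ref()?;
        self.observed_slots
            .iter()
            .filter(|(pubkey, _)| known.contains(*pubkey))
            .map(|(_, slot)| *slot)
            .max()
    }
}

fn main() {
    let node = GossipNode {
        known_validators: Some(HashSet::from(["validator-a", "validator-b"])),
        observed_slots: HashMap::from([
            ("validator-a", 1_000),
            ("validator-b", 1_250),
            ("peer-x", 9_999),
        ]),
    };
    // Only known validators are consulted, so peer-x's slot is ignored.
    assert_eq!(node.known_validators_max_slot(), Some(1_250));
}
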
39 changes: 38 additions & 1 deletion core/src/window_service.rs
@@ -1,6 +1,7 @@
 //! `window_service` handles the data plane incoming shreds, storing them in
 //! blockstore and retransmitting where required
+//!
 use {
     crate::{
         cluster_info_vote_listener::VerifiedVoteReceiver,
@@ -21,6 +22,10 @@ use {
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
+        blockstore_metrics::{
+            EXCESSIVE_REPAIR_SAMPLES_MAX, EXCESSIVE_REPAIR_SAMPLES_THRESHOLD,
+            EXCESSIVE_REPAIR_SLOT_DISTANCE,
+        },
         leader_schedule_cache::LeaderScheduleCache,
         shred::{self, Nonce, ReedSolomonCache, Shred},
     },
@@ -361,14 +366,15 @@ impl WindowService {
         let (duplicate_sender, duplicate_receiver) = unbounded();

         let t_check_duplicate = Self::start_check_duplicate_thread(
-            cluster_info,
+            cluster_info.clone(),
             exit.clone(),
             blockstore.clone(),
             duplicate_receiver,
             duplicate_slots_sender,
         );

         let t_insert = Self::start_window_insert_thread(
+            cluster_info,
             exit,
             blockstore,
             leader_schedule_cache,
@@ -415,7 +421,35 @@ impl WindowService {
             .unwrap()
     }

+    fn report_excessive_repairs(
+        cluster_info: &ClusterInfo,
+        metrics: &mut BlockstoreInsertionMetrics,
+    ) {
+        // Only check when we have a full sample set
+        if metrics.excessive_repairs.len() >= EXCESSIVE_REPAIR_SAMPLES_MAX {
+            let excessive_repair_count = metrics.excessive_repairs.iter().filter(|x| **x).count();
+            if excessive_repair_count > EXCESSIVE_REPAIR_SAMPLES_THRESHOLD {
+                // We're repairing many slots, check if we think we're caught up
+                if let (Some(my_max_slot), Some(known_validators_max_slot)) = (
+                    cluster_info.get_my_max_slot(),
+                    cluster_info.get_known_validators_max_slot(),
+                ) {
+                    if my_max_slot
+                        > known_validators_max_slot.saturating_sub(EXCESSIVE_REPAIR_SLOT_DISTANCE)
+                    {
+                        warn!(
+                            "This node is caught up but is relying heavily on repair. {excessive_repair_count} of the \
+                            last {EXCESSIVE_REPAIR_SAMPLES_MAX} slots were mostly repaired."
+                        );
+                    }
+                }
+                metrics.excessive_repairs.clear();
+            }
+        }
+    }
+
     fn start_window_insert_thread(
+        cluster_info: Arc<ClusterInfo>,
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
@@ -464,6 +498,7 @@ impl WindowService {
             }

             if last_print.elapsed().as_secs() > 2 {
+                Self::report_excessive_repairs(&cluster_info, &mut metrics);
                 metrics.report_metrics("blockstore-insert-shreds");
                 metrics = BlockstoreInsertionMetrics::default();
                 ws_metrics.report_metrics("recv-window-insert-shreds");
@@ -581,6 +616,7 @@ mod test {
             contact_info,
             Arc::new(keypair),
             SocketAddrSpace::Unspecified,
+            None,
         );
         run_check_duplicate(
             &cluster_info,
@@ -615,6 +651,7 @@ mod test {
             contact_info,
             Arc::new(keypair),
             SocketAddrSpace::Unspecified,
+            None,
         ));

         // Start duplicate thread receiving and inserting duplicates

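Taken together, the window_service.rs changes amount to a windowed check: each processed slot contributes a boolean sample ("was this slot mostly repaired?"), and once EXCESSIVE_REPAIR_SAMPLES_MAX samples have accumulated, the node warns if more than EXCESSIVE_REPAIR_SAMPLES_THRESHOLD of them are set while its own max slot is within EXCESSIVE_REPAIR_SLOT_DISTANCE of the known validators' max slot. A standalone sketch of that decision logic follows; the constant values and the is_caught_up flag are assumed for illustration and are not the values defined in blockstore_metrics.

// Standalone sketch of the excessive-repair warning decision; constants are assumptions.
const EXCESSIVE_REPAIR_SAMPLES_MAX: usize = 100; // assumed size of the sample window
const EXCESSIVE_REPAIR_SAMPLES_THRESHOLD: usize = 50; // assumed warn threshold

// Returns true when a warning should be logged: the window is full, most sampled
// slots were mostly repaired, and the node already believes it is caught up.
// Mirrors the patch in resetting the window once the threshold trips.
fn should_warn_excessive_repair(samples: &mut Vec<bool>, is_caught_up: bool) -> bool {
    if samples.len() < EXCESSIVE_REPAIR_SAMPLES_MAX {
        return false; // keep accumulating samples
    }
    let excessive = samples.iter().filter(|&&mostly_repaired| mostly_repaired).count();
    if excessive <= EXCESSIVE_REPAIR_SAMPLES_THRESHOLD {
        return false;
    }
    samples.clear(); // start a fresh window after evaluating a full one
    is_caught_up
}

fn main() {
    // 60 of the last 100 slots were mostly repaired and the node looks caught up.
    let mut samples: Vec<bool> = (0..100).map(|i| i % 10 < 6).collect();
    assert!(should_warn_excessive_repair(&mut samples, true));
    assert!(samples.is_empty());
}
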
1 change: 1 addition & 0 deletions core/tests/epoch_accounts_hash.rs
@@ -130,6 +130,7 @@ impl TestEnvironment {
             ContactInfo::new_localhost(&node_id.pubkey(), timestamp()),
             Arc::clone(&node_id),
             SocketAddrSpace::Unspecified,
+            None,
         ));

         let pruned_banks_receiver =

3 changes: 2 additions & 1 deletion core/tests/snapshots.rs
@@ -505,7 +505,7 @@ fn test_concurrent_snapshot_packaging(
         timestamp(), // wallclock
         0u16, // shred_version
     );
-    ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
+    ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified, None)
 });

 let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded();
@@ -955,6 +955,7 @@ fn test_snapshots_with_background_services(
     ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
     node_keypair,
     SocketAddrSpace::Unspecified,
+    None,
 ));

 let (pruned_banks_sender, pruned_banks_receiver) = unbounded();