From df3e5c224ec1ea865b77718c3b054cfb1b51487e Mon Sep 17 00:00:00 2001
From: Jeff Biseda
Date: Mon, 25 Sep 2023 12:26:40 -0700
Subject: [PATCH] Warn if validator appears to be repairing excessively (part 2)

---
 core/src/window_service.rs       | 14 +++++++
 ledger/src/blockstore.rs         | 24 +++++++++++-
 ledger/src/blockstore_metrics.rs | 67 +++++++++++++++++++++++++++++++-
 ledger/src/slot_stats.rs         |  5 ++-
 wen-restart/src/wen_restart.rs   |  1 +
 5 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 80abaa33323311..e73a3f2e714719 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -21,6 +21,7 @@ use {
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
+        blockstore_metrics::ExcessiveRepairContext,
         leader_schedule_cache::LeaderScheduleCache,
         shred::{self, Nonce, ReedSolomonCache, Shred},
     },
@@ -236,6 +237,7 @@ fn run_insert<F>(
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
+    excessive_repair_context: &mut ExcessiveRepairContext,
     completed_data_sets_sender: &CompletedDataSetsSender,
     retransmit_sender: &Sender<Vec</*shred:*/Vec<u8>>>,
     outstanding_requests: &RwLock<OutstandingShredRepairs>,
@@ -303,6 +305,7 @@ where
         &handle_duplicate,
         reed_solomon_cache,
         metrics,
+        excessive_repair_context,
     )?;

     completed_data_sets_sender.try_send(completed_data_sets)?;
@@ -369,6 +372,7 @@ impl WindowService {
         );

         let t_insert = Self::start_window_insert_thread(
+            cluster_info,
             exit,
             blockstore,
             leader_schedule_cache,
@@ -416,6 +420,7 @@ impl WindowService {
     }

     fn start_window_insert_thread(
+        cluster_info: Arc<ClusterInfo>,
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
@@ -442,6 +447,7 @@ impl WindowService {
             };
             let mut metrics = BlockstoreInsertionMetrics::default();
             let mut ws_metrics = WindowServiceMetrics::default();
+            let mut excessive_repair_context = ExcessiveRepairContext::default();
             let mut last_print = Instant::now();
             while !exit.load(Ordering::Relaxed) {
@@ -453,6 +459,7 @@ impl WindowService {
                     handle_duplicate,
                     &mut metrics,
                     &mut ws_metrics,
+                    &mut excessive_repair_context,
                     &completed_data_sets_sender,
                     &retransmit_sender,
                     &outstanding_requests,
@@ -465,6 +472,12 @@ impl WindowService {
                 if last_print.elapsed().as_secs() > 2 {
+                    excessive_repair_context.report_excessive_repairs(|| {
+                        (
+                            cluster_info.get_my_max_slot(),
+                            cluster_info.get_known_validators_max_slot(),
+                        )
+                    });
                     metrics.report_metrics("blockstore-insert-shreds");
                     metrics = BlockstoreInsertionMetrics::default();
                     ws_metrics.report_metrics("recv-window-insert-shreds");
@@ -653,6 +666,7 @@ mod test {
             &handle_duplicate,
             &ReedSolomonCache::default(),
             &mut BlockstoreInsertionMetrics::default(),
+            &mut ExcessiveRepairContext::default(),
         )
         .unwrap();

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 0aa7248c784847..af93ef5691ef58 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -10,6 +10,7 @@ use {
             IteratorMode, LedgerColumn, Result, WriteBatch,
         },
         blockstore_meta::*,
+        blockstore_metrics::ExcessiveRepairContext,
         blockstore_options::{
             AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
             BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
         },
@@ -845,6 +846,7 @@ impl Blockstore {
         retransmit_sender: Option<&Sender<Vec</*shred:*/Vec<u8>>>>,
         reed_solomon_cache: &ReedSolomonCache,
         metrics: &mut BlockstoreInsertionMetrics,
+        excessive_repair_context: &mut ExcessiveRepairContext,
     ) -> Result<InsertResults> {
         assert_eq!(shreds.len(), is_repaired.len());
         let mut total_start = Measure::start("Total elapsed");
@@ -885,6 +887,7 @@ impl Blockstore {
                 &mut duplicate_shreds,
                 leader_schedule,
                 shred_source,
+                excessive_repair_context,
             ) {
                 Err(InsertDataShredError::Exists) => {
                     if is_repaired {
@@ -921,6 +924,7 @@ impl Blockstore {
                     is_trusted,
                     shred_source,
                     metrics,
+                    excessive_repair_context,
                 );
             }
         };
@@ -968,6 +972,7 @@ impl Blockstore {
                         &mut duplicate_shreds,
                         leader_schedule,
                         ShredSource::Recovered,
+                        excessive_repair_context,
                     ) {
                         Err(InsertDataShredError::Exists) => {
                             metrics.num_recovered_exists += 1;
@@ -1066,6 +1071,7 @@ impl Blockstore {
         handle_duplicate: &F,
         reed_solomon_cache: &ReedSolomonCache,
         metrics: &mut BlockstoreInsertionMetrics,
+        excessive_repair_context: &mut ExcessiveRepairContext,
     ) -> Result<Vec<CompletedDataSetInfo>>
     where
         F: Fn(PossibleDuplicateShred),
@@ -1081,6 +1087,7 @@ impl Blockstore {
             retransmit_sender,
             reed_solomon_cache,
             metrics,
+            excessive_repair_context,
         )?;

         for shred in duplicate_shreds {
@@ -1173,6 +1180,7 @@ impl Blockstore {
             None, // retransmit-sender
             &ReedSolomonCache::default(),
             &mut BlockstoreInsertionMetrics::default(),
+            &mut ExcessiveRepairContext::default(),
         )?;
         Ok(insert_results.completed_data_set_infos)
     }
@@ -1190,6 +1198,7 @@ impl Blockstore {
         is_trusted: bool,
         shred_source: ShredSource,
         metrics: &mut BlockstoreInsertionMetrics,
+        excessive_repair_context: &mut ExcessiveRepairContext,
     ) -> bool {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -1267,8 +1276,13 @@ impl Blockstore {
             return false;
         }

-        self.slots_stats
-            .record_shred(shred.slot(), shred.fec_set_index(), shred_source, None);
+        self.slots_stats.record_shred(
+            shred.slot(),
+            shred.fec_set_index(),
+            shred_source,
+            None,
+            excessive_repair_context,
+        );

         // insert coding shred into rocks
         let result = self
@@ -1367,6 +1381,7 @@ impl Blockstore {
         duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
         leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
+        excessive_repair_context: &mut ExcessiveRepairContext,
     ) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError> {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -1424,6 +1439,7 @@ impl Blockstore {
             &shred,
             write_batch,
             shred_source,
+            excessive_repair_context,
         )?;
         just_inserted_shreds.insert(shred.id(), shred);
         index_meta_working_set_entry.did_insert_occur = true;
@@ -1615,6 +1631,7 @@ impl Blockstore {
         shred: &Shred,
         write_batch: &mut WriteBatch,
         shred_source: ShredSource,
+        excessive_repair_context: &mut ExcessiveRepairContext,
     ) -> Result<Vec<CompletedDataSetInfo>> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
@@ -1673,6 +1690,7 @@ impl Blockstore {
             shred.fec_set_index(),
             shred_source,
             Some(slot_meta),
+            excessive_repair_context,
         );

         // slot is full, send slot full timing to poh_timing_report service.
@@ -6718,6 +6736,7 @@ pub mod tests {
             false,
             ShredSource::Turbine,
             &mut BlockstoreInsertionMetrics::default(),
+            &mut ExcessiveRepairContext::default(),
         ));

         // insert again fails on dupe
@@ -6733,6 +6752,7 @@ pub mod tests {
             false,
             ShredSource::Turbine,
             &mut BlockstoreInsertionMetrics::default(),
+            &mut ExcessiveRepairContext::default(),
         ));
         assert_eq!(
             duplicate_shreds,
diff --git a/ledger/src/blockstore_metrics.rs b/ledger/src/blockstore_metrics.rs
index 46cceb55b36a7c..95ed7297e8a257 100644
--- a/ledger/src/blockstore_metrics.rs
+++ b/ledger/src/blockstore_metrics.rs
@@ -5,15 +5,80 @@ use {
         PerfContext,
     },
     solana_metrics::datapoint_info,
-    solana_sdk::timing::timestamp,
+    solana_sdk::{slot_history::Slot, timing::timestamp},
     std::{
         cell::RefCell,
+        collections::VecDeque,
         fmt::Debug,
         sync::atomic::{AtomicU64, AtomicUsize, Ordering},
         time::{Duration, Instant},
     },
 };

+// Max number of samples to maintain
+const EXCESSIVE_REPAIR_SAMPLES_MAX: usize = 1_000;
+// Percent of samples which will trigger an excessive repair warning
+const EXCESSIVE_REPAIR_SAMPLES_THRESHOLD: f32 = 0.5;
+// Slot distance to use for comparison with known validators
+const EXCESSIVE_REPAIR_SLOT_DISTANCE: u64 = 150;
+// Percent of shreds repaired to indicate excessively repaired slot
+const EXCESSIVE_REPAIR_SLOT_THRESHOLD: f32 = 0.5;
+
+#[derive(Default)]
+pub struct ExcessiveRepairContext {
+    // Record excessive repair status of slots as they are completed as 'full'
+    excessive_repairs: VecDeque<bool>,
+    caught_up: bool,
+}
+
+impl ExcessiveRepairContext {
+    pub fn record_slot(&mut self, num_shreds: u64, num_repairs: usize) {
+        let slot_excessive_threshold = num_shreds as f32 * EXCESSIVE_REPAIR_SLOT_THRESHOLD;
+        let excessive_repairs = num_repairs as f32 > slot_excessive_threshold;
+        self.excessive_repairs.push_back(excessive_repairs);
+        if self.excessive_repairs.len() > EXCESSIVE_REPAIR_SAMPLES_MAX {
+            self.excessive_repairs.pop_front();
+        }
+    }
+
+    pub fn report_excessive_repairs(
+        &mut self,
+        get_cluster_max_slots: impl FnOnce() -> (
+            /*my_max_slot*/ Option<Slot>,
+            /*known_validators_max_slot*/ Option<Slot>,
+        ),
+    ) {
+        // Only check when we have a full sample set
+        if self.excessive_repairs.len() < EXCESSIVE_REPAIR_SAMPLES_MAX {
+            return;
+        }
+        let excessive_repair_count = self.excessive_repairs.iter().filter(|x| **x).count();
+        let threshold_count =
+            self.excessive_repairs.len() as f32 * EXCESSIVE_REPAIR_SAMPLES_THRESHOLD;
+        if excessive_repair_count as f32 > threshold_count {
+            // We're repairing many slots, check if we think we're caught up
+            if let (Some(my_max_slot), Some(known_validators_max_slot)) = get_cluster_max_slots() {
+                if my_max_slot
+                    > known_validators_max_slot.saturating_sub(EXCESSIVE_REPAIR_SLOT_DISTANCE)
+                {
+                    // Avoid warning the first time the validator catches up after repair
+                    if !self.caught_up {
+                        self.caught_up = true;
+                    } else {
+                        warn!(
+                            "This node is caught up but is relying heavily on repair. {excessive_repair_count} of the \
+                            last {EXCESSIVE_REPAIR_SAMPLES_MAX} slots were mostly repaired."
+                        );
+                    }
+                    self.excessive_repairs.clear();
+                } else {
+                    self.caught_up = false;
+                }
+            }
+        }
+    }
+}
+
 #[derive(Default)]
 pub struct BlockstoreInsertionMetrics {
     pub insert_lock_elapsed_us: u64,
diff --git a/ledger/src/slot_stats.rs b/ledger/src/slot_stats.rs
index 9033c3d1600f89..015e6dacac1f57 100644
--- a/ledger/src/slot_stats.rs
+++ b/ledger/src/slot_stats.rs
@@ -1,5 +1,5 @@
 use {
-    crate::blockstore_meta::SlotMeta,
+    crate::{blockstore_meta::SlotMeta, blockstore_metrics::ExcessiveRepairContext},
     bitflags::bitflags,
     lru::LruCache,
     solana_sdk::clock::Slot,
@@ -96,6 +96,7 @@ impl SlotsStats {
         fec_set_index: u32,
         source: ShredSource,
         slot_meta: Option<&SlotMeta>,
+        excessive_repair_context: &mut ExcessiveRepairContext,
     ) {
         let mut slot_full_reporting_info = None;
         let mut stats = self.stats.lock().unwrap();
@@ -118,6 +119,8 @@ impl SlotsStats {
                 slot_full_reporting_info =
                     Some((slot_stats.num_repaired, slot_stats.num_recovered));
             }
+            excessive_repair_context
+                .record_slot(slot_stats.last_index, slot_stats.num_repaired);
         }
     }
     drop(stats);
diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs
index 75e4e21ce9431a..3da41b0a891bcf 100644
--- a/wen-restart/src/wen_restart.rs
+++ b/wen-restart/src/wen_restart.rs
@@ -91,6 +91,7 @@ mod tests {
             },
             node_keypair,
             SocketAddrSpace::Unspecified,
+            /*known_validators*/ None,
         ));
         let ledger_path = get_tmp_ledger_path_auto_delete!();
         let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
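
The sketch below is a minimal, self-contained mock of the heuristic this patch wires through window_service and the blockstore: each slot that completes as full is sampled into a bounded window, and a warning fires only when most sampled slots were mostly repaired while the node is within EXCESSIVE_REPAIR_SLOT_DISTANCE slots of the known validators. The struct name ExcessiveRepairSketch, the maybe_warn helper, the eprintln! stand-in for warn!, and the shred/slot numbers in main are illustrative assumptions, not part of the patch; only the constants and the decision logic mirror blockstore_metrics.rs.

// Standalone sketch of the sliding-window check added in this patch.
// Constants mirror blockstore_metrics.rs; the driver below feeds in
// made-up slot samples purely to illustrate when the warning would fire.
use std::collections::VecDeque;

const EXCESSIVE_REPAIR_SAMPLES_MAX: usize = 1_000;
const EXCESSIVE_REPAIR_SAMPLES_THRESHOLD: f32 = 0.5;
const EXCESSIVE_REPAIR_SLOT_DISTANCE: u64 = 150;
const EXCESSIVE_REPAIR_SLOT_THRESHOLD: f32 = 0.5;

#[derive(Default)]
struct ExcessiveRepairSketch {
    excessive_repairs: VecDeque<bool>,
    caught_up: bool,
}

impl ExcessiveRepairSketch {
    // Called once per slot that completes as "full": mark the slot as
    // "mostly repaired" when more than half of its shreds came from repair.
    fn record_slot(&mut self, num_shreds: u64, num_repairs: usize) {
        let mostly_repaired =
            num_repairs as f32 > num_shreds as f32 * EXCESSIVE_REPAIR_SLOT_THRESHOLD;
        self.excessive_repairs.push_back(mostly_repaired);
        if self.excessive_repairs.len() > EXCESSIVE_REPAIR_SAMPLES_MAX {
            self.excessive_repairs.pop_front();
        }
    }

    // Called periodically: warn only when the sample window is full, more than
    // half of the sampled slots were mostly repaired, and the node is within
    // EXCESSIVE_REPAIR_SLOT_DISTANCE slots of the known validators (i.e. it is
    // not simply catching up).
    fn maybe_warn(&mut self, my_max_slot: Option<u64>, known_validators_max_slot: Option<u64>) {
        if self.excessive_repairs.len() < EXCESSIVE_REPAIR_SAMPLES_MAX {
            return;
        }
        let excessive = self.excessive_repairs.iter().filter(|x| **x).count();
        if (excessive as f32)
            <= self.excessive_repairs.len() as f32 * EXCESSIVE_REPAIR_SAMPLES_THRESHOLD
        {
            return;
        }
        if let (Some(mine), Some(theirs)) = (my_max_slot, known_validators_max_slot) {
            if mine > theirs.saturating_sub(EXCESSIVE_REPAIR_SLOT_DISTANCE) {
                if self.caught_up {
                    // Stand-in for the warn!() emitted by the patch.
                    eprintln!(
                        "caught up but relying heavily on repair: {excessive} of the last \
                         {EXCESSIVE_REPAIR_SAMPLES_MAX} slots were mostly repaired"
                    );
                } else {
                    // First observation of being caught up: skip the warning once.
                    self.caught_up = true;
                }
                self.excessive_repairs.clear();
            } else {
                self.caught_up = false;
            }
        }
    }
}

fn main() {
    let mut ctx = ExcessiveRepairSketch::default();
    // Pretend every slot had 64 shreds, 40 of which arrived via repair (> 50%).
    for _ in 0..2 * EXCESSIVE_REPAIR_SAMPLES_MAX {
        ctx.record_slot(64, 40);
    }
    // First call only latches `caught_up`; refill the window and call again to see the warning.
    ctx.maybe_warn(Some(10_000), Some(10_050));
    for _ in 0..EXCESSIVE_REPAIR_SAMPLES_MAX {
        ctx.record_slot(64, 40);
    }
    ctx.maybe_warn(Some(11_000), Some(11_050));
}

One design point the sketch makes visible: because the window is bounded by popping from the front, a sustained stretch of turbine-delivered slots ages repaired samples out on its own, so the warning stops firing without any explicit reset.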