
Commit

Warn if validator appears to be repairing excessively (part 2)
jbiseda committed Oct 10, 2023
1 parent 185ab33 commit df3e5c2
Showing 5 changed files with 107 additions and 4 deletions.
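The diff below threads a new ExcessiveRepairContext from WindowService through Blockstore::insert_shreds and into SlotsStats::record_shred, so that each slot that becomes full records whether more than half of its shreds arrived via repair. On the window service's periodic metrics pass (roughly every two seconds), the context compares its samples against the node's own max slot and the known validators' max slot from gossip, and warns when the node looks caught up yet is still repairing most slots. The following is a minimal, self-contained sketch of that heuristic for reference only: the constants mirror ledger/src/blockstore_metrics.rs, but the flattened report() signature (plain Options instead of the FnOnce closure used in the patch) and eprintln! in place of log::warn! are illustrative simplifications, not part of the patch.

// Standalone sketch of the excessive-repair heuristic; simplified, not the actual patch.
use std::collections::VecDeque;

const SAMPLES_MAX: usize = 1_000; // EXCESSIVE_REPAIR_SAMPLES_MAX
const SAMPLES_THRESHOLD: f32 = 0.5; // EXCESSIVE_REPAIR_SAMPLES_THRESHOLD
const SLOT_DISTANCE: u64 = 150; // EXCESSIVE_REPAIR_SLOT_DISTANCE
const SLOT_THRESHOLD: f32 = 0.5; // EXCESSIVE_REPAIR_SLOT_THRESHOLD

#[derive(Default)]
struct ExcessiveRepairContext {
    excessive_repairs: VecDeque<bool>, // one entry per slot that became full
    caught_up: bool,
}

impl ExcessiveRepairContext {
    // Called when a slot becomes full: was more than half of it repaired?
    fn record_slot(&mut self, num_shreds: u64, num_repairs: usize) {
        let mostly_repaired = num_repairs as f32 > num_shreds as f32 * SLOT_THRESHOLD;
        self.excessive_repairs.push_back(mostly_repaired);
        if self.excessive_repairs.len() > SAMPLES_MAX {
            self.excessive_repairs.pop_front();
        }
    }

    // Called periodically; warns only if most recent slots were mostly repaired
    // while the node is within SLOT_DISTANCE of the known validators' tip.
    fn report(&mut self, my_max_slot: Option<u64>, known_max_slot: Option<u64>) {
        if self.excessive_repairs.len() < SAMPLES_MAX {
            return; // wait for a full sample window
        }
        let repaired = self.excessive_repairs.iter().filter(|x| **x).count();
        if (repaired as f32) <= self.excessive_repairs.len() as f32 * SAMPLES_THRESHOLD {
            return; // repair volume looks normal
        }
        if let (Some(mine), Some(theirs)) = (my_max_slot, known_max_slot) {
            if mine > theirs.saturating_sub(SLOT_DISTANCE) {
                if self.caught_up {
                    eprintln!(
                        "caught up but relying heavily on repair: {repaired}/{SAMPLES_MAX} \
                         recent slots were mostly repaired"
                    );
                } else {
                    self.caught_up = true; // skip the first warning right after catching up
                }
                self.excessive_repairs.clear();
            } else {
                self.caught_up = false; // still catching up; repairs are expected
            }
        }
    }
}

fn main() {
    let mut ctx = ExcessiveRepairContext::default();
    for _ in 0..SAMPLES_MAX {
        ctx.record_slot(64, 40); // 40 of 64 shreds repaired -> mostly repaired
    }
    ctx.report(Some(10_000), Some(10_050)); // first pass: only marks caught_up
    for _ in 0..SAMPLES_MAX {
        ctx.record_slot(64, 40);
    }
    ctx.report(Some(10_000), Some(10_050)); // second pass: prints the warning
}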
14 changes: 14 additions & 0 deletions core/src/window_service.rs
@@ -21,6 +21,7 @@ use {
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
blockstore_metrics::ExcessiveRepairContext,
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, ReedSolomonCache, Shred},
},
@@ -236,6 +237,7 @@ fn run_insert<F>(
handle_duplicate: F,
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
excessive_repair_context: &mut ExcessiveRepairContext,
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
@@ -303,6 +305,7 @@
&handle_duplicate,
reed_solomon_cache,
metrics,
excessive_repair_context,
)?;

completed_data_sets_sender.try_send(completed_data_sets)?;
@@ -369,6 +372,7 @@ impl WindowService {
);

let t_insert = Self::start_window_insert_thread(
cluster_info,
exit,
blockstore,
leader_schedule_cache,
@@ -416,6 +420,7 @@ impl WindowService {
}

fn start_window_insert_thread(
cluster_info: Arc<ClusterInfo>,
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
@@ -442,6 +447,7 @@ impl WindowService {
};
let mut metrics = BlockstoreInsertionMetrics::default();
let mut ws_metrics = WindowServiceMetrics::default();
let mut excessive_repair_context = ExcessiveRepairContext::default();

let mut last_print = Instant::now();
while !exit.load(Ordering::Relaxed) {
@@ -453,6 +459,7 @@
handle_duplicate,
&mut metrics,
&mut ws_metrics,
&mut excessive_repair_context,
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
@@ -465,6 +472,12 @@
}

if last_print.elapsed().as_secs() > 2 {
excessive_repair_context.report_excessive_repairs(|| {
(
cluster_info.get_my_max_slot(),
cluster_info.get_known_validators_max_slot(),
)
});
metrics.report_metrics("blockstore-insert-shreds");
metrics = BlockstoreInsertionMetrics::default();
ws_metrics.report_metrics("recv-window-insert-shreds");
@@ -653,6 +666,7 @@ mod test {
&handle_duplicate,
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
&mut ExcessiveRepairContext::default(),
)
.unwrap();

24 changes: 22 additions & 2 deletions ledger/src/blockstore.rs
@@ -10,6 +10,7 @@ use {
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
blockstore_metrics::ExcessiveRepairContext,
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO,
BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
@@ -845,6 +846,7 @@ impl Blockstore {
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
excessive_repair_context: &mut ExcessiveRepairContext,
) -> Result<InsertResults> {
assert_eq!(shreds.len(), is_repaired.len());
let mut total_start = Measure::start("Total elapsed");
@@ -885,6 +887,7 @@
&mut duplicate_shreds,
leader_schedule,
shred_source,
excessive_repair_context,
) {
Err(InsertDataShredError::Exists) => {
if is_repaired {
@@ -921,6 +924,7 @@
is_trusted,
shred_source,
metrics,
excessive_repair_context,
);
}
};
@@ -968,6 +972,7 @@
&mut duplicate_shreds,
leader_schedule,
ShredSource::Recovered,
excessive_repair_context,
) {
Err(InsertDataShredError::Exists) => {
metrics.num_recovered_exists += 1;
@@ -1066,6 +1071,7 @@ impl Blockstore {
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
excessive_repair_context: &mut ExcessiveRepairContext,
) -> Result<Vec<CompletedDataSetInfo>>
where
F: Fn(PossibleDuplicateShred),
@@ -1081,6 +1087,7 @@ impl Blockstore {
retransmit_sender,
reed_solomon_cache,
metrics,
excessive_repair_context,
)?;

for shred in duplicate_shreds {
@@ -1173,6 +1180,7 @@ impl Blockstore {
None, // retransmit-sender
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
&mut ExcessiveRepairContext::default(),
)?;
Ok(insert_results.completed_data_set_infos)
}
@@ -1190,6 +1198,7 @@ impl Blockstore {
is_trusted: bool,
shred_source: ShredSource,
metrics: &mut BlockstoreInsertionMetrics,
excessive_repair_context: &mut ExcessiveRepairContext,
) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@@ -1267,8 +1276,13 @@ impl Blockstore {
return false;
}

self.slots_stats
.record_shred(shred.slot(), shred.fec_set_index(), shred_source, None);
self.slots_stats.record_shred(
shred.slot(),
shred.fec_set_index(),
shred_source,
None,
excessive_repair_context,
);

// insert coding shred into rocks
let result = self
@@ -1367,6 +1381,7 @@ impl Blockstore {
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,
excessive_repair_context: &mut ExcessiveRepairContext,
) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@@ -1424,6 +1439,7 @@ impl Blockstore {
&shred,
write_batch,
shred_source,
excessive_repair_context,
)?;
just_inserted_shreds.insert(shred.id(), shred);
index_meta_working_set_entry.did_insert_occur = true;
@@ -1615,6 +1631,7 @@ impl Blockstore {
shred: &Shred,
write_batch: &mut WriteBatch,
shred_source: ShredSource,
excessive_repair_context: &mut ExcessiveRepairContext,
) -> Result<Vec<CompletedDataSetInfo>> {
let slot = shred.slot();
let index = u64::from(shred.index());
@@ -1673,6 +1690,7 @@ impl Blockstore {
shred.fec_set_index(),
shred_source,
Some(slot_meta),
excessive_repair_context,
);

// slot is full, send slot full timing to poh_timing_report service.
@@ -6718,6 +6736,7 @@ pub mod tests {
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
&mut ExcessiveRepairContext::default(),
));

// insert again fails on dupe
@@ -6733,6 +6752,7 @@
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
&mut ExcessiveRepairContext::default(),
));
assert_eq!(
duplicate_shreds,
67 changes: 66 additions & 1 deletion ledger/src/blockstore_metrics.rs
@@ -5,15 +5,80 @@ use {
PerfContext,
},
solana_metrics::datapoint_info,
solana_sdk::timing::timestamp,
solana_sdk::{slot_history::Slot, timing::timestamp},
std::{
cell::RefCell,
collections::VecDeque,
fmt::Debug,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
time::{Duration, Instant},
},
};

// Max number of samples to maintain
const EXCESSIVE_REPAIR_SAMPLES_MAX: usize = 1_000;
// Fraction of samples that must be excessively repaired to trigger a warning
const EXCESSIVE_REPAIR_SAMPLES_THRESHOLD: f32 = 0.5;
// Slot distance to use for comparison with known validators
const EXCESSIVE_REPAIR_SLOT_DISTANCE: u64 = 150;
// Fraction of a slot's shreds that must be repaired for the slot to count as excessively repaired
const EXCESSIVE_REPAIR_SLOT_THRESHOLD: f32 = 0.5;

#[derive(Default)]
pub struct ExcessiveRepairContext {
// Record excessive repair status of slots as they are completed as 'full'
excessive_repairs: VecDeque<bool>,
caught_up: bool,
}

impl ExcessiveRepairContext {
pub fn record_slot(&mut self, num_shreds: u64, num_repairs: usize) {
let slot_excessive_threshold = num_shreds as f32 * EXCESSIVE_REPAIR_SLOT_THRESHOLD;
let excessive_repairs = num_repairs as f32 > slot_excessive_threshold;
self.excessive_repairs.push_back(excessive_repairs);
if self.excessive_repairs.len() > EXCESSIVE_REPAIR_SAMPLES_MAX {
self.excessive_repairs.pop_front();
}
}

pub fn report_excessive_repairs(
&mut self,
get_cluster_max_slots: impl FnOnce() -> (
/*my_max_slot*/ Option<Slot>,
/*known_validators_max_slot*/ Option<Slot>,
),
) {
// Only check when we have a full sample set
if self.excessive_repairs.len() < EXCESSIVE_REPAIR_SAMPLES_MAX {
return;
}
let excessive_repair_count = self.excessive_repairs.iter().filter(|x| **x).count();
let threshold_count =
self.excessive_repairs.len() as f32 * EXCESSIVE_REPAIR_SAMPLES_THRESHOLD;
if excessive_repair_count as f32 > threshold_count {
// We're repairing many slots, check if we think we're caught up
if let (Some(my_max_slot), Some(known_validators_max_slot)) = get_cluster_max_slots() {
if my_max_slot
> known_validators_max_slot.saturating_sub(EXCESSIVE_REPAIR_SLOT_DISTANCE)
{
// Avoid warning the first time the validator catches up after repair
if !self.caught_up {
self.caught_up = true;
} else {
warn!(
"This node is caught up but is relying heavily on repair. {excessive_repair_count} of the \
last {EXCESSIVE_REPAIR_SAMPLES_MAX} slots were mostly repaired."
);
}
self.excessive_repairs.clear();
} else {
self.caught_up = false;
}
}
}
}
}

#[derive(Default)]
pub struct BlockstoreInsertionMetrics {
pub insert_lock_elapsed_us: u64,
5 changes: 4 additions & 1 deletion ledger/src/slot_stats.rs
@@ -1,5 +1,5 @@
use {
crate::blockstore_meta::SlotMeta,
crate::{blockstore_meta::SlotMeta, blockstore_metrics::ExcessiveRepairContext},
bitflags::bitflags,
lru::LruCache,
solana_sdk::clock::Slot,
@@ -96,6 +96,7 @@ impl SlotsStats {
fec_set_index: u32,
source: ShredSource,
slot_meta: Option<&SlotMeta>,
excessive_repair_context: &mut ExcessiveRepairContext,
) {
let mut slot_full_reporting_info = None;
let mut stats = self.stats.lock().unwrap();
@@ -118,6 +119,8 @@
slot_full_reporting_info =
Some((slot_stats.num_repaired, slot_stats.num_recovered));
}
excessive_repair_context
.record_slot(slot_stats.last_index, slot_stats.num_repaired);
}
}
drop(stats);
1 change: 1 addition & 0 deletions wen-restart/src/wen_restart.rs
@@ -91,6 +91,7 @@ mod tests {
},
node_keypair,
SocketAddrSpace::Unspecified,
/*known_validators*/ None,
));
let ledger_path = get_tmp_ledger_path_auto_delete!();
let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
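With the default constants, the warning is deliberately conservative: it needs a full window of 1,000 completed slots, more than 500 of which had over half of their shreds (as measured against last_index in slot_stats.rs) arrive via repair, while my_max_slot sits within 150 slots of the highest slot reported for the known validators. The first time those conditions hold, report_excessive_repairs only marks the node as caught_up and clears its samples, so a validator that has just finished catch-up repair does not warn spuriously; the warning fires only if heavy repair persists through another full sample window. The wen_restart.rs test change is incidental: the ClusterInfo construction there now passes an explicit known_validators argument as None.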
