From 3d0d9911e860f5e83cc53b6192c8fde75d167a30 Mon Sep 17 00:00:00 2001 From: William Smith Date: Thu, 15 Dec 2022 09:52:57 -0800 Subject: [PATCH] Fix skipped checkpoints bug (#6812) * Fix skipped checkpoints bug * Simplify scheduling logic --- .../checkpoints/checkpoint_executor/mod.rs | 62 +++++++++---------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index f1023582500f5..86c065c39922c 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -177,6 +177,13 @@ impl CheckpointExecutor { Some(Ok((checkpoint, next_committee))) = pending.next() => { match next_committee { None => { + // Ensure that we are not skipping checkpoints at any point + if let Some(prev_highest) = self.checkpoint_store.get_highest_executed_checkpoint_seq_number().unwrap() { + assert_eq!(prev_highest + 1, checkpoint.sequence_number()); + } else { + assert_eq!(checkpoint.sequence_number(), 0); + } + let new_highest = checkpoint.sequence_number(); debug!( "Bumping highest_executed_checkpoint watermark to {:?}", @@ -287,40 +294,23 @@ impl CheckpointExecutor { // follow case. Avoid reading from DB and used checkpoint passed // from StateSync Ordering::Equal => return self.schedule_checkpoint(latest_synced_checkpoint, pending), - // Need to catch up more than 1. Continue - Ordering::Less => {} - } - - // Schedule as many checkpoints as possible in order to catch up quickly. - // The highest we can schedule is bounded by the min of the number of - // checkpoints that *need* to be scheduled and the number of tasks available - // to schedule within - let checkpoints_diff = latest_synced_checkpoint.sequence_number() - next_to_exec + 1; - let tasks_diff = self.task_limit - pending.len(); - let num_tasks_to_schedule = std::cmp::min(checkpoints_diff, tasks_diff as u64); - - // get all checkpoints to be scheduled, less the last (which we got already above) - let range: Vec = (next_to_exec..(next_to_exec + num_tasks_to_schedule)).collect(); - - let mut checkpoints_to_schedule = self - .checkpoint_store - .multi_get_checkpoint_by_sequence_number(&range)? - .into_iter() - .map(|tx| tx.unwrap()) - .collect::>(); - - checkpoints_to_schedule.sort_by_key(|a| a.sequence_number()); - checkpoints_to_schedule.push(latest_synced_checkpoint); - let checkpoints_to_schedule = checkpoints_to_schedule; - debug!( - "Scheduling {:?} lagging checkpoints", - checkpoints_to_schedule.len(), - ); - - for checkpoint in checkpoints_to_schedule.into_iter() { - self.schedule_checkpoint(checkpoint, pending)?; - if self.end_of_epoch { - return Ok(()); + // Need to catch up more than 1. Read from store + Ordering::Less => { + for i in next_to_exec..=latest_synced_checkpoint.sequence_number() { + if pending.len() >= self.task_limit || self.end_of_epoch { + break; + } + let checkpoint = self + .checkpoint_store + .get_checkpoint_by_sequence_number(i)? + .unwrap_or_else(|| { + panic!( + "Checkpoint sequence number {:?} does not exist in checkpoint store", + i + ) + }); + self.schedule_checkpoint(checkpoint, pending)?; + } } } @@ -332,6 +322,10 @@ impl CheckpointExecutor { checkpoint: VerifiedCheckpoint, pending: &mut CheckpointExecutionBuffer, ) -> SuiResult { + info!( + "Scheduling checkpoint {:?} for execution", + checkpoint.sequence_number() + ); // Mismatch between node epoch and checkpoint epoch after startup // crash recovery is invalid let checkpoint_epoch = checkpoint.epoch();