Skip to content

Commit

Permalink
Fix skipped checkpoints bug (MystenLabs#6812)
Browse files: browse the repository at this point in the history
* Fix skipped checkpoints bug

* Simplify scheduling logic
  • Loading branch information
williampsmith authored Dec 15, 2022
1 parent 33296d0 commit 3d0d991
Showing 1 changed file with 28 additions and 34 deletions.
62 changes: 28 additions & 34 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -177,6 +177,13 @@ impl CheckpointExecutor {
Some(Ok((checkpoint, next_committee))) = pending.next() => {
match next_committee {
None => {
// Ensure that we are not skipping checkpoints at any point
if let Some(prev_highest) = self.checkpoint_store.get_highest_executed_checkpoint_seq_number().unwrap() {
assert_eq!(prev_highest + 1, checkpoint.sequence_number());
} else {
assert_eq!(checkpoint.sequence_number(), 0);
}

let new_highest = checkpoint.sequence_number();
debug!(
"Bumping highest_executed_checkpoint watermark to {:?}",
Expand Down Expand Up @@ -287,40 +294,23 @@ impl CheckpointExecutor {
// follow case. Avoid reading from DB and use the checkpoint passed
// from StateSync
Ordering::Equal => return self.schedule_checkpoint(latest_synced_checkpoint, pending),
// Need to catch up more than 1. Continue
Ordering::Less => {}
}

// Schedule as many checkpoints as possible in order to catch up quickly.
// The highest we can schedule is bounded by the min of the number of
// checkpoints that *need* to be scheduled and the number of tasks available
// to schedule within
let checkpoints_diff = latest_synced_checkpoint.sequence_number() - next_to_exec + 1;
let tasks_diff = self.task_limit - pending.len();
let num_tasks_to_schedule = std::cmp::min(checkpoints_diff, tasks_diff as u64);

// get all checkpoints to be scheduled, less the last (which we got already above)
let range: Vec<u64> = (next_to_exec..(next_to_exec + num_tasks_to_schedule)).collect();

let mut checkpoints_to_schedule = self
.checkpoint_store
.multi_get_checkpoint_by_sequence_number(&range)?
.into_iter()
.map(|tx| tx.unwrap())
.collect::<Vec<VerifiedCheckpoint>>();

checkpoints_to_schedule.sort_by_key(|a| a.sequence_number());
checkpoints_to_schedule.push(latest_synced_checkpoint);
let checkpoints_to_schedule = checkpoints_to_schedule;
debug!(
"Scheduling {:?} lagging checkpoints",
checkpoints_to_schedule.len(),
);

for checkpoint in checkpoints_to_schedule.into_iter() {
self.schedule_checkpoint(checkpoint, pending)?;
if self.end_of_epoch {
return Ok(());
// Need to catch up more than 1. Read from store
Ordering::Less => {
for i in next_to_exec..=latest_synced_checkpoint.sequence_number() {
if pending.len() >= self.task_limit || self.end_of_epoch {
break;
}
let checkpoint = self
.checkpoint_store
.get_checkpoint_by_sequence_number(i)?
.unwrap_or_else(|| {
panic!(
"Checkpoint sequence number {:?} does not exist in checkpoint store",
i
)
});
self.schedule_checkpoint(checkpoint, pending)?;
}
}
}

Expand All @@ -332,6 +322,10 @@ impl CheckpointExecutor {
checkpoint: VerifiedCheckpoint,
pending: &mut CheckpointExecutionBuffer,
) -> SuiResult {
info!(
"Scheduling checkpoint {:?} for execution",
checkpoint.sequence_number()
);
// Mismatch between node epoch and checkpoint epoch after startup
// crash recovery is invalid
let checkpoint_epoch = checkpoint.epoch();
Expand Down

0 comments on commit 3d0d991

Please sign in to comment.