Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix order of resending messages after restart #6729

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,8 +1582,9 @@ async fn handle_actions<
session_info_provider,
)
.await?;

approval_voting_sender.send_messages(messages.into_iter()).await;
for message in messages.into_iter() {
approval_voting_sender.send_unbounded_message(message);
sandreim marked this conversation as resolved.
Show resolved Hide resolved
}
let next_actions: Vec<Action> =
next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();

Expand Down Expand Up @@ -1668,6 +1669,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi

let mut approval_meta = Vec::with_capacity(all_blocks.len());
let mut messages = Vec::new();
let mut approvals = Vec::new();
let mut actions = Vec::new();

messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
Expand Down Expand Up @@ -1839,7 +1841,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
approvals.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: approval_sig.signed_candidates_indices,
Expand All @@ -1864,6 +1866,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
}

messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
messages.extend(approvals.into_iter());
alexggh marked this conversation as resolved.
Show resolved Hide resolved
Ok((messages, actions))
}

Expand Down Expand Up @@ -2466,12 +2469,7 @@ fn schedule_wakeup_action(
last_assignment_tick.map(|l| l + APPROVAL_DELAY).filter(|t| t > &tick_now),
next_no_show,
)
.map(|tick| Action::ScheduleWakeup {
block_hash,
block_number,
candidate_hash,
tick,
})
.map(|tick| Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick })
},
RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
// select the minimum of `next_no_show`, or the tick of the next non-empty tranche
Expand Down
314 changes: 314 additions & 0 deletions polkadot/node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4459,6 +4459,114 @@ async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered(
assert!(our_assignment.triggered());
}

// Builds a chain with a fork where both relay blocks include the same candidate.
async fn build_chain_with_block_with_two_candidates(
block_hash1: Hash,
slot: Slot,
sync_oracle_handle: TestSyncOracleHandle,
candidate_receipt: Vec<CandidateReceipt>,
) -> (ChainBuilder, SessionInfo) {
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Eve,
];
let session_info = SessionInfo {
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(vec![
vec![ValidatorIndex(0), ValidatorIndex(1)],
vec![ValidatorIndex(2)],
vec![ValidatorIndex(3), ValidatorIndex(4)],
]),
..session_info(&validators)
};

let candidates = Some(
candidate_receipt
.iter()
.enumerate()
.map(|(i, receipt)| (receipt.clone(), CoreIndex(i as u32), GroupIndex(i as u32)))
.collect(),
);
let mut chain_builder = ChainBuilder::new();

chain_builder
.major_syncing(sync_oracle_handle.is_major_syncing.clone())
.add_block(
block_hash1,
ChainBuilder::GENESIS_HASH,
1,
BlockConfig {
slot,
candidates: candidates.clone(),
session_info: Some(session_info.clone()),
end_syncing: true,
},
);
(chain_builder, session_info)
}

async fn setup_overseer_with_blocks_with_two_assignments_triggered(
virtual_overseer: &mut VirtualOverseer,
store: TestStore,
clock: &Arc<MockClock>,
sync_oracle_handle: TestSyncOracleHandle,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let candidate_hash = candidate_receipt.hash();

let mut candidate_commitments2 = CandidateCommitments::default();
candidate_commitments2.processed_downward_messages = 3;
let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash);
candidate_receipt2.commitments_hash = candidate_commitments2.hash();
let candidate_hash2 = candidate_receipt2.hash();

let slot = Slot::from(1);
let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates(
block_hash,
slot,
sync_oracle_handle,
vec![candidate_receipt, candidate_receipt2],
)
.await;
chain_builder.build(virtual_overseer).await;

assert!(!clock.inner.lock().current_wakeup_is(1));
clock.inner.lock().wakeup_all(1);

assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot)));
clock.inner.lock().wakeup_all(slot_to_tick(slot));

futures_timer::Delay::new(Duration::from_millis(200)).await;

clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));

assert_eq!(clock.inner.lock().wakeups.len(), 0);

futures_timer::Delay::new(Duration::from_millis(200)).await;

let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());

let candidate_entry = store.load_candidate_entry(&candidate_hash2).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());
}

// Tests that for candidates that we did not approve yet, for which we triggered the assignment and
// the approval work we restart the work to approve it.
#[test]
Expand Down Expand Up @@ -4920,6 +5028,212 @@ fn subsystem_sends_pending_approvals_on_approval_restart() {
});
}

// Test that after restart approvals are sent after all assignments have been distributed.
#[test]
fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
let assignment_criteria = Box::new(MockAssignmentCriteria(
|| {
let mut assignments = HashMap::new();

let _ = assignments.insert(
CoreIndex(0),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
}),
tranche: 0,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);

let _ = assignments.insert(
CoreIndex(1),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
core_index: CoreIndex(1),
}),
tranche: 0,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);
assignments
},
|_| Ok(0),
));
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
let store = config.backend();
let store_clone = config.backend();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

setup_overseer_with_blocks_with_two_assignments_triggered(
&mut virtual_overseer,
store,
&clock,
sync_oracle_handle,
)
.await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_
)) => {
}
);

recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

// Configure a big coalesce number, so that the signature is cached instead of being sent to
// approval-distribution.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 2,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 2,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});

let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build();
// On restart we should first distribute all assignments covering a coalesced approval.
test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();

let mut candidate_commitments2 = CandidateCommitments::default();
candidate_commitments2.processed_downward_messages = 3;
let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash);
candidate_receipt2.commitments_hash = candidate_commitments2.hash();

let slot = Slot::from(1);

clock.inner.lock().set_tick(slot_to_tick(slot + 2));
let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates(
block_hash,
slot,
sync_oracle_handle,
vec![candidate_receipt.into(), candidate_receipt2.into()],
)
.await;
chain_builder.build(&mut virtual_overseer).await;

futures_timer::Delay::new(Duration::from_millis(2000)).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(approval)) => {
assert_eq!(approval.candidate_indices.count_ones(), 2);
}
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});
}

// Test we correctly update the timer when we mark the beginning of gathering assignments.
#[test]
fn test_gathering_assignments_statements() {
Expand Down
Loading