From 27a749f2460e6d96230e30d5eb5cd8952ef8fbb1 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Tue, 10 Dec 2024 14:40:28 +0200 Subject: [PATCH 1/3] BACKPORT-CONFLICT --- polkadot/node/core/approval-voting/src/lib.rs | 13 +- .../node/core/approval-voting/src/tests.rs | 314 ++++++++++++++++++ prdoc/pr_6729.prdoc | 15 + 3 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 prdoc/pr_6729.prdoc diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 942922cba6df..55e91c1ae183 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -1491,8 +1491,14 @@ async fn handle_actions( session_info_provider, ) .await?; +<<<<<<< HEAD ctx.send_messages(messages.into_iter()).await; +======= + for message in messages.into_iter() { + approval_voting_sender.send_unbounded_message(message); + } +>>>>>>> 65a4e5ee (Fix order of resending messages after restart (#6729)) let next_actions: Vec = next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); @@ -1577,6 +1583,7 @@ async fn distribution_messages_for_activation( let mut approval_meta = Vec::with_capacity(all_blocks.len()); let mut messages = Vec::new(); + let mut approvals = Vec::new(); let mut actions = Vec::new(); messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value. @@ -1762,7 +1769,7 @@ async fn distribution_messages_for_activation( if signatures_queued .insert(approval_sig.signed_candidates_indices.clone()) { - messages.push(ApprovalDistributionMessage::DistributeApproval( + approvals.push(ApprovalDistributionMessage::DistributeApproval( IndirectSignedApprovalVoteV2 { block_hash, candidate_indices: approval_sig.signed_candidates_indices, @@ -1787,6 +1794,10 @@ async fn distribution_messages_for_activation( } messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta); + // Approvals are appended at the end, to make sure all assignments are sent + // before the approvals, otherwise if they arrive ahead in approval-distribution + // they will be ignored. + messages.extend(approvals.into_iter()); Ok((messages, actions)) } diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 7126f209a94f..493a38766c1f 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -4459,6 +4459,114 @@ async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered( assert!(our_assignment.triggered()); } +// Builds a chain with a fork where both relay blocks include the same candidate. +async fn build_chain_with_block_with_two_candidates( + block_hash1: Hash, + slot: Slot, + sync_oracle_handle: TestSyncOracleHandle, + candidate_receipt: Vec, +) -> (ChainBuilder, SessionInfo) { + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Eve, + ]; + let session_info = SessionInfo { + validator_groups: IndexedVec::>::from(vec![ + vec![ValidatorIndex(0), ValidatorIndex(1)], + vec![ValidatorIndex(2)], + vec![ValidatorIndex(3), ValidatorIndex(4)], + ]), + ..session_info(&validators) + }; + + let candidates = Some( + candidate_receipt + .iter() + .enumerate() + .map(|(i, receipt)| (receipt.clone(), CoreIndex(i as u32), GroupIndex(i as u32))) + .collect(), + ); + let mut chain_builder = ChainBuilder::new(); + + chain_builder + .major_syncing(sync_oracle_handle.is_major_syncing.clone()) + .add_block( + block_hash1, + ChainBuilder::GENESIS_HASH, + 1, + BlockConfig { + slot, + candidates: candidates.clone(), + session_info: Some(session_info.clone()), + end_syncing: true, + }, + ); + (chain_builder, session_info) +} + +async fn setup_overseer_with_blocks_with_two_assignments_triggered( + virtual_overseer: &mut VirtualOverseer, + store: TestStore, + clock: &Arc, + sync_oracle_handle: TestSyncOracleHandle, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + let candidate_commitments = CandidateCommitments::default(); + let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash); + candidate_receipt.commitments_hash = candidate_commitments.hash(); + let candidate_hash = candidate_receipt.hash(); + + let mut candidate_commitments2 = CandidateCommitments::default(); + candidate_commitments2.processed_downward_messages = 3; + let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash); + candidate_receipt2.commitments_hash = candidate_commitments2.hash(); + let candidate_hash2 = candidate_receipt2.hash(); + + let slot = Slot::from(1); + let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates( + block_hash, + slot, + sync_oracle_handle, + vec![candidate_receipt, candidate_receipt2], + ) + .await; + chain_builder.build(virtual_overseer).await; + + assert!(!clock.inner.lock().current_wakeup_is(1)); + clock.inner.lock().wakeup_all(1); + + assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot))); + clock.inner.lock().wakeup_all(slot_to_tick(slot)); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + clock.inner.lock().wakeup_all(slot_to_tick(slot + 2)); + + assert_eq!(clock.inner.lock().wakeups.len(), 0); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap(); + let our_assignment = + candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap(); + assert!(our_assignment.triggered()); + + let candidate_entry = store.load_candidate_entry(&candidate_hash2).unwrap().unwrap(); + let our_assignment = + candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap(); + assert!(our_assignment.triggered()); +} + // Tests that for candidates that we did not approve yet, for which we triggered the assignment and // the approval work we restart the work to approve it. #[test] @@ -4920,6 +5028,212 @@ fn subsystem_sends_pending_approvals_on_approval_restart() { }); } +// Test that after restart approvals are sent after all assignments have been distributed. +#[test] +fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() { + let assignment_criteria = Box::new(MockAssignmentCriteria( + || { + let mut assignments = HashMap::new(); + + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact { + core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(), + }), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let _ = assignments.insert( + CoreIndex(1), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay { + core_index: CoreIndex(1), + }), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + assignments + }, + |_| Ok(0), + )); + let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build(); + let store = config.backend(); + let store_clone = config.backend(); + + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness; + + setup_overseer_with_blocks_with_two_assignments_triggered( + &mut virtual_overseer, + store, + &clock, + sync_oracle_handle, + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + recover_available_data(&mut virtual_overseer).await; + fetch_validation_code(&mut virtual_overseer).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _ + )) => { + } + ); + + recover_available_data(&mut virtual_overseer).await; + fetch_validation_code(&mut virtual_overseer).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive { + exec_kind, + response_sender, + .. + }) if exec_kind == PvfExecKind::Approval => { + response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive { + exec_kind, + response_sender, + .. + }) if exec_kind == PvfExecKind::Approval => { + response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + } + ); + + // Configure a big coalesce number, so that the signature is cached instead of being sent to + // approval-distribution. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 2, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 2, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + virtual_overseer + }); + + let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build(); + // On restart we should first distribute all assignments covering a coalesced approval. + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + let candidate_commitments = CandidateCommitments::default(); + let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash); + candidate_receipt.commitments_hash = candidate_commitments.hash(); + + let mut candidate_commitments2 = CandidateCommitments::default(); + candidate_commitments2.processed_downward_messages = 3; + let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash); + candidate_receipt2.commitments_hash = candidate_commitments2.hash(); + + let slot = Slot::from(1); + + clock.inner.lock().set_tick(slot_to_tick(slot + 2)); + let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates( + block_hash, + slot, + sync_oracle_handle, + vec![candidate_receipt.into(), candidate_receipt2.into()], + ) + .await; + chain_builder.build(&mut virtual_overseer).await; + + futures_timer::Delay::new(Duration::from_millis(2000)).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks( + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(approval)) => { + assert_eq!(approval.candidate_indices.count_ones(), 2); + } + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + virtual_overseer + }); +} + // Test we correctly update the timer when we mark the beginning of gathering assignments. #[test] fn test_gathering_assignments_statements() { diff --git a/prdoc/pr_6729.prdoc b/prdoc/pr_6729.prdoc new file mode 100644 index 000000000000..9eaa67363c9a --- /dev/null +++ b/prdoc/pr_6729.prdoc @@ -0,0 +1,15 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Fix order of resending messages after restart + +doc: + - audience: Node Dev + description: | + At restart when dealing with a coalesced approval we might end up in a situation where we sent to + approval-distribution the approval before all assignments covering it, in that case, the approval + is ignored and never distribute, which will lead to no-shows. + +crates: + - name: polkadot-node-core-approval-voting + bump: minor From 077682e82990c745bfa3ca91a06bb9dd222b665b Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 10 Dec 2024 16:00:42 +0200 Subject: [PATCH 2/3] Solve conflicts Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/lib.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 55e91c1ae183..5f1e1c2c786f 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -1491,14 +1491,10 @@ async fn handle_actions( session_info_provider, ) .await?; -<<<<<<< HEAD - - ctx.send_messages(messages.into_iter()).await; -======= for message in messages.into_iter() { - approval_voting_sender.send_unbounded_message(message); + ctx.send_unbounded_message(message); } ->>>>>>> 65a4e5ee (Fix order of resending messages after restart (#6729)) + let next_actions: Vec = next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); From 7e7731181b99d0323cbe7ba558ffa3e9e11ccdd2 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 10 Dec 2024 16:07:54 +0200 Subject: [PATCH 3/3] Make test compile on this old version Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/tests.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 493a38766c1f..561f5fbbdf2c 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -4510,7 +4510,7 @@ async fn build_chain_with_block_with_two_candidates( async fn setup_overseer_with_blocks_with_two_assignments_triggered( virtual_overseer: &mut VirtualOverseer, store: TestStore, - clock: &Arc, + clock: &Box, sync_oracle_handle: TestSyncOracleHandle, ) { assert_matches!( @@ -4522,13 +4522,13 @@ async fn setup_overseer_with_blocks_with_two_assignments_triggered( let block_hash = Hash::repeat_byte(0x01); let candidate_commitments = CandidateCommitments::default(); - let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash); + let mut candidate_receipt = dummy_candidate_receipt(block_hash); candidate_receipt.commitments_hash = candidate_commitments.hash(); let candidate_hash = candidate_receipt.hash(); let mut candidate_commitments2 = CandidateCommitments::default(); candidate_commitments2.processed_downward_messages = 3; - let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash); + let mut candidate_receipt2 = dummy_candidate_receipt(block_hash); candidate_receipt2.commitments_hash = candidate_commitments2.hash(); let candidate_hash2 = candidate_receipt2.hash(); @@ -5172,12 +5172,12 @@ fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() { let block_hash = Hash::repeat_byte(0x01); let candidate_commitments = CandidateCommitments::default(); - let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash); + let mut candidate_receipt = dummy_candidate_receipt(block_hash); candidate_receipt.commitments_hash = candidate_commitments.hash(); let mut candidate_commitments2 = CandidateCommitments::default(); candidate_commitments2.processed_downward_messages = 3; - let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash); + let mut candidate_receipt2 = dummy_candidate_receipt(block_hash); candidate_receipt2.commitments_hash = candidate_commitments2.hash(); let slot = Slot::from(1);