From 2e9ea0dc12320fac28806d2d3cbc05656a692879 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 7 Nov 2024 12:48:13 +0100 Subject: [PATCH] PVF: drop backing jobs if it is too late (#5616) Fixes https://github.com/paritytech/polkadot-sdk/issues/5530 This PR introduces the removal of backing jobs that have been back pressured for longer than `allowedAncestryLen`, as these candidates are no longer viable. It is reasonable to expect a result for a backing job execution within `allowedAncestryLen` blocks. Therefore, we set the job TTL as a relay block number and synchronize the validation host by sending activated leaves. --------- Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Co-authored-by: Branislav Kontur --- Cargo.lock | 2 +- polkadot/node/core/backing/src/lib.rs | 5 +- polkadot/node/core/backing/src/tests/mod.rs | 20 +- .../src/tests/prospective_parachains.rs | 2 +- .../node/core/candidate-validation/src/lib.rs | 106 ++++++- .../core/candidate-validation/src/tests.rs | 54 +++- polkadot/node/core/pvf/Cargo.toml | 1 + polkadot/node/core/pvf/src/error.rs | 5 + polkadot/node/core/pvf/src/execute/queue.rs | 277 ++++++++++++++---- polkadot/node/core/pvf/src/host.rs | 55 +++- polkadot/node/core/pvf/src/priority.rs | 4 +- polkadot/node/core/pvf/tests/it/adder.rs | 5 + polkadot/node/core/pvf/tests/it/main.rs | 47 ++- polkadot/node/core/pvf/tests/it/process.rs | 5 + .../node/overseer/examples/minimal-example.rs | 2 +- polkadot/node/overseer/src/lib.rs | 1 + polkadot/node/overseer/src/tests.rs | 4 +- polkadot/node/subsystem-types/Cargo.toml | 1 - polkadot/node/subsystem-types/src/messages.rs | 21 +- prdoc/pr_5616.prdoc | 25 ++ 20 files changed, 513 insertions(+), 129 deletions(-) create mode 100644 prdoc/pr_5616.prdoc diff --git a/Cargo.lock b/Cargo.lock index 5f81f77991a7..1036e4016746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14697,6 +14697,7 @@ dependencies = [ "polkadot-node-metrics", "polkadot-node-primitives", "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", "polkadot-parachain-primitives", "polkadot-primitives", "procfs", @@ -14960,7 +14961,6 @@ dependencies = [ "sp-blockchain", "sp-consensus-babe", "sp-runtime 31.0.1", - "strum 0.26.3", "substrate-prometheus-endpoint", "thiserror", ] diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index b5362d32ad88..30121418a2fd 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -632,6 +632,7 @@ async fn request_candidate_validation( ) -> Result { let (tx, rx) = oneshot::channel(); let is_system = candidate_receipt.descriptor.para_id().is_system(); + let relay_parent = candidate_receipt.descriptor.relay_parent(); sender .send_message(CandidateValidationMessage::ValidateFromExhaustive { @@ -641,9 +642,9 @@ async fn request_candidate_validation( pov, executor_params, exec_kind: if is_system { - PvfExecKind::BackingSystemParas + PvfExecKind::BackingSystemParas(relay_parent) } else { - PvfExecKind::Backing + PvfExecKind::Backing(relay_parent) }, response_sender: tx, }) diff --git a/polkadot/node/core/backing/src/tests/mod.rs b/polkadot/node/core/backing/src/tests/mod.rs index dbb974a634fe..97e25c04282c 100644 --- a/polkadot/node/core/backing/src/tests/mod.rs +++ b/polkadot/node/core/backing/src/tests/mod.rs @@ -435,7 +435,7 @@ async fn assert_validate_from_exhaustive( ) if validation_data == *assert_pvd && validation_code == *assert_validation_code && *pov == *assert_pov && candidate_receipt.descriptor == assert_candidate.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_receipt.commitments_hash == assert_candidate.commitments.hash() => { response_sender.send(Ok(ValidationResult::Valid( @@ -652,7 +652,7 @@ fn backing_works(#[case] elastic_scaling_mvp: bool) { ) if validation_data == pvd_ab && validation_code == validation_code_ab && *pov == pov_ab && candidate_receipt.descriptor == candidate_a.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_receipt.commitments_hash == candidate_a_commitments_hash => { response_sender.send(Ok( @@ -1288,7 +1288,7 @@ fn backing_works_while_validation_ongoing() { ) if validation_data == pvd_abc && validation_code == validation_code_abc && *pov == pov_abc && candidate_receipt.descriptor == candidate_a.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_a_commitments_hash == candidate_receipt.commitments_hash => { // we never validate the candidate. our local node @@ -1455,7 +1455,7 @@ fn backing_misbehavior_works() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && candidate_receipt.descriptor == candidate_a.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_a_commitments_hash == candidate_receipt.commitments_hash => { response_sender.send(Ok( @@ -1622,7 +1622,7 @@ fn backing_dont_second_invalid() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_block_a && candidate_receipt.descriptor == candidate_a.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_a.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); @@ -1662,7 +1662,7 @@ fn backing_dont_second_invalid() { ) if validation_data == pvd_b && validation_code == validation_code_b && *pov == pov_block_b && candidate_receipt.descriptor == candidate_b.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_b.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok( @@ -1789,7 +1789,7 @@ fn backing_second_after_first_fails_works() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && candidate_receipt.descriptor == candidate.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn))).unwrap(); @@ -1933,7 +1933,7 @@ fn backing_works_after_failed_validation() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && candidate_receipt.descriptor == candidate.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Err(ValidationFailed("Internal test error".into()))).unwrap(); @@ -2212,7 +2212,7 @@ fn retry_works() { ) if validation_data == pvd_a && validation_code == validation_code_a && *pov == pov_a && candidate_receipt.descriptor == candidate.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate.commitments.hash() == candidate_receipt.commitments_hash ); virtual_overseer @@ -2754,7 +2754,7 @@ fn validator_ignores_statements_from_disabled_validators() { ) if validation_data == pvd && validation_code == expected_validation_code && *pov == expected_pov && candidate_receipt.descriptor == candidate.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate_commitments_hash == candidate_receipt.commitments_hash => { response_sender.send(Ok( diff --git a/polkadot/node/core/backing/src/tests/prospective_parachains.rs b/polkadot/node/core/backing/src/tests/prospective_parachains.rs index caddd2408057..db5409ee4bd5 100644 --- a/polkadot/node/core/backing/src/tests/prospective_parachains.rs +++ b/polkadot/node/core/backing/src/tests/prospective_parachains.rs @@ -276,7 +276,7 @@ async fn assert_validate_seconded_candidate( &validation_code == assert_validation_code && &*pov == assert_pov && candidate_receipt.descriptor == candidate.descriptor && - exec_kind == PvfExecKind::BackingSystemParas && + matches!(exec_kind, PvfExecKind::BackingSystemParas(_)) && candidate.commitments.hash() == candidate_receipt.commitments_hash => { response_sender.send(Ok(ValidationResult::Valid( diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 1e732e2f1f03..25614349486e 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -31,13 +31,16 @@ use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult}; use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{ - CandidateValidationMessage, PreCheckOutcome, PvfExecKind, RuntimeApiMessage, - RuntimeApiRequest, ValidationFailed, + CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind, + RuntimeApiMessage, RuntimeApiRequest, ValidationFailed, }, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender, }; -use polkadot_node_subsystem_util::{self as util, runtime::ClaimQueueSnapshot}; +use polkadot_node_subsystem_util::{ + self as util, + runtime::{prospective_parachains_mode, ClaimQueueSnapshot, ProspectiveParachainsMode}, +}; use polkadot_overseer::ActiveLeavesUpdate; use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult; use polkadot_primitives::{ @@ -279,6 +282,7 @@ async fn run( comm = ctx.recv().fuse() => { match comm { Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => { + update_active_leaves(ctx.sender(), validation_host.clone(), update.clone()).await; maybe_prepare_validation(ctx.sender(), keystore.clone(), validation_host.clone(), update, &mut prepare_state).await; }, Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, @@ -551,6 +555,66 @@ where Some(processed_code_hashes) } +async fn update_active_leaves( + sender: &mut Sender, + mut validation_backend: impl ValidationBackend, + update: ActiveLeavesUpdate, +) where + Sender: SubsystemSender + SubsystemSender, +{ + let ancestors = get_block_ancestors(sender, update.activated.as_ref().map(|x| x.hash)).await; + if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await { + gum::warn!( + target: LOG_TARGET, + ?err, + "cannot update active leaves in validation backend", + ); + }; +} + +async fn get_allowed_ancestry_len(sender: &mut Sender, relay_parent: Hash) -> Option +where + Sender: SubsystemSender + SubsystemSender, +{ + match prospective_parachains_mode(sender, relay_parent).await { + Ok(ProspectiveParachainsMode::Enabled { allowed_ancestry_len, .. }) => + Some(allowed_ancestry_len), + res => { + gum::warn!(target: LOG_TARGET, ?res, "async backing is disabled"); + None + }, + } +} + +async fn get_block_ancestors( + sender: &mut Sender, + maybe_relay_parent: Option, +) -> Vec +where + Sender: SubsystemSender + SubsystemSender, +{ + let Some(relay_parent) = maybe_relay_parent else { return vec![] }; + let Some(allowed_ancestry_len) = get_allowed_ancestry_len(sender, relay_parent).await else { + return vec![] + }; + + let (tx, rx) = oneshot::channel(); + sender + .send_message(ChainApiMessage::Ancestors { + hash: relay_parent, + k: allowed_ancestry_len, + response_channel: tx, + }) + .await; + match rx.await { + Ok(Ok(x)) => x, + res => { + gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors"); + vec![] + }, + } +} + struct RuntimeRequestFailed; async fn runtime_api_request( @@ -698,7 +762,7 @@ async fn validate_candidate_exhaustive( // We only check the session index for backing. match (exec_kind, candidate_receipt.descriptor.session_index()) { - (PvfExecKind::Backing | PvfExecKind::BackingSystemParas, Some(session_index)) => { + (PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_), Some(session_index)) => { let Some(expected_session_index) = maybe_expected_session_index else { let error = "cannot fetch session index from the runtime"; gum::warn!( @@ -731,7 +795,7 @@ async fn validate_candidate_exhaustive( let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and // honest backers getting slashed. - PvfExecKind::Backing | PvfExecKind::BackingSystemParas => { + PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => { let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into()); let pvf = PvfPrepData::from_code( @@ -809,6 +873,15 @@ async fn validate_candidate_exhaustive( ); Err(ValidationFailed(e.to_string())) }, + Err(e @ ValidationError::ExecutionDeadline) => { + gum::warn!( + target: LOG_TARGET, + ?para_id, + ?e, + "Job assigned too late, execution queue probably overloaded", + ); + Err(ValidationFailed(e.to_string())) + }, Ok(res) => if res.head_data.hash() != candidate_receipt.descriptor.para_head() { gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)"); @@ -846,7 +919,7 @@ async fn validate_candidate_exhaustive( // descriptor core index. ( Some(_core_index), - PvfExecKind::Backing | PvfExecKind::BackingSystemParas, + PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_), ) => { let Some(claim_queue) = maybe_claim_queue else { let error = "cannot fetch the claim queue from the runtime"; @@ -994,7 +1067,12 @@ trait ValidationBackend { retry_immediately = true; }, - Ok(_) | Err(ValidationError::Invalid(_) | ValidationError::Preparation(_)) => break, + Ok(_) | + Err( + ValidationError::Invalid(_) | + ValidationError::Preparation(_) | + ValidationError::ExecutionDeadline, + ) => break, } // If we got a possibly transient error, retry once after a brief delay, on the @@ -1035,6 +1113,12 @@ trait ValidationBackend { async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>; async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String>; + + async fn update_active_leaves( + &mut self, + update: ActiveLeavesUpdate, + ancestors: Vec, + ) -> Result<(), String>; } #[async_trait] @@ -1085,6 +1169,14 @@ impl ValidationBackend for ValidationHost { async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> { self.heads_up(active_pvfs).await } + + async fn update_active_leaves( + &mut self, + update: ActiveLeavesUpdate, + ancestors: Vec, + ) -> Result<(), String> { + self.update_active_leaves(update, ancestors).await + } } /// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 391247858ed6..98e34a1cb4c1 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -473,6 +473,14 @@ impl ValidationBackend for MockValidateCandidateBackend { async fn heads_up(&mut self, _active_pvfs: Vec) -> Result<(), String> { unreachable!() } + + async fn update_active_leaves( + &mut self, + _update: ActiveLeavesUpdate, + _ancestors: Vec, + ) -> Result<(), String> { + unreachable!() + } } #[test] @@ -531,7 +539,7 @@ fn session_index_checked_only_in_backing() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )) @@ -670,7 +678,7 @@ fn candidate_validation_ok_is_ok(#[case] v2_descriptor: bool) { candidate_receipt, Arc::new(pov), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Some(ClaimQueueSnapshot(cq)), )) @@ -748,7 +756,7 @@ fn invalid_session_or_core_index() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )) @@ -764,7 +772,7 @@ fn invalid_session_or_core_index() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::BackingSystemParas, + PvfExecKind::BackingSystemParas(dummy_hash()), &Default::default(), Default::default(), )) @@ -782,7 +790,7 @@ fn invalid_session_or_core_index() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Some(Default::default()), )) @@ -797,7 +805,7 @@ fn invalid_session_or_core_index() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::BackingSystemParas, + PvfExecKind::BackingSystemParas(dummy_hash()), &Default::default(), Some(Default::default()), )) @@ -866,7 +874,7 @@ fn invalid_session_or_core_index() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Some(ClaimQueueSnapshot(cq.clone())), )) @@ -889,7 +897,7 @@ fn invalid_session_or_core_index() { candidate_receipt.clone(), Arc::new(pov.clone()), ExecutorParams::default(), - PvfExecKind::BackingSystemParas, + PvfExecKind::BackingSystemParas(dummy_hash()), &Default::default(), Some(ClaimQueueSnapshot(cq)), )) @@ -944,7 +952,7 @@ fn candidate_validation_bad_return_is_invalid() { candidate_receipt, Arc::new(pov), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )) @@ -1102,7 +1110,7 @@ fn candidate_validation_retry_internal_errors() { #[test] fn candidate_validation_dont_retry_internal_errors() { let v = candidate_validation_retry_on_error_helper( - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), vec![ Err(InternalValidationError::HostCommunication("foo".into()).into()), // Throw an AWD error, we should still retry again. @@ -1136,7 +1144,7 @@ fn candidate_validation_retry_panic_errors() { #[test] fn candidate_validation_dont_retry_panic_errors() { let v = candidate_validation_retry_on_error_helper( - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), vec![ Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError("foo".into()))), // Throw an AWD error, we should still retry again. @@ -1233,7 +1241,7 @@ fn candidate_validation_timeout_is_internal_error() { candidate_receipt, Arc::new(pov), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )); @@ -1282,7 +1290,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { candidate_receipt, Arc::new(pov), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )) @@ -1337,7 +1345,7 @@ fn candidate_validation_code_mismatch_is_invalid() { candidate_receipt, Arc::new(pov), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )) @@ -1397,7 +1405,7 @@ fn compressed_code_works() { candidate_receipt, Arc::new(pov), ExecutorParams::default(), - PvfExecKind::Backing, + PvfExecKind::Backing(dummy_hash()), &Default::default(), Default::default(), )); @@ -1436,6 +1444,14 @@ impl ValidationBackend for MockPreCheckBackend { async fn heads_up(&mut self, _active_pvfs: Vec) -> Result<(), String> { unreachable!() } + + async fn update_active_leaves( + &mut self, + _update: ActiveLeavesUpdate, + _ancestors: Vec, + ) -> Result<(), String> { + unreachable!() + } } #[test] @@ -1592,6 +1608,14 @@ impl ValidationBackend for MockHeadsUp { let _ = self.heads_up_call_count.fetch_add(1, Ordering::SeqCst); Ok(()) } + + async fn update_active_leaves( + &mut self, + _update: ActiveLeavesUpdate, + _ancestors: Vec, + ) -> Result<(), String> { + unreachable!() + } } fn alice_keystore() -> KeystorePtr { diff --git a/polkadot/node/core/pvf/Cargo.toml b/polkadot/node/core/pvf/Cargo.toml index 13fcdc69a99a..a9f97c308f26 100644 --- a/polkadot/node/core/pvf/Cargo.toml +++ b/polkadot/node/core/pvf/Cargo.toml @@ -52,6 +52,7 @@ criterion = { features = [ hex-literal = { workspace = true, default-features = true } polkadot-node-core-pvf-common = { features = ["test-utils"], workspace = true, default-features = true } +polkadot-node-subsystem-test-helpers = { workspace = true } # For benches and integration tests, depend on ourselves with the test-utils # feature. polkadot-node-core-pvf = { features = ["test-utils"], workspace = true, default-features = true } diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index a0634106052d..e68ba595ef5a 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -39,6 +39,11 @@ pub enum ValidationError { /// Preparation or execution issue caused by an internal condition. Should not vote against. #[error("candidate validation: internal: {0}")] Internal(#[from] InternalValidationError), + /// The execution deadline of allowed_ancestry_len + 1 has been reached. Jobs like backing have + /// a limited time to execute. Once the deadline is reached, the current candidate cannot be + /// backed, regardless of its validity. + #[error("candidate validation: execution deadline has been reached.")] + ExecutionDeadline, } /// A description of an error raised during executing a PVF and can be attributed to the combination diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index 2ac5116912eb..6d27ab0261d9 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -35,8 +35,8 @@ use polkadot_node_core_pvf_common::{ SecurityStatus, }; use polkadot_node_primitives::PoV; -use polkadot_node_subsystem::messages::PvfExecKind; -use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, PersistedValidationData}; +use polkadot_node_subsystem::{messages::PvfExecKind, ActiveLeavesUpdate}; +use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, Hash, PersistedValidationData}; use slotmap::HopSlotMap; use std::{ collections::{HashMap, VecDeque}, @@ -45,7 +45,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use strum::IntoEnumIterator; +use strum::{EnumIter, IntoEnumIterator}; /// The amount of time a job for which the queue does not have a compatible worker may wait in the /// queue. After that time passes, the queue will kill the first worker which becomes idle to @@ -58,6 +58,7 @@ slotmap::new_key_type! { struct Worker; } #[derive(Debug)] pub enum ToQueue { + UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec }, Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest }, } @@ -82,6 +83,7 @@ pub struct PendingExecutionRequest { struct ExecuteJob { artifact: ArtifactPathId, exec_timeout: Duration, + exec_kind: PvfExecKind, pvd: Arc, pov: Arc, executor_params: ExecutorParams, @@ -172,6 +174,9 @@ struct Queue { unscheduled: Unscheduled, workers: Workers, mux: Mux, + + /// Active leaves and their ancestors to check the viability of backing jobs. + active_leaves: HashMap>, } impl Queue { @@ -202,6 +207,7 @@ impl Queue { spawn_inflight: 0, capacity: worker_capacity, }, + active_leaves: Default::default(), } } @@ -278,15 +284,74 @@ impl Queue { } let job = queue.remove(job_index).expect("Job is just checked to be in queue; qed"); + let exec_kind = job.exec_kind; if let Some(worker) = worker { assign(self, worker, job); } else { spawn_extra_worker(self, job); } - self.metrics.on_execute_kind(priority); + self.metrics.on_execute_kind(exec_kind); self.unscheduled.mark_scheduled(priority); } + + fn update_active_leaves(&mut self, update: ActiveLeavesUpdate, ancestors: Vec) { + self.prune_deactivated_leaves(&update); + self.insert_active_leaf(update, ancestors); + self.prune_old_jobs(); + } + + fn prune_deactivated_leaves(&mut self, update: &ActiveLeavesUpdate) { + for hash in &update.deactivated { + let _ = self.active_leaves.remove(&hash); + } + + gum::debug!(target: LOG_TARGET, size = ?self.active_leaves.len(), "Active leaves pruned"); + } + + fn insert_active_leaf(&mut self, update: ActiveLeavesUpdate, ancestors: Vec) { + let Some(leaf) = update.activated else { return }; + let _ = self.active_leaves.insert(leaf.hash, ancestors); + } + + fn prune_old_jobs(&mut self) { + for &priority in &[Priority::Backing, Priority::BackingSystemParas] { + let Some(queue) = self.unscheduled.get_mut(priority) else { continue }; + let to_remove: Vec = queue + .iter() + .enumerate() + .filter_map(|(index, job)| { + let relay_parent = match job.exec_kind { + PvfExecKind::Backing(x) | PvfExecKind::BackingSystemParas(x) => x, + _ => return None, + }; + let in_active_fork = self.active_leaves.iter().any(|(hash, ancestors)| { + *hash == relay_parent || ancestors.contains(&relay_parent) + }); + if in_active_fork { + None + } else { + Some(index) + } + }) + .collect(); + + for &index in to_remove.iter().rev() { + if index > queue.len() { + continue + } + + let Some(job) = queue.remove(index) else { continue }; + let _ = job.result_tx.send(Err(ValidationError::ExecutionDeadline)); + gum::warn!( + target: LOG_TARGET, + ?priority, + exec_kind = ?job.exec_kind, + "Job exceeded its deadline and was dropped without execution", + ); + } + } + } } async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { @@ -305,27 +370,40 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) { } fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { - let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue; - let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx, exec_kind } = - pending_execution_request; - gum::debug!( - target: LOG_TARGET, - validation_code_hash = ?artifact.id.code_hash, - "enqueueing an artifact for execution", - ); - queue.metrics.observe_pov_size(pov.block_data.0.len(), true); - queue.metrics.execute_enqueued(); - let job = ExecuteJob { - artifact, - exec_timeout, - pvd, - pov, - executor_params, - result_tx, - waiting_since: Instant::now(), - }; - queue.unscheduled.add(job, exec_kind); - queue.try_assign_next_job(None); + match to_queue { + ToQueue::UpdateActiveLeaves { update, ancestors } => { + queue.update_active_leaves(update, ancestors); + }, + ToQueue::Enqueue { artifact, pending_execution_request } => { + let PendingExecutionRequest { + exec_timeout, + pvd, + pov, + executor_params, + result_tx, + exec_kind, + } = pending_execution_request; + gum::debug!( + target: LOG_TARGET, + validation_code_hash = ?artifact.id.code_hash, + "enqueueing an artifact for execution", + ); + queue.metrics.observe_pov_size(pov.block_data.0.len(), true); + queue.metrics.execute_enqueued(); + let job = ExecuteJob { + artifact, + exec_timeout, + exec_kind, + pvd, + pov, + executor_params, + result_tx, + waiting_since: Instant::now(), + }; + queue.unscheduled.add(job, exec_kind.into()); + queue.try_assign_next_job(None); + }, + } } async fn handle_mux(queue: &mut Queue, event: QueueEvent) { @@ -648,9 +726,32 @@ pub fn start( (to_queue_tx, from_queue_rx, run) } +/// Priority of execution jobs based on PvfExecKind. +/// +/// The order is important, because we iterate through the values and assume it is going from higher +/// to lowest priority. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, EnumIter)] +enum Priority { + Dispute, + Approval, + BackingSystemParas, + Backing, +} + +impl From for Priority { + fn from(kind: PvfExecKind) -> Self { + match kind { + PvfExecKind::Dispute => Priority::Dispute, + PvfExecKind::Approval => Priority::Approval, + PvfExecKind::BackingSystemParas(_) => Priority::BackingSystemParas, + PvfExecKind::Backing(_) => Priority::Backing, + } + } +} + struct Unscheduled { - unscheduled: HashMap>, - counter: HashMap, + unscheduled: HashMap>, + counter: HashMap, } impl Unscheduled { @@ -677,34 +778,34 @@ impl Unscheduled { /// approvals could not exceed 24%, even if there are no disputes. /// - We cannot fully prioritize backing system parachains over backing other parachains based /// on the distribution of the original 100%. - const PRIORITY_ALLOCATION_THRESHOLDS: &'static [(PvfExecKind, usize)] = &[ - (PvfExecKind::Dispute, 70), - (PvfExecKind::Approval, 80), - (PvfExecKind::BackingSystemParas, 100), - (PvfExecKind::Backing, 100), + const PRIORITY_ALLOCATION_THRESHOLDS: &'static [(Priority, usize)] = &[ + (Priority::Dispute, 70), + (Priority::Approval, 80), + (Priority::BackingSystemParas, 100), + (Priority::Backing, 100), ]; fn new() -> Self { Self { - unscheduled: PvfExecKind::iter().map(|priority| (priority, VecDeque::new())).collect(), - counter: PvfExecKind::iter().map(|priority| (priority, 0)).collect(), + unscheduled: Priority::iter().map(|priority| (priority, VecDeque::new())).collect(), + counter: Priority::iter().map(|priority| (priority, 0)).collect(), } } - fn select_next_priority(&self) -> PvfExecKind { + fn select_next_priority(&self) -> Priority { gum::debug!( target: LOG_TARGET, - unscheduled = ?self.unscheduled.iter().map(|(p, q)| (*p, q.len())).collect::>(), + unscheduled = ?self.unscheduled.iter().map(|(p, q)| (*p, q.len())).collect::>(), counter = ?self.counter, "Selecting next execution priority...", ); - let priority = PvfExecKind::iter() + let priority = Priority::iter() .find(|priority| self.has_pending(priority) && !self.has_reached_threshold(priority)) .unwrap_or_else(|| { - PvfExecKind::iter() + Priority::iter() .find(|priority| self.has_pending(priority)) - .unwrap_or(PvfExecKind::Backing) + .unwrap_or(Priority::Backing) }); gum::debug!( @@ -716,19 +817,19 @@ impl Unscheduled { priority } - fn get_mut(&mut self, priority: PvfExecKind) -> Option<&mut VecDeque> { + fn get_mut(&mut self, priority: Priority) -> Option<&mut VecDeque> { self.unscheduled.get_mut(&priority) } - fn add(&mut self, job: ExecuteJob, priority: PvfExecKind) { + fn add(&mut self, job: ExecuteJob, priority: Priority) { self.unscheduled.entry(priority).or_default().push_back(job); } - fn has_pending(&self, priority: &PvfExecKind) -> bool { + fn has_pending(&self, priority: &Priority) -> bool { !self.unscheduled.get(priority).unwrap_or(&VecDeque::new()).is_empty() } - fn priority_allocation_threshold(priority: &PvfExecKind) -> Option { + fn priority_allocation_threshold(priority: &Priority) -> Option { Self::PRIORITY_ALLOCATION_THRESHOLDS.iter().find_map(|&(p, value)| { if p == *priority { Some(value) @@ -740,7 +841,7 @@ impl Unscheduled { /// Checks if a given priority has reached its allocated threshold /// The thresholds are defined in `PRIORITY_ALLOCATION_THRESHOLDS`. - fn has_reached_threshold(&self, priority: &PvfExecKind) -> bool { + fn has_reached_threshold(&self, priority: &Priority) -> bool { let Some(threshold) = Self::priority_allocation_threshold(priority) else { return false }; let Some(count) = self.counter.get(&priority) else { return false }; // Every time we iterate by lower level priorities @@ -769,22 +870,28 @@ impl Unscheduled { has_reached_threshold } - fn mark_scheduled(&mut self, priority: PvfExecKind) { + fn mark_scheduled(&mut self, priority: Priority) { *self.counter.entry(priority).or_default() += 1; if self.counter.values().sum::() >= Self::SCHEDULING_WINDOW_SIZE { self.reset_counter(); } + gum::debug!( + target: LOG_TARGET, + ?priority, + "Job marked as scheduled", + ); } fn reset_counter(&mut self) { - self.counter = PvfExecKind::iter().map(|kind| (kind, 0)).collect(); + self.counter = Priority::iter().map(|kind| (kind, 0)).collect(); } } #[cfg(test)] mod tests { use polkadot_node_primitives::BlockData; + use polkadot_node_subsystem_test_helpers::mock::new_leaf; use sp_core::H256; use super::*; @@ -803,6 +910,7 @@ mod tests { ExecuteJob { artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() }, exec_timeout: Duration::from_secs(10), + exec_kind: PvfExecKind::Approval, pvd, pov, executor_params: ExecutorParams::default(), @@ -815,11 +923,11 @@ mod tests { fn test_unscheduled_add() { let mut unscheduled = Unscheduled::new(); - PvfExecKind::iter().for_each(|priority| { + Priority::iter().for_each(|priority| { unscheduled.add(create_execution_job(), priority); }); - PvfExecKind::iter().for_each(|priority| { + Priority::iter().for_each(|priority| { let queue = unscheduled.unscheduled.get(&priority).unwrap(); assert_eq!(queue.len(), 1); }); @@ -827,7 +935,7 @@ mod tests { #[test] fn test_unscheduled_priority_distribution() { - use PvfExecKind::*; + use Priority::*; let mut priorities = vec![]; @@ -852,7 +960,7 @@ mod tests { #[test] fn test_unscheduled_priority_distribution_without_backing_system_paras() { - use PvfExecKind::*; + use Priority::*; let mut priorities = vec![]; @@ -876,7 +984,7 @@ mod tests { #[test] fn test_unscheduled_priority_distribution_without_disputes() { - use PvfExecKind::*; + use Priority::*; let mut priorities = vec![]; @@ -900,7 +1008,7 @@ mod tests { #[test] fn test_unscheduled_priority_distribution_without_disputes_and_only_one_backing() { - use PvfExecKind::*; + use Priority::*; let mut priorities = vec![]; @@ -922,7 +1030,7 @@ mod tests { #[test] fn test_unscheduled_does_not_postpone_backing() { - use PvfExecKind::*; + use Priority::*; let mut priorities = vec![]; @@ -940,4 +1048,67 @@ mod tests { assert_eq!(&priorities[..4], &[Approval, Backing, Approval, Approval]); } + + #[tokio::test] + async fn test_prunes_old_jobs_on_active_leaves_update() { + // Set up a queue, but without a real worker, we won't execute any jobs. + let (_, to_queue_rx) = mpsc::channel(1); + let (from_queue_tx, _) = mpsc::unbounded(); + let mut queue = Queue::new( + Metrics::default(), + PathBuf::new(), + PathBuf::new(), + 1, + Duration::from_secs(1), + None, + SecurityStatus::default(), + to_queue_rx, + from_queue_tx, + ); + let old_relay_parent = Hash::random(); + let relevant_relay_parent = Hash::random(); + + assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::(), 0); + let mut result_rxs = vec![]; + let (result_tx, _result_rx) = oneshot::channel(); + let relevant_job = ExecuteJob { + artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() }, + exec_timeout: Duration::from_secs(1), + exec_kind: PvfExecKind::Backing(relevant_relay_parent), + pvd: Arc::new(PersistedValidationData::default()), + pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }), + executor_params: ExecutorParams::default(), + result_tx, + waiting_since: Instant::now(), + }; + queue.unscheduled.add(relevant_job, Priority::Backing); + for _ in 0..10 { + let (result_tx, result_rx) = oneshot::channel(); + let expired_job = ExecuteJob { + artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() }, + exec_timeout: Duration::from_secs(1), + exec_kind: PvfExecKind::Backing(old_relay_parent), + pvd: Arc::new(PersistedValidationData::default()), + pov: Arc::new(PoV { block_data: BlockData(Vec::new()) }), + executor_params: ExecutorParams::default(), + result_tx, + waiting_since: Instant::now(), + }; + queue.unscheduled.add(expired_job, Priority::Backing); + result_rxs.push(result_rx); + } + assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::(), 11); + + // Add an active leaf + queue.update_active_leaves( + ActiveLeavesUpdate::start_work(new_leaf(Hash::random(), 1)), + vec![relevant_relay_parent], + ); + + // It prunes all old jobs and drops them with an `ExecutionDeadline` error. + for rx in result_rxs { + assert!(matches!(rx.await, Ok(Err(ValidationError::ExecutionDeadline)))); + } + assert_eq!(queue.unscheduled.unscheduled.values().map(|x| x.len()).sum::(), 1); + } } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 37cd6fcbf74a..8252904095b3 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -37,9 +37,11 @@ use polkadot_node_core_pvf_common::{ pvf::PvfPrepData, }; use polkadot_node_primitives::PoV; -use polkadot_node_subsystem::{messages::PvfExecKind, SubsystemError, SubsystemResult}; +use polkadot_node_subsystem::{ + messages::PvfExecKind, ActiveLeavesUpdate, SubsystemError, SubsystemResult, +}; use polkadot_parachain_primitives::primitives::ValidationResult; -use polkadot_primitives::PersistedValidationData; +use polkadot_primitives::{Hash, PersistedValidationData}; use std::{ collections::HashMap, path::PathBuf, @@ -143,12 +145,27 @@ impl ValidationHost { .await .map_err(|_| "the inner loop hung up".to_string()) } + + /// Sends a signal to the validation host requesting to update best block. + /// + /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. + pub async fn update_active_leaves( + &mut self, + update: ActiveLeavesUpdate, + ancestors: Vec, + ) -> Result<(), String> { + self.to_host_tx + .send(ToHost::UpdateActiveLeaves { update, ancestors }) + .await + .map_err(|_| "the inner loop hung up".to_string()) + } } enum ToHost { PrecheckPvf { pvf: PvfPrepData, result_tx: PrecheckResultSender }, ExecutePvf(ExecutePvfInputs), HeadsUp { active_pvfs: Vec }, + UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec }, } struct ExecutePvfInputs { @@ -488,6 +505,8 @@ async fn handle_to_host( }, ToHost::HeadsUp { active_pvfs } => handle_heads_up(artifacts, prepare_queue, active_pvfs).await?, + ToHost::UpdateActiveLeaves { update, ancestors } => + handle_update_active_leaves(execute_queue, update, ancestors).await?, } Ok(()) @@ -855,6 +874,14 @@ async fn handle_prepare_done( Ok(()) } +async fn handle_update_active_leaves( + execute_queue: &mut mpsc::Sender, + update: ActiveLeavesUpdate, + ancestors: Vec, +) -> Result<(), Fatal> { + send_execute(execute_queue, execute::ToQueue::UpdateActiveLeaves { update, ancestors }).await +} + async fn send_prepare( prepare_queue: &mut mpsc::Sender, to_queue: prepare::ToQueue, @@ -1255,7 +1282,7 @@ pub(crate) mod tests { pvd.clone(), pov1.clone(), Priority::Normal, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1268,7 +1295,7 @@ pub(crate) mod tests { pvd.clone(), pov1, Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1281,7 +1308,7 @@ pub(crate) mod tests { pvd, pov2, Priority::Normal, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1431,7 +1458,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1480,7 +1507,7 @@ pub(crate) mod tests { pvd, pov, Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1591,7 +1618,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1623,7 +1650,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx_2, ) .await @@ -1647,7 +1674,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx_3, ) .await @@ -1706,7 +1733,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await @@ -1738,7 +1765,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx_2, ) .await @@ -1762,7 +1789,7 @@ pub(crate) mod tests { pvd.clone(), pov.clone(), Priority::Critical, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx_3, ) .await @@ -1837,7 +1864,7 @@ pub(crate) mod tests { pvd, pov, Priority::Normal, - PvfExecKind::Backing, + PvfExecKind::Backing(H256::default()), result_tx, ) .await diff --git a/polkadot/node/core/pvf/src/priority.rs b/polkadot/node/core/pvf/src/priority.rs index 7aaeacf36220..5a58fbc8ade3 100644 --- a/polkadot/node/core/pvf/src/priority.rs +++ b/polkadot/node/core/pvf/src/priority.rs @@ -43,8 +43,8 @@ impl From for Priority { match priority { PvfExecKind::Dispute => Priority::Critical, PvfExecKind::Approval => Priority::Critical, - PvfExecKind::BackingSystemParas => Priority::Normal, - PvfExecKind::Backing => Priority::Normal, + PvfExecKind::BackingSystemParas(_) => Priority::Normal, + PvfExecKind::Backing(_) => Priority::Normal, } } } diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index 1a95a28fe077..924ea7166702 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -46,6 +46,7 @@ async fn execute_good_block_on_parent() { pvd, pov, Default::default(), + H256::default(), ) .await .unwrap(); @@ -82,6 +83,7 @@ async fn execute_good_chain_on_parent() { pvd, pov, Default::default(), + H256::default(), ) .await .unwrap(); @@ -120,6 +122,7 @@ async fn execute_bad_block_on_parent() { pvd, pov, Default::default(), + H256::default(), ) .await .unwrap_err(); @@ -145,6 +148,7 @@ async fn stress_spawn() { pvd, pov, Default::default(), + H256::default(), ) .await .unwrap(); @@ -185,6 +189,7 @@ async fn execute_can_run_serially() { pvd, pov, Default::default(), + H256::default(), ) .await .unwrap(); diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 4cbc6fb04a8e..cfb78fd530d2 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -28,8 +28,8 @@ use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; use polkadot_node_subsystem::messages::PvfExecKind; use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult}; use polkadot_primitives::{ - ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind as RuntimePvfExecKind, - PvfPrepKind, + ExecutorParam, ExecutorParams, Hash, PersistedValidationData, + PvfExecKind as RuntimePvfExecKind, PvfPrepKind, }; use sp_core::H256; @@ -108,6 +108,7 @@ impl TestHost { pvd: PersistedValidationData, pov: PoV, executor_params: ExecutorParams, + relay_parent: Hash, ) -> Result { let (result_tx, result_rx) = futures::channel::oneshot::channel(); @@ -125,7 +126,7 @@ impl TestHost { Arc::new(pvd), Arc::new(pov), polkadot_node_core_pvf::Priority::Normal, - PvfExecKind::Backing, + PvfExecKind::Backing(relay_parent), result_tx, ) .await @@ -171,7 +172,13 @@ async fn execute_job_terminates_on_timeout() { let start = std::time::Instant::now(); let result = host - .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) + .validate_candidate( + test_parachain_halt::wasm_binary_unwrap(), + pvd, + pov, + Default::default(), + H256::default(), + ) .await; match result { @@ -201,12 +208,14 @@ async fn ensure_parallel_execution() { pvd.clone(), pov.clone(), Default::default(), + H256::default(), ); let execute_pvf_future_2 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default(), + H256::default(), ); let start = std::time::Instant::now(); @@ -254,6 +263,7 @@ async fn execute_queue_doesnt_stall_if_workers_died() { pvd.clone(), pov.clone(), Default::default(), + H256::default(), ) })) .await; @@ -303,6 +313,7 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { 0 => executor_params_1.clone(), _ => executor_params_2.clone(), }, + H256::default(), ) })) .await; @@ -359,7 +370,13 @@ async fn deleting_prepared_artifact_does_not_dispute() { // Try to validate, artifact should get recreated. let result = host - .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) + .validate_candidate( + test_parachain_halt::wasm_binary_unwrap(), + pvd, + pov, + Default::default(), + H256::default(), + ) .await; assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))); @@ -410,7 +427,13 @@ async fn corrupted_prepared_artifact_does_not_dispute() { // Try to validate, artifact should get removed because of the corruption. let result = host - .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) + .validate_candidate( + test_parachain_halt::wasm_binary_unwrap(), + pvd, + pov, + Default::default(), + H256::default(), + ) .await; assert_matches!( @@ -684,7 +707,9 @@ async fn invalid_compressed_code_fails_validation() { let validation_code = sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); - let result = host.validate_candidate(&validation_code, pvd, pov, Default::default()).await; + let result = host + .validate_candidate(&validation_code, pvd, pov, Default::default(), H256::default()) + .await; assert_matches!( result, @@ -708,7 +733,13 @@ async fn invalid_compressed_pov_fails_validation() { let pov = PoV { block_data: BlockData(block_data) }; let result = host - .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) + .validate_candidate( + test_parachain_halt::wasm_binary_unwrap(), + pvd, + pov, + Default::default(), + H256::default(), + ) .await; assert_matches!( diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs index b3023c8a45c3..353367b394f3 100644 --- a/polkadot/node/core/pvf/tests/it/process.rs +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -141,6 +141,7 @@ rusty_fork_test! { pvd, pov, Default::default(), + H256::default(), ) .await .unwrap(); @@ -187,6 +188,7 @@ rusty_fork_test! { pvd, pov, Default::default(), + H256::default(), ), // Send a stop signal to pause the worker. async { @@ -242,6 +244,7 @@ rusty_fork_test! { pvd, pov, Default::default(), + H256::default(), ), // Run a future that kills the job while it's running. async { @@ -301,6 +304,7 @@ rusty_fork_test! { pvd, pov, Default::default(), + H256::default(), ), // Run a future that kills the job while it's running. async { @@ -372,6 +376,7 @@ rusty_fork_test! { pvd, pov, Default::default(), + H256::default(), ), // Run a future that tests the thread count while the worker is running. async { diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index e1b2af733b47..f2cf60280b72 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -83,7 +83,7 @@ impl Subsystem1 { candidate_receipt, pov: PoV { block_data: BlockData(Vec::new()) }.into(), executor_params: Default::default(), - exec_kind: PvfExecKind::Backing, + exec_kind: PvfExecKind::Backing(dummy_hash()), response_sender: tx, }; ctx.send_message(msg).await; diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 87ef63d8a5d7..3881ddbcc904 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -468,6 +468,7 @@ pub async fn forward_events>(client: Arc

, mut hand )] pub struct Overseer { #[subsystem(CandidateValidationMessage, sends: [ + ChainApiMessage, RuntimeApiMessage, ])] candidate_validation: CandidateValidation, diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index c3c47335cd3e..0b9b783ef9b1 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -111,7 +111,7 @@ where candidate_receipt, pov: PoV { block_data: BlockData(Vec::new()) }.into(), executor_params: Default::default(), - exec_kind: PvfExecKind::Backing, + exec_kind: PvfExecKind::Backing(dummy_hash()), response_sender: tx, }) .await; @@ -811,7 +811,7 @@ fn test_candidate_validation_msg() -> CandidateValidationMessage { candidate_receipt, pov, executor_params: Default::default(), - exec_kind: PvfExecKind::Backing, + exec_kind: PvfExecKind::Backing(dummy_hash()), response_sender, } } diff --git a/polkadot/node/subsystem-types/Cargo.toml b/polkadot/node/subsystem-types/Cargo.toml index b8bad8f8a295..b5686ec96be1 100644 --- a/polkadot/node/subsystem-types/Cargo.toml +++ b/polkadot/node/subsystem-types/Cargo.toml @@ -32,4 +32,3 @@ prometheus-endpoint = { workspace = true, default-features = true } thiserror = { workspace = true } async-trait = { workspace = true } bitvec = { features = ["alloc"], workspace = true } -strum = { features = ["derive"], workspace = true, default-features = true } diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index ba1ba5755be0..28a3a1ab82ab 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -24,7 +24,6 @@ use futures::channel::oneshot; use sc_network::{Multiaddr, ReputationChange}; -use strum::EnumIter; use thiserror::Error; pub use sc_network::IfDisconnected; @@ -189,18 +188,16 @@ pub enum CandidateValidationMessage { /// Extends primitives::PvfExecKind, which is a runtime parameter we don't want to change, /// to separate and prioritize execution jobs by request type. -/// The order is important, because we iterate through the values and assume it is going from higher -/// to lowest priority. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, EnumIter)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PvfExecKind { /// For dispute requests Dispute, /// For approval requests Approval, - /// For backing requests from system parachains. - BackingSystemParas, - /// For backing requests. - Backing, + /// For backing requests from system parachains. With relay parent hash + BackingSystemParas(Hash), + /// For backing requests. With relay parent hash + Backing(Hash), } impl PvfExecKind { @@ -209,8 +206,8 @@ impl PvfExecKind { match *self { Self::Dispute => "dispute", Self::Approval => "approval", - Self::BackingSystemParas => "backing_system_paras", - Self::Backing => "backing", + Self::BackingSystemParas(_) => "backing_system_paras", + Self::Backing(_) => "backing", } } } @@ -220,8 +217,8 @@ impl From for RuntimePvfExecKind { match exec { PvfExecKind::Dispute => RuntimePvfExecKind::Approval, PvfExecKind::Approval => RuntimePvfExecKind::Approval, - PvfExecKind::BackingSystemParas => RuntimePvfExecKind::Backing, - PvfExecKind::Backing => RuntimePvfExecKind::Backing, + PvfExecKind::BackingSystemParas(_) => RuntimePvfExecKind::Backing, + PvfExecKind::Backing(_) => RuntimePvfExecKind::Backing, } } } diff --git a/prdoc/pr_5616.prdoc b/prdoc/pr_5616.prdoc new file mode 100644 index 000000000000..16d81c291c30 --- /dev/null +++ b/prdoc/pr_5616.prdoc @@ -0,0 +1,25 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: "PVF: drop backing jobs if it is too late" + +doc: + - audience: [ Node Dev, Node Operator ] + description: | + Introduces the removal of backing jobs that have been back pressured for longer than `allowedAncestryLen`, as these candidates are no longer viable. + +crates: + - name: polkadot-overseer + bump: major + - name: polkadot-node-core-pvf + bump: major + - name: polkadot-node-subsystem-types + bump: major + - name: polkadot-node-core-approval-voting + bump: patch + - name: polkadot-node-core-backing + bump: patch + - name: polkadot-node-core-candidate-validation + bump: patch + - name: polkadot-node-core-dispute-coordinator + bump: patch