From 32b86851a429f13f645b83fbb9254614f5218a62 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 15 Jul 2021 11:32:43 +1000 Subject: [PATCH] [squashed] Cache participating indices for Altair epoch processing #2416 Squashed commit of the following: commit 40b4ac374b279fd1856f1702e97b8b245f1b9bab Author: Paul Hauner Date: Thu Jul 15 10:10:07 2021 +1000 Move ValidatorsStatuses into base commit d4fb7fef5638262ede9b89cb2c4b2a9c387ab9c8 Author: Paul Hauner Date: Thu Jul 15 09:59:44 2021 +1000 Unify participation metrics commit b9d8e0751e976e68fa127d4d6aeec2356ca37c9a Author: Paul Hauner Date: Mon Jul 12 19:04:22 2021 +1000 Log error when exposing metrics fails commit 453319089bf358977c803414e48b386be3a1bc47 Author: Paul Hauner Date: Mon Jul 12 18:45:54 2021 +1000 Adjust error handling commit f7e3f4a597f5adb5a6bf107446cb3e3aff8b3a08 Author: Paul Hauner Date: Mon Jul 12 18:39:36 2021 +1000 Add `Error` enum commit b48e02a3b0f134b34ad6a5eb88885f4b38dc4adf Author: Paul Hauner Date: Mon Jul 12 18:31:31 2021 +1000 Tidy, fix error handling commit 1bdcb9edc976d034b35e1dc38915f74b75c91c40 Author: Paul Hauner Date: Mon Jul 12 18:12:31 2021 +1000 Tidy, change "active" to "active unslashed" commit dbbff17a316c62cf7c1ca16428df45b67991588b Author: Paul Hauner Date: Mon Jul 12 16:58:33 2021 +1000 Tidy commit 751abe58091d6e55075301ff0d99995a82095ccd Author: Paul Hauner Date: Mon Jul 12 11:29:14 2021 +1000 Avoid creating logs for offline vals commit 8644b05967a904be8d7b5c3417fa097cf63a8503 Author: Paul Hauner Date: Fri Jun 25 14:01:22 2021 +1000 Refactor validator inclusion endpoint commit de7ad33264615d298dc02bb148d72f0d4c06591e Author: Paul Hauner Date: Fri Jun 25 12:16:38 2021 +1000 Remove old balance getter in ef_tests commit 3fe97975292915793ffa8a7fa0a00df916f13ff9 Author: Paul Hauner Date: Fri Jun 25 12:11:19 2021 +1000 Remove flag error type FIXME commit e9ec4baaf87f729c3df6e3d23d66b1ea059ab0b9 Author: realbigsean Date: Thu Jun 17 17:15:54 2021 -0400 Couple more `unstable` merge fixes commit 10c793bb4dd4368b5d932f68473c869f6ede4660 Author: Paul Hauner Date: Fri Jun 25 11:56:04 2021 +1000 Tidy, remove unused function, fix init length commit 96efe876bb464af4da7477e829434a29757fcf98 Author: Paul Hauner Date: Fri Jun 25 11:41:40 2021 +1000 Tidy, add comments commit 9f083605e210b077bb42ac3d0366e83551f09ee1 Author: Paul Hauner Date: Fri Jun 25 11:03:36 2021 +1000 Working single-loop refactor commit 1f85249b8dabb631808e45a68ecc568c7ff5f0ef Author: Paul Hauner Date: Fri Jun 25 10:54:02 2021 +1000 Partial one-loop refactor commit 81b6768f00d15c1bb7f6439b7c2d8f48cfeb9cdb Author: Paul Hauner Date: Fri Jun 25 10:27:28 2021 +1000 Fix eligible indices commit 3b01744b14be5126cc784d2ff0a687cf37bd3ae8 Author: Paul Hauner Date: Fri Jun 25 10:08:28 2021 +1000 Used cached indices commit 5584854f46ae987cbc3f76f63f1128d5ec8a79b6 Author: Paul Hauner Date: Fri Jun 25 09:37:56 2021 +1000 Fix active indices commit 3a38047de7ee37ad2d5c7a83aaa8be734063933c Author: Paul Hauner Date: Fri Jun 25 09:23:22 2021 +1000 Fix balances issue commit 957bcad7d8c2bd390c538636eb78fb259c6fbd25 Author: Paul Hauner Date: Thu Jun 24 16:40:45 2021 +1000 Fix EF test compile errors commit a8b0a7c506dde4fd0345dcfce281c69b3d1b95cc Author: Paul Hauner Date: Thu Jun 24 16:40:32 2021 +1000 Remove junk file commit 99292c22da2b167594d8e15f9108b0c53e6d557b Author: Paul Hauner Date: Thu Jun 24 15:49:45 2021 +1000 Add eligible indices commit c6840bc984016690253e7bb438305b13ed499ba6 Author: Paul Hauner Date: Thu Jun 24 15:23:09 2021 +1000 Swap back to ParticipationCache commit ab687f300c683fc7895117e2f7f2ae04a971b903 Author: Paul Hauner Date: Thu Jun 24 13:14:53 2021 +1000 Bring balances into ParticipationCache commit b6d634c6361a6b0401b3ee449f04b7f6c24a606a Author: Paul Hauner Date: Thu Jun 24 11:57:54 2021 +1000 Add EpochCache commit c5658f0864a6f63bdb259ad58393a227736b373f Author: Paul Hauner Date: Thu Jun 24 11:45:44 2021 +1000 Tidy, remove metric commit 22fbf3e8132c2192e50cdf8285cf99bf69dd9160 Author: Paul Hauner Date: Wed Jun 23 16:45:28 2021 +1000 Remove unslashed indices function commit 6f87cd5b5bd45ec2c90cb72566dded17e4d4d26a Author: Paul Hauner Date: Wed Jun 23 15:42:12 2021 +1000 Update EpochProcessingSummary commit 1bb1c8fa03f982fe62d972fff09245555f19e7a7 Author: Paul Hauner Date: Wed Jun 23 15:07:06 2021 +1000 Add initial participation cache --- Cargo.lock | 1 + Makefile | 2 +- .../beacon_chain/src/block_verification.rs | 67 +-- beacon_node/beacon_chain/src/metrics.rs | 15 - .../beacon_chain/src/state_advance_timer.rs | 21 +- .../beacon_chain/src/validator_monitor.rs | 193 +++++---- .../http_api/src/validator_inclusion.rs | 116 +++-- book/src/validator-inclusion.md | 17 +- common/eth2/src/lighthouse.rs | 16 +- consensus/state_processing/Cargo.toml | 5 +- consensus/state_processing/src/lib.rs | 1 + consensus/state_processing/src/metrics.rs | 30 ++ .../src/per_epoch_processing.rs | 11 +- .../src/per_epoch_processing/altair.rs | 25 +- .../altair/inactivity_updates.rs | 13 +- .../altair/justification_and_finalization.rs | 27 +- .../altair/participation_cache.rs | 409 ++++++++++++++++++ .../altair/rewards_and_penalties.rs | 37 +- .../src/per_epoch_processing/base.rs | 7 +- .../base/rewards_and_penalties.rs | 6 +- .../{ => base}/validator_statuses.rs | 0 .../epoch_processing_summary.rs | 280 ++++++++++++ .../src/per_epoch_processing/errors.rs | 8 + consensus/types/src/beacon_state.rs | 56 +-- .../ef_tests/src/cases/epoch_processing.rs | 28 +- testing/ef_tests/src/cases/rewards.rs | 21 +- 26 files changed, 1066 insertions(+), 346 deletions(-) create mode 100644 consensus/state_processing/src/metrics.rs create mode 100644 consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs rename consensus/state_processing/src/per_epoch_processing/{ => base}/validator_statuses.rs (100%) create mode 100644 consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs diff --git a/Cargo.lock b/Cargo.lock index 79161b34888..2b60dfcebc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6301,6 +6301,7 @@ dependencies = [ "integer-sqrt", "itertools 0.10.1", "lazy_static", + "lighthouse_metrics", "log", "merkle_proof", "rayon", diff --git a/Makefile b/Makefile index 0e2cdee7141..db88ae2bb01 100644 --- a/Makefile +++ b/Makefile @@ -95,7 +95,7 @@ cargo-fmt: check-benches: cargo check --workspace --benches -# Typechecks consensus code *without* allowing deprecated legacy arithmetic +# Typechecks consensus code *without* allowing deprecated legacy arithmetic or metrics. check-consensus: cargo check --manifest-path=consensus/state_processing/Cargo.toml --no-default-features diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index adeb7d1a413..934c852540a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -58,14 +58,11 @@ use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, - per_block_processing, - per_epoch_processing::EpochProcessingSummary, - per_slot_processing, + per_block_processing, per_slot_processing, state_advance::partial_state_advance, BlockProcessingError, BlockSignatureStrategy, SlotProcessingError, }; use std::borrow::Cow; -use std::convert::TryFrom; use std::fs; use std::io::Write; use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; @@ -972,12 +969,19 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { }; if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? { - summaries.push(summary) + // Expose Prometheus metrics. + if let Err(e) = summary.observe_metrics() { + error!( + chain.log, + "Failed to observe epoch summary metrics"; + "src" => "block_verification", + "error" => ?e + ); + } + summaries.push(summary); } } - expose_participation_metrics(&summaries); - // If the block is sufficiently recent, notify the validator monitor. if let Some(slot) = chain.slot_clock.now() { let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); @@ -991,7 +995,15 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { // performing `per_slot_processing`. for (i, summary) in summaries.iter().enumerate() { let epoch = state.current_epoch() - Epoch::from(summaries.len() - i); - validator_monitor.process_validator_statuses(epoch, &summary.statuses); + if let Err(e) = + validator_monitor.process_validator_statuses(epoch, &summary, &chain.spec) + { + error!( + chain.log, + "Failed to process validator statuses"; + "error" => ?e + ); + } } } } @@ -1442,45 +1454,6 @@ fn verify_header_signature( } } -fn expose_participation_metrics(summaries: &[EpochProcessingSummary]) { - if !cfg!(feature = "participation_metrics") { - return; - } - - for summary in summaries { - let b = &summary.total_balances; - - metrics::maybe_set_float_gauge( - &metrics::PARTICIPATION_PREV_EPOCH_ATTESTER, - participation_ratio(b.previous_epoch_attesters(), b.previous_epoch()), - ); - - metrics::maybe_set_float_gauge( - &metrics::PARTICIPATION_PREV_EPOCH_TARGET_ATTESTER, - participation_ratio(b.previous_epoch_target_attesters(), b.previous_epoch()), - ); - - metrics::maybe_set_float_gauge( - &metrics::PARTICIPATION_PREV_EPOCH_HEAD_ATTESTER, - participation_ratio(b.previous_epoch_head_attesters(), b.previous_epoch()), - ); - } -} - -fn participation_ratio(section: u64, total: u64) -> Option { - // Reduce the precision to help ensure we fit inside a u32. - const PRECISION: u64 = 100_000_000; - - let section: f64 = u32::try_from(section / PRECISION).ok()?.into(); - let total: f64 = u32::try_from(total / PRECISION).ok()?.into(); - - if total > 0_f64 { - Some(section / total) - } else { - None - } -} - fn write_state(prefix: &str, state: &BeaconState, log: &Logger) { if WRITE_BLOCK_PROCESSING_SSZ { let root = state.tree_hash_root(); diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index e0d7052f8e4..213c770a2f5 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -330,21 +330,6 @@ lazy_static! { pub static ref OP_POOL_NUM_SYNC_CONTRIBUTIONS: Result = try_create_int_gauge("beacon_op_pool_sync_contributions_total", "Count of sync contributions in the op pool"); - /* - * Participation Metrics - */ - pub static ref PARTICIPATION_PREV_EPOCH_ATTESTER: Result = try_create_float_gauge( - "beacon_participation_prev_epoch_attester", - "Ratio of attesting balances to total balances" - ); - pub static ref PARTICIPATION_PREV_EPOCH_TARGET_ATTESTER: Result = try_create_float_gauge( - "beacon_participation_prev_epoch_target_attester", - "Ratio of target-attesting balances to total balances" - ); - pub static ref PARTICIPATION_PREV_EPOCH_HEAD_ATTESTER: Result = try_create_float_gauge( - "beacon_participation_prev_epoch_head_attester", - "Ratio of head-attesting balances to total balances" - ); /* * Attestation Observation Metrics diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index eea329a2a3d..d3cacf658ca 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -233,15 +233,32 @@ fn advance_head( if let Some(summary) = per_slot_processing(&mut state, state_root, &beacon_chain.spec) .map_err(BeaconChainError::from)? { + // Expose Prometheus metrics. + if let Err(e) = summary.observe_metrics() { + error!( + log, + "Failed to observe epoch summary metrics"; + "src" => "state_advance_timer", + "error" => ?e + ); + } + // Only notify the validator monitor for recent blocks. if state.current_epoch() + VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 >= current_slot.epoch(T::EthSpec::slots_per_epoch()) { // Potentially create logs/metrics for locally monitored validators. - beacon_chain + if let Err(e) = beacon_chain .validator_monitor .read() - .process_validator_statuses(state.current_epoch(), &summary.statuses); + .process_validator_statuses(state.current_epoch(), &summary, &beacon_chain.spec) + { + error!( + log, + "Unable to process validator statuses"; + "error" => ?e + ); + } } } diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index 3022dd5c92e..871e817b53b 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -6,7 +6,9 @@ use crate::metrics; use parking_lot::RwLock; use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; -use state_processing::per_epoch_processing::ValidatorStatus; +use state_processing::per_epoch_processing::{ + errors::EpochProcessingError, EpochProcessingSummary, +}; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::io; @@ -326,7 +328,12 @@ impl ValidatorMonitor { } } - pub fn process_validator_statuses(&self, epoch: Epoch, summaries: &[ValidatorStatus]) { + pub fn process_validator_statuses( + &self, + epoch: Epoch, + summary: &EpochProcessingSummary, + spec: &ChainSpec, + ) -> Result<(), EpochProcessingError> { for monitored_validator in self.validators.values() { // We subtract two from the state of the epoch that generated these summaries. // @@ -338,93 +345,123 @@ impl ValidatorMonitor { let i = i as usize; let id = &monitored_validator.id; - if let Some(summary) = summaries.get(i) { - if summary.is_previous_epoch_attester { - let lag = summary - .inclusion_info - .map(|i| format!("{} slot(s)", i.delay.saturating_sub(1).to_string())) - .unwrap_or_else(|| "??".to_string()); + /* + * These metrics are reflected differently between Base and Altair. + * + * For Base, any attestation that is included on-chain will match the source. + * + * However, in Altair, only attestations that are "timely" are registered as + * matching the source. + */ + + let previous_epoch_active = summary.is_active_unslashed_in_previous_epoch(i); + let previous_epoch_matched_source = summary.is_previous_epoch_source_attester(i)?; + let previous_epoch_matched_target = summary.is_previous_epoch_target_attester(i)?; + let previous_epoch_matched_head = summary.is_previous_epoch_head_attester(i)?; + let previous_epoch_matched_any = previous_epoch_matched_source + || previous_epoch_matched_target + || previous_epoch_matched_head; + + if !previous_epoch_active { + // Monitored validator is not active, due to awaiting activation + // or being exited/withdrawn. Do not attempt to report on its + // attestations. + continue; + } - info!( - self.log, - "Previous epoch attestation success"; - "inclusion_lag" => lag, - "matched_target" => summary.is_previous_epoch_target_attester, - "matched_head" => summary.is_previous_epoch_head_attester, - "epoch" => prev_epoch, - "validator" => id, - ); - } else if summary.is_active_in_previous_epoch - && !summary.is_previous_epoch_attester - { - error!( - self.log, - "Previous epoch attestation missing"; - "epoch" => prev_epoch, - "validator" => id, - ) - } else if !summary.is_active_in_previous_epoch { - // Monitored validator is not active, due to awaiting activation - // or being exited/withdrawn. Do not attempt to report on its - // attestations. - continue; - } + // Indicates if any attestation made it on-chain. + // + // For Base states, this will be *any* attestation whatsoever. For Altair states, + // this will be any attestation that matched a "timely" flag. + if previous_epoch_matched_any { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT, + &[id], + ); + info!( + self.log, + "Previous epoch attestation success"; + "matched_target" => previous_epoch_matched_target, + "matched_head" => previous_epoch_matched_head, + "epoch" => prev_epoch, + "validator" => id, + + ) + } else { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS, + &[id], + ); + error!( + self.log, + "Previous epoch attestation missing"; + "epoch" => prev_epoch, + "validator" => id, + ) + } - if summary.is_previous_epoch_attester { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT, - &[id], - ); - } else { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS, - &[id], - ); - } - if summary.is_previous_epoch_head_attester { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT, - &[id], - ); - } else { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS, - &[id], - ); - warn!( - self.log, - "Attested to an incorrect head"; - "epoch" => prev_epoch, - "validator" => id, - ); - } - if summary.is_previous_epoch_target_attester { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT, - &[id], - ); - } else { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS, - &[id], - ); + // Indicates if any on-chain attestation hit the head. + if previous_epoch_matched_head { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT, + &[id], + ); + } else { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS, + &[id], + ); + warn!( + self.log, + "Attested to an incorrect head"; + "epoch" => prev_epoch, + "validator" => id, + ); + } + + // Indicates if any on-chain attestation hit the target. + if previous_epoch_matched_target { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT, + &[id], + ); + } else { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS, + &[id], + ); + warn!( + self.log, + "Attested to an incorrect target"; + "epoch" => prev_epoch, + "validator" => id, + ); + } + + // For pre-Altair, state the inclusion distance. This information is not retained in + // the Altair state. + if let Some(inclusion_info) = summary.previous_epoch_inclusion_info(i) { + if inclusion_info.delay > spec.min_attestation_inclusion_delay { warn!( self.log, - "Attested to an incorrect target"; + "Sub-optimal inclusion delay"; + "optimal" => spec.min_attestation_inclusion_delay, + "delay" => inclusion_info.delay, "epoch" => prev_epoch, "validator" => id, ); } - if let Some(inclusion_info) = summary.inclusion_info { - metrics::set_int_gauge( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE, - &[id], - inclusion_info.delay as i64, - ); - } + + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE, + &[id], + inclusion_info.delay as i64, + ); } } } + + Ok(()) } fn get_validator_id(&self, validator_index: u64) -> Option<&str> { diff --git a/beacon_node/http_api/src/validator_inclusion.rs b/beacon_node/http_api/src/validator_inclusion.rs index cdd2a51621b..9131d698fc3 100644 --- a/beacon_node/http_api/src/validator_inclusion.rs +++ b/beacon_node/http_api/src/validator_inclusion.rs @@ -4,35 +4,59 @@ use eth2::{ lighthouse::{GlobalValidatorInclusionData, ValidatorInclusionData}, types::ValidatorId, }; -use state_processing::per_epoch_processing::ValidatorStatuses; -use types::{Epoch, EthSpec}; +use state_processing::per_epoch_processing::{ + altair::participation_cache::Error as ParticipationCacheError, process_epoch, + EpochProcessingSummary, +}; +use types::{BeaconState, ChainSpec, Epoch, EthSpec}; -/// Returns information about *all validators* (i.e., global) and how they performed during a given -/// epoch. -pub fn global_validator_inclusion_data( +/// Returns the state in the last slot of `epoch`. +fn end_of_epoch_state( epoch: Epoch, chain: &BeaconChain, -) -> Result { +) -> Result, warp::reject::Rejection> { let target_slot = epoch.end_slot(T::EthSpec::slots_per_epoch()); + StateId::slot(target_slot).state(chain) +} - let state = StateId::slot(target_slot).state(chain)?; +/// Generate an `EpochProcessingSummary` for `state`. +/// +/// ## Notes +/// +/// Will mutate `state`, transitioning it to the next epoch. +fn get_epoch_processing_summary( + state: &mut BeaconState, + spec: &ChainSpec, +) -> Result { + process_epoch(state, spec) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e))) +} - let mut validator_statuses = ValidatorStatuses::new(&state, &chain.spec) - .map_err(warp_utils::reject::beacon_state_error)?; - validator_statuses - .process_attestations(&state) - .map_err(warp_utils::reject::beacon_state_error)?; +fn convert_cache_error(error: ParticipationCacheError) -> warp::reject::Rejection { + warp_utils::reject::custom_server_error(format!("{:?}", error)) +} - let totals = validator_statuses.total_balances; +/// Returns information about *all validators* (i.e., global) and how they performed during a given +/// epoch. +pub fn global_validator_inclusion_data( + epoch: Epoch, + chain: &BeaconChain, +) -> Result { + let mut state = end_of_epoch_state(epoch, chain)?; + let summary = get_epoch_processing_summary(&mut state, &chain.spec)?; Ok(GlobalValidatorInclusionData { - current_epoch_active_gwei: totals.current_epoch(), - previous_epoch_active_gwei: totals.previous_epoch(), - current_epoch_attesting_gwei: totals.current_epoch_attesters(), - current_epoch_target_attesting_gwei: totals.current_epoch_target_attesters(), - previous_epoch_attesting_gwei: totals.previous_epoch_attesters(), - previous_epoch_target_attesting_gwei: totals.previous_epoch_target_attesters(), - previous_epoch_head_attesting_gwei: totals.previous_epoch_head_attesters(), + current_epoch_active_gwei: summary.current_epoch_total_active_balance(), + previous_epoch_active_gwei: summary.previous_epoch_total_active_balance(), + current_epoch_target_attesting_gwei: summary + .current_epoch_target_attesting_balance() + .map_err(convert_cache_error)?, + previous_epoch_target_attesting_gwei: summary + .previous_epoch_target_attesting_balance() + .map_err(convert_cache_error)?, + previous_epoch_head_attesting_gwei: summary + .previous_epoch_head_attesting_balance() + .map_err(convert_cache_error)?, }) } @@ -42,15 +66,7 @@ pub fn validator_inclusion_data( validator_id: &ValidatorId, chain: &BeaconChain, ) -> Result, warp::Rejection> { - let target_slot = epoch.end_slot(T::EthSpec::slots_per_epoch()); - - let mut state = StateId::slot(target_slot).state(chain)?; - - let mut validator_statuses = ValidatorStatuses::new(&state, &chain.spec) - .map_err(warp_utils::reject::beacon_state_error)?; - validator_statuses - .process_attestations(&state) - .map_err(warp_utils::reject::beacon_state_error)?; + let mut state = end_of_epoch_state(epoch, chain)?; state .update_pubkey_cache() @@ -70,19 +86,31 @@ pub fn validator_inclusion_data( } }; - Ok(validator_statuses - .statuses - .get(validator_index) - .map(|vote| ValidatorInclusionData { - is_slashed: vote.is_slashed, - is_withdrawable_in_current_epoch: vote.is_withdrawable_in_current_epoch, - is_active_in_current_epoch: vote.is_active_in_current_epoch, - is_active_in_previous_epoch: vote.is_active_in_previous_epoch, - current_epoch_effective_balance_gwei: vote.current_epoch_effective_balance, - is_current_epoch_attester: vote.is_current_epoch_attester, - is_current_epoch_target_attester: vote.is_current_epoch_target_attester, - is_previous_epoch_attester: vote.is_previous_epoch_attester, - is_previous_epoch_target_attester: vote.is_previous_epoch_target_attester, - is_previous_epoch_head_attester: vote.is_previous_epoch_head_attester, - })) + // Obtain the validator *before* transitioning the state into the next epoch. + let validator = if let Ok(validator) = state.get_validator(validator_index) { + validator.clone() + } else { + return Ok(None); + }; + + let summary = get_epoch_processing_summary(&mut state, &chain.spec)?; + + Ok(Some(ValidatorInclusionData { + is_slashed: validator.slashed, + is_withdrawable_in_current_epoch: validator.is_withdrawable_at(epoch), + is_active_unslashed_in_current_epoch: summary + .is_active_unslashed_in_current_epoch(validator_index), + is_active_unslashed_in_previous_epoch: summary + .is_active_unslashed_in_previous_epoch(validator_index), + current_epoch_effective_balance_gwei: validator.effective_balance, + is_current_epoch_target_attester: summary + .is_current_epoch_target_attester(validator_index) + .map_err(convert_cache_error)?, + is_previous_epoch_target_attester: summary + .is_previous_epoch_target_attester(validator_index) + .map_err(convert_cache_error)?, + is_previous_epoch_head_attester: summary + .is_previous_epoch_head_attester(validator_index) + .map_err(convert_cache_error)?, + })) } diff --git a/book/src/validator-inclusion.md b/book/src/validator-inclusion.md index ce8e61cafee..72e2e379c72 100644 --- a/book/src/validator-inclusion.md +++ b/book/src/validator-inclusion.md @@ -52,14 +52,9 @@ The following fields are returned: - `current_epoch_active_gwei`: the total staked gwei that was active (i.e., able to vote) during the current epoch. -- `current_epoch_attesting_gwei`: the total staked gwei that had one or more - attestations included in a block during the current epoch (multiple - attestations by the same validator do not increase this figure). - `current_epoch_target_attesting_gwei`: the total staked gwei that attested to - the majority-elected Casper FFG target epoch during the current epoch. This - figure must be equal to or less than `current_epoch_attesting_gwei`. -- `previous_epoch_active_gwei`: as above, but during the previous epoch. -- `previous_epoch_attesting_gwei`: see `current_epoch_attesting_gwei`. + the majority-elected Casper FFG target epoch during the current epoch. +- `previous_epoch_active_gwei`: as per `current_epoch_active_gwei`, but during the previous epoch. - `previous_epoch_target_attesting_gwei`: see `current_epoch_target_attesting_gwei`. - `previous_epoch_head_attesting_gwei`: the total staked gwei that attested to a head beacon block that is in the canonical chain. @@ -91,9 +86,7 @@ curl -X GET "http://localhost:5052/lighthouse/validator_inclusion/0/global" -H "data": { "current_epoch_active_gwei": 642688000000000, "previous_epoch_active_gwei": 642688000000000, - "current_epoch_attesting_gwei": 366208000000000, "current_epoch_target_attesting_gwei": 366208000000000, - "previous_epoch_attesting_gwei": 1000000000, "previous_epoch_target_attesting_gwei": 1000000000, "previous_epoch_head_attesting_gwei": 1000000000 } @@ -121,12 +114,10 @@ curl -X GET "http://localhost:5052/lighthouse/validator_inclusion/0/42" -H "acc "data": { "is_slashed": false, "is_withdrawable_in_current_epoch": false, - "is_active_in_current_epoch": true, - "is_active_in_previous_epoch": true, + "is_active_unslashed_in_current_epoch": true, + "is_active_unslashed_in_previous_epoch": true, "current_epoch_effective_balance_gwei": 32000000000, - "is_current_epoch_attester": false, "is_current_epoch_target_attester": false, - "is_previous_epoch_attester": false, "is_previous_epoch_target_attester": false, "is_previous_epoch_head_attester": false } diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 716ac41e523..70c5fa2b32e 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -32,13 +32,9 @@ pub struct GlobalValidatorInclusionData { pub current_epoch_active_gwei: u64, /// The total effective balance of all active validators during the _previous_ epoch. pub previous_epoch_active_gwei: u64, - /// The total effective balance of all validators who attested during the _current_ epoch. - pub current_epoch_attesting_gwei: u64, /// The total effective balance of all validators who attested during the _current_ epoch and /// agreed with the state about the beacon block at the first slot of the _current_ epoch. pub current_epoch_target_attesting_gwei: u64, - /// The total effective balance of all validators who attested during the _previous_ epoch. - pub previous_epoch_attesting_gwei: u64, /// The total effective balance of all validators who attested during the _previous_ epoch and /// agreed with the state about the beacon block at the first slot of the _previous_ epoch. pub previous_epoch_target_attesting_gwei: u64, @@ -53,19 +49,15 @@ pub struct ValidatorInclusionData { pub is_slashed: bool, /// True if the validator can withdraw in the current epoch. pub is_withdrawable_in_current_epoch: bool, - /// True if the validator was active in the state's _current_ epoch. - pub is_active_in_current_epoch: bool, - /// True if the validator was active in the state's _previous_ epoch. - pub is_active_in_previous_epoch: bool, + /// True if the validator was active and not slashed in the state's _current_ epoch. + pub is_active_unslashed_in_current_epoch: bool, + /// True if the validator was active and not slashed in the state's _previous_ epoch. + pub is_active_unslashed_in_previous_epoch: bool, /// The validator's effective balance in the _current_ epoch. pub current_epoch_effective_balance_gwei: u64, - /// True if the validator had an attestation included in the _current_ epoch. - pub is_current_epoch_attester: bool, /// True if the validator's beacon block root attestation for the first slot of the _current_ /// epoch matches the block root known to the state. pub is_current_epoch_target_attester: bool, - /// True if the validator had an attestation included in the _previous_ epoch. - pub is_previous_epoch_attester: bool, /// True if the validator's beacon block root attestation for the first slot of the _previous_ /// epoch matches the block root known to the state. pub is_previous_epoch_target_attester: bool, diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml index 63f8b448686..2d1c43b5cd3 100644 --- a/consensus/state_processing/Cargo.toml +++ b/consensus/state_processing/Cargo.toml @@ -29,11 +29,14 @@ eth2_hashing = "0.1.0" int_to_bytes = { path = "../int_to_bytes" } smallvec = "1.6.1" arbitrary = { version = "0.4.6", features = ["derive"], optional = true } +lighthouse_metrics = { path = "../../common/lighthouse_metrics", optional = true } +lazy_static = { version = "1.4.0", optional = true } [features] -default = ["legacy-arith"] +default = ["legacy-arith", "metrics"] fake_crypto = ["bls/fake_crypto"] legacy-arith = ["types/legacy-arith"] +metrics = ["lighthouse_metrics", "lazy_static"] arbitrary-fuzz = [ "arbitrary", "types/arbitrary-fuzz", diff --git a/consensus/state_processing/src/lib.rs b/consensus/state_processing/src/lib.rs index 91959cd866b..18fee2e2c3b 100644 --- a/consensus/state_processing/src/lib.rs +++ b/consensus/state_processing/src/lib.rs @@ -14,6 +14,7 @@ #[macro_use] mod macros; +mod metrics; pub mod common; pub mod genesis; diff --git a/consensus/state_processing/src/metrics.rs b/consensus/state_processing/src/metrics.rs new file mode 100644 index 00000000000..969bc12a18b --- /dev/null +++ b/consensus/state_processing/src/metrics.rs @@ -0,0 +1,30 @@ +#[cfg(feature = "metrics")] +pub use only_if_enabled::*; + +#[cfg(feature = "metrics")] +pub mod only_if_enabled { + use lazy_static::lazy_static; + pub use lighthouse_metrics::*; + + lazy_static! { + /* + * Participation Metrics + */ + pub static ref PARTICIPATION_PREV_EPOCH_HEAD_ATTESTING_GWEI_TOTAL: Result = try_create_int_gauge( + "beacon_participation_prev_epoch_head_attesting_gwei_total", + "Total effective balance (gwei) of validators who attested to the head in the previous epoch" + ); + pub static ref PARTICIPATION_PREV_EPOCH_TARGET_ATTESTING_GWEI_TOTAL: Result = try_create_int_gauge( + "beacon_participation_prev_epoch_target_attesting_gwei_total", + "Total effective balance (gwei) of validators who attested to the target in the previous epoch" + ); + pub static ref PARTICIPATION_PREV_EPOCH_SOURCE_ATTESTING_GWEI_TOTAL: Result = try_create_int_gauge( + "beacon_participation_prev_epoch_source_attesting_gwei_total", + "Total effective balance (gwei) of validators who attested to the source in the previous epoch" + ); + pub static ref PARTICIPATION_PREV_EPOCH_ACTIVE_GWEI_TOTAL: Result = try_create_int_gauge( + "beacon_participation_prev_epoch_active_gwei_total", + "Total effective balance (gwei) of validators active in the previous epoch" + ); + } +} diff --git a/consensus/state_processing/src/per_epoch_processing.rs b/consensus/state_processing/src/per_epoch_processing.rs index 4c659cfff83..0ef91335d3b 100644 --- a/consensus/state_processing/src/per_epoch_processing.rs +++ b/consensus/state_processing/src/per_epoch_processing.rs @@ -1,7 +1,7 @@ #![deny(clippy::wildcard_imports)] // FIXME(altair): refactor to remove phase0/base structs, including `EpochProcessingSummary` -pub use base::{TotalBalances, ValidatorStatus, ValidatorStatuses}; +pub use epoch_processing_summary::EpochProcessingSummary; use errors::EpochProcessingError as Error; pub use registry_updates::process_registry_updates; use safe_arith::SafeArith; @@ -12,22 +12,15 @@ pub use weigh_justification_and_finalization::weigh_justification_and_finalizati pub mod altair; pub mod base; pub mod effective_balance_updates; +pub mod epoch_processing_summary; pub mod errors; pub mod historical_roots_update; pub mod registry_updates; pub mod resets; pub mod slashings; pub mod tests; -pub mod validator_statuses; pub mod weigh_justification_and_finalization; -/// Provides a summary of validator participation during the epoch. -#[derive(PartialEq, Debug)] -pub struct EpochProcessingSummary { - pub total_balances: TotalBalances, - pub statuses: Vec, -} - /// Performs per-epoch processing on some BeaconState. /// /// Mutates the given `BeaconState`, returning early if an error is encountered. If an error is diff --git a/consensus/state_processing/src/per_epoch_processing/altair.rs b/consensus/state_processing/src/per_epoch_processing/altair.rs index 79a72118cba..dd93ccab216 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair.rs @@ -3,10 +3,10 @@ use crate::per_epoch_processing::{ effective_balance_updates::process_effective_balance_updates, historical_roots_update::process_historical_roots_update, resets::{process_eth1_data_reset, process_randao_mixes_reset, process_slashings_reset}, - validator_statuses::ValidatorStatuses, }; pub use inactivity_updates::process_inactivity_updates; pub use justification_and_finalization::process_justification_and_finalization; +pub use participation_cache::ParticipationCache; pub use participation_flag_updates::process_participation_flag_updates; pub use rewards_and_penalties::process_rewards_and_penalties; pub use sync_committee_updates::process_sync_committee_updates; @@ -14,6 +14,7 @@ use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; pub mod inactivity_updates; pub mod justification_and_finalization; +pub mod participation_cache; pub mod participation_flag_updates; pub mod rewards_and_penalties; pub mod sync_committee_updates; @@ -27,13 +28,16 @@ pub fn process_epoch( state.build_committee_cache(RelativeEpoch::Current, spec)?; state.build_committee_cache(RelativeEpoch::Next, spec)?; + // Pre-compute participating indices and total balances. + let participation_cache = ParticipationCache::new(state, spec)?; + // Justification and finalization. - process_justification_and_finalization(state, spec)?; + process_justification_and_finalization(state, &participation_cache)?; - process_inactivity_updates(state, spec)?; + process_inactivity_updates(state, &participation_cache, spec)?; // Rewards and Penalties. - process_rewards_and_penalties(state, spec)?; + process_rewards_and_penalties(state, &participation_cache, spec)?; // Registry Updates. process_registry_updates(state, spec)?; @@ -41,7 +45,7 @@ pub fn process_epoch( // Slashings. process_slashings( state, - state.get_total_active_balance(spec)?, + participation_cache.current_epoch_total_active_balance(), spec.proportional_slashing_multiplier_altair, spec, )?; @@ -69,14 +73,7 @@ pub fn process_epoch( // Rotate the epoch caches to suit the epoch transition. state.advance_caches()?; - // FIXME(altair): this is an incorrect dummy value, we should think harder - // about how we want to unify validator statuses between phase0 & altair. - // We should benchmark the new state transition and work out whether Altair could - // be accelerated by some similar cache. - let validator_statuses = ValidatorStatuses::new(state, spec)?; - - Ok(EpochProcessingSummary { - total_balances: validator_statuses.total_balances, - statuses: validator_statuses.statuses, + Ok(EpochProcessingSummary::Altair { + participation_cache, }) } diff --git a/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs b/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs index cc629c1ef09..038fe770440 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs @@ -1,3 +1,4 @@ +use super::ParticipationCache; use crate::EpochProcessingError; use core::result::Result; use core::result::Result::Ok; @@ -10,6 +11,7 @@ use types::eth_spec::EthSpec; pub fn process_inactivity_updates( state: &mut BeaconState, + participation_cache: &ParticipationCache, spec: &ChainSpec, ) -> Result<(), EpochProcessingError> { // Score updates based on previous epoch participation, skip genesis epoch @@ -17,15 +19,12 @@ pub fn process_inactivity_updates( return Ok(()); } - let unslashed_indices = state.get_unslashed_participating_indices( - TIMELY_TARGET_FLAG_INDEX, - state.previous_epoch(), - spec, - )?; + let unslashed_indices = participation_cache + .get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, state.previous_epoch())?; - for index in state.get_eligible_validator_indices()? { + for &index in participation_cache.eligible_validator_indices() { // Increase inactivity score of inactive validators - if unslashed_indices.contains(&index) { + if unslashed_indices.contains(index)? { let inactivity_score = state.get_inactivity_score_mut(index)?; inactivity_score.safe_sub_assign(min(1, *inactivity_score))?; } else { diff --git a/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs b/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs index 13e14d4d8cd..f47d9c0e688 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs @@ -1,13 +1,14 @@ +use super::ParticipationCache; use crate::per_epoch_processing::weigh_justification_and_finalization; use crate::per_epoch_processing::Error; use safe_arith::SafeArith; use types::consts::altair::TIMELY_TARGET_FLAG_INDEX; -use types::{BeaconState, ChainSpec, EthSpec}; +use types::{BeaconState, EthSpec}; /// Update the justified and finalized checkpoints for matching target attestations. pub fn process_justification_and_finalization( state: &mut BeaconState, - spec: &ChainSpec, + participation_cache: &ParticipationCache, ) -> Result<(), Error> { if state.current_epoch() <= T::genesis_epoch().safe_add(1)? { return Ok(()); @@ -15,21 +16,13 @@ pub fn process_justification_and_finalization( let previous_epoch = state.previous_epoch(); let current_epoch = state.current_epoch(); - let previous_indices = state.get_unslashed_participating_indices( - TIMELY_TARGET_FLAG_INDEX, - previous_epoch, - spec, - )?; - let current_indices = - state.get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, current_epoch, spec)?; - let total_active_balance = state.get_total_balance( - state - .get_active_validator_indices(current_epoch, spec)? - .as_slice(), - spec, - )?; - let previous_target_balance = state.get_total_balance(&previous_indices, spec)?; - let current_target_balance = state.get_total_balance(¤t_indices, spec)?; + let previous_indices = participation_cache + .get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, previous_epoch)?; + let current_indices = participation_cache + .get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, current_epoch)?; + let total_active_balance = participation_cache.current_epoch_total_active_balance(); + let previous_target_balance = previous_indices.total_balance()?; + let current_target_balance = current_indices.total_balance()?; weigh_justification_and_finalization( state, total_active_balance, diff --git a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs new file mode 100644 index 00000000000..3d1cf124f48 --- /dev/null +++ b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs @@ -0,0 +1,409 @@ +//! Provides the `ParticipationCache`, a custom Lighthouse cache which attempts to reduce CPU and +//! memory usage by: +//! +//! - Caching a map of `validator_index -> participation_flags` for all active validators in the +//! previous and current epochs. +//! - Caching the total balances of: +//! - All active validators. +//! - All active validators matching each of the three "timely" flags. +//! - Caching the "eligible" validators. +//! +//! Additionally, this cache is returned from the `altair::process_epoch` function and can be used +//! to get useful summaries about the validator participation in an epoch. + +use safe_arith::{ArithError, SafeArith}; +use std::collections::HashMap; +use types::{ + consts::altair::{ + NUM_FLAG_INDICES, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, + TIMELY_TARGET_FLAG_INDEX, + }, + BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ParticipationFlags, RelativeEpoch, +}; + +#[derive(Debug, PartialEq)] +pub enum Error { + InvalidFlagIndex(usize), +} + +/// A balance which will never be below the specified `minimum`. +/// +/// This is an effort to ensure the `EFFECTIVE_BALANCE_INCREMENT` minimum is always respected. +#[derive(PartialEq, Debug, Clone, Copy)] +struct Balance { + raw: u64, + minimum: u64, +} + +impl Balance { + /// Initialize the balance to `0`, or the given `minimum`. + pub fn zero(minimum: u64) -> Self { + Self { raw: 0, minimum } + } + + /// Returns the balance with respect to the initialization `minimum`. + pub fn get(&self) -> u64 { + std::cmp::max(self.raw, self.minimum) + } + + /// Add-assign to the balance. + pub fn safe_add_assign(&mut self, other: u64) -> Result<(), ArithError> { + self.raw.safe_add_assign(other) + } +} + +/// Caches the participation values for one epoch (either the previous or current). +#[derive(PartialEq, Debug)] +struct SingleEpochParticipationCache { + /// Maps an active validator index to their participation flags. + /// + /// To reiterate, only active and unslashed validator indices are stored in this map. + /// + /// ## Note + /// + /// It would be ideal to maintain a reference to the `BeaconState` here rather than copying the + /// `ParticipationFlags`, however that would cause us to run into mutable reference limitations + /// upstream. + unslashed_participating_indices: HashMap, + /// Stores the sum of the balances for all validators in `self.unslashed_participating_indices` + /// for all flags in `NUM_FLAG_INDICES`. + /// + /// A flag balance is only incremented if a validator is that flag set. + total_flag_balances: [Balance; NUM_FLAG_INDICES], + /// Stores the sum of all balances of all validators in `self.unslashed_participating_indices` + /// (regardless of which flags are set). + total_active_balance: Balance, +} + +impl SingleEpochParticipationCache { + fn new(hashmap_len: usize, spec: &ChainSpec) -> Self { + let zero_balance = Balance::zero(spec.effective_balance_increment); + + Self { + unslashed_participating_indices: HashMap::with_capacity(hashmap_len), + total_flag_balances: [zero_balance; NUM_FLAG_INDICES], + total_active_balance: zero_balance, + } + } + + /// Shrink any allocations to the smallest necessary size. + fn shrink_to_fit(&mut self) { + self.unslashed_participating_indices.shrink_to_fit() + } + + /// Returns the total balance of attesters who have `flag_index` set. + fn total_flag_balance(&self, flag_index: usize) -> Result { + self.total_flag_balances + .get(flag_index) + .map(Balance::get) + .ok_or(Error::InvalidFlagIndex(flag_index)) + } + + /// Returns `true` if `val_index` is active, unslashed and has `flag_index` set. + /// + /// ## Errors + /// + /// May return an error if `flag_index` is out-of-bounds. + fn has_flag(&self, val_index: usize, flag_index: usize) -> Result { + if let Some(participation_flags) = self.unslashed_participating_indices.get(&val_index) { + participation_flags + .has_flag(flag_index) + .map_err(|_| Error::InvalidFlagIndex(flag_index)) + } else { + Ok(false) + } + } + + /// Process an **active** validator, reading from the `state` with respect to the + /// `relative_epoch`. + /// + /// ## Errors + /// + /// - The provided `state` **must** be Altair. An error will be returned otherwise. + /// + /// ## Warning + /// + /// - It is a logic error to provide an inactive validator to this function. + fn process_active_validator( + &mut self, + val_index: usize, + state: &BeaconState, + relative_epoch: RelativeEpoch, + ) -> Result<(), BeaconStateError> { + let val_balance = state.get_effective_balance(val_index)?; + + let epoch_participation = match relative_epoch { + RelativeEpoch::Current => state.current_epoch_participation(), + RelativeEpoch::Previous => state.previous_epoch_participation(), + _ => Err(BeaconStateError::EpochOutOfBounds), + }? + .get(val_index) + .ok_or(BeaconStateError::ParticipationOutOfBounds(val_index))?; + + // All active validators increase the total active balance. + self.total_active_balance.safe_add_assign(val_balance)?; + + // Only unslashed validators may proceed. + if state.get_validator(val_index)?.slashed { + return Ok(()); + } + + // Add their `ParticipationFlags` to the map. + self.unslashed_participating_indices + .insert(val_index, *epoch_participation); + + // Iterate through all the flags and increment the total flag balances for whichever flags + // are set for `val_index`. + for (flag, balance) in self.total_flag_balances.iter_mut().enumerate() { + if epoch_participation.has_flag(flag)? { + balance.safe_add_assign(val_balance)?; + } + } + + Ok(()) + } +} + +/// Maintains a cache to be used during `altair::process_epoch`. +#[derive(PartialEq, Debug)] +pub struct ParticipationCache { + current_epoch: Epoch, + /// Caches information about active validators pertaining to `self.current_epoch`. + current_epoch_participation: SingleEpochParticipationCache, + previous_epoch: Epoch, + /// Caches information about active validators pertaining to `self.previous_epoch`. + previous_epoch_participation: SingleEpochParticipationCache, + /// Caches the result of the `get_eligible_validator_indices` function. + eligible_indices: Vec, +} + +impl ParticipationCache { + /// Instantiate `Self`, returning a fully initialized cache. + /// + /// ## Errors + /// + /// - The provided `state` **must** be an Altair state. An error will be returned otherwise. + pub fn new( + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { + let current_epoch = state.current_epoch(); + let previous_epoch = state.previous_epoch(); + + let num_previous_epoch_active_vals = state + .get_cached_active_validator_indices(RelativeEpoch::Previous)? + .len(); + let num_current_epoch_active_vals = state + .get_cached_active_validator_indices(RelativeEpoch::Current)? + .len(); + + // Both the current/previous epoch participations are set to a capacity that is slightly + // larger than required. The difference will be due slashed-but-active validators. + let mut current_epoch_participation = + SingleEpochParticipationCache::new(num_current_epoch_active_vals, spec); + let mut previous_epoch_participation = + SingleEpochParticipationCache::new(num_previous_epoch_active_vals, spec); + // Contains the set of validators which are either: + // + // - Active in the previous epoch. + // - Slashed, but not yet withdrawable. + // + // Using the full length of `state.validators` is almost always overkill, but it ensures no + // reallocations. + let mut eligible_indices = Vec::with_capacity(state.validators().len()); + + // Iterate through all validators, updating: + // + // 1. Validator participation for current and previous epochs. + // 2. The "eligible indices". + // + // Care is taken to ensure that the ordering of `eligible_indices` is the same as the + // `get_eligible_validator_indices` function in the spec. + for (val_index, val) in state.validators().iter().enumerate() { + if val.is_active_at(current_epoch) { + current_epoch_participation.process_active_validator( + val_index, + state, + RelativeEpoch::Current, + )?; + } + + if val.is_active_at(previous_epoch) { + previous_epoch_participation.process_active_validator( + val_index, + state, + RelativeEpoch::Previous, + )?; + } + + // Note: a validator might still be "eligible" whilst returning `false` to + // `Validator::is_active_at`. + if state.is_eligible_validator(val_index)? { + eligible_indices.push(val_index) + } + } + + eligible_indices.shrink_to_fit(); + current_epoch_participation.shrink_to_fit(); + previous_epoch_participation.shrink_to_fit(); + + Ok(Self { + current_epoch, + current_epoch_participation, + previous_epoch, + previous_epoch_participation, + eligible_indices, + }) + } + + /// Equivalent to the specification `get_eligible_validator_indices` function. + pub fn eligible_validator_indices(&self) -> &[usize] { + &self.eligible_indices + } + + /// Equivalent to the `get_unslashed_participating_indices` function in the specification. + pub fn get_unslashed_participating_indices( + &self, + flag_index: usize, + epoch: Epoch, + ) -> Result { + let participation = if epoch == self.current_epoch { + &self.current_epoch_participation + } else if epoch == self.previous_epoch { + &self.previous_epoch_participation + } else { + return Err(BeaconStateError::EpochOutOfBounds); + }; + + Ok(UnslashedParticipatingIndices { + participation, + flag_index, + }) + } + + /* + * Balances + */ + + pub fn current_epoch_total_active_balance(&self) -> u64 { + self.current_epoch_participation.total_active_balance.get() + } + + pub fn current_epoch_target_attesting_balance(&self) -> Result { + self.current_epoch_participation + .total_flag_balance(TIMELY_TARGET_FLAG_INDEX) + } + + pub fn previous_epoch_total_active_balance(&self) -> u64 { + self.previous_epoch_participation.total_active_balance.get() + } + + pub fn previous_epoch_target_attesting_balance(&self) -> Result { + self.previous_epoch_participation + .total_flag_balance(TIMELY_TARGET_FLAG_INDEX) + } + + pub fn previous_epoch_source_attesting_balance(&self) -> Result { + self.previous_epoch_participation + .total_flag_balance(TIMELY_SOURCE_FLAG_INDEX) + } + + pub fn previous_epoch_head_attesting_balance(&self) -> Result { + self.previous_epoch_participation + .total_flag_balance(TIMELY_HEAD_FLAG_INDEX) + } + + /* + * Active/Unslashed + */ + + pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> bool { + self.previous_epoch_participation + .unslashed_participating_indices + .contains_key(&val_index) + } + + pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> bool { + self.current_epoch_participation + .unslashed_participating_indices + .contains_key(&val_index) + } + + /* + * Flags + */ + + /// Always returns false for a slashed validator. + pub fn is_previous_epoch_timely_source_attester( + &self, + val_index: usize, + ) -> Result { + self.previous_epoch_participation + .has_flag(val_index, TIMELY_SOURCE_FLAG_INDEX) + } + + /// Always returns false for a slashed validator. + pub fn is_previous_epoch_timely_target_attester( + &self, + val_index: usize, + ) -> Result { + self.previous_epoch_participation + .has_flag(val_index, TIMELY_TARGET_FLAG_INDEX) + } + + /// Always returns false for a slashed validator. + pub fn is_previous_epoch_timely_head_attester(&self, val_index: usize) -> Result { + self.previous_epoch_participation + .has_flag(val_index, TIMELY_HEAD_FLAG_INDEX) + } + + /// Always returns false for a slashed validator. + pub fn is_current_epoch_timely_source_attester(&self, val_index: usize) -> Result { + self.current_epoch_participation + .has_flag(val_index, TIMELY_SOURCE_FLAG_INDEX) + } + + /// Always returns false for a slashed validator. + pub fn is_current_epoch_timely_target_attester(&self, val_index: usize) -> Result { + self.current_epoch_participation + .has_flag(val_index, TIMELY_TARGET_FLAG_INDEX) + } + + /// Always returns false for a slashed validator. + pub fn is_current_epoch_timely_head_attester(&self, val_index: usize) -> Result { + self.current_epoch_participation + .has_flag(val_index, TIMELY_HEAD_FLAG_INDEX) + } +} + +/// Imitates the return value of the `get_unslashed_participating_indices` in the +/// specification. +/// +/// This struct exists to help make the Lighthouse code read more like the specification. +pub struct UnslashedParticipatingIndices<'a> { + participation: &'a SingleEpochParticipationCache, + flag_index: usize, +} + +impl<'a> UnslashedParticipatingIndices<'a> { + /// Returns `Ok(true)` if the given `val_index` is both: + /// + /// - An active validator. + /// - Has `self.flag_index` set. + pub fn contains(&self, val_index: usize) -> Result { + self.participation.has_flag(val_index, self.flag_index) + } + + /// Returns the sum of all balances of validators which have `self.flag_index` set. + /// + /// ## Notes + /// + /// Respects the `EFFECTIVE_BALANCE_INCREMENT` minimum. + pub fn total_balance(&self) -> Result { + self.participation + .total_flag_balances + .get(self.flag_index) + .ok_or(Error::InvalidFlagIndex(self.flag_index)) + .map(Balance::get) + } +} diff --git a/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs b/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs index 6e1475d06d0..5906e0f8d29 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs @@ -1,3 +1,4 @@ +use super::ParticipationCache; use safe_arith::SafeArith; use types::consts::altair::{ PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX, @@ -13,6 +14,7 @@ use crate::per_epoch_processing::{Delta, Error}; /// Spec v1.1.0 pub fn process_rewards_and_penalties( state: &mut BeaconState, + participation_cache: &ParticipationCache, spec: &ChainSpec, ) -> Result<(), Error> { if state.current_epoch() == T::genesis_epoch() { @@ -21,13 +23,20 @@ pub fn process_rewards_and_penalties( let mut deltas = vec![Delta::default(); state.validators().len()]; - let total_active_balance = state.get_total_active_balance(spec)?; + let total_active_balance = participation_cache.current_epoch_total_active_balance(); for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() { - get_flag_index_deltas(&mut deltas, state, flag_index, total_active_balance, spec)?; + get_flag_index_deltas( + &mut deltas, + state, + flag_index, + total_active_balance, + participation_cache, + spec, + )?; } - get_inactivity_penalty_deltas(&mut deltas, state, spec)?; + get_inactivity_penalty_deltas(&mut deltas, state, participation_cache, spec)?; // Apply the deltas, erroring on overflow above but not on overflow below (saturating at 0 // instead). @@ -47,23 +56,23 @@ pub fn get_flag_index_deltas( state: &BeaconState, flag_index: usize, total_active_balance: u64, + participation_cache: &ParticipationCache, spec: &ChainSpec, ) -> Result<(), Error> { let previous_epoch = state.previous_epoch(); let unslashed_participating_indices = - state.get_unslashed_participating_indices(flag_index, previous_epoch, spec)?; + participation_cache.get_unslashed_participating_indices(flag_index, previous_epoch)?; let weight = get_flag_weight(flag_index)?; - let unslashed_participating_balance = - state.get_total_balance(&unslashed_participating_indices, spec)?; + let unslashed_participating_balance = unslashed_participating_indices.total_balance()?; let unslashed_participating_increments = unslashed_participating_balance.safe_div(spec.effective_balance_increment)?; let active_increments = total_active_balance.safe_div(spec.effective_balance_increment)?; - for index in state.get_eligible_validator_indices()? { + for &index in participation_cache.eligible_validator_indices() { let base_reward = get_base_reward(state, index, total_active_balance, spec)?; let mut delta = Delta::default(); - if unslashed_participating_indices.contains(&(index as usize)) { + if unslashed_participating_indices.contains(index as usize)? { if !state.is_in_inactivity_leak(spec) { let reward_numerator = base_reward .safe_mul(weight)? @@ -94,18 +103,16 @@ pub fn get_flag_weight(flag_index: usize) -> Result { pub fn get_inactivity_penalty_deltas( deltas: &mut Vec, state: &BeaconState, + participation_cache: &ParticipationCache, spec: &ChainSpec, ) -> Result<(), Error> { let previous_epoch = state.previous_epoch(); - let matching_target_indices = state.get_unslashed_participating_indices( - TIMELY_TARGET_FLAG_INDEX, - previous_epoch, - spec, - )?; - for index in state.get_eligible_validator_indices()? { + let matching_target_indices = participation_cache + .get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, previous_epoch)?; + for &index in participation_cache.eligible_validator_indices() { let mut delta = Delta::default(); - if !matching_target_indices.contains(&index) { + if !matching_target_indices.contains(index)? { let penalty_numerator = state .get_validator(index)? .effective_balance diff --git a/consensus/state_processing/src/per_epoch_processing/base.rs b/consensus/state_processing/src/per_epoch_processing/base.rs index c28d4b17803..1c1b8bfb715 100644 --- a/consensus/state_processing/src/per_epoch_processing/base.rs +++ b/consensus/state_processing/src/per_epoch_processing/base.rs @@ -1,7 +1,4 @@ use super::{process_registry_updates, process_slashings, EpochProcessingSummary, Error}; -pub use crate::per_epoch_processing::validator_statuses::{ - TotalBalances, ValidatorStatus, ValidatorStatuses, -}; use crate::per_epoch_processing::{ effective_balance_updates::process_effective_balance_updates, historical_roots_update::process_historical_roots_update, @@ -11,10 +8,12 @@ pub use justification_and_finalization::process_justification_and_finalization; pub use participation_record_updates::process_participation_record_updates; pub use rewards_and_penalties::process_rewards_and_penalties; use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; +pub use validator_statuses::{TotalBalances, ValidatorStatus, ValidatorStatuses}; pub mod justification_and_finalization; pub mod participation_record_updates; pub mod rewards_and_penalties; +pub mod validator_statuses; pub fn process_epoch( state: &mut BeaconState, @@ -69,7 +68,7 @@ pub fn process_epoch( // Rotate the epoch caches to suit the epoch transition. state.advance_caches()?; - Ok(EpochProcessingSummary { + Ok(EpochProcessingSummary::Base { total_balances: validator_statuses.total_balances, statuses: validator_statuses.statuses, }) diff --git a/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs b/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs index d0983a20fb8..39e4243e436 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs @@ -1,8 +1,8 @@ use crate::common::{base::get_base_reward, decrease_balance, increase_balance}; -use crate::per_epoch_processing::validator_statuses::{ - TotalBalances, ValidatorStatus, ValidatorStatuses, +use crate::per_epoch_processing::{ + base::{TotalBalances, ValidatorStatus, ValidatorStatuses}, + Delta, Error, }; -use crate::per_epoch_processing::{Delta, Error}; use safe_arith::SafeArith; use std::array::IntoIter as ArrayIter; use types::{BeaconState, ChainSpec, EthSpec}; diff --git a/consensus/state_processing/src/per_epoch_processing/validator_statuses.rs b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs similarity index 100% rename from consensus/state_processing/src/per_epoch_processing/validator_statuses.rs rename to consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs diff --git a/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs b/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs new file mode 100644 index 00000000000..25170169b06 --- /dev/null +++ b/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs @@ -0,0 +1,280 @@ +use super::{ + altair::{participation_cache::Error as ParticipationCacheError, ParticipationCache}, + base::{validator_statuses::InclusionInfo, TotalBalances, ValidatorStatus}, +}; +use crate::metrics; + +/// Provides a summary of validator participation during the epoch. +#[derive(PartialEq, Debug)] +pub enum EpochProcessingSummary { + Base { + total_balances: TotalBalances, + statuses: Vec, + }, + Altair { + participation_cache: ParticipationCache, + }, +} + +impl EpochProcessingSummary { + /// Updates some Prometheus metrics with some values in `self`. + #[cfg(feature = "metrics")] + pub fn observe_metrics(&self) -> Result<(), ParticipationCacheError> { + metrics::set_gauge( + &metrics::PARTICIPATION_PREV_EPOCH_HEAD_ATTESTING_GWEI_TOTAL, + self.previous_epoch_head_attesting_balance()? as i64, + ); + metrics::set_gauge( + &metrics::PARTICIPATION_PREV_EPOCH_TARGET_ATTESTING_GWEI_TOTAL, + self.previous_epoch_target_attesting_balance()? as i64, + ); + metrics::set_gauge( + &metrics::PARTICIPATION_PREV_EPOCH_SOURCE_ATTESTING_GWEI_TOTAL, + self.previous_epoch_source_attesting_balance()? as i64, + ); + metrics::set_gauge( + &metrics::PARTICIPATION_PREV_EPOCH_ACTIVE_GWEI_TOTAL, + self.previous_epoch_total_active_balance() as i64, + ); + + Ok(()) + } + + /// Returns the sum of the effective balance of all validators in the current epoch. + pub fn current_epoch_total_active_balance(&self) -> u64 { + match self { + EpochProcessingSummary::Base { total_balances, .. } => total_balances.current_epoch(), + EpochProcessingSummary::Altair { + participation_cache, + } => participation_cache.current_epoch_total_active_balance(), + } + } + + /// Returns the sum of the effective balance of all validators in the current epoch who + /// included an attestation that matched the target. + pub fn current_epoch_target_attesting_balance(&self) -> Result { + match self { + EpochProcessingSummary::Base { total_balances, .. } => { + Ok(total_balances.current_epoch_target_attesters()) + } + EpochProcessingSummary::Altair { + participation_cache, + } => participation_cache.current_epoch_target_attesting_balance(), + } + } + + /// Returns the sum of the effective balance of all validators in the previous epoch. + pub fn previous_epoch_total_active_balance(&self) -> u64 { + match self { + EpochProcessingSummary::Base { total_balances, .. } => total_balances.previous_epoch(), + EpochProcessingSummary::Altair { + participation_cache, + } => participation_cache.previous_epoch_total_active_balance(), + } + } + + /// Returns `true` if `val_index` was included in the active validator indices in the current + /// epoch *and* the validator is not slashed. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> bool { + match self { + EpochProcessingSummary::Base { statuses, .. } => { + statuses.get(val_index).map_or(false, |s| { + s.is_current_epoch_target_attester && !s.is_slashed + }) + } + EpochProcessingSummary::Altair { + participation_cache, + .. + } => participation_cache.is_active_unslashed_in_current_epoch(val_index), + } + } + + /// Returns `true` if `val_index` had a target-matching attestation included on chain in the + /// current epoch. + /// + /// ## Differences between Base and Altair + /// + /// - Base: active validators return `true`. + /// - Altair: only active and *unslashed* validators return `true`. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn is_current_epoch_target_attester( + &self, + val_index: usize, + ) -> Result { + match self { + EpochProcessingSummary::Base { statuses, .. } => Ok(statuses + .get(val_index) + .map_or(false, |s| s.is_current_epoch_target_attester)), + EpochProcessingSummary::Altair { + participation_cache, + .. + } => participation_cache.is_current_epoch_timely_target_attester(val_index), + } + } + + /// Returns the sum of the effective balance of all validators in the previous epoch who + /// included an attestation that matched the target. + pub fn previous_epoch_target_attesting_balance(&self) -> Result { + match self { + EpochProcessingSummary::Base { total_balances, .. } => { + Ok(total_balances.previous_epoch_target_attesters()) + } + EpochProcessingSummary::Altair { + participation_cache, + } => participation_cache.previous_epoch_target_attesting_balance(), + } + } + + /// Returns the sum of the effective balance of all validators in the previous epoch who + /// included an attestation that matched the head. + /// + /// ## Differences between Base and Altair + /// + /// - Base: any attestation can match the head. + /// - Altair: only "timely" attestations can match the head. + pub fn previous_epoch_head_attesting_balance(&self) -> Result { + match self { + EpochProcessingSummary::Base { total_balances, .. } => { + Ok(total_balances.previous_epoch_head_attesters()) + } + EpochProcessingSummary::Altair { + participation_cache, + } => participation_cache.previous_epoch_head_attesting_balance(), + } + } + + /// Returns the sum of the effective balance of all validators in the previous epoch who + /// included an attestation that matched the source. + /// + /// ## Differences between Base and Altair + /// + /// - Base: any attestation can match the source. + /// - Altair: only "timely" attestations can match the source. + pub fn previous_epoch_source_attesting_balance(&self) -> Result { + match self { + EpochProcessingSummary::Base { total_balances, .. } => { + Ok(total_balances.previous_epoch_attesters()) + } + EpochProcessingSummary::Altair { + participation_cache, + } => participation_cache.previous_epoch_source_attesting_balance(), + } + } + + /// Returns `true` if `val_index` was included in the active validator indices in the previous + /// epoch *and* the validator is not slashed. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> bool { + match self { + EpochProcessingSummary::Base { statuses, .. } => statuses + .get(val_index) + .map_or(false, |s| s.is_active_in_previous_epoch && s.is_slashed), + EpochProcessingSummary::Altair { + participation_cache, + .. + } => participation_cache.is_active_unslashed_in_previous_epoch(val_index), + } + } + + /// Returns `true` if `val_index` had a target-matching attestation included on chain in the + /// previous epoch. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn is_previous_epoch_target_attester( + &self, + val_index: usize, + ) -> Result { + match self { + EpochProcessingSummary::Base { statuses, .. } => Ok(statuses + .get(val_index) + .map_or(false, |s| s.is_previous_epoch_target_attester)), + EpochProcessingSummary::Altair { + participation_cache, + .. + } => participation_cache.is_previous_epoch_timely_target_attester(val_index), + } + } + + /// Returns `true` if `val_index` had a head-matching attestation included on chain in the + /// previous epoch. + /// + /// ## Differences between Base and Altair + /// + /// - Base: any attestation can match the head. + /// - Altair: only "timely" attestations can match the head. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn is_previous_epoch_head_attester( + &self, + val_index: usize, + ) -> Result { + match self { + EpochProcessingSummary::Base { statuses, .. } => Ok(statuses + .get(val_index) + .map_or(false, |s| s.is_previous_epoch_head_attester)), + EpochProcessingSummary::Altair { + participation_cache, + .. + } => participation_cache.is_previous_epoch_timely_head_attester(val_index), + } + } + + /// Returns `true` if `val_index` had a source-matching attestation included on chain in the + /// previous epoch. + /// + /// ## Differences between Base and Altair + /// + /// - Base: any attestation can match the head. + /// - Altair: only "timely" attestations can match the source. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn is_previous_epoch_source_attester( + &self, + val_index: usize, + ) -> Result { + match self { + EpochProcessingSummary::Base { statuses, .. } => Ok(statuses + .get(val_index) + .map_or(false, |s| s.is_previous_epoch_attester)), + EpochProcessingSummary::Altair { + participation_cache, + .. + } => participation_cache.is_previous_epoch_timely_source_attester(val_index), + } + } + + /// Returns information about the inclusion distance for `val_index` for the previous epoch. + /// + /// ## Differences between Base and Altair + /// + /// - Base: always returns `Some` if the validator had an attestation included on-chain. + /// - Altair: always returns `None`. + /// + /// ## Notes + /// + /// Always returns `false` for an unknown `val_index`. + pub fn previous_epoch_inclusion_info(&self, val_index: usize) -> Option { + match self { + EpochProcessingSummary::Base { statuses, .. } => { + statuses.get(val_index).and_then(|s| s.inclusion_info) + } + EpochProcessingSummary::Altair { .. } => None, + } + } +} diff --git a/consensus/state_processing/src/per_epoch_processing/errors.rs b/consensus/state_processing/src/per_epoch_processing/errors.rs index 651bf41ca26..04797c56342 100644 --- a/consensus/state_processing/src/per_epoch_processing/errors.rs +++ b/consensus/state_processing/src/per_epoch_processing/errors.rs @@ -1,3 +1,4 @@ +use crate::per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError; use types::{BeaconStateError, InconsistentFork}; #[derive(Debug, PartialEq)] @@ -23,6 +24,7 @@ pub enum EpochProcessingError { InconsistentStateFork(InconsistentFork), InvalidJustificationBit(ssz_types::Error), InvalidFlagIndex(usize), + ParticipationCache(ParticipationCacheError), } impl From for EpochProcessingError { @@ -49,6 +51,12 @@ impl From for EpochProcessingError { } } +impl From for EpochProcessingError { + fn from(e: ParticipationCacheError) -> EpochProcessingError { + EpochProcessingError::ParticipationCache(e) + } +} + #[derive(Debug, PartialEq)] pub enum InclusionError { /// The validator did not participate in an attestation in this period. diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index b96d1201628..86445e12b9d 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -13,7 +13,6 @@ use serde_derive::{Deserialize, Serialize}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; use ssz_types::{typenum::Unsigned, BitVector, FixedVector}; -use std::collections::HashSet; use std::convert::TryInto; use std::{fmt, mem, sync::Arc}; use superstruct::superstruct; @@ -1504,60 +1503,15 @@ impl BeaconState { self.clone_with(CloneConfig::committee_caches_only()) } - /// Get the unslashed participating indices for a given `flag_index`. - /// - /// The `self` state must be Altair or later. - pub fn get_unslashed_participating_indices( - &self, - flag_index: usize, - epoch: Epoch, - spec: &ChainSpec, - ) -> Result, Error> { - let epoch_participation = if epoch == self.current_epoch() { - self.current_epoch_participation()? - } else if epoch == self.previous_epoch() { - self.previous_epoch_participation()? - } else { - return Err(Error::EpochOutOfBounds); - }; - let active_validator_indices = self.get_active_validator_indices(epoch, spec)?; - itertools::process_results( - active_validator_indices.into_iter().map(|val_index| { - let has_flag = epoch_participation - .get(val_index) - .ok_or(Error::ParticipationOutOfBounds(val_index))? - .has_flag(flag_index)?; - let not_slashed = !self.get_validator(val_index)?.slashed; - Ok((val_index, has_flag && not_slashed)) - }), - |iter| { - iter.filter(|(_, eligible)| *eligible) - .map(|(validator_index, _)| validator_index) - .collect() - }, - ) - } - - pub fn get_eligible_validator_indices(&self) -> Result, Error> { + pub fn is_eligible_validator(&self, val_index: usize) -> Result { match self { BeaconState::Base(_) => Err(Error::IncorrectStateVariant), BeaconState::Altair(_) => { let previous_epoch = self.previous_epoch(); - Ok(self - .validators() - .iter() - .enumerate() - .filter_map(|(i, val)| { - if val.is_active_at(previous_epoch) - || (val.slashed - && previous_epoch + Epoch::new(1) < val.withdrawable_epoch) - { - Some(i) - } else { - None - } - }) - .collect()) + self.get_validator(val_index).map(|val| { + val.is_active_at(previous_epoch) + || (val.slashed && previous_epoch + Epoch::new(1) < val.withdrawable_epoch) + }) } } } diff --git a/testing/ef_tests/src/cases/epoch_processing.rs b/testing/ef_tests/src/cases/epoch_processing.rs index 8ca3775f06d..b1800e5310b 100644 --- a/testing/ef_tests/src/cases/epoch_processing.rs +++ b/testing/ef_tests/src/cases/epoch_processing.rs @@ -5,7 +5,6 @@ use crate::decode::{ssz_decode_state, yaml_decode_file}; use crate::type_name; use crate::type_name::TypeName; use serde_derive::Deserialize; -use state_processing::per_epoch_processing::validator_statuses::ValidatorStatuses; use state_processing::per_epoch_processing::{ altair, base, effective_balance_updates::process_effective_balance_updates, @@ -87,7 +86,7 @@ impl EpochTransition for JustificationAndFinalization { fn run(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), EpochProcessingError> { match state { BeaconState::Base(_) => { - let mut validator_statuses = ValidatorStatuses::new(state, spec)?; + let mut validator_statuses = base::ValidatorStatuses::new(state, spec)?; validator_statuses.process_attestations(state)?; base::process_justification_and_finalization( state, @@ -95,7 +94,10 @@ impl EpochTransition for JustificationAndFinalization { spec, ) } - BeaconState::Altair(_) => altair::process_justification_and_finalization(state, spec), + BeaconState::Altair(_) => altair::process_justification_and_finalization( + state, + &altair::ParticipationCache::new(state, spec).unwrap(), + ), } } } @@ -104,11 +106,15 @@ impl EpochTransition for RewardsAndPenalties { fn run(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), EpochProcessingError> { match state { BeaconState::Base(_) => { - let mut validator_statuses = ValidatorStatuses::new(state, spec)?; + let mut validator_statuses = base::ValidatorStatuses::new(state, spec)?; validator_statuses.process_attestations(state)?; base::process_rewards_and_penalties(state, &mut validator_statuses, spec) } - BeaconState::Altair(_) => altair::process_rewards_and_penalties(state, spec), + BeaconState::Altair(_) => altair::process_rewards_and_penalties( + state, + &altair::ParticipationCache::new(state, spec).unwrap(), + spec, + ), } } } @@ -123,7 +129,7 @@ impl EpochTransition for Slashings { fn run(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), EpochProcessingError> { match state { BeaconState::Base(_) => { - let mut validator_statuses = ValidatorStatuses::new(&state, spec)?; + let mut validator_statuses = base::ValidatorStatuses::new(&state, spec)?; validator_statuses.process_attestations(&state)?; process_slashings( state, @@ -135,7 +141,9 @@ impl EpochTransition for Slashings { BeaconState::Altair(_) => { process_slashings( state, - state.get_total_active_balance(spec)?, + altair::ParticipationCache::new(state, spec) + .unwrap() + .current_epoch_total_active_balance(), spec.proportional_slashing_multiplier_altair, spec, )?; @@ -198,7 +206,11 @@ impl EpochTransition for InactivityUpdates { fn run(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), EpochProcessingError> { match state { BeaconState::Base(_) => Ok(()), - BeaconState::Altair(_) => altair::process_inactivity_updates(state, spec), + BeaconState::Altair(_) => altair::process_inactivity_updates( + state, + &altair::ParticipationCache::new(state, spec).unwrap(), + spec, + ), } } } diff --git a/testing/ef_tests/src/cases/rewards.rs b/testing/ef_tests/src/cases/rewards.rs index df9c1766199..10fd1b4f133 100644 --- a/testing/ef_tests/src/cases/rewards.rs +++ b/testing/ef_tests/src/cases/rewards.rs @@ -4,11 +4,10 @@ use crate::decode::{ssz_decode_file, ssz_decode_state, yaml_decode_file}; use compare_fields_derive::CompareFields; use serde_derive::Deserialize; use ssz_derive::{Decode, Encode}; -use state_processing::per_epoch_processing::validator_statuses::ValidatorStatuses; use state_processing::{ per_epoch_processing::{ - altair::{self, rewards_and_penalties::get_flag_index_deltas}, - base::{self, rewards_and_penalties::AttestationDelta}, + altair::{self, rewards_and_penalties::get_flag_index_deltas, ParticipationCache}, + base::{self, rewards_and_penalties::AttestationDelta, ValidatorStatuses}, Delta, }, EpochProcessingError, @@ -187,7 +186,14 @@ fn compute_altair_flag_deltas( spec: &ChainSpec, ) -> Result { let mut deltas = vec![Delta::default(); state.validators().len()]; - get_flag_index_deltas(&mut deltas, state, flag_index, total_active_balance, spec)?; + get_flag_index_deltas( + &mut deltas, + state, + flag_index, + total_active_balance, + &ParticipationCache::new(state, spec).unwrap(), + spec, + )?; Ok(convert_altair_deltas(deltas)) } @@ -196,7 +202,12 @@ fn compute_altair_inactivity_deltas( spec: &ChainSpec, ) -> Result { let mut deltas = vec![Delta::default(); state.validators().len()]; - altair::rewards_and_penalties::get_inactivity_penalty_deltas(&mut deltas, state, spec)?; + altair::rewards_and_penalties::get_inactivity_penalty_deltas( + &mut deltas, + state, + &ParticipationCache::new(state, spec).unwrap(), + spec, + )?; Ok(convert_altair_deltas(deltas)) }