From 0f5593c6cfd357da065a0120bff9ea4f08c37bdb Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 22 Jul 2021 12:11:43 +1000 Subject: [PATCH] Altair validator client and API --- Cargo.lock | 3 + beacon_node/beacon_chain/src/beacon_chain.rs | 79 ++- beacon_node/beacon_chain/src/errors.rs | 1 + beacon_node/beacon_chain/src/test_utils.rs | 8 +- beacon_node/http_api/Cargo.toml | 6 + beacon_node/http_api/src/lib.rs | 140 ++++- beacon_node/http_api/src/sync_committees.rs | 300 ++++++++++ beacon_node/http_api/tests/common.rs | 142 +++++ beacon_node/http_api/tests/fork_tests.rs | 194 +++++++ beacon_node/http_api/tests/main.rs | 6 + beacon_node/http_api/tests/tests.rs | 154 ++--- beacon_node/network/Cargo.toml | 1 + .../network/src/beacon_processor/tests.rs | 20 +- .../beacon_processor/worker/gossip_methods.rs | 4 +- .../src/beacon_processor/worker/mod.rs | 6 +- .../beacon_processor/worker/rpc_methods.rs | 2 +- .../beacon_processor/worker/sync_methods.rs | 4 +- beacon_node/network/src/service/tests.rs | 16 +- common/eth2/src/lib.rs | 140 +++-- common/eth2/src/types.rs | 8 + common/rest_types/Cargo.toml | 27 - consensus/types/Cargo.toml | 1 + consensus/types/src/beacon_state.rs | 26 + consensus/types/src/chain_spec.rs | 29 + consensus/types/src/lib.rs | 4 +- .../types/src/sync_committee_contribution.rs | 6 +- consensus/types/src/sync_duty.rs | 83 +++ validator_client/Cargo.toml | 1 + validator_client/src/duties_service.rs | 76 ++- validator_client/src/duties_service/sync.rs | 418 ++++++++++++++ validator_client/src/fork_service.rs | 5 - validator_client/src/http_metrics/metrics.rs | 15 + validator_client/src/lib.rs | 22 +- .../src/sync_committee_service.rs | 535 ++++++++++++++++++ validator_client/src/validator_store.rs | 96 +++- 35 files changed, 2307 insertions(+), 271 deletions(-) create mode 100644 beacon_node/http_api/src/sync_committees.rs create mode 100644 beacon_node/http_api/tests/common.rs create mode 100644 beacon_node/http_api/tests/fork_tests.rs create mode 100644 beacon_node/http_api/tests/main.rs delete mode 100644 common/rest_types/Cargo.toml create mode 100644 consensus/types/src/sync_duty.rs create mode 100644 validator_client/src/duties_service/sync.rs create mode 100644 validator_client/src/sync_committee_service.rs diff --git a/Cargo.lock b/Cargo.lock index 0a6f4391cf7..c2c464973df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2753,6 +2753,7 @@ version = "0.1.0" dependencies = [ "beacon_chain", "bs58", + "discv5", "environment", "eth1", "eth2", @@ -4286,6 +4287,7 @@ name = "network" version = "0.2.0" dependencies = [ "beacon_chain", + "discv5", "environment", "error-chain", "eth2_libp2p", @@ -7311,6 +7313,7 @@ dependencies = [ "futures", "hex", "hyper", + "itertools 0.10.1", "lazy_static", "libc", "libsecp256k1", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 39678a4ecbb..ddb6b8d508d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -42,7 +42,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; use crate::{metrics, BeaconChainError}; -use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead}; +use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SyncDuty}; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; @@ -1211,6 +1211,16 @@ impl BeaconChain { .get_by_slot_and_root(slot, attestation_data_root) } + /// Return an aggregated `SyncCommitteeContribution` matching the given `root`. + pub fn get_aggregated_sync_committee_contribution( + &self, + sync_contribution_data: &SyncContributionData, + ) -> Option> { + self.naive_sync_aggregation_pool + .read() + .get(sync_contribution_data) + } + /// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`. /// /// The produced `Attestation` will not be valid until it has been signed by exactly one @@ -1749,6 +1759,19 @@ impl BeaconChain { Ok(()) } + /// Attempt to obtain sync committee duties from the head. + pub fn sync_committee_duties_from_head( + &self, + epoch: Epoch, + validator_indices: &[u64], + ) -> Result>, Error> { + self.with_head(move |head| { + head.beacon_state + .get_sync_committee_duties(epoch, validator_indices, &self.spec) + .map_err(Error::SyncDutiesError) + }) + } + /// Attempt to verify and import a chain of blocks to `self`. /// /// The provided blocks _must_ each reference the previous block via `block.parent_root` (i.e., @@ -2480,6 +2503,22 @@ impl BeaconChain { let proposer_index = state.get_beacon_proposer_index(state.slot(), &self.spec)? as u64; let voluntary_exits = self.op_pool.get_voluntary_exits(&state, &self.spec).into(); + // Closure to fetch a sync aggregate in cases where it is required. + let get_sync_aggregate = || -> Result, BlockProductionError> { + Ok(self + .op_pool + .get_sync_aggregate(&state) + .map_err(BlockProductionError::OpPoolError)? + .unwrap_or_else(|| { + warn!( + self.log, + "Producing block with no sync contributions"; + "slot" => state.slot(), + ); + SyncAggregate::new() + })) + }; + let inner_block = match state { BeaconState::Base(_) => BeaconBlock::Base(BeaconBlockBase { slot, @@ -2497,24 +2536,26 @@ impl BeaconChain { voluntary_exits, }, }), - BeaconState::Altair(_) => BeaconBlock::Altair(BeaconBlockAltair { - slot, - proposer_index, - parent_root, - state_root: Hash256::zero(), - body: BeaconBlockBodyAltair { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings: proposer_slashings.into(), - attester_slashings: attester_slashings.into(), - attestations, - deposits, - voluntary_exits, - // FIXME(altair): put a sync aggregate from the pool here (once implemented) - sync_aggregate: SyncAggregate::new(), - }, - }), + BeaconState::Altair(_) => { + let sync_aggregate = get_sync_aggregate()?; + BeaconBlock::Altair(BeaconBlockAltair { + slot, + proposer_index, + parent_root, + state_root: Hash256::zero(), + body: BeaconBlockBodyAltair { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings: proposer_slashings.into(), + attester_slashings: attester_slashings.into(), + attestations, + deposits, + voluntary_exits, + sync_aggregate, + }, + }) + } }; let block = SignedBeaconBlock::from_block( diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index c4a0bb6d4a6..7479e88ff12 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -114,6 +114,7 @@ pub enum BeaconChainError { state_epoch: Epoch, shuffling_epoch: Epoch, }, + SyncDutiesError(BeaconStateError), InconsistentForwardsIter { request_slot: Slot, slot: Slot, diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 9aba7cc4e6c..4835f7bef10 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -151,7 +151,7 @@ pub fn test_spec() -> ChainSpec { pub struct BeaconChainHarness { pub validator_keypairs: Vec, - pub chain: BeaconChain, + pub chain: Arc>, pub spec: ChainSpec, pub data_dir: TempDir, pub shutdown_receiver: Receiver, @@ -266,7 +266,7 @@ impl BeaconChainHarness> { Self { spec: chain.spec.clone(), - chain, + chain: Arc::new(chain), validator_keypairs, data_dir, shutdown_receiver, @@ -311,7 +311,7 @@ impl BeaconChainHarness> { Self { spec: chain.spec.clone(), - chain, + chain: Arc::new(chain), validator_keypairs, data_dir, shutdown_receiver, @@ -353,7 +353,7 @@ impl BeaconChainHarness> { Self { spec: chain.spec.clone(), - chain, + chain: Arc::new(chain), validator_keypairs, data_dir, shutdown_receiver, diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0f288cfea0c..1e6dffa45f2 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -3,6 +3,7 @@ name = "http_api" version = "0.1.0" authors = ["Paul Hauner "] edition = "2018" +autotests = false # using a single test binary compiles faster [dependencies] warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" } @@ -34,4 +35,9 @@ futures = "0.3.8" store = { path = "../store" } environment = { path = "../../lighthouse/environment" } tree_hash = "0.1.1" +discv5 = { version = "0.1.0-beta.8", features = ["libp2p"] } sensitive_url = { path = "../../common/sensitive_url" } + +[[test]] +name = "bn_http_api_tests" +path = "tests/main.rs" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index ebe56eb1931..e8d61fcf16c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -10,6 +10,7 @@ mod block_id; mod metrics; mod proposer_duties; mod state_id; +mod sync_committees; mod validator_inclusion; use beacon_chain::{ @@ -39,7 +40,8 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ Attestation, AttesterSlashing, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, - SignedVoluntaryExit, Slot, + SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage, + SyncContributionData, }; use warp::http::StatusCode; use warp::sse::Event; @@ -1251,6 +1253,28 @@ pub fn serve( }) }); + // POST beacon/pool/sync_committees + let post_beacon_pool_sync_committees = beacon_pool_path + .clone() + .and(warp::path("sync_committees")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .and_then( + |chain: Arc>, + signatures: Vec, + network_tx: UnboundedSender>, + log: Logger| { + blocking_json_task(move || { + sync_committees::process_sync_committee_signatures( + signatures, network_tx, &chain, log, + )?; + Ok(api_types::GenericResponse::from(())) + }) + }, + ); + /* * config/fork_schedule */ @@ -1773,12 +1797,57 @@ pub fn serve( }, ); + // POST validator/duties/sync + let post_validator_duties_sync = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("duties")) + .and(warp::path("sync")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid epoch".to_string(), + )) + })) + .and(warp::path::end()) + .and(not_while_syncing_filter.clone()) + .and(warp::body::json()) + .and(chain_filter.clone()) + .and_then( + |epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc>| { + blocking_json_task(move || { + sync_committees::sync_committee_duties(epoch, &indices.0, &chain) + }) + }, + ); + + // GET validator/sync_committee_contribution + let get_validator_sync_committee_contribution = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("sync_committee_contribution")) + .and(warp::path::end()) + .and(warp::query::()) + .and(not_while_syncing_filter.clone()) + .and(chain_filter.clone()) + .and_then( + |sync_committee_data: SyncContributionData, chain: Arc>| { + blocking_json_task(move || { + chain + .get_aggregated_sync_committee_contribution(&sync_committee_data) + .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching sync contribution found".to_string(), + ) + }) + }) + }, + ); + // POST validator/aggregate_and_proofs let post_validator_aggregate_and_proofs = eth1_v1 .and(warp::path("validator")) .and(warp::path("aggregate_and_proofs")) .and(warp::path::end()) - .and(not_while_syncing_filter) + .and(not_while_syncing_filter.clone()) .and(chain_filter.clone()) .and(warp::body::json()) .and(network_tx_filter.clone()) @@ -1874,13 +1943,39 @@ pub fn serve( }, ); + let post_validator_contribution_and_proofs = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("contribution_and_proofs")) + .and(warp::path::end()) + .and(not_while_syncing_filter) + .and(chain_filter.clone()) + .and(warp::body::json()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .and_then( + |chain: Arc>, + contributions: Vec>, + network_tx: UnboundedSender>, + log: Logger| { + blocking_json_task(move || { + sync_committees::process_signed_contribution_and_proofs( + contributions, + network_tx, + &chain, + log, + )?; + Ok(api_types::GenericResponse::from(())) + }) + }, + ); + // POST validator/beacon_committee_subscriptions let post_validator_beacon_committee_subscriptions = eth1_v1 .and(warp::path("validator")) .and(warp::path("beacon_committee_subscriptions")) .and(warp::path::end()) .and(warp::body::json()) - .and(network_tx_filter) + .and(network_tx_filter.clone()) .and(chain_filter.clone()) .and_then( |subscriptions: Vec, @@ -1914,6 +2009,38 @@ pub fn serve( }, ); + // POST validator/sync_committee_subscriptions + let post_validator_sync_committee_subscriptions = eth1_v1 + .and(warp::path("validator")) + .and(warp::path("sync_committee_subscriptions")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(network_tx_filter) + .and(chain_filter.clone()) + .and_then( + |subscriptions: Vec, + network_tx: UnboundedSender>, + chain: Arc>| { + blocking_json_task(move || { + for subscription in subscriptions { + chain + .validator_monitor + .write() + .auto_register_local_validator(subscription.validator_index); + + publish_network_message( + &network_tx, + NetworkMessage::SyncCommitteeSubscribe { + subscriptions: vec![subscription], + }, + )?; + } + + Ok(()) + }) + }, + ); + // GET lighthouse/health let get_lighthouse_health = warp::path("lighthouse") .and(warp::path("health")) @@ -2233,6 +2360,7 @@ pub fn serve( .or(get_validator_blocks.boxed()) .or(get_validator_attestation_data.boxed()) .or(get_validator_aggregate_attestation.boxed()) + .or(get_validator_sync_committee_contribution.boxed()) .or(get_lighthouse_health.boxed()) .or(get_lighthouse_syncing.boxed()) .or(get_lighthouse_peers.boxed()) @@ -2254,9 +2382,13 @@ pub fn serve( .or(post_beacon_pool_attester_slashings.boxed()) .or(post_beacon_pool_proposer_slashings.boxed()) .or(post_beacon_pool_voluntary_exits.boxed()) + .or(post_beacon_pool_sync_committees.boxed()) .or(post_validator_duties_attester.boxed()) + .or(post_validator_duties_sync.boxed()) .or(post_validator_aggregate_and_proofs.boxed()) - .or(post_validator_beacon_committee_subscriptions.boxed()), + .or(post_validator_contribution_and_proofs.boxed()) + .or(post_validator_beacon_committee_subscriptions.boxed()) + .or(post_validator_sync_committee_subscriptions.boxed()), )) .recover(warp_utils::reject::handle_rejection) .with(slog_logging(log.clone())) diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs new file mode 100644 index 00000000000..56dd04fed07 --- /dev/null +++ b/beacon_node/http_api/src/sync_committees.rs @@ -0,0 +1,300 @@ +//! Handlers for sync committee endpoints. + +use crate::publish_pubsub_message; +use beacon_chain::sync_committee_verification::{ + Error as SyncVerificationError, VerifiedSyncCommitteeMessage, +}; +use beacon_chain::{ + BeaconChain, BeaconChainError, BeaconChainTypes, StateSkipConfig, + MAXIMUM_GOSSIP_CLOCK_DISPARITY, +}; +use eth2::types::{self as api_types}; +use eth2_libp2p::PubsubMessage; +use network::NetworkMessage; +use slog::{error, warn, Logger}; +use slot_clock::SlotClock; +use std::cmp::max; +use std::collections::HashMap; +use tokio::sync::mpsc::UnboundedSender; +use types::{ + slot_data::SlotData, BeaconStateError, Epoch, EthSpec, SignedContributionAndProof, + SyncCommitteeMessage, SyncDuty, SyncSubnetId, +}; + +/// The struct that is returned to the requesting HTTP client. +type SyncDuties = api_types::GenericResponse>; + +/// Handles a request from the HTTP API for sync committee duties. +pub fn sync_committee_duties( + request_epoch: Epoch, + request_indices: &[u64], + chain: &BeaconChain, +) -> Result { + let altair_fork_epoch = if let Some(altair_fork_epoch) = chain.spec.altair_fork_epoch { + altair_fork_epoch + } else { + // Empty response for networks with Altair disabled. + return Ok(convert_to_response(vec![])); + }; + + // Try using the head's sync committees to satisfy the request. This should be sufficient for + // the vast majority of requests. Rather than checking if we think the request will succeed in a + // way prone to data races, we attempt the request immediately and check the error code. + match chain.sync_committee_duties_from_head(request_epoch, request_indices) { + Ok(duties) => return Ok(convert_to_response(duties)), + Err(BeaconChainError::SyncDutiesError(BeaconStateError::SyncCommitteeNotKnown { + .. + })) + | Err(BeaconChainError::SyncDutiesError(BeaconStateError::IncorrectStateVariant)) => (), + Err(e) => return Err(warp_utils::reject::beacon_chain_error(e)), + } + + let duties = duties_from_state_load(request_epoch, request_indices, altair_fork_epoch, chain) + .map_err(|e| match e { + BeaconChainError::SyncDutiesError(BeaconStateError::SyncCommitteeNotKnown { + current_epoch, + .. + }) => warp_utils::reject::custom_bad_request(format!( + "invalid epoch: {}, current epoch: {}", + request_epoch, current_epoch + )), + e => warp_utils::reject::beacon_chain_error(e), + })?; + Ok(convert_to_response(duties)) +} + +/// Slow path for duties: load a state and use it to compute the duties. +fn duties_from_state_load( + request_epoch: Epoch, + request_indices: &[u64], + altair_fork_epoch: Epoch, + chain: &BeaconChain, +) -> Result>, BeaconChainError> { + // Determine what the current epoch would be if we fast-forward our system clock by + // `MAXIMUM_GOSSIP_CLOCK_DISPARITY`. + // + // Most of the time, `tolerant_current_epoch` will be equal to `current_epoch`. However, during + // the first `MAXIMUM_GOSSIP_CLOCK_DISPARITY` duration of the epoch `tolerant_current_epoch` + // will equal `current_epoch + 1` + let current_epoch = chain.epoch()?; + + let tolerant_current_epoch = chain + .slot_clock + .now_with_future_tolerance(MAXIMUM_GOSSIP_CLOCK_DISPARITY) + .ok_or_else(|| BeaconChainError::UnableToReadSlot)? + .epoch(T::EthSpec::slots_per_epoch()); + + let max_sync_committee_period = tolerant_current_epoch.sync_committee_period(&chain.spec)? + 1; + let sync_committee_period = request_epoch.sync_committee_period(&chain.spec)?; + + if tolerant_current_epoch < altair_fork_epoch { + // Empty response if the epoch is pre-Altair. + Ok(vec![]) + } else if sync_committee_period <= max_sync_committee_period { + // Load the state at the start of the *previous* sync committee period. + // This is sufficient for historical duties, and efficient in the case where the head + // is lagging the current epoch and we need duties for the next period (because we only + // have to transition the head to start of the current period). + // + // We also need to ensure that the load slot is after the Altair fork. + let load_slot = max( + chain.spec.epochs_per_sync_committee_period * sync_committee_period.saturating_sub(1), + altair_fork_epoch, + ) + .start_slot(T::EthSpec::slots_per_epoch()); + + let state = chain.state_at_slot(load_slot, StateSkipConfig::WithoutStateRoots)?; + + state + .get_sync_committee_duties(request_epoch, request_indices, &chain.spec) + .map_err(BeaconChainError::SyncDutiesError) + } else { + Err(BeaconChainError::SyncDutiesError( + BeaconStateError::SyncCommitteeNotKnown { + current_epoch, + epoch: request_epoch, + }, + )) + } +} + +fn convert_to_response(duties: Vec>) -> SyncDuties { + api_types::GenericResponse::from( + duties + .into_iter() + .filter_map(|maybe_duty| maybe_duty) + .collect::>(), + ) +} + +/// Receive sync committee duties, storing them in the pools & broadcasting them. +pub fn process_sync_committee_signatures( + sync_committee_signatures: Vec, + network_tx: UnboundedSender>, + chain: &BeaconChain, + log: Logger, +) -> Result<(), warp::reject::Rejection> { + let mut failures = vec![]; + + for (i, sync_committee_signature) in sync_committee_signatures.iter().enumerate() { + let subnet_positions = match get_subnet_positions_for_sync_committee_message( + sync_committee_signature, + chain, + ) { + Ok(positions) => positions, + Err(e) => { + error!( + log, + "Unable to compute subnet positions for sync message"; + "error" => ?e, + "slot" => sync_committee_signature.slot, + ); + failures.push(api_types::Failure::new(i, format!("Verification: {:?}", e))); + continue; + } + }; + + // Verify and publish on all relevant subnets. + // + // The number of assigned subnets on any practical network should be ~1, so the apparent + // inefficiency of verifying multiple times is not a real inefficiency. + let mut verified_for_pool = None; + for subnet_id in subnet_positions.keys().copied() { + match VerifiedSyncCommitteeMessage::verify( + sync_committee_signature.clone(), + subnet_id, + chain, + ) { + Ok(verified) => { + publish_pubsub_message( + &network_tx, + PubsubMessage::SyncCommitteeMessage(Box::new(( + subnet_id, + verified.sync_message().clone(), + ))), + )?; + + verified_for_pool = Some(verified); + } + Err(e) => { + error!( + log, + "Failure verifying sync committee signature for gossip"; + "error" => ?e, + "request_index" => i, + "slot" => sync_committee_signature.slot, + "validator_index" => sync_committee_signature.validator_index, + ); + failures.push(api_types::Failure::new(i, format!("Verification: {:?}", e))); + } + } + } + + if let Some(verified) = verified_for_pool { + if let Err(e) = chain.add_to_naive_sync_aggregation_pool(verified) { + error!( + log, + "Unable to add sync committee signature to pool"; + "error" => ?e, + "slot" => sync_committee_signature.slot, + "validator_index" => sync_committee_signature.validator_index, + ); + } + } + } + + if failures.is_empty() { + Ok(()) + } else { + Err(warp_utils::reject::indexed_bad_request( + "error processing sync committee signatures".to_string(), + failures, + )) + } +} + +/// Get the set of all subnet assignments for a `SyncCommitteeMessage`. +pub fn get_subnet_positions_for_sync_committee_message( + sync_message: &SyncCommitteeMessage, + chain: &BeaconChain, +) -> Result>, SyncVerificationError> { + let pubkey = chain + .validator_pubkey_bytes(sync_message.validator_index as usize)? + .ok_or(SyncVerificationError::UnknownValidatorIndex( + sync_message.validator_index as usize, + ))?; + let sync_committee = chain.sync_committee_at_next_slot(sync_message.get_slot())?; + Ok(sync_committee.subcommittee_positions_for_public_key(&pubkey)?) +} + +/// Receive signed contributions and proofs, storing them in the op pool and broadcasting. +pub fn process_signed_contribution_and_proofs( + signed_contribution_and_proofs: Vec>, + network_tx: UnboundedSender>, + chain: &BeaconChain, + log: Logger, +) -> Result<(), warp::reject::Rejection> { + let mut verified_contributions = Vec::with_capacity(signed_contribution_and_proofs.len()); + let mut failures = vec![]; + + // Verify contributions & broadcast to the network. + for (index, contribution) in signed_contribution_and_proofs.into_iter().enumerate() { + let aggregator_index = contribution.message.aggregator_index; + let subcommittee_index = contribution.message.contribution.subcommittee_index; + let contribution_slot = contribution.message.contribution.slot; + + match chain.verify_sync_contribution_for_gossip(contribution) { + Ok(verified_contribution) => { + publish_pubsub_message( + &network_tx, + PubsubMessage::SignedContributionAndProof(Box::new( + verified_contribution.aggregate().clone(), + )), + )?; + + // FIXME(altair): notify validator monitor + verified_contributions.push((index, verified_contribution)); + } + // If we already know the contribution, don't broadcast it or attempt to + // further verify it. Return success. + Err(SyncVerificationError::SyncContributionAlreadyKnown(_)) => continue, + Err(e) => { + error!( + log, + "Failure verifying signed contribution and proof"; + "error" => ?e, + "request_index" => index, + "aggregator_index" => aggregator_index, + "subcommittee_index" => subcommittee_index, + "contribution_slot" => contribution_slot, + ); + failures.push(api_types::Failure::new( + index, + format!("Verification: {:?}", e), + )); + } + } + } + + // Add to the block inclusion pool. + for (index, verified_contribution) in verified_contributions { + if let Err(e) = chain.add_contribution_to_block_inclusion_pool(verified_contribution) { + warn!( + log, + "Could not add verified sync contribution to the inclusion pool"; + "error" => ?e, + "request_index" => index, + ); + failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e))); + } + } + + if !failures.is_empty() { + Err(warp_utils::reject::indexed_bad_request( + "error processing contribution and proofs".to_string(), + failures, + )) + } else { + Ok(()) + } +} diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs new file mode 100644 index 00000000000..24ffd38c80f --- /dev/null +++ b/beacon_node/http_api/tests/common.rs @@ -0,0 +1,142 @@ +use beacon_chain::{ + test_utils::{BeaconChainHarness, EphemeralHarnessType}, + BeaconChain, BeaconChainTypes, +}; +use discv5::enr::{CombinedKey, EnrBuilder}; +use eth2::{BeaconNodeHttpClient, Timeouts}; +use eth2_libp2p::{ + rpc::methods::{MetaData, MetaDataV2}, + types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState}, + Enr, NetworkGlobals, PeerId, +}; +use http_api::{Config, Context}; +use network::NetworkMessage; +use sensitive_url::SensitiveUrl; +use slog::Logger; +use std::future::Future; +use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use types::{test_utils::generate_deterministic_keypairs, ChainSpec, EthSpec}; + +pub const TCP_PORT: u16 = 42; +pub const UDP_PORT: u16 = 42; +pub const SEQ_NUMBER: u64 = 0; +pub const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0/tcp/9000"; + +/// HTTP API tester that allows interaction with the underlying beacon chain harness. +pub struct InteractiveTester { + pub harness: BeaconChainHarness>, + pub client: BeaconNodeHttpClient, + pub network_rx: mpsc::UnboundedReceiver>, + _server_shutdown: oneshot::Sender<()>, +} + +/// The result of calling `create_api_server`. +/// +/// Glue-type between `tests::ApiTester` and `InteractiveTester`. +pub struct ApiServer> { + pub server: SFut, + pub listening_socket: SocketAddr, + pub shutdown_tx: oneshot::Sender<()>, + pub network_rx: tokio::sync::mpsc::UnboundedReceiver>, + pub local_enr: Enr, + pub external_peer_id: PeerId, +} + +impl InteractiveTester { + pub fn new(spec: Option, validator_count: usize) -> Self { + let harness = BeaconChainHarness::new( + E::default(), + spec, + generate_deterministic_keypairs(validator_count), + ); + + let ApiServer { + server, + listening_socket, + shutdown_tx: _server_shutdown, + network_rx, + .. + } = create_api_server(harness.chain.clone(), harness.logger().clone()); + + tokio::spawn(server); + + let client = BeaconNodeHttpClient::new( + SensitiveUrl::parse(&format!( + "http://{}:{}", + listening_socket.ip(), + listening_socket.port() + )) + .unwrap(), + Timeouts::set_all(Duration::from_secs(1)), + ); + + Self { + harness, + client, + network_rx, + _server_shutdown, + } + } +} + +pub fn create_api_server( + chain: Arc>, + log: Logger, +) -> ApiServer> { + let (network_tx, network_rx) = mpsc::unbounded_channel(); + + // Default metadata + let meta_data = MetaData::V2(MetaDataV2 { + seq_number: SEQ_NUMBER, + attnets: EnrAttestationBitfield::::default(), + syncnets: EnrSyncCommitteeBitfield::::default(), + }); + let enr_key = CombinedKey::generate_secp256k1(); + let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); + let network_globals = + NetworkGlobals::new(enr.clone(), TCP_PORT, UDP_PORT, meta_data, vec![], &log); + + let peer_id = PeerId::random(); + network_globals + .peers + .write() + .connect_ingoing(&peer_id, EXTERNAL_ADDR.parse().unwrap(), None); + + *network_globals.sync_state.write() = SyncState::Synced; + + let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); + + let context = Arc::new(Context { + config: Config { + enabled: true, + listen_addr: Ipv4Addr::new(127, 0, 0, 1), + listen_port: 0, + allow_origin: None, + serve_legacy_spec: true, + }, + chain: Some(chain.clone()), + network_tx: Some(network_tx), + network_globals: Some(Arc::new(network_globals)), + eth1_service: Some(eth1_service), + log, + }); + let ctx = context.clone(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let server_shutdown = async { + // It's not really interesting why this triggered, just that it happened. + let _ = shutdown_rx.await; + }; + let (listening_socket, server) = http_api::serve(ctx, server_shutdown).unwrap(); + + ApiServer { + server, + listening_socket, + shutdown_tx, + network_rx, + local_enr: enr, + external_peer_id: peer_id, + } +} diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs new file mode 100644 index 00000000000..ae800a02d5c --- /dev/null +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -0,0 +1,194 @@ +//! Tests for API behaviour across fork boundaries. +use crate::common::*; +use beacon_chain::{test_utils::RelativeSyncCommittee, StateSkipConfig}; +use types::{ChainSpec, Epoch, EthSpec, MinimalEthSpec}; + +type E = MinimalEthSpec; + +fn altair_spec(altair_fork_epoch: Epoch) -> ChainSpec { + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(altair_fork_epoch); + spec +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn sync_committee_duties_across_fork() { + let validator_count = E::sync_committee_size(); + let fork_epoch = Epoch::new(8); + let spec = altair_spec(fork_epoch); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let harness = &tester.harness; + let client = &tester.client; + + let all_validators = harness.get_all_validators(); + let all_validators_u64 = all_validators.iter().map(|x| *x as u64).collect::>(); + + assert_eq!(harness.get_current_slot(), 0); + + // Prior to the fork the endpoint should return an empty vec. + let early_duties = client + .post_validator_duties_sync(fork_epoch - 1, &all_validators_u64) + .await + .unwrap() + .data; + assert!(early_duties.is_empty()); + + // If there's a skip slot at the fork slot, the endpoint should return duties, even + // though the head state hasn't transitioned yet. + let fork_slot = fork_epoch.start_slot(E::slots_per_epoch()); + let (genesis_state, genesis_state_root) = harness.get_current_state_and_root(); + let (_, state) = harness + .add_attested_block_at_slot( + fork_slot - 1, + genesis_state, + genesis_state_root, + &all_validators, + ) + .unwrap(); + + harness.advance_slot(); + assert_eq!(harness.get_current_slot(), fork_slot); + + let sync_duties = client + .post_validator_duties_sync(fork_epoch, &all_validators_u64) + .await + .unwrap() + .data; + assert_eq!(sync_duties.len(), E::sync_committee_size()); + + // After applying a block at the fork slot the duties should remain unchanged. + let state_root = state.canonical_root(); + harness + .add_attested_block_at_slot(fork_slot, state, state_root, &all_validators) + .unwrap(); + + assert_eq!( + client + .post_validator_duties_sync(fork_epoch, &all_validators_u64) + .await + .unwrap() + .data, + sync_duties + ); + + // Sync duties should also be available for the next period. + let current_period = fork_epoch.sync_committee_period(&spec).unwrap(); + let next_period_epoch = spec.epochs_per_sync_committee_period * (current_period + 1); + + let next_period_duties = client + .post_validator_duties_sync(next_period_epoch, &all_validators_u64) + .await + .unwrap() + .data; + assert_eq!(next_period_duties.len(), E::sync_committee_size()); + + // Sync duties should *not* be available for the period after the next period. + // We expect a 400 (bad request) response. + let next_next_period_epoch = spec.epochs_per_sync_committee_period * (current_period + 2); + assert_eq!( + client + .post_validator_duties_sync(next_next_period_epoch, &all_validators_u64) + .await + .unwrap_err() + .status() + .unwrap(), + 400 + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn attestations_across_fork_with_skip_slots() { + let validator_count = E::sync_committee_size(); + let fork_epoch = Epoch::new(8); + let spec = altair_spec(fork_epoch); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let harness = &tester.harness; + let client = &tester.client; + + let all_validators = harness.get_all_validators(); + + let fork_slot = fork_epoch.start_slot(E::slots_per_epoch()); + let fork_state = harness + .chain + .state_at_slot(fork_slot, StateSkipConfig::WithStateRoots) + .unwrap(); + + harness.set_current_slot(fork_slot); + + let attestations = harness.make_attestations( + &all_validators, + &fork_state, + fork_state.canonical_root(), + (*fork_state.get_block_root(fork_slot - 1).unwrap()).into(), + fork_slot, + ); + + let unaggregated_attestations = attestations + .iter() + .flat_map(|(atts, _)| atts.iter().map(|(att, _)| att.clone())) + .collect::>(); + + assert!(!unaggregated_attestations.is_empty()); + client + .post_beacon_pool_attestations(&unaggregated_attestations) + .await + .unwrap(); + + let signed_aggregates = attestations + .into_iter() + .filter_map(|(_, op_aggregate)| op_aggregate) + .collect::>(); + assert!(!signed_aggregates.is_empty()); + + client + .post_validator_aggregate_and_proof(&signed_aggregates) + .await + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn sync_contributions_across_fork_with_skip_slots() { + let validator_count = E::sync_committee_size(); + let fork_epoch = Epoch::new(8); + let spec = altair_spec(fork_epoch); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let harness = &tester.harness; + let client = &tester.client; + + let fork_slot = fork_epoch.start_slot(E::slots_per_epoch()); + let fork_state = harness + .chain + .state_at_slot(fork_slot, StateSkipConfig::WithStateRoots) + .unwrap(); + + harness.set_current_slot(fork_slot); + + let sync_messages = harness.make_sync_contributions( + &fork_state, + *fork_state.get_block_root(fork_slot - 1).unwrap(), + fork_slot, + RelativeSyncCommittee::Current, + ); + + let sync_committee_messages = sync_messages + .iter() + .flat_map(|(messages, _)| messages.iter().map(|(message, _subnet)| message.clone())) + .collect::>(); + assert!(!sync_committee_messages.is_empty()); + + client + .post_beacon_pool_sync_committee_signatures(&sync_committee_messages) + .await + .unwrap(); + + let signed_contributions = sync_messages + .into_iter() + .filter_map(|(_, op_aggregate)| op_aggregate) + .collect::>(); + assert!(!signed_contributions.is_empty()); + + client + .post_validator_contribution_and_proofs(&signed_contributions) + .await + .unwrap(); +} diff --git a/beacon_node/http_api/tests/main.rs b/beacon_node/http_api/tests/main.rs new file mode 100644 index 00000000000..d10725a0264 --- /dev/null +++ b/beacon_node/http_api/tests/main.rs @@ -0,0 +1,6 @@ +#![cfg(not(debug_assertions))] // Tests are too slow in debug. +#![recursion_limit = "256"] + +pub mod common; +pub mod fork_tests; +pub mod tests; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 80f325cddba..8a952dde59c 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1,6 +1,4 @@ -#![cfg(not(debug_assertions))] // Tests are too slow in debug. -#![recursion_limit = "256"] - +use crate::common::{create_api_server, ApiServer}; use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, @@ -9,21 +7,14 @@ use environment::null_logger; use eth2::Error; use eth2::StatusCode; use eth2::{types::*, BeaconNodeHttpClient, Timeouts}; -use eth2_libp2p::discv5::enr::{CombinedKey, EnrBuilder}; -use eth2_libp2p::{ - rpc::methods::{MetaData, MetaDataV2}, - types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState}, - Enr, EnrExt, NetworkGlobals, PeerId, -}; +use eth2_libp2p::{Enr, EnrExt, PeerId}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; -use http_api::{Config, Context}; use network::NetworkMessage; use sensitive_url::SensitiveUrl; use slot_clock::SlotClock; use state_processing::per_slot_processing; use std::convert::TryInto; -use std::net::Ipv4Addr; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; @@ -41,9 +32,6 @@ const VALIDATOR_COUNT: usize = SLOTS_PER_EPOCH as usize; const CHAIN_LENGTH: u64 = SLOTS_PER_EPOCH * 5 - 1; // Make `next_block` an epoch transition const JUSTIFIED_EPOCH: u64 = 4; const FINALIZED_EPOCH: u64 = 3; -const TCP_PORT: u16 = 42; -const UDP_PORT: u16 = 42; -const SEQ_NUMBER: u64 = 0; const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0/tcp/9000"; /// Skipping the slots around the epoch boundary allows us to check that we're obtaining states @@ -74,9 +62,13 @@ struct ApiTester { impl ApiTester { pub fn new() -> Self { - let mut harness = BeaconChainHarness::new( + // This allows for testing voluntary exits without building out a massive chain. + let mut spec = E::default_spec(); + spec.shard_committee_period = 2; + + let harness = BeaconChainHarness::new( MainnetEthSpec, - None, + Some(spec), generate_deterministic_keypairs(VALIDATOR_COUNT), ); @@ -134,13 +126,7 @@ impl ApiTester { let proposer_slashing = harness.make_proposer_slashing(2); let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); - // Changing this *after* the chain has been initialized is a bit cheeky, but it shouldn't - // cause issue. - // - // This allows for testing voluntary exits without building out a massive chain. - harness.chain.spec.shard_committee_period = 2; - - let chain = Arc::new(harness.chain); + let chain = harness.chain.clone(); assert_eq!( chain.head_info().unwrap().finalized_checkpoint.epoch, @@ -157,56 +143,18 @@ impl ApiTester { "precondition: justification" ); - let (network_tx, network_rx) = mpsc::unbounded_channel(); - let log = null_logger().unwrap(); - // Default metadata - let meta_data = MetaData::V2(MetaDataV2 { - seq_number: SEQ_NUMBER, - attnets: EnrAttestationBitfield::::default(), - syncnets: EnrSyncCommitteeBitfield::::default(), - }); - let enr_key = CombinedKey::generate_secp256k1(); - let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let enr_clone = enr.clone(); - let network_globals = NetworkGlobals::new(enr, TCP_PORT, UDP_PORT, meta_data, vec![], &log); - - let peer_id = PeerId::random(); - network_globals.peers.write().connect_ingoing( - &peer_id, - EXTERNAL_ADDR.parse().unwrap(), - None, - ); - - *network_globals.sync_state.write() = SyncState::Synced; - - let eth1_service = - eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); - - let context = Arc::new(Context { - config: Config { - enabled: true, - listen_addr: Ipv4Addr::new(127, 0, 0, 1), - listen_port: 0, - allow_origin: None, - serve_legacy_spec: true, - }, - chain: Some(chain.clone()), - network_tx: Some(network_tx), - network_globals: Some(Arc::new(network_globals)), - eth1_service: Some(eth1_service), - log, - }); - let ctx = context.clone(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let server_shutdown = async { - // It's not really interesting why this triggered, just that it happened. - let _ = shutdown_rx.await; - }; - let (listening_socket, server) = http_api::serve(ctx, server_shutdown).unwrap(); + let ApiServer { + server, + listening_socket, + shutdown_tx, + network_rx, + local_enr, + external_peer_id, + } = create_api_server(chain.clone(), log); - tokio::spawn(async { server.await }); + tokio::spawn(server); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -230,8 +178,8 @@ impl ApiTester { _server_shutdown: shutdown_tx, validator_keypairs: harness.validator_keypairs, network_rx, - local_enr: enr_clone, - external_peer_id: peer_id, + local_enr, + external_peer_id, } } @@ -271,58 +219,20 @@ impl ApiTester { let proposer_slashing = harness.make_proposer_slashing(2); let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); - let chain = Arc::new(harness.chain); - - let (network_tx, network_rx) = mpsc::unbounded_channel(); + let chain = harness.chain.clone(); let log = null_logger().unwrap(); - // Default metadata - let meta_data = MetaData::V2(MetaDataV2 { - seq_number: SEQ_NUMBER, - attnets: EnrAttestationBitfield::::default(), - syncnets: EnrSyncCommitteeBitfield::::default(), - }); - let enr_key = CombinedKey::generate_secp256k1(); - let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let enr_clone = enr.clone(); - let network_globals = NetworkGlobals::new(enr, TCP_PORT, UDP_PORT, meta_data, vec![], &log); - - let peer_id = PeerId::random(); - network_globals.peers.write().connect_ingoing( - &peer_id, - EXTERNAL_ADDR.parse().unwrap(), - None, - ); - - *network_globals.sync_state.write() = SyncState::Synced; - - let eth1_service = - eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); - - let context = Arc::new(Context { - config: Config { - enabled: true, - listen_addr: Ipv4Addr::new(127, 0, 0, 1), - listen_port: 0, - allow_origin: None, - serve_legacy_spec: true, - }, - chain: Some(chain.clone()), - network_tx: Some(network_tx), - network_globals: Some(Arc::new(network_globals)), - eth1_service: Some(eth1_service), - log, - }); - let ctx = context.clone(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let server_shutdown = async { - // It's not really interesting why this triggered, just that it happened. - let _ = shutdown_rx.await; - }; - let (listening_socket, server) = http_api::serve(ctx, server_shutdown).unwrap(); + let ApiServer { + server, + listening_socket, + shutdown_tx, + network_rx, + local_enr, + external_peer_id, + } = create_api_server(chain.clone(), log); - tokio::spawn(async { server.await }); + tokio::spawn(server); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -346,8 +256,8 @@ impl ApiTester { _server_shutdown: shutdown_tx, validator_keypairs: harness.validator_keypairs, network_rx, - local_enr: enr_clone, - external_peer_id: peer_id, + local_enr, + external_peer_id, } } diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 63990a54c88..b7d2191e23d 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -15,6 +15,7 @@ slog-term = "2.6.0" slog-async = "2.5.0" logging = { path = "../../common/logging" } environment = { path = "../../lighthouse/environment" } +discv5 = { version = "0.1.0-beta.3" } [dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 30cc1724276..0f491527b27 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -23,8 +23,8 @@ use std::time::Duration; use tokio::runtime::Runtime; use tokio::sync::mpsc; use types::{ - test_utils::generate_deterministic_keypairs, Attestation, AttesterSlashing, MainnetEthSpec, - ProposerSlashing, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, + test_utils::generate_deterministic_keypairs, Attestation, AttesterSlashing, EthSpec, + MainnetEthSpec, ProposerSlashing, SignedBeaconBlock, SignedVoluntaryExit, SubnetId, }; type E = MainnetEthSpec; @@ -71,9 +71,13 @@ impl Drop for TestRig { impl TestRig { pub fn new(chain_length: u64) -> Self { - let mut harness = BeaconChainHarness::new( + // This allows for testing voluntary exits without building out a massive chain. + let mut spec = E::default_spec(); + spec.shard_committee_period = 2; + + let harness = BeaconChainHarness::new( MainnetEthSpec, - None, + Some(spec), generate_deterministic_keypairs(VALIDATOR_COUNT), ); @@ -151,13 +155,7 @@ impl TestRig { let proposer_slashing = harness.make_proposer_slashing(2); let voluntary_exit = harness.make_voluntary_exit(3, harness.chain.epoch().unwrap()); - // Changing this *after* the chain has been initialized is a bit cheeky, but it shouldn't - // cause issue. - // - // This allows for testing voluntary exits without building out a massive chain. - harness.chain.spec.shard_committee_period = 2; - - let chain = Arc::new(harness.chain); + let chain = harness.chain; let (network_tx, _network_rx) = mpsc::unbounded_channel(); diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 4e8397964d0..aae0239720d 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -327,7 +327,7 @@ impl Worker { "Unknown parent for gossip block"; "root" => %block.canonical_root() ); - self.send_sync_committee_message(SyncMessage::UnknownBlock(peer_id, block)); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); return; } Err(e @ BlockError::FutureSlot { .. }) @@ -504,7 +504,7 @@ impl Worker { "Block with unknown parent attempted to be processed"; "peer_id" => %peer_id ); - self.send_sync_committee_message(SyncMessage::UnknownBlock(peer_id, block)); + self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block)); } other => { debug!( diff --git a/beacon_node/network/src/beacon_processor/worker/mod.rs b/beacon_node/network/src/beacon_processor/worker/mod.rs index adc1ad66903..58fec22d5f5 100644 --- a/beacon_node/network/src/beacon_processor/worker/mod.rs +++ b/beacon_node/network/src/beacon_processor/worker/mod.rs @@ -25,10 +25,10 @@ impl Worker { /// Send a message to `sync_tx`. /// /// Creates a log if there is an internal error. - fn send_sync_committee_message(&self, message: SyncMessage) { + fn send_sync_message(&self, message: SyncMessage) { self.sync_tx.send(message).unwrap_or_else(|e| { - debug!(self.log, "Could not send message to the sync service, likely shutdown"; - "error" => %e) + debug!(self.log, "Could not send message to the sync service"; + "error" => %e) }); } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 76f94abca50..e1e51ef2c6b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -99,7 +99,7 @@ impl Worker { finalized_epoch: status.finalized_epoch, finalized_root: status.finalized_root, }; - self.send_sync_committee_message(SyncMessage::AddPeer(peer_id, info)); + self.send_sync_message(SyncMessage::AddPeer(peer_id, info)); } Err(e) => error!(self.log, "Could not process status message"; "error" => ?e), } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 66a71dbdbf5..db2b8db75ed 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -99,7 +99,7 @@ impl Worker { } }; - self.send_sync_committee_message(SyncMessage::BatchProcessed { + self.send_sync_message(SyncMessage::BatchProcessed { chain_id, epoch, result, @@ -117,7 +117,7 @@ impl Worker { match self.process_blocks(downloaded_blocks.iter().rev()) { (_, Err(e)) => { debug!(self.log, "Parent lookup failed"; "last_peer_id" => %peer_id, "error" => e); - self.send_sync_committee_message(SyncMessage::ParentLookupFailed { + self.send_sync_message(SyncMessage::ParentLookupFailed { peer_id, chain_head, }) diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index 02be935c1fc..db61d2a88cf 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -35,15 +35,13 @@ mod tests { fn test_dht_persistence() { let log = get_logger(false); - let beacon_chain = Arc::new( - BeaconChainHarness::new_with_store_config( - MinimalEthSpec, - None, - generate_deterministic_keypairs(8), - StoreConfig::default(), - ) - .chain, - ); + let beacon_chain = BeaconChainHarness::new_with_store_config( + MinimalEthSpec, + None, + generate_deterministic_keypairs(8), + StoreConfig::default(), + ) + .chain; let store = beacon_chain.store.clone(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index ea4f013a1e2..aab891f1510 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -12,7 +12,7 @@ pub mod lighthouse; pub mod lighthouse_vc; pub mod types; -use self::types::*; +use self::types::{Error as ResponseError, *}; use eth2_libp2p::PeerId; use futures::Stream; use futures_util::StreamExt; @@ -85,6 +85,7 @@ pub struct Timeouts { pub attester_duties: Duration, pub proposal: Duration, pub proposer_duties: Duration, + pub sync_duties: Duration, } impl Timeouts { @@ -94,6 +95,7 @@ impl Timeouts { attester_duties: timeout, proposal: timeout, proposer_duties: timeout, + sync_duties: timeout, } } } @@ -663,15 +665,8 @@ impl BeaconNodeHttpClient { .push("pool") .push("attestations"); - let response = self - .client - .post(path) - .timeout(self.timeouts.attestation) - .json(attestations) - .send() - .await - .map_err(Error::Reqwest)?; - ok_or_indexed_error(response).await?; + self.post_with_timeout(path, &attestations, self.timeouts.attestation) + .await?; Ok(()) } @@ -802,6 +797,41 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `POST beacon/pool/sync_committees` + pub async fn post_beacon_pool_sync_committee_signatures( + &self, + signatures: &[SyncCommitteeMessage], + ) -> Result<(), Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("sync_committees"); + + self.post(path, &signatures).await?; + + Ok(()) + } + + /// `POST validator/contribution_and_proofs` + pub async fn post_validator_contribution_and_proofs( + &self, + signed_contributions: &[SignedContributionAndProof], + ) -> Result<(), Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("contribution_and_proofs"); + + self.post(path, &signed_contributions).await?; + + Ok(()) + } + /// `GET config/fork_schedule` pub async fn get_config_fork_schedule(&self) -> Result>, Error> { let mut path = self.eth_path()?; @@ -1103,6 +1133,32 @@ impl BeaconNodeHttpClient { .await } + /// `GET validator/sync_committee_contribution` + pub async fn get_validator_sync_committee_contribution( + &self, + sync_committee_data: &SyncContributionData, + ) -> Result>>, Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("sync_committee_contribution"); + + path.query_pairs_mut() + .append_pair("slot", &sync_committee_data.slot.to_string()) + .append_pair( + "beacon_block_root", + &format!("{:?}", sync_committee_data.beacon_block_root), + ) + .append_pair( + "subcommittee_index", + &sync_committee_data.subcommittee_index.to_string(), + ); + + self.get_opt(path).await + } + /// `POST validator/duties/attester/{epoch}` pub async fn post_validator_duties_attester( &self, @@ -1134,15 +1190,8 @@ impl BeaconNodeHttpClient { .push("validator") .push("aggregate_and_proofs"); - let response = self - .client - .post(path) - .timeout(self.timeouts.attestation) - .json(aggregates) - .send() - .await - .map_err(Error::Reqwest)?; - ok_or_indexed_error(response).await?; + self.post_with_timeout(path, &aggregates, self.timeouts.attestation) + .await?; Ok(()) } @@ -1164,6 +1213,23 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST validator/sync_committee_subscriptions` + pub async fn post_validator_sync_committee_subscriptions( + &self, + subscriptions: &[SyncCommitteeSubscription], + ) -> Result<(), Error> { + let mut path = self.eth_path()?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("sync_committee_subscriptions"); + + self.post(path, &subscriptions).await?; + + Ok(()) + } + /// `GET events?topics` pub async fn get_events( &self, @@ -1193,31 +1259,39 @@ impl BeaconNodeHttpClient { Err(e) => Err(Error::Reqwest(e)), })) } -} -/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an -/// appropriate error message. -async fn ok_or_error(response: Response) -> Result { - let status = response.status(); + /// `POST validator/duties/sync/{epoch}` + pub async fn post_validator_duties_sync( + &self, + epoch: Epoch, + indices: &[u64], + ) -> Result>, Error> { + let mut path = self.eth_path()?; - if status == StatusCode::OK { - Ok(response) - } else if let Ok(message) = response.json().await { - Err(Error::ServerMessage(message)) - } else { - Err(Error::StatusCode(status)) + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("duties") + .push("sync") + .push(&epoch.to_string()); + + self.post_with_timeout_and_response(path, &indices, self.timeouts.sync_duties) + .await } } /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an -/// appropriate indexed error message. -async fn ok_or_indexed_error(response: Response) -> Result { +/// appropriate error message. +async fn ok_or_error(response: Response) -> Result { let status = response.status(); if status == StatusCode::OK { Ok(response) } else if let Ok(message) = response.json().await { - Err(Error::ServerIndexedMessage(message)) + match message { + ResponseError::Message(message) => Err(Error::ServerMessage(message)), + ResponseError::Indexed(indexed) => Err(Error::ServerIndexedMessage(indexed)), + } } else { Err(Error::StatusCode(status)) } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 94ace16ccc5..139fbcbe0d3 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -10,6 +10,14 @@ use std::fmt; use std::str::{from_utf8, FromStr}; pub use types::*; +/// An API error serializable to JSON. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum Error { + Indexed(IndexedErrorMessage), + Message(ErrorMessage), +} + /// An API error serializable to JSON. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ErrorMessage { diff --git a/common/rest_types/Cargo.toml b/common/rest_types/Cargo.toml deleted file mode 100644 index 3d4c70c1c33..00000000000 --- a/common/rest_types/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "rest_types" -version = "0.2.0" -authors = ["Sigma Prime "] -edition = "2018" - -[dependencies] -types = { path = "../../consensus/types" } -eth2_ssz_derive = "0.1.0" -eth2_ssz = "0.1.2" -eth2_hashing = "0.1.0" -tree_hash = "0.1.0" -state_processing = { path = "../../consensus/state_processing" } -bls = { path = "../../crypto/bls" } -serde = { version = "1.0.110", features = ["derive"] } -rayon = "1.3.0" -hyper = "0.14.4" -tokio = { version = "1.1.0", features = ["sync"] } -environment = { path = "../../lighthouse/environment" } -store = { path = "../../beacon_node/store" } -beacon_chain = { path = "../../beacon_node/beacon_chain" } -serde_json = "1.0.52" -serde_yaml = "0.8.11" - -[target.'cfg(target_os = "linux")'.dependencies] -psutil = "3.1.0" -procinfo = "0.4.2" diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index 6f3d18d2802..cf420e01aa9 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -50,6 +50,7 @@ superstruct = "0.2.0" serde_json = "1.0.58" criterion = "0.3.3" beacon_chain = { path = "../../beacon_node/beacon_chain" } +eth2_interop_keypairs = { path = "../../common/eth2_interop_keypairs" } [features] default = ["sqlite", "legacy-arith"] diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 673bca4a889..b96d1201628 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -832,6 +832,32 @@ impl BeaconState { }) } + /// Get the sync committee duties for a list of validator indices. + /// + /// Will return a `SyncCommitteeNotKnown` error if the `epoch` is out of bounds with respect + /// to the current or next sync committee periods. + pub fn get_sync_committee_duties( + &self, + epoch: Epoch, + validator_indices: &[u64], + spec: &ChainSpec, + ) -> Result>, Error> { + let sync_committee = self.get_built_sync_committee(epoch, spec)?; + + validator_indices + .iter() + .map(|&validator_index| { + let pubkey = self.get_validator(validator_index as usize)?.pubkey; + + Ok(SyncDuty::from_sync_committee( + validator_index, + pubkey, + sync_committee, + )) + }) + .collect() + } + /// Get the canonical root of the `latest_block_header`, filling in its state root if necessary. /// /// It needs filling in on all slots where there isn't a skip. diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 07fa2ba5ae5..8ff8adb62e9 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -687,6 +687,8 @@ where #[cfg(test)] mod tests { use super::*; + use itertools::Itertools; + use safe_arith::SafeArith; #[test] fn test_mainnet_spec_can_be_constructed() { @@ -747,6 +749,33 @@ mod tests { } } } + + // Test that `next_fork_epoch` is consistent with the other functions. + #[test] + fn next_fork_epoch_consistency() { + type E = MainnetEthSpec; + let spec = ChainSpec::mainnet(); + + let mut last_fork_slot = Slot::new(0); + + for (_, fork) in ForkName::list_all().into_iter().tuple_windows() { + if let Some(fork_epoch) = spec.fork_epoch(fork) { + last_fork_slot = fork_epoch.start_slot(E::slots_per_epoch()); + + // Fork is activated at non-zero epoch: check that `next_fork_epoch` returns + // the correct result. + if let Ok(prior_slot) = last_fork_slot.safe_sub(1) { + let (next_fork, next_fork_epoch) = + spec.next_fork_epoch::(prior_slot).unwrap(); + assert_eq!(fork, next_fork); + assert_eq!(spec.fork_epoch(fork).unwrap(), next_fork_epoch); + } + } else { + // Fork is not activated, check that `next_fork_epoch` returns `None`. + assert_eq!(spec.next_fork_epoch::(last_fork_slot), None); + } + } + } } #[cfg(test)] diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 7e23c477cfe..a65ff1fbc04 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -56,6 +56,7 @@ pub mod signed_contribution_and_proof; pub mod signed_voluntary_exit; pub mod signing_data; pub mod sync_committee_subscription; +pub mod sync_duty; pub mod validator; pub mod validator_subscription; pub mod voluntary_exit; @@ -135,9 +136,10 @@ pub use crate::subnet_id::SubnetId; pub use crate::sync_aggregate::SyncAggregate; pub use crate::sync_aggregator_selection_data::SyncAggregatorSelectionData; pub use crate::sync_committee::SyncCommittee; -pub use crate::sync_committee_contribution::SyncCommitteeContribution; +pub use crate::sync_committee_contribution::{SyncCommitteeContribution, SyncContributionData}; pub use crate::sync_committee_message::SyncCommitteeMessage; pub use crate::sync_committee_subscription::SyncCommitteeSubscription; +pub use crate::sync_duty::SyncDuty; pub use crate::sync_selection_proof::SyncSelectionProof; pub use crate::sync_subnet_id::SyncSubnetId; pub use crate::validator::Validator; diff --git a/consensus/types/src/sync_committee_contribution.rs b/consensus/types/src/sync_committee_contribution.rs index a2934090be6..c8fce78a8bb 100644 --- a/consensus/types/src/sync_committee_contribution.rs +++ b/consensus/types/src/sync_committee_contribution.rs @@ -77,9 +77,9 @@ impl SignedRoot for Hash256 {} /// This is not in the spec, but useful for determining uniqueness of sync committee contributions #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TreeHash, TestRandom)] pub struct SyncContributionData { - slot: Slot, - beacon_block_root: Hash256, - subcommittee_index: u64, + pub slot: Slot, + pub beacon_block_root: Hash256, + pub subcommittee_index: u64, } impl SyncContributionData { diff --git a/consensus/types/src/sync_duty.rs b/consensus/types/src/sync_duty.rs new file mode 100644 index 00000000000..e3ffe62bfd1 --- /dev/null +++ b/consensus/types/src/sync_duty.rs @@ -0,0 +1,83 @@ +use crate::{EthSpec, SyncCommittee, SyncSubnetId}; +use bls::PublicKeyBytes; +use safe_arith::ArithError; +use serde_derive::{Deserialize, Serialize}; +use std::collections::HashSet; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SyncDuty { + pub pubkey: PublicKeyBytes, + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, + #[serde(with = "serde_utils::quoted_u64_vec")] + pub validator_sync_committee_indices: Vec, +} + +impl SyncDuty { + /// Create a new `SyncDuty` from the list of validator indices in a sync committee. + pub fn from_sync_committee_indices( + validator_index: u64, + pubkey: PublicKeyBytes, + sync_committee_indices: &[usize], + ) -> Option { + // Positions of the `validator_index` within the committee. + let validator_sync_committee_indices = sync_committee_indices + .iter() + .enumerate() + .filter_map(|(i, &v)| { + if validator_index == v as u64 { + Some(i as u64) + } else { + None + } + }) + .collect(); + Self::new(validator_index, pubkey, validator_sync_committee_indices) + } + + /// Create a new `SyncDuty` from a `SyncCommittee`, which contains the pubkeys but not the + /// indices. + pub fn from_sync_committee( + validator_index: u64, + pubkey: PublicKeyBytes, + sync_committee: &SyncCommittee, + ) -> Option { + let validator_sync_committee_indices = sync_committee + .pubkeys + .iter() + .enumerate() + .filter_map(|(i, committee_pubkey)| { + if &pubkey == committee_pubkey { + Some(i as u64) + } else { + None + } + }) + .collect(); + Self::new(validator_index, pubkey, validator_sync_committee_indices) + } + + /// Create a duty if the `validator_sync_committee_indices` is non-empty. + fn new( + validator_index: u64, + pubkey: PublicKeyBytes, + validator_sync_committee_indices: Vec, + ) -> Option { + if !validator_sync_committee_indices.is_empty() { + Some(SyncDuty { + pubkey, + validator_index, + validator_sync_committee_indices, + }) + } else { + None + } + } + + /// Get the set of subnet IDs for this duty. + pub fn subnet_ids(&self) -> Result, ArithError> { + SyncSubnetId::compute_subnets_for_sync_committee::( + &self.validator_sync_committee_indices, + ) + } +} diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 792255cf07f..b1bf95160db 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -64,5 +64,6 @@ scrypt = { version = "0.5.0", default-features = false } lighthouse_metrics = { path = "../common/lighthouse_metrics" } lazy_static = "1.4.0" fallback = { path = "../common/fallback" } +itertools = "0.10.0" monitoring_api = { path = "../common/monitoring_api" } sensitive_url = { path = "../common/sensitive_url" } diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index aa7edd29e5a..92a21b64394 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -6,6 +6,8 @@ //! The `DutiesService` is also responsible for sending events to the `BlockService` which trigger //! block production. +mod sync; + use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{ block_service::BlockServiceNotification, http_metrics::metrics, validator_store::ValidatorStore, @@ -18,6 +20,8 @@ use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use sync::poll_sync_committee_duties; +use sync::SyncDutiesMap; use tokio::{sync::mpsc::Sender, time::sleep}; use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot}; @@ -38,6 +42,14 @@ pub enum Error { FailedToDownloadAttesters(String), FailedToProduceSelectionProof, InvalidModulo(ArithError), + ArithError(ArithError), + SyncDutiesNotFound(u64), +} + +impl From for Error { + fn from(e: ArithError) -> Self { + Self::ArithError(e) + } } /// Neatly joins the server-generated `AttesterData` with the locally-generated `selection_proof`. @@ -93,6 +105,7 @@ pub struct DutiesService { /// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain /// proposals for any validators which are not registered locally. pub proposers: RwLock, + pub sync_duties: SyncDutiesMap, /// Maps a public key to a validator index. There is a task which ensures this map is kept /// up-to-date. pub indices: RwLock, @@ -167,6 +180,20 @@ impl DutiesService { .cloned() .collect() } + + /// Returns public keys for all enabled validators managed by the VC. + pub fn local_pubkeys(&self) -> HashSet { + self.validator_store.voting_pubkeys().into_iter().collect() + } + + /// Returns the validator indices for all known validators in `local_pubkeys`. + pub fn local_indices(&self, local_pubkeys: &HashSet) -> Vec { + let indices_map = self.indices.read(); + local_pubkeys + .iter() + .filter_map(|pubkey| indices_map.get(pubkey).copied()) + .collect() + } } /// Start the service that periodically polls the beacon node for validator duties. This will start @@ -265,6 +292,37 @@ pub fn start_update_service( }, "duties_service_attesters", ); + + // Spawn the task which keeps track of local sync committee duties. + let duties_service = core_duties_service.clone(); + let log = core_duties_service.context.log().clone(); + core_duties_service.context.executor.spawn( + async move { + loop { + if let Err(e) = poll_sync_committee_duties(&duties_service).await { + error!( + log, + "Failed to poll sync committee duties"; + "error" => ?e + ); + } + + // Wait until the next slot before polling again. + // This doesn't mean that the beacon node will get polled every slot + // as the sync duties service will return early if it deems it already has + // enough information. + if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() { + sleep(duration).await; + } else { + // Just sleep for one slot if we are unable to read the system clock, this gives + // us an opportunity for the clock to eventually come good. + sleep(duties_service.slot_clock.slot_duration()).await; + continue; + } + } + }, + "duties_service_sync_committee", + ); } /// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown @@ -359,22 +417,8 @@ async fn poll_beacon_attesters( let current_epoch = current_slot.epoch(E::slots_per_epoch()); let next_epoch = current_epoch + 1; - let local_pubkeys: HashSet = duties_service - .validator_store - .voting_pubkeys() - .into_iter() - .collect(); - - let local_indices = { - let mut local_indices = Vec::with_capacity(local_pubkeys.len()); - let indices_map = duties_service.indices.read(); - for &pubkey in &local_pubkeys { - if let Some(validator_index) = indices_map.get(&pubkey) { - local_indices.push(*validator_index) - } - } - local_indices - }; + let local_pubkeys = duties_service.local_pubkeys(); + let local_indices = duties_service.local_indices(&local_pubkeys); // Download the duties and update the duties for the current epoch. if let Err(e) = poll_beacon_attesters_for_epoch( diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs new file mode 100644 index 00000000000..4c6e77a9d07 --- /dev/null +++ b/validator_client/src/duties_service/sync.rs @@ -0,0 +1,418 @@ +use crate::duties_service::{DutiesService, Error}; +use itertools::Itertools; +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use slog::{crit, debug, info, warn}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::sync::Arc; +use types::{ + ChainSpec, Epoch, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId, +}; + +pub struct SyncDutiesMap { + /// Map from sync committee period to duties for members of that sync committee. + committees: RwLock>, +} + +#[derive(Default)] +pub struct CommitteeDuties { + /// Map from validator index to validator duties. + /// + /// A `None` value indicates that the validator index is known *not* to be a member of the sync + /// committee, while a `Some` indicates a known member. An absent value indicates that the + /// validator index was not part of the set of local validators when the duties were fetched. + /// This allows us to track changes to the set of local validators. + validators: RwLock>>, +} + +pub struct ValidatorDuties { + /// The sync duty: including validator sync committee indices & pubkey. + duty: SyncDuty, + /// Map from slot & subnet ID to proof that this validator is an aggregator. + /// + /// The slot is the slot at which the signed contribution and proof should be broadcast, + /// which is 1 less than the slot for which the `duty` was computed. + aggregation_proofs: RwLock>, +} + +/// Duties for a single slot. +pub struct SlotDuties { + /// List of duties for all sync committee members at this slot + /// + /// Note: this is intentionally NOT split by subnet so that we only sign + /// one `SyncCommitteeMessage` per validator (recall a validator may be part of multiple + /// subnets). + pub duties: Vec, + /// Map from subnet ID to validator index and selection proof of each aggregator. + pub aggregators: HashMap>, +} + +impl Default for SyncDutiesMap { + fn default() -> Self { + Self { + committees: RwLock::new(HashMap::new()), + } + } +} + +impl SyncDutiesMap { + pub fn all_duties_known(&self, committee_period: u64, validator_indices: &[u64]) -> bool { + self.committees + .read() + .get(&committee_period) + .map_or(false, |committee_duties| { + let validator_duties = committee_duties.validators.read(); + validator_indices + .iter() + .all(|index| validator_duties.contains_key(index)) + }) + } + + pub fn get_or_create_committee_duties<'a, 'b>( + &'a self, + committee_period: u64, + validator_indices: impl IntoIterator, + ) -> MappedRwLockReadGuard<'a, CommitteeDuties> { + let mut committees_writer = self.committees.write(); + + committees_writer + .entry(committee_period) + .or_insert_with(CommitteeDuties::default) + .init(validator_indices); + + // Return shared reference + RwLockReadGuard::map( + RwLockWriteGuard::downgrade(committees_writer), + |committees_reader| &committees_reader[&committee_period], + ) + } + + pub fn get_duties_for_slot( + &self, + wall_clock_slot: Slot, + spec: &ChainSpec, + ) -> Option { + // Sync duties lag their assigned slot by 1 + let duty_slot = wall_clock_slot + 1; + + let sync_committee_period = duty_slot + .epoch(E::slots_per_epoch()) + .sync_committee_period(spec) + .ok()?; + + let committees_reader = self.committees.read(); + let committee_duties = committees_reader.get(&sync_committee_period)?; + + let mut duties = vec![]; + let mut aggregators = HashMap::new(); + + committee_duties + .validators + .read() + .values() + // Filter out non-members & failed subnet IDs. + .filter_map(|opt_duties| { + let duty = opt_duties.as_ref()?; + let subnet_ids = duty.duty.subnet_ids::().ok()?; + Some((duty, subnet_ids)) + }) + // Add duties for members to the vec of all duties, and aggregators to the + // aggregators map. + .for_each(|(validator_duty, subnet_ids)| { + duties.push(validator_duty.duty.clone()); + + let proofs = validator_duty.aggregation_proofs.read(); + + for subnet_id in subnet_ids { + if let Some(proof) = proofs.get(&(wall_clock_slot, subnet_id)) { + aggregators.entry(subnet_id).or_insert_with(Vec::new).push(( + validator_duty.duty.validator_index, + validator_duty.duty.pubkey, + proof.clone(), + )); + } + } + }); + + Some(SlotDuties { + duties, + aggregators, + }) + } +} + +impl CommitteeDuties { + fn init<'b>(&mut self, validator_indices: impl IntoIterator) { + validator_indices.into_iter().for_each(|validator_index| { + self.validators + .get_mut() + .entry(*validator_index) + .or_insert(None); + }) + } +} + +impl ValidatorDuties { + fn new(duty: SyncDuty) -> Self { + Self { + duty, + aggregation_proofs: RwLock::new(HashMap::new()), + } + } +} + +/// Number of epochs to wait from the start of the period before actually fetching duties. +fn epoch_offset(spec: &ChainSpec) -> u64 { + spec.epochs_per_sync_committee_period.as_u64() / 2 +} + +pub async fn poll_sync_committee_duties( + duties_service: &Arc>, +) -> Result<(), Error> { + let sync_duties = &duties_service.sync_duties; + let spec = &duties_service.spec; + let current_epoch = duties_service + .slot_clock + .now() + .ok_or(Error::UnableToReadSlotClock)? + .epoch(E::slots_per_epoch()); + + // If the Altair fork is yet to be activated, do not attempt to poll for duties. + if spec + .altair_fork_epoch + .map_or(true, |altair_epoch| current_epoch < altair_epoch) + { + return Ok(()); + } + + let current_sync_committee_period = current_epoch.sync_committee_period(spec)?; + let next_sync_committee_period = current_sync_committee_period + 1; + + let local_pubkeys = duties_service.local_pubkeys(); + let local_indices = duties_service.local_indices(&local_pubkeys); + + // If duties aren't known for the current period, poll for them + if !sync_duties.all_duties_known(current_sync_committee_period, &local_indices) { + poll_sync_committee_duties_for_period( + duties_service, + &local_indices, + current_sync_committee_period, + ) + .await?; + } + + // If we're past the point in the current period where we should determine duties for the next + // period and they are not yet known, then poll. + if current_epoch.as_u64() % spec.epochs_per_sync_committee_period.as_u64() >= epoch_offset(spec) + && !sync_duties.all_duties_known(next_sync_committee_period, &local_indices) + { + poll_sync_committee_duties_for_period( + duties_service, + &local_indices, + next_sync_committee_period, + ) + .await?; + } + + Ok(()) +} + +pub async fn poll_sync_committee_duties_for_period( + duties_service: &Arc>, + local_indices: &[u64], + sync_committee_period: u64, +) -> Result<(), Error> { + let spec = &duties_service.spec; + let log = duties_service.context.log(); + + info!( + log, + "Fetching sync committee duties"; + "sync_committee_period" => sync_committee_period, + "num_validators" => local_indices.len(), + ); + + let period_start_epoch = spec.epochs_per_sync_committee_period * sync_committee_period; + + let duties_response = duties_service + .beacon_nodes + .first_success(duties_service.require_synced, |beacon_node| async move { + beacon_node + .post_validator_duties_sync(period_start_epoch, local_indices) + .await + }) + .await; + + let duties = match duties_response { + Ok(res) => res.data, + Err(e) => { + warn!( + log, + "Failed to download sync committee duties"; + "sync_committee_period" => sync_committee_period, + "error" => %e, + ); + return Ok(()); + } + }; + + info!(log, "Fetched duties from BN"; "count" => duties.len()); + + // Add duties to map. + let committee_duties = duties_service + .sync_duties + .get_or_create_committee_duties(sync_committee_period, local_indices); + + // Track updated validator indices & pubkeys. + let mut updated_validators = vec![]; + + { + let mut validator_writer = committee_duties.validators.write(); + for duty in duties { + let validator_duties = validator_writer + .get_mut(&duty.validator_index) + .ok_or(Error::SyncDutiesNotFound(duty.validator_index))?; + + let updated = validator_duties.as_ref().map_or(true, |existing_duties| { + let updated_due_to_reorg = existing_duties.duty.validator_sync_committee_indices + != duty.validator_sync_committee_indices; + if updated_due_to_reorg { + warn!( + log, + "Sync committee duties changed"; + "message" => "this could be due to a really long re-org, or a bug" + ); + } + updated_due_to_reorg + }); + + if updated { + info!( + log, + "Validator in sync committee"; + "validator_index" => duty.validator_index, + "sync_committee_period" => sync_committee_period, + ); + + updated_validators.push(duty.clone()); + *validator_duties = Some(ValidatorDuties::new(duty)); + } + } + } + + // Spawn background task to fill in aggregator selection proofs. + let sub_duties_service = duties_service.clone(); + duties_service.context.executor.spawn_blocking( + move || { + fill_in_aggregation_proofs( + sub_duties_service, + &updated_validators, + sync_committee_period, + ) + }, + "duties_service_sync_selection_proofs", + ); + + Ok(()) +} + +pub fn fill_in_aggregation_proofs( + duties_service: Arc>, + validators: &[SyncDuty], + sync_committee_period: u64, +) { + let spec = &duties_service.spec; + let log = duties_service.context.log(); + + // Generate selection proofs for each validator at each slot, one epoch at a time + let start_epoch = spec.epochs_per_sync_committee_period * sync_committee_period; + let end_epoch = start_epoch + spec.epochs_per_sync_committee_period; + + for epoch in (start_epoch.as_u64()..end_epoch.as_u64()).map(Epoch::new) { + // Generate proofs. + let validator_proofs: Vec<(u64, Vec<_>)> = validators + .iter() + .filter_map(|duty| { + let subnet_ids = duty + .subnet_ids::() + .map_err(|e| { + crit!( + log, + "Arithmetic error computing subnet IDs"; + "error" => ?e, + ); + }) + .ok()?; + + let proofs = epoch + .slot_iter(E::slots_per_epoch()) + .cartesian_product(&subnet_ids) + .filter_map(|(duty_slot, &subnet_id)| { + // Construct proof for prior slot. + let slot = duty_slot - 1; + + let proof = duties_service + .validator_store + .produce_sync_selection_proof(&duty.pubkey, slot, subnet_id) + .or_else(|| { + warn!( + log, + "Pubkey missing when signing selection proof"; + "pubkey" => ?duty.pubkey, + "slot" => slot, + ); + None + })?; + + let is_aggregator = proof + .is_aggregator::() + .map_err(|e| { + warn!( + log, + "Error determining is_aggregator"; + "pubkey" => ?duty.pubkey, + "slot" => slot, + "error" => ?e, + ); + }) + .ok()?; + + if is_aggregator { + debug!( + log, + "Validator is sync aggregator"; + "validator_index" => duty.validator_index, + "slot" => slot, + "subnet_id" => %subnet_id, + ); + Some(((slot, subnet_id), proof)) + } else { + None + } + }) + .collect(); + + Some((duty.validator_index, proofs)) + }) + .collect(); + + // Add to global storage (we add regularly in case the proofs are required). + let committee_duties = duties_service.sync_duties.get_or_create_committee_duties( + sync_committee_period, + validator_proofs.iter().map(|(index, _)| index), + ); + let validators_reader = committee_duties.validators.read(); + + for (validator_index, proofs) in validator_proofs { + if let Some(Some(duty)) = validators_reader.get(&validator_index) { + duty.aggregation_proofs.write().extend(proofs); + } else { + debug!( + log, + "Missing sync duty to update"; + "validator_index" => validator_index, + ); + } + } + } +} diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 2c2df187b23..40842d22f75 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -132,11 +132,6 @@ impl Deref for ForkService { } impl ForkService { - /// Returns the last fork downloaded from the beacon node, if any. - pub fn fork(&self) -> Fork { - *self.fork.read() - } - /// Starts the service that periodically polls for the `Fork`. pub fn start_update_service(self, context: &RuntimeContext) -> Result<(), String> { // Run an immediate update before starting the updater service. diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 120e6c2c7e6..ca33996d377 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -67,6 +67,21 @@ lazy_static::lazy_static! { "Total count of attempted SelectionProof signings", &["status"] ); + pub static ref SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_sync_committee_messages_total", + "Total count of attempted SyncCommitteeMessage signings", + &["status"] + ); + pub static ref SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_sync_committee_contributions_total", + "Total count of attempted ContributionAndProof signings", + &["status"] + ); + pub static ref SIGNED_SYNC_SELECTION_PROOFS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_sync_selection_proofs_total", + "Total count of attempted SyncSelectionProof signings", + &["status"] + ); pub static ref DUTIES_SERVICE_TIMES: Result = try_create_histogram_vec( "vc_duties_service_task_times_seconds", "Duration to perform duties service tasks", diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index d9fe21111be..741a50a6fbe 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -11,6 +11,7 @@ mod http_metrics; mod initialized_validators; mod key_cache; mod notifier; +mod sync_committee_service; mod validator_store; pub mod http_api; @@ -44,6 +45,7 @@ use std::marker::PhantomData; use std::net::SocketAddr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use sync_committee_service::SyncCommitteeService; use tokio::{ sync::mpsc, time::{sleep, Duration}, @@ -63,6 +65,7 @@ const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; #[derive(Clone)] pub struct ProductionValidatorClient { @@ -71,6 +74,7 @@ pub struct ProductionValidatorClient { fork_service: ForkService, block_service: BlockService, attestation_service: AttestationService, + sync_committee_service: SyncCommitteeService, validator_store: ValidatorStore, http_api_listen_addr: Option, http_metrics_ctx: Option>>, @@ -256,6 +260,7 @@ impl ProductionValidatorClient { attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT, proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT, proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT, + sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, } } else { Timeouts::set_all(slot_duration) @@ -339,6 +344,7 @@ impl ProductionValidatorClient { let duties_service = Arc::new(DutiesService { attesters: <_>::default(), proposers: <_>::default(), + sync_duties: <_>::default(), indices: <_>::default(), slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), @@ -369,12 +375,20 @@ impl ProductionValidatorClient { let attestation_service = AttestationServiceBuilder::new() .duties_service(duties_service.clone()) - .slot_clock(slot_clock) + .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) .beacon_nodes(beacon_nodes.clone()) .runtime_context(context.service_context("attestation".into())) .build()?; + let sync_committee_service = SyncCommitteeService::new( + duties_service.clone(), + validator_store.clone(), + slot_clock, + beacon_nodes.clone(), + context.service_context("sync_committee".into()), + ); + // Wait until genesis has occured. // // It seems most sensible to move this into the `start_service` function, but I'm caution @@ -387,6 +401,7 @@ impl ProductionValidatorClient { fork_service, block_service, attestation_service, + sync_committee_service, validator_store, config, http_api_listen_addr: None, @@ -419,6 +434,11 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start attestation service: {}", e))?; + self.sync_committee_service + .clone() + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; let api_secret = ApiSecret::create_or_open(&self.config.validator_dir)?; diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs new file mode 100644 index 00000000000..4234c9f8687 --- /dev/null +++ b/validator_client/src/sync_committee_service.rs @@ -0,0 +1,535 @@ +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; +use environment::RuntimeContext; +use eth2::types::BlockId; +use futures::future::FutureExt; +use slog::{crit, debug, error, info, trace, warn}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::time::{sleep, sleep_until, Duration, Instant}; +use types::{ + ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription, + SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, +}; + +pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4; + +pub struct SyncCommitteeService { + inner: Arc>, +} + +impl Clone for SyncCommitteeService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for SyncCommitteeService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +pub struct Inner { + duties_service: Arc>, + validator_store: ValidatorStore, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, + /// Boolean to track whether the service has posted subscriptions to the BN at least once. + /// + /// This acts as a latch that fires once upon start-up, and then never again. + first_subscription_done: AtomicBool, +} + +impl SyncCommitteeService { + pub fn new( + duties_service: Arc>, + validator_store: ValidatorStore, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, + ) -> Self { + Self { + inner: Arc::new(Inner { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + context, + first_subscription_done: AtomicBool::new(false), + }), + } + } + + /// Check if the Altair fork has been activated and therefore sync duties should be performed. + /// + /// Slot clock errors are mapped to `false`. + fn altair_fork_activated(&self) -> bool { + self.duties_service + .spec + .altair_fork_epoch + .and_then(|fork_epoch| { + let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch()); + Some(current_epoch >= fork_epoch) + }) + .unwrap_or(false) + } + + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + info!( + log, + "Sync committee service started"; + "next_update_millis" => duration_to_next_slot.as_millis() + ); + + let executor = self.context.executor.clone(); + + let interval_fut = async move { + loop { + if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { + // Wait for contribution broadcast interval 1/3 of the way through the slot. + let log = self.context.log(); + sleep(duration_to_next_slot + slot_duration / 3).await; + + // Do nothing if the Altair fork has not yet occurred. + if !self.altair_fork_activated() { + continue; + } + + if let Err(e) = self.spawn_contribution_tasks(slot_duration).await { + crit!( + log, + "Failed to spawn sync contribution tasks"; + "error" => e + ) + } else { + trace!( + log, + "Spawned sync contribution tasks"; + ) + } + + // Do subscriptions for future slots/epochs. + self.spawn_subscription_tasks(); + } else { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + } + } + }; + + executor.spawn(interval_fut, "sync_committee_service"); + Ok(()) + } + + async fn spawn_contribution_tasks(&self, slot_duration: Duration) -> Result<(), String> { + let log = self.context.log().clone(); + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + // If a validator needs to publish a sync aggregate, they must do so at 2/3 + // through the slot. This delay triggers at this time + let aggregate_production_instant = Instant::now() + + duration_to_next_slot + .checked_sub(slot_duration / 3) + .unwrap_or_else(|| Duration::from_secs(0)); + + let slot_duties = self + .duties_service + .sync_duties + .get_duties_for_slot::(slot, &self.duties_service.spec) + .ok_or_else(|| format!("Error fetching duties for slot {}", slot))?; + + if slot_duties.duties.is_empty() { + debug!( + log, + "No local validators in current sync committee"; + "slot" => slot, + ); + return Ok(()); + } + + // Fetch block root for `SyncCommitteeContribution`. + let block_root = self + .beacon_nodes + .first_success(RequireSynced::Yes, |beacon_node| async move { + beacon_node.get_beacon_blocks_root(BlockId::Head).await + }) + .await + .map_err(|e| e.to_string())? + .ok_or_else(|| format!("No block root found for slot {}", slot))? + .data + .root; + + // Spawn one task to publish all of the sync committee signatures. + let validator_duties = slot_duties.duties; + self.inner.context.executor.spawn( + self.clone() + .publish_sync_committee_signatures(slot, block_root, validator_duties) + .map(|_| ()), + "sync_committee_signature_publish", + ); + + let aggregators = slot_duties.aggregators; + self.inner.context.executor.spawn( + self.clone() + .publish_sync_committee_aggregates( + slot, + block_root, + aggregators, + aggregate_production_instant, + ) + .map(|_| ()), + "sync_committee_aggregate_publish", + ); + + Ok(()) + } + + /// Publish sync committee signatures. + async fn publish_sync_committee_signatures( + self, + slot: Slot, + beacon_block_root: Hash256, + validator_duties: Vec, + ) -> Result<(), ()> { + let log = self.context.log().clone(); + + let committee_signatures = validator_duties + .iter() + .filter_map(|duty| { + self.validator_store + .produce_sync_committee_signature( + slot, + beacon_block_root, + duty.validator_index, + &duty.pubkey, + ) + .or_else(|| { + crit!( + log, + "Failed to sign sync committee signature"; + "validator_index" => duty.validator_index, + "slot" => slot, + ); + None + }) + }) + .collect::>(); + + let signatures_slice = &committee_signatures; + + self.beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .post_beacon_pool_sync_committee_signatures(signatures_slice) + .await + }) + .await + .map_err(|e| { + error!( + log, + "Unable to publish sync committee messages"; + "slot" => slot, + "error" => %e, + ); + })?; + + info!( + log, + "Successfully published sync committee messages"; + "count" => committee_signatures.len(), + "head_block" => ?beacon_block_root, + "slot" => slot, + ); + + Ok(()) + } + + async fn publish_sync_committee_aggregates( + self, + slot: Slot, + beacon_block_root: Hash256, + aggregators: HashMap>, + aggregate_instant: Instant, + ) { + for (subnet_id, subnet_aggregators) in aggregators { + let service = self.clone(); + self.inner.context.executor.spawn( + service + .publish_sync_committee_aggregate_for_subnet( + slot, + beacon_block_root, + subnet_id, + subnet_aggregators, + aggregate_instant, + ) + .map(|_| ()), + "sync_committee_aggregate_publish_subnet", + ); + } + } + + async fn publish_sync_committee_aggregate_for_subnet( + self, + slot: Slot, + beacon_block_root: Hash256, + subnet_id: SyncSubnetId, + subnet_aggregators: Vec<(u64, PublicKeyBytes, SyncSelectionProof)>, + aggregate_instant: Instant, + ) -> Result<(), ()> { + sleep_until(aggregate_instant).await; + + let log = self.context.log(); + + let contribution = self + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + let sync_contribution_data = SyncContributionData { + slot, + beacon_block_root, + subcommittee_index: subnet_id.into(), + }; + + beacon_node + .get_validator_sync_committee_contribution::(&sync_contribution_data) + .await + }) + .await + .map_err(|e| { + crit!( + log, + "Failed to produce sync contribution"; + "slot" => slot, + "beacon_block_root" => ?beacon_block_root, + "error" => %e, + ) + })? + .ok_or_else(|| { + crit!( + log, + "No aggregate contribution found"; + "slot" => slot, + "beacon_block_root" => ?beacon_block_root, + ); + })? + .data; + + // Make `SignedContributionAndProof`s + let signed_contributions = subnet_aggregators + .into_iter() + .filter_map(|(aggregator_index, aggregator_pk, selection_proof)| { + self.validator_store + .produce_signed_contribution_and_proof( + aggregator_index, + &aggregator_pk, + contribution.clone(), + selection_proof, + ) + .or_else(|| { + crit!( + log, + "Unable to sign sync committee contribution"; + "slot" => slot, + ); + None + }) + }) + .collect::>(); + + // Publish to the beacon node. + let signed_contributions_slice = &signed_contributions; + self.beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .post_validator_contribution_and_proofs(signed_contributions_slice) + .await + }) + .await + .map_err(|e| { + error!( + log, + "Unable to publish signed contributions and proofs"; + "slot" => slot, + "error" => %e, + ); + })?; + + info!( + log, + "Successfully published sync contributions"; + "subnet" => %subnet_id, + "beacon_block_root" => %beacon_block_root, + "num_signers" => contribution.aggregation_bits.num_set_bits(), + "slot" => slot, + ); + + Ok(()) + } + + fn spawn_subscription_tasks(&self) { + let service = self.clone(); + let log = self.context.log().clone(); + self.inner.context.executor.spawn( + async move { + service.publish_subscriptions().await.unwrap_or_else(|e| { + error!( + log, + "Error publishing subscriptions"; + "error" => ?e, + ) + }); + }, + "sync_committee_subscription_publish", + ); + } + + async fn publish_subscriptions(self) -> Result<(), String> { + let log = self.context.log().clone(); + let spec = &self.duties_service.spec; + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + + let mut duty_slots = vec![]; + let mut all_succeeded = true; + + // At the start of every epoch during the current period, re-post the subscriptions + // to the beacon node. This covers the case where the BN has forgotten the subscriptions + // due to a restart, or where the VC has switched to a fallback BN. + let current_period = sync_period_of_slot::(slot, spec)?; + + if !self.first_subscription_done.load(Ordering::Relaxed) + || slot.as_u64() % E::slots_per_epoch() == 0 + { + duty_slots.push((slot, current_period)); + } + + // Near the end of the current period, push subscriptions for the next period to the + // beacon node. We aggressively push every slot in the lead-up, as this is the main way + // that we want to ensure that the BN is subscribed (well in advance). + let lookahead_slot = slot + SUBSCRIPTION_LOOKAHEAD_EPOCHS * E::slots_per_epoch(); + + let lookahead_period = sync_period_of_slot::(lookahead_slot, spec)?; + + if lookahead_period > current_period { + duty_slots.push((lookahead_slot, lookahead_period)); + } + + if duty_slots.is_empty() { + return Ok(()); + } + + // Collect subscriptions. + let mut subscriptions = vec![]; + + for (duty_slot, sync_committee_period) in duty_slots { + info!( + log, + "Fetching subscription duties"; + "duty_slot" => duty_slot, + "current_slot" => slot, + ); + match self + .duties_service + .sync_duties + .get_duties_for_slot::(duty_slot, spec) + { + Some(duties) => subscriptions.extend(subscriptions_from_sync_duties( + duties.duties, + sync_committee_period, + spec, + )), + None => { + warn!( + log, + "Missing duties for subscription"; + "slot" => duty_slot, + ); + all_succeeded = false; + } + } + } + + // Post subscriptions to BN. + info!( + log, + "Posting sync subscriptions to BN"; + "count" => subscriptions.len(), + ); + let subscriptions_slice = &subscriptions; + + for subscription in subscriptions_slice { + debug!( + log, + "Subscription"; + "validator_index" => subscription.validator_index, + "validator_sync_committee_indices" => ?subscription.sync_committee_indices, + "until_epoch" => subscription.until_epoch, + ); + } + + if let Err(e) = self + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .post_validator_sync_committee_subscriptions(subscriptions_slice) + .await + }) + .await + { + error!( + log, + "Unable to post sync committee subscriptions"; + "slot" => slot, + "error" => %e, + ); + all_succeeded = false; + } + + // Disable first-subscription latch once all duties have succeeded once. + if all_succeeded { + self.first_subscription_done.store(true, Ordering::Relaxed); + } + + Ok(()) + } +} + +fn sync_period_of_slot(slot: Slot, spec: &ChainSpec) -> Result { + slot.epoch(E::slots_per_epoch()) + .sync_committee_period(spec) + .map_err(|e| format!("Error computing sync period: {:?}", e)) +} + +fn subscriptions_from_sync_duties( + duties: Vec, + sync_committee_period: u64, + spec: &ChainSpec, +) -> impl Iterator { + let until_epoch = spec.epochs_per_sync_committee_period * (sync_committee_period + 1); + duties + .into_iter() + .map(move |duty| SyncCommitteeSubscription { + validator_index: duty.validator_index, + sync_committee_indices: duty.validator_sync_committee_indices, + until_epoch, + }) +} diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 96024990e67..a1e9257f4da 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -12,7 +12,8 @@ use tempfile::TempDir; use types::{ graffiti::GraffitiString, Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, - SignedBeaconBlock, SignedRoot, Slot, + SignedBeaconBlock, SignedContributionAndProof, SignedRoot, Slot, SyncCommitteeContribution, + SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, }; use validator_dir::ValidatorDir; @@ -132,8 +133,8 @@ impl ValidatorStore { self.validators.read().num_enabled() } - fn fork(&self) -> Fork { - self.fork_service.fork() + fn fork(&self, epoch: Epoch) -> Fork { + self.spec.fork_at_epoch(epoch) } pub fn randao_reveal( @@ -148,7 +149,7 @@ impl ValidatorStore { let domain = self.spec.get_domain( epoch, Domain::Randao, - &self.fork(), + &self.fork(epoch), self.genesis_validators_root, ); let message = epoch.signing_root(domain); @@ -179,7 +180,7 @@ impl ValidatorStore { } // Check for slashing conditions. - let fork = self.fork(); + let fork = self.fork(block.epoch()); let domain = self.spec.get_domain( block.epoch(), Domain::BeaconProposer, @@ -251,7 +252,7 @@ impl ValidatorStore { } // Checking for slashing conditions. - let fork = self.fork(); + let fork = self.fork(attestation.data.target.epoch); let domain = self.spec.get_domain( attestation.data.target.epoch, @@ -345,6 +346,7 @@ impl ValidatorStore { ) -> Option> { let validators = self.validators.read(); let voting_keypair = &validators.voting_keypair(validator_pubkey)?; + let fork = self.fork(aggregate.data.target.epoch); metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]); @@ -353,7 +355,7 @@ impl ValidatorStore { aggregate, Some(selection_proof), &voting_keypair.sk, - &self.fork(), + &fork, self.genesis_validators_root, &self.spec, )) @@ -374,7 +376,85 @@ impl ValidatorStore { Some(SelectionProof::new::( slot, &voting_keypair.sk, - &self.fork(), + &self.fork(slot.epoch(E::slots_per_epoch())), + self.genesis_validators_root, + &self.spec, + )) + } + + /// Produce a `SyncSelectionProof` for `slot` signed by the secret key of `validator_pubkey`. + pub fn produce_sync_selection_proof( + &self, + validator_pubkey: &PublicKeyBytes, + slot: Slot, + subnet_id: SyncSubnetId, + ) -> Option { + let validators = self.validators.read(); + let voting_keypair = validators.voting_keypair(validator_pubkey)?; + + metrics::inc_counter_vec( + &metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL, + &[metrics::SUCCESS], + ); + + Some(SyncSelectionProof::new::( + slot, + subnet_id.into(), + &voting_keypair.sk, + &self.fork(slot.epoch(E::slots_per_epoch())), + self.genesis_validators_root, + &self.spec, + )) + } + + pub fn produce_sync_committee_signature( + &self, + slot: Slot, + beacon_block_root: Hash256, + validator_index: u64, + validator_pubkey: &PublicKeyBytes, + ) -> Option { + let validators = self.validators.read(); + let voting_keypair = validators.voting_keypair(validator_pubkey)?; + + metrics::inc_counter_vec( + &metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, + &[metrics::SUCCESS], + ); + + Some(SyncCommitteeMessage::new::( + slot, + beacon_block_root, + validator_index, + &voting_keypair.sk, + &self.fork(slot.epoch(E::slots_per_epoch())), + self.genesis_validators_root, + &self.spec, + )) + } + + pub fn produce_signed_contribution_and_proof( + &self, + aggregator_index: u64, + aggregator_pubkey: &PublicKeyBytes, + contribution: SyncCommitteeContribution, + selection_proof: SyncSelectionProof, + ) -> Option> { + let validators = self.validators.read(); + let voting_keypair = validators.voting_keypair(aggregator_pubkey)?; + let fork = self.fork(contribution.slot.epoch(E::slots_per_epoch())); + + metrics::inc_counter_vec( + &metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, + &[metrics::SUCCESS], + ); + + Some(SignedContributionAndProof::from_aggregate( + aggregator_index, + contribution, + Some(selection_proof), + &voting_keypair.sk, + &fork, self.genesis_validators_root, &self.spec, ))