diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 14733a89ab4..918d0bd29b7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -70,7 +70,7 @@ use crate::{ kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead, }; -use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty}; +use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes, SyncDuty}; use execution_layer::{ BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, @@ -2809,6 +2809,14 @@ impl BeaconChain { return Err(BlockError::BlockIsAlreadyKnown); } + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_blob_sidecar_subscribers() { + event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar( + blob.as_blob(), + ))); + } + } + self.data_availability_checker .notify_gossip_blob(blob.as_blob().slot, block_root, &blob); let r = self.check_gossip_blob_availability_and_import(blob).await; @@ -2833,6 +2841,16 @@ impl BeaconChain { return Err(BlockError::BlockIsAlreadyKnown); } + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_blob_sidecar_subscribers() { + for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) { + event_handler.register(EventKind::BlobSidecar( + SseBlobSidecar::from_blob_sidecar(blob), + )); + } + } + } + self.data_availability_checker .notify_rpc_blobs(slot, block_root, &blobs); let r = self diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index b78859e10fe..1c8bc9be85b 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -10,7 +10,7 @@ use crate::block_verification::cheap_state_advance_to_obtain_committees; use crate::data_availability_checker::AvailabilityCheckError; use crate::kzg_utils::{validate_blob, validate_blobs}; use crate::{metrics, BeaconChainError}; -use kzg::Kzg; +use kzg::{Kzg, KzgCommitment}; use slog::{debug, warn}; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; @@ -182,6 +182,12 @@ impl GossipVerifiedBlob { pub fn slot(&self) -> Slot { self.blob.message.slot } + pub fn index(&self) -> u64 { + self.blob.message.index + } + pub fn kzg_commitment(&self) -> KzgCommitment { + self.blob.message.kzg_commitment + } pub fn proposer_index(&self) -> u64 { self.blob.message.proposer_index } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index b267cc853f8..8d3a6827946 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -9,6 +9,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16; pub struct ServerSentEventHandler { attestation_tx: Sender>, block_tx: Sender>, + blob_sidecar_tx: Sender>, finalized_tx: Sender>, head_tx: Sender>, exit_tx: Sender>, @@ -31,6 +32,7 @@ impl ServerSentEventHandler { pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { let (attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); + let (blob_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); let (head_tx, _) = broadcast::channel(capacity); let (exit_tx, _) = broadcast::channel(capacity); @@ -43,6 +45,7 @@ impl ServerSentEventHandler { Self { attestation_tx, block_tx, + blob_sidecar_tx, finalized_tx, head_tx, exit_tx, @@ -73,6 +76,10 @@ impl ServerSentEventHandler { .block_tx .send(kind) .map(|count| log_count("block", count)), + EventKind::BlobSidecar(_) => self + .blob_sidecar_tx + .send(kind) + .map(|count| log_count("blob sidecar", count)), EventKind::FinalizedCheckpoint(_) => self .finalized_tx .send(kind) @@ -119,6 +126,10 @@ impl ServerSentEventHandler { self.block_tx.subscribe() } + pub fn subscribe_blob_sidecar(&self) -> Receiver> { + self.blob_sidecar_tx.subscribe() + } + pub fn subscribe_finalized(&self) -> Receiver> { self.finalized_tx.subscribe() } @@ -159,6 +170,10 @@ impl ServerSentEventHandler { self.block_tx.receiver_count() > 0 } + pub fn has_blob_sidecar_subscribers(&self) -> bool { + self.blob_sidecar_tx.receiver_count() > 0 + } + pub fn has_finalized_subscribers(&self) -> bool { self.finalized_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs new file mode 100644 index 00000000000..c48cf310a2a --- /dev/null +++ b/beacon_node/beacon_chain/tests/events.rs @@ -0,0 +1,99 @@ +use beacon_chain::blob_verification::GossipVerifiedBlob; +use beacon_chain::test_utils::BeaconChainHarness; +use bls::Signature; +use eth2::types::{EventKind, SseBlobSidecar}; +use rand::rngs::StdRng; +use rand::SeedableRng; +use std::marker::PhantomData; +use std::sync::Arc; +use types::blob_sidecar::FixedBlobSidecarList; +use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec, SignedBlobSidecar}; + +type E = MinimalEthSpec; + +/// Verifies that a blob event is emitted when a gossip verified blob is received via gossip or the publish block API. +#[tokio::test] +async fn blob_sidecar_event_on_process_gossip_blob() { + let spec = ForkName::Deneb.make_genesis_spec(E::default_spec()); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to blob sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut blob_event_receiver = event_handler.subscribe_blob_sidecar(); + + // build and process a gossip verified blob + let kzg = harness.chain.kzg.as_ref().unwrap(); + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let signed_sidecar = SignedBlobSidecar { + message: BlobSidecar::random_valid(&mut rng, kzg) + .map(Arc::new) + .unwrap(), + signature: Signature::empty(), + _phantom: PhantomData, + }; + let gossip_verified_blob = GossipVerifiedBlob::__assumed_valid(signed_sidecar); + let expected_sse_blobs = SseBlobSidecar::from_blob_sidecar(gossip_verified_blob.as_blob()); + + let _ = harness + .chain + .process_gossip_blob(gossip_verified_blob) + .await + .unwrap(); + + let sidecar_event = blob_event_receiver.try_recv().unwrap(); + assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs)); +} + +/// Verifies that a blob event is emitted when blobs are received via RPC. +#[tokio::test] +async fn blob_sidecar_event_on_process_rpc_blobs() { + let spec = ForkName::Deneb.make_genesis_spec(E::default_spec()); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to blob sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut blob_event_receiver = event_handler.subscribe_blob_sidecar(); + + // build and process multiple rpc blobs + let kzg = harness.chain.kzg.as_ref().unwrap(); + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + + let blob_1 = BlobSidecar::random_valid(&mut rng, kzg) + .map(Arc::new) + .unwrap(); + let blob_2 = Arc::new(BlobSidecar { + index: 1, + ..BlobSidecar::random_valid(&mut rng, kzg).unwrap() + }); + let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2.clone())]); + let expected_sse_blobs = vec![ + SseBlobSidecar::from_blob_sidecar(blob_1.as_ref()), + SseBlobSidecar::from_blob_sidecar(blob_2.as_ref()), + ]; + + let _ = harness + .chain + .process_rpc_blobs(blob_1.slot, blob_1.block_root, blobs) + .await + .unwrap(); + + let mut sse_blobs: Vec = vec![]; + while let Ok(sidecar_event) = blob_event_receiver.try_recv() { + if let EventKind::BlobSidecar(sse_blob_sidecar) = sidecar_event { + sse_blobs.push(sse_blob_sidecar); + } else { + panic!("`BlobSidecar` event kind expected."); + } + } + assert_eq!(sse_blobs, expected_sse_blobs); +} diff --git a/beacon_node/beacon_chain/tests/main.rs b/beacon_node/beacon_chain/tests/main.rs index c81a547406a..332f6a48298 100644 --- a/beacon_node/beacon_chain/tests/main.rs +++ b/beacon_node/beacon_chain/tests/main.rs @@ -2,6 +2,7 @@ mod attestation_production; mod attestation_verification; mod block_verification; mod capella; +mod events; mod merge; mod op_verification; mod payload_invalidation; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index ca81ac5082b..1b83b8531e7 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4425,6 +4425,9 @@ pub fn serve( let receiver = match topic { api_types::EventTopic::Head => event_handler.subscribe_head(), api_types::EventTopic::Block => event_handler.subscribe_block(), + api_types::EventTopic::BlobSidecar => { + event_handler.subscribe_blob_sidecar() + } api_types::EventTopic::Attestation => { event_handler.subscribe_attestation() } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index a0e63ec9e9e..cd9ccb6c5c6 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -199,9 +199,17 @@ pub async fn publish_block &msg + ); + Err(warp_utils::reject::custom_bad_request(msg)) + }; } } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index d2d589b35ad..d6bb7421e87 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -260,13 +260,13 @@ impl NetworkBeaconProcessor { pub fn generate_rpc_blobs_process_fn( self: Arc, block_root: Hash256, - block: FixedBlobSidecarList, + blobs: FixedBlobSidecarList, seen_timestamp: Duration, process_type: BlockProcessType, ) -> AsyncFn { let process_fn = async move { self.clone() - .process_rpc_blobs(block_root, block, seen_timestamp, process_type) + .process_rpc_blobs(block_root, blobs, seen_timestamp, process_type) .await; }; Box::pin(process_fn) diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index cc790f467ff..7f3040a9b0b 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -888,6 +888,28 @@ pub struct SseBlock { pub execution_optimistic: bool, } +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseBlobSidecar { + pub block_root: Hash256, + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + pub slot: Slot, + pub kzg_commitment: KzgCommitment, + pub versioned_hash: VersionedHash, +} + +impl SseBlobSidecar { + pub fn from_blob_sidecar(blob_sidecar: &BlobSidecar) -> SseBlobSidecar { + SseBlobSidecar { + block_root: blob_sidecar.block_root, + index: blob_sidecar.index, + slot: blob_sidecar.slot, + kzg_commitment: blob_sidecar.kzg_commitment, + versioned_hash: blob_sidecar.kzg_commitment.calculate_versioned_hash(), + } + } +} + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseFinalizedCheckpoint { pub block: Hash256, @@ -1018,6 +1040,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes { pub enum EventKind { Attestation(Box>), Block(SseBlock), + BlobSidecar(SseBlobSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), Head(SseHead), VoluntaryExit(SignedVoluntaryExit), @@ -1034,6 +1057,7 @@ impl EventKind { match self { EventKind::Head(_) => "head", EventKind::Block(_) => "block", + EventKind::BlobSidecar(_) => "blob_sidecar", EventKind::Attestation(_) => "attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", @@ -1071,6 +1095,9 @@ impl EventKind { "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), + "blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)), + )?)), "chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)), )?)), @@ -1123,6 +1150,7 @@ pub struct EventQuery { pub enum EventTopic { Head, Block, + BlobSidecar, Attestation, VoluntaryExit, FinalizedCheckpoint, @@ -1141,6 +1169,7 @@ impl FromStr for EventTopic { match s { "head" => Ok(EventTopic::Head), "block" => Ok(EventTopic::Block), + "blob_sidecar" => Ok(EventTopic::BlobSidecar), "attestation" => Ok(EventTopic::Attestation), "voluntary_exit" => Ok(EventTopic::VoluntaryExit), "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), @@ -1160,6 +1189,7 @@ impl fmt::Display for EventTopic { match self { EventTopic::Head => write!(f, "head"), EventTopic::Block => write!(f, "block"), + EventTopic::BlobSidecar => write!(f, "blob_sidecar"), EventTopic::Attestation => write!(f, "attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),