Skip to content

Commit

Permalink
Add blob_sidecar event to SSE (#4790)
Browse files Browse the repository at this point in the history
* Add `blob_sidecar` event to SSE.

* Return 202 if a block is published but failed blob validation when validation level is `Gossip`.

* Move `BlobSidecar` event to `process_gossip_blob` and add test.

* Emit `BlobSidecar` event when blobs are received over rpc.

* Improve test assertions on `SseBlobSidecar`s.

* Add quotes to blob index serialization in `SseBlobSidecar`

Co-authored-by: realbigsean <[email protected]>

---------

Co-authored-by: realbigsean <[email protected]>
  • Loading branch information
jimmygchen and realbigsean authored Oct 12, 2023
1 parent 4555e33 commit 38e7172
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 7 deletions.
20 changes: 19 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use crate::{
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
BeaconSnapshot, CachedHead,
};
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use execution_layer::{
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
PayloadAttributes, PayloadStatus,
Expand Down Expand Up @@ -2809,6 +2809,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
blob.as_blob(),
)));
}
}

self.data_availability_checker
.notify_gossip_blob(blob.as_blob().slot, block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
Expand All @@ -2833,6 +2841,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
event_handler.register(EventKind::BlobSidecar(
SseBlobSidecar::from_blob_sidecar(blob),
));
}
}
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::block_verification::cheap_state_advance_to_obtain_committees;
use crate::data_availability_checker::AvailabilityCheckError;
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::{metrics, BeaconChainError};
use kzg::Kzg;
use kzg::{Kzg, KzgCommitment};
use slog::{debug, warn};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
Expand Down Expand Up @@ -182,6 +182,12 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn slot(&self) -> Slot {
self.blob.message.slot
}
pub fn index(&self) -> u64 {
self.blob.message.index
}
pub fn kzg_commitment(&self) -> KzgCommitment {
self.blob.message.kzg_commitment
}
pub fn proposer_index(&self) -> u64 {
self.blob.message.proposer_index
}
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16;
pub struct ServerSentEventHandler<T: EthSpec> {
attestation_tx: Sender<EventKind<T>>,
block_tx: Sender<EventKind<T>>,
blob_sidecar_tx: Sender<EventKind<T>>,
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
Expand All @@ -31,6 +32,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
let (attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
Expand All @@ -43,6 +45,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
Self {
attestation_tx,
block_tx,
blob_sidecar_tx,
finalized_tx,
head_tx,
exit_tx,
Expand Down Expand Up @@ -73,6 +76,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.block_tx
.send(kind)
.map(|count| log_count("block", count)),
EventKind::BlobSidecar(_) => self
.blob_sidecar_tx
.send(kind)
.map(|count| log_count("blob sidecar", count)),
EventKind::FinalizedCheckpoint(_) => self
.finalized_tx
.send(kind)
Expand Down Expand Up @@ -119,6 +126,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_tx.subscribe()
}

pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<T>> {
self.blob_sidecar_tx.subscribe()
}

pub fn subscribe_finalized(&self) -> Receiver<EventKind<T>> {
self.finalized_tx.subscribe()
}
Expand Down Expand Up @@ -159,6 +170,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_tx.receiver_count() > 0
}

pub fn has_blob_sidecar_subscribers(&self) -> bool {
self.blob_sidecar_tx.receiver_count() > 0
}

pub fn has_finalized_subscribers(&self) -> bool {
self.finalized_tx.receiver_count() > 0
}
Expand Down
99 changes: 99 additions & 0 deletions beacon_node/beacon_chain/tests/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use beacon_chain::blob_verification::GossipVerifiedBlob;
use beacon_chain::test_utils::BeaconChainHarness;
use bls::Signature;
use eth2::types::{EventKind, SseBlobSidecar};
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::marker::PhantomData;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec, SignedBlobSidecar};

type E = MinimalEthSpec;

/// Verifies that a blob event is emitted when a gossip verified blob is received via gossip or the publish block API.
#[tokio::test]
async fn blob_sidecar_event_on_process_gossip_blob() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process a gossip verified blob
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let signed_sidecar = SignedBlobSidecar {
message: BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap(),
signature: Signature::empty(),
_phantom: PhantomData,
};
let gossip_verified_blob = GossipVerifiedBlob::__assumed_valid(signed_sidecar);
let expected_sse_blobs = SseBlobSidecar::from_blob_sidecar(gossip_verified_blob.as_blob());

let _ = harness
.chain
.process_gossip_blob(gossip_verified_blob)
.await
.unwrap();

let sidecar_event = blob_event_receiver.try_recv().unwrap();
assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs));
}

/// Verifies that a blob event is emitted when blobs are received via RPC.
#[tokio::test]
async fn blob_sidecar_event_on_process_rpc_blobs() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process multiple rpc blobs
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);

let blob_1 = BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap();
let blob_2 = Arc::new(BlobSidecar {
index: 1,
..BlobSidecar::random_valid(&mut rng, kzg).unwrap()
});
let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2.clone())]);
let expected_sse_blobs = vec![
SseBlobSidecar::from_blob_sidecar(blob_1.as_ref()),
SseBlobSidecar::from_blob_sidecar(blob_2.as_ref()),
];

let _ = harness
.chain
.process_rpc_blobs(blob_1.slot, blob_1.block_root, blobs)
.await
.unwrap();

let mut sse_blobs: Vec<SseBlobSidecar> = vec![];
while let Ok(sidecar_event) = blob_event_receiver.try_recv() {
if let EventKind::BlobSidecar(sse_blob_sidecar) = sidecar_event {
sse_blobs.push(sse_blob_sidecar);
} else {
panic!("`BlobSidecar` event kind expected.");
}
}
assert_eq!(sse_blobs, expected_sse_blobs);
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod attestation_production;
mod attestation_verification;
mod block_verification;
mod capella;
mod events;
mod merge;
mod op_verification;
mod payload_invalidation;
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4425,6 +4425,9 @@ pub fn serve<T: BeaconChainTypes>(
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::BlobSidecar => {
event_handler.subscribe_blob_sidecar()
}
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
Expand Down
14 changes: 11 additions & 3 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,17 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = chain.process_gossip_blob(blob).await {
return Err(warp_utils::reject::custom_bad_request(format!(
"Invalid blob: {e}"
)));
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn generate_rpc_blobs_process_fn(
self: Arc<Self>,
block_root: Hash256,
block: FixedBlobSidecarList<T::EthSpec>,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, block, seen_timestamp, process_type)
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
Expand Down
30 changes: 30 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,28 @@ pub struct SseBlock {
pub execution_optimistic: bool,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseBlobSidecar {
pub block_root: Hash256,
#[serde(with = "serde_utils::quoted_u64")]
pub index: u64,
pub slot: Slot,
pub kzg_commitment: KzgCommitment,
pub versioned_hash: VersionedHash,
}

impl SseBlobSidecar {
pub fn from_blob_sidecar<E: EthSpec>(blob_sidecar: &BlobSidecar<E>) -> SseBlobSidecar {
SseBlobSidecar {
block_root: blob_sidecar.block_root,
index: blob_sidecar.index,
slot: blob_sidecar.slot,
kzg_commitment: blob_sidecar.kzg_commitment,
versioned_hash: blob_sidecar.kzg_commitment.calculate_versioned_hash(),
}
}
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseFinalizedCheckpoint {
pub block: Hash256,
Expand Down Expand Up @@ -1018,6 +1040,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes {
pub enum EventKind<T: EthSpec> {
Attestation(Box<Attestation<T>>),
Block(SseBlock),
BlobSidecar(SseBlobSidecar),
FinalizedCheckpoint(SseFinalizedCheckpoint),
Head(SseHead),
VoluntaryExit(SignedVoluntaryExit),
Expand All @@ -1034,6 +1057,7 @@ impl<T: EthSpec> EventKind<T> {
match self {
EventKind::Head(_) => "head",
EventKind::Block(_) => "block",
EventKind::BlobSidecar(_) => "blob_sidecar",
EventKind::Attestation(_) => "attestation",
EventKind::VoluntaryExit(_) => "voluntary_exit",
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
Expand Down Expand Up @@ -1071,6 +1095,9 @@ impl<T: EthSpec> EventKind<T> {
"block" => Ok(EventKind::Block(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)),
)?)),
"blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)),
)?)),
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
)?)),
Expand Down Expand Up @@ -1123,6 +1150,7 @@ pub struct EventQuery {
pub enum EventTopic {
Head,
Block,
BlobSidecar,
Attestation,
VoluntaryExit,
FinalizedCheckpoint,
Expand All @@ -1141,6 +1169,7 @@ impl FromStr for EventTopic {
match s {
"head" => Ok(EventTopic::Head),
"block" => Ok(EventTopic::Block),
"blob_sidecar" => Ok(EventTopic::BlobSidecar),
"attestation" => Ok(EventTopic::Attestation),
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
Expand All @@ -1160,6 +1189,7 @@ impl fmt::Display for EventTopic {
match self {
EventTopic::Head => write!(f, "head"),
EventTopic::Block => write!(f, "block"),
EventTopic::BlobSidecar => write!(f, "blob_sidecar"),
EventTopic::Attestation => write!(f, "attestation"),
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
Expand Down

0 comments on commit 38e7172

Please sign in to comment.