diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 1c8bc9be85b..3ac8ae37402 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -520,7 +520,7 @@ pub fn verify_kzg_for_blob( pub fn verify_kzg_for_blob_list( blob_list: &BlobSidecarList, kzg: &Kzg, -) -> Result<(), AvailabilityCheckError> { +) -> Result>, AvailabilityCheckError> { let _timer = crate::metrics::start_timer(&crate::metrics::KZG_VERIFICATION_BATCH_TIMES); let (blobs, (commitments, proofs)): (Vec<_>, (Vec<_>, Vec<_>)) = blob_list .iter() @@ -534,7 +534,12 @@ pub fn verify_kzg_for_blob_list( ) .map_err(AvailabilityCheckError::Kzg)? { - Ok(()) + Ok(blob_list + .into_iter() + .map(|blob_sidecar| KzgVerifiedBlob { + blob: blob_sidecar.clone(), + }) + .collect()) } else { Err(AvailabilityCheckError::KzgVerificationFailed) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index e1024da46c9..47eaa6c0e38 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -197,16 +197,18 @@ impl DataAvailabilityChecker { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result, AvailabilityCheckError> { - let mut verified_blobs = vec![]; if let Some(kzg) = self.kzg.as_ref() { - for blob in blobs.iter().flatten() { - verified_blobs.push(verify_kzg_for_blob(blob.clone(), kzg)?) - } + let blob_list: BlobSidecarList = blobs + .into_iter() + .flat_map(|blob| blob.clone()) + .collect::>() + .into(); + let verified_blob_list = verify_kzg_for_blob_list(&blob_list, kzg)?; + self.availability_cache + .put_kzg_verified_blobs(block_root, verified_blob_list) } else { return Err(AvailabilityCheckError::KzgNotInitialized); - }; - self.availability_cache - .put_kzg_verified_blobs(block_root, verified_blobs) + } } /// This first validates the KZG commitments included in the blob sidecar. diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 05c678b250f..a807655189a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4705,6 +4705,14 @@ fn publish_pubsub_message( ) } +/// Publish a message to the libp2p pubsub network. +fn publish_pubsub_messages( + network_tx: &UnboundedSender>, + messages: Vec>, +) -> Result<(), warp::Rejection> { + publish_network_message(network_tx, NetworkMessage::Publish { messages }) +} + /// Publish a message to the libp2p network. fn publish_network_message( network_tx: &UnboundedSender>, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e68691ce8b9..e41cf51ec3b 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -85,19 +85,17 @@ pub async fn publish_block { - crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())) - .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; + let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block.clone())]; if let Some(signed_blobs) = blobs_opt { for (blob_index, blob) in signed_blobs.into_iter().enumerate() { - crate::publish_pubsub_message( - &sender, - PubsubMessage::BlobSidecar(Box::new((blob_index as u64, blob))), - ) - .map_err(|_| { - BlockError::BeaconChainError(BeaconChainError::UnableToPublish) - })?; + pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new(( + blob_index as u64, + blob, + )))); } } + crate::publish_pubsub_messages(&sender, pubsub_messages) + .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; } }; Ok(()) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index ee0d5e62346..04f761ad238 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -295,13 +295,22 @@ lazy_static! { */ pub static ref BEACON_BLOB_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_blob_gossip_propagation_verification_delay_time", - "Duration between when the blob is received and when it is verified for propagation.", + "Duration between when the blob is received over gossip and when it is verified for propagation.", // [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5] decimal_buckets(-3,-1) ); pub static ref BEACON_BLOB_GOSSIP_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_blob_gossip_slot_start_delay_time", - "Duration between when the blob is received and the start of the slot it belongs to.", + "Duration between when the blob is received over gossip and the start of the slot it belongs to.", + // Create a custom bucket list for greater granularity in block delay + Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) + // NOTE: Previous values, which we may want to switch back to. + // [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50] + //decimal_buckets(-1,2) + ); + pub static ref BEACON_BLOB_RPC_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( + "beacon_blob_rpc_slot_start_delay_time", + "Duration between when a blob is received over rpc and the start of the slot it belongs to.", // Create a custom bucket list for greater granularity in block delay Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) // NOTE: Previous values, which we may want to switch back to. diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index d6bb7421e87..217ad0e03c8 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -9,7 +9,8 @@ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::{ - observed_block_producers::Error as ObserveError, validator_monitor::get_block_delay_ms, + observed_block_producers::Error as ObserveError, + validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, }; @@ -277,7 +278,7 @@ impl NetworkBeaconProcessor { self: Arc>, block_root: Hash256, blobs: FixedBlobSidecarList, - _seen_timestamp: Duration, + seen_timestamp: Duration, process_type: BlockProcessType, ) { let Some(slot) = blobs @@ -287,8 +288,58 @@ impl NetworkBeaconProcessor { return; }; + let indices: Vec<_> = blobs + .iter() + .filter_map(|blob_opt| blob_opt.as_ref().map(|blob| blob.index)) + .collect(); + + debug!( + self.log, + "RPC blobs received"; + "indices" => ?indices, + "block_root" => %block_root, + "slot" => %slot, + ); + + // Note: this metric is useful to gauge how long it takes to receive blobs requested + // over rpc. Since we always send the request for block components at `slot_clock.single_lookup_delay()` + // we can use that as a baseline to measure against. + let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock) + .saturating_sub(self.chain.slot_clock.single_lookup_delay()); + + metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay); + let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await; + match &result { + Ok(AvailabilityProcessingStatus::Imported(hash)) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and blobs", + "slot" => %slot, + "block_hash" => %hash, + ); + } + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + warn!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + "slot" => %slot, + ); + } + Err(e) => { + warn!( + self.log, + "Error when importing rpc blobs"; + "error" => ?e, + "block_hash" => %block_root, + "slot" => %slot, + ); + } + } + // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type,