Skip to content

Commit

Permalink
add processing and processed caching to the DA checker (#4732)
Browse files Browse the repository at this point in the history
* add processing and processed caching to the DA checker

* move processing cache out of critical cache

* get it compiling

* fix lints

* add docs to `AvailabilityView`

* some self review

* fix lints

* fix beacon chain tests

* cargo fmt

* make availability view easier to implement, start on testing

* move child component cache and finish test

* cargo fix

* cargo fix

* cargo fix

* fmt and lint

* make blob commitments not optional, rename some caches, add missing blobs struct

* Update beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs

Co-authored-by: ethDreamer <[email protected]>

* marks review feedback and other general cleanup

* cargo fix

* improve availability view docs

* some renames

* some renames and docs

* fix should delay lookup logic

* get rid of some wrapper methods

* fix up single lookup changes

* add a couple docs

* add single blob merge method and improve process_... docs

* update some names

* lints

* fix merge

* remove blob indices from lookup creation log

* remove blob indices from lookup creation log

* delayed lookup logging improvement

* check fork choice before doing any blob processing

* remove unused dep

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <[email protected]>

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <[email protected]>

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <[email protected]>

* Update beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs

Co-authored-by: Michael Sproul <[email protected]>

* Update beacon_node/network/src/sync/block_lookups/delayed_lookup.rs

Co-authored-by: Michael Sproul <[email protected]>

* remove duplicate deps

* use gen range in random blobs geneartor

* rename processing cache fields

* require block root in rpc block construction and check block root consistency

* send peers as vec in single message

* spawn delayed lookup service from network beacon processor

* fix tests

---------

Co-authored-by: ethDreamer <[email protected]>
Co-authored-by: Michael Sproul <[email protected]>
  • Loading branch information
3 people authored Oct 3, 2023
1 parent 67aeb6b commit c7ddf1f
Show file tree
Hide file tree
Showing 38 changed files with 1,893 additions and 1,189 deletions.
68 changes: 33 additions & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 97 additions & 11 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::blob_verification::{self, GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root,
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy,
signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock,
IntoExecutionPendingBlock,
};
Expand Down Expand Up @@ -477,8 +477,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
/// The slot at which blocks are downloaded back to.
pub genesis_backfill_slot: Slot,
// Provides a KZG verification and temporary storage for blocks and blobs as
// they are collected and combined.
/// Provides a KZG verification and temporary storage for blocks and blobs as
/// they are collected and combined.
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
/// The KZG trusted setup used by this chain.
pub kzg: Option<Arc<Kzg<<T::EthSpec as EthSpec>::Kzg>>>,
Expand Down Expand Up @@ -2552,7 +2552,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
}

let block_root = get_block_root(block.as_block());
let block_root = block.block_root();

if let Some((child_parent_root, child_slot)) = children.get(i) {
// If this block has a child in this chain segment, ensure that its parent root matches
Expand Down Expand Up @@ -2791,11 +2791,97 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BeaconChainError::TokioJoin)?
}

pub async fn process_blob(
/// Cache the blob in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.check_gossip_blob_availability_and_import(blob).await
let block_root = blob.block_root();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

self.data_availability_checker
.notify_gossip_blob(blob.as_blob().slot, block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_blobs(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.data_availability_checker.remove_notified(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or erred.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Ok(commitments) = unverified_block
.block()
.message()
.body()
.blob_kzg_commitments()
{
self.data_availability_checker.notify_block_commitments(
unverified_block.block().slot(),
block_root,
commitments.clone(),
);
};
let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
})
.await;
self.remove_notified(&block_root, r)
}

/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
Expand Down Expand Up @@ -2961,7 +3047,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the block is available, and imports immediately if so, otherwise caches the block
/// in the data availability checker.
pub async fn check_block_availability_and_import(
async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
Expand All @@ -2974,7 +3060,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided blob can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
pub async fn check_gossip_blob_availability_and_import(
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
Expand All @@ -2986,7 +3072,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
pub async fn check_rpc_blob_availability_and_import(
async fn check_rpc_blob_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
Expand Down Expand Up @@ -3238,7 +3324,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (signed_block, blobs) = signed_block.deconstruct();
let (_, signed_block, blobs) = signed_block.deconstruct();
let block = signed_block.message();
ops.extend(
confirmed_state_roots
Expand Down Expand Up @@ -5250,7 +5336,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(());
}

// Fetch payoad attributes from the execution layer's cache, or compute them from scratch
// Fetch payload attributes from the execution layer's cache, or compute them from scratch
// if no matching entry is found. This saves recomputing the withdrawals which can take
// considerable time to compute if a state load is required.
let head_root = forkchoice_update_params.head_root;
Expand Down
Loading

0 comments on commit c7ddf1f

Please sign in to comment.