From 4c5f14dbd68b55d5383af718b05becab6f36146f Mon Sep 17 00:00:00 2001
From: Michael Sproul <michael@sigmaprime.io>
Date: Fri, 1 Sep 2023 18:29:42 +1000
Subject: [PATCH] Start testing blob pruning

---
 .../src/data_availability_checker.rs          |   4 +-
 beacon_node/beacon_chain/src/migrate.rs       |   4 +-
 beacon_node/beacon_chain/tests/store_tests.rs | 104 ++++++++++++++--
 .../test_utils/execution_block_generator.rs   |   1 +
 beacon_node/store/src/hot_cold_store.rs       | 112 ++++++++++--------
 consensus/types/src/consts.rs                 |   2 +-
 6 files changed, 158 insertions(+), 69 deletions(-)

diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 080addb3a78..08c98a54908 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -293,7 +293,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             .map(|current_epoch| {
                 std::cmp::max(
                     fork_epoch,
-                    current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
+                    current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
                 )
             })
         })
@@ -466,7 +466,7 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
         let cutoff_epoch = std::cmp::max(
             finalized_epoch + 1,
             std::cmp::max(
-                current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
+                current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
                 deneb_fork_epoch,
             ),
         );
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 32c13ccb040..5653aea76cd 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -671,8 +671,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ ... @@
         assert!(
             !harness.chain.store.block_exists(&block_hash.into()).unwrap(),
             "did not expect block {:?} to be in the DB",
             block_hash
         );
+        assert!(
+            !harness
+                .chain
+                .store
+                .blob_sidecar_exists(&block_hash.into())
+                .unwrap(),
+            "blobs for abandoned block {block_hash:?} should have been pruned"
+        );
     }
 }
@@ -2812,15 +2826,6 @@ async fn schema_downgrade_to_min_version() {
         .expect("schema upgrade from minimum version should work");

     // Recreate the harness.
-    /*
-    let slot_clock = TestingSlotClock::new(
-        Slot::new(0),
-        Duration::from_secs(harness.chain.genesis_time),
-        Duration::from_secs(spec.seconds_per_slot),
-    );
-    slot_clock.set_slot(harness.get_current_slot().as_u64());
-    */
-
     let harness = BeaconChainHarness::builder(MinimalEthSpec)
         .default_spec()
         .keypairs(KEYPAIRS[0..LOW_VALIDATOR_COUNT].to_vec())
@@ -2848,6 +2853,81 @@ async fn schema_downgrade_to_min_version() {
         .expect_err("should not downgrade below minimum version");
 }

+/// Check that blob pruning prunes blobs older than the data availability boundary.
+#[tokio::test]
+async fn deneb_prune_blobs_happy_case() {
+    let db_path = tempdir().unwrap();
+    let store = get_store(&db_path);
+
+    if store.get_chain_spec().deneb_fork_epoch.is_none() {
+        // No-op prior to Deneb.
+        return;
+    }
+
+    let num_blocks_produced = E::slots_per_epoch() * 8;
+    let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
+
+    harness
+        .extend_chain(
+            num_blocks_produced as usize,
+            BlockStrategy::OnCanonicalHead,
+            AttestationStrategy::AllValidators,
+        )
+        .await;
+
+    // Prior to manual pruning with an artificially low data availability boundary, all blobs
+    // should be stored.
+    assert_eq!(store.get_blob_info().oldest_blob_slot, None);
+    check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true);
+
+    // Trigger pruning of blobs older than epoch 2.
+    let data_availability_boundary = Epoch::new(2);
+    store
+        .try_prune_blobs(true, data_availability_boundary)
+        .unwrap();
+
+    // Check that the oldest blob slot is updated accordingly and that prior blobs have been
+    // deleted.
+    let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
+    assert_eq!(
+        oldest_blob_slot,
+        data_availability_boundary.start_slot(E::slots_per_epoch())
+    );
+    check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false);
+    check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true);
+}
+
+/// Check that there are blob sidecars (or not) at every slot in the range.
+fn check_blob_existence(
+    harness: &TestHarness,
+    start_slot: Slot,
+    end_slot: Slot,
+    should_exist: bool,
+) {
+    let mut blobs_seen = 0;
+    for (block_root, slot) in harness
+        .chain
+        .forwards_iter_block_roots_until(start_slot, end_slot)
+        .unwrap()
+        .map(Result::unwrap)
+    {
+        if let Some(blobs) = harness.chain.store.get_blobs(&block_root).unwrap() {
+            assert!(should_exist, "blobs at slot {slot} exist but should not");
+            blobs_seen += blobs.len();
+        } else {
+            // FIXME(sproul): seems weird that we don't store empty blob lists
+            /*
+            assert!(
+                !should_exist,
+                "blobs at slot {slot} should exist but do not"
+            );
+            */
+        }
+    }
+    if should_exist {
+        assert_ne!(blobs_seen, 0, "expected non-zero number of blobs");
+    }
+}
+
 /// Checks that two chains are the same, for the purpose of these tests.
 ///
 /// Several fields that are hard/impossible to check are ignored (e.g., the store).
diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs
index 444f9353312..57e96d47c52 100644
--- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs
+++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs
@@ -634,6 +634,7 @@ pub fn generate_random_blobs<T: EthSpec>(
 ) -> Result<(BlobsBundle<T>, Transactions<T>), String> {
     let mut bundle = BlobsBundle::<T>::default();
     let mut transactions = vec![];
+    // FIXME(sproul): this is not deterministic for CI; we should use a seed like in TestHarness
     for blob_index in 0..n_blobs {
         let random_valid_sidecar = BlobSidecar::<T>::random_valid(&mut thread_rng(), kzg)?;
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index e8d5da38b1b..9d0fcb166e4 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -97,10 +97,7 @@ impl<E: EthSpec> BlockCache<E> {
     pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
         self.blob_cache.put(block_root, blobs);
     }
-    pub fn get_block<'a>(
-        &'a mut self,
-        block_root: &Hash256,
-    ) -> Option<&'a SignedBeaconBlock<E, FullPayload<E>>> {
+    pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
         self.block_cache.get(block_root)
     }
     pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList<E>> {
@@ -588,9 +585,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }

     /// Check if the blobs sidecar for a block exists on disk.
-    pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
-        self.get_item::<BlobSidecarList<E>>(block_root)
-            .map(|blobs| blobs.is_some())
+    pub fn blob_sidecar_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
+        let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db);
+        blobs_db.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes())
     }

     /// Determine whether a block exists in the database.
@@ -985,8 +982,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             StoreOp::PutBlobs(_, _) => true,
             StoreOp::DeleteBlobs(block_root) => {
                 match self.get_blobs(block_root) {
-                    Ok(Some(blobs_sidecar_list)) => {
-                        blobs_to_delete.push((*block_root, blobs_sidecar_list));
+                    Ok(Some(blob_sidecar_list)) => {
+                        blobs_to_delete.push((*block_root, blob_sidecar_list));
                     }
                     Err(e) => {
                         error!(
@@ -1020,6 +1017,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         };
         // Rollback on failure
         if let Err(e) = tx_res {
+            error!(
+                self.log,
+                "Database write failed";
+                "error" => ?e,
+                "action" => "reverting blob DB changes"
+            );
             let mut blob_cache_ops = blob_cache_ops;
             for op in blob_cache_ops.iter_mut() {
                 let reverse_op = match op {
@@ -1815,7 +1818,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.hot_db.get(state_root)
     }

-    /// Verify that a parsed config.
+    /// Verify that a parsed config is valid.
     fn verify_config(config: &StoreConfig) -> Result<(), HotColdDBError> {
         Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
         Self::verify_epochs_per_blob_prune(config.epochs_per_blob_prune)
@@ -2014,8 +2017,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(())
     }

-    /// Try to prune blobs, approximating the current epoch from lower epoch numbers end (older
-    /// end) and is useful when the data availability boundary is not at hand.
+    /// Try to prune blobs, approximating the current epoch from the split slot.
     pub fn try_prune_most_blobs(&self, force: bool) -> Result<(), Error> {
         let deneb_fork = match self.spec.deneb_fork_epoch {
             Some(epoch) => epoch,
@@ -2024,18 +2026,28 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 return Ok(());
             }
         };
-        // At best, current_epoch = split_epoch + 2. However, if finalization doesn't advance, the
-        // `split.slot` is not updated and current_epoch > split_epoch + 2.
-        let min_current_epoch = self.get_split_slot().epoch(E::slots_per_epoch()) + Epoch::new(2);
+        // The current epoch is at least `split_epoch + 2`. It could be greater if the database is
+        // configured to delay updating the split, or if finalization has ceased. In that case we
+        // choose to also delay the pruning of blobs (we never prune without finalization anyway).
+        let min_current_epoch = self.get_split_slot().epoch(E::slots_per_epoch()) + 2;
         let min_data_availability_boundary = std::cmp::max(
             deneb_fork,
-            min_current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
+            min_current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
         );

         self.try_prune_blobs(force, min_data_availability_boundary)
     }

     /// Try to prune blobs older than the data availability boundary.
+    ///
+    /// Blobs from the epoch `data_availability_boundary - blob_prune_margin_epochs` are retained.
+    /// This epoch is an _exclusive_ endpoint for the pruning process.
+    ///
+    /// This function only supports pruning blobs older than the split point, which is older than
+    /// (or equal to) finalization. Pruning blobs newer than finalization is not supported.
+    ///
+    /// This function also assumes that the split is stationary while it runs. It should only be
+    /// run from the migrator thread (where `migrate_database` runs) or from the database manager.
     pub fn try_prune_blobs(
         &self,
         force: bool,
         data_availability_boundary: Epoch,
@@ -2050,6 +2062,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         };

         let should_prune_blobs = self.get_config().prune_blobs;
+        let margin_epochs = self.get_config().blob_prune_margin_epochs;
+        let epochs_per_blob_prune = self.get_config().epochs_per_blob_prune;
+
         if !should_prune_blobs && !force {
             debug!(
                 self.log,
@@ -2064,51 +2079,44 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             .oldest_blob_slot
             .unwrap_or_else(|| deneb_fork.start_slot(E::slots_per_epoch()));

-        // The last entirely pruned epoch, blobs sidecar pruning may have stopped early in the
-        // middle of an epoch otherwise the oldest blob slot is a start slot.
-        let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1;
+        // Start pruning from the epoch of the oldest blob stored (inclusive).
+        let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch());

-        // At most prune blobs up until the data availability boundary epoch, leaving at least
-        // blobs of the data availability boundary epoch and younger.
-        let earliest_prunable_epoch = data_availability_boundary - 1;
-        // Stop pruning before reaching the data availability boundary if a margin is configured.
-        let margin_epochs = self.get_config().blob_prune_margin_epochs;
-        let end_epoch = earliest_prunable_epoch - margin_epochs;
+        // Prune blobs up until the `data_availability_boundary - margin` (inclusive) or the split
+        // slot's epoch, whichever is older. We can't prune blobs newer than the split.
+        let split = self.get_split_info();
+        let end_epoch = std::cmp::min(
+            data_availability_boundary - margin_epochs - 1,
+            split.slot.epoch(E::slots_per_epoch()) - 1,
+        );
+        let end_slot = end_epoch.end_slot(E::slots_per_epoch());

-        if !force
-            && last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune
-                > end_epoch.as_u64()
-        {
-            debug!(self.log, "Blobs sidecars are pruned");
+        if !force && start_epoch + epochs_per_blob_prune >= end_epoch {
+            debug!(
+                self.log,
+                "Blobs are pruned";
+                "oldest_blob_slot" => oldest_blob_slot,
+                "data_availability_boundary" => data_availability_boundary,
+            );
             return Ok(());
         }

         // Iterate block roots forwards from the oldest blob slot.
         debug!(
             self.log,
-            "Pruning blobs sidecars stored longer than data availability boundary";
+            "Pruning blobs";
+            "start_epoch" => start_epoch,
+            "end_epoch" => end_epoch,
+            "data_availability_boundary" => data_availability_boundary,
         );
-        // todo(emhane): If we notice degraded I/O for users switching modes (prune_blobs=true to
-        // prune_blobs=false) we could add a warning that only fires on a threshold, e.g. more
-        // than 2x epochs_per_blob_prune epochs without a prune.

         let mut ops = vec![];
         let mut last_pruned_block_root = None;
-        let end_slot = end_epoch.end_slot(E::slots_per_epoch());

         for res in self.forwards_block_roots_iterator_until(
             oldest_blob_slot,
             end_slot,
             || {
-                // todo(emhane): In the future, if the data availability boundary is more recent
-                // than the split (finalized) epoch, this code will have to change to decide what
-                // to do with pruned blobs in our not-yet-finalized canonical chain and
-                // not-yet-orphaned forks (see DBColumn::BeaconBlobOrphan).
-                //
-                // Related to review and the spec PRs linked in it:
-                // https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136
-                let split = self.get_split_info();
-
                 let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
                     HotColdDBError::MissingSplitState(split.state_root, split.slot),
                 )?;
@@ -2123,7 +2131,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 Err(e) => {
                     warn!(
                         self.log,
-                        "Stopping blobs sidecar pruning early";
+                        "Stopping blob pruning early";
                         "error" => ?e,
                     );
                     break;
@@ -2131,11 +2139,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             };

             if Some(block_root) != last_pruned_block_root
-                && self.blobs_sidecar_exists(&block_root)?
+                && self.blob_sidecar_exists(&block_root)?
             {
                 debug!(
                     self.log,
-                    "Pruning blobs sidecar";
+                    "Pruning blob sidecar";
                     "slot" => slot,
                     "block_root" => ?block_root,
                 );
@@ -2144,15 +2152,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             }

             if slot >= end_slot {
-                info!(
+                debug!(
                     self.log,
-                    "Blobs sidecar pruning reached earliest available blobs sidecar";
+                    "Blob pruning reached earliest available blob sidecar";
                     "slot" => slot
                 );
                 break;
             }
         }
-        let blobs_sidecars_pruned = ops.len();
+        let blob_sidecars_pruned = ops.len();
         let new_blob_info = BlobInfo {
             oldest_blob_slot: Some(end_slot + 1),
             blobs_db: blob_info.blobs_db,
@@ -2163,8 +2171,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.do_atomically_with_block_and_blobs_cache(ops)?;
         info!(
             self.log,
-            "Blobs sidecar pruning complete";
-            "blobs_sidecars_pruned" => blobs_sidecars_pruned,
+            "Blob pruning complete";
+            "blob_sidecars_pruned" => blob_sidecars_pruned,
         );

         Ok(())
diff --git a/consensus/types/src/consts.rs b/consensus/types/src/consts.rs
index 7fa03dc5f85..a515217e33d 100644
--- a/consensus/types/src/consts.rs
+++ b/consensus/types/src/consts.rs
@@ -32,9 +32,9 @@ pub mod deneb {
             "52435875175126190479447740508185965837690552500527637822603658699938581184513"
         )
         .expect("should initialize BLS_MODULUS");
-        pub static ref MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch = Epoch::from(4096_u64);
     }
     pub const VERSIONED_HASH_VERSION_KZG: u8 = 1;
     pub const BLOB_SIDECAR_SUBNET_COUNT: u64 = 6;
     pub const MAX_BLOBS_PER_BLOCK: u64 = BLOB_SIDECAR_SUBNET_COUNT;
+    pub const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch = Epoch::new(4096);
 }
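
As a sanity check on the pruning bounds introduced in `try_prune_blobs` above, here is a minimal standalone sketch. It is not code from the patch: plain `u64` arithmetic stands in for the `Epoch`/`Slot` types, and `SLOTS_PER_EPOCH = 32` is the mainnet value rather than the minimal-spec value used by `deneb_prune_blobs_happy_case`.

// Standalone sketch of the bounds computed in `try_prune_blobs` (assumed
// values; `Epoch`/`Slot` are modelled as plain u64).
const SLOTS_PER_EPOCH: u64 = 32; // mainnet value; the test runs the minimal spec

/// Inclusive end epoch for pruning: one epoch before
/// `data_availability_boundary - margin`, additionally capped at one epoch
/// before the split so we never prune blobs newer than the split slot.
fn pruning_end_epoch(data_availability_boundary: u64, margin_epochs: u64, split_slot: u64) -> u64 {
    let split_epoch = split_slot / SLOTS_PER_EPOCH;
    std::cmp::min(
        data_availability_boundary - margin_epochs - 1,
        split_epoch - 1,
    )
}

fn main() {
    // Mirrors `deneb_prune_blobs_happy_case`: boundary at epoch 2, zero margin,
    // split comfortably past the boundary (epoch 7 here).
    let end_epoch = pruning_end_epoch(2, 0, 7 * SLOTS_PER_EPOCH);
    assert_eq!(end_epoch, 1); // epochs 0 and 1 are pruned

    // `end_slot` is the last slot of `end_epoch`, and the new
    // `oldest_blob_slot` is `end_slot + 1`, i.e. the boundary's start slot,
    // which is exactly what the test asserts.
    let end_slot = (end_epoch + 1) * SLOTS_PER_EPOCH - 1;
    assert_eq!(end_slot + 1, 2 * SLOTS_PER_EPOCH);
}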
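
The same style of sketch for `try_prune_most_blobs`, which has no data availability boundary to hand and instead derives a conservative lower bound from the split slot. `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS = 4096` matches the new constant in `consensus/types/src/consts.rs`; the fork epoch and split slots below are illustrative assumptions.

// Sketch of the boundary approximation in `try_prune_most_blobs` (assumed values).
const SLOTS_PER_EPOCH: u64 = 32;
const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: u64 = 4096; // matches consts.rs

fn min_data_availability_boundary(deneb_fork_epoch: u64, split_slot: u64) -> u64 {
    // The wall-clock epoch is at least `split_epoch + 2`; if finalization
    // stalls it can be greater, in which case pruning is delayed too.
    let min_current_epoch = split_slot / SLOTS_PER_EPOCH + 2;
    std::cmp::max(
        deneb_fork_epoch,
        min_current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
    )
}

fn main() {
    // Illustrative fork epoch. Shortly after the fork, the fork epoch dominates...
    let fork = 10_000;
    assert_eq!(min_data_availability_boundary(fork, 10_100 * SLOTS_PER_EPOCH), fork);
    // ...and once the chain is more than 4096 epochs past it, the sliding
    // window `min_current_epoch - 4096` takes over.
    assert_eq!(
        min_data_availability_boundary(fork, 20_000 * SLOTS_PER_EPOCH),
        20_000 + 2 - 4096
    );
}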