diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 5842120c6cc..eceae5db36c 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -44,6 +44,7 @@ use types::{ Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator, }; +use rayon::prelude::*; type SyncContributions = RwLock>>>; @@ -230,112 +231,120 @@ impl OperationPool { mut validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, num_valid: &mut i64, spec: &'a ChainSpec, - ) -> Vec<(&CompactAttestationData, CompactIndexedAttestation)> { - let mut cliqued_atts: Vec<(&CompactAttestationData, CompactIndexedAttestation)> = vec![]; - + ) -> Vec<(&CompactAttestationData, Vec>)> { if let Some(AttestationDataMap { - aggregate_attestations, - unaggregate_attestations, - }) = all_attestations.get_attestation_map(checkpoint_key) + aggregate_attestations, + unaggregate_attestations, + }) = all_attestations.get_attestation_map(checkpoint_key) { - for (data, aggregates) in aggregate_attestations { - if data.slot + spec.min_attestation_inclusion_delay <= state.slot() + let mut cliques_from_aggregates: Vec<_> = aggregate_attestations.into_iter().filter(|(data, _)| { + data.slot + spec.min_attestation_inclusion_delay <= state.slot() && state.slot() <= data.slot + T::slots_per_epoch() - { - let aggregates: Vec<&CompactIndexedAttestation> = aggregates - .iter() - .filter(|indexed| { - validity_filter(&AttestationRef { - checkpoint: checkpoint_key, - data: &data, - indexed, - }) - }) - .collect(); - *num_valid += aggregates.len() as i64; - - // aggregate each cliques corresponding attestaiions - let mut clique_aggregates: Vec> = bron_kerbosch(&aggregates, is_compatible) - .iter() - .map(|clique| { - clique.iter().skip(1).fold(aggregates[clique[0]].clone(), |mut acc, &ind| { - acc.aggregate(&aggregates[ind]); - acc - }) + }) + .map(|(data, aggregates)| { + let aggregates: Vec<&CompactIndexedAttestation> = aggregates + .iter() + .filter(|indexed| { + validity_filter(&AttestationRef { + checkpoint: checkpoint_key, + data: &data, + indexed, }) - .collect(); - let mut indices_to_remove = Vec::new(); - clique_aggregates - .sort_unstable_by(|a, b| a.attesting_indices.len().cmp(&b.attesting_indices.len())); - for (index, clique) in clique_aggregates.iter().enumerate() { - for bigger_clique in clique_aggregates.iter().skip(index + 1) { - if clique - .aggregation_bits + }) + .collect(); + *num_valid += aggregates.len() as i64; + (data, aggregates) + }) + .collect::>)>>() + .into_par_iter() + .map(|(data, aggregates)| { + // aggregate each cliques corresponding attestaiions + let mut clique_aggregates: Vec> = bron_kerbosch(&aggregates, is_compatible) + .iter() + .map(|clique| { + clique.iter().skip(1).fold(aggregates[clique[0]].clone(), |mut acc, &ind| { + acc.aggregate(&aggregates[ind]); + acc + }) + }) + .collect(); + let mut indices_to_remove = Vec::new(); + clique_aggregates + .sort_unstable_by(|a, b| a.attesting_indices.len().cmp(&b.attesting_indices.len())); + for (index, clique) in clique_aggregates.iter().enumerate() { + for bigger_clique in clique_aggregates.iter().skip(index + 1) { + if clique + .aggregation_bits .is_subset(&bigger_clique.aggregation_bits) - { - indices_to_remove.push(index); - break; - } - } - } - - for index in indices_to_remove.iter().rev() { - clique_aggregates.swap_remove(*index); + { + indices_to_remove.push(index); + break; + } } + } - // aggregate unaggregate attestations into the clique aggregates - // if compatible - if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) { - for attestation in unaggregate_attestations.iter().filter(|indexed| { - validity_filter(&AttestationRef { - checkpoint: checkpoint_key, - data: &data, - indexed, - }) - }) { - *num_valid += 1; - for clique_aggregate in &mut clique_aggregates { - if !clique_aggregate - .attesting_indices + for index in indices_to_remove.iter().rev() { + clique_aggregates.swap_remove(*index); + } + (data, clique_aggregates) + }) + .collect::>)>>() + .into_iter() + .map(|(data, mut clique_aggregates)| { + // aggregate unaggregate attestations into the clique aggregates + // if compatible + if let Some(unaggregate_attestations) = unaggregate_attestations.get(&data) { + for attestation in unaggregate_attestations.iter().filter(|indexed| { + validity_filter(&AttestationRef { + checkpoint: checkpoint_key, + data: &data, + indexed, + }) + }) { + *num_valid += 1; + for clique_aggregate in &mut clique_aggregates { + if !clique_aggregate + .attesting_indices .contains(&attestation.attesting_indices[0]) - { - clique_aggregate.aggregate(attestation); - } - } + { + clique_aggregate.aggregate(attestation); + } } } - - cliqued_atts - .extend(clique_aggregates.into_iter().map(|indexed| (data, indexed))); } - } + (data, clique_aggregates) + }) + .collect(); + // include aggregated attestations from unaggregated attestations whose // attestation data doesn't appear in aggregated_attestations - for (data, attestations) in unaggregate_attestations { - if data.slot + spec.min_attestation_inclusion_delay <= state.slot() - && state.slot() <= data.slot + T::slots_per_epoch() - { - if !aggregate_attestations.contains_key(&data) { - let mut valid_attestations = attestations.iter().filter(|indexed| { - validity_filter(&AttestationRef { - checkpoint: checkpoint_key, - data: &data, - indexed, - }) + unaggregate_attestations + .iter() + .filter(|(data, _)| { + data.slot + spec.min_attestation_inclusion_delay <= state.slot() + && state.slot() <= data.slot + T::slots_per_epoch() + && !aggregate_attestations.contains_key(&data) + }) + .for_each(|(data, unaggregates)| { + let mut valid_attestations = unaggregates.iter().filter(|indexed| { + validity_filter(&AttestationRef { + checkpoint: checkpoint_key, + data: &data, + indexed, + }) + }); + if let Some(att) = valid_attestations.next() { + let mut att = att.clone(); + valid_attestations.for_each(|valid_att| { + att.aggregate(valid_att) }); - - if let Some(first) = valid_attestations.next() { - let mut agg = first.clone(); - for attestation in valid_attestations { - agg.aggregate(&attestation); - } - cliqued_atts.push((data, agg)); - } + cliques_from_aggregates.push((data, vec![att])) } - } - } + }); + cliques_from_aggregates + } else { + vec![] } - cliqued_atts } /// Get a list of attestations for inclusion in a block. @@ -386,14 +395,17 @@ impl OperationPool { let prev_epoch_cliqued_atts = prev_epoch_cliqued_atts .iter() - .map(|(data, indexed)| AttestationRef { - checkpoint: &prev_epoch_key, - data, - indexed, + .map(|(data, atts)| { + atts.iter().map(|indexed| AttestationRef { + checkpoint: &prev_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) }) - .filter_map(|att| { - AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) - }); + .flat_map(|att_max_cover| att_max_cover); let curr_epoch_cliqued_atts = self.get_clique_aggregate_attestations_for_epoch( &curr_epoch_key, @@ -406,14 +418,17 @@ impl OperationPool { let curr_epoch_cliqued_atts = curr_epoch_cliqued_atts .iter() - .map(|(data, indexed)| AttestationRef { - checkpoint: &curr_epoch_key, - data, - indexed, + .map(|(data, atts)| { + atts.iter().map(|indexed| AttestationRef { + checkpoint: &curr_epoch_key, + data, + indexed, + }) + .filter_map(|att| { + AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) + }) }) - .filter_map(|att| { - AttMaxCover::new(att, state, &reward_cache, total_active_balance, spec) - }); + .flat_map(|att_max_cover| att_max_cover); let prev_epoch_limit = if let BeaconState::Base(base_state) = state { std::cmp::min(