Skip to content

Commit

Permalink
Fix subscribing to attestation subnets for aggregating (#6681)
Browse files Browse the repository at this point in the history
* Prevent scheduled subnet subscriptions from being overwritten by other subscriptions from same subnet with additional scoping by slot
  • Loading branch information
povi authored and Povilas Liubauskas committed Dec 11, 2024
1 parent c5a48a9 commit 461f7c1
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 7 deletions.
7 changes: 4 additions & 3 deletions beacon_node/network/src/subnet_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct SubnetService<T: BeaconChainTypes> {
subscriptions: HashSetDelay<Subnet>,

/// Subscriptions that need to be executed in the future.
scheduled_subscriptions: HashSetDelay<Subnet>,
scheduled_subscriptions: HashSetDelay<ExactSubnet>,

/// A list of permanent subnets that this node is subscribed to.
// TODO: Shift this to a dynamic bitfield
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<T: BeaconChainTypes> SubnetService<T> {
} else {
// This is a future slot, schedule subscribing.
self.scheduled_subscriptions
.insert_at(subnet, time_to_subscription_start);
.insert_at(ExactSubnet { subnet, slot }, time_to_subscription_start);
}

Ok(())
Expand Down Expand Up @@ -626,7 +626,8 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
// Process scheduled subscriptions that might be ready, since those can extend a soon to
// expire subscription.
match self.scheduled_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(subnet))) => {
Poll::Ready(Some(Ok(exact_subnet))) => {
let ExactSubnet { subnet, .. } = exact_subnet;
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);
Expand Down
55 changes: 51 additions & 4 deletions beacon_node/network/src/subnet_service/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,12 +500,15 @@ mod test {
// subscription config
let committee_count = 1;

// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
// Makes 3 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for each of the later slots subscriptions
// (subscription_slot2 and subscription_slot3).
let subscription_slot1 = 0;
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let subscription_slot3 = subscription_slot2 * 2;
let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let com2 = 0;
let com3 = CHAIN.chain.spec.attestation_subnet_count - com1;

// create the attestation service and subscriptions
let mut subnet_service = get_subnet_service();
Expand All @@ -532,6 +535,13 @@ mod test {
true,
);

let sub3 = get_subscription(
com3,
current_slot + Slot::new(subscription_slot3),
committee_count,
true,
);

let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot1),
com1,
Expand All @@ -548,12 +558,23 @@ mod test {
)
.unwrap();

let subnet_id3 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot3),
com3,
committee_count,
&subnet_service.beacon_chain.spec,
)
.unwrap();

// Assert that subscriptions are different but their subnet is the same
assert_ne!(sub1, sub2);
assert_ne!(sub1, sub3);
assert_ne!(sub2, sub3);
assert_eq!(subnet_id1, subnet_id2);
assert_eq!(subnet_id1, subnet_id3);

// submit the subscriptions
subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter());
subnet_service.validator_subscriptions(vec![sub1, sub2, sub3].into_iter());

// Unsubscription event should happen at the end of the slot.
// We wait for 2 slots, to avoid timeout issues
Expand Down Expand Up @@ -590,10 +611,36 @@ mod test {
// If the permanent and short lived subnets are different, we should get an unsubscription event.
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[expected_subscription, expected_unsubscription],
[
expected_subscription.clone(),
expected_unsubscription.clone(),
],
second_subscribe_event[..]
);
}

let subscription_slot = current_slot + subscription_slot3 - 1;

let wait_slots = subnet_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;

let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);

assert_eq!(no_events, []);

let third_subscribe_event = get_events(&mut subnet_service, None, 2).await;

if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
assert_eq!(
[expected_subscription, expected_unsubscription],
third_subscribe_event[..]
);
}
}

#[tokio::test]
Expand Down

0 comments on commit 461f7c1

Please sign in to comment.