Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

staking: curate consensus set at epoch boundary #3875

Merged
merged 4 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions crates/core/component/stake/src/component/epoch_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use crate::{
StateReadExt,
};

use super::{ConsensusIndexRead, StateWriteExt};
use super::StateWriteExt;
use crate::component::stake::{ConsensusIndexRead, ConsensusIndexWrite};

#[async_trait]
pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
Expand All @@ -44,8 +45,7 @@ pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
let mut num_delegations = 0usize;
let mut num_undelegations = 0usize;

// TODO(erwan): we can turn this into a joinset later, but since the storage locality is pretty good
// and the number of ticks is about ~700, we can just do this in a loop for now.
// Performance: see #3874.
for height in epoch_to_end.start_height..=end_height {
let changes = self
.get_delegation_changes(
Expand Down Expand Up @@ -89,8 +89,7 @@ pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
// Compute and set the chain base rate for the upcoming epoch.
let next_base_rate = self.process_chain_base_rate().await?;

// TODO(erwan): no doubt, this can be optimized, but we're on a cold path and for now,
// we want the simplest possible implementation.
// TODO(erwan): replace this with a tagged stream once we have tests. See #3874.
let delegation_set = delegations_by_validator
.keys()
.cloned()
Expand Down Expand Up @@ -127,7 +126,6 @@ pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
.remove(validator_identity)
.unwrap_or_else(Amount::zero);

// These operations should be commutative, so later, we can consider using a Joinset for this loop.
if let Some(rewards) = self
.process_validator(
validator_identity,
Expand Down Expand Up @@ -350,6 +348,17 @@ pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
e
})?;

// Finally, we decide whether to keep this validator in the consensus set.
// Doing this here means that we no longer have to worry about validators
// escaping end-epoch processing.
//
// Performance: NV-storage layer churn because the validator might not actually
// be in the CS index. We should replace the union set approach with a merged
// stream that tags items with their source. See #3874.
if !self.belongs_in_index(&validator.identity_key).await {
self.remove_consensus_set_index(&validator.identity_key);
}

Ok(reward_queue_entry)
}

Expand Down
24 changes: 24 additions & 0 deletions crates/core/component/stake/src/component/stake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,30 @@ pub trait ConsensusIndexRead: StateRead {
})
.boxed())
}

/// Returns whether the given validator should be indexed in the consensus set.
#[instrument(level = "error", skip(self))]
async fn belongs_in_index(&self, validator_id: &IdentityKey) -> bool {
let Some(state) = self
.get_validator_state(validator_id)
.await
.expect("no deserialization error")
else {
tracing::error!("validator state was not found");
return false;
};

match state {
validator::State::Active | validator::State::Inactive => {
tracing::debug!(?state, "validator belongs in the consensus set");
true
}
_ => {
tracing::debug!(?state, "validator does not belong in the consensus set");
false
}
}
}
}

impl<T: StateRead + ?Sized> ConsensusIndexRead for T {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub trait ValidatorManager: StateWrite {
(Inactive, Defined) => {
// The validator has fallen below the minimum threshold to be
// part of the "greater" consensus set.
self.remove_consensus_set_index(identity_key);
tracing::debug!(identity_key = ?identity_key, "validator has fallen below minimum stake threshold to be considered inactive");
self.put(validator_state_path, Defined);
}
(Inactive, Active) => {
Expand Down Expand Up @@ -265,7 +265,16 @@ pub trait ValidatorManager: StateWrite {
// The validator was part of the active set, but its delegation pool fell below
// the minimum threshold. We remove it from the active set and the consensus set.
tracing::debug!(unbonds_at_epoch, "ejecting from active set");
self.remove_consensus_set_index(identity_key);

// The validator's delegation pool begins unbonding.
self.set_validator_bonding_state(
identity_key,
Unbonding {
unbonds_at_epoch: self
.compute_unbonding_epoch(identity_key, current_epoch.index)
.await?,
},
);
self.put(validator_state_path, Defined);
}
(Defined | Disabled | Inactive | Active | Jailed, Tombstoned) => {
Expand Down Expand Up @@ -294,9 +303,6 @@ pub trait ValidatorManager: StateWrite {
"tombstoning validator and unbond its pool"
);

// Remove the validator from the consensus set.
self.remove_consensus_set_index(identity_key);

// Finally, set the validator to be tombstoned.
self.put(validator_state_path, Tombstoned);
}
Expand Down
Loading