[consensus] Drop pending consensus adapter tasks when epoch ends (MystenLabs#6625)

The consensus adapter keeps a number of pending tasks that submit transactions to consensus and wait for them to be sequenced.

Some of those tasks might never complete after an epoch change, because at some point we stop consensus and some submitted transactions will never get committed.

To release those tasks, we introduce a separate mechanism: `AuthorityPerEpochStore` gains a new method, `wait_epoch_terminated`, which returns a future that resolves when the epoch ends. We use this future to terminate all `ConsensusAdapter` tasks on epoch change.

As an implementation detail, we introduce a new `NotifyOnce` struct to propagate this notification.
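
For orientation, the shape of the change is a race between each pending submit task and an epoch-termination signal: whichever completes first wins, and on epoch end the pending submission is simply dropped. Below is a minimal, self-contained sketch of that pattern, not the adapter's actual code: it uses `tokio_util::sync::CancellationToken` as a stand-in for the `NotifyOnce`/`wait_epoch_terminated` pair, and `submit_and_wait_inner` here is a placeholder for the real submit-to-consensus path. It assumes the `tokio`, `tokio-util`, and `futures` crates are available.

```rust
use futures::future::{select, Either};
use futures::FutureExt;
use tokio_util::sync::CancellationToken;

// Placeholder for submitting to consensus and waiting for the transaction to
// be sequenced; around an epoch boundary this may never complete.
async fn submit_and_wait_inner() {
    futures::future::pending::<()>().await;
}

// Race the pending submission against epoch termination, mirroring the shape
// of the select() added to ConsensusAdapter::submit_and_wait in this commit.
async fn submit_and_wait(epoch_alive: CancellationToken) {
    let epoch_terminated = epoch_alive.cancelled().boxed();
    let submit = submit_and_wait_inner().boxed();
    match select(submit, epoch_terminated).await {
        Either::Left(_) => println!("transaction sequenced"),
        Either::Right(_) => println!("epoch ended, pending task dropped"),
    }
}

#[tokio::main]
async fn main() {
    let epoch_alive = CancellationToken::new();
    let task = tokio::spawn(submit_and_wait(epoch_alive.clone()));

    // Simulate reconfiguration. In the commit, the signal comes from
    // AuthorityPerEpochStore::epoch_terminated(), called when the epoch
    // store is swapped out; here we just cancel the stand-in token.
    epoch_alive.cancel();
    task.await.unwrap();
}
```

In the commit itself, the signal is backed by the new `NotifyOnce`, and `epoch_terminated()` is invoked in `authority_store.rs` when the previous epoch store is swapped out.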
andll authored Dec 8, 2022
1 parent 6a33111 commit c8de0d8
Showing 6 changed files with 168 additions and 57 deletions.
10 changes: 0 additions & 10 deletions crates/sui-core/src/authority.rs
@@ -2098,16 +2098,6 @@ impl AuthorityState {
))
}

/// Check whether certificate was processed by consensus.
/// Returned future is immediately ready if consensus message was already processed.
/// Otherwise returns future that waits for message to be processed by consensus.
pub async fn consensus_message_processed_notify(
&self,
key: ConsensusTransactionKey,
) -> Result<(), SuiError> {
self.database.consensus_message_processed_notify(key).await
}

/// Get a read reference to an object/seq lock
pub async fn get_transaction_lock(
&self,
37 changes: 29 additions & 8 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -23,9 +23,10 @@ use tracing::debug;
use typed_store::rocks::{DBBatch, DBMap, DBOptions, TypedStoreError};
use typed_store::traits::TypedStoreDebug;

use crate::authority::authority_notify_read::{NotifyRead, Registration};
use crate::authority::authority_notify_read::NotifyRead;
use crate::authority::{CertTxGuard, MAX_TX_RECOVERY_RETRY};
use crate::epoch::reconfiguration::ReconfigState;
use crate::notify_once::NotifyOnce;
use crate::stake_aggregator::StakeAggregator;
use sui_types::message_envelope::{TrustedEnvelope, VerifiedEnvelope};
use sui_types::temporary_store::InnerTemporaryStore;
@@ -53,6 +54,7 @@ pub struct AuthorityPerEpochStore<S> {
/// In-memory cache of the content from the reconfig_state db table.
reconfig_state_mem: RwLock<ReconfigState>,
consensus_notify_read: NotifyRead<ConsensusTransactionKey, ()>,
epoch_alive: NotifyOnce,
end_of_publish: Mutex<StakeAggregator<(), true>>,
/// A write-ahead/recovery log used to ensure we finish fully processing certs after errors or
/// crashes.
@@ -176,10 +178,12 @@
.expect("Load reconfig state at initialization cannot fail");
let wal_path = parent_path.join("recovery_log");
let wal = Arc::new(DBWriteAheadLog::new(wal_path));
let epoch_alive = NotifyOnce::new();
Self {
epoch_id,
tables,
reconfig_state_mem: RwLock::new(reconfig_state),
epoch_alive,
consensus_notify_read: NotifyRead::new(),
end_of_publish: Mutex::new(end_of_publish),
wal,
@@ -406,6 +410,18 @@
Ok(self.tables.consensus_message_processed.contains_key(key)?)
}

pub async fn consensus_message_processed_notify(
&self,
key: ConsensusTransactionKey,
) -> Result<(), SuiError> {
let registration = self.consensus_notify_read.register_one(&key);
if self.is_consensus_message_processed(&key)? {
return Ok(());
}
registration.await;
Ok(())
}

pub fn has_sent_end_of_publish(&self, authority: &AuthorityName) -> SuiResult<bool> {
Ok(self
.end_of_publish
@@ -414,13 +430,6 @@
.contains_key(authority))
}

pub fn register_consensus_message_notify(
&self,
key: &ConsensusTransactionKey,
) -> Registration<ConsensusTransactionKey, ()> {
self.consensus_notify_read.register_one(key)
}

/// Returns Ok(true) if 2f+1 end of publish messages were recorded at this point
pub fn record_end_of_publish(
&self,
@@ -593,6 +602,18 @@
self.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
}

/// Notify epoch is terminated, can only be called once on epoch store
pub fn epoch_terminated(&self) {
self.epoch_alive
.notify()
.expect("epoch_terminated called twice on same epoch store");
}

/// Waits for the notification about epoch termination
pub async fn wait_epoch_terminated(&self) {
self.epoch_alive.wait().await
}
}

fn transactions_table_default_config() -> DBOptions {
16 changes: 2 additions & 14 deletions crates/sui-core/src/authority/authority_store.rs
@@ -145,7 +145,8 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
&self.path,
self.db_options.clone(),
));
self.epoch_store.store(epoch_tables);
let previous_store = self.epoch_store.swap(epoch_tables);
previous_store.epoch_terminated();
}

pub fn epoch_store(&self) -> Guard<Arc<AuthorityPerEpochStore<S>>> {
@@ -1178,19 +1179,6 @@
self.epoch_store().is_consensus_message_processed(key)
}

pub async fn consensus_message_processed_notify(
&self,
key: ConsensusTransactionKey,
) -> Result<(), SuiError> {
let epoch_tables = self.epoch_store();
let registration = epoch_tables.register_consensus_message_notify(&key);
if self.consensus_message_processed(&key)? {
return Ok(());
}
registration.await;
Ok(())
}

pub fn sent_end_of_publish(&self, authority: &AuthorityName) -> SuiResult<bool> {
self.epoch_store().has_sent_end_of_publish(authority)
}
64 changes: 39 additions & 25 deletions crates/sui-core/src/consensus_adapter.rs
@@ -23,6 +23,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use sui_types::base_types::TransactionDigest;
use sui_types::committee::Committee;
use sui_types::crypto::AuthoritySignInfo;
use sui_types::{
error::{SuiError, SuiResult},
messages::ConsensusTransaction,
@@ -161,11 +162,8 @@ impl ConsensusAdapter {

fn submit_recovered(self: Arc<Self>) {
// Currently narwhal worker might lose transactions on restart, so we need to resend them
let mut recovered = self
.authority
.database
.epoch_store()
.get_all_pending_consensus_transactions();
let epoch_store = self.authority.database.epoch_store().clone();
let mut recovered = epoch_store.get_all_pending_consensus_transactions();
let pending_certificates = recovered
.iter()
.filter_map(|transaction| {
@@ -186,10 +184,7 @@
pending_certificates;

#[allow(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
if self
.authority
.database
.epoch_store()
if epoch_store
.get_reconfig_state_read_lock_guard()
.is_reject_user_certs()
{
@@ -205,7 +200,7 @@
}
}
for transaction in recovered {
self.submit_unchecked(transaction);
self.submit_unchecked(transaction, epoch_store.clone());
}
}

@@ -302,7 +297,7 @@
self: &Arc<Self>,
transaction: ConsensusTransaction,
) -> SuiResult<JoinHandle<()>> {
let epoch_store = self.authority.database.epoch_store();
let epoch_store = self.authority.database.epoch_store().clone();
let _lock = if transaction.is_user_certificate() {
let lock = epoch_store.get_reconfig_state_read_lock_guard();
if !lock.should_accept_user_certs() {
@@ -312,31 +307,51 @@
} else {
None
};
self.authority
.database
.epoch_store()
.insert_pending_consensus_transactions(&transaction)?;
epoch_store.insert_pending_consensus_transactions(&transaction)?;
if let ConsensusTransactionKind::UserTransaction(certificate) = &transaction.kind {
self.pending_certificates
.lock()
.insert(*certificate.digest());
}
Ok(self.submit_unchecked(transaction))
Ok(self.submit_unchecked(transaction, epoch_store.clone()))
}

fn submit_unchecked(self: &Arc<Self>, transaction: ConsensusTransaction) -> JoinHandle<()> {
fn submit_unchecked(
self: &Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: Arc<AuthorityPerEpochStore<AuthoritySignInfo>>,
) -> JoinHandle<()> {
// Reconfiguration lock is dropped when pending_consensus_transactions is persisted, before it is handled by consensus
let async_stage = self.clone().submit_and_wait(transaction);
let async_stage = self.clone().submit_and_wait(transaction, epoch_store);
// Number of this tasks is limited by `sequencing_certificate_inflight` limit
let join_handle = spawn_monitored_task!(async_stage);
join_handle
}

async fn submit_and_wait(
self: Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: Arc<AuthorityPerEpochStore<AuthoritySignInfo>>,
) {
let epoch_terminated = epoch_store.wait_epoch_terminated().boxed();
let submit_and_wait = self
.submit_and_wait_inner(transaction, &epoch_store)
.boxed();
// When epoch_terminated signal is received all pending submit_and_wait_inner are dropped.
//
// This is needed because submit_and_wait_inner waits on read_notify for consensus message to be processed,
// which may never happen on epoch boundary.
select(submit_and_wait, epoch_terminated).await;
}

#[allow(clippy::option_map_unit_fn)]
async fn submit_and_wait(self: Arc<Self>, transaction: ConsensusTransaction) {
async fn submit_and_wait_inner(
self: Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: &Arc<AuthorityPerEpochStore<AuthoritySignInfo>>,
) {
let _guard = InflightDropGuard::acquire(&self);
let processed_waiter = self
.authority
let processed_waiter = epoch_store
.consensus_message_processed_notify(transaction.key())
.boxed();
let await_submit = Self::await_submit_delay(
@@ -381,7 +396,6 @@
}
let send_end_of_publish =
if let ConsensusTransactionKind::UserTransaction(certificate) = &transaction.kind {
let epoch_store = self.authority.database.epoch_store();
let reconfig_guard = epoch_store.get_reconfig_state_read_lock_guard();
// note that pending_certificates lock is always acquired *after* reconfiguration lock
// acquiring locks in different order might lead to deadlocks
@@ -410,9 +424,7 @@
}
// Removing transaction from persistent storage *after* sending end of epoch
// Doing it in different order won't be restart safe
self.authority
.database
.epoch_store()
epoch_store
.remove_pending_consensus_transaction(&transaction.key())
.expect("Storage error when removing consensus transaction");
self.opt_metrics.as_ref().map(|metrics| {
@@ -452,6 +464,8 @@
}
}

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;

#[async_trait::async_trait]
impl SubmitToConsensus for Arc<ConsensusAdapter> {
async fn submit_to_consensus(&self, transaction: &ConsensusTransaction) -> SuiResult {
1 change: 1 addition & 0 deletions crates/sui-core/src/lib.rs
@@ -30,6 +30,7 @@ pub mod consensus_validator;
mod histogram;
mod module_cache_gauge;
mod node_sync;
mod notify_once;
mod query_helpers;
mod stake_aggregator;
mod transaction_manager;
97 changes: 97 additions & 0 deletions crates/sui-core/src/notify_once.rs
@@ -0,0 +1,97 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use parking_lot::Mutex;
use std::sync::Arc;
use tokio::sync::futures::Notified;
use tokio::sync::Notify;

/// Notify once allows waiter to register for certain conditions and unblocks waiter
/// when condition is signalled with `notify` method.
///
/// The functionality is somewhat similar to a tokio watch channel with subscribe method,
/// however it is much less error prone to use NotifyOnce rather then tokio watch.
///
/// Specifically with tokio watch you may miss notification,
/// if you subscribe to it after the value was changed
/// (Note that this is not a bug in tokio watch, but rather a mis-use of it).
///
/// NotifyOnce guarantees that wait() will return once notify() is called,
/// regardless of whether wait() was called before or after notify().
pub struct NotifyOnce {
notify: Mutex<Option<Arc<Notify>>>,
}

impl NotifyOnce {
pub fn new() -> Self {
Self::default()
}

/// Notify all waiters, present and future about event
///
/// After this method all pending and future calls to .wait() will return
///
/// This method returns errors if called more then once
#[allow(clippy::result_unit_err)]
pub fn notify(&self) -> Result<(), ()> {
let Some(notify) = self.notify.lock().take() else { return Err(()) };
// At this point all `register` either registered with current notify,
// or will be returning immediately
notify.notify_waiters();
Ok(())
}

/// Awaits for `notify` method to be called.
///
/// This future is cancellation safe.
pub async fn wait(&self) {
// Note that we only hold lock briefly when registering for notification
// There is a bit of a trickery here with lock - we take a lock and if it is not empty,
// we register .notified() first and then release lock
//
// This is to make sure no notification is lost because Notify::notify_waiters do not
// notify waiters that register **after** notify_waiters was called
let mut notify = None;
let notified = self.make_notified(&mut notify);

if let Some(notified) = notified {
notified.await;
}
}

// This made into separate function as it is only way to make compiler
// not to hold `lock` in a generated async future.
fn make_notified<'a>(&self, notify: &'a mut Option<Arc<Notify>>) -> Option<Notified<'a>> {
let lock = self.notify.lock();
*notify = lock.as_ref().cloned();
notify.as_ref().map(|n| n.notified())
}
}

impl Default for NotifyOnce {
fn default() -> Self {
let notify = Arc::new(Notify::new());
let notify = Mutex::new(Some(notify));
Self { notify }
}
}

#[tokio::test]
async fn notify_once_test() {
let notify_once = NotifyOnce::new();
// Before notify() is called .wait() is not ready
assert!(futures::future::poll_immediate(notify_once.wait())
.await
.is_none());
let wait = notify_once.wait();
notify_once.notify().unwrap();
// Pending wait() call is ready now
assert!(futures::future::poll_immediate(wait).await.is_some());
// Take wait future and don't resolve it.
// This makes sure lock is dropped properly and wait futures resolve independently of each other
let _dangle_wait = notify_once.wait();
// Any new wait() is immediately ready
assert!(futures::future::poll_immediate(notify_once.wait())
.await
.is_some());
}
