From 48c447bcafc44f625548f5ae1f10c60aadba8e74 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Wed, 4 Jan 2023 16:56:14 -0800 Subject: [PATCH] [epoch] Pass epoch store to TransactionManager (#7133) TransactionManager runs passively, which means we could pass epoch store to every call to it. --- crates/sui-core/src/authority.rs | 17 +++++-- .../authority/authority_per_epoch_store.rs | 2 +- .../checkpoints/checkpoint_executor/mod.rs | 2 +- crates/sui-core/src/execution_driver.rs | 2 +- crates/sui-core/src/transaction_manager.rs | 44 +++++++++---------- 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 369577599d33b..5b18ef17aa26e 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -788,6 +788,7 @@ impl AuthorityState { signed_effects, tx_guard, execution_guard, + epoch_store, ) .await; } @@ -828,6 +829,7 @@ impl AuthorityState { signed_effects, tx_guard, execution_guard, + epoch_store, ) .await } @@ -839,6 +841,7 @@ impl AuthorityState { signed_effects: SignedTransactionEffects, tx_guard: CertTxGuard<'_>, _execution_guard: ExecutionLockReadGuard<'_>, + epoch_store: &Arc<AuthorityPerEpochStore>, ) -> SuiResult { let digest = *certificate.digest(); let input_object_count = inner_temporary_store.objects.len(); @@ -864,7 +867,8 @@ impl AuthorityState { // // REQUIRED: this must be called before tx_guard.commit_tx() (below), to ensure // TransactionManager can get the notifications after the node crashes and restarts. - self.transaction_manager.objects_committed(output_keys); + self.transaction_manager + .objects_committed(output_keys, epoch_store); // commit_certificate finished, the tx is fully committed to the store. tx_guard.commit_tx(); @@ -967,8 +971,13 @@ impl AuthorityState { } /// Notifies TransactionManager about an executed certificate. 
- pub fn certificate_executed(&self, digest: &TransactionDigest) { - self.transaction_manager.certificate_executed(digest) + pub fn certificate_executed( + &self, + digest: &TransactionDigest, + epoch_store: &Arc<AuthorityPerEpochStore>, + ) { + self.transaction_manager + .certificate_executed(digest, epoch_store) } pub async fn dry_exec_transaction( @@ -1625,7 +1634,7 @@ impl AuthorityState { epoch_store: &Arc<AuthorityPerEpochStore>, ) -> SuiResult<()> { epoch_store.insert_pending_certificates(&certs)?; - self.transaction_manager.enqueue(certs) + self.transaction_manager.enqueue(certs, epoch_store) } // Continually pop in-progress txes from the WAL and try to drive them to completion. diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index 7313cd963d8c3..8cb59664be569 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -1100,7 +1100,7 @@ impl AuthorityPerEpochStore { { // The certificate has already been inserted into the pending_certificates table by // process_consensus_transaction() above. - transaction_manager.enqueue(vec![certificate])?; + transaction_manager.enqueue(vec![certificate], self)?; } Ok(()) } diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index 274d98b7503ea..cb4d605daa955 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -586,7 +586,7 @@ async fn execute_transactions( } epoch_store.insert_pending_certificates(&synced_txns)?; - transaction_manager.enqueue(synced_txns)?; + transaction_manager.enqueue(synced_txns, &epoch_store)?; // Once synced_txns have been awaited, all txns should have effects committed. 
let mut periods = 1; diff --git a/crates/sui-core/src/execution_driver.rs b/crates/sui-core/src/execution_driver.rs index 391fbfc0e2515..80129b1d1ae43 100644 --- a/crates/sui-core/src/execution_driver.rs +++ b/crates/sui-core/src/execution_driver.rs @@ -98,7 +98,7 @@ pub async fn execution_process( } // Remove the certificate that finished execution from the pending_certificates table. - authority.certificate_executed(&digest); + authority.certificate_executed(&digest, &epoch_store); authority .metrics diff --git a/crates/sui-core/src/transaction_manager.rs b/crates/sui-core/src/transaction_manager.rs index b7277ff1f9489..127b04cd5b27b 100644 --- a/crates/sui-core/src/transaction_manager.rs +++ b/crates/sui-core/src/transaction_manager.rs @@ -12,6 +12,7 @@ use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::Verif use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error}; +use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; use crate::authority::{AuthorityMetrics, AuthorityStore}; /// TransactionManager is responsible for managing pending certificates and publishes a stream @@ -58,13 +59,11 @@ impl TransactionManager { inner: Default::default(), tx_ready_certificates, }; + let epoch_store = transaction_manager.authority_store.epoch_store(); transaction_manager .enqueue( - transaction_manager - .authority_store - .epoch_store() - .all_pending_certificates() - .unwrap(), + epoch_store.all_pending_certificates().unwrap(), + &epoch_store, ) .expect("Initialize TransactionManager with pending certificates failed."); transaction_manager @@ -79,13 +78,16 @@ impl TransactionManager { /// TODO: it may be less error prone to take shared object locks inside this function, or /// require shared object lock versions get passed in as input. But this function should not /// have many callsites. Investigate the alternatives here. 
- pub(crate) fn enqueue(&self, certs: Vec) -> SuiResult<()> { + pub(crate) fn enqueue( + &self, + certs: Vec, + epoch_store: &AuthorityPerEpochStore, + ) -> SuiResult<()> { let inner = &mut self.inner.lock(); for cert in certs { let digest = *cert.digest(); // hold the tx lock until we have finished checking if objects are missing, so that we // don't race with a concurrent execution of this tx. - let epoch_store = self.authority_store.epoch_store(); let _tx_lock = epoch_store.acquire_tx_lock(&digest); // skip already pending txes @@ -99,10 +101,7 @@ impl TransactionManager { // skip already executed txes if self.authority_store.effects_exists(&digest)? { // also ensure the transaction will not be retried after restart. - let _ = self - .authority_store - .epoch_store() - .remove_pending_certificate(&digest); + let _ = epoch_store.remove_pending_certificate(&digest); continue; } @@ -157,7 +156,11 @@ impl TransactionManager { } /// Notifies TransactionManager that the given objects have been committed. - pub(crate) fn objects_committed(&self, object_keys: Vec) { + pub(crate) fn objects_committed( + &self, + object_keys: Vec, + epoch_store: &AuthorityPerEpochStore, + ) { let mut ready_digests = Vec::new(); { @@ -196,11 +199,7 @@ impl TransactionManager { for digest in ready_digests.iter() { // NOTE: failing and ignoring the certificate is fine, if it will be retried at a higher level. // Otherwise, this has to crash. - let cert = match self - .authority_store - .epoch_store() - .get_pending_certificate(digest) - { + let cert = match epoch_store.get_pending_certificate(digest) { Ok(Some(cert)) => cert, Ok(None) => { error!(tx_digest = ?digest, @@ -221,7 +220,11 @@ impl TransactionManager { } /// Notifies TransactionManager about a certificate that has been executed. 
- pub(crate) fn certificate_executed(&self, digest: &TransactionDigest) { + pub(crate) fn certificate_executed( + &self, + digest: &TransactionDigest, + epoch_store: &AuthorityPerEpochStore, + ) { { let inner = &mut self.inner.lock(); inner.executing_certificates.remove(digest); @@ -229,10 +232,7 @@ impl TransactionManager { .transaction_manager_num_executing_certificates .set(inner.executing_certificates.len() as i64); } - let _ = self - .authority_store - .epoch_store() - .remove_pending_certificate(digest); + let _ = epoch_store.remove_pending_certificate(digest); } /// Sends the ready certificate for execution.