Skip to content

Commit

Permalink
[epoch] Pass epoch store to TransactionManager (MystenLabs#7133)
Browse files Browse the repository at this point in the history
TransactionManager runs passively, so the epoch store can be passed in with
every call to it instead of being looked up internally on each use.
  • Loading branch information
lxfind authored Jan 5, 2023
1 parent 5fb51ac commit 48c447b
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 29 deletions.
17 changes: 13 additions & 4 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ impl AuthorityState {
signed_effects,
tx_guard,
execution_guard,
epoch_store,
)
.await;
}
Expand Down Expand Up @@ -828,6 +829,7 @@ impl AuthorityState {
signed_effects,
tx_guard,
execution_guard,
epoch_store,
)
.await
}
Expand All @@ -839,6 +841,7 @@ impl AuthorityState {
signed_effects: SignedTransactionEffects,
tx_guard: CertTxGuard<'_>,
_execution_guard: ExecutionLockReadGuard<'_>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<SignedTransactionEffects> {
let digest = *certificate.digest();
let input_object_count = inner_temporary_store.objects.len();
Expand All @@ -864,7 +867,8 @@ impl AuthorityState {
//
// REQUIRED: this must be called before tx_guard.commit_tx() (below), to ensure
// TransactionManager can get the notifications after the node crashes and restarts.
self.transaction_manager.objects_committed(output_keys);
self.transaction_manager
.objects_committed(output_keys, epoch_store);

// commit_certificate finished, the tx is fully committed to the store.
tx_guard.commit_tx();
Expand Down Expand Up @@ -967,8 +971,13 @@ impl AuthorityState {
}

/// Notifies TransactionManager about an executed certificate.
pub fn certificate_executed(&self, digest: &TransactionDigest) {
self.transaction_manager.certificate_executed(digest)
pub fn certificate_executed(
&self,
digest: &TransactionDigest,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
self.transaction_manager
.certificate_executed(digest, epoch_store)
}

pub async fn dry_exec_transaction(
Expand Down Expand Up @@ -1625,7 +1634,7 @@ impl AuthorityState {
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<()> {
epoch_store.insert_pending_certificates(&certs)?;
self.transaction_manager.enqueue(certs)
self.transaction_manager.enqueue(certs, epoch_store)
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ impl AuthorityPerEpochStore {
{
// The certificate has already been inserted into the pending_certificates table by
// process_consensus_transaction() above.
transaction_manager.enqueue(vec![certificate])?;
transaction_manager.enqueue(vec![certificate], self)?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ async fn execute_transactions(
}
epoch_store.insert_pending_certificates(&synced_txns)?;

transaction_manager.enqueue(synced_txns)?;
transaction_manager.enqueue(synced_txns, &epoch_store)?;

// Once synced_txns have been awaited, all txns should have effects committed.
let mut periods = 1;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/execution_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub async fn execution_process(
}

// Remove the certificate that finished execution from the pending_certificates table.
authority.certificate_executed(&digest);
authority.certificate_executed(&digest, &epoch_store);

authority
.metrics
Expand Down
44 changes: 22 additions & 22 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::Verif
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error};

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::{AuthorityMetrics, AuthorityStore};

/// TransactionManager is responsible for managing pending certificates and publishes a stream
Expand Down Expand Up @@ -58,13 +59,11 @@ impl TransactionManager {
inner: Default::default(),
tx_ready_certificates,
};
let epoch_store = transaction_manager.authority_store.epoch_store();
transaction_manager
.enqueue(
transaction_manager
.authority_store
.epoch_store()
.all_pending_certificates()
.unwrap(),
epoch_store.all_pending_certificates().unwrap(),
&epoch_store,
)
.expect("Initialize TransactionManager with pending certificates failed.");
transaction_manager
Expand All @@ -79,13 +78,16 @@ impl TransactionManager {
/// TODO: it may be less error-prone to take shared object locks inside this function, or
/// to require that shared object lock versions be passed in as input. But this function
/// should not have many callsites. Investigate the alternatives here.
pub(crate) fn enqueue(&self, certs: Vec<VerifiedCertificate>) -> SuiResult<()> {
pub(crate) fn enqueue(
&self,
certs: Vec<VerifiedCertificate>,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<()> {
let inner = &mut self.inner.lock();
for cert in certs {
let digest = *cert.digest();
// hold the tx lock until we have finished checking if objects are missing, so that we
// don't race with a concurrent execution of this tx.
let epoch_store = self.authority_store.epoch_store();
let _tx_lock = epoch_store.acquire_tx_lock(&digest);

// skip already pending txes
Expand All @@ -99,10 +101,7 @@ impl TransactionManager {
// skip already executed txes
if self.authority_store.effects_exists(&digest)? {
// also ensure the transaction will not be retried after restart.
let _ = self
.authority_store
.epoch_store()
.remove_pending_certificate(&digest);
let _ = epoch_store.remove_pending_certificate(&digest);
continue;
}

Expand Down Expand Up @@ -157,7 +156,11 @@ impl TransactionManager {
}

/// Notifies TransactionManager that the given objects have been committed.
pub(crate) fn objects_committed(&self, object_keys: Vec<ObjectKey>) {
pub(crate) fn objects_committed(
&self,
object_keys: Vec<ObjectKey>,
epoch_store: &AuthorityPerEpochStore,
) {
let mut ready_digests = Vec::new();

{
Expand Down Expand Up @@ -196,11 +199,7 @@ impl TransactionManager {
for digest in ready_digests.iter() {
// NOTE: failing and ignoring the certificate is fine only if it will be retried at a
// higher level; otherwise, this has to crash.
let cert = match self
.authority_store
.epoch_store()
.get_pending_certificate(digest)
{
let cert = match epoch_store.get_pending_certificate(digest) {
Ok(Some(cert)) => cert,
Ok(None) => {
error!(tx_digest = ?digest,
Expand All @@ -221,18 +220,19 @@ impl TransactionManager {
}

/// Notifies TransactionManager about a certificate that has been executed.
pub(crate) fn certificate_executed(&self, digest: &TransactionDigest) {
pub(crate) fn certificate_executed(
&self,
digest: &TransactionDigest,
epoch_store: &AuthorityPerEpochStore,
) {
{
let inner = &mut self.inner.lock();
inner.executing_certificates.remove(digest);
self.metrics
.transaction_manager_num_executing_certificates
.set(inner.executing_certificates.len() as i64);
}
let _ = self
.authority_store
.epoch_store()
.remove_pending_certificate(digest);
let _ = epoch_store.remove_pending_certificate(digest);
}

/// Sends the ready certificate for execution.
Expand Down

0 comments on commit 48c447b

Please sign in to comment.