diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index d4ef302912839..03c57c23df41b 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -4,10 +4,10 @@ use crate::authority_client::{ make_authority_clients, make_network_authority_client_sets_from_committee, - make_network_authority_client_sets_from_system_state, AuthorityAPI, LocalAuthorityClient, - NetworkAuthorityClient, + make_network_authority_client_sets_from_system_state, AuthorityAPI, NetworkAuthorityClient, }; use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase}; +use crate::test_authority_clients::LocalAuthorityClient; use crate::validator_info::make_committee; use async_trait::async_trait; @@ -175,6 +175,7 @@ impl AuthAggMetrics { } } +#[derive(Debug)] struct EffectsStakeInfo { stake: StakeUnit, effects: TransactionEffects, @@ -1199,7 +1200,7 @@ where let state = ProcessTransactionState::default(); let transaction_ref = &transaction; - let state = self + let mut state = self .quorum_map_then_reduce_with_timeout( state, |_name, client| { @@ -1210,37 +1211,56 @@ where |mut state, name, weight, result| { Box::pin(async move { match result { - // If we are given back a certificate, then we do not need - // to re-submit this transaction, we just returned the ready made - // certificate. A certificate is only valid if it's formed in the - // current epoch. Ok(VerifiedTransactionInfoResponse { certified_transaction: Some(inner_certificate), - .. - }) if inner_certificate.epoch() == self.committee.epoch => { - // A validator could return a certificate from an epoch that's - // different from what the authority aggregator is expecting. - // In that case, we should not accept that certificate. - debug!(tx_digest = ?tx_digest, name=?name.concise(), weight, "Received prev certificate from validator handle_transaction"); - state.certificate = Some(inner_certificate); - } - - // If we didn't match the above case but here, it means that we have - // a cert from a different epoch, and also have effects (i.e. already - // executed), we can accept the certificate if we get 2f+1 effects. - // It's an proof that the transaction has already been finalized - // in a different epoch, and hence it's ok to reuse the old certificate. - Ok(VerifiedTransactionInfoResponse { signed_effects: Some(inner_effects), - certified_transaction: Some(inner_certificate), .. }) => { - if state.effects_map.add(inner_effects, weight, &self.committee) { + // If we get a certificate in the same epoch, then we use it. + // A certificate in a past epoch does not guaranteee finality + // and validators may reject to process it. + if inner_certificate.epoch() == self.committee.epoch { + debug!(tx_digest = ?tx_digest, name=?name.concise(), weight, "Received prev certificate from validator handle_transaction"); + state.certificate = Some(inner_certificate); + } else if inner_effects.epoch() == self.committee.epoch { + // If we get 2f+1 effects, it's an proof that the transaction + // has already been finalized in a different epoch. Regardless + // of the cert's epoch, we can accept it. + // This is safe when the signed-effects's epoch is equal to + // the local epoch because validators re-sign effects that are + // committed in past epochs. However it's not safe when the + // signed effects comes from the future because the stake + // distribution may have changed. + // Theoretically, the signed effects could be in a previous + // epoch from a stale validator, but in `effects_map` we try to + // form a CertifiedTransactionEffects which requires all sigs + // in the same epoch, this is not necessary but not a big deal + // anyways. + // TODO: we may return a CertifiedTransactionEffects directly here + if state.effects_map.add(inner_effects, weight, &self.committee) { + debug!( + tx_digest = ?tx_digest, + "Got quorum for effects for certs that are from previous epochs handle_transaction" + ); + state.certificate = Some(inner_certificate); + } + } else { + // We reach here when + // inner_certificate and inner_effects.epoch() > self.committee.epoch + // and the shared committee store in SafeClient is updated. In this case + // we record a transient error. debug!( tx_digest = ?tx_digest, - "Got quorum for effects for certs that are from previous epochs handle_transaction" + name=?name.concise(), + weight, + actual_epoch = inner_certificate.epoch(), + expected_epoch = self.committee.epoch, + "Received epoch-mismatched transaction cert from validator handle_transaction", ); - state.certificate = Some(inner_certificate); + state.errors.push( + SuiError::WrongEpoch { expected_epoch: self.committee.epoch, actual_epoch: inner_certificate.epoch() } + ); + state.bad_stake += weight; } } @@ -1250,30 +1270,42 @@ where Ok(VerifiedTransactionInfoResponse { signed_transaction: Some(inner_signed_transaction), .. - }) if inner_signed_transaction.epoch() == self.committee.epoch => { - let tx_digest = inner_signed_transaction.digest(); - debug!(tx_digest = ?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction"); - state.signatures.push(inner_signed_transaction.into_inner().into_data_and_sig().1); - state.good_stake += weight; - if state.good_stake >= threshold { - self.metrics - .num_signatures - .observe(state.signatures.len() as f64); - self.metrics.num_good_stake.observe(state.good_stake as f64); - self.metrics.num_bad_stake.observe(state.bad_stake as f64); - state.certificate = - Some( CertifiedTransaction::new( - transaction_ref.data().clone(), - state.signatures.clone(), - &self.committee, - )?.verify(&self.committee)?); + }) => { + // If the signed transaction's epoch is older, than the validator is falling behind. + // If it's newer than we need a reconfig. Either way, we return a transient error. + if inner_signed_transaction.epoch() != self.committee.epoch { + debug!( + tx_digest = ?tx_digest, + name=?name.concise(), + weight, + actual_epoch = inner_signed_transaction.epoch(), + expected_epoch = self.committee.epoch, + "Received epoch-mismatched signed transaction from validator handle_transaction" + ); + state.errors.push( + SuiError::WrongEpoch { expected_epoch: self.committee.epoch, actual_epoch: inner_signed_transaction.epoch() } + ); + state.bad_stake += weight; + } else { + let tx_digest = inner_signed_transaction.digest(); + debug!(tx_digest = ?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction"); + state.signatures.push(inner_signed_transaction.into_inner().into_data_and_sig().1); + state.good_stake += weight; + if state.good_stake >= threshold { + self.metrics + .num_signatures + .observe(state.signatures.len() as f64); + self.metrics.num_good_stake.observe(state.good_stake as f64); + self.metrics.num_bad_stake.observe(state.bad_stake as f64); + state.certificate = + Some(CertifiedTransaction::new( + transaction_ref.data().clone(), + state.signatures.clone(), + &self.committee, + )?.verify(&self.committee)?); + } } } - // If we get back an error, then we aggregate and check - // if we have too many errors - // In this case we will not be able to use this response - // to make a certificate. If this happens for more than f - // authorities we just stop, as there is no hope to finish. Err(err) => { let concise_name = name.concise(); debug!(tx_digest = ?tx_digest, name=?concise_name, weight, "Failed to let validator sign transaction by handle_transaction: {:?}", err); @@ -1294,37 +1326,24 @@ where state.errors.push(err); state.bad_stake += weight; // This is the bad stake counter } - // In case we don't get an error but also don't get a valid value - Ok(ret) => { - // If we are here and yet there are either certs of signed tx, - // it's because their epoch doesn't match with the committee. - // This should start happen less over time as we are working on - // eliminating this on honest validators. - // Log a warning to keep track. - let error = if let Some(inner_certificate) = &ret.certified_transaction { - debug!( - ?tx_digest, - name=?name.concise(), - expected_epoch=?self.committee.epoch, - returned_epoch=?inner_certificate.epoch(), - "Returned certificate is from wrong epoch" - ); - SuiError::WrongEpoch { expected_epoch: self.committee.epoch, actual_epoch: inner_certificate.epoch() } - } else if let Some(inner_signed) = &ret.signed_transaction { - debug!( - ?tx_digest, - name=?name.concise(), - expected_epoch=?self.committee.epoch, - returned_epoch=?inner_signed.epoch(), - "Returned signed transaction is from wrong epoch" - ); - SuiError::WrongEpoch { expected_epoch: self.committee.epoch, actual_epoch: inner_signed.epoch() } - } else { - SuiError::UnexpectedResultFromValidatorHandleTransaction { - err: format!("{:?}", ret), + // In case we don't get an error but also don't get a valid value: + // the response contains either signed transaction or transaction certificate. + // This should only happen on byzantine validators. + Ok(rep) => { + let error_msg = format!( + "Validator returned unexpected response for handle_transaction. has_signed_tx: {}, has_tx_cert: {}, has_signed_effects: {}", + rep.signed_transaction.is_some(), + rep.certified_transaction.is_some(), + rep.signed_effects.is_some(), + ); + error!(?tx_digest, name=?name.concise(), error_msg); + + state.errors.push( + SuiError::ByzantineAuthoritySuspicion { + authority: name, + reason: error_msg, } - }; - state.errors.push(error); + ); state.bad_stake += weight; // This is the bad stake counter } }; @@ -1364,6 +1383,34 @@ where debug!(?tx_digest, "Errors received: {:?}", state.errors); } + if state.certificate.is_none() && !state.effects_map.effects_map.is_empty() { + debug!( + ?tx_digest, + "Received signed Effects but not with a quprum {:?}", state.effects_map.effects_map + ); + state.errors.push( + SuiError::QuorumFailedToFormEffectsCertWhenProcessingTransaction { + effects_map: state + .effects_map + .effects_map + .into_iter() + .map(|(k, v)| { + ( + k, + ( + v.signatures + .into_iter() + .map(|s| s.authority) + .collect::>(), + v.stake, + ), + ) + }) + .collect(), + }, + ); + } + // If we have some certificate return it, or return an error. state .certificate @@ -1414,28 +1461,9 @@ where state, |name, client| { Box::pin(async move { - // Here is the per-authority logic to process a certificate: - // - we try to process a cert, and return Ok on success. - // - we try to update the authority with the cert, and on error return Err. - // - we try to re-process the certificate and return the result. - - let res = - client.handle_certificate(cert_ref.clone()) - .instrument(tracing::trace_span!("handle_certificate", authority =? name.concise())) - .await; - - if res.is_ok() { - debug!( - tx_digest = ?tx_digest, - name = ?name.concise(), - "Validator handled certificate successfully", - ); - } - - // The authority may have failed to process the certificate if there were - // missing parents. In that case, the authority will attempt to perform causal - // completion and execute the cert later. - res + client.handle_certificate(cert_ref.clone()) + .instrument(tracing::trace_span!("handle_certificate", authority =? name.concise())) + .await }) }, |mut state, name, weight, result| { @@ -1446,6 +1474,11 @@ where Ok(VerifiedHandleCertificateResponse { signed_effects, }) => { + debug!( + tx_digest = ?tx_digest, + name = ?name.concise(), + "Validator handled certificate successfully", + ); // Note: here we aggregate votes by the hash of the effects structure if state.effects_map.add(signed_effects, weight, &self.committee) { debug!( diff --git a/crates/sui-core/src/authority_client.rs b/crates/sui-core/src/authority_client.rs index 66a8f0745076d..3428006f851d1 100644 --- a/crates/sui-core/src/authority_client.rs +++ b/crates/sui-core/src/authority_client.rs @@ -2,15 +2,12 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::authority::AuthorityState; use anyhow::anyhow; use async_trait::async_trait; use fastcrypto::traits::ToFromBytes; use multiaddr::Multiaddr; -use mysten_metrics::spawn_monitored_task; use mysten_network::config::Config; use std::collections::BTreeMap; -use std::sync::Arc; use std::time::Duration; use sui_config::genesis::Genesis; use sui_config::ValidatorInfo; @@ -20,7 +17,6 @@ use sui_types::committee::CommitteeWithNetAddresses; use sui_types::crypto::AuthorityPublicKeyBytes; use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse}; use sui_types::sui_system_state::SuiSystemState; -use sui_types::{committee::Committee, crypto::AuthorityKeyPair, object::Object}; use sui_types::{error::SuiError, messages::*}; use sui_network::tonic::transport::Channel; @@ -254,168 +250,3 @@ pub fn make_authority_clients( } authority_clients } - -#[derive(Clone, Copy, Default)] -pub struct LocalAuthorityClientFaultConfig { - pub fail_before_handle_transaction: bool, - pub fail_after_handle_transaction: bool, - pub fail_before_handle_confirmation: bool, - pub fail_after_handle_confirmation: bool, -} - -impl LocalAuthorityClientFaultConfig { - pub fn reset(&mut self) { - *self = Self::default(); - } -} - -#[derive(Clone)] -pub struct LocalAuthorityClient { - pub state: Arc, - pub fault_config: LocalAuthorityClientFaultConfig, -} - -#[async_trait] -impl AuthorityAPI for LocalAuthorityClient { - async fn handle_transaction( - &self, - transaction: Transaction, - ) -> Result { - if self.fault_config.fail_before_handle_transaction { - return Err(SuiError::from("Mock error before handle_transaction")); - } - let state = self.state.clone(); - let transaction = transaction.verify()?; - let result = state.handle_transaction(transaction).await; - if self.fault_config.fail_after_handle_transaction { - return Err(SuiError::GenericAuthorityError { - error: "Mock error after handle_transaction".to_owned(), - }); - } - result.map(|r| r.into()) - } - - async fn handle_certificate( - &self, - certificate: CertifiedTransaction, - ) -> Result { - let state = self.state.clone(); - let fault_config = self.fault_config; - spawn_monitored_task!(Self::handle_certificate(state, certificate, fault_config)) - .await - .unwrap() - } - - async fn handle_account_info_request( - &self, - request: AccountInfoRequest, - ) -> Result { - let state = self.state.clone(); - state.handle_account_info_request(request).await - } - - async fn handle_object_info_request( - &self, - request: ObjectInfoRequest, - ) -> Result { - let state = self.state.clone(); - state - .handle_object_info_request(request) - .await - .map(|r| r.into()) - } - - /// Handle Object information requests for this account. - async fn handle_transaction_info_request( - &self, - request: TransactionInfoRequest, - ) -> Result { - let state = self.state.clone(); - state - .handle_transaction_info_request(request) - .await - .map(|r| r.into()) - } - - async fn handle_checkpoint( - &self, - request: CheckpointRequest, - ) -> Result { - let state = self.state.clone(); - - state.handle_checkpoint_request(&request) - } - - async fn handle_committee_info_request( - &self, - request: CommitteeInfoRequest, - ) -> Result { - let state = self.state.clone(); - - state.handle_committee_info_request(&request) - } -} - -impl LocalAuthorityClient { - pub async fn new(committee: Committee, secret: AuthorityKeyPair, genesis: &Genesis) -> Self { - let state = AuthorityState::new_for_testing(committee, &secret, None, Some(genesis)).await; - Self { - state, - fault_config: LocalAuthorityClientFaultConfig::default(), - } - } - - pub async fn new_with_objects( - committee: Committee, - secret: AuthorityKeyPair, - objects: Vec, - genesis: &Genesis, - ) -> Self { - let client = Self::new(committee, secret, genesis).await; - - for object in objects { - client.state.insert_genesis_object(object).await; - } - - client - } - - pub fn new_from_authority(state: Arc) -> Self { - Self { - state, - fault_config: LocalAuthorityClientFaultConfig::default(), - } - } - - async fn handle_certificate( - state: Arc, - certificate: CertifiedTransaction, - fault_config: LocalAuthorityClientFaultConfig, - ) -> Result { - if fault_config.fail_before_handle_confirmation { - return Err(SuiError::GenericAuthorityError { - error: "Mock error before handle_confirmation_transaction".to_owned(), - }); - } - // Check existing effects before verifying the cert to allow querying certs finalized - // from previous epochs. - let tx_digest = *certificate.digest(); - let epoch_store = state.epoch_store(); - let signed_effects = - match state.get_signed_effects_and_maybe_resign(epoch_store.epoch(), &tx_digest) { - Ok(Some(effects)) => effects, - _ => { - let certificate = { certificate.verify(epoch_store.committee())? }; - state - .try_execute_immediately(&certificate, &epoch_store) - .await? - } - }; - if fault_config.fail_after_handle_confirmation { - return Err(SuiError::GenericAuthorityError { - error: "Mock error after handle_confirmation_transaction".to_owned(), - }); - } - Ok(HandleCertificateResponse { signed_effects }) - } -} diff --git a/crates/sui-core/src/lib.rs b/crates/sui-core/src/lib.rs index 57d328da9bb06..5bc2aafb0e89b 100644 --- a/crates/sui-core/src/lib.rs +++ b/crates/sui-core/src/lib.rs @@ -34,5 +34,6 @@ pub mod validator_info; #[cfg(test)] #[path = "unit_tests/pay_sui_tests.rs"] mod pay_sui_tests; +pub mod test_authority_clients; pub const SUI_CORE_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/sui-core/src/test_authority_clients.rs b/crates/sui-core/src/test_authority_clients.rs new file mode 100644 index 0000000000000..1b72aa8b0daf9 --- /dev/null +++ b/crates/sui-core/src/test_authority_clients.rs @@ -0,0 +1,372 @@ +// Copyright (c) 2021, Facebook, Inc. and its affiliates +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use crate::{authority::AuthorityState, authority_client::AuthorityAPI}; +use async_trait::async_trait; +use mysten_metrics::spawn_monitored_task; +use sui_config::genesis::Genesis; +use sui_types::{ + committee::Committee, + crypto::AuthorityKeyPair, + error::SuiError, + messages::{ + AccountInfoRequest, AccountInfoResponse, CertifiedTransaction, CommitteeInfoRequest, + CommitteeInfoResponse, ObjectInfoRequest, ObjectInfoResponse, Transaction, + TransactionInfoRequest, TransactionInfoResponse, + }, + messages_checkpoint::{CheckpointRequest, CheckpointResponse}, + object::Object, +}; +use sui_types::{error::SuiResult, messages::HandleCertificateResponse}; + +#[derive(Clone, Copy, Default)] +pub struct LocalAuthorityClientFaultConfig { + pub fail_before_handle_transaction: bool, + pub fail_after_handle_transaction: bool, + pub fail_before_handle_confirmation: bool, + pub fail_after_handle_confirmation: bool, +} + +impl LocalAuthorityClientFaultConfig { + pub fn reset(&mut self) { + *self = Self::default(); + } +} + +#[derive(Clone)] +pub struct LocalAuthorityClient { + pub state: Arc, + pub fault_config: LocalAuthorityClientFaultConfig, +} + +#[async_trait] +impl AuthorityAPI for LocalAuthorityClient { + async fn handle_transaction( + &self, + transaction: Transaction, + ) -> Result { + if self.fault_config.fail_before_handle_transaction { + return Err(SuiError::from("Mock error before handle_transaction")); + } + let state = self.state.clone(); + let transaction = transaction.verify()?; + let result = state.handle_transaction(transaction).await; + if self.fault_config.fail_after_handle_transaction { + return Err(SuiError::GenericAuthorityError { + error: "Mock error after handle_transaction".to_owned(), + }); + } + result.map(|r| r.into()) + } + + async fn handle_certificate( + &self, + certificate: CertifiedTransaction, + ) -> Result { + let state = self.state.clone(); + let fault_config = self.fault_config; + spawn_monitored_task!(Self::handle_certificate(state, certificate, fault_config)) + .await + .unwrap() + } + + async fn handle_account_info_request( + &self, + request: AccountInfoRequest, + ) -> Result { + let state = self.state.clone(); + state.handle_account_info_request(request).await + } + + async fn handle_object_info_request( + &self, + request: ObjectInfoRequest, + ) -> Result { + let state = self.state.clone(); + state + .handle_object_info_request(request) + .await + .map(|r| r.into()) + } + + /// Handle Object information requests for this account. + async fn handle_transaction_info_request( + &self, + request: TransactionInfoRequest, + ) -> Result { + let state = self.state.clone(); + state + .handle_transaction_info_request(request) + .await + .map(|r| r.into()) + } + + async fn handle_checkpoint( + &self, + request: CheckpointRequest, + ) -> Result { + let state = self.state.clone(); + + state.handle_checkpoint_request(&request) + } + + async fn handle_committee_info_request( + &self, + request: CommitteeInfoRequest, + ) -> Result { + let state = self.state.clone(); + + state.handle_committee_info_request(&request) + } +} + +impl LocalAuthorityClient { + pub async fn new(committee: Committee, secret: AuthorityKeyPair, genesis: &Genesis) -> Self { + let state = AuthorityState::new_for_testing(committee, &secret, None, Some(genesis)).await; + Self { + state, + fault_config: LocalAuthorityClientFaultConfig::default(), + } + } + + pub async fn new_with_objects( + committee: Committee, + secret: AuthorityKeyPair, + objects: Vec, + genesis: &Genesis, + ) -> Self { + let client = Self::new(committee, secret, genesis).await; + + for object in objects { + client.state.insert_genesis_object(object).await; + } + + client + } + + pub fn new_from_authority(state: Arc) -> Self { + Self { + state, + fault_config: LocalAuthorityClientFaultConfig::default(), + } + } + + async fn handle_certificate( + state: Arc, + certificate: CertifiedTransaction, + fault_config: LocalAuthorityClientFaultConfig, + ) -> Result { + if fault_config.fail_before_handle_confirmation { + return Err(SuiError::GenericAuthorityError { + error: "Mock error before handle_confirmation_transaction".to_owned(), + }); + } + // Check existing effects before verifying the cert to allow querying certs finalized + // from previous epochs. + let tx_digest = *certificate.digest(); + let epoch_store = state.epoch_store(); + let signed_effects = + match state.get_signed_effects_and_maybe_resign(epoch_store.epoch(), &tx_digest) { + Ok(Some(effects)) => effects, + _ => { + let certificate = { certificate.verify(epoch_store.committee())? }; + state + .try_execute_immediately(&certificate, &epoch_store) + .await? + } + }; + if fault_config.fail_after_handle_confirmation { + return Err(SuiError::GenericAuthorityError { + error: "Mock error after handle_confirmation_transaction".to_owned(), + }); + } + Ok(HandleCertificateResponse { signed_effects }) + } +} + +#[derive(Clone)] +pub struct MockAuthorityApi { + delay: Duration, + count: Arc>, + handle_committee_info_request_result: Option>, + handle_object_info_request_result: Option>, +} + +impl MockAuthorityApi { + pub fn new(delay: Duration, count: Arc>) -> Self { + MockAuthorityApi { + delay, + count, + handle_committee_info_request_result: None, + handle_object_info_request_result: None, + } + } + pub fn set_handle_committee_info_request_result( + &mut self, + result: SuiResult, + ) { + self.handle_committee_info_request_result = Some(result); + } + + pub fn set_handle_object_info_request(&mut self, result: SuiResult) { + self.handle_object_info_request_result = Some(result); + } +} + +#[async_trait] +impl AuthorityAPI for MockAuthorityApi { + /// Initiate a new transaction to a Sui or Primary account. + async fn handle_transaction( + &self, + _transaction: Transaction, + ) -> Result { + unreachable!(); + } + + /// Execute a certificate. + async fn handle_certificate( + &self, + _certificate: CertifiedTransaction, + ) -> Result { + unreachable!() + } + + /// Handle Account information requests for this account. + async fn handle_account_info_request( + &self, + _request: AccountInfoRequest, + ) -> Result { + unreachable!(); + } + + /// Handle Object information requests for this account. + async fn handle_object_info_request( + &self, + _request: ObjectInfoRequest, + ) -> Result { + self.handle_object_info_request_result.clone().unwrap() + } + + /// Handle Object information requests for this account. + async fn handle_transaction_info_request( + &self, + _request: TransactionInfoRequest, + ) -> Result { + let count = { + let mut count = self.count.lock().unwrap(); + *count += 1; + *count + }; + + // timeout until the 15th request + if count < 15 { + tokio::time::sleep(self.delay).await; + } + + let res = TransactionInfoResponse { + signed_transaction: None, + certified_transaction: None, + signed_effects: None, + }; + Ok(res) + } + + async fn handle_checkpoint( + &self, + _request: CheckpointRequest, + ) -> Result { + unreachable!(); + } + + async fn handle_committee_info_request( + &self, + _request: CommitteeInfoRequest, + ) -> Result { + self.handle_committee_info_request_result.clone().unwrap() + } +} + +#[derive(Clone)] +pub struct HandleTransactionTestAuthorityClient { + pub tx_info_resp_to_return: TransactionInfoResponse, +} + +#[async_trait] +impl AuthorityAPI for HandleTransactionTestAuthorityClient { + async fn handle_transaction( + &self, + _transaction: Transaction, + ) -> Result { + Ok(self.tx_info_resp_to_return.clone()) + } + + async fn handle_certificate( + &self, + _certificate: CertifiedTransaction, + ) -> Result { + unimplemented!() + } + + async fn handle_account_info_request( + &self, + _request: AccountInfoRequest, + ) -> Result { + unimplemented!() + } + + async fn handle_object_info_request( + &self, + _request: ObjectInfoRequest, + ) -> Result { + unimplemented!() + } + + async fn handle_transaction_info_request( + &self, + _request: TransactionInfoRequest, + ) -> Result { + unimplemented!() + } + + async fn handle_checkpoint( + &self, + _request: CheckpointRequest, + ) -> Result { + unimplemented!() + } + + async fn handle_committee_info_request( + &self, + _request: CommitteeInfoRequest, + ) -> Result { + unimplemented!() + } +} + +impl HandleTransactionTestAuthorityClient { + pub fn new() -> Self { + Self { + tx_info_resp_to_return: TransactionInfoResponse { + signed_transaction: None, + certified_transaction: None, + signed_effects: None, + }, + } + } + + pub fn set_tx_info_response(&mut self, resp: TransactionInfoResponse) { + self.tx_info_resp_to_return = resp; + } +} + +impl Default for HandleTransactionTestAuthorityClient { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/sui-core/src/test_utils.rs b/crates/sui-core/src/test_utils.rs index 47e0f96ef1a75..c64bc48631ecc 100644 --- a/crates/sui-core/src/test_utils.rs +++ b/crates/sui-core/src/test_utils.rs @@ -3,8 +3,8 @@ use crate::authority::{AuthorityState, EffectsNotifyRead}; use crate::authority_aggregator::{AuthorityAggregator, TimeoutConfig}; -use crate::authority_client::LocalAuthorityClient; use crate::epoch::committee_store::CommitteeStore; +use crate::test_authority_clients::LocalAuthorityClient; use fastcrypto::traits::KeyPair; use prometheus::Registry; use signature::Signer; diff --git a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs index e842d250cfbef..bfc8dc075f217 100644 --- a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use async_trait::async_trait; +use crate::test_utils::make_transfer_sui_transaction; use bcs::to_bytes; use move_core_types::{account_address::AccountAddress, ident_str}; use multiaddr::Multiaddr; @@ -10,6 +10,7 @@ use std::collections::HashSet; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use sui_framework_build::compiled_package::BuildConfig; +use sui_types::crypto::AuthoritySignature; use sui_types::crypto::{ get_authority_key_pair, get_key_pair, AccountKeyPair, AuthorityKeyPair, AuthorityPublicKeyBytes, }; @@ -22,8 +23,10 @@ use sui_types::object::{MoveObject, Object, Owner, GAS_VALUE_FOR_TESTING}; use test_utils::messages::make_random_certified_transaction; use super::*; -use crate::authority_client::{ - AuthorityAPI, LocalAuthorityClient, LocalAuthorityClientFaultConfig, +use crate::authority_client::AuthorityAPI; +use crate::test_authority_clients::{ + HandleTransactionTestAuthorityClient, LocalAuthorityClient, LocalAuthorityClientFaultConfig, + MockAuthorityApi, }; use crate::test_utils::init_local_authorities; use sui_types::utils::to_sender_signed_transaction; @@ -580,115 +583,13 @@ async fn test_process_transaction_fault_fail() { .is_err()); } -#[derive(Clone)] -struct MockAuthorityApi { - delay: Duration, - count: Arc>, - handle_committee_info_request_result: Option>, - handle_object_info_request_result: Option>, -} - -impl MockAuthorityApi { - pub fn new(delay: Duration, count: Arc>) -> Self { - MockAuthorityApi { - delay, - count, - handle_committee_info_request_result: None, - handle_object_info_request_result: None, - } - } - pub fn set_handle_committee_info_request_result( - &mut self, - result: SuiResult, - ) { - self.handle_committee_info_request_result = Some(result); - } - - pub fn set_handle_object_info_request(&mut self, result: SuiResult) { - self.handle_object_info_request_result = Some(result); - } -} - -#[async_trait] -impl AuthorityAPI for MockAuthorityApi { - /// Initiate a new transaction to a Sui or Primary account. - async fn handle_transaction( - &self, - _transaction: Transaction, - ) -> Result { - unreachable!(); - } - - /// Execute a certificate. - async fn handle_certificate( - &self, - _certificate: CertifiedTransaction, - ) -> Result { - unreachable!() - } - - /// Handle Account information requests for this account. - async fn handle_account_info_request( - &self, - _request: AccountInfoRequest, - ) -> Result { - unreachable!(); - } - - /// Handle Object information requests for this account. - async fn handle_object_info_request( - &self, - _request: ObjectInfoRequest, - ) -> Result { - self.handle_object_info_request_result.clone().unwrap() - } - - /// Handle Object information requests for this account. - async fn handle_transaction_info_request( - &self, - _request: TransactionInfoRequest, - ) -> Result { - let count = { - let mut count = self.count.lock().unwrap(); - *count += 1; - *count - }; - - // timeout until the 15th request - if count < 15 { - tokio::time::sleep(self.delay).await; - } - - let res = TransactionInfoResponse { - signed_transaction: None, - certified_transaction: None, - signed_effects: None, - }; - Ok(res) - } - - async fn handle_checkpoint( - &self, - _request: CheckpointRequest, - ) -> Result { - unreachable!(); - } - - async fn handle_committee_info_request( - &self, - _request: CommitteeInfoRequest, - ) -> Result { - self.handle_committee_info_request_result.clone().unwrap() - } -} - #[tokio::test(start_paused = true)] async fn test_quorum_once_with_timeout() { telemetry_subscribers::init_for_testing(); let count = Arc::new(Mutex::new(0)); let (authorities, _authorities_vec, clients) = get_authorities(count.clone(), 30); - let agg = get_agg(authorities, clients); + let agg = get_agg(authorities, clients, 0); let case = |agg: AuthorityAggregator, authority_request_timeout: u64| async move { let log = Arc::new(Mutex::new(Vec::new())); @@ -763,11 +664,12 @@ fn get_authorities( (authorities, authorities_vec, clients) } -fn get_agg( +fn get_agg( authorities: BTreeMap, - clients: BTreeMap, -) -> AuthorityAggregator { - let committee = Committee::new(0, authorities).unwrap(); + clients: BTreeMap, + epoch: EpochId, +) -> AuthorityAggregator { + let committee = Committee::new(epoch, authorities).unwrap(); let committee_store = Arc::new(CommitteeStore::new_for_testing(&committee)); AuthorityAggregator::new_with_timeouts( @@ -818,7 +720,7 @@ async fn test_get_committee_with_net_addresses() { client.set_handle_object_info_request(good_result.clone()); } let clients = clients; - let agg = get_agg(authorities.clone(), clients.clone()); + let agg = get_agg(authorities.clone(), clients.clone(), 0); let res = agg.get_committee_with_net_addresses(1).await; macro_rules! verify_good_result { @@ -880,7 +782,7 @@ async fn test_get_committee_with_net_addresses() { .unwrap() .set_handle_object_info_request(bad_result.clone()); - let agg = get_agg(authorities.clone(), clone_clients.clone()); + let agg = get_agg(authorities.clone(), clone_clients.clone(), 0); let res = agg.get_committee_with_net_addresses(1).await; verify_good_result!(res, 1); @@ -890,7 +792,7 @@ async fn test_get_committee_with_net_addresses() { .get_mut(&val1_pk) .unwrap() .set_handle_object_info_request(bad_result.clone()); - let agg = get_agg(authorities.clone(), clone_clients.clone()); + let agg = get_agg(authorities.clone(), clone_clients.clone(), 0); let res = agg.get_committee_with_net_addresses(1).await; verify_bad_result!(res, 1); @@ -912,7 +814,7 @@ async fn test_get_committee_with_net_addresses() { .unwrap() .set_handle_object_info_request(different_result.clone()); - let agg = get_agg(authorities.clone(), clone_clients.clone()); + let agg = get_agg(authorities.clone(), clone_clients.clone(), 0); let res = agg.get_committee_with_net_addresses(1).await; verify_good_result!(res, 1); @@ -941,7 +843,7 @@ async fn test_get_committee_with_net_addresses() { .unwrap() .set_handle_object_info_request(different_result.clone()); - let agg = get_agg(authorities.clone(), clone_clients); + let agg = get_agg(authorities.clone(), clone_clients, 0); let res = agg.get_committee_with_net_addresses(1).await; verify_bad_result!(res, 1); @@ -963,7 +865,7 @@ async fn test_get_committee_with_net_addresses() { .get_mut(&val2_pk) .unwrap() .set_handle_object_info_request(epoch_0_result.clone()); - let agg = get_agg(authorities.clone(), clone_clients); + let agg = get_agg(authorities.clone(), clone_clients, 0); let res = agg.get_committee_with_net_addresses(1).await; // Get error when asking with minimal epoch = 1 verify_bad_result!(res, 1); @@ -988,7 +890,7 @@ async fn test_get_committee_info() { } let clients = clients; let clone_clients = clients.clone(); - let agg = get_agg(authorities.clone(), clone_clients); + let agg = get_agg(authorities.clone(), clone_clients, 0); let res = agg.get_committee_info(Some(0)).await; match res { Ok(info) => { @@ -1009,7 +911,7 @@ async fn test_get_committee_info() { .next() .unwrap() .set_handle_committee_info_request_result(bad_result.clone()); - let agg = get_agg(authorities.clone(), clone_clients); + let agg = get_agg(authorities.clone(), clone_clients, 0); let res = agg.get_committee_info(Some(0)).await; match res { Ok(info) => { @@ -1029,7 +931,7 @@ async fn test_get_committee_info() { break; } } - let agg = get_agg(authorities.clone(), clone_clients); + let agg = get_agg(authorities.clone(), clone_clients, 0); let res = agg.get_committee_info(Some(0)).await; match res { Err(SuiError::TooManyIncorrectAuthorities { .. }) => (), @@ -1053,7 +955,7 @@ async fn test_get_committee_info() { break; } } - let agg = get_agg(authorities, clone_clients); + let agg = get_agg(authorities, clone_clients, 0); let res = agg.get_committee_info(Some(0)).await; match res { Err(SuiError::TooManyIncorrectAuthorities { .. }) => (), @@ -1064,6 +966,295 @@ async fn test_get_committee_info() { }; } +fn sign_tx( + tx: VerifiedTransaction, + epoch: EpochId, + authority: AuthorityName, + secret: &dyn signature::Signer, +) -> SignedTransaction { + SignedTransaction::new(epoch, tx.into_inner().into_data(), secret, authority) +} + +fn sign_tx_effects( + effects: TransactionEffects, + epoch: EpochId, + authority: AuthorityName, + secret: &dyn signature::Signer, +) -> SignedTransactionEffects { + SignedTransactionEffects::new(epoch, effects, secret, authority) +} + +#[tokio::test] +async fn test_handle_transaction_response() { + telemetry_subscribers::init_for_testing(); + + let mut authorities = BTreeMap::new(); + let mut clients = BTreeMap::new(); + let mut authority_keys = Vec::new(); + for _ in 0..4 { + let (_, sec): (_, AuthorityKeyPair) = get_key_pair(); + let name: AuthorityName = sec.public().into(); + authorities.insert(name, 1); + authority_keys.push((name, sec)); + clients.insert(name, HandleTransactionTestAuthorityClient::new()); + } + + let (sender, sender_kp): (_, AccountKeyPair) = get_key_pair(); + let tx = make_transfer_sui_transaction( + random_object_ref(), + SuiAddress::default(), + None, + sender, + &sender_kp, + ); + // Case 0 + // Validator will give invalid response because of the initial value set for their responses. + let agg = get_agg(authorities.clone(), clients.clone(), 0); + + assert_resp_err(&agg, tx.clone(), |e| { + matches!(e, SuiError::ByzantineAuthoritySuspicion { .. }) + }) + .await; + + // Case 1 + // All Validators gives signed-tx + set_tx_info_response_with_signed_tx(&mut clients, &authority_keys, &tx, 0); + // Validators now gives valid signed tx and we get TxCert + let mut agg = get_agg(authorities.clone(), clients.clone(), 0); + let cert_epoch_0 = agg.process_transaction(tx.clone()).await.unwrap(); + + // Case 2 + // Validator returns signed-tx with epoch 0, client expects 1 + // Update client to epoch 1 + let committee_1 = Committee::new(1, authorities.clone()).unwrap(); + agg.committee_store + .insert_new_committee(&committee_1) + .unwrap(); + agg.committee = committee_1; + + assert_resp_err(&agg, tx.clone(), + |e| matches!(e, SuiError::WrongEpoch { expected_epoch, actual_epoch } if *expected_epoch == 1 && *actual_epoch == 0) + ).await; + + // Case 3 + // Val-0 returns tx-cert + let effects = TransactionEffects { + transaction_digest: *cert_epoch_0.digest(), + ..Default::default() + }; + let (name_0, key_0) = &authority_keys[0]; + let resp = TransactionInfoResponse { + signed_transaction: None, // we don't care signed_tx when we have tx_cert + certified_transaction: Some(cert_epoch_0.clone().into_inner()), + signed_effects: Some(sign_tx_effects(effects, 0, *name_0, key_0)), + }; + clients + .get_mut(&authority_keys[0].0) + .unwrap() + .set_tx_info_response(resp); + + // Val-3 returns invalid response + // (Val-1 and Val-2 returns signed-tx) + for (name, _) in authority_keys.iter().skip(3) { + let resp = TransactionInfoResponse { + signed_transaction: None, + certified_transaction: None, + signed_effects: None, + }; + clients.get_mut(name).unwrap().set_tx_info_response(resp); + } + let agg = get_agg(authorities.clone(), clients.clone(), 0); + // We have a valid cert because val-0 has it. Note we can't form a cert based on what val-1 and val-2 give + agg.process_transaction(tx.clone()).await.unwrap(); + + // Case 4 + // Validator returns signed-tx with epoch 1, client expects 0 + set_tx_info_response_with_signed_tx(&mut clients, &authority_keys, &tx, 1); + + let mut agg = get_agg(authorities.clone(), clients.clone(), 0); + assert_resp_err( + &agg, + tx.clone(), + |e| matches!(e, SuiError::MissingCommitteeAtEpoch(e) if *e == 1), + ) + .await; + + let committee_1 = Committee::new(1, authorities.clone()).unwrap(); + agg.committee_store + .insert_new_committee(&committee_1) + .unwrap(); + agg.committee = committee_1.clone(); + let cert_epoch_1 = agg.process_transaction(tx.clone()).await.unwrap(); + + // Case 5 + // Validator returns tx-cert with epoch 0, client expects 1 + let effects = TransactionEffects { + transaction_digest: *cert_epoch_0.digest(), + ..Default::default() + }; + set_tx_info_response_with_cert_and_effects( + &mut clients, + authority_keys.iter(), + cert_epoch_0.inner(), + effects.clone(), + 0, + ); + + // Update client to epoch 1 + let mut agg = get_agg(authorities.clone(), clients.clone(), 0); + agg.committee_store + .insert_new_committee(&committee_1) + .unwrap(); + agg.committee = committee_1.clone(); + // Err because either cert or signed effects is in epoch 0 + assert_resp_err(&agg, tx.clone(), + |e| matches!(e, SuiError::WrongEpoch { expected_epoch, actual_epoch } if *expected_epoch == 1 && *actual_epoch == 0) + ).await; + + set_tx_info_response_with_cert_and_effects( + &mut clients, + authority_keys.iter(), + cert_epoch_0.inner(), + effects, + 1, + ); + let mut agg = get_agg(authorities.clone(), clients.clone(), 0); + agg.committee_store + .insert_new_committee(&committee_1) + .unwrap(); + agg.committee = committee_1.clone(); + // We have 2f+1 signed effects, so we are good. + agg.process_transaction(tx.clone()).await.unwrap(); + + // Case 6 + // Validator 2 and 3 returns tx-cert with epoch 0, but different signed effects + let effects = TransactionEffects { + transaction_digest: *cert_epoch_0.digest(), + status: ExecutionStatus::Failure { + error: ExecutionFailureStatus::InsufficientGas, + }, + ..Default::default() + }; + set_tx_info_response_with_cert_and_effects( + &mut clients, + authority_keys.iter().skip(2), + cert_epoch_0.inner(), + effects, + 1, + ); + + let mut agg = get_agg(authorities.clone(), clients.clone(), 0); + agg.committee_store + .insert_new_committee(&committee_1) + .unwrap(); + agg.committee = committee_1.clone(); + assert_resp_err(&agg, tx.clone(), |e| { + matches!( + e, + SuiError::QuorumFailedToFormEffectsCertWhenProcessingTransaction { .. } + ) + }) + .await; + + // Case 7 + // Validator returns tx-cert with epoch 1, client expects 0 + let effects = TransactionEffects { + transaction_digest: *cert_epoch_1.digest(), + ..Default::default() + }; + set_tx_info_response_with_cert_and_effects( + &mut clients, + authority_keys.iter(), + cert_epoch_1.inner(), + effects, + 1, + ); + let agg = get_agg(authorities.clone(), clients.clone(), 0); + assert_resp_err( + &agg, + tx.clone(), + |e| matches!(e, SuiError::MissingCommitteeAtEpoch(e) if *e == 1), + ) + .await; + + // Update committee store, now SafeClinet will pass + agg.committee_store + .insert_new_committee(&committee_1) + .unwrap(); + assert_resp_err( + &agg, + tx.clone(), + |e| matches!(e, SuiError::WrongEpoch { expected_epoch, actual_epoch } if *expected_epoch == 0 && *actual_epoch == 1) + ) + .await; +} + +async fn assert_resp_err( + agg: &AuthorityAggregator, + tx: VerifiedTransaction, + checker: F, +) where + F: Fn(&SuiError) -> bool, +{ + match agg.process_transaction(tx).await { + Err(SuiError::QuorumFailedToProcessTransaction { + good_stake, + errors, + conflicting_tx_digests, + }) => { + assert_eq!(good_stake, 0); + assert!(conflicting_tx_digests.is_empty()); + assert!(errors.iter().all(checker)); + } + other => { + panic!( + "Expect QuorumFailedToProcessTransaction but got {:?}", + other + ); + } + } +} + +fn set_tx_info_response_with_cert_and_effects<'a>( + clients: &mut BTreeMap, + authority_keys: impl Iterator, + cert: &CertifiedTransaction, + effects: TransactionEffects, + epoch: EpochId, +) { + for (name, key) in authority_keys { + let resp = TransactionInfoResponse { + signed_transaction: None, + certified_transaction: Some(cert.clone()), + signed_effects: Some(SignedTransactionEffects::new( + epoch, + effects.clone(), + key, + *name, + )), + }; + clients.get_mut(name).unwrap().set_tx_info_response(resp); + } +} + +fn set_tx_info_response_with_signed_tx( + clients: &mut BTreeMap, + authority_keys: &Vec<(AuthorityName, AuthorityKeyPair)>, + tx: &VerifiedTransaction, + epoch: EpochId, +) { + for (name, secret) in authority_keys { + let signed_tx = sign_tx(tx.clone(), epoch, *name, secret); + + let resp = TransactionInfoResponse { + signed_transaction: Some(signed_tx), + certified_transaction: None, + signed_effects: None, + }; + clients.get_mut(name).unwrap().set_tx_info_response(resp); + } +} + pub fn make_response_from_sui_system_state( system_state: SuiSystemState, ) -> SuiResult { diff --git a/crates/sui-core/src/unit_tests/execution_driver_tests.rs b/crates/sui-core/src/unit_tests/execution_driver_tests.rs index e819ef00bce20..b13c5f317d0d9 100644 --- a/crates/sui-core/src/unit_tests/execution_driver_tests.rs +++ b/crates/sui-core/src/unit_tests/execution_driver_tests.rs @@ -7,8 +7,8 @@ use crate::authority_aggregator::authority_aggregator_tests::{ create_object_move_transaction, do_cert, do_transaction, extract_cert, get_latest_ref, transfer_object_move_transaction, }; -use crate::authority_client::LocalAuthorityClient; use crate::safe_client::SafeClient; +use crate::test_authority_clients::LocalAuthorityClient; use crate::test_utils::init_local_authorities; use std::collections::BTreeSet; diff --git a/crates/sui-json-rpc/src/unit_tests/rpc_server_tests.rs b/crates/sui-json-rpc/src/unit_tests/rpc_server_tests.rs index c8ac9c4a30852..0a9a221e4af10 100644 --- a/crates/sui-json-rpc/src/unit_tests/rpc_server_tests.rs +++ b/crates/sui-json-rpc/src/unit_tests/rpc_server_tests.rs @@ -3,6 +3,7 @@ use std::path::Path; +#[cfg(not(msim))] use std::str::FromStr; use sui_config::SUI_KEYSTORE_FILENAME; diff --git a/crates/sui-types/src/error.rs b/crates/sui-types/src/error.rs index e329a4800329b..f3c1df1b91844 100644 --- a/crates/sui-types/src/error.rs +++ b/crates/sui-types/src/error.rs @@ -119,10 +119,14 @@ pub enum SuiError { }, #[error("Invalid Authority Bitmap: {}", error)] InvalidAuthorityBitmap { error: String }, - #[error("Unexpected validator response from handle_transaction: {err}")] - UnexpectedResultFromValidatorHandleTransaction { err: String }, #[error("Transaction certificate processing failed: {err}")] ErrorWhileProcessingCertificate { err: String }, + #[error( + "Failed to get a quorum of signed effects when processing transaction: {effects_map:?}" + )] + QuorumFailedToFormEffectsCertWhenProcessingTransaction { + effects_map: BTreeMap<(EpochId, TransactionEffectsDigest), (Vec, StakeUnit)>, + }, #[error( "Failed to process transaction on a quorum of validators to form a transaction certificate. Locked objects: {:#?}. Validator errors: {:#?}", conflicting_tx_digests, diff --git a/crates/sui-types/src/message_envelope.rs b/crates/sui-types/src/message_envelope.rs index b25399abb52c9..b250073c23e9c 100644 --- a/crates/sui-types/src/message_envelope.rs +++ b/crates/sui-types/src/message_envelope.rs @@ -241,6 +241,10 @@ impl VerifiedEnvelope { self.0 .0 } + pub fn inner(&self) -> &Envelope { + &self.0 .0 + } + pub fn into_message(self) -> T { self.into_inner().into_data() }