From ae7a23feb775917255a39db356ca2aaf81d64cb2 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Thu, 8 Aug 2024 11:09:43 -0300 Subject: [PATCH] feat: forward to leader repass client to leader and leader response to client (#1619) --- justfile | 3 +- src/eth/executor/evm.rs | 2 +- src/eth/executor/executor.rs | 22 +-- src/eth/follower/consensus.rs | 16 +- src/eth/primitives/stratus_error.rs | 14 +- src/eth/rpc/rpc_client_app.rs | 13 ++ src/eth/rpc/rpc_middleware.rs | 18 +- src/eth/rpc/rpc_server.rs | 49 ++--- .../blockchain_client/blockchain_client.rs | 20 +- src/infra/blockchain_client/mod.rs | 1 - .../blockchain_client/pending_transaction.rs | 181 ------------------ 11 files changed, 92 insertions(+), 247 deletions(-) delete mode 100644 src/infra/blockchain_client/pending_transaction.rs diff --git a/justfile b/justfile index ab503cc3a..0e4b86538 100644 --- a/justfile +++ b/justfile @@ -31,7 +31,8 @@ setup: # Stratus tasks # ------------------------------------------------------------------------------ -alias run := stratus +alias run := stratus +alias run-leader := stratus alias run-follower := stratus-follower alias run-importer := stratus-follower diff --git a/src/eth/executor/evm.rs b/src/eth/executor/evm.rs index 42e8617cb..8db7892ad 100644 --- a/src/eth/executor/evm.rs +++ b/src/eth/executor/evm.rs @@ -165,7 +165,7 @@ impl Evm { // unexpected errors Err(e) => { tracing::warn!(reason = ?e, "evm transaction error"); - Err(StratusError::TransactionFailed(e.to_string())) + Err(StratusError::TransactionEvmFailed(e.to_string())) } }; diff --git a/src/eth/executor/executor.rs b/src/eth/executor/executor.rs index 2a85240a2..b1217ef85 100644 --- a/src/eth/executor/executor.rs +++ b/src/eth/executor/executor.rs @@ -356,16 +356,18 @@ impl Executor { /// Executes a transaction persisting state changes. #[tracing::instrument(name = "executor::local_transaction", skip_all, fields(tx_hash, tx_from, tx_to, tx_nonce))] - pub fn execute_local_transaction(&self, tx_input: TransactionInput) -> Result { + pub fn execute_local_transaction(&self, tx: TransactionInput) -> Result { #[cfg(feature = "metrics")] let start = metrics::now(); + tracing::info!(tx_hash = %tx.hash, "executing local transaction"); + // track Span::with(|s| { - s.rec_str("tx_hash", &tx_input.hash); - s.rec_str("tx_from", &tx_input.signer); - s.rec_opt("tx_to", &tx_input.to); - s.rec_str("tx_nonce", &tx_input.nonce); + s.rec_str("tx_hash", &tx.hash); + s.rec_str("tx_from", &tx.signer); + s.rec_opt("tx_to", &tx.to); + s.rec_str("tx_nonce", &tx.nonce); }); // execute according to the strategy @@ -383,7 +385,6 @@ impl Executor { self.locks.serial.clear_poison(); poison.into_inner() }); - tracing::info!("executor acquired serial execution lock"); // WORKAROUND: prevents interval miner mining blocks while a transaction is being executed. // this can be removed when we implement conflict detection for block number @@ -396,23 +397,22 @@ impl Executor { tracing::info!("executor acquired mine_and_commit lock to prevent executor mining block"); miner_lock } else { - tracing::warn!("executor did not try to acquire mine_and_commit lock"); None }; // execute transaction - self.execute_local_transaction_attempts(tx_input.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS) + self.execute_local_transaction_attempts(tx.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS) } // Executes transactions in parallel mode: // * Conflict detection prevents data corruption. ExecutorStrategy::Paralell => { - let parallel_attempt = self.execute_local_transaction_attempts(tx_input.clone(), EvmRoute::Parallel, 1); + let parallel_attempt = self.execute_local_transaction_attempts(tx.clone(), EvmRoute::Parallel, 1); match parallel_attempt { Ok(tx_execution) => Ok(tx_execution), Err(e) => if let StratusError::TransactionConflict(_) = e { - self.execute_local_transaction_attempts(tx_input.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS) + self.execute_local_transaction_attempts(tx.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS) } else { Err(e) }, @@ -423,7 +423,7 @@ impl Executor { // track metrics #[cfg(feature = "metrics")] { - let function = codegen::function_sig_for_o11y(&tx_input.input); + let function = codegen::function_sig_for_o11y(&tx.input); match &tx_execution { Ok(tx_execution) => { metrics::inc_executor_local_transaction(start.elapsed(), true, function); diff --git a/src/eth/follower/consensus.rs b/src/eth/follower/consensus.rs index 5d4292a38..0b0b35147 100644 --- a/src/eth/follower/consensus.rs +++ b/src/eth/follower/consensus.rs @@ -4,6 +4,8 @@ use async_trait::async_trait; use crate::eth::primitives::Bytes; use crate::eth::primitives::Hash; +use crate::eth::primitives::StratusError; +use crate::eth::rpc::RpcClientApp; #[cfg(feature = "metrics")] use crate::infra::metrics; use crate::infra::BlockchainClient; @@ -29,23 +31,19 @@ pub trait Consensus: Send + Sync { return should_serve; } - /// Forward a transaction - async fn forward(&self, transaction: Bytes) -> anyhow::Result { + /// Forwards a transaction to leader. + async fn forward_to_leader(&self, tx_hash: Hash, tx_data: Bytes, rpc_client: RpcClientApp) -> Result { #[cfg(feature = "metrics")] let start = metrics::now(); - let blockchain_client = self.get_chain()?; + tracing::info!(%tx_hash, %rpc_client, "forwaring transaction to leader"); - let result = blockchain_client.send_raw_transaction(transaction.into()).await?; + let hash = self.get_chain()?.send_raw_transaction_to_leader(tx_data.into(), rpc_client).await?; #[cfg(feature = "metrics")] metrics::inc_consensus_forward(start.elapsed()); - let tx_hash = result.tx_hash; - let validator_url = &blockchain_client.http_url; - tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader"); - - Ok(result.tx_hash) + Ok(hash) } fn get_chain(&self) -> anyhow::Result<&Arc>; diff --git a/src/eth/primitives/stratus_error.rs b/src/eth/primitives/stratus_error.rs index 4c3d0a01f..231b7d374 100644 --- a/src/eth/primitives/stratus_error.rs +++ b/src/eth/primitives/stratus_error.rs @@ -79,7 +79,11 @@ pub enum StratusError { #[error("Failed to executed transaction in EVM: {0:?}.")] #[strum(props(kind = "execution"))] - TransactionFailed(String), // split this in multiple errors + TransactionEvmFailed(String), // split this in multiple errors + + #[error("Failed to execute transaction in leader: {0:?}.")] + #[strum(props(kind = "execution"))] + TransactionLeaderFailed(ErrorObjectOwned), #[error("Failed to forward transaction to leader node.")] #[strum(props(kind = "execution"))] @@ -170,7 +174,7 @@ impl StratusError { // Transaction Self::RpcTransactionInvalid { decode_error } => to_json_value(decode_error), - Self::TransactionFailed(e) => JsonValue::String(e.to_string()), + Self::TransactionEvmFailed(e) => JsonValue::String(e.to_string()), Self::TransactionReverted { output } => to_json_value(output), // Unexpected @@ -196,6 +200,12 @@ impl From for StratusError { // ----------------------------------------------------------------------------- impl From for ErrorObjectOwned { fn from(value: StratusError) -> Self { + // return response from leader + if let StratusError::TransactionLeaderFailed(response) = value { + return response; + } + + // generate response let data = match value.rpc_data() { serde_json::Value::String(data_str) => { let data_str = data_str.trim_start_matches('\"').trim_end_matches('\"').replace("\\\"", "\""); diff --git a/src/eth/rpc/rpc_client_app.rs b/src/eth/rpc/rpc_client_app.rs index ccd416bb1..cd1ff1720 100644 --- a/src/eth/rpc/rpc_client_app.rs +++ b/src/eth/rpc/rpc_client_app.rs @@ -80,6 +80,19 @@ impl serde::Serialize for RpcClientApp { } } +impl<'de> serde::Deserialize<'de> for RpcClientApp { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = String::deserialize(deserializer)?; + match value.as_str() { + "unknown" => Ok(Self::Unknown), + _ => Ok(Self::Identified(value)), + } + } +} + // ----------------------------------------------------------------------------- // Conversions: Self -> Other // ----------------------------------------------------------------------------- diff --git a/src/eth/rpc/rpc_middleware.rs b/src/eth/rpc/rpc_middleware.rs index 5bb90d112..cdeb39dc5 100644 --- a/src/eth/rpc/rpc_middleware.rs +++ b/src/eth/rpc/rpc_middleware.rs @@ -77,7 +77,7 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware { let middleware_enter = span.enter(); // extract request data - let client = request.extensions.rpc_client(); + let mut client = request.extensions.rpc_client(); let method = request.method_name().to_owned(); let tx = match method.as_str() { "eth_call" | "eth_estimateGas" => TransactionTracingIdentifiers::from_call(request.params()).ok(), @@ -85,6 +85,10 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware { "eth_getTransactionByHash" | "eth_getTransactionReceipt" => TransactionTracingIdentifiers::from_transaction_query(request.params()).ok(), _ => None, }; + if let Some(tx_client) = tx.as_ref().and_then(|tx| tx.client.clone()) { + request.extensions_mut().insert(tx_client.clone()); + client = tx_client; + } // trace request Span::with(|s| { @@ -226,6 +230,7 @@ impl<'a> Future for RpcResponse<'a> { // ----------------------------------------------------------------------------- struct TransactionTracingIdentifiers { + pub client: Option, pub hash: Option, pub contract: ContractName, pub function: SoliditySignature, @@ -235,11 +240,14 @@ struct TransactionTracingIdentifiers { } impl TransactionTracingIdentifiers { - // eth_sendRawTransction + // eth_sendRawTransaction fn from_transaction(params: Params) -> anyhow::Result { - let (_, data) = next_rpc_param::(params.sequence())?; - let tx = parse_rpc_rlp::(&data)?; + let (params, tx_data) = next_rpc_param::(params.sequence())?; + let tx = parse_rpc_rlp::(&tx_data)?; + let client = next_rpc_param::(params); + Ok(Self { + client: client.map(|(_, client)| client).ok(), hash: Some(tx.hash), contract: codegen::contract_name_for_o11y(&tx.to), function: codegen::function_sig_for_o11y(&tx.input), @@ -253,6 +261,7 @@ impl TransactionTracingIdentifiers { fn from_call(params: Params) -> anyhow::Result { let (_, call) = next_rpc_param::(params.sequence())?; Ok(Self { + client: None, hash: None, contract: codegen::contract_name_for_o11y(&call.to), function: codegen::function_sig_for_o11y(&call.data), @@ -266,6 +275,7 @@ impl TransactionTracingIdentifiers { fn from_transaction_query(params: Params) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; Ok(Self { + client: None, hash: Some(hash), contract: metrics::LABEL_MISSING, function: metrics::LABEL_MISSING, diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index f841879e4..665eca391 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -604,17 +604,17 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc, ext: Exten // parse params reject_unknown_client(ext.rpc_client())?; - let (_, data) = next_rpc_param::(params.sequence())?; - let tx = parse_rpc_rlp::(&data)?; + let (_, tx_data) = next_rpc_param::(params.sequence())?; + let tx = parse_rpc_rlp::(&tx_data)?; + let tx_hash = tx.hash; // track Span::with(|s| { - s.rec_str("tx_hash", &tx.hash); + s.rec_str("tx_hash", &tx_hash); s.rec_str("tx_from", &tx.signer); s.rec_opt("tx_to", &tx.to); s.rec_str("tx_nonce", &tx.nonce); }); - let tx_hash = tx.hash; // check feature if not(GlobalState::is_transactions_enabled()) { @@ -622,35 +622,24 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc, ext: Exten return Err(StratusError::RpcTransactionDisabled); } - // forward transaction to the validator node - if let Some(consensus) = &ctx.consensus { - tracing::info!(%tx_hash, "forwarding eth_sendRawTransaction to leader"); - return match Handle::current().block_on(consensus.forward(data)) { - Ok(hash) => Ok(hex_data(hash)), + // execute locally or forward to leader + match &ctx.consensus { + // is leader + None => match ctx.executor.execute_local_transaction(tx) { + Ok(_) => Ok(hex_data(tx_hash)), Err(e) => { - tracing::error!(reason = ?e, %tx_hash, "failed to forward eth_sendRawTransaction to leader"); - Err(StratusError::TransactionForwardToLeaderFailed) + if e.is_internal() { + tracing::error!(reason = ?e, "failed to execute eth_sendRawTransaction"); + } + Err(e) } - }; - } + }, - // execute locally if leader - tracing::info!(%tx_hash, "executing eth_sendRawTransaction locally"); - match ctx.executor.execute_local_transaction(tx) { - Ok(tx) => { - if tx.is_success() { - tracing::info!(tx_hash = %tx.hash(), tx_output = %tx.execution().output, "executed eth_sendRawTransaction with success"); - } else { - tracing::warn!(tx_output = %tx.hash(), tx_output = %tx.execution().output, "executed eth_sendRawTransaction with failure"); - } - Ok(hex_data(tx_hash)) - } - Err(e) => { - if e.is_internal() { - tracing::error!(reason = ?e, "failed to execute eth_sendRawTransaction"); - } - Err(e) - } + // is follower + Some(consensus) => match Handle::current().block_on(consensus.forward_to_leader(tx_hash, tx_data, ext.rpc_client())) { + Ok(hash) => Ok(hex_data(hash)), + Err(e) => Err(e), + }, } } diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index b2ea0786c..196b70b14 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -12,7 +12,6 @@ use jsonrpsee::ws_client::WsClientBuilder; use tokio::sync::RwLock; use tokio::sync::RwLockReadGuard; -use super::pending_transaction::PendingTransaction; use crate::alias::EthersBytes; use crate::alias::EthersTransaction; use crate::alias::JsonValue; @@ -21,7 +20,9 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExternalBlock; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; +use crate::eth::primitives::StratusError; use crate::eth::primitives::Wei; +use crate::eth::rpc::RpcClientApp; use crate::ext::to_json_value; use crate::ext::DisplayExt; use crate::infra::tracing::TracingExt; @@ -210,16 +211,21 @@ impl BlockchainClient { // RPC mutations // ------------------------------------------------------------------------- - /// Sends a signed transaction. - pub async fn send_raw_transaction(&self, tx: EthersBytes) -> anyhow::Result> { - tracing::debug!("sending raw transaction"); + /// Forwards a transaction to leader. + pub async fn send_raw_transaction_to_leader(&self, tx: EthersBytes, rpc_client: RpcClientApp) -> Result { + tracing::debug!("sending raw transaction to leader"); let tx = to_json_value(tx); - let result = self.http.request::>("eth_sendRawTransaction", vec![tx]).await; + let rpc_client = to_json_value(rpc_client); + let result = self.http.request::>("eth_sendRawTransaction", vec![tx, rpc_client]).await; match result { - Ok(hash) => Ok(PendingTransaction::new(hash, self)), - Err(e) => log_and_err!(reason = e, "failed to send raw transaction"), + Ok(hash) => Ok(hash), + Err(ClientError::Call(response)) => Err(StratusError::TransactionLeaderFailed(response.into_owned())), + Err(e) => { + tracing::error!(reason = ?e, "failed to send raw transaction to leader"); + Err(StratusError::TransactionForwardToLeaderFailed) + } } } diff --git a/src/infra/blockchain_client/mod.rs b/src/infra/blockchain_client/mod.rs index 5b7a73ca8..721ae25ac 100644 --- a/src/infra/blockchain_client/mod.rs +++ b/src/infra/blockchain_client/mod.rs @@ -1,5 +1,4 @@ #[allow(clippy::module_inception)] pub mod blockchain_client; -pub mod pending_transaction; pub use blockchain_client::BlockchainClient; diff --git a/src/infra/blockchain_client/pending_transaction.rs b/src/infra/blockchain_client/pending_transaction.rs deleted file mode 100644 index 8baf8ecdc..000000000 --- a/src/infra/blockchain_client/pending_transaction.rs +++ /dev/null @@ -1,181 +0,0 @@ -use std::fmt::Debug; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use std::time::Duration; - -use futures::Future; -use futures::Stream; -use futures_timer::Delay; -use futures_util::stream; -use futures_util::FutureExt; -use futures_util::StreamExt; -use pin_project::pin_project; - -use super::BlockchainClient; -use crate::alias::EthersTransaction; -use crate::eth::primitives::ExternalReceipt; -use crate::eth::primitives::Hash; - -type PinBoxFut<'a, T> = Pin> + Send + 'a>>; - -/// Create a stream that emits items at a fixed interval. Used for rate control -pub fn interval(duration: Duration) -> impl futures::stream::Stream + Send + Unpin { - stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop) -} - -enum PendingTxState<'a> { - /// Initial delay to ensure the GettingTx loop doesn't immediately fail - InitialDelay(Pin>), - - /// Waiting for interval to elapse before calling API again - PausedGettingTx, - - /// Polling The blockchain to see if the Tx has confirmed or dropped - GettingTx(PinBoxFut<'a, Option>), - - /// Waiting for interval to elapse before calling API again - PausedGettingReceipt, - - /// Polling the blockchain for the receipt - GettingReceipt(PinBoxFut<'a, Option>), - - CheckingReceipt(Option), - - /// Future has completed and should panic if polled again - Completed, -} - -impl<'a> Debug for PendingTxState<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::InitialDelay(_) => "InitialDelay", - Self::CheckingReceipt(_) => "CheckingReceipt", - Self::Completed => "Completed", - Self::GettingReceipt(_) => "GettingReceipt", - Self::GettingTx(_) => "GettingTx", - Self::PausedGettingReceipt => "PausedGettingReceipt", - Self::PausedGettingTx => "PausedGettingTx", - } - .fmt(f) - } -} - -#[pin_project] -pub struct PendingTransaction<'a> { - pub tx_hash: Hash, - state: PendingTxState<'a>, - provider: &'a BlockchainClient, - interval: Box + Send + Unpin>, - retries_remaining: i32, -} - -impl<'a> PendingTransaction<'a> { - pub fn new(tx_hash: Hash, provider: &'a BlockchainClient) -> Self { - let delay = Box::pin(Delay::new(Duration::from_millis(100))); - PendingTransaction { - state: PendingTxState::InitialDelay(delay), - provider, - tx_hash, - interval: Box::new(interval(Duration::from_millis(20))), - retries_remaining: 200, - } - } -} - -impl<'a> Future for PendingTransaction<'a> { - type Output = anyhow::Result>; - - fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - let this = self.project(); - tracing::debug!(?this.state, ?this.retries_remaining); - - match this.state { - PendingTxState::InitialDelay(fut) => { - futures_util::ready!(fut.as_mut().poll(ctx)); - let fut = Box::pin(this.provider.fetch_transaction(*this.tx_hash)); - *this.state = PendingTxState::GettingTx(fut); - ctx.waker().wake_by_ref(); - return Poll::Pending; - } - PendingTxState::PausedGettingTx => { - // Wait the polling period so that we do not spam the chain when no - // new block has been mined - let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx)); - let fut = Box::pin(this.provider.fetch_transaction(*this.tx_hash)); - *this.state = PendingTxState::GettingTx(fut); - ctx.waker().wake_by_ref(); - } - PendingTxState::GettingTx(fut) => { - let tx_res = futures_util::ready!(fut.as_mut().poll(ctx)); - // If the provider errors, just try again after the interval. - // nbd. - if tx_res.is_err() { - *this.state = PendingTxState::PausedGettingTx; - ctx.waker().wake_by_ref(); - return Poll::Pending; - } - - let tx_opt = tx_res.unwrap(); - - if tx_opt.is_none() { - if *this.retries_remaining <= 0 { - *this.state = PendingTxState::Completed; - return Poll::Ready(Ok(None)); - } - - *this.retries_remaining -= 1; - *this.state = PendingTxState::PausedGettingTx; - ctx.waker().wake_by_ref(); - return Poll::Pending; - } - - // If it hasn't confirmed yet, poll again later - let tx = tx_opt.unwrap(); - if tx.block_number.is_none() { - *this.state = PendingTxState::PausedGettingTx; - ctx.waker().wake_by_ref(); - return Poll::Pending; - } - - // Start polling for the receipt now - let fut = Box::pin(this.provider.fetch_receipt(*this.tx_hash)); - *this.state = PendingTxState::GettingReceipt(fut); - ctx.waker().wake_by_ref(); - return Poll::Pending; - } - PendingTxState::PausedGettingReceipt => { - // Wait the polling period so that we do not spam the chain when no - // new block has been mined - let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx)); - let fut = Box::pin(this.provider.fetch_receipt(*this.tx_hash)); - *this.state = PendingTxState::GettingReceipt(fut); - ctx.waker().wake_by_ref(); - } - PendingTxState::GettingReceipt(fut) => { - if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) { - *this.state = PendingTxState::CheckingReceipt(receipt); - } else { - *this.state = PendingTxState::PausedGettingReceipt; - } - ctx.waker().wake_by_ref(); - } - PendingTxState::CheckingReceipt(receipt) => { - if receipt.is_none() { - *this.state = PendingTxState::PausedGettingReceipt; - ctx.waker().wake_by_ref(); - return Poll::Pending; - } - - let receipt = receipt.take(); - *this.state = PendingTxState::Completed; - return Poll::Ready(Ok(receipt)); - } - PendingTxState::Completed => { - panic!("polled pending transaction future after completion") - } - }; - - Poll::Pending - } -}