Skip to content

Commit

Permalink
feat: forward to leader repass client to leader and leader response t…
Browse files Browse the repository at this point in the history
…o client (#1619)
  • Loading branch information
dinhani-cw authored and marcospb19-cw committed Aug 13, 2024
1 parent 4188ee2 commit 99bd491
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 247 deletions.
3 changes: 2 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ setup:
# Stratus tasks
# ------------------------------------------------------------------------------

alias run := stratus
alias run := stratus
alias run-leader := stratus
alias run-follower := stratus-follower
alias run-importer := stratus-follower

Expand Down
2 changes: 1 addition & 1 deletion src/eth/executor/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Evm {
// unexpected errors
Err(e) => {
tracing::warn!(reason = ?e, "evm transaction error");
Err(StratusError::TransactionFailed(e.to_string()))
Err(StratusError::TransactionEvmFailed(e.to_string()))
}
};

Expand Down
22 changes: 11 additions & 11 deletions src/eth/executor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,18 @@ impl Executor {

/// Executes a transaction persisting state changes.
#[tracing::instrument(name = "executor::local_transaction", skip_all, fields(tx_hash, tx_from, tx_to, tx_nonce))]
pub fn execute_local_transaction(&self, tx_input: TransactionInput) -> Result<TransactionExecution, StratusError> {
pub fn execute_local_transaction(&self, tx: TransactionInput) -> Result<TransactionExecution, StratusError> {
#[cfg(feature = "metrics")]
let start = metrics::now();

tracing::info!(tx_hash = %tx.hash, "executing local transaction");

// track
Span::with(|s| {
s.rec_str("tx_hash", &tx_input.hash);
s.rec_str("tx_from", &tx_input.signer);
s.rec_opt("tx_to", &tx_input.to);
s.rec_str("tx_nonce", &tx_input.nonce);
s.rec_str("tx_hash", &tx.hash);
s.rec_str("tx_from", &tx.signer);
s.rec_opt("tx_to", &tx.to);
s.rec_str("tx_nonce", &tx.nonce);
});

// execute according to the strategy
Expand All @@ -383,7 +385,6 @@ impl Executor {
self.locks.serial.clear_poison();
poison.into_inner()
});
tracing::info!("executor acquired serial execution lock");

// WORKAROUND: prevents interval miner mining blocks while a transaction is being executed.
// this can be removed when we implement conflict detection for block number
Expand All @@ -396,23 +397,22 @@ impl Executor {
tracing::info!("executor acquired mine_and_commit lock to prevent executor mining block");
miner_lock
} else {
tracing::warn!("executor did not try to acquire mine_and_commit lock");
None
};

// execute transaction
self.execute_local_transaction_attempts(tx_input.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS)
self.execute_local_transaction_attempts(tx.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS)
}

// Executes transactions in parallel mode:
// * Conflict detection prevents data corruption.
ExecutorStrategy::Paralell => {
let parallel_attempt = self.execute_local_transaction_attempts(tx_input.clone(), EvmRoute::Parallel, 1);
let parallel_attempt = self.execute_local_transaction_attempts(tx.clone(), EvmRoute::Parallel, 1);
match parallel_attempt {
Ok(tx_execution) => Ok(tx_execution),
Err(e) =>
if let StratusError::TransactionConflict(_) = e {
self.execute_local_transaction_attempts(tx_input.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS)
self.execute_local_transaction_attempts(tx.clone(), EvmRoute::Serial, INFINITE_ATTEMPTS)
} else {
Err(e)
},
Expand All @@ -423,7 +423,7 @@ impl Executor {
// track metrics
#[cfg(feature = "metrics")]
{
let function = codegen::function_sig_for_o11y(&tx_input.input);
let function = codegen::function_sig_for_o11y(&tx.input);
match &tx_execution {
Ok(tx_execution) => {
metrics::inc_executor_local_transaction(start.elapsed(), true, function);
Expand Down
16 changes: 7 additions & 9 deletions src/eth/follower/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use async_trait::async_trait;

use crate::eth::primitives::Bytes;
use crate::eth::primitives::Hash;
use crate::eth::primitives::StratusError;
use crate::eth::rpc::RpcClientApp;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::infra::BlockchainClient;
Expand All @@ -29,23 +31,19 @@ pub trait Consensus: Send + Sync {
return should_serve;
}

/// Forward a transaction
async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
/// Forwards a transaction to leader.
async fn forward_to_leader(&self, tx_hash: Hash, tx_data: Bytes, rpc_client: RpcClientApp) -> Result<Hash, StratusError> {
#[cfg(feature = "metrics")]
let start = metrics::now();

let blockchain_client = self.get_chain()?;
tracing::info!(%tx_hash, %rpc_client, "forwaring transaction to leader");

let result = blockchain_client.send_raw_transaction(transaction.into()).await?;
let hash = self.get_chain()?.send_raw_transaction_to_leader(tx_data.into(), rpc_client).await?;

#[cfg(feature = "metrics")]
metrics::inc_consensus_forward(start.elapsed());

let tx_hash = result.tx_hash;
let validator_url = &blockchain_client.http_url;
tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader");

Ok(result.tx_hash)
Ok(hash)
}

fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>>;
Expand Down
14 changes: 12 additions & 2 deletions src/eth/primitives/stratus_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pub enum StratusError {

#[error("Failed to executed transaction in EVM: {0:?}.")]
#[strum(props(kind = "execution"))]
TransactionFailed(String), // split this in multiple errors
TransactionEvmFailed(String), // split this in multiple errors

#[error("Failed to execute transaction in leader: {0:?}.")]
#[strum(props(kind = "execution"))]
TransactionLeaderFailed(ErrorObjectOwned),

#[error("Failed to forward transaction to leader node.")]
#[strum(props(kind = "execution"))]
Expand Down Expand Up @@ -170,7 +174,7 @@ impl StratusError {

// Transaction
Self::RpcTransactionInvalid { decode_error } => to_json_value(decode_error),
Self::TransactionFailed(e) => JsonValue::String(e.to_string()),
Self::TransactionEvmFailed(e) => JsonValue::String(e.to_string()),
Self::TransactionReverted { output } => to_json_value(output),

// Unexpected
Expand All @@ -196,6 +200,12 @@ impl From<anyhow::Error> for StratusError {
// -----------------------------------------------------------------------------
impl From<StratusError> for ErrorObjectOwned {
fn from(value: StratusError) -> Self {
// return response from leader
if let StratusError::TransactionLeaderFailed(response) = value {
return response;
}

// generate response
let data = match value.rpc_data() {
serde_json::Value::String(data_str) => {
let data_str = data_str.trim_start_matches('\"').trim_end_matches('\"').replace("\\\"", "\"");
Expand Down
13 changes: 13 additions & 0 deletions src/eth/rpc/rpc_client_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ impl serde::Serialize for RpcClientApp {
}
}

impl<'de> serde::Deserialize<'de> for RpcClientApp {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = String::deserialize(deserializer)?;
match value.as_str() {
"unknown" => Ok(Self::Unknown),
_ => Ok(Self::Identified(value)),
}
}
}

// -----------------------------------------------------------------------------
// Conversions: Self -> Other
// -----------------------------------------------------------------------------
Expand Down
18 changes: 14 additions & 4 deletions src/eth/rpc/rpc_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,18 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware {
let middleware_enter = span.enter();

// extract request data
let client = request.extensions.rpc_client();
let mut client = request.extensions.rpc_client();
let method = request.method_name().to_owned();
let tx = match method.as_str() {
"eth_call" | "eth_estimateGas" => TransactionTracingIdentifiers::from_call(request.params()).ok(),
"eth_sendRawTransaction" => TransactionTracingIdentifiers::from_transaction(request.params()).ok(),
"eth_getTransactionByHash" | "eth_getTransactionReceipt" => TransactionTracingIdentifiers::from_transaction_query(request.params()).ok(),
_ => None,
};
if let Some(tx_client) = tx.as_ref().and_then(|tx| tx.client.clone()) {
request.extensions_mut().insert(tx_client.clone());
client = tx_client;
}

// trace request
Span::with(|s| {
Expand Down Expand Up @@ -226,6 +230,7 @@ impl<'a> Future for RpcResponse<'a> {
// -----------------------------------------------------------------------------

struct TransactionTracingIdentifiers {
pub client: Option<RpcClientApp>,
pub hash: Option<Hash>,
pub contract: ContractName,
pub function: SoliditySignature,
Expand All @@ -235,11 +240,14 @@ struct TransactionTracingIdentifiers {
}

impl TransactionTracingIdentifiers {
// eth_sendRawTransction
// eth_sendRawTransaction
fn from_transaction(params: Params) -> anyhow::Result<Self> {
let (_, data) = next_rpc_param::<Bytes>(params.sequence())?;
let tx = parse_rpc_rlp::<TransactionInput>(&data)?;
let (params, tx_data) = next_rpc_param::<Bytes>(params.sequence())?;
let tx = parse_rpc_rlp::<TransactionInput>(&tx_data)?;
let client = next_rpc_param::<RpcClientApp>(params);

Ok(Self {
client: client.map(|(_, client)| client).ok(),
hash: Some(tx.hash),
contract: codegen::contract_name_for_o11y(&tx.to),
function: codegen::function_sig_for_o11y(&tx.input),
Expand All @@ -253,6 +261,7 @@ impl TransactionTracingIdentifiers {
fn from_call(params: Params) -> anyhow::Result<Self> {
let (_, call) = next_rpc_param::<CallInput>(params.sequence())?;
Ok(Self {
client: None,
hash: None,
contract: codegen::contract_name_for_o11y(&call.to),
function: codegen::function_sig_for_o11y(&call.data),
Expand All @@ -266,6 +275,7 @@ impl TransactionTracingIdentifiers {
fn from_transaction_query(params: Params) -> anyhow::Result<Self> {
let (_, hash) = next_rpc_param::<Hash>(params.sequence())?;
Ok(Self {
client: None,
hash: Some(hash),
contract: metrics::LABEL_MISSING,
function: metrics::LABEL_MISSING,
Expand Down
49 changes: 19 additions & 30 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,53 +604,42 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Exten

// parse params
reject_unknown_client(ext.rpc_client())?;
let (_, data) = next_rpc_param::<Bytes>(params.sequence())?;
let tx = parse_rpc_rlp::<TransactionInput>(&data)?;
let (_, tx_data) = next_rpc_param::<Bytes>(params.sequence())?;
let tx = parse_rpc_rlp::<TransactionInput>(&tx_data)?;
let tx_hash = tx.hash;

// track
Span::with(|s| {
s.rec_str("tx_hash", &tx.hash);
s.rec_str("tx_hash", &tx_hash);
s.rec_str("tx_from", &tx.signer);
s.rec_opt("tx_to", &tx.to);
s.rec_str("tx_nonce", &tx.nonce);
});
let tx_hash = tx.hash;

// check feature
if not(GlobalState::is_transactions_enabled()) {
tracing::warn!(%tx_hash, "failed to execute eth_sendRawTransaction because transactions are disabled");
return Err(StratusError::RpcTransactionDisabled);
}

// forward transaction to the validator node
if let Some(consensus) = &ctx.consensus {
tracing::info!(%tx_hash, "forwarding eth_sendRawTransaction to leader");
return match Handle::current().block_on(consensus.forward(data)) {
Ok(hash) => Ok(hex_data(hash)),
// execute locally or forward to leader
match &ctx.consensus {
// is leader
None => match ctx.executor.execute_local_transaction(tx) {
Ok(_) => Ok(hex_data(tx_hash)),
Err(e) => {
tracing::error!(reason = ?e, %tx_hash, "failed to forward eth_sendRawTransaction to leader");
Err(StratusError::TransactionForwardToLeaderFailed)
if e.is_internal() {
tracing::error!(reason = ?e, "failed to execute eth_sendRawTransaction");
}
Err(e)
}
};
}
},

// execute locally if leader
tracing::info!(%tx_hash, "executing eth_sendRawTransaction locally");
match ctx.executor.execute_local_transaction(tx) {
Ok(tx) => {
if tx.is_success() {
tracing::info!(tx_hash = %tx.hash(), tx_output = %tx.execution().output, "executed eth_sendRawTransaction with success");
} else {
tracing::warn!(tx_output = %tx.hash(), tx_output = %tx.execution().output, "executed eth_sendRawTransaction with failure");
}
Ok(hex_data(tx_hash))
}
Err(e) => {
if e.is_internal() {
tracing::error!(reason = ?e, "failed to execute eth_sendRawTransaction");
}
Err(e)
}
// is follower
Some(consensus) => match Handle::current().block_on(consensus.forward_to_leader(tx_hash, tx_data, ext.rpc_client())) {
Ok(hash) => Ok(hex_data(hash)),
Err(e) => Err(e),
},
}
}

Expand Down
20 changes: 13 additions & 7 deletions src/infra/blockchain_client/blockchain_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use jsonrpsee::ws_client::WsClientBuilder;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;

use super::pending_transaction::PendingTransaction;
use crate::alias::EthersBytes;
use crate::alias::EthersTransaction;
use crate::alias::JsonValue;
Expand All @@ -21,7 +20,9 @@ use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::Hash;
use crate::eth::primitives::StratusError;
use crate::eth::primitives::Wei;
use crate::eth::rpc::RpcClientApp;
use crate::ext::to_json_value;
use crate::ext::DisplayExt;
use crate::infra::tracing::TracingExt;
Expand Down Expand Up @@ -210,16 +211,21 @@ impl BlockchainClient {
// RPC mutations
// -------------------------------------------------------------------------

/// Sends a signed transaction.
pub async fn send_raw_transaction(&self, tx: EthersBytes) -> anyhow::Result<PendingTransaction<'_>> {
tracing::debug!("sending raw transaction");
/// Forwards a transaction to leader.
pub async fn send_raw_transaction_to_leader(&self, tx: EthersBytes, rpc_client: RpcClientApp) -> Result<Hash, StratusError> {
tracing::debug!("sending raw transaction to leader");

let tx = to_json_value(tx);
let result = self.http.request::<Hash, Vec<JsonValue>>("eth_sendRawTransaction", vec![tx]).await;
let rpc_client = to_json_value(rpc_client);
let result = self.http.request::<Hash, Vec<JsonValue>>("eth_sendRawTransaction", vec![tx, rpc_client]).await;

match result {
Ok(hash) => Ok(PendingTransaction::new(hash, self)),
Err(e) => log_and_err!(reason = e, "failed to send raw transaction"),
Ok(hash) => Ok(hash),
Err(ClientError::Call(response)) => Err(StratusError::TransactionLeaderFailed(response.into_owned())),
Err(e) => {
tracing::error!(reason = ?e, "failed to send raw transaction to leader");
Err(StratusError::TransactionForwardToLeaderFailed)
}
}
}

Expand Down
1 change: 0 additions & 1 deletion src/infra/blockchain_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#[allow(clippy::module_inception)]
pub mod blockchain_client;
pub mod pending_transaction;

pub use blockchain_client::BlockchainClient;
Loading

0 comments on commit 99bd491

Please sign in to comment.