Skip to content

Commit

Permalink
wip: forward to without execution
Browse files Browse the repository at this point in the history
  • Loading branch information
renancloudwalk committed Jun 1, 2024
1 parent 565b49b commit a80bb33
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 73 deletions.
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,13 @@ pub struct IntegratedRelayerConfig {
}

impl IntegratedRelayerConfig {
pub async fn init(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<TransactionRelayer>>> {
pub async fn init(&self) -> anyhow::Result<Option<Arc<TransactionRelayer>>> {
tracing::info!(config = ?self, "creating transaction relayer");

match self.forward_to {
Some(ref forward_to) => {
let chain = BlockchainClient::new_http(forward_to, self.relayer_timeout).await?;
let relayer = TransactionRelayer::new(storage, chain);
let relayer = TransactionRelayer::new(chain);
Ok(Some(Arc::new(relayer)))
}
None => Ok(None),
Expand Down
52 changes: 20 additions & 32 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,40 +268,28 @@ impl Executor {
return Err(anyhow!("transaction sent from zero address is not allowed."));
}

let tx_execution = match &self.relayer {
// relayer not present
None => {
// executes transaction until no more conflicts
// TODO: must have a stop condition like timeout or max number of retries
loop {
// execute transaction
let evm_input = EvmInput::from_eth_transaction(tx_input.clone());
let evm_result = self.execute_in_evm(evm_input).await?;

// save execution to temporary storage (not working yet)
let tx_execution = TransactionExecution::new_local(tx_input.clone(), evm_result.clone());
if let Err(e) = self.miner.save_execution(tx_execution.clone()).await {
if let Some(StorageError::Conflict(conflicts)) = e.downcast_ref::<StorageError>() {
tracing::warn!(?conflicts, "temporary storage conflict detected when saving execution");
continue;
} else {
#[cfg(feature = "metrics")]
metrics::inc_executor_transact(start.elapsed(), false, function);
return Err(e);
}
}

break tx_execution;
// executes transaction until no more conflicts
// TODO: must have a stop condition like timeout or max number of retries
loop {
// execute transaction
let evm_input = EvmInput::from_eth_transaction(tx_input.clone());
let evm_result = self.execute_in_evm(evm_input).await?;

// save execution to temporary storage (not working yet)
let tx_execution = TransactionExecution::new_local(tx_input.clone(), evm_result.clone());
if let Err(e) = self.miner.save_execution(tx_execution.clone()).await {
if let Some(StorageError::Conflict(conflicts)) = e.downcast_ref::<StorageError>() {
tracing::warn!(?conflicts, "temporary storage conflict detected when saving execution");
continue;
} else {
#[cfg(feature = "metrics")]
metrics::inc_executor_transact(start.elapsed(), false, function);
return Err(e);
}
}
// relayer present
Some(relayer) => {
let evm_input = EvmInput::from_eth_transaction(tx_input.clone());
let evm_result = self.execute_in_evm(evm_input).await?;
relayer.forward(tx_input.clone(), evm_result.clone()).await?; // TODO: Check if we should run this in paralel by spawning a task when running the online importer.
TransactionExecution::new_local(tx_input, evm_result)
}
};

break tx_execution;
}

#[cfg(feature = "metrics")]
metrics::inc_executor_transact(start.elapsed(), true, function.clone());
Expand Down
44 changes: 5 additions & 39 deletions src/eth/relayer/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,31 @@ use tokio::io::AsyncWriteExt;
use crate::eth::evm::EvmExecutionResult;
use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionInput;
use crate::eth::storage::StratusStorage;
use crate::infra::BlockchainClient;

pub struct TransactionRelayer {
storage: Arc<StratusStorage>,

/// RPC client that will submit transactions.
chain: BlockchainClient,
}

impl TransactionRelayer {
/// Creates a new [`TransactionRelayer`].
pub fn new(storage: Arc<StratusStorage>, chain: BlockchainClient) -> Self {
pub fn new(chain: BlockchainClient) -> Self {
tracing::info!(?chain, "creating transaction relayer");
Self { storage, chain }
Self { chain }
}

/// Forwards the transaction to the external blockchain if the execution was successful on our side.
#[tracing::instrument(skip_all)]
pub async fn forward(&self, tx_input: TransactionInput, evm_result: EvmExecutionResult) -> anyhow::Result<()> {
pub async fn forward(&self, tx_input: TransactionInput) -> anyhow::Result<()> {
tracing::debug!(hash = %tx_input.hash, "forwarding transaction");

// handle local failure
if evm_result.is_failure() {
tracing::debug!("transaction failed in local execution");
let tx_execution = TransactionExecution::new_local(tx_input, evm_result);
self.storage.save_execution(tx_execution).await?;
return Ok(());
}

// handle local success
let pending_tx = self
let tx = self
.chain
.send_raw_transaction(tx_input.hash, Transaction::from(tx_input.clone()).rlp())
.await?;

let Some(receipt) = pending_tx.await? else {
return Err(anyhow!("transaction did not produce a receipt"));
};

let status = match receipt.status {
Some(status) => status.as_u32(),
None => return Err(anyhow!("receipt did not report the transaction status")),
};

if status == 0 {
tracing::warn!(?receipt.transaction_hash, "transaction result mismatch between stratus and external rpc. saving to json");
let mut file = File::create(format!("data/mismatched_transactions/{}.json", receipt.transaction_hash)).await?;
let json = serde_json::json!(
{
"transaction_input": tx_input,
"stratus_execution": evm_result,
"substrate_receipt": receipt
}
);
file.write_all(json.to_string().as_bytes()).await?;
return Err(anyhow!("transaction succeeded in stratus but failed in substrate"));
}

//TODO send the result of the tx back
Ok(())
}
}
1 change: 1 addition & 0 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>) -> a
let (_, data) = next_rpc_param::<Bytes>(params.sequence())?;
let transaction = parse_rpc_rlp::<TransactionInput>(&data)?;

//TODO if the forward_to is up, dont call executor, just forward it
let hash = transaction.hash;
match ctx.executor.transact(transaction).await {
// result is success
Expand Down

0 comments on commit a80bb33

Please sign in to comment.