Skip to content

Commit

Permalink
improve instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw committed Jun 3, 2024
1 parent d0bb657 commit ad0e706
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions src/eth/relayer/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::TransactionMined;
use crate::ext::ResultExt;
use crate::ext::SpanExt;
use crate::infra::blockchain_client::pending_transaction::PendingTransaction;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
Expand Down Expand Up @@ -71,9 +72,10 @@ impl ExternalRelayer {
let block_number = block.header.number;

tracing::debug!(?block_number, "relaying block");

// fill span
let span = Span::current();
//span.rec("block_number", &block_number);
span.rec("block_number", &block_number);

// TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys)
let dag = TransactionDag::new(block.transactions);
Expand All @@ -89,18 +91,25 @@ impl ExternalRelayer {
Ok(Some(block_number))
}

#[tracing::instrument(name = "external_relayer::compare_receipt", skip_all, fields(hash))]
async fn compare_receipt(&self, stratus_receipt: ExternalReceipt, substrate_receipt: PendingTransaction<'_>) -> anyhow::Result<()> {
let tx_hash = stratus_receipt.0.transaction_hash;
tracing::info!(?tx_hash, "comparing receipts");

// fill span
let span = Span::current();
span.rec("hash", &tx_hash);

match substrate_receipt.await {
Ok(Some(substrate_receipt)) => {
Ok(Some(substrate_receipt)) =>
if let Err(compare_error) = substrate_receipt.compare(&stratus_receipt) {
let err_string = compare_error.to_string();
let error = log_and_err!("transaction mismatch!").context(err_string.clone());
self.save_mismatch(stratus_receipt, Some(substrate_receipt), &err_string).await;
error
} else {
Ok(())
}
}
},
Ok(None) => {
self.save_mismatch(stratus_receipt, None, "no receipt returned by substrate").await;
Err(anyhow!("no receipt returned by substrate"))
Expand All @@ -115,6 +124,7 @@ impl ExternalRelayer {
}

/// Save a transaction mismatch to postgres, if it fails, save it to a file.
#[tracing::instrument(name = "external_relayer::save_mismatch", skip_all)]
async fn save_mismatch(&self, stratus_receipt: ExternalReceipt, substrate_receipt: Option<ExternalReceipt>, err_string: &str) {
let hash = stratus_receipt.hash().to_string();
let stratus_json = serde_json::to_value(stratus_receipt).expect_infallible();
Expand Down Expand Up @@ -147,24 +157,29 @@ impl ExternalRelayer {

/// Relays a transaction to Substrate and waits until the transaction is in the mempool by
/// calling eth_getTransactionByHash. (infallible)
#[tracing::instrument(skip_all)]
#[tracing::instrument(name = "external_relayer::relay_and_check_mempool", skip_all, fields(hash))]
pub async fn relay_and_check_mempool(&self, tx_mined: TransactionMined) -> (PendingTransaction, ExternalReceipt) {
tracing::debug!(?tx_mined.input.hash, "relaying transaction");
let tx_hash = tx_mined.input.hash;
tracing::debug!(?tx_hash, "relaying transaction");

// fill span
let span = Span::current();
span.rec("hash", &tx_hash);

let ethers_tx = Transaction::from(tx_mined.input.clone());
let tx = loop {
match self.substrate_chain.send_raw_transaction(tx_mined.input.hash, ethers_tx.rlp()).await {
match self.substrate_chain.send_raw_transaction(tx_hash, ethers_tx.rlp()).await {
Ok(tx) => break tx,
Err(err) => {
tracing::debug!(?tx_mined.input.hash, "substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway");
if self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_some() {
tracing::debug!(?tx_mined.input.hash, "transaction found on substrate");
return (
PendingTransaction::new(tx_mined.input.hash, &self.substrate_chain),
ExternalReceipt(tx_mined.into()),
);
tracing::debug!(
?tx_hash,
"substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway"
);
if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() {
tracing::debug!(?tx_hash, "transaction found on substrate");
return (PendingTransaction::new(tx_hash, &self.substrate_chain), ExternalReceipt(tx_mined.into()));
}
tracing::warn!(?tx_mined.input.hash, ?err, "failed to send raw transaction, retrying...");
tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying...");
continue;
}
}
Expand All @@ -181,7 +196,7 @@ impl ExternalRelayer {
(tx, ExternalReceipt(tx_mined.into()))
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(name = "external_relayer::relay_dag", skip_all)]
/// Relays a dag by removing its roots and sending them consecutively. Returns `Ok` if we confirmed that all transactions
/// had the same receipts, returns `Err` if one or more transactions had receipts mismatches. The mismatches are saved
/// on the `mismatches` table in pgsql, or in data/mismatches as a fallback.
Expand Down

0 comments on commit ad0e706

Please sign in to comment.