From d5930b651c19415fcc8b620c371f842964ce965c Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 19 Jun 2024 16:48:52 -0300 Subject: [PATCH 01/43] feat: compare final state after relaying a block --- ...1ea55896f462e7bd9c33eada5ec8ced43256c.json | 18 +++++ src/eth/relayer/external.rs | 65 +++++++++++++++++-- src/eth/relayer/transaction_dag.rs | 49 ++++++++------ static/schema/004-relayer.sql | 9 +++ 4 files changed, 115 insertions(+), 26 deletions(-) create mode 100644 .sqlx/query-bcf66d999757219afdaac166a2c1ea55896f462e7bd9c33eada5ec8ced43256c.json diff --git a/.sqlx/query-bcf66d999757219afdaac166a2c1ea55896f462e7bd9c33eada5ec8ced43256c.json b/.sqlx/query-bcf66d999757219afdaac166a2c1ea55896f462e7bd9c33eada5ec8ced43256c.json new file mode 100644 index 000000000..813d68410 --- /dev/null +++ b/.sqlx/query-bcf66d999757219afdaac166a2c1ea55896f462e7bd9c33eada5ec8ced43256c.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO slot_mismatches (address, index, block_number, stratus_value, substrate_value) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Bytea", + "Int8", + "Bytea", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "bcf66d999757219afdaac166a2c1ea55896f462e7bd9c33eada5ec8ced43256c" +} diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index b0ad3defd..23a38636b 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -17,11 +17,14 @@ use tracing::Span; use super::transaction_dag::TransactionDag; use crate::config::ExternalRelayerClientConfig; use crate::config::ExternalRelayerServerConfig; +use crate::eth::primitives::Address; use crate::eth::primitives::Block; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExecutionValueChange; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; +use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::ext::traced_sleep; use crate::ext::ResultExt; @@ -83,6 +86,42 @@ impl ExternalRelayer { !join_all(futures).await.into_iter().any(|result| result.is_err() || result.unwrap().is_null()) } + #[tracing::instrument(name = "external_relayer::relay_next_block", skip_all)] + async fn compare_final_state(&self, changed_slots: HashSet<(Address, SlotIndex)>, block_number: BlockNumber) { + let point_in_time = StoragePointInTime::Past(block_number); + for (addr, index) in changed_slots { + let stratus_slot_value = loop { + match self.stratus_chain.fetch_storage_at(&addr, &index, point_in_time).await { + Ok(value) => break value, + Err(err) => tracing::warn!(?addr, ?index, ?err, "failed to fetch slot value from stratus, retrying..."), + } + }; + + let substrate_slot_value = loop { + match self.substrate_chain.fetch_storage_at(&addr, &index, point_in_time).await { + Ok(value) => break value, + Err(err) => tracing::warn!(?addr, ?index, ?err, "failed to fetch slot value from substrate, retrying..."), + } + }; + + if stratus_slot_value != substrate_slot_value { + tracing::error!(?addr, ?index, ?point_in_time, "evm state mismatch between stratus and substrate"); + while let Err(e) = sqlx::query!( + "INSERT INTO slot_mismatches (address, index, block_number, stratus_value, substrate_value) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", + addr as _, + index as _, + block_number as _, + stratus_slot_value as _, + substrate_slot_value as _ + ) + .execute(&self.pool) + .await { + tracing::warn!(?e, "failed to insert slot mismatch, retrying...") + } + } + } + } + /// Polls the next block to be relayed and relays it to Substrate. #[tracing::instrument(name = "external_relayer::relay_next_block", skip_all, fields(block_number))] pub async fn relay_blocks(&self) -> anyhow::Result> { @@ -104,7 +143,17 @@ impl ExternalRelayer { .fetch_all(&self.pool) .await?; + if block_rows.len() == 0 { + tracing::info!("no blocks to relay"); + return Ok(vec![]); + } + let block_numbers: HashSet = block_rows.iter().map(|row| row.number.into()).collect(); + let max_number = block_numbers.iter().max().cloned().unwrap(); + + // fill span + Span::with(|s| s.rec_str("block_number", &max_number)); + let blocks: Vec = block_rows .into_iter() .sorted_by_key(|row| row.number) @@ -115,8 +164,11 @@ impl ExternalRelayer { return Err(anyhow!("some blocks in this batch have not been mined in stratus")); } + let combined_transactions = Self::combine_transactions(blocks); + let modified_slots = TransactionDag::get_slot_writes(&combined_transactions); + // TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys) - let dag = TransactionDag::new(Self::combine_transactions(blocks)); + let dag = TransactionDag::new(combined_transactions); let (mismatched_blocks, timedout_blocks) = self.relay_dag(dag).await; let non_ok_blocks: HashSet = mismatched_blocks.union(&timedout_blocks).cloned().collect(); @@ -152,6 +204,7 @@ impl ExternalRelayer { .await?; } + self.compare_final_state(modified_slots, max_number).await; Ok(ok_blocks.into_iter().chain(only_mismatched_blocks.into_iter()).collect()) } @@ -197,7 +250,7 @@ impl ExternalRelayer { break Ok(()); } } - Ok(None) => + Ok(None) => { if start.elapsed().as_secs() <= 30 { tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); } else { @@ -206,7 +259,8 @@ impl ExternalRelayer { block_number, anyhow!("no receipt returned by substrate for more than 30 seconds"), )); - }, + } + } Err(error) => { tracing::error!(?tx_hash, ?error, "failed to fetch substrate receipt, retrying..."); } @@ -260,7 +314,7 @@ impl ExternalRelayer { .expect("writing the mismatch to a file should not fail"); tracing::error!(?err, "failed to save mismatch, saving to file"); } - Ok(res) => + Ok(res) => { if res.rows_affected() == 0 { tracing::info!( ?block_number, @@ -268,7 +322,8 @@ impl ExternalRelayer { "transaction mismatch already in database (this should only happen if this block is being retried)." ); return; - }, + } + } } #[cfg(feature = "metrics")] diff --git a/src/eth/relayer/transaction_dag.rs b/src/eth/relayer/transaction_dag.rs index 5fa7aa899..52e3ada32 100644 --- a/src/eth/relayer/transaction_dag.rs +++ b/src/eth/relayer/transaction_dag.rs @@ -19,6 +19,32 @@ pub struct TransactionDag { } impl TransactionDag { + pub fn get_slot_writes(block_transactions: &Vec) -> HashSet<(Address, SlotIndex)> { + block_transactions + .iter() + .flat_map(|tx| { + tx.execution.changes.iter().flat_map(|(address, change)| { + change + .slots + .iter() + .filter_map(|(idx, slot_change)| slot_change.is_modified().then_some((*address, *idx))) + }) + }) + .collect() + } + + pub fn get_balance_writes(block_transactions: &Vec) -> HashSet
{ + block_transactions + .iter() + .flat_map(|tx| { + tx.execution + .changes + .iter() + .filter_map(|(address, change)| change.balance.is_modified().then_some(*address)) + }) + .collect() + } + /// Uses the transactions and produces a Dependency DAG (Directed Acyclical Graph). /// Each vertex of the graph is a transaction, and two vertices are connected iff they conflict /// on either a slot or balance and they don't have the same "from" field (since those transactions will @@ -40,27 +66,8 @@ impl TransactionDag { #[cfg(feature = "metrics")] let start = metrics::now(); - let slot_writes: HashSet<(Address, SlotIndex)> = block_transactions - .iter() - .flat_map(|tx| { - tx.execution.changes.iter().flat_map(|(address, change)| { - change - .slots - .iter() - .filter_map(|(idx, slot_change)| slot_change.is_modified().then_some((*address, *idx))) - }) - }) - .collect(); - - let balance_writes: HashSet
= block_transactions - .iter() - .flat_map(|tx| { - tx.execution - .changes - .iter() - .filter_map(|(address, change)| change.balance.is_modified().then_some(*address)) - }) - .collect(); + let slot_writes: HashSet<(Address, SlotIndex)> = Self::get_slot_writes(&block_transactions); + let balance_writes: HashSet
= Self::get_balance_writes(&block_transactions); let mut slot_conflicts: HashMap<(BlockNumber, Index), HashSet<(Address, SlotIndex)>> = HashMap::new(); let mut balance_conflicts: HashMap<(BlockNumber, Index), HashSet
> = HashMap::new(); diff --git a/static/schema/004-relayer.sql b/static/schema/004-relayer.sql index f2531a428..da714ecda 100644 --- a/static/schema/004-relayer.sql +++ b/static/schema/004-relayer.sql @@ -13,3 +13,12 @@ create table mismatches( substrate_receipt jsonb not null, error text not null ); + +create table slot_mismatches( + address bytea not null, + index bytea not null, + block_number bigint not null, + stratus_value bytea not null, + substrate_value bytea not null, + primary key (address, index) +); From e8a24c2b7b2ad2930c94ef29238281835546f1aa Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 20 Jun 2024 10:24:38 -0300 Subject: [PATCH 02/43] compare substrate with latest --- src/eth/relayer/external.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 23a38636b..98c3d35f2 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -98,7 +98,7 @@ impl ExternalRelayer { }; let substrate_slot_value = loop { - match self.substrate_chain.fetch_storage_at(&addr, &index, point_in_time).await { + match self.substrate_chain.fetch_storage_at(&addr, &index, StoragePointInTime::Present).await { Ok(value) => break value, Err(err) => tracing::warn!(?addr, ?index, ?err, "failed to fetch slot value from substrate, retrying..."), } From 1675ac52fa2e6f96b2bb7f5a282f64df802214c6 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 20 Jun 2024 10:34:44 -0300 Subject: [PATCH 03/43] compare final state metric --- src/eth/relayer/external.rs | 7 +++++++ src/infra/metrics/metrics_definitions.rs | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 98c3d35f2..95b14f884 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -33,6 +33,7 @@ use crate::ext::SpanExt; use crate::infra::blockchain_client::pending_transaction::PendingTransaction; #[cfg(feature = "metrics")] use crate::infra::metrics; +use crate::infra::metrics::inc_compare_final_state; use crate::infra::BlockchainClient; use crate::log_and_err; @@ -88,6 +89,9 @@ impl ExternalRelayer { #[tracing::instrument(name = "external_relayer::relay_next_block", skip_all)] async fn compare_final_state(&self, changed_slots: HashSet<(Address, SlotIndex)>, block_number: BlockNumber) { + #[cfg(feature = "metrics")] + let start = metrics::now(); + let point_in_time = StoragePointInTime::Past(block_number); for (addr, index) in changed_slots { let stratus_slot_value = loop { @@ -120,6 +124,9 @@ impl ExternalRelayer { } } } + + #[cfg(feature = "metrics")] + inc_compare_final_state(start.elapsed()); } /// Polls the next block to be relayed and relays it to Substrate. diff --git a/src/infra/metrics/metrics_definitions.rs b/src/infra/metrics/metrics_definitions.rs index c27affbd8..02a30e121 100644 --- a/src/infra/metrics/metrics_definitions.rs +++ b/src/infra/metrics/metrics_definitions.rs @@ -206,5 +206,8 @@ metrics! { histogram_duration save_mismatch{}, "Time to run ExternalRelayerClient::send_to_relayer." - histogram_duration send_to_relayer{} + histogram_duration send_to_relayer{}, + + "Time to run ExternalRelayerClient::compare_final_state." + histogram_duration compare_final_state{} } From 95882c113bac35d698c2d1812a6747507bf93eed Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 20 Jun 2024 14:35:33 -0300 Subject: [PATCH 04/43] parallelize state comparison --- src/eth/relayer/external.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 95b14f884..45f70c827 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -5,6 +5,7 @@ use anyhow::anyhow; use anyhow::Context; use ethers_core::types::Transaction; use futures::future::join_all; +use futures::StreamExt; use itertools::Itertools; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -93,7 +94,9 @@ impl ExternalRelayer { let start = metrics::now(); let point_in_time = StoragePointInTime::Past(block_number); + let mut futures = vec![]; for (addr, index) in changed_slots { + futures.push(async move { let stratus_slot_value = loop { match self.stratus_chain.fetch_storage_at(&addr, &index, point_in_time).await { Ok(value) => break value, @@ -122,9 +125,12 @@ impl ExternalRelayer { .await { tracing::warn!(?e, "failed to insert slot mismatch, retrying...") } - } + }}) } + let mut buffer = futures::stream::iter(futures).buffer_unordered(100); + while let Some(_) = buffer.next().await {} + #[cfg(feature = "metrics")] inc_compare_final_state(start.elapsed()); } @@ -139,7 +145,7 @@ impl ExternalRelayer { FROM relayer_blocks WHERE finished = false ORDER BY number ASC - LIMIT 5 + LIMIT 2 ) UPDATE relayer_blocks r SET started = true From 7333140138d7ffe321a670ae92212b9861298a88 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 20 Jun 2024 14:56:09 -0300 Subject: [PATCH 05/43] sqlx prepare --- ...ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename .sqlx/{query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json => query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json} (83%) diff --git a/.sqlx/query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json b/.sqlx/query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json similarity index 83% rename from .sqlx/query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json rename to .sqlx/query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json index a0d899e23..e704fbd4c 100644 --- a/.sqlx/query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json +++ b/.sqlx/query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 5\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", + "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 2\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d" + "hash": "346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18" } From 8a3f62f6b9d60ffa18ec1d8e8e378dc9123f2201 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 20 Jun 2024 19:26:03 -0300 Subject: [PATCH 06/43] try fetching more blocks and comparing after every relay --- ...172b964a5c0198e30534a9121ff5cd86102f.json} | 4 +-- src/eth/relayer/external.rs | 25 +++++++------------ 2 files changed, 11 insertions(+), 18 deletions(-) rename .sqlx/{query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json => query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json} (64%) diff --git a/.sqlx/query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json b/.sqlx/query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json similarity index 64% rename from .sqlx/query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json rename to .sqlx/query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json index e704fbd4c..4654a0ef8 100644 --- a/.sqlx/query-346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18.json +++ b/.sqlx/query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 2\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", + "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 50\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "346bbd1090c6ca82cb34964cf74a4656bddd0404f001fca64c20e3287fedff18" + "hash": "6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f" } diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 45f70c827..a71575a1e 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -145,7 +145,7 @@ impl ExternalRelayer { FROM relayer_blocks WHERE finished = false ORDER BY number ASC - LIMIT 2 + LIMIT 50 ) UPDATE relayer_blocks r SET started = true @@ -263,7 +263,7 @@ impl ExternalRelayer { break Ok(()); } } - Ok(None) => { + Ok(None) => if start.elapsed().as_secs() <= 30 { tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); } else { @@ -272,8 +272,7 @@ impl ExternalRelayer { block_number, anyhow!("no receipt returned by substrate for more than 30 seconds"), )); - } - } + }, Err(error) => { tracing::error!(?tx_hash, ?error, "failed to fetch substrate receipt, retrying..."); } @@ -327,7 +326,7 @@ impl ExternalRelayer { .expect("writing the mismatch to a file should not fail"); tracing::error!(?err, "failed to save mismatch, saving to file"); } - Ok(res) => { + Ok(res) => if res.rows_affected() == 0 { tracing::info!( ?block_number, @@ -335,8 +334,7 @@ impl ExternalRelayer { "transaction mismatch already in database (this should only happen if this block is being retried)." ); return; - } - } + }, } #[cfg(feature = "metrics")] @@ -346,7 +344,7 @@ impl ExternalRelayer { /// Relays a transaction to Substrate and waits until the transaction is in the mempool by /// calling eth_getTransactionByHash. (infallible) #[tracing::instrument(name = "external_relayer::relay_and_check_mempool", skip_all, fields(hash))] - pub async fn relay_and_check_mempool(&self, tx_mined: TransactionMined) -> (PendingTransaction, TransactionMined) { + pub async fn relay_and_check_mempool(&self, tx_mined: TransactionMined) -> anyhow::Result<(), RelayError> { #[cfg(feature = "metrics")] let start = metrics::now(); @@ -367,7 +365,7 @@ impl ExternalRelayer { ); if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() { tracing::info!(?tx_hash, "transaction found on substrate"); - return (PendingTransaction::new(tx_hash, &self.substrate_chain), tx_mined); + return self.compare_receipt(tx_mined, PendingTransaction::new(tx_hash, &self.substrate_chain)).await; } tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); continue; @@ -386,8 +384,7 @@ impl ExternalRelayer { #[cfg(feature = "metrics")] metrics::inc_relay_and_check_mempool(start.elapsed()); - - (tx, tx_mined) + self.compare_receipt(tx_mined, tx).await } /// Relays a dag by removing its roots and sending them consecutively. Returns `Ok` if we confirmed that all transactions @@ -406,11 +403,7 @@ impl ExternalRelayer { results.extend(join_all(futures).await); } - let futures = results - .into_iter() - .map(|(substrate_pending_tx, stratus_receipt)| self.compare_receipt(stratus_receipt, substrate_pending_tx)); - - let errors = join_all(futures).await.into_iter().filter_map(Result::err); + let errors = results.into_iter().filter_map(Result::err); let mut mismatched_blocks: MismatchedBlocks = HashSet::new(); let mut timedout_blocks: TimedoutBlocks = HashSet::new(); From 61c7f8bc1e3c3c3938b9972240e870c74265a3aa Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 20 Jun 2024 19:48:10 -0300 Subject: [PATCH 07/43] more blockss --- src/eth/relayer/external.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index a71575a1e..fae8c6b05 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -145,7 +145,7 @@ impl ExternalRelayer { FROM relayer_blocks WHERE finished = false ORDER BY number ASC - LIMIT 50 + LIMIT 100 ) UPDATE relayer_blocks r SET started = true From 319c70c499e0afed422fec0401329d7942244990 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Fri, 21 Jun 2024 10:44:03 -0300 Subject: [PATCH 08/43] sqlx prepare --- ...2e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename .sqlx/{query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json => query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json} (64%) diff --git a/.sqlx/query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json b/.sqlx/query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json similarity index 64% rename from .sqlx/query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json rename to .sqlx/query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json index 4654a0ef8..81a13435f 100644 --- a/.sqlx/query-6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f.json +++ b/.sqlx/query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 50\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", + "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 100\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "6a4b50e8a38f6a3b854c2599e410172b964a5c0198e30534a9121ff5cd86102f" + "hash": "25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4" } From 5621fe57f2ec3909fda93a42a7b6fd82a4275b36 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Fri, 21 Jun 2024 17:41:16 -0300 Subject: [PATCH 09/43] sign transactions --- Cargo.lock | 208 +++++++++++++++++++++++- Cargo.toml | 1 + src/config.rs | 3 + src/eth/primitives/transaction_input.rs | 18 ++ src/eth/relayer/external.rs | 24 ++- src/eth/relayer/transaction_dag.rs | 38 ++--- 6 files changed, 262 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 942639225..ee723eb58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.7.8" @@ -464,6 +475,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bech32" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" + [[package]] name = "beef" version = "0.5.2" @@ -631,6 +648,16 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "sha2", + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -801,6 +828,16 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -852,6 +889,58 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +[[package]] +name = "coins-bip32" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b6be4a5df2098cd811f3194f64ddb96c267606bffd9689ac7b0160097b01ad3" +dependencies = [ + "bs58", + "coins-core", + "digest 0.10.7", + "hmac", + "k256", + "serde", + "sha2", + "thiserror", +] + +[[package]] +name = "coins-bip39" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db8fba409ce3dc04f7d804074039eb68b960b0829161f8e06c95fea3f122528" +dependencies = [ + "bitvec", + "coins-bip32", + "hmac", + "once_cell", + "pbkdf2 0.12.2", + "rand", + "sha2", + "thiserror", +] + +[[package]] +name = "coins-core" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5286a0843c21f8367f7be734f89df9b822e0321d8bcce8d6e735aadff7d74979" +dependencies = [ + "base64 0.21.7", + "bech32", + "bs58", + "digest 0.10.7", + "generic-array", + "hex", + "ripemd", + "serde", + "serde_derive", + "sha2", + "sha3", + "thiserror", +] + [[package]] name = "colorchoice" version = "1.0.1" @@ -1059,6 +1148,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "darling" version = "0.13.4" @@ -1142,7 +1240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" dependencies = [ "serde", - "uuid", + "uuid 1.8.0", ] [[package]] @@ -1392,6 +1490,28 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "eth-keystore" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fda3bf123be441da5260717e0661c25a2fd9cb2b2c1d20bf2e05580047158ab" +dependencies = [ + "aes", + "ctr", + "digest 0.10.7", + "hex", + "hmac", + "pbkdf2 0.11.0", + "rand", + "scrypt", + "serde", + "serde_json", + "sha2", + "sha3", + "thiserror", + "uuid 0.8.2", +] + [[package]] name = "ethabi" version = "18.0.0" @@ -1467,6 +1587,25 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "ethers-signers" +version = "2.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228875491c782ad851773b652dd8ecac62cda8571d3bc32a5853644dd26766c2" +dependencies = [ + "async-trait", + "coins-bip32", + "coins-bip39", + "const-hex", + "elliptic-curve", + "eth-keystore", + "ethers-core", + "rand", + "sha2", + "thiserror", + "tracing", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -2456,6 +2595,15 @@ dependencies = [ "serde", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.13" @@ -2756,6 +2904,7 @@ dependencies = [ "elliptic-curve", "once_cell", "sha2", + "signature", ] [[package]] @@ -3589,6 +3738,25 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pbkdf2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" +dependencies = [ + "digest 0.10.7", +] + +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest 0.10.7", + "hmac", +] + [[package]] name = "pem" version = "3.0.4" @@ -4372,7 +4540,7 @@ dependencies = [ "rkyv_derive", "seahash", "tinyvec", - "uuid", + "uuid 1.8.0", ] [[package]] @@ -4702,6 +4870,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -4774,6 +4951,18 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scrypt" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f9e24d2b632954ded8ab2ef9fea0a0c769ea56ea98bddbafbad22caeeadf45d" +dependencies = [ + "hmac", + "pbkdf2 0.11.0", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" @@ -4995,7 +5184,7 @@ dependencies = [ "thiserror", "time", "url", - "uuid", + "uuid 1.8.0", ] [[package]] @@ -5603,6 +5792,7 @@ dependencies = [ "ethabi", "ethereum-types", "ethers-core", + "ethers-signers", "evm-disassembler", "fake", "fancy-duration", @@ -5668,7 +5858,7 @@ dependencies = [ "triehash", "ulid", "url", - "uuid", + "uuid 1.8.0", "vergen", ] @@ -6479,6 +6669,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", + "serde", +] + [[package]] name = "uuid" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 992084047..e7a524781 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ fake = { version = "=2.9.2", features = ["derive"] } # relayer petgraph = "=0.6.5" +ethers-signers = "2.0.14" # ------------------------------------------------------------------------------ # Platform specific dependencies diff --git a/src/config.rs b/src/config.rs index 805330802..f0bf35472 100644 --- a/src/config.rs +++ b/src/config.rs @@ -384,6 +384,9 @@ pub struct ExternalRelayerServerConfig { /// RPC response timeout. #[arg(long = "rpc-timeout", value_parser=parse_duration, env = "RPC_TIMEOUT", default_value = "2s")] pub rpc_timeout: Duration, + + #[arg(long = "signer", value_parser=parse_duration, env = "SIGNER")] + pub signer: String, } impl ExternalRelayerServerConfig { diff --git a/src/eth/primitives/transaction_input.rs b/src/eth/primitives/transaction_input.rs index 6f0764e1d..b728bd840 100644 --- a/src/eth/primitives/transaction_input.rs +++ b/src/eth/primitives/transaction_input.rs @@ -4,7 +4,9 @@ use anyhow::anyhow; use display_json::DebugAsJson; use ethereum_types::U256; use ethereum_types::U64; +use ethers_core::types::NameOrAddress; use ethers_core::types::Transaction as EthersTransaction; +use ethers_core::types::TransactionRequest; use fake::Dummy; use fake::Fake; use fake::Faker; @@ -182,3 +184,19 @@ impl From for EthersTransaction { } } } + +impl From for TransactionRequest { + fn from(value: TransactionInput) -> Self { + let input = value; + Self { + chain_id: input.chain_id.map(|id| id.inner_value()), + nonce: Some(input.nonce.into()), + from: Some(input.signer.into()), + to: input.to.map(|to| NameOrAddress::Address(to.into())), + value: Some(input.value.into()), + gas_price: Some(input.gas_price.into()), + gas: Some(input.gas_limit.into()), + data: Some(input.input.into()) + } + } +} diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index fae8c6b05..5829175d0 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -3,7 +3,9 @@ use std::time::Duration; use anyhow::anyhow; use anyhow::Context; -use ethers_core::types::Transaction; +use ethers_core::types::transaction::eip2718::TypedTransaction; +use ethers_signers::LocalWallet; +use ethers_signers::Signer; use futures::future::join_all; use futures::StreamExt; use itertools::Itertools; @@ -58,6 +60,8 @@ pub struct ExternalRelayer { /// RPC client that will submit transactions. stratus_chain: BlockchainClient, + + signer: LocalWallet, } impl ExternalRelayer { @@ -76,6 +80,7 @@ impl ExternalRelayer { substrate_chain: BlockchainClient::new_http(&config.forward_to, config.rpc_timeout).await?, stratus_chain: BlockchainClient::new_http(&config.stratus_rpc, config.rpc_timeout).await?, pool, + signer: LocalWallet::from_bytes(&const_hex::decode(config.signer).unwrap()).unwrap(), }) } @@ -263,7 +268,7 @@ impl ExternalRelayer { break Ok(()); } } - Ok(None) => + Ok(None) => { if start.elapsed().as_secs() <= 30 { tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); } else { @@ -272,7 +277,8 @@ impl ExternalRelayer { block_number, anyhow!("no receipt returned by substrate for more than 30 seconds"), )); - }, + } + } Err(error) => { tracing::error!(?tx_hash, ?error, "failed to fetch substrate receipt, retrying..."); } @@ -326,7 +332,7 @@ impl ExternalRelayer { .expect("writing the mismatch to a file should not fail"); tracing::error!(?err, "failed to save mismatch, saving to file"); } - Ok(res) => + Ok(res) => { if res.rows_affected() == 0 { tracing::info!( ?block_number, @@ -334,7 +340,8 @@ impl ExternalRelayer { "transaction mismatch already in database (this should only happen if this block is being retried)." ); return; - }, + } + } } #[cfg(feature = "metrics")] @@ -353,10 +360,13 @@ impl ExternalRelayer { // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); + let req = TypedTransaction::Legacy(tx_mined.input.clone().into()); + let _new_hash = req.sighash(); + let signature = self.signer + .sign_transaction(&req).await.unwrap(); - let ethers_tx = Transaction::from(tx_mined.input.clone()); let tx = loop { - match self.substrate_chain.send_raw_transaction(tx_hash, ethers_tx.rlp()).await { + match self.substrate_chain.send_raw_transaction(tx_hash, req.rlp_signed(&signature)).await { Ok(tx) => break tx, Err(err) => { tracing::info!( diff --git a/src/eth/relayer/transaction_dag.rs b/src/eth/relayer/transaction_dag.rs index 52e3ada32..e86137cb4 100644 --- a/src/eth/relayer/transaction_dag.rs +++ b/src/eth/relayer/transaction_dag.rs @@ -21,28 +21,28 @@ pub struct TransactionDag { impl TransactionDag { pub fn get_slot_writes(block_transactions: &Vec) -> HashSet<(Address, SlotIndex)> { block_transactions - .iter() - .flat_map(|tx| { - tx.execution.changes.iter().flat_map(|(address, change)| { - change - .slots - .iter() - .filter_map(|(idx, slot_change)| slot_change.is_modified().then_some((*address, *idx))) + .iter() + .flat_map(|tx| { + tx.execution.changes.iter().flat_map(|(address, change)| { + change + .slots + .iter() + .filter_map(|(idx, slot_change)| slot_change.is_modified().then_some((*address, *idx))) + }) }) - }) - .collect() + .collect() } pub fn get_balance_writes(block_transactions: &Vec) -> HashSet
{ block_transactions - .iter() - .flat_map(|tx| { - tx.execution - .changes - .iter() - .filter_map(|(address, change)| change.balance.is_modified().then_some(*address)) - }) - .collect() + .iter() + .flat_map(|tx| { + tx.execution + .changes + .iter() + .filter_map(|(address, change)| change.balance.is_modified().then_some(*address)) + }) + .collect() } /// Uses the transactions and produces a Dependency DAG (Directed Acyclical Graph). @@ -94,8 +94,8 @@ impl TransactionDag { node_indexes.insert((tx_bnum, tx_idx), node_idx); } - Self::compute_edges(&mut dag, slot_conflicts, &node_indexes); - Self::compute_edges(&mut dag, balance_conflicts, &node_indexes); + //Self::compute_edges(&mut dag, slot_conflicts, &node_indexes); + //Self::compute_edges(&mut dag, balance_conflicts, &node_indexes); #[cfg(feature = "metrics")] metrics::inc_compute_tx_dag(start.elapsed()); From 255a93301f45db02b86a0cdb2035bff3a8b377a6 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Fri, 21 Jun 2024 17:53:20 -0300 Subject: [PATCH 10/43] new hash --- src/eth/relayer/external.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 5829175d0..21c5b4ce5 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -361,7 +361,7 @@ impl ExternalRelayer { // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); let req = TypedTransaction::Legacy(tx_mined.input.clone().into()); - let _new_hash = req.sighash(); + let new_hash = req.sighash(); let signature = self.signer .sign_transaction(&req).await.unwrap(); @@ -373,9 +373,9 @@ impl ExternalRelayer { ?tx_hash, "substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway" ); - if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() { - tracing::info!(?tx_hash, "transaction found on substrate"); - return self.compare_receipt(tx_mined, PendingTransaction::new(tx_hash, &self.substrate_chain)).await; + if self.substrate_chain.fetch_transaction(new_hash.into()).await.unwrap_or(None).is_some() { + tracing::info!(?new_hash, "transaction found on substrate"); + return self.compare_receipt(tx_mined, PendingTransaction::new(new_hash.into(), &self.substrate_chain)).await; } tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); continue; @@ -384,10 +384,10 @@ impl ExternalRelayer { }; // this is probably redundant since send_raw_transaction probably only succeeds if the transaction was added to the mempool already. - tracing::info!(?tx_mined.input.hash, "polling eth_getTransactionByHash"); + tracing::info!(?new_hash, "polling eth_getTransactionByHash"); let mut tries = 0; - while self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_none() { - tracing::warn!(?tx_mined.input.hash, ?tries, "transaction not found, retrying..."); + while self.substrate_chain.fetch_transaction(new_hash.into()).await.unwrap_or(None).is_none() { + tracing::warn!(?new_hash, ?tries, "transaction not found, retrying..."); traced_sleep(Duration::from_millis(100), SleepReason::SyncData).await; tries += 1; } From 43fed0704322039d007d30bc64494c05391004db Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 10:52:45 -0300 Subject: [PATCH 11/43] keep track of the nonce --- src/eth/primitives/nonce.rs | 17 ++++++ src/eth/primitives/transaction_input.rs | 2 +- src/eth/relayer/external.rs | 53 +++++++++++++++---- src/eth/rpc/rpc_server.rs | 21 ++++++++ .../blockchain_client/blockchain_client.rs | 17 ++++++ 5 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/eth/primitives/nonce.rs b/src/eth/primitives/nonce.rs index 1cc74ab81..0553c65ec 100644 --- a/src/eth/primitives/nonce.rs +++ b/src/eth/primitives/nonce.rs @@ -6,6 +6,8 @@ //! offers functionalities to create, manage, and convert nonces, maintaining //! the integrity and uniqueness of transactions in the network. +use std::str::FromStr; + use anyhow::anyhow; use ethereum_types::U256; use ethereum_types::U64; @@ -52,6 +54,21 @@ impl TryFrom for Nonce { } } +impl FromStr for Nonce { + type Err = anyhow::Error; + + fn from_str(s: &str) -> anyhow::Result { + // This parses a hexadecimal string + match U64::from_str(s) { + Ok(parsed) => Ok(Self(parsed)), + Err(e) => { + tracing::warn!(reason = ?e, value = %s, "failed to parse nonce"); + Err(anyhow!("Failed to parse field '{}' with value '{}'", "nonce", s)) + } + } + } +} + // ----------------------------------------------------------------------------- // Conversions: Self -> Other // ----------------------------------------------------------------------------- diff --git a/src/eth/primitives/transaction_input.rs b/src/eth/primitives/transaction_input.rs index b728bd840..b72526db4 100644 --- a/src/eth/primitives/transaction_input.rs +++ b/src/eth/primitives/transaction_input.rs @@ -196,7 +196,7 @@ impl From for TransactionRequest { value: Some(input.value.into()), gas_price: Some(input.gas_price.into()), gas: Some(input.gas_limit.into()), - data: Some(input.input.into()) + data: Some(input.input.into()), } } } diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 21c5b4ce5..df26bd034 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,9 +1,13 @@ use std::collections::HashSet; use std::time::Duration; +use ::metrics::atomics::AtomicU64; use anyhow::anyhow; use anyhow::Context; +use ethereum_types::H256; use ethers_core::types::transaction::eip2718::TypedTransaction; +use ethers_core::types::Bytes; +use ethers_core::types::TransactionRequest; use ethers_signers::LocalWallet; use ethers_signers::Signer; use futures::future::join_all; @@ -28,6 +32,7 @@ use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; +use crate::eth::primitives::TransactionInput; use crate::eth::primitives::TransactionMined; use crate::ext::traced_sleep; use crate::ext::ResultExt; @@ -52,6 +57,33 @@ pub enum RelayError { CompareTimeout(BlockNumber, anyhow::Error), } +struct TxSigner { + wallet: LocalWallet, + nonce: AtomicU64, +} + +impl TxSigner { + pub async fn new(private_key: String, chain: &BlockchainClient) -> anyhow::Result { + let private_key = const_hex::decode(private_key)?; + let wallet = LocalWallet::from_bytes(&private_key)?; + let addr = wallet.address().into(); + let nonce = chain.fetch_transaction_count(&addr).await?; + Ok(Self { + wallet, + nonce: u64::from(nonce).into(), + }) + } + + pub async fn sign_transaction(&self, tx: TransactionInput) -> (H256, Bytes) { + let tx: TransactionRequest = + >::from(tx).nonce(self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + let req = TypedTransaction::Legacy(tx); + let new_hash = req.sighash(); + let signature = self.wallet.sign_transaction(&req).await.unwrap(); + (new_hash, req.rlp_signed(&signature)) + } +} + pub struct ExternalRelayer { pool: PgPool, @@ -61,7 +93,7 @@ pub struct ExternalRelayer { /// RPC client that will submit transactions. stratus_chain: BlockchainClient, - signer: LocalWallet, + signer: TxSigner, } impl ExternalRelayer { @@ -76,11 +108,14 @@ impl ExternalRelayer { .await .expect("should not fail to create pgpool"); + let substrate_chain = BlockchainClient::new_http(&config.forward_to, config.rpc_timeout).await?; + let signer = TxSigner::new(config.signer, &substrate_chain).await?; + Ok(Self { - substrate_chain: BlockchainClient::new_http(&config.forward_to, config.rpc_timeout).await?, + substrate_chain, stratus_chain: BlockchainClient::new_http(&config.stratus_rpc, config.rpc_timeout).await?, pool, - signer: LocalWallet::from_bytes(&const_hex::decode(config.signer).unwrap()).unwrap(), + signer, }) } @@ -360,13 +395,11 @@ impl ExternalRelayer { // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); - let req = TypedTransaction::Legacy(tx_mined.input.clone().into()); - let new_hash = req.sighash(); - let signature = self.signer - .sign_transaction(&req).await.unwrap(); + + let (new_hash, rlp) = self.signer.sign_transaction(tx_mined.input.clone()).await; let tx = loop { - match self.substrate_chain.send_raw_transaction(tx_hash, req.rlp_signed(&signature)).await { + match self.substrate_chain.send_raw_transaction(tx_hash, rlp.clone()).await { Ok(tx) => break tx, Err(err) => { tracing::info!( @@ -375,7 +408,9 @@ impl ExternalRelayer { ); if self.substrate_chain.fetch_transaction(new_hash.into()).await.unwrap_or(None).is_some() { tracing::info!(?new_hash, "transaction found on substrate"); - return self.compare_receipt(tx_mined, PendingTransaction::new(new_hash.into(), &self.substrate_chain)).await; + return self + .compare_receipt(tx_mined, PendingTransaction::new(new_hash.into(), &self.substrate_chain)) + .await; } tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); continue; diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index 62bb75c3e..5aa6e3ca8 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -156,6 +156,7 @@ fn register_methods(mut module: RpcModule) -> anyhow::Result, _: &RpcContext, _: &Extensions) -> anyhow::Res Ok(build_info::as_json()) } +#[tracing::instrument(name = "rpc::stratus_getSlots", skip_all, fields(address, indexes))] +fn stratus_get_slots(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { + let (params, address) = next_rpc_param::
(params.sequence())?; + let (params, indexes) = next_rpc_param::>(params)?; + let (_, block_selection) = next_rpc_param_or_default::(params)?; + + Span::with(|s| { + s.rec_str("address", &address); + s.rec_str("index", &format!("{:?}", indexes)); + }); + + let point_in_time = ctx.storage.translate_to_point_in_time(&block_selection)?; + let slots = indexes + .into_iter() + .map(|index| ctx.storage.read_slot(&address, &index, &point_in_time)) + .collect::, _>>()?; + + Ok(serde_json::to_value(slots).expect_infallible()) +} + // ----------------------------------------------------------------------------- // Blockchain // ----------------------------------------------------------------------------- diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 6158a9fee..228ce78b4 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -21,6 +21,7 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExternalBlock; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; +use crate::eth::primitives::Nonce; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::SlotValue; use crate::eth::primitives::StoragePointInTime; @@ -242,6 +243,22 @@ impl BlockchainClient { } } + /// Fetches the current transaction count (nonce) for an account. + pub async fn fetch_transaction_count(&self, address: &Address) -> anyhow::Result { + tracing::debug!("fetching block number"); + let address = serde_json::to_value(address).expect_infallible(); + + let result = self + .http + .request::>("eth_getTransactionCount", vec![address, serde_json::to_value("latest").expect_infallible()]) + .await; + + match result { + Ok(number) => Ok(number), + Err(e) => log_and_err!(reason = e, "failed to fetch transaction count"), + } + } + // ------------------------------------------------------------------------- // RPC mutations // ------------------------------------------------------------------------- From 9388c4129620a91b7a8356024452b7acbea92ca7 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 11:22:45 -0300 Subject: [PATCH 12/43] fix dumb issue --- src/config.rs | 2 +- src/eth/relayer/external.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index f0bf35472..e76c1c831 100644 --- a/src/config.rs +++ b/src/config.rs @@ -385,7 +385,7 @@ pub struct ExternalRelayerServerConfig { #[arg(long = "rpc-timeout", value_parser=parse_duration, env = "RPC_TIMEOUT", default_value = "2s")] pub rpc_timeout: Duration, - #[arg(long = "signer", value_parser=parse_duration, env = "SIGNER")] + #[arg(long = "signer", env = "SIGNER")] pub signer: String, } diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index df26bd034..0aae0f57c 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -74,12 +74,12 @@ impl TxSigner { }) } - pub async fn sign_transaction(&self, tx: TransactionInput) -> (H256, Bytes) { + pub fn sign_transaction(&self, tx: TransactionInput) -> (H256, Bytes) { let tx: TransactionRequest = >::from(tx).nonce(self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); let req = TypedTransaction::Legacy(tx); let new_hash = req.sighash(); - let signature = self.wallet.sign_transaction(&req).await.unwrap(); + let signature = self.wallet.sign_transaction_sync(&req).unwrap(); (new_hash, req.rlp_signed(&signature)) } } @@ -396,7 +396,7 @@ impl ExternalRelayer { // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); - let (new_hash, rlp) = self.signer.sign_transaction(tx_mined.input.clone()).await; + let (new_hash, rlp) = self.signer.sign_transaction(tx_mined.input.clone()); let tx = loop { match self.substrate_chain.send_raw_transaction(tx_hash, rlp.clone()).await { From 41dec68a99cf5a44db9ba5beae36b5da4ade4284 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 13:02:40 -0300 Subject: [PATCH 13/43] dumb bug --- src/eth/relayer/external.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 0aae0f57c..fca6f4fb3 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -399,11 +399,11 @@ impl ExternalRelayer { let (new_hash, rlp) = self.signer.sign_transaction(tx_mined.input.clone()); let tx = loop { - match self.substrate_chain.send_raw_transaction(tx_hash, rlp.clone()).await { + match self.substrate_chain.send_raw_transaction(new_hash.into(), rlp.clone()).await { Ok(tx) => break tx, Err(err) => { - tracing::info!( - ?tx_hash, + tracing::warn!( + ?new_hash, "substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway" ); if self.substrate_chain.fetch_transaction(new_hash.into()).await.unwrap_or(None).is_some() { @@ -412,7 +412,7 @@ impl ExternalRelayer { .compare_receipt(tx_mined, PendingTransaction::new(new_hash.into(), &self.substrate_chain)) .await; } - tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); + tracing::warn!(?new_hash, ?err, "failed to send raw transaction, retrying..."); continue; } } From db5c4e43231ee5cf473bbdad1f03d89d99ce9e54 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 13:25:53 -0300 Subject: [PATCH 14/43] sign transactions before --- src/eth/relayer/external.rs | 48 +++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index fca6f4fb3..d7a8c9471 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::time::Duration; +use ethers_core::types::Transaction; use ::metrics::atomics::AtomicU64; use anyhow::anyhow; use anyhow::Context; @@ -74,13 +75,21 @@ impl TxSigner { }) } - pub fn sign_transaction(&self, tx: TransactionInput) -> (H256, Bytes) { + + pub fn sign_transaction_input(&self, mut tx_input: TransactionInput) -> TransactionInput { let tx: TransactionRequest = - >::from(tx).nonce(self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + >::from(tx_input.clone()).nonce(self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); let req = TypedTransaction::Legacy(tx); let new_hash = req.sighash(); let signature = self.wallet.sign_transaction_sync(&req).unwrap(); - (new_hash, req.rlp_signed(&signature)) + + tx_input.signer = self.wallet.address().into(); + tx_input.hash = new_hash.into(); + tx_input.r = signature.r; + tx_input.s = signature.s; + tx_input.v = signature.v.into(); + + tx_input } } @@ -185,7 +194,7 @@ impl ExternalRelayer { FROM relayer_blocks WHERE finished = false ORDER BY number ASC - LIMIT 100 + LIMIT 10 ) UPDATE relayer_blocks r SET started = true @@ -217,7 +226,13 @@ impl ExternalRelayer { return Err(anyhow!("some blocks in this batch have not been mined in stratus")); } - let combined_transactions = Self::combine_transactions(blocks); + let combined_transactions = Self::combine_transactions(blocks) + .into_iter() + .map(|mut tx| { + tx.input = self.signer.sign_transaction_input(tx.input); + tx + }) + .collect(); let modified_slots = TransactionDag::get_slot_writes(&combined_transactions); // TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys) @@ -396,33 +411,30 @@ impl ExternalRelayer { // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); - let (new_hash, rlp) = self.signer.sign_transaction(tx_mined.input.clone()); - + let rlp = Transaction::from(tx_mined.input.clone()).rlp(); let tx = loop { - match self.substrate_chain.send_raw_transaction(new_hash.into(), rlp.clone()).await { + match self.substrate_chain.send_raw_transaction(tx_hash, rlp.clone()).await { Ok(tx) => break tx, Err(err) => { tracing::warn!( - ?new_hash, + ?tx_hash, "substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway" ); - if self.substrate_chain.fetch_transaction(new_hash.into()).await.unwrap_or(None).is_some() { - tracing::info!(?new_hash, "transaction found on substrate"); - return self - .compare_receipt(tx_mined, PendingTransaction::new(new_hash.into(), &self.substrate_chain)) - .await; + if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() { + tracing::info!(?tx_hash, "transaction found on substrate"); + return self.compare_receipt(tx_mined, PendingTransaction::new(tx_hash, &self.substrate_chain)).await; } - tracing::warn!(?new_hash, ?err, "failed to send raw transaction, retrying..."); + tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); continue; } } }; // this is probably redundant since send_raw_transaction probably only succeeds if the transaction was added to the mempool already. - tracing::info!(?new_hash, "polling eth_getTransactionByHash"); + tracing::info!(?tx_mined.input.hash, "polling eth_getTransactionByHash"); let mut tries = 0; - while self.substrate_chain.fetch_transaction(new_hash.into()).await.unwrap_or(None).is_none() { - tracing::warn!(?new_hash, ?tries, "transaction not found, retrying..."); + while self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_none() { + tracing::warn!(?tx_mined.input.hash, ?tries, "transaction not found, retrying..."); traced_sleep(Duration::from_millis(100), SleepReason::SyncData).await; tries += 1; } From 9b778ded6c6f7990a4fa7171399f6dae29bb6ce1 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 14:40:28 -0300 Subject: [PATCH 15/43] sqlx prepare --- ...dac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename .sqlx/{query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json => query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json} (64%) diff --git a/.sqlx/query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json b/.sqlx/query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json similarity index 64% rename from .sqlx/query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json rename to .sqlx/query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json index 81a13435f..02f286028 100644 --- a/.sqlx/query-25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4.json +++ b/.sqlx/query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 100\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", + "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 10\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "25354e2a5d772e82533da4447c0880faa66a21ba404a6769fe3f8d27414281c4" + "hash": "1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d" } From 157f97430273c31408777717a7ad4f62d80334b4 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 15:27:46 -0300 Subject: [PATCH 16/43] sort txs --- src/eth/relayer/external.rs | 1 + src/eth/relayer/transaction_dag.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index d7a8c9471..a917dc923 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -228,6 +228,7 @@ impl ExternalRelayer { let combined_transactions = Self::combine_transactions(blocks) .into_iter() + .sorted() .map(|mut tx| { tx.input = self.signer.sign_transaction_input(tx.input); tx diff --git a/src/eth/relayer/transaction_dag.rs b/src/eth/relayer/transaction_dag.rs index e86137cb4..591af1f80 100644 --- a/src/eth/relayer/transaction_dag.rs +++ b/src/eth/relayer/transaction_dag.rs @@ -94,8 +94,8 @@ impl TransactionDag { node_indexes.insert((tx_bnum, tx_idx), node_idx); } - //Self::compute_edges(&mut dag, slot_conflicts, &node_indexes); - //Self::compute_edges(&mut dag, balance_conflicts, &node_indexes); + Self::compute_edges(&mut dag, slot_conflicts, &node_indexes); + Self::compute_edges(&mut dag, balance_conflicts, &node_indexes); #[cfg(feature = "metrics")] metrics::inc_compute_tx_dag(start.elapsed()); From 111ce885676eb7e158c0cdeba91aab93139d4aa7 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 15:29:50 -0300 Subject: [PATCH 17/43] set tx_type to 0 --- src/eth/relayer/external.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index a917dc923..dac72b404 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -84,6 +84,7 @@ impl TxSigner { let signature = self.wallet.sign_transaction_sync(&req).unwrap(); tx_input.signer = self.wallet.address().into(); + tx_input.tx_type = Some(0.into()); tx_input.hash = new_hash.into(); tx_input.r = signature.r; tx_input.s = signature.s; From 12067d92f8d909cccce510cf84a91ff2ab6bbc08 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 15:36:20 -0300 Subject: [PATCH 18/43] lint --- src/eth/relayer/external.rs | 25 ++++++++++--------------- src/eth/relayer/transaction_dag.rs | 4 ++-- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index dac72b404..ae0317f21 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,13 +1,11 @@ use std::collections::HashSet; use std::time::Duration; -use ethers_core::types::Transaction; use ::metrics::atomics::AtomicU64; use anyhow::anyhow; use anyhow::Context; -use ethereum_types::H256; use ethers_core::types::transaction::eip2718::TypedTransaction; -use ethers_core::types::Bytes; +use ethers_core::types::Transaction; use ethers_core::types::TransactionRequest; use ethers_signers::LocalWallet; use ethers_signers::Signer; @@ -75,7 +73,6 @@ impl TxSigner { }) } - pub fn sign_transaction_input(&self, mut tx_input: TransactionInput) -> TransactionInput { let tx: TransactionRequest = >::from(tx_input.clone()).nonce(self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); @@ -173,13 +170,13 @@ impl ExternalRelayer { ) .execute(&self.pool) .await { - tracing::warn!(?e, "failed to insert slot mismatch, retrying...") + tracing::warn!(?e, "failed to insert slot mismatch, retrying..."); } - }}) + }}); } let mut buffer = futures::stream::iter(futures).buffer_unordered(100); - while let Some(_) = buffer.next().await {} + while buffer.next().await.is_some() {} #[cfg(feature = "metrics")] inc_compare_final_state(start.elapsed()); @@ -206,7 +203,7 @@ impl ExternalRelayer { .fetch_all(&self.pool) .await?; - if block_rows.len() == 0 { + if block_rows.is_empty() { tracing::info!("no blocks to relay"); return Ok(vec![]); } @@ -234,7 +231,7 @@ impl ExternalRelayer { tx.input = self.signer.sign_transaction_input(tx.input); tx }) - .collect(); + .collect_vec(); let modified_slots = TransactionDag::get_slot_writes(&combined_transactions); // TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys) @@ -320,7 +317,7 @@ impl ExternalRelayer { break Ok(()); } } - Ok(None) => { + Ok(None) => if start.elapsed().as_secs() <= 30 { tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); } else { @@ -329,8 +326,7 @@ impl ExternalRelayer { block_number, anyhow!("no receipt returned by substrate for more than 30 seconds"), )); - } - } + }, Err(error) => { tracing::error!(?tx_hash, ?error, "failed to fetch substrate receipt, retrying..."); } @@ -384,7 +380,7 @@ impl ExternalRelayer { .expect("writing the mismatch to a file should not fail"); tracing::error!(?err, "failed to save mismatch, saving to file"); } - Ok(res) => { + Ok(res) => if res.rows_affected() == 0 { tracing::info!( ?block_number, @@ -392,8 +388,7 @@ impl ExternalRelayer { "transaction mismatch already in database (this should only happen if this block is being retried)." ); return; - } - } + }, } #[cfg(feature = "metrics")] diff --git a/src/eth/relayer/transaction_dag.rs b/src/eth/relayer/transaction_dag.rs index 591af1f80..4f5056392 100644 --- a/src/eth/relayer/transaction_dag.rs +++ b/src/eth/relayer/transaction_dag.rs @@ -19,7 +19,7 @@ pub struct TransactionDag { } impl TransactionDag { - pub fn get_slot_writes(block_transactions: &Vec) -> HashSet<(Address, SlotIndex)> { + pub fn get_slot_writes(block_transactions: &[TransactionMined]) -> HashSet<(Address, SlotIndex)> { block_transactions .iter() .flat_map(|tx| { @@ -33,7 +33,7 @@ impl TransactionDag { .collect() } - pub fn get_balance_writes(block_transactions: &Vec) -> HashSet
{ + pub fn get_balance_writes(block_transactions: &[TransactionMined]) -> HashSet
{ block_transactions .iter() .flat_map(|tx| { From 5f30df66aa25e2f6a1d090b569c0110f032da1f4 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 16:01:12 -0300 Subject: [PATCH 19/43] none is legacy --- src/eth/relayer/external.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index ae0317f21..8a41903b4 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -81,7 +81,8 @@ impl TxSigner { let signature = self.wallet.sign_transaction_sync(&req).unwrap(); tx_input.signer = self.wallet.address().into(); - tx_input.tx_type = Some(0.into()); + // None is Legacy + tx_input.tx_type = None; tx_input.hash = new_hash.into(); tx_input.r = signature.r; tx_input.s = signature.s; From 08a3c4f218fb8a3de2e8c31c673d473fd53cfc84 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Mon, 24 Jun 2024 18:47:32 -0300 Subject: [PATCH 20/43] include nonce in log --- src/eth/relayer/external.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 8a41903b4..2cfe4d52d 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -404,7 +404,7 @@ impl ExternalRelayer { let start = metrics::now(); let tx_hash = tx_mined.input.hash; - tracing::info!(?tx_hash, "relaying transaction"); + tracing::info!(?tx_mined.input.nonce, ?tx_hash, "relaying transaction"); // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); @@ -415,6 +415,7 @@ impl ExternalRelayer { Ok(tx) => break tx, Err(err) => { tracing::warn!( + ?tx_mined.input.nonce, ?tx_hash, "substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway" ); From 9bf7dc5bc4122bd7625012af396647acedd17d0f Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 12:49:03 -0300 Subject: [PATCH 21/43] different nonce approach --- src/bin/relayer.rs | 2 +- src/eth/relayer/external.rs | 23 +++++++++++++------ .../blockchain_client/blockchain_client.rs | 2 +- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/bin/relayer.rs b/src/bin/relayer.rs index 7edd9f8c9..852d1eb68 100644 --- a/src/bin/relayer.rs +++ b/src/bin/relayer.rs @@ -23,7 +23,7 @@ async fn run(config: ExternalRelayerConfig) -> anyhow::Result<()> { // init services let backoff = config.relayer.backoff; - let relayer = config.relayer.init().await?; + let mut relayer = config.relayer.init().await?; loop { if GlobalState::warn_if_shutdown(TASK_NAME) { diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 2cfe4d52d..503018289 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; use std::time::Duration; -use ::metrics::atomics::AtomicU64; use anyhow::anyhow; use anyhow::Context; use ethers_core::types::transaction::eip2718::TypedTransaction; @@ -29,6 +28,7 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::ExecutionValueChange; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; +use crate::eth::primitives::Nonce; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionInput; @@ -58,7 +58,7 @@ pub enum RelayError { struct TxSigner { wallet: LocalWallet, - nonce: AtomicU64, + nonce: Nonce, } impl TxSigner { @@ -69,13 +69,19 @@ impl TxSigner { let nonce = chain.fetch_transaction_count(&addr).await?; Ok(Self { wallet, - nonce: u64::from(nonce).into(), + nonce, }) } - pub fn sign_transaction_input(&self, mut tx_input: TransactionInput) -> TransactionInput { + pub async fn _sync_nonce(&mut self, chain: &BlockchainClient) -> anyhow::Result<()> { + self.nonce = chain.fetch_transaction_count(&self.wallet.address().into()).await?; + Ok(()) + } + + pub fn sign_transaction_input(&mut self, mut tx_input: TransactionInput) -> TransactionInput { let tx: TransactionRequest = - >::from(tx_input.clone()).nonce(self.nonce.fetch_add(1, std::sync::atomic::Ordering::SeqCst)); + >::from(tx_input.clone()).nonce(self.nonce); + let req = TypedTransaction::Legacy(tx); let new_hash = req.sighash(); let signature = self.wallet.sign_transaction_sync(&req).unwrap(); @@ -88,6 +94,7 @@ impl TxSigner { tx_input.s = signature.s; tx_input.v = signature.v.into(); + self.nonce = self.nonce.next(); tx_input } } @@ -185,7 +192,7 @@ impl ExternalRelayer { /// Polls the next block to be relayed and relays it to Substrate. #[tracing::instrument(name = "external_relayer::relay_next_block", skip_all, fields(block_number))] - pub async fn relay_blocks(&self) -> anyhow::Result> { + pub async fn relay_blocks(&mut self) -> anyhow::Result> { let block_rows = sqlx::query!( r#" WITH cte AS ( @@ -419,11 +426,13 @@ impl ExternalRelayer { ?tx_hash, "substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway" ); + if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() { tracing::info!(?tx_hash, "transaction found on substrate"); return self.compare_receipt(tx_mined, PendingTransaction::new(tx_hash, &self.substrate_chain)).await; } - tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); + + tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, syncing nonce and retrying..."); continue; } } diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 228ce78b4..e14434018 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -255,7 +255,7 @@ impl BlockchainClient { match result { Ok(number) => Ok(number), - Err(e) => log_and_err!(reason = e, "failed to fetch transaction count"), + Err(e) => log_and_err!(reason = e, "failed to fetch transaction"), } } From c0967b363d54d69229fec78ba63345d170f7cea7 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 13:20:31 -0300 Subject: [PATCH 22/43] fix dumb bug --- src/eth/relayer/external.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 503018289..e97d3b873 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -90,6 +90,7 @@ impl TxSigner { // None is Legacy tx_input.tx_type = None; tx_input.hash = new_hash.into(); + tx_input.nonce = self.nonce; tx_input.r = signature.r; tx_input.s = signature.s; tx_input.v = signature.v.into(); From 651fb3891891157152aa7e96a645838ce3974f15 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 15:04:06 -0300 Subject: [PATCH 23/43] another dumb bug --- src/eth/relayer/external.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index e97d3b873..e61b18b8c 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -83,8 +83,8 @@ impl TxSigner { >::from(tx_input.clone()).nonce(self.nonce); let req = TypedTransaction::Legacy(tx); - let new_hash = req.sighash(); let signature = self.wallet.sign_transaction_sync(&req).unwrap(); + let new_hash = req.hash(&signature); tx_input.signer = self.wallet.address().into(); // None is Legacy From 279c5b8340a2d96de77ea0abe2f8754f71540036 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 15:38:54 -0300 Subject: [PATCH 24/43] only sign pixcashier txs --- src/eth/relayer/external.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index e61b18b8c..430824bb8 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -29,6 +29,7 @@ use crate::eth::primitives::ExecutionValueChange; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::Nonce; +use crate::eth::primitives::Signature; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionInput; @@ -192,7 +193,7 @@ impl ExternalRelayer { } /// Polls the next block to be relayed and relays it to Substrate. - #[tracing::instrument(name = "external_relayer::relay_next_block", skip_all, fields(block_number))] + #[tracing::instrument(name = "external_relayer::relay_blocks", skip_all, fields(block_number))] pub async fn relay_blocks(&mut self) -> anyhow::Result> { let block_rows = sqlx::query!( r#" @@ -237,7 +238,9 @@ impl ExternalRelayer { .into_iter() .sorted() .map(|mut tx| { - tx.input = self.signer.sign_transaction_input(tx.input); + if tx.input.extract_function().is_some_and(|sig| sig.contains("PixCashier")) { + tx.input = self.signer.sign_transaction_input(tx.input); + } tx }) .collect_vec(); From bdccc897292089b2c6d42c9079391fe31e7e6e57 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 16:25:15 -0300 Subject: [PATCH 25/43] sync nonce --- src/eth/relayer/external.rs | 3 ++- src/infra/blockchain_client/blockchain_client.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 430824bb8..6f7c93226 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -74,7 +74,7 @@ impl TxSigner { }) } - pub async fn _sync_nonce(&mut self, chain: &BlockchainClient) -> anyhow::Result<()> { + pub async fn sync_nonce(&mut self, chain: &BlockchainClient) -> anyhow::Result<()> { self.nonce = chain.fetch_transaction_count(&self.wallet.address().into()).await?; Ok(()) } @@ -234,6 +234,7 @@ impl ExternalRelayer { return Err(anyhow!("some blocks in this batch have not been mined in stratus")); } + self.signer.sync_nonce(&self.substrate_chain).await?; let combined_transactions = Self::combine_transactions(blocks) .into_iter() .sorted() diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index e14434018..228ce78b4 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -255,7 +255,7 @@ impl BlockchainClient { match result { Ok(number) => Ok(number), - Err(e) => log_and_err!(reason = e, "failed to fetch transaction"), + Err(e) => log_and_err!(reason = e, "failed to fetch transaction count"), } } From c1178beee45a7247c7c988a108e423923cbf4b52 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 18:11:26 -0300 Subject: [PATCH 26/43] remove kube --- docker/Dockerfile.run_stratus | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/Dockerfile.run_stratus b/docker/Dockerfile.run_stratus index 158890920..31d164fdf 100644 --- a/docker/Dockerfile.run_stratus +++ b/docker/Dockerfile.run_stratus @@ -19,7 +19,7 @@ ENV CARGO_PROFILE_RELEASE_DEBUG=1 ENV LOG_FORMAT=json ENV NO_COLOR=1 -RUN cargo build --release --bin stratus --features kubernetes +RUN cargo build --release --bin stratus # Runtime From 82a25fb1a6373ba6d461e165d5b1934403daa86e Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 19:06:48 -0300 Subject: [PATCH 27/43] remove check loop --- src/eth/relayer/external.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 59a88ac00..160080097 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -443,15 +443,6 @@ impl ExternalRelayer { } }; - // this is probably redundant since send_raw_transaction probably only succeeds if the transaction was added to the mempool already. - tracing::info!(?tx_mined.input.hash, "polling eth_getTransactionByHash"); - let mut tries = 0; - while self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_none() { - tracing::warn!(?tx_mined.input.hash, ?tries, "transaction not found, retrying..."); - traced_sleep(Duration::from_millis(100), SleepReason::SyncData).await; - tries += 1; - } - #[cfg(feature = "metrics")] metrics::inc_relay_and_check_mempool(start.elapsed()); self.compare_receipt(tx_mined, tx).await From 6d778643fbfbadd7e2136b02426c2b276f418410 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 20:03:55 -0300 Subject: [PATCH 28/43] progress log --- ...71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json} | 4 ++-- src/eth/relayer/external.rs | 7 +++++-- src/eth/relayer/transaction_dag.rs | 4 ++++ static/schema/004-relayer.sql | 6 ++++-- 4 files changed, 15 insertions(+), 6 deletions(-) rename .sqlx/{query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json => query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json} (64%) diff --git a/.sqlx/query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json b/.sqlx/query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json similarity index 64% rename from .sqlx/query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json rename to .sqlx/query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json index 02f286028..a0d899e23 100644 --- a/.sqlx/query-1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d.json +++ b/.sqlx/query-9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 10\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", + "query": "\n WITH cte AS (\n SELECT number\n FROM relayer_blocks\n WHERE finished = false\n ORDER BY number ASC\n LIMIT 5\n )\n UPDATE relayer_blocks r\n SET started = true\n FROM cte\n WHERE r.number = cte.number\n RETURNING r.number, r.payload", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "1da3934b6b1ddac87dc25723f755e5ac8f53e4e1ff1f677f8ed58bd8266d270d" + "hash": "9aad59f76a6ef0d71c0d0e6df3aa379330ad9d8355790668f1db6d914444376d" } diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 160080097..9409776a9 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -202,7 +202,7 @@ impl ExternalRelayer { FROM relayer_blocks WHERE finished = false ORDER BY number ASC - LIMIT 10 + LIMIT 5 ) UPDATE relayer_blocks r SET started = true @@ -416,6 +416,8 @@ impl ExternalRelayer { let start = metrics::now(); let tx_hash = tx_mined.input.hash; + let nonce = tx_mined.input.nonce; + tracing::info!(?tx_mined.input.nonce, ?tx_hash, "relaying transaction"); // fill span @@ -437,7 +439,7 @@ impl ExternalRelayer { return self.compare_receipt(tx_mined, PendingTransaction::new(tx_hash, &self.substrate_chain)).await; } - tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, syncing nonce and retrying..."); + tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); continue; } } @@ -460,6 +462,7 @@ impl ExternalRelayer { let mut results = vec![]; while let Some(roots) = dag.take_roots() { + tracing::info!(elapsed=?start.elapsed().as_secs(), transaction_num=roots.len(), remaining=dag.txs_remaining(),"forwarding"); let futures = roots.into_iter().map(|root_tx| self.relay_and_check_mempool(root_tx)); results.extend(join_all(futures).await); } diff --git a/src/eth/relayer/transaction_dag.rs b/src/eth/relayer/transaction_dag.rs index 4f5056392..5523d2b4a 100644 --- a/src/eth/relayer/transaction_dag.rs +++ b/src/eth/relayer/transaction_dag.rs @@ -33,6 +33,10 @@ impl TransactionDag { .collect() } + pub fn txs_remaining(&self) -> usize { + self.dag.node_count() + } + pub fn get_balance_writes(block_transactions: &[TransactionMined]) -> HashSet
{ block_transactions .iter() diff --git a/static/schema/004-relayer.sql b/static/schema/004-relayer.sql index da714ecda..f859b41ff 100644 --- a/static/schema/004-relayer.sql +++ b/static/schema/004-relayer.sql @@ -5,7 +5,6 @@ create table relayer_blocks( finished boolean default false, mismatched boolean default false ); - create table mismatches( hash bytea primary key not null, block_number bigint, @@ -13,7 +12,6 @@ create table mismatches( substrate_receipt jsonb not null, error text not null ); - create table slot_mismatches( address bytea not null, index bytea not null, @@ -22,3 +20,7 @@ create table slot_mismatches( substrate_value bytea not null, primary key (address, index) ); +create table tx_hash_map( + stratus_hash bytea primary key not null, + substrate_hash bytea not null +); From 0ce6b665ca184d93d2f243af8501b05e13dceb85 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 21:49:05 -0300 Subject: [PATCH 29/43] increase gas 10 fold --- src/eth/relayer/external.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 9409776a9..1e21ca812 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -29,7 +29,6 @@ use crate::eth::primitives::ExecutionValueChange; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::Nonce; -use crate::eth::primitives::Signature; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionInput; @@ -68,10 +67,7 @@ impl TxSigner { let wallet = LocalWallet::from_bytes(&private_key)?; let addr = wallet.address().into(); let nonce = chain.fetch_transaction_count(&addr).await?; - Ok(Self { - wallet, - nonce, - }) + Ok(Self { wallet, nonce }) } pub async fn sync_nonce(&mut self, chain: &BlockchainClient) -> anyhow::Result<()> { @@ -80,8 +76,9 @@ impl TxSigner { } pub fn sign_transaction_input(&mut self, mut tx_input: TransactionInput) -> TransactionInput { - let tx: TransactionRequest = - >::from(tx_input.clone()).nonce(self.nonce); + let tx: TransactionRequest = >::from(tx_input.clone()) + .nonce(self.nonce) + .gas(tx_input.gas_limit.as_u64() * 10); let req = TypedTransaction::Legacy(tx); let signature = self.wallet.sign_transaction_sync(&req).unwrap(); @@ -330,7 +327,7 @@ impl ExternalRelayer { break Ok(()); } } - Ok(None) => + Ok(None) => { if start.elapsed().as_secs() <= 30 { tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); } else { @@ -339,7 +336,8 @@ impl ExternalRelayer { block_number, anyhow!("no receipt returned by substrate for more than 30 seconds"), )); - }, + } + } Err(error) => { tracing::error!(?tx_hash, ?error, "failed to fetch substrate receipt, retrying..."); } @@ -393,7 +391,7 @@ impl ExternalRelayer { .expect("writing the mismatch to a file should not fail"); tracing::error!(?err, "failed to save mismatch, saving to file"); } - Ok(res) => + Ok(res) => { if res.rows_affected() == 0 { tracing::info!( ?block_number, @@ -401,7 +399,8 @@ impl ExternalRelayer { "transaction mismatch already in database (this should only happen if this block is being retried)." ); return; - }, + } + } } #[cfg(feature = "metrics")] From 383fa24a807aceac65f4876d42fd9dc1140bf019 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Tue, 25 Jun 2024 21:50:40 -0300 Subject: [PATCH 30/43] no kube --- docker/Dockerfile.run_with_importer | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/Dockerfile.run_with_importer b/docker/Dockerfile.run_with_importer index 5602b8bab..a3e7f9795 100644 --- a/docker/Dockerfile.run_with_importer +++ b/docker/Dockerfile.run_with_importer @@ -19,7 +19,7 @@ ENV CARGO_PROFILE_RELEASE_DEBUG=1 ENV LOG_FORMAT=json ENV NO_COLOR=1 -RUN cargo build --release --bin run-with-importer --features kubernetes +RUN cargo build --release --bin run-with-importer # Runtime FROM rust:1.75 as runtime From 182599e4347c28fc77b3655b8f731dc353d92ca6 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 26 Jun 2024 13:29:36 -0300 Subject: [PATCH 31/43] remove timeout --- src/eth/relayer/external.rs | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 1e21ca812..b7c5569e9 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -303,20 +303,19 @@ impl ExternalRelayer { let start = Instant::now(); let mut substrate_receipt = substrate_pending_transaction; let _res = loop { - let Ok(receipt) = timeout(Duration::from_secs(30), substrate_receipt).await else { - tracing::error!( - ?block_number, - ?tx_hash, - "no receipt returned by substrate for more than 30 seconds, retrying block" - ); - break Err(RelayError::CompareTimeout( - block_number, - anyhow!("no receipt returned by substrate for more than 30 seconds"), - )); + let receipt = loop { + match substrate_receipt.await { + Ok(r) => break r, + Err(err) => { + substrate_receipt = PendingTransaction::new(tx_hash, &self.substrate_chain); + tracing::warn!(?err); + continue; + } + } }; match receipt { - Ok(Some(substrate_receipt)) => { + Some(substrate_receipt) => { let _ = stratus_tx.execution.apply_receipt(&substrate_receipt); if let Err(compare_error) = stratus_tx.execution.compare_with_receipt(&substrate_receipt) { let err_string = compare_error.to_string(); @@ -327,19 +326,8 @@ impl ExternalRelayer { break Ok(()); } } - Ok(None) => { - if start.elapsed().as_secs() <= 30 { - tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); - } else { - tracing::error!(?tx_hash, "no receipt returned by substrate for more than 30 seconds, retrying block"); - break Err(RelayError::CompareTimeout( - block_number, - anyhow!("no receipt returned by substrate for more than 30 seconds"), - )); - } - } - Err(error) => { - tracing::error!(?tx_hash, ?error, "failed to fetch substrate receipt, retrying..."); + None => { + tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); } } substrate_receipt = PendingTransaction::new(tx_hash, &self.substrate_chain); From e20b8838a950a14d8d8e23ab1683f497590257b1 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 26 Jun 2024 15:54:25 -0300 Subject: [PATCH 32/43] print substrate tx hash --- src/eth/consensus/forward_to.rs | 2 +- src/eth/relayer/external.rs | 4 ++-- src/infra/blockchain_client/blockchain_client.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/eth/consensus/forward_to.rs b/src/eth/consensus/forward_to.rs index 5611bf051..f5e3fd21e 100644 --- a/src/eth/consensus/forward_to.rs +++ b/src/eth/consensus/forward_to.rs @@ -25,7 +25,7 @@ impl TransactionRelayer { let tx = self .chain - .send_raw_transaction(tx_input.hash, Transaction::from(tx_input.clone()).rlp()) + .send_raw_transaction(Transaction::from(tx_input.clone()).rlp()) .await?; Ok(tx) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index b7c5569e9..fb01c2a97 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -295,7 +295,7 @@ impl ExternalRelayer { let tx_hash: Hash = stratus_tx.input.hash; let block_number: BlockNumber = stratus_tx.block_number; - tracing::info!(?block_number, ?tx_hash, "comparing receipts"); + tracing::info!(?block_number, ?tx_hash, ?substrate_pending_transaction.tx_hash, "comparing receipts"); // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); @@ -412,7 +412,7 @@ impl ExternalRelayer { let rlp = Transaction::from(tx_mined.input.clone()).rlp(); let tx = loop { - match self.substrate_chain.send_raw_transaction(tx_hash, rlp.clone()).await { + match self.substrate_chain.send_raw_transaction(rlp.clone()).await { Ok(tx) => break tx, Err(err) => { tracing::warn!( diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 228ce78b4..8cdbd7079 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -264,8 +264,8 @@ impl BlockchainClient { // ------------------------------------------------------------------------- /// Sends a signed transaction. - pub async fn send_raw_transaction(&self, hash: Hash, tx: Bytes) -> anyhow::Result> { - tracing::debug!(%hash, "sending raw transaction"); + pub async fn send_raw_transaction(&self, tx: Bytes) -> anyhow::Result> { + tracing::debug!("sending raw transaction"); let tx = serde_json::to_value(tx).expect_infallible(); let result = self.http.request::>("eth_sendRawTransaction", vec![tx]).await; From 19b1fa88071a59375afa88fca7df4632de9ab052 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 26 Jun 2024 15:55:57 -0300 Subject: [PATCH 33/43] correct gas limit for signature --- src/eth/relayer/external.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index fb01c2a97..c0ddca114 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -85,6 +85,7 @@ impl TxSigner { let new_hash = req.hash(&signature); tx_input.signer = self.wallet.address().into(); + tx_input.gas_limit = (tx_input.gas_limit.as_u64() * 10).into(); // None is Legacy tx_input.tx_type = None; tx_input.hash = new_hash.into(); From 659352fc27eae5e743bd8eb08c7f388657c0e044 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 26 Jun 2024 17:55:48 -0300 Subject: [PATCH 34/43] better handling for txs that are removed from the pool --- origin/e2e-contracts/repos/brlc-multisig | 1 + origin/e2e-contracts/repos/brlc-periphery | 1 + origin/e2e-contracts/repos/brlc-pix-cashier | 1 + origin/e2e-contracts/repos/brlc-token | 1 + .../e2e-contracts/repos/brlc-yield-streamer | 1 + origin/e2e-contracts/repos/compound-periphery | 1 + src/eth/relayer/external.rs | 91 ++++++++++--------- 7 files changed, 55 insertions(+), 42 deletions(-) create mode 160000 origin/e2e-contracts/repos/brlc-multisig create mode 160000 origin/e2e-contracts/repos/brlc-periphery create mode 160000 origin/e2e-contracts/repos/brlc-pix-cashier create mode 160000 origin/e2e-contracts/repos/brlc-token create mode 160000 origin/e2e-contracts/repos/brlc-yield-streamer create mode 160000 origin/e2e-contracts/repos/compound-periphery diff --git a/origin/e2e-contracts/repos/brlc-multisig b/origin/e2e-contracts/repos/brlc-multisig new file mode 160000 index 000000000..918a226af --- /dev/null +++ b/origin/e2e-contracts/repos/brlc-multisig @@ -0,0 +1 @@ +Subproject commit 918a226af3b3a9847fd70bf355e4412fa372ef88 diff --git a/origin/e2e-contracts/repos/brlc-periphery b/origin/e2e-contracts/repos/brlc-periphery new file mode 160000 index 000000000..b8d507a46 --- /dev/null +++ b/origin/e2e-contracts/repos/brlc-periphery @@ -0,0 +1 @@ +Subproject commit b8d507a468477b2387dbbc1986c166b5fda34fc9 diff --git a/origin/e2e-contracts/repos/brlc-pix-cashier b/origin/e2e-contracts/repos/brlc-pix-cashier new file mode 160000 index 000000000..a528d0cb1 --- /dev/null +++ b/origin/e2e-contracts/repos/brlc-pix-cashier @@ -0,0 +1 @@ +Subproject commit a528d0cb1c46112ee81196cb462de6fa13a224cd diff --git a/origin/e2e-contracts/repos/brlc-token b/origin/e2e-contracts/repos/brlc-token new file mode 160000 index 000000000..0858ec418 --- /dev/null +++ b/origin/e2e-contracts/repos/brlc-token @@ -0,0 +1 @@ +Subproject commit 0858ec41851b62e50559f2c288c4bb0e89e6c612 diff --git a/origin/e2e-contracts/repos/brlc-yield-streamer b/origin/e2e-contracts/repos/brlc-yield-streamer new file mode 160000 index 000000000..7683517c1 --- /dev/null +++ b/origin/e2e-contracts/repos/brlc-yield-streamer @@ -0,0 +1 @@ +Subproject commit 7683517c13c2f37ae274e72dca1f10966d20b534 diff --git a/origin/e2e-contracts/repos/compound-periphery b/origin/e2e-contracts/repos/compound-periphery new file mode 160000 index 000000000..e4d68dff7 --- /dev/null +++ b/origin/e2e-contracts/repos/compound-periphery @@ -0,0 +1 @@ +Subproject commit e4d68dff75102f00b89f501e01ae1f4edbff878a diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index c0ddca114..05bcd69a9 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,9 +1,8 @@ use std::collections::HashSet; -use std::time::Duration; - use anyhow::anyhow; use anyhow::Context; use ethers_core::types::transaction::eip2718::TypedTransaction; +use ethers_core::types::Bytes; use ethers_core::types::Transaction; use ethers_core::types::TransactionRequest; use ethers_signers::LocalWallet; @@ -15,8 +14,6 @@ use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use tokio::fs::File; use tokio::io::AsyncWriteExt; -use tokio::time::timeout; -use tokio::time::Instant; use tracing::Span; use super::transaction_dag::TransactionDag; @@ -54,6 +51,9 @@ pub enum RelayError { #[error("Compare Timeout: {1}")] CompareTimeout(BlockNumber, anyhow::Error), + + #[error("Transaction not found")] + TransactionNotFound, } struct TxSigner { @@ -301,9 +301,8 @@ impl ExternalRelayer { // fill span Span::with(|s| s.rec_str("hash", &tx_hash)); - let start = Instant::now(); let mut substrate_receipt = substrate_pending_transaction; - let _res = loop { + let _res = { let receipt = loop { match substrate_receipt.await { Ok(r) => break r, @@ -314,25 +313,19 @@ impl ExternalRelayer { } } }; - - match receipt { - Some(substrate_receipt) => { - let _ = stratus_tx.execution.apply_receipt(&substrate_receipt); - if let Err(compare_error) = stratus_tx.execution.compare_with_receipt(&substrate_receipt) { - let err_string = compare_error.to_string(); - let error = log_and_err!("transaction mismatch!").context(err_string.clone()); - self.save_mismatch(stratus_tx, substrate_receipt, &err_string).await; - break error.map_err(|err| RelayError::Mismatch(block_number, err)); - } else { - break Ok(()); - } - } - None => { - tracing::warn!(?tx_hash, "no receipt returned by substrate, retrying..."); + if let Some(substrate_receipt) = receipt { + let _ = stratus_tx.execution.apply_receipt(&substrate_receipt); + if let Err(compare_error) = stratus_tx.execution.compare_with_receipt(&substrate_receipt) { + let err_string = compare_error.to_string(); + let error = log_and_err!("transaction mismatch!").context(err_string.clone()); + self.save_mismatch(stratus_tx, substrate_receipt, &err_string).await; + error.map_err(|err| RelayError::Mismatch(block_number, err)) + } else { + Ok(()) } + } else { + Err(RelayError::TransactionNotFound) } - substrate_receipt = PendingTransaction::new(tx_hash, &self.substrate_chain); - traced_sleep(Duration::from_millis(50), SleepReason::SyncData).await; }; #[cfg(feature = "metrics")] @@ -396,23 +389,9 @@ impl ExternalRelayer { metrics::inc_save_mismatch(start.elapsed()); } - /// Relays a transaction to Substrate and waits until the transaction is in the mempool by - /// calling eth_getTransactionByHash. (infallible) - #[tracing::instrument(name = "external_relayer::relay_and_check_mempool", skip_all, fields(hash))] - pub async fn relay_and_check_mempool(&self, tx_mined: TransactionMined) -> anyhow::Result<(), RelayError> { - #[cfg(feature = "metrics")] - let start = metrics::now(); - + pub async fn send_transaction(&self, tx_mined: TransactionMined, rlp: Bytes) -> PendingTransaction { let tx_hash = tx_mined.input.hash; - let nonce = tx_mined.input.nonce; - - tracing::info!(?tx_mined.input.nonce, ?tx_hash, "relaying transaction"); - - // fill span - Span::with(|s| s.rec_str("hash", &tx_hash)); - - let rlp = Transaction::from(tx_mined.input.clone()).rlp(); - let tx = loop { + loop { match self.substrate_chain.send_raw_transaction(rlp.clone()).await { Ok(tx) => break tx, Err(err) => { @@ -424,18 +403,45 @@ impl ExternalRelayer { if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() { tracing::info!(?tx_hash, "transaction found on substrate"); - return self.compare_receipt(tx_mined, PendingTransaction::new(tx_hash, &self.substrate_chain)).await; + return PendingTransaction::new(tx_hash, &self.substrate_chain); } tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying..."); continue; } } - }; + } + } + + /// Relays a transaction to Substrate and waits until the transaction is in the mempool by + /// calling eth_getTransactionByHash. (infallible) + #[tracing::instrument(name = "external_relayer::relay_and_check_mempool", skip_all, fields(hash))] + pub async fn relay_and_check_mempool(&self, tx_mined: TransactionMined) -> anyhow::Result<(), RelayError> { + #[cfg(feature = "metrics")] + let start = metrics::now(); + + let tx_hash = tx_mined.input.hash; + let nonce = tx_mined.input.nonce; + + tracing::info!(?tx_mined.input.nonce, ?tx_hash, "relaying transaction"); + + // fill span + Span::with(|s| s.rec_str("hash", &tx_hash)); + + let rlp = Transaction::from(tx_mined.input.clone()).rlp(); + let mut tx = self.send_transaction(tx_mined.clone(), rlp.clone()).await; #[cfg(feature = "metrics")] metrics::inc_relay_and_check_mempool(start.elapsed()); - self.compare_receipt(tx_mined, tx).await + loop { + if let Err(error) = self.compare_receipt(tx_mined.clone(), tx).await { + match error { + RelayError::TransactionNotFound => tracing::warn!(?tx_hash, "transaction not found in substrate, trying to resend"), + err => break Err(err), + } + } + tx = self.send_transaction(tx_mined.clone(), rlp.clone()).await; + } } /// Relays a dag by removing its roots and sending them consecutively. Returns `Ok` if we confirmed that all transactions @@ -464,6 +470,7 @@ impl ExternalRelayer { match error { RelayError::CompareTimeout(number, _) => timedout_blocks.insert(number), RelayError::Mismatch(number, _) => mismatched_blocks.insert(number), + _ => panic!("unexpected error"), }; } From a45eda82ecb6437e780cadb1c425f63ddad5097a Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 26 Jun 2024 21:25:13 -0300 Subject: [PATCH 35/43] fix bug --- src/eth/relayer/external.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 05bcd69a9..af88ab7c4 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -439,8 +439,10 @@ impl ExternalRelayer { RelayError::TransactionNotFound => tracing::warn!(?tx_hash, "transaction not found in substrate, trying to resend"), err => break Err(err), } + tx = self.send_transaction(tx_mined.clone(), rlp.clone()).await; + } else { + break Ok(()) } - tx = self.send_transaction(tx_mined.clone(), rlp.clone()).await; } } From 358f8e392a40aa41eb49306a4ec3c5db15720d0a Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 26 Jun 2024 23:59:46 -0300 Subject: [PATCH 36/43] save tx mapping --- ...99166671d50a769c5664e19eb053334289b6d.json | 22 +++++++ ...4c3a1eb73512c28cec059abbc7b9480e6c4e9.json | 16 +++++ src/eth/consensus/forward_to.rs | 5 +- src/eth/primitives/transaction_input.rs | 13 ++++ src/eth/relayer/external.rs | 61 ++++++++++++++----- static/schema/004-relayer.sql | 9 +-- 6 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 .sqlx/query-61e2784d13ebc79ae571b5c1a6299166671d50a769c5664e19eb053334289b6d.json create mode 100644 .sqlx/query-f1a46cfa58166db3fbf5c6b59ab4c3a1eb73512c28cec059abbc7b9480e6c4e9.json diff --git a/.sqlx/query-61e2784d13ebc79ae571b5c1a6299166671d50a769c5664e19eb053334289b6d.json b/.sqlx/query-61e2784d13ebc79ae571b5c1a6299166671d50a769c5664e19eb053334289b6d.json new file mode 100644 index 000000000..5a2c3820f --- /dev/null +++ b/.sqlx/query-61e2784d13ebc79ae571b5c1a6299166671d50a769c5664e19eb053334289b6d.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT resigned_transaction\n FROM tx_hash_map\n WHERE stratus_hash=$1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "resigned_transaction", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [ + false + ] + }, + "hash": "61e2784d13ebc79ae571b5c1a6299166671d50a769c5664e19eb053334289b6d" +} diff --git a/.sqlx/query-f1a46cfa58166db3fbf5c6b59ab4c3a1eb73512c28cec059abbc7b9480e6c4e9.json b/.sqlx/query-f1a46cfa58166db3fbf5c6b59ab4c3a1eb73512c28cec059abbc7b9480e6c4e9.json new file mode 100644 index 000000000..2d7e050e7 --- /dev/null +++ b/.sqlx/query-f1a46cfa58166db3fbf5c6b59ab4c3a1eb73512c28cec059abbc7b9480e6c4e9.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO tx_hash_map (stratus_hash, substrate_hash, resigned_transaction) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Bytea", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "f1a46cfa58166db3fbf5c6b59ab4c3a1eb73512c28cec059abbc7b9480e6c4e9" +} diff --git a/src/eth/consensus/forward_to.rs b/src/eth/consensus/forward_to.rs index f5e3fd21e..039912255 100644 --- a/src/eth/consensus/forward_to.rs +++ b/src/eth/consensus/forward_to.rs @@ -23,10 +23,7 @@ impl TransactionRelayer { pub async fn forward(&self, tx_input: TransactionInput) -> anyhow::Result { tracing::debug!(hash = %tx_input.hash, "forwarding transaction"); - let tx = self - .chain - .send_raw_transaction(Transaction::from(tx_input.clone()).rlp()) - .await?; + let tx = self.chain.send_raw_transaction(Transaction::from(tx_input.clone()).rlp()).await?; Ok(tx) } diff --git a/src/eth/primitives/transaction_input.rs b/src/eth/primitives/transaction_input.rs index b7bffa852..db8bbe7da 100644 --- a/src/eth/primitives/transaction_input.rs +++ b/src/eth/primitives/transaction_input.rs @@ -11,6 +11,7 @@ use fake::Dummy; use fake::Fake; use fake::Faker; use rlp::Decodable; +use serde::Deserialize; use crate::eth::primitives::Address; use crate::eth::primitives::Bytes; @@ -24,6 +25,7 @@ use crate::eth::primitives::SoliditySignature; use crate::eth::primitives::Wei; use crate::ext::not; use crate::ext::OptionExt; +use crate::log_and_err; #[derive(DebugAsJson, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct TransactionInput { @@ -193,3 +195,14 @@ impl From for TransactionRequest { } } } + +impl TryFrom for TransactionInput { + type Error = anyhow::Error; + + fn try_from(value: serde_json::Value) -> Result { + match Self::deserialize(&value) { + Ok(v) => Ok(v), + Err(e) => log_and_err!(reason = e, payload = value, "failed to convert payload value to TransactionInput"), + } + } +} diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index af88ab7c4..1a89b5c56 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; + use anyhow::anyhow; use anyhow::Context; use ethers_core::types::transaction::eip2718::TypedTransaction; @@ -30,9 +31,7 @@ use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionInput; use crate::eth::primitives::TransactionMined; -use crate::ext::traced_sleep; use crate::ext::ResultExt; -use crate::ext::SleepReason; use crate::infra::blockchain_client::pending_transaction::PendingTransaction; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -190,6 +189,36 @@ impl ExternalRelayer { inc_compare_final_state(start.elapsed()); } + pub async fn insert_transaction_mapping(&self, stratus_hash: Hash, new_transaction: &TransactionInput) { + let new_hash = new_transaction.hash; + let transaction_json = serde_json::to_value(new_transaction).expect_infallible(); + while let Err(e) = sqlx::query!( + "INSERT INTO tx_hash_map (stratus_hash, substrate_hash, resigned_transaction) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + stratus_hash as _, + new_hash as _, + transaction_json as _, + ) + .execute(&self.pool) + .await + { + tracing::warn!(?e, "failed to insert transaction, retrying..."); + } + } + + pub async fn get_mapped_transaction(&self, stratus_hash: Hash) -> anyhow::Result> { + sqlx::query!( + r#" + SELECT resigned_transaction + FROM tx_hash_map + WHERE stratus_hash=$1"#, + stratus_hash as _ + ) + .fetch_optional(&self.pool) + .await? + .map(|row| row.resigned_transaction.try_into()) + .transpose() + } + /// Polls the next block to be relayed and relays it to Substrate. #[tracing::instrument(name = "external_relayer::relay_blocks", skip_all, fields(block_number))] pub async fn relay_blocks(&mut self) -> anyhow::Result> { @@ -233,16 +262,20 @@ impl ExternalRelayer { } self.signer.sync_nonce(&self.substrate_chain).await?; - let combined_transactions = Self::combine_transactions(blocks) - .into_iter() - .sorted() - .map(|mut tx| { - if tx.input.extract_function().is_some_and(|sig| sig.contains("PixCashier")) { + let mut combined_transactions = vec![]; + for mut tx in Self::combine_transactions(blocks).into_iter().sorted() { + if tx.input.extract_function().is_some_and(|sig| sig.contains("PixCashier")) { + let transaction_signed = self.get_mapped_transaction(tx.input.hash).await?; + if let Some(transaction) = transaction_signed { + tx.input = transaction; + } else { + let prev_hash = tx.input.hash; tx.input = self.signer.sign_transaction_input(tx.input); + self.insert_transaction_mapping(prev_hash, &tx.input).await; } - tx - }) - .collect_vec(); + } + combined_transactions.push(tx); + } let modified_slots = TransactionDag::get_slot_writes(&combined_transactions); // TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys) @@ -373,7 +406,7 @@ impl ExternalRelayer { .expect("writing the mismatch to a file should not fail"); tracing::error!(?err, "failed to save mismatch, saving to file"); } - Ok(res) => { + Ok(res) => if res.rows_affected() == 0 { tracing::info!( ?block_number, @@ -381,8 +414,7 @@ impl ExternalRelayer { "transaction mismatch already in database (this should only happen if this block is being retried)." ); return; - } - } + }, } #[cfg(feature = "metrics")] @@ -421,7 +453,6 @@ impl ExternalRelayer { let start = metrics::now(); let tx_hash = tx_mined.input.hash; - let nonce = tx_mined.input.nonce; tracing::info!(?tx_mined.input.nonce, ?tx_hash, "relaying transaction"); @@ -441,7 +472,7 @@ impl ExternalRelayer { } tx = self.send_transaction(tx_mined.clone(), rlp.clone()).await; } else { - break Ok(()) + break Ok(()); } } } diff --git a/static/schema/004-relayer.sql b/static/schema/004-relayer.sql index f859b41ff..12081dcf6 100644 --- a/static/schema/004-relayer.sql +++ b/static/schema/004-relayer.sql @@ -1,9 +1,9 @@ create table relayer_blocks( number bigint primary key not null check (number >= 0), payload jsonb not null, - started boolean default false, - finished boolean default false, - mismatched boolean default false + started boolean not null default false, + finished boolean not null default false, + mismatched boolean not null default false ); create table mismatches( hash bytea primary key not null, @@ -22,5 +22,6 @@ create table slot_mismatches( ); create table tx_hash_map( stratus_hash bytea primary key not null, - substrate_hash bytea not null + substrate_hash bytea not null, + resigned_transaction jsonb not null ); From c89022862fa1e84b216ad73d35d6cc76f9c3b363 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 00:26:02 -0300 Subject: [PATCH 37/43] lint --- src/eth/relayer/external.rs | 8 +------- src/eth/rpc/rpc_server.rs | 2 +- src/infra/blockchain_client/blockchain_client.rs | 4 ++-- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 100e41c93..96bb2921f 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -31,13 +31,7 @@ use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionInput; use crate::eth::primitives::TransactionMined; - -use crate::ext::ResultExt; - use crate::ext::to_json_value; -use crate::ext::traced_sleep; -use crate::ext::SleepReason; - use crate::infra::blockchain_client::pending_transaction::PendingTransaction; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -197,7 +191,7 @@ impl ExternalRelayer { pub async fn insert_transaction_mapping(&self, stratus_hash: Hash, new_transaction: &TransactionInput) { let new_hash = new_transaction.hash; - let transaction_json = serde_json::to_value(new_transaction).expect_infallible(); + let transaction_json = to_json_value(new_transaction); while let Err(e) = sqlx::query!( "INSERT INTO tx_hash_map (stratus_hash, substrate_hash, resigned_transaction) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", stratus_hash as _, diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index f54e00689..6cb250590 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -329,7 +329,7 @@ fn stratus_get_slots(params: Params<'_>, ctx: Arc, _: Extensions) -> .map(|index| ctx.storage.read_slot(&address, &index, &point_in_time)) .collect::, _>>()?; - Ok(serde_json::to_value(slots).expect_infallible()) + Ok(to_json_value(slots)) } // ----------------------------------------------------------------------------- diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index bd7ea3721..ca636e7cf 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -246,11 +246,11 @@ impl BlockchainClient { /// Fetches the current transaction count (nonce) for an account. pub async fn fetch_transaction_count(&self, address: &Address) -> anyhow::Result { tracing::debug!("fetching block number"); - let address = serde_json::to_value(address).expect_infallible(); + let address = to_json_value(address); let result = self .http - .request::>("eth_getTransactionCount", vec![address, serde_json::to_value("latest").expect_infallible()]) + .request::>("eth_getTransactionCount", vec![address, to_json_value("latest")]) .await; match result { From 07f37e3d5597aac8d0d258fe16104dd1c334003e Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 00:50:43 -0300 Subject: [PATCH 38/43] lint --- src/eth/relayer/external.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 96bb2921f..810104d40 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Instant; use anyhow::anyhow; use anyhow::Context; @@ -482,8 +483,7 @@ impl ExternalRelayer { /// on the `mismatches` table in pgsql, or in ./data as a fallback. #[tracing::instrument(name = "external_relayer::relay_dag", skip_all)] async fn relay_dag(&self, mut dag: TransactionDag) -> (MismatchedBlocks, TimedoutBlocks) { - #[cfg(feature = "metrics")] - let start = metrics::now(); + let start = Instant::now(); tracing::debug!("relaying transactions"); From f2813f16a2be36e91f64c37f9e5d814437101a67 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 12:03:09 -0300 Subject: [PATCH 39/43] small refac --- src/eth/relayer/external.rs | 154 ++++++++++++++++-------------------- 1 file changed, 68 insertions(+), 86 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 810104d40..f58cd460b 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -14,8 +14,6 @@ use futures::StreamExt; use itertools::Itertools; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; -use tokio::fs::File; -use tokio::io::AsyncWriteExt; use tracing::Span; use super::transaction_dag::TransactionDag; @@ -42,16 +40,12 @@ use crate::infra::BlockchainClient; use crate::log_and_err; type MismatchedBlocks = HashSet; -type TimedoutBlocks = HashSet; #[derive(Debug, thiserror::Error, derive_new::new)] pub enum RelayError { #[error("Transaction Mismatch: {1}")] Mismatch(BlockNumber, anyhow::Error), - #[error("Compare Timeout: {1}")] - CompareTimeout(BlockNumber, anyhow::Error), - #[error("Transaction not found")] TransactionNotFound, } @@ -134,8 +128,22 @@ impl ExternalRelayer { }) } - fn combine_transactions(blocks: Vec) -> Vec { - blocks.into_iter().flat_map(|block| block.transactions).collect() + async fn combine_transactions(&mut self, blocks: Vec) -> anyhow::Result> { + let mut combined_transactions = vec![]; + for mut tx in blocks.into_iter().flat_map(|block| block.transactions).sorted() { + if tx.input.extract_function().is_some_and(|sig| sig.contains("PixCashier")) { + let transaction_signed = self.get_mapped_transaction(tx.input.hash).await?; + if let Some(transaction) = transaction_signed { + tx.input = transaction; + } else { + let prev_hash = tx.input.hash; + tx.input = self.signer.sign_transaction_input(tx.input); + self.insert_transaction_mapping(prev_hash, &tx.input).await; + } + } + combined_transactions.push(tx); + } + Ok(combined_transactions) } async fn blocks_have_been_mined(&self, blocks: Vec) -> bool { @@ -220,9 +228,7 @@ impl ExternalRelayer { .transpose() } - /// Polls the next block to be relayed and relays it to Substrate. - #[tracing::instrument(name = "external_relayer::relay_blocks", skip_all, fields(block_number))] - pub async fn relay_blocks(&mut self) -> anyhow::Result> { + pub async fn fetch_blocks(&self) -> anyhow::Result> { let block_rows = sqlx::query!( r#" WITH cte AS ( @@ -241,65 +247,55 @@ impl ExternalRelayer { .fetch_all(&self.pool) .await?; - if block_rows.is_empty() { + block_rows + .into_iter() + .sorted_by_key(|row| row.number) + .map(|row| row.payload.try_into()) + .collect::>() + } + + /// Polls the next block to be relayed and relays it to Substrate. + #[tracing::instrument(name = "external_relayer::relay_blocks", skip_all, fields(block_number))] + pub async fn relay_blocks(&mut self) -> anyhow::Result> { + let blocks = self.fetch_blocks().await?; + + if blocks.is_empty() { tracing::info!("no blocks to relay"); return Ok(vec![]); } - let block_numbers: HashSet = block_rows.iter().map(|row| row.number.into()).collect(); + let block_numbers: HashSet = blocks.iter().map(|block| block.number()).collect(); let max_number = block_numbers.iter().max().cloned().unwrap(); // fill span Span::with(|s| s.rec_str("block_number", &max_number)); - let blocks: Vec = block_rows - .into_iter() - .sorted_by_key(|row| row.number) - .map(|row| row.payload.try_into()) - .collect::>()?; - if !self.blocks_have_been_mined(blocks.iter().map(|block| block.hash()).collect()).await { return Err(anyhow!("some blocks in this batch have not been mined in stratus")); } self.signer.sync_nonce(&self.substrate_chain).await?; - let mut combined_transactions = vec![]; - for mut tx in Self::combine_transactions(blocks).into_iter().sorted() { - if tx.input.extract_function().is_some_and(|sig| sig.contains("PixCashier")) { - let transaction_signed = self.get_mapped_transaction(tx.input.hash).await?; - if let Some(transaction) = transaction_signed { - tx.input = transaction; - } else { - let prev_hash = tx.input.hash; - tx.input = self.signer.sign_transaction_input(tx.input); - self.insert_transaction_mapping(prev_hash, &tx.input).await; - } - } - combined_transactions.push(tx); - } + let combined_transactions = self.combine_transactions(blocks).await?; let modified_slots = TransactionDag::get_slot_writes(&combined_transactions); - // TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys) - let dag = TransactionDag::new(combined_transactions); - let (mismatched_blocks, timedout_blocks) = self.relay_dag(dag).await; - - let non_ok_blocks: HashSet = mismatched_blocks.union(&timedout_blocks).cloned().collect(); + if combined_transactions.is_empty() { + tracing::info!("no transactions to relay"); + return Ok(block_numbers.into_iter().collect_vec()); + } - let only_mismatched_blocks: Vec = mismatched_blocks.difference(&timedout_blocks).cloned().collect(); - let ok_blocks: Vec = block_numbers.difference(&non_ok_blocks).cloned().collect(); + let dag = TransactionDag::new(combined_transactions); - if !timedout_blocks.is_empty() { - tracing::warn!(?timedout_blocks, "some blocks timed-out"); - } + let mismatched_blocks = self.relay_dag(dag).await; + let ok_blocks: Vec = block_numbers.difference(&mismatched_blocks).cloned().collect(); - if !only_mismatched_blocks.is_empty() { - tracing::warn!(?only_mismatched_blocks, "some transactions mismatched"); + if !mismatched_blocks.is_empty() { + tracing::warn!(?mismatched_blocks, "some transactions mismatched"); sqlx::query!( r#"UPDATE relayer_blocks SET finished = true, mismatched = true WHERE number = ANY($1)"#, - &only_mismatched_blocks[..] as _ + &mismatched_blocks.iter().cloned().collect_vec()[..] as _ ) .execute(&self.pool) .await?; @@ -317,7 +313,7 @@ impl ExternalRelayer { } self.compare_final_state(modified_slots, max_number).await; - Ok(ok_blocks.into_iter().chain(only_mismatched_blocks.into_iter()).collect()) + Ok(ok_blocks.into_iter().chain(mismatched_blocks.into_iter()).collect()) } /// Compares the given receipt to the receipt returned by the pending transaction, retries until a receipt is returned @@ -381,42 +377,30 @@ impl ExternalRelayer { let stratus_json = to_json_value(stratus_receipt); let substrate_json = to_json_value(substrate_receipt); - let res = sqlx::query!( - "INSERT INTO mismatches (hash, block_number, stratus_receipt, substrate_receipt, error) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", - &hash as _, - block_number as _, - &stratus_json, - &substrate_json, - err_string - ) - .execute(&self.pool) - .await; - - match res { - Err(err) => { - tracing::error!(?block_number, ?hash, "failed to insert row in pgsql, saving mismatche to json"); - let mut file = File::create(format!("data/{}.json", hash)).await.expect("opening the file should not fail"); - let json = serde_json::json!( - { - "stratus_receipt": stratus_json, - "substrate_receipt": substrate_json, - } - ); - file.write_all(json.to_string().as_bytes()) - .await - .expect("writing the mismatch to a file should not fail"); - tracing::error!(?err, "failed to save mismatch, saving to file"); + let res = loop { + match sqlx::query!( + "INSERT INTO mismatches (hash, block_number, stratus_receipt, substrate_receipt, error) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", + &hash as _, + block_number as _, + &stratus_json, + &substrate_json, + err_string + ) + .execute(&self.pool) + .await + { + Ok(res) => break res, + Err(err) => tracing::error!(?block_number, ?hash, ?err, "failed to insert row in pgsql, retrying"), } - Ok(res) => - if res.rows_affected() == 0 { - tracing::info!( - ?block_number, - ?hash, - "transaction mismatch already in database (this should only happen if this block is being retried)." - ); - return; - }, - } + }; + + if res.rows_affected() == 0 { + tracing::info!( + ?block_number, + ?hash, + "transaction mismatch already in database (this should only happen if this block is being retried)." + ); + }; #[cfg(feature = "metrics")] metrics::inc_save_mismatch(start.elapsed()); @@ -482,7 +466,7 @@ impl ExternalRelayer { /// had the same receipts, returns `Err` if one or more transactions had receipts mismatches. The mismatches are saved /// on the `mismatches` table in pgsql, or in ./data as a fallback. #[tracing::instrument(name = "external_relayer::relay_dag", skip_all)] - async fn relay_dag(&self, mut dag: TransactionDag) -> (MismatchedBlocks, TimedoutBlocks) { + async fn relay_dag(&self, mut dag: TransactionDag) -> MismatchedBlocks { let start = Instant::now(); tracing::debug!("relaying transactions"); @@ -497,11 +481,9 @@ impl ExternalRelayer { let errors = results.into_iter().filter_map(Result::err); let mut mismatched_blocks: MismatchedBlocks = HashSet::new(); - let mut timedout_blocks: TimedoutBlocks = HashSet::new(); for error in errors { match error { - RelayError::CompareTimeout(number, _) => timedout_blocks.insert(number), RelayError::Mismatch(number, _) => mismatched_blocks.insert(number), _ => panic!("unexpected error"), }; @@ -510,7 +492,7 @@ impl ExternalRelayer { #[cfg(feature = "metrics")] metrics::inc_relay_dag(start.elapsed()); - (mismatched_blocks, timedout_blocks) + mismatched_blocks } } From 3e4b0fa580641909902afa9f37a782be23ea6237 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 17:52:41 -0300 Subject: [PATCH 40/43] fix e2e --- e2e/cloudwalk-contracts/integration/hardhat.config.ts | 2 +- e2e/cloudwalk-contracts/integration/test/relayer.test.ts | 6 +++--- justfile | 8 ++++---- src/eth/relayer/external.rs | 7 ++----- src/eth/relayer/transaction_dag.rs | 2 +- src/infra/blockchain_client/pending_transaction.rs | 5 +++-- 6 files changed, 14 insertions(+), 16 deletions(-) diff --git a/e2e/cloudwalk-contracts/integration/hardhat.config.ts b/e2e/cloudwalk-contracts/integration/hardhat.config.ts index dd55ec92c..d85251431 100644 --- a/e2e/cloudwalk-contracts/integration/hardhat.config.ts +++ b/e2e/cloudwalk-contracts/integration/hardhat.config.ts @@ -36,7 +36,7 @@ const config: HardhatUserConfig = { mining: { auto: process.env.BLOCK_MODE === "automine", interval: - process.env.BLOCK_MODE === "automine" ? undefined : process.env.BLOCK_MODE === "1s" ? 1000 : 0, + process.env.BLOCK_MODE === "automine" ? undefined : process.env.BLOCK_MODE === "1s" ? 1000 : Number(process.env.BLOCK_MODE), }, accounts: { mnemonic: ACCOUNTS_MNEMONIC, diff --git a/e2e/cloudwalk-contracts/integration/test/relayer.test.ts b/e2e/cloudwalk-contracts/integration/test/relayer.test.ts index 7b7afaee3..b72e64d8e 100644 --- a/e2e/cloudwalk-contracts/integration/test/relayer.test.ts +++ b/e2e/cloudwalk-contracts/integration/test/relayer.test.ts @@ -31,9 +31,9 @@ describe("Relayer integration test", function () { describe("Long duration transaction tests", function () { const parameters = [ - { name: "Few wallets, sufficient balance", wallets: 3, duration: 15, tps: 5, baseBalance: 2000 }, - { name: "Few wallets, insufficient balance", wallets: 2, duration: 15, tps: 1, baseBalance: 5 }, - { name: "Many wallets, sufficient balance", wallets: 20, duration: 15, tps: 30, baseBalance: 2000 }, + { name: "Few wallets, sufficient balance", wallets: 3, duration: 10, tps: 5, baseBalance: 2000 }, + { name: "Few wallets, insufficient balance", wallets: 2, duration: 10, tps: 1, baseBalance: 5 }, + { name: "Many wallets, sufficient balance", wallets: 20, duration: 10, tps: 5, baseBalance: 2000 }, ]; parameters.forEach((params, index) => { const wallets: any[] = []; diff --git a/justfile b/justfile index bbe0edc3d..613b65b67 100644 --- a/justfile +++ b/justfile @@ -340,7 +340,7 @@ e2e-relayer-external-up: ( cd e2e/cloudwalk-contracts/integration npm install - BLOCK_MODE=1s npx hardhat node > ../../../e2e_logs/hardhat.log & + BLOCK_MODE=50 npx hardhat node > ../../../e2e_logs/hardhat.log & ) fi @@ -349,12 +349,12 @@ e2e-relayer-external-up: sleep 5 # Start Relayer External binary - cargo run --release --bin relayer --features dev -- --db-url postgres://postgres:123@0.0.0.0:5432/stratus --db-connections 5 --db-timeout 1s --forward-to http://0.0.0.0:8545 --stratus-rpc http://0.0.0.0:3000 --backoff 10ms --tokio-console-address 0.0.0.0:6979 --metrics-exporter-address 0.0.0.0:9001 > e2e_logs/relayer.log & + cargo run --release --bin relayer --features dev -- --db-url postgres://postgres:123@0.0.0.0:5432/stratus --db-connections 5 --db-timeout 1s --forward-to http://0.0.0.0:8545 --stratus-rpc http://0.0.0.0:3000 --backoff 10ms --tokio-console-address 0.0.0.0:6979 --metrics-exporter-address 0.0.0.0:9001 --signer "0x426e24d88dfc9d624cc4ca3f148a4b9cd3135f584ec8ae7f7ff3b5ca7e90f291" > e2e_logs/relayer.log & if [ -d e2e/cloudwalk-contracts ]; then ( cd e2e/cloudwalk-contracts/integration - npx hardhat test test/*.test.ts --network stratus + npx hardhat test test/*.test.ts --network stratus --bail if [ $? -ne 0 ]; then echo "Hardhat tests failed" exit 1 @@ -544,4 +544,4 @@ hive: # Run Hiveview hiveview: cd hive && go build ./cmd/hiveview - ./hive/hiveview --serve --addr 0.0.0.0:8080 --logdir ./hive/workspace/logs/ \ No newline at end of file + ./hive/hiveview --serve --addr 0.0.0.0:8080 --logdir ./hive/workspace/logs/ diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index f58cd460b..13826938f 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -70,6 +70,8 @@ impl TxSigner { } pub fn sign_transaction_input(&mut self, mut tx_input: TransactionInput) -> TransactionInput { + tracing::info!(?tx_input.hash, "signing transaction"); + let tx: TransactionRequest = >::from(tx_input.clone()) .nonce(self.nonce) .gas(tx_input.gas_limit.as_u64() * 10); @@ -278,11 +280,6 @@ impl ExternalRelayer { let combined_transactions = self.combine_transactions(blocks).await?; let modified_slots = TransactionDag::get_slot_writes(&combined_transactions); - if combined_transactions.is_empty() { - tracing::info!("no transactions to relay"); - return Ok(block_numbers.into_iter().collect_vec()); - } - let dag = TransactionDag::new(combined_transactions); let mismatched_blocks = self.relay_dag(dag).await; diff --git a/src/eth/relayer/transaction_dag.rs b/src/eth/relayer/transaction_dag.rs index 5523d2b4a..f3809d09b 100644 --- a/src/eth/relayer/transaction_dag.rs +++ b/src/eth/relayer/transaction_dag.rs @@ -120,7 +120,7 @@ impl TransactionDag { let tx2_from = dag.node_weight(tx2_node_index).unwrap().input.signer; if tx1_from != tx2_from && !set1.is_disjoint(set2) { - tracing::info!(?tx1, ?tx2, "adding edge"); + tracing::debug!(?tx1, ?tx2, "adding edge"); dag.add_edge(*node_indexes.get(tx1).unwrap(), *node_indexes.get(tx2).unwrap(), 1); } } diff --git a/src/infra/blockchain_client/pending_transaction.rs b/src/infra/blockchain_client/pending_transaction.rs index c7a8e2574..0c5f08f18 100644 --- a/src/infra/blockchain_client/pending_transaction.rs +++ b/src/infra/blockchain_client/pending_transaction.rs @@ -72,13 +72,13 @@ pub struct PendingTransaction<'a> { impl<'a> PendingTransaction<'a> { pub fn new(tx_hash: Hash, provider: &'a BlockchainClient) -> Self { - let delay = Box::pin(Delay::new(Duration::from_secs(1))); + let delay = Box::pin(Delay::new(Duration::from_millis(5))); PendingTransaction { state: PendingTxState::InitialDelay(delay), provider, tx_hash, interval: Box::new(interval(Duration::from_millis(10))), - retries_remaining: 100, + retries_remaining: 200, } } } @@ -133,6 +133,7 @@ impl<'a> Future for PendingTransaction<'a> { // If it hasn't confirmed yet, poll again later let tx = tx_opt.unwrap(); if tx.block_number.is_none() { + tracing::info!(?tx, "BLOCK NOT MINED"); *this.state = PendingTxState::PausedGettingTx; ctx.waker().wake_by_ref(); return Poll::Pending; From 3aa8d12286f81680b0a98366fac4f4833c343254 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 18:06:40 -0300 Subject: [PATCH 41/43] wtf --- origin/e2e-contracts/repos/brlc-multisig | 1 - origin/e2e-contracts/repos/brlc-periphery | 1 - origin/e2e-contracts/repos/brlc-pix-cashier | 1 - origin/e2e-contracts/repos/brlc-token | 1 - origin/e2e-contracts/repos/brlc-yield-streamer | 1 - origin/e2e-contracts/repos/compound-periphery | 1 - 6 files changed, 6 deletions(-) delete mode 160000 origin/e2e-contracts/repos/brlc-multisig delete mode 160000 origin/e2e-contracts/repos/brlc-periphery delete mode 160000 origin/e2e-contracts/repos/brlc-pix-cashier delete mode 160000 origin/e2e-contracts/repos/brlc-token delete mode 160000 origin/e2e-contracts/repos/brlc-yield-streamer delete mode 160000 origin/e2e-contracts/repos/compound-periphery diff --git a/origin/e2e-contracts/repos/brlc-multisig b/origin/e2e-contracts/repos/brlc-multisig deleted file mode 160000 index 918a226af..000000000 --- a/origin/e2e-contracts/repos/brlc-multisig +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 918a226af3b3a9847fd70bf355e4412fa372ef88 diff --git a/origin/e2e-contracts/repos/brlc-periphery b/origin/e2e-contracts/repos/brlc-periphery deleted file mode 160000 index b8d507a46..000000000 --- a/origin/e2e-contracts/repos/brlc-periphery +++ /dev/null @@ -1 +0,0 @@ -Subproject commit b8d507a468477b2387dbbc1986c166b5fda34fc9 diff --git a/origin/e2e-contracts/repos/brlc-pix-cashier b/origin/e2e-contracts/repos/brlc-pix-cashier deleted file mode 160000 index a528d0cb1..000000000 --- a/origin/e2e-contracts/repos/brlc-pix-cashier +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a528d0cb1c46112ee81196cb462de6fa13a224cd diff --git a/origin/e2e-contracts/repos/brlc-token b/origin/e2e-contracts/repos/brlc-token deleted file mode 160000 index 0858ec418..000000000 --- a/origin/e2e-contracts/repos/brlc-token +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 0858ec41851b62e50559f2c288c4bb0e89e6c612 diff --git a/origin/e2e-contracts/repos/brlc-yield-streamer b/origin/e2e-contracts/repos/brlc-yield-streamer deleted file mode 160000 index 7683517c1..000000000 --- a/origin/e2e-contracts/repos/brlc-yield-streamer +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 7683517c13c2f37ae274e72dca1f10966d20b534 diff --git a/origin/e2e-contracts/repos/compound-periphery b/origin/e2e-contracts/repos/compound-periphery deleted file mode 160000 index e4d68dff7..000000000 --- a/origin/e2e-contracts/repos/compound-periphery +++ /dev/null @@ -1 +0,0 @@ -Subproject commit e4d68dff75102f00b89f501e01ae1f4edbff878a From 33e1fcec4755b062b6ec6af3fe6890ee55f29a57 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 18:14:31 -0300 Subject: [PATCH 42/43] lint --- e2e/cloudwalk-contracts/integration/hardhat.config.ts | 6 +++++- static/schema/004-relayer.sql | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/e2e/cloudwalk-contracts/integration/hardhat.config.ts b/e2e/cloudwalk-contracts/integration/hardhat.config.ts index d85251431..a1fe91d2e 100644 --- a/e2e/cloudwalk-contracts/integration/hardhat.config.ts +++ b/e2e/cloudwalk-contracts/integration/hardhat.config.ts @@ -36,7 +36,11 @@ const config: HardhatUserConfig = { mining: { auto: process.env.BLOCK_MODE === "automine", interval: - process.env.BLOCK_MODE === "automine" ? undefined : process.env.BLOCK_MODE === "1s" ? 1000 : Number(process.env.BLOCK_MODE), + process.env.BLOCK_MODE === "automine" + ? undefined + : process.env.BLOCK_MODE === "1s" + ? 1000 + : Number(process.env.BLOCK_MODE), }, accounts: { mnemonic: ACCOUNTS_MNEMONIC, diff --git a/static/schema/004-relayer.sql b/static/schema/004-relayer.sql index 12081dcf6..c5eeb907b 100644 --- a/static/schema/004-relayer.sql +++ b/static/schema/004-relayer.sql @@ -18,7 +18,7 @@ create table slot_mismatches( block_number bigint not null, stratus_value bytea not null, substrate_value bytea not null, - primary key (address, index) + primary key (block_number, address, index) ); create table tx_hash_map( stratus_hash bytea primary key not null, From 53ddcaf262fb691740d8fac877c6c00fa2e26b3d Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 27 Jun 2024 18:48:37 -0300 Subject: [PATCH 43/43] reduce tps --- e2e/cloudwalk-contracts/integration/test/relayer.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/cloudwalk-contracts/integration/test/relayer.test.ts b/e2e/cloudwalk-contracts/integration/test/relayer.test.ts index b72e64d8e..8987c682c 100644 --- a/e2e/cloudwalk-contracts/integration/test/relayer.test.ts +++ b/e2e/cloudwalk-contracts/integration/test/relayer.test.ts @@ -33,7 +33,7 @@ describe("Relayer integration test", function () { const parameters = [ { name: "Few wallets, sufficient balance", wallets: 3, duration: 10, tps: 5, baseBalance: 2000 }, { name: "Few wallets, insufficient balance", wallets: 2, duration: 10, tps: 1, baseBalance: 5 }, - { name: "Many wallets, sufficient balance", wallets: 20, duration: 10, tps: 5, baseBalance: 2000 }, + { name: "Many wallets, sufficient balance", wallets: 20, duration: 10, tps: 3, baseBalance: 2000 }, ]; parameters.forEach((params, index) => { const wallets: any[] = [];