Skip to content

Commit

Permalink
better handling of failed L1 transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
tomg10 committed Oct 15, 2024
1 parent 331fe87 commit f29fd12
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 284 deletions.
31 changes: 0 additions & 31 deletions core/lib/circuit_breaker/src/l1_txs.rs

This file was deleted.

1 change: 0 additions & 1 deletion core/lib/circuit_breaker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{fmt, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::sync::{watch, Mutex};

pub mod l1_txs;
mod metrics;
pub mod replication_lag;

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

83 changes: 20 additions & 63 deletions core/lib/dal/src/eth_sender_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ use sqlx::types::chrono::{DateTime, Utc};
use zksync_db_connection::{connection::Connection, interpolate_query, match_query_as};
use zksync_types::{
aggregated_operations::AggregatedActionType,
eth_sender::{EthTx, EthTxBlobSidecar, TxHistory, TxHistoryToSend},
eth_sender::{EthTx, EthTxBlobSidecar, TxHistory},
Address, L1BatchNumber, H256, U256,
};

use crate::{
models::storage_eth_tx::{
L1BatchEthSenderStats, StorageEthTx, StorageTxHistory, StorageTxHistoryToSend,
},
models::storage_eth_tx::{L1BatchEthSenderStats, StorageEthTx, StorageTxHistory},
Core,
};

Expand Down Expand Up @@ -194,33 +192,6 @@ impl EthSenderDal<'_, '_> {
Ok(txs.into_iter().map(|tx| tx.into()).collect())
}

pub async fn get_unsent_txs(&mut self) -> sqlx::Result<Vec<TxHistoryToSend>> {
let txs = sqlx::query_as!(
StorageTxHistoryToSend,
r#"
SELECT
eth_txs_history.id,
eth_txs_history.eth_tx_id,
eth_txs_history.tx_hash,
eth_txs_history.base_fee_per_gas,
eth_txs_history.priority_fee_per_gas,
eth_txs_history.signed_raw_tx,
eth_txs.nonce
FROM
eth_txs_history
JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id
WHERE
eth_txs_history.sent_at_block IS NULL
AND eth_txs.confirmed_eth_tx_history_id IS NULL
ORDER BY
eth_txs_history.id DESC
"#,
)
.fetch_all(self.storage.conn())
.await?;
Ok(txs.into_iter().map(|tx| tx.into()).collect())
}

#[allow(clippy::too_many_arguments)]
pub async fn save_eth_tx(
&mut self,
Expand Down Expand Up @@ -321,29 +292,6 @@ impl EthSenderDal<'_, '_> {
.map(|row| row.id as u32))
}

pub async fn set_sent_at_block(
&mut self,
eth_txs_history_id: u32,
sent_at_block: u32,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
UPDATE eth_txs_history
SET
sent_at_block = $2,
sent_at = NOW()
WHERE
id = $1
AND sent_at_block IS NULL
"#,
eth_txs_history_id as i32,
sent_at_block as i32
)
.execute(self.storage.conn())
.await?;
Ok(())
}

pub async fn remove_tx_history(&mut self, eth_txs_history_id: u32) -> sqlx::Result<()> {
sqlx::query!(
r#"
Expand Down Expand Up @@ -690,22 +638,31 @@ impl EthSenderDal<'_, '_> {
sqlx::query!(
r#"
DELETE FROM eth_txs
WHERE
id >= (
SELECT
MIN(id)
FROM
eth_txs
WHERE
has_failed = TRUE
)
WHERE has_failed = TRUE
"#
)
.execute(self.storage.conn())
.await?;
Ok(())
}

pub async fn count_all_inflight_txs(&mut self) -> anyhow::Result<i64> {
sqlx::query!(
r#"
SELECT
COUNT(*)
FROM
eth_txs
WHERE
confirmed_eth_tx_history_id IS NULL AND has_failed = FALSE
"#
)
.fetch_one(self.storage.conn())
.await?
.count
.context("count field is missing")
}

pub async fn delete_eth_txs(&mut self, last_batch_to_keep: L1BatchNumber) -> sqlx::Result<()> {
sqlx::query!(
r#"
Expand Down
29 changes: 1 addition & 28 deletions core/lib/dal/src/models/storage_eth_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use sqlx::types::chrono::NaiveDateTime;
use zksync_types::{
aggregated_operations::AggregatedActionType,
eth_sender::{EthTx, TxHistory, TxHistoryToSend},
eth_sender::{EthTx, TxHistory},
Address, L1BatchNumber, Nonce, SLChainId, H256,
};

Expand Down Expand Up @@ -39,17 +39,6 @@ pub struct L1BatchEthSenderStats {
pub mined: Vec<(AggregatedActionType, L1BatchNumber)>,
}

#[derive(Clone, Debug)]
pub struct StorageTxHistoryToSend {
pub id: i32,
pub eth_tx_id: i32,
pub tx_hash: String,
pub priority_fee_per_gas: i64,
pub base_fee_per_gas: i64,
pub signed_raw_tx: Option<Vec<u8>>,
pub nonce: i64,
}

#[derive(Clone, Debug)]
pub struct StorageTxHistory {
pub id: i32,
Expand Down Expand Up @@ -110,19 +99,3 @@ impl From<StorageTxHistory> for TxHistory {
}
}
}

impl From<StorageTxHistoryToSend> for TxHistoryToSend {
fn from(history: StorageTxHistoryToSend) -> TxHistoryToSend {
TxHistoryToSend {
id: history.id as u32,
eth_tx_id: history.eth_tx_id as u32,
tx_hash: H256::from_str(&history.tx_hash).expect("Incorrect hash"),
base_fee_per_gas: history.base_fee_per_gas as u64,
priority_fee_per_gas: history.priority_fee_per_gas as u64,
signed_raw_tx: history
.signed_raw_tx
.expect("Should rely only on the new txs"),
nonce: Nonce(history.nonce as u32),
}
}
}
11 changes: 0 additions & 11 deletions core/lib/types/src/eth_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,3 @@ pub struct TxHistory {
pub signed_raw_tx: Vec<u8>,
pub sent_at_block: Option<u32>,
}

#[derive(Clone, Debug)]
pub struct TxHistoryToSend {
pub id: u32,
pub eth_tx_id: u32,
pub base_fee_per_gas: u64,
pub priority_fee_per_gas: u64,
pub tx_hash: H256,
pub signed_raw_tx: Vec<u8>,
pub nonce: Nonce,
}
15 changes: 15 additions & 0 deletions core/node/block_reverter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,21 @@ impl BlockReverter {
/// Clears failed L1 transactions.
pub async fn clear_failed_l1_transactions(&self) -> anyhow::Result<()> {
tracing::info!("Clearing failed L1 transactions");
if self
.connection_pool
.connection()
.await?
.eth_sender_dal()
.count_all_inflight_txs()
.await
.unwrap()
!= 0
{
tracing::error!(
"There are still some in-flight txs, cannot proceed. \
Please wait for eth-sender to process all in-flight txs and try again!"
);
}
self.connection_pool
.connection()
.await?
Expand Down
Loading

0 comments on commit f29fd12

Please sign in to comment.