diff --git a/crates/astria-sequencer/src/app.rs b/crates/astria-sequencer/src/app.rs
index d9bb547bad..14c122c6f5 100644
--- a/crates/astria-sequencer/src/app.rs
+++ b/crates/astria-sequencer/src/app.rs
@@ -8,6 +8,7 @@ use std::{
 use anyhow::{
     anyhow,
+    bail,
     ensure,
     Context,
 };
@@ -34,6 +35,7 @@ use sha2::{
     Digest as _,
     Sha256,
 };
+use telemetry::display::json;
 use tendermint::{
     abci::{
         self,
@@ -73,9 +75,12 @@ use crate::{
     component::Component as _,
     genesis::GenesisState,
     ibc::component::IbcComponent,
-    proposal::commitment::{
-        generate_rollup_datas_commitment,
-        GeneratedCommitments,
+    proposal::{
+        block_size_constraints::BlockSizeConstraints,
+        commitment::{
+            generate_rollup_datas_commitment,
+            GeneratedCommitments,
+        },
     },
     state_ext::{
         StateReadExt as _,
@@ -87,9 +92,6 @@ use crate::{
 /// The inter-block state being written to by the application.
 type InterBlockState = Arc<StateDelta<Snapshot>>;
 
-/// The maximum number of bytes allowed in sequencer action data.
-const MAX_SEQUENCE_DATA_BYTES_PER_BLOCK: usize = 256_000;
-
 /// The Sequencer application, written as a bundle of [`Component`]s.
 ///
 /// Note: this is called `App` because this is a Tendermint ABCI application,
@@ -237,7 +239,16 @@ impl App {
         self.validator_address = Some(prepare_proposal.proposer_address);
         self.update_state_for_new_round(&storage);
 
-        let (signed_txs, txs_to_include) = self.execute_block_data(prepare_proposal.txs).await;
+        let mut block_size_constraints = BlockSizeConstraints::new(
+            usize::try_from(prepare_proposal.max_tx_bytes)
+                .context("failed to convert max_tx_bytes to usize")?,
+        )
+        .context("failed to create block size constraints")?;
+
+        let (signed_txs, txs_to_include) = self
+            .execute_and_filter_block_data(prepare_proposal.txs, &mut block_size_constraints)
+            .await
+            .context("failed to execute and filter transactions in prepare_proposal")?;
 
         let deposits = self
             .state
@@ -299,18 +310,10 @@ impl App {
             .try_into()
             .map_err(|_| anyhow!("chain IDs commitment must be 32 bytes"))?;
 
-        let expected_txs_len = txs.len();
-
-        let (signed_txs, txs_to_include) = self.execute_block_data(txs.into()).await;
-
-        // all txs in the proposal should be deserializable and executable
-        // if any txs were not deserializeable or executable, they would not have been
-        // returned by `execute_block_data`, thus the length of `txs_to_include`
-        // will be shorter than that of `txs`.
-        ensure!(
-            txs_to_include.len() == expected_txs_len,
-            "transactions to be included do not match expected",
-        );
+        let signed_txs = self
+            .execute_block_data(txs)
+            .await
+            .context("transactions failed to decode and execute")?;
 
         let deposits = self
             .state
@@ -337,31 +340,49 @@ impl App {
         Ok(())
     }
 
-    /// Executes the given transaction data, writing it to the app's `StateDelta`.
+    /// Executes and filters the given transaction data, writing it to the app's `StateDelta`.
     ///
     /// The result of execution of every transaction which is successfully decoded
     /// is stored in `self.execution_result`.
    ///
     /// Returns the transactions which were successfully decoded and executed
     /// in both their [`SignedTransaction`] and raw bytes form.
-    #[instrument(name = "App::execute_block_data", skip_all, fields(
+    ///
+    /// Filters out transactions that would make the sequencer block or the
+    /// cometBFT block too large.
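+    ///
+    /// Note that [`BlockSizeConstraints`] reserves `GeneratedCommitments::TOTAL_SIZE`
+    /// (64) bytes of the cometBFT budget up front for the two generated 32-byte
+    /// commitments, so the space available to transactions is `max_tx_bytes - 64`.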
+ #[instrument(name = "App::execute_and_filter_block_data", skip_all, fields( tx_count = txs.len() ))] - async fn execute_block_data( + async fn execute_and_filter_block_data( &mut self, txs: Vec, - ) -> (Vec, Vec) { - let mut signed_txs = Vec::with_capacity(txs.len()); + block_size_constraints: &mut BlockSizeConstraints, + ) -> anyhow::Result<(Vec, Vec)> { + let mut signed_txs: Vec = Vec::with_capacity(txs.len()); let mut validated_txs = Vec::with_capacity(txs.len()); - let mut block_sequence_data_bytes: usize = 0; let mut excluded_tx_count: usize = 0; for tx in txs { + let tx_hash = Sha256::digest(&tx); + + // don't include tx if it would make the cometBFT block too large + if !block_size_constraints.cometbft_has_space(tx.len()) { + debug!( + transaction_hash = %telemetry::display::base64(&tx_hash), + block_size_constraints = %json(&block_size_constraints), + tx_data_bytes = tx.len(), + "excluding transactions: max cometBFT data limit reached" + ); + excluded_tx_count += 1; + continue; + } + + // try to decode the tx let signed_tx = match signed_transaction_from_bytes(&tx) { Err(e) => { debug!( error = AsRef::::as_ref(&e), - "failed to decode deliver tx payload to signed transaction; ignoring it", + "failed to decode deliver tx payload to signed transaction; excluding it", ); excluded_tx_count += 1; continue; @@ -369,8 +390,7 @@ impl App { Ok(tx) => tx, }; - let tx_hash = Sha256::digest(&tx); - + // check if tx's sequence data will fit into sequence block let tx_sequence_data_bytes = signed_tx .unsigned_transaction() .actions @@ -378,38 +398,34 @@ impl App { .filter_map(Action::as_sequence) .fold(0usize, |acc, seq| acc + seq.data.len()); - // Don't include tx if it would make the sequenced block data too large. - if block_sequence_data_bytes + tx_sequence_data_bytes - > MAX_SEQUENCE_DATA_BYTES_PER_BLOCK - { + if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { debug!( transaction_hash = %telemetry::display::base64(&tx_hash), - included_data_bytes = block_sequence_data_bytes, + block_size_constraints = %json(&block_size_constraints), tx_data_bytes = tx_sequence_data_bytes, - max_data_bytes = MAX_SEQUENCE_DATA_BYTES_PER_BLOCK, "excluding transaction: max block sequenced data limit reached" ); excluded_tx_count += 1; continue; } - // store transaction execution result, indexed by tx hash - match self.deliver_tx(signed_tx.clone()).await { - Ok(events) => { - self.execution_result.insert(tx_hash.into(), Ok(events)); - signed_txs.push(signed_tx); - validated_txs.push(tx); - block_sequence_data_bytes += tx_sequence_data_bytes; - } - Err(e) => { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - error = AsRef::::as_ref(&e), - "failed to execute transaction, not including in block" - ); - excluded_tx_count += 1; - self.execution_result.insert(tx_hash.into(), Err(e)); - } + // execute the tx and include it if it succeeds + let tx_hash = self.execute_and_store_signed_tx(&signed_tx).await; + if let Some(Err(_e)) = self.execution_result.get(&tx_hash) { + debug!( + transaction_hash = %telemetry::display::base64(&tx_hash), + "failed to execute transaction, not including in block" + ); + excluded_tx_count += 1; + } else { + block_size_constraints + .sequencer_checked_add(tx_sequence_data_bytes) + .context("error growing sequencer block size")?; + block_size_constraints + .cometbft_checked_add(tx.len()) + .context("error growing cometBFT block size")?; + signed_txs.push(signed_tx); + validated_txs.push(tx); } } @@ -421,7 +437,68 @@ impl App { ); } - (signed_txs, 
validated_txs) + Ok((signed_txs, validated_txs)) + } + + /// Executes the given transaction data, writing it to the app's `StateDelta` + /// and stores the results in `self.execution_result`. + /// + /// Will throw error if any transaction fails to decode or execute as these + /// transactions should've been filtered in prepare proposal. + /// + /// Returns the transactions which were [`SignedTransaction`] form. + #[instrument(name = "App::execute_block_data", skip_all, fields( + tx_count = txs.len() + ))] + async fn execute_block_data( + &mut self, + txs: VecDeque, + ) -> anyhow::Result, anyhow::Error> { + let mut signed_txs = Vec::new(); + for tx in txs { + // all txs in the proposal should be deserializable and executable, + // if any txs were not deserializeable or executable, they would not have been + // returned by `execute_and_filter_block_data` + let signed_tx = signed_transaction_from_bytes(&tx).context( + "failed to decode tx bytes to signed transaction, should've been filtered out in \ + prepare proposal", + )?; + + // execute transaction and ensure that it succeeded + let tx_hash = self.execute_and_store_signed_tx(&signed_tx).await; + if let Some(Err(_e)) = self.execution_result.get(&tx_hash) { + bail!( + "transaction failed during execution, should've been filtered out in prepare \ + proposal" + ) + } + signed_txs.push(signed_tx); + } + Ok(signed_txs) + } + + /// Executes the given transaction data, writing it to the app's `StateDelta` + /// and stores the results in `self.execution_result`. + /// + /// Returns the transaction's hash that can be used to look up the result. + #[instrument(name = "App::execute_and_store_signed_tx", skip_all)] + async fn execute_and_store_signed_tx(&mut self, signed_tx: &SignedTransaction) -> [u8; 32] { + // store transaction execution result, indexed by tx hash + let tx_hash: [u8; 32] = Sha256::digest(signed_tx.to_raw().encode_to_vec()).into(); + match self.deliver_tx(signed_tx.clone()).await { + Ok(events) => { + self.execution_result.insert(tx_hash, Ok(events)); + } + Err(e) => { + debug!( + transaction_hash = %telemetry::display::base64(&tx_hash), + error = AsRef::::as_ref(&e), + "failed to execute transaction" + ); + self.execution_result.insert(tx_hash, Err(e)); + } + } + tx_hash } #[instrument(name = "App::begin_block", skip_all)] @@ -2192,4 +2269,152 @@ mod test { }, ) } + + #[tokio::test] + async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { + let (mut app, storage) = initialize_app_with_storage(None, vec![]).await; + + // update storage with initalized genesis app state + let intermediate_state = StateDelta::new(storage.latest_snapshot()); + let state = Arc::try_unwrap(std::mem::replace( + &mut app.state, + Arc::new(intermediate_state), + )) + .expect("we have exclusive ownership of the State at commit()"); + storage + .commit(state) + .await + .expect("applying genesis state should be okay"); + + // create txs which will cause cometBFT overflow + let (alice_signing_key, _) = get_alice_signing_key_and_address(); + let tx_pass = UnsignedTransaction { + nonce: 0, + actions: vec![ + SequenceAction { + rollup_id: RollupId::from([1u8; 32]), + data: vec![1u8; 100_000], + fee_asset_id: get_native_asset().id(), + } + .into(), + ], + } + .into_signed(&alice_signing_key); + let tx_overflow = UnsignedTransaction { + nonce: 1, + actions: vec![ + SequenceAction { + rollup_id: RollupId::from([1u8; 32]), + data: vec![1u8; 100_000], + fee_asset_id: get_native_asset().id(), + } + .into(), + ], + } + .into_signed(&alice_signing_key); + + 
let txs: Vec = vec![ + tx_pass.to_raw().encode_to_vec().into(), + tx_overflow.to_raw().encode_to_vec().into(), + ]; + + // send to prepare_proposal + let prepare_args = abci::request::PrepareProposal { + max_tx_bytes: 200_000, + txs, + local_last_commit: None, + misbehavior: vec![], + height: Height::default(), + time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: account::Id::new([1u8; 20]), + }; + + let result = app + .prepare_proposal(prepare_args, storage) + .await + .expect("too large transactions should not cause prepare proposal to fail"); + + // see only first tx made it in + assert_eq!( + result.txs.len(), + 3, + "total transaciton length should be three, including the two commitments and the one \ + tx that fit" + ); + } + + #[tokio::test] + async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { + let (mut app, storage) = initialize_app_with_storage(None, vec![]).await; + + // update storage with initalized genesis app state + let intermediate_state = StateDelta::new(storage.latest_snapshot()); + let state = Arc::try_unwrap(std::mem::replace( + &mut app.state, + Arc::new(intermediate_state), + )) + .expect("we have exclusive ownership of the State at commit()"); + storage + .commit(state) + .await + .expect("applying genesis state should be okay"); + + // create txs which will cause sequencer overflow (max is currently 256_000 bytes) + let (alice_signing_key, _) = get_alice_signing_key_and_address(); + let tx_pass = UnsignedTransaction { + nonce: 0, + actions: vec![ + SequenceAction { + rollup_id: RollupId::from([1u8; 32]), + data: vec![1u8; 200_000], + fee_asset_id: get_native_asset().id(), + } + .into(), + ], + } + .into_signed(&alice_signing_key); + let tx_overflow = UnsignedTransaction { + nonce: 1, + actions: vec![ + SequenceAction { + rollup_id: RollupId::from([1u8; 32]), + data: vec![1u8; 100_000], + fee_asset_id: get_native_asset().id(), + } + .into(), + ], + } + .into_signed(&alice_signing_key); + + let txs: Vec = vec![ + tx_pass.to_raw().encode_to_vec().into(), + tx_overflow.to_raw().encode_to_vec().into(), + ]; + + // send to prepare_proposal + let prepare_args = abci::request::PrepareProposal { + max_tx_bytes: 600_000, // make large enough to overflow sequencer bytes first + txs, + local_last_commit: None, + misbehavior: vec![], + height: Height::default(), + time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: account::Id::new([1u8; 20]), + }; + + let result = app + .prepare_proposal(prepare_args, storage) + .await + .expect("too large transactions should not cause prepare proposal to fail"); + + // see only first tx made it in + assert_eq!( + result.txs.len(), + 3, + "total transaciton length should be three, including the two commitments and the one \ + tx that fit" + ); + } } diff --git a/crates/astria-sequencer/src/proposal/block_size_constraints.rs b/crates/astria-sequencer/src/proposal/block_size_constraints.rs new file mode 100644 index 0000000000..466cd80d51 --- /dev/null +++ b/crates/astria-sequencer/src/proposal/block_size_constraints.rs @@ -0,0 +1,127 @@ +use anyhow::{ + anyhow, + ensure, + Context, +}; + +use super::commitment::GeneratedCommitments; + +/// The maximum number of bytes allowed in sequencer action data. 
+const MAX_SEQUENCE_DATA_BYTES_PER_BLOCK: usize = 256_000;
+
+/// Struct for organizing block size constraints in prepare proposal.
+#[derive(serde::Serialize)]
+pub(crate) struct BlockSizeConstraints {
+    max_size_sequencer: usize,
+    max_size_cometbft: usize,
+    current_size_sequencer: usize,
+    current_size_cometbft: usize,
+}
+
+impl BlockSizeConstraints {
+    pub(crate) fn new(cometbft_max_size: usize) -> anyhow::Result<Self> {
+        if cometbft_max_size < GeneratedCommitments::TOTAL_SIZE {
+            return Err(anyhow!(
+                "cometbft_max_size must be at least GeneratedCommitments::TOTAL_SIZE"
+            ));
+        }
+
+        Ok(BlockSizeConstraints {
+            max_size_sequencer: MAX_SEQUENCE_DATA_BYTES_PER_BLOCK,
+            max_size_cometbft: cometbft_max_size,
+            current_size_sequencer: 0,
+            current_size_cometbft: GeneratedCommitments::TOTAL_SIZE,
+        })
+    }
+
+    pub(crate) fn sequencer_has_space(&self, size: usize) -> bool {
+        size <= self
+            .max_size_sequencer
+            .saturating_sub(self.current_size_sequencer)
+    }
+
+    pub(crate) fn cometbft_has_space(&self, size: usize) -> bool {
+        size <= self
+            .max_size_cometbft
+            .saturating_sub(self.current_size_cometbft)
+    }
+
+    pub(crate) fn sequencer_checked_add(&mut self, size: usize) -> anyhow::Result<()> {
+        let new_size = self
+            .current_size_sequencer
+            .checked_add(size)
+            .context("overflow adding to sequencer size")?;
+        ensure!(
+            new_size <= self.max_size_sequencer,
+            "max sequencer size reached"
+        );
+        self.current_size_sequencer = new_size;
+        Ok(())
+    }
+
+    pub(crate) fn cometbft_checked_add(&mut self, size: usize) -> anyhow::Result<()> {
+        let new_size = self
+            .current_size_cometbft
+            .checked_add(size)
+            .context("overflow adding to cometBFT size")?;
+        ensure!(
+            new_size <= self.max_size_cometbft,
+            "max cometBFT size reached"
+        );
+        self.current_size_cometbft = new_size;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn cometbft_checks() {
+        let mut block_size_constraints =
+            BlockSizeConstraints::new(10 + GeneratedCommitments::TOTAL_SIZE)
+                .expect("should be able to create block constraints with this size");
+        assert!(
+            block_size_constraints.cometbft_has_space(10),
+            "cometBFT has space"
+        );
+        assert!(
+            !block_size_constraints.cometbft_has_space(11),
+            "cometBFT doesn't have space"
+        );
+        assert!(
+            block_size_constraints.cometbft_checked_add(10).is_ok(),
+            "should be able to grow to cometBFT max size"
+        );
+        assert!(
+            block_size_constraints.cometbft_checked_add(1).is_err(),
+            "shouldn't be able to grow past cometBFT max size"
+        );
+    }
+
+    #[test]
+    fn sequencer_checks() {
+        let mut block_size_constraints =
+            BlockSizeConstraints::new(GeneratedCommitments::TOTAL_SIZE)
+                .expect("should be able to create block constraints with this size");
+        assert!(
+            block_size_constraints.sequencer_has_space(MAX_SEQUENCE_DATA_BYTES_PER_BLOCK),
+            "sequencer has space"
+        );
+        assert!(
+            !block_size_constraints.sequencer_has_space(MAX_SEQUENCE_DATA_BYTES_PER_BLOCK + 1),
+            "sequencer doesn't have space"
+        );
+        assert!(
+            block_size_constraints
+                .sequencer_checked_add(MAX_SEQUENCE_DATA_BYTES_PER_BLOCK)
+                .is_ok(),
+            "should be able to grow to sequencer max size"
+        );
+        assert!(
+            block_size_constraints.sequencer_checked_add(1).is_err(),
+            "shouldn't be able to grow past sequencer max size"
+        );
+    }
+}
diff --git a/crates/astria-sequencer/src/proposal/commitment.rs b/crates/astria-sequencer/src/proposal/commitment.rs
index 931188e295..7c6aa3fe90 100644
--- a/crates/astria-sequencer/src/proposal/commitment.rs
+++ b/crates/astria-sequencer/src/proposal/commitment.rs
@@ -18,6 +18,9 @@ pub(crate) struct GeneratedCommitments {
 }
 
 impl GeneratedCommitments {
+    /// The total size of the commitments in bytes.
+    pub(crate) const TOTAL_SIZE: usize = 64;
+
     /// Converts the commitments plus external transaction data into a vector of bytes
     /// which can be used as the block's transactions.
     #[must_use]
diff --git a/crates/astria-sequencer/src/proposal/mod.rs b/crates/astria-sequencer/src/proposal/mod.rs
index bb616bded5..75663b7892 100644
--- a/crates/astria-sequencer/src/proposal/mod.rs
+++ b/crates/astria-sequencer/src/proposal/mod.rs
@@ -1 +1,2 @@
+pub(crate) mod block_size_constraints;
 pub(crate) mod commitment;
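To make the new accounting concrete, here is a small, self-contained sketch (not part of the change set) of the two-budget filtering that `execute_and_filter_block_data` performs. `Budgets` and `MockTx` are hypothetical stand-ins for `BlockSizeConstraints`, `SignedTransaction`, and the raw transaction bytes; the numbers mirror the cometBFT overflow test above.

```rust
// A minimal model of the filtering loop: two independent byte budgets,
// both of which must admit a transaction for it to be included.

/// Hypothetical stand-in for `BlockSizeConstraints`.
struct Budgets {
    /// bytes remaining in the cometBFT block
    cometbft_left: usize,
    /// sequence-data bytes remaining in the sequencer block
    sequencer_left: usize,
}

/// Hypothetical stand-in for a decoded transaction.
struct MockTx {
    /// encoded size; counts against the cometBFT budget
    wire_len: usize,
    /// total `SequenceAction` data; counts against the sequencer budget
    seq_data_len: usize,
}

fn filter_txs(txs: Vec<MockTx>, mut budgets: Budgets) -> Vec<MockTx> {
    let mut included = Vec::new();
    for tx in txs {
        // skip (rather than stop at) any tx that overflows either budget,
        // so later, smaller txs can still fill the remaining space
        if tx.wire_len > budgets.cometbft_left || tx.seq_data_len > budgets.sequencer_left {
            continue;
        }
        budgets.cometbft_left -= tx.wire_len;
        budgets.sequencer_left -= tx.seq_data_len;
        included.push(tx);
    }
    included
}

fn main() {
    // mirror the cometBFT overflow test: max_tx_bytes = 200_000, with
    // GeneratedCommitments::TOTAL_SIZE (64 bytes) reserved up front
    let budgets = Budgets {
        cometbft_left: 200_000 - 64,
        sequencer_left: 256_000, // MAX_SEQUENCE_DATA_BYTES_PER_BLOCK
    };
    let txs = vec![
        MockTx { wire_len: 100_000, seq_data_len: 100_000 }, // fits both budgets
        MockTx { wire_len: 100_000, seq_data_len: 100_000 }, // overflows the cometBFT budget
    ];
    assert_eq!(filter_txs(txs, budgets).len(), 1);
}
```

As in the diff, exclusion during prepare proposal is per transaction: oversized or failing transactions are skipped with `continue`, whereas in process proposal's `execute_block_data` the same conditions are hard errors, since a correct proposer would already have filtered them out.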