Skip to content

Commit

Permalink
chore(conductor): send boxed objects over channels (#1865)
Browse files Browse the repository at this point in the history
## Summary
Changed firm block channel to use boxed `ReconstructedBlock`.

## Background
A large error variant lint was triggered when running clippy with Rust
1.83.0 in #1857 due to send errors returning the whole block. Large enum
variants should be avoided because enums are only as small as their
largest variant:
https://rust-lang.github.io/rust-clippy/master/index.html#large_enum_variant.
Changing the channel to consist of a boxed block instead solves this
problem at its source.

## Changes
- Changed firm block channel to use boxed `ReconstructedBlock` instead
of it being unboxed.

## Testing
Passing all tests.

## Changelogs
No updates needed.

## Breaking Changes
Overridden code freeze since this is a very small, non breaking change
that shouldn't have any bearing since our previous audit.

## Related Issues
closes #1858
  • Loading branch information
ethanoroshiba authored Jan 6, 2025
1 parent 7a0e4d0 commit 8be7af8
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 26 deletions.
28 changes: 15 additions & 13 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,18 +516,20 @@ impl RunningReader {
match self.executor.try_send_firm_block(block) {
Ok(()) => self.advance_reference_celestia_height(celestia_height),
Err(FirmTrySendError::Channel {
source: mpsc::error::TrySendError::Full(block),
}) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel opens up"
);
self.enqueued_block = enqueue_block(self.executor.clone(), block).boxed().fuse();
}

Err(FirmTrySendError::Channel {
source: mpsc::error::TrySendError::Closed(_),
}) => bail!("exiting because executor channel is closed"),

source,
}) => match source {
mpsc::error::TrySendError::Full(block) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel \
opens up"
);
self.enqueued_block =
enqueue_block(self.executor.clone(), block).boxed().fuse();
}
mpsc::error::TrySendError::Closed(_) => {
bail!("exiting because executor channel is closed");
}
},
Err(FirmTrySendError::NotSet) => bail!(
"exiting because executor was configured without firm commitments; this Celestia \
reader should have never been started"
Expand Down Expand Up @@ -663,7 +665,7 @@ impl FetchConvertVerifyAndReconstruct {
#[instrument(skip_all, err)]
async fn enqueue_block(
executor: executor::Handle<StateIsInit>,
block: ReconstructedBlock,
block: Box<ReconstructedBlock>,
) -> Result<u64, FirmSendError> {
let celestia_height = block.celestia_height;
executor.send_firm_block(block).await?;
Expand Down
3 changes: 2 additions & 1 deletion crates/astria-conductor/src/executor/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::{
state,
Executor,
Handle,
ReconstructedBlock,
StateNotInit,
};
use crate::{
Expand Down Expand Up @@ -44,7 +45,7 @@ impl Builder {
let mut firm_block_tx = None;
let mut firm_block_rx = None;
if mode.is_with_firm() {
let (tx, rx) = mpsc::channel(16);
let (tx, rx) = mpsc::channel::<Box<ReconstructedBlock>>(16);
firm_block_tx = Some(tx);
firm_block_rx = Some(rx);
}
Expand Down
22 changes: 10 additions & 12 deletions crates/astria-conductor/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub(crate) enum FirmSendError {
#[error("failed sending blocks to executor")]
Channel {
#[from]
source: mpsc::error::SendError<ReconstructedBlock>,
source: mpsc::error::SendError<Box<ReconstructedBlock>>,
},
}

Expand All @@ -85,7 +85,7 @@ pub(crate) enum FirmTrySendError {
#[error("failed sending blocks to executor")]
Channel {
#[from]
source: mpsc::error::TrySendError<ReconstructedBlock>,
source: mpsc::error::TrySendError<Box<ReconstructedBlock>>,
},
}

Expand Down Expand Up @@ -115,7 +115,7 @@ pub(crate) enum SoftTrySendError {
/// information.
#[derive(Debug, Clone)]
pub(crate) struct Handle<TStateInit = StateNotInit> {
firm_blocks: Option<mpsc::Sender<ReconstructedBlock>>,
firm_blocks: Option<mpsc::Sender<Box<ReconstructedBlock>>>,
soft_blocks: Option<channel::Sender<FilteredSequencerBlock>>,
state: StateReceiver,
_state_init: TStateInit,
Expand Down Expand Up @@ -146,20 +146,18 @@ impl Handle<StateIsInit> {
#[instrument(skip_all, err)]
pub(crate) async fn send_firm_block(
self,
block: ReconstructedBlock,
block: impl Into<Box<ReconstructedBlock>>,
) -> Result<(), FirmSendError> {
let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?;
sender.send(block).await?;
Ok(())
Ok(sender.send(block.into()).await?)
}

pub(crate) fn try_send_firm_block(
&self,
block: ReconstructedBlock,
block: impl Into<Box<ReconstructedBlock>>,
) -> Result<(), FirmTrySendError> {
let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?;
sender.try_send(block)?;
Ok(())
Ok(sender.try_send(block.into())?)
}

#[instrument(skip_all, err)]
Expand Down Expand Up @@ -226,7 +224,7 @@ pub(crate) struct Executor {
/// The channel of which this executor receives blocks for executing
/// firm commitments.
/// Only set if `mode` is `FirmOnly` or `SoftAndFirm`.
firm_blocks: Option<mpsc::Receiver<ReconstructedBlock>>,
firm_blocks: Option<mpsc::Receiver<Box<ReconstructedBlock>>>,

/// The channel of which this executor receives blocks for executing
/// soft commitments.
Expand Down Expand Up @@ -456,9 +454,9 @@ impl Executor {
block.height = block.sequencer_height().value(),
err,
))]
async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
async fn execute_firm(&mut self, block: Box<ReconstructedBlock>) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
let executable_block = ExecutableBlock::from_reconstructed(block);
let executable_block = ExecutableBlock::from_reconstructed(*block);
let expected_height = self.state.next_expected_firm_sequencer_height();
let block_height = executable_block.height;
ensure!(
Expand Down

0 comments on commit 8be7af8

Please sign in to comment.