Skip to content

Commit

Permalink
fix(client): SyncStart Refactor (#797)
Browse files Browse the repository at this point in the history
* feat: abstract pipeline

* fix(client): sync start refactor

* small fix
  • Loading branch information
refcell authored Nov 11, 2024
1 parent 8acfb82 commit 5d14375
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 116 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 17 additions & 8 deletions bin/client/src/kona.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use kona_client::{
executor::KonaExecutorConstructor,
l1::{OracleBlobProvider, OracleL1ChainProvider, OraclePipeline},
l2::OracleL2ChainProvider,
sync::SyncStart,
BootInfo, CachingOracle,
};
use kona_common::io;
Expand Down Expand Up @@ -70,27 +71,35 @@ fn main() -> Result<(), String> {
////////////////////////////////////////////////////////////////

// Create a new derivation driver with the given boot information and oracle.
let Ok((pipeline, cursor)) = OraclePipeline::new(
&boot,

let Ok(sync_start) = SyncStart::from(
oracle.clone(),
beacon,
l1_provider.clone(),
l2_provider.clone(),
&boot,
&mut l1_provider.clone(),
&mut l2_provider.clone(),
)
.await
else {
error!(target: "client", "Failed to create derivation pipeline");
io::print("Failed to create derivation pipeline\n");
error!(target: "client", "Failed to find sync start");
io::print("Failed to find sync start\n");
io::exit(1);
};
let cfg = Arc::new(boot.rollup_config.clone());
let pipeline = OraclePipeline::new(
cfg.clone(),
sync_start.clone(),
oracle.clone(),
beacon,
l1_provider.clone(),
l2_provider.clone(),
);
let executor = KonaExecutorConstructor::new(
&cfg,
l2_provider.clone(),
l2_provider,
fpvm_handle_register,
);
let mut driver = Driver::new(cursor, executor, pipeline);
let mut driver = Driver::new(sync_start.cursor, executor, pipeline);

// Run the derivation pipeline until we are able to produce the output root of the claimed
// L2 block.
Expand Down
109 changes: 14 additions & 95 deletions bin/client/src/l1/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Contains an oracle-backed pipeline for
//! Contains an oracle-backed pipeline.
use alloc::{boxed::Box, sync::Arc};
use alloy_consensus::Sealed;
use async_trait::async_trait;
use core::fmt::Debug;
use kona_derive::{
Expand All @@ -13,19 +12,17 @@ use kona_derive::{
AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{BlobProvider, ChainProvider, OriginProvider, Pipeline, SignalReceiver},
traits::{BlobProvider, OriginProvider, Pipeline, SignalReceiver},
types::{PipelineResult, Signal, StepResult},
};
use kona_driver::{DriverPipeline, SyncCursor};
use kona_mpt::TrieProvider;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
use kona_driver::DriverPipeline;
use kona_preimage::CommsClient;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BatchValidationProvider, BlockInfo, L2BlockInfo};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use op_alloy_rpc_types_engine::OpAttributesWithParent;

use crate::{
errors::OracleProviderError, l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, BootInfo,
FlushableCache, HintType,
l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, sync::SyncStart, FlushableCache,
};

/// An oracle-backed derivation pipeline.
Expand Down Expand Up @@ -58,22 +55,6 @@ pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
OracleAttributesBuilder<O>,
>;

/// An error encountered when starting the pipeline
#[derive(Debug, derive_more::Display)]
pub enum PipelineStartError {
/// An oracle provider error.
#[display("Oracle provider error: {_0}")]
OracleProvider(OracleProviderError),
}

impl core::error::Error for PipelineStartError {}

impl From<OracleProviderError> for PipelineStartError {
fn from(err: OracleProviderError) -> Self {
Self::OracleProvider(err)
}
}

/// An oracle-backed derivation pipeline.
#[derive(Debug)]
pub struct OraclePipeline<O, B>
Expand All @@ -93,34 +74,14 @@ where
B: BlobProvider + Send + Sync + Debug + Clone,
{
/// Constructs a new oracle-backed derivation pipeline.
pub async fn new(
boot_info: &BootInfo,
pub fn new(
cfg: Arc<RollupConfig>,
sync_start: SyncStart,
caching_oracle: Arc<O>,
blob_provider: B,
mut chain_provider: OracleL1ChainProvider<O>,
mut l2_chain_provider: OracleL2ChainProvider<O>,
) -> Result<(Self, SyncCursor), PipelineStartError> {
let cfg = Arc::new(boot_info.rollup_config.clone());

// Fetch the startup information.
let (l1_origin, l2_safe_head, sc) = Self::sync_start(
caching_oracle.clone(),
boot_info,
&mut chain_provider,
&mut l2_chain_provider,
)
.await?;

// Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is
// captured.
let channel_timeout =
boot_info.rollup_config.channel_timeout(l2_safe_head.block_info.timestamp);
let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout);
if l1_origin_number < boot_info.rollup_config.genesis.l1.number {
l1_origin_number = boot_info.rollup_config.genesis.l1.number;
}
let l1_origin = chain_provider.block_info_by_number(l1_origin_number).await?;

chain_provider: OracleL1ChainProvider<O>,
l2_chain_provider: OracleL2ChainProvider<O>,
) -> Self {
let attributes = StatefulAttributesBuilder::new(
cfg.clone(),
l2_chain_provider.clone(),
Expand All @@ -134,54 +95,12 @@ where
.l2_chain_provider(l2_chain_provider)
.chain_provider(chain_provider)
.builder(attributes)
.origin(l1_origin)
.origin(sync_start.origin)
.build();
Ok((Self { pipeline, caching_oracle }, sc))
}

async fn sync_start(
caching_oracle: Arc<O>,
boot_info: &BootInfo,
chain_provider: &mut OracleL1ChainProvider<O>,
l2_chain_provider: &mut OracleL2ChainProvider<O>,
) -> Result<(BlockInfo, L2BlockInfo, SyncCursor), PipelineStartError> {
// Find the initial safe head, based off of the starting L2 block number in the boot info.
caching_oracle
.write(
&HintType::StartingL2Output
.encode_with(&[boot_info.agreed_l2_output_root.as_ref()]),
)
.await
.map_err(OracleProviderError::Preimage)?;
let mut output_preimage = [0u8; 128];
caching_oracle
.get_exact(
PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256),
&mut output_preimage,
)
.await
.map_err(OracleProviderError::Preimage)?;

let safe_hash =
output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?;
let safe_header = l2_chain_provider.header_by_hash(safe_hash)?;
let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?;
let l1_origin =
chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?;

Ok((
l1_origin,
safe_head_info,
SyncCursor::new(
safe_head_info,
Sealed::new_unchecked(safe_header, safe_hash),
boot_info.agreed_l2_output_root,
),
))
Self { pipeline, caching_oracle }
}
}

#[async_trait]
impl<O, B> DriverPipeline<OracleDerivationPipeline<O, B>> for OraclePipeline<O, B>
where
O: CommsClient + FlushableCache + Send + Sync + Debug,
Expand Down
2 changes: 2 additions & 0 deletions bin/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub mod l1;

pub mod l2;

pub mod sync;

pub mod errors;

pub mod executor;
Expand Down
79 changes: 79 additions & 0 deletions bin/client/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Sync Start
use crate::{
errors::OracleProviderError, l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, BootInfo,
FlushableCache, HintType,
};
use alloc::sync::Arc;
use alloy_consensus::Sealed;
use core::fmt::Debug;
use kona_derive::traits::ChainProvider;
use kona_driver::SyncCursor;
use kona_mpt::TrieProvider;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
use op_alloy_protocol::{BatchValidationProvider, BlockInfo};

/// Sync Start
#[derive(Debug, Clone)]
pub struct SyncStart {
/// The l1 origin block info.
pub origin: BlockInfo,
/// The sync cursor used for the derivation driver.
pub cursor: SyncCursor,
}

impl SyncStart {
/// Constructs the [`SyncStart`] from the caching oracle, boot info, and providers.
pub async fn from<O>(
caching_oracle: Arc<O>,
boot_info: &BootInfo,
chain_provider: &mut OracleL1ChainProvider<O>,
l2_chain_provider: &mut OracleL2ChainProvider<O>,
) -> Result<Self, OracleProviderError>
where
O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug,
{
// Find the initial safe head, based off of the starting L2 block number in the boot info.
caching_oracle
.write(
&HintType::StartingL2Output
.encode_with(&[boot_info.agreed_l2_output_root.as_ref()]),
)
.await
.map_err(OracleProviderError::Preimage)?;
let mut output_preimage = [0u8; 128];
caching_oracle
.get_exact(
PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256),
&mut output_preimage,
)
.await
.map_err(OracleProviderError::Preimage)?;

let safe_hash =
output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?;
let safe_header = l2_chain_provider.header_by_hash(safe_hash)?;
let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?;
let l1_origin =
chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?;

// Construct the sync cursor for the pipeline driver.
let cursor = SyncCursor::new(
safe_head_info,
Sealed::new_unchecked(safe_header, safe_hash),
boot_info.agreed_l2_output_root,
);

// Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is
// captured.
let channel_timeout =
boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp);
let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout);
if l1_origin_number < boot_info.rollup_config.genesis.l1.number {
l1_origin_number = boot_info.rollup_config.genesis.l1.number;
}
let origin = chain_provider.block_info_by_number(l1_origin_number).await?;

Ok(Self { origin, cursor })
}
}
2 changes: 1 addition & 1 deletion crates/driver/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use alloy_primitives::B256;
use op_alloy_protocol::L2BlockInfo;

/// A cursor that keeps track of the L2 tip block.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SyncCursor {
/// The current L2 safe head.
pub l2_safe_head: L2BlockInfo,
Expand Down

0 comments on commit 5d14375

Please sign in to comment.