Skip to content

Commit

Permalink
feat(driver,client): Pipeline Cursor Refactor (#798)
Browse files Browse the repository at this point in the history
* fix(driver,client): pipeline cursor

* trigger ci
  • Loading branch information
refcell authored Nov 11, 2024
1 parent 5d14375 commit 2e943f4
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 107 deletions.
8 changes: 4 additions & 4 deletions bin/client/src/kona.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kona_client::{
executor::KonaExecutorConstructor,
l1::{OracleBlobProvider, OracleL1ChainProvider, OraclePipeline},
l2::OracleL2ChainProvider,
sync::SyncStart,
sync::new_pipeline_cursor,
BootInfo, CachingOracle,
};
use kona_common::io;
Expand Down Expand Up @@ -72,7 +72,7 @@ fn main() -> Result<(), String> {

// Create a new derivation driver with the given boot information and oracle.

let Ok(sync_start) = SyncStart::from(
let Ok(cursor) = new_pipeline_cursor(
oracle.clone(),
&boot,
&mut l1_provider.clone(),
Expand All @@ -87,7 +87,7 @@ fn main() -> Result<(), String> {
let cfg = Arc::new(boot.rollup_config.clone());
let pipeline = OraclePipeline::new(
cfg.clone(),
sync_start.clone(),
cursor.clone(),
oracle.clone(),
beacon,
l1_provider.clone(),
Expand All @@ -99,7 +99,7 @@ fn main() -> Result<(), String> {
l2_provider,
fpvm_handle_register,
);
let mut driver = Driver::new(sync_start.cursor, executor, pipeline);
let mut driver = Driver::new(cursor, executor, pipeline);

// Run the derivation pipeline until we are able to produce the output root of the claimed
// L2 block.
Expand Down
10 changes: 4 additions & 6 deletions bin/client/src/l1/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ use kona_derive::{
traits::{BlobProvider, OriginProvider, Pipeline, SignalReceiver},
types::{PipelineResult, Signal, StepResult},
};
use kona_driver::DriverPipeline;
use kona_driver::{DriverPipeline, PipelineCursor};
use kona_preimage::CommsClient;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use op_alloy_rpc_types_engine::OpAttributesWithParent;

use crate::{
l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, sync::SyncStart, FlushableCache,
};
use crate::{l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, FlushableCache};

/// An oracle-backed derivation pipeline.
pub type OracleDerivationPipeline<O, B> = DerivationPipeline<
Expand Down Expand Up @@ -76,7 +74,7 @@ where
/// Constructs a new oracle-backed derivation pipeline.
pub fn new(
cfg: Arc<RollupConfig>,
sync_start: SyncStart,
sync_start: PipelineCursor,
caching_oracle: Arc<O>,
blob_provider: B,
chain_provider: OracleL1ChainProvider<O>,
Expand All @@ -95,7 +93,7 @@ where
.l2_chain_provider(l2_chain_provider)
.chain_provider(chain_provider)
.builder(attributes)
.origin(sync_start.origin)
.origin(sync_start.origin())
.build();
Self { pipeline, caching_oracle }
}
Expand Down
107 changes: 45 additions & 62 deletions bin/client/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,72 +8,55 @@ use alloc::sync::Arc;
use alloy_consensus::Sealed;
use core::fmt::Debug;
use kona_derive::traits::ChainProvider;
use kona_driver::SyncCursor;
use kona_driver::{PipelineCursor, TipCursor};
use kona_mpt::TrieProvider;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
use op_alloy_protocol::{BatchValidationProvider, BlockInfo};
use op_alloy_protocol::BatchValidationProvider;

/// Sync Start
#[derive(Debug, Clone)]
pub struct SyncStart {
/// The l1 origin block info.
pub origin: BlockInfo,
/// The sync cursor used for the derivation driver.
pub cursor: SyncCursor,
}

impl SyncStart {
/// Constructs the [`SyncStart`] from the caching oracle, boot info, and providers.
pub async fn from<O>(
caching_oracle: Arc<O>,
boot_info: &BootInfo,
chain_provider: &mut OracleL1ChainProvider<O>,
l2_chain_provider: &mut OracleL2ChainProvider<O>,
) -> Result<Self, OracleProviderError>
where
O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug,
{
// Find the initial safe head, based off of the starting L2 block number in the boot info.
caching_oracle
.write(
&HintType::StartingL2Output
.encode_with(&[boot_info.agreed_l2_output_root.as_ref()]),
)
.await
.map_err(OracleProviderError::Preimage)?;
let mut output_preimage = [0u8; 128];
caching_oracle
.get_exact(
PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256),
&mut output_preimage,
)
.await
.map_err(OracleProviderError::Preimage)?;

let safe_hash =
output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?;
let safe_header = l2_chain_provider.header_by_hash(safe_hash)?;
let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?;
let l1_origin =
chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?;
/// Constructs a [`PipelineCursor`] from the caching oracle, boot info, and providers.
pub async fn new_pipeline_cursor<O>(
caching_oracle: Arc<O>,
boot_info: &BootInfo,
chain_provider: &mut OracleL1ChainProvider<O>,
l2_chain_provider: &mut OracleL2ChainProvider<O>,
) -> Result<PipelineCursor, OracleProviderError>
where
O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug,
{
// Find the initial safe head, based off of the starting L2 block number in the boot info.
caching_oracle
.write(&HintType::StartingL2Output.encode_with(&[boot_info.agreed_l2_output_root.as_ref()]))
.await
.map_err(OracleProviderError::Preimage)?;
let mut output_preimage = [0u8; 128];
caching_oracle
.get_exact(
PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256),
&mut output_preimage,
)
.await
.map_err(OracleProviderError::Preimage)?;

// Construct the sync cursor for the pipeline driver.
let cursor = SyncCursor::new(
safe_head_info,
Sealed::new_unchecked(safe_header, safe_hash),
boot_info.agreed_l2_output_root,
);
let safe_hash =
output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?;
let safe_header = l2_chain_provider.header_by_hash(safe_hash)?;
let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?;
let l1_origin = chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?;

// Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is
// captured.
let channel_timeout =
boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp);
let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout);
if l1_origin_number < boot_info.rollup_config.genesis.l1.number {
l1_origin_number = boot_info.rollup_config.genesis.l1.number;
}
let origin = chain_provider.block_info_by_number(l1_origin_number).await?;

Ok(Self { origin, cursor })
// Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is
// captured.
let channel_timeout =
boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp);
let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout);
if l1_origin_number < boot_info.rollup_config.genesis.l1.number {
l1_origin_number = boot_info.rollup_config.genesis.l1.number;
}
let origin = chain_provider.block_info_by_number(l1_origin_number).await?;

// Construct the cursor.
let safe_header = Sealed::new_unchecked(safe_header, safe_hash);
let mut cursor = PipelineCursor::new(channel_timeout, origin);
let tip = TipCursor::new(safe_head_info, safe_header, boot_info.agreed_l2_output_root);
cursor.advance(origin, tip);
Ok(cursor)
}
25 changes: 16 additions & 9 deletions crates/driver/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The driver of the Derivation Pipeline.
//! The driver of the kona derivation pipeline.
use alloc::vec::Vec;
use alloy_consensus::{BlockBody, Sealable};
Expand All @@ -16,7 +16,10 @@ use op_alloy_protocol::L2BlockInfo;
use op_alloy_rpc_types_engine::OpAttributesWithParent;
use tracing::{error, info, warn};

use crate::{DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, SyncCursor};
use crate::{
DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, PipelineCursor,
TipCursor,
};

/// The Rollup Driver entrypoint.
#[derive(Debug)]
Expand All @@ -34,7 +37,7 @@ where
/// A pipeline abstraction.
pipeline: DP,
/// Cursor to keep track of the L2 tip
cursor: SyncCursor,
cursor: PipelineCursor,
/// Executor constructor.
executor: EC,
}
Expand All @@ -47,7 +50,7 @@ where
P: Pipeline + SignalReceiver + Send + Sync + Debug,
{
/// Creates a new [Driver].
pub const fn new(cursor: SyncCursor, executor: EC, pipeline: DP) -> Self {
pub const fn new(cursor: PipelineCursor, executor: EC, pipeline: DP) -> Self {
Self {
_marker: core::marker::PhantomData,
_marker2: core::marker::PhantomData,
Expand Down Expand Up @@ -161,14 +164,18 @@ where
},
};

// Update the safe head.
self.cursor.l2_safe_head = L2BlockInfo::from_block_and_genesis(
// Get the pipeline origin and update the cursor.
let origin = self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?;
let l2_info = L2BlockInfo::from_block_and_genesis(
&block,
&self.pipeline.rollup_config().genesis,
)?;
self.cursor.l2_safe_head_header = header.clone().seal_slow();
self.cursor.l2_safe_head_output_root =
executor.compute_output_root().map_err(DriverError::Executor)?;
let cursor = TipCursor::new(
l2_info,
header.clone().seal_slow(),
executor.compute_output_root().map_err(DriverError::Executor)?,
);
self.cursor.advance(origin, cursor);
}
}
}
121 changes: 96 additions & 25 deletions crates/driver/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,113 @@
//! Contains the cursor for the derivation driver.
//! Contains the cursor for the derivation pipeline.
use crate::TipCursor;
use alloc::collections::{btree_map::BTreeMap, vec_deque::VecDeque};
use alloy_consensus::{Header, Sealed};
use alloy_primitives::B256;
use op_alloy_protocol::L2BlockInfo;
use alloy_primitives::{map::HashMap, B256};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};

/// A cursor that keeps track of the L2 tip block.
/// A cursor that tracks the pipeline tip.
#[derive(Debug, Clone)]
pub struct SyncCursor {
/// The current L2 safe head.
pub l2_safe_head: L2BlockInfo,
/// The header of the L2 safe head.
pub l2_safe_head_header: Sealed<Header>,
/// The output root of the L2 safe head.
pub l2_safe_head_output_root: B256,
pub struct PipelineCursor {
/// The block cache capacity before evicting old entries
/// (to avoid unbounded memory growth)
capacity: usize,
/// The channel timeout used to create the cursor.
channel_timeout: u64,
/// The l1 Origin of the pipeline.
origin: BlockInfo,
/// The L1 origin block numbers for which we have an L2 block in the cache.
/// Used to keep track of the order of insertion and evict the oldest entry.
origins: VecDeque<u64>,
/// The L1 origin block info for which we have an L2 block in the cache.
origin_infos: HashMap<u64, BlockInfo>,
/// A map from the l1 origin block number to its L2 tip.
tips: BTreeMap<u64, TipCursor>,
}

impl SyncCursor {
/// Instantiates a new `SyncCursor`.
pub const fn new(
l2_safe_head: L2BlockInfo,
l2_safe_head_header: Sealed<Header>,
l2_safe_head_output_root: B256,
) -> Self {
Self { l2_safe_head, l2_safe_head_header, l2_safe_head_output_root }
impl PipelineCursor {
/// Create a new cursor with the default cache capacity
pub fn new(channel_timeout: u64, origin: BlockInfo) -> Self {
// NOTE: capacity must be greater than the `channel_timeout` to allow
// for derivation to proceed through a deep reorg.
// Ref: <https://specs.optimism.io/protocol/derivation.html#timeouts>
let capacity = channel_timeout as usize + 5;

let mut origins = VecDeque::with_capacity(capacity);
origins.push_back(origin.number);
let mut origin_infos = HashMap::default();
origin_infos.insert(origin.number, origin);
Self { capacity, channel_timeout, origin, origins, origin_infos, tips: Default::default() }
}

/// Returns the current origin of the pipeline.
pub const fn origin(&self) -> BlockInfo {
self.origin
}

/// Returns the current L2 safe head.
pub const fn l2_safe_head(&self) -> &L2BlockInfo {
&self.l2_safe_head
pub fn l2_safe_head(&self) -> &L2BlockInfo {
&self.tip().l2_safe_head
}

/// Returns the header of the L2 safe head.
pub const fn l2_safe_head_header(&self) -> &Sealed<Header> {
&self.l2_safe_head_header
pub fn l2_safe_head_header(&self) -> &Sealed<Header> {
&self.tip().l2_safe_head_header
}

/// Returns the output root of the L2 safe head.
pub const fn l2_safe_head_output_root(&self) -> &B256 {
&self.l2_safe_head_output_root
pub fn l2_safe_head_output_root(&self) -> &B256 {
&self.tip().l2_safe_head_output_root
}

/// Get the current L2 tip
pub fn tip(&self) -> &TipCursor {
if let Some((_, l2_tip)) = self.tips.last_key_value() {
l2_tip
} else {
unreachable!("cursor must be initialized with one block before advancing")
}
}

/// Advance the cursor to the provided L2 block, given the corresponding L1 origin block.
///
/// If the cache is full, the oldest entry is evicted.
pub fn advance(&mut self, origin: BlockInfo, l2_tip_block: TipCursor) {
if self.tips.len() >= self.capacity {
let key = self.origins.pop_front().unwrap();
self.tips.remove(&key);
}

self.origin = origin;
self.origins.push_back(origin.number);
self.origin_infos.insert(origin.number, origin);
self.tips.insert(origin.number, l2_tip_block);
}

/// When the L1 undergoes a reorg, we need to reset the cursor to the fork block minus
/// the channel timeout, because an L2 block might have started to be derived at the
/// beginning of the channel.
///
/// Returns the (L2 block info, L1 origin block info) tuple for the new cursor state.
pub fn reset(&mut self, fork_block: u64) -> (TipCursor, BlockInfo) {
let channel_start = fork_block - self.channel_timeout;

match self.tips.get(&channel_start) {
Some(l2_safe_tip) => {
// The channel start block is in the cache, we can use it to reset the cursor.
(l2_safe_tip.clone(), self.origin_infos[&channel_start])
}
None => {
// If the channel start block is not in the cache, we reset the cursor
// to the closest known L1 block for which we have a corresponding L2 block.
let (last_l1_known_tip, l2_known_tip) = self
.tips
.range(..=channel_start)
.next_back()
.expect("walked back to genesis without finding anchor origin block");

(l2_known_tip.clone(), self.origin_infos[last_l1_known_tip])
}
}
}
}
5 changes: 4 additions & 1 deletion crates/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ mod core;
pub use core::Driver;

mod cursor;
pub use cursor::SyncCursor;
pub use cursor::PipelineCursor;

mod tip;
pub use tip::TipCursor;
Loading

0 comments on commit 2e943f4

Please sign in to comment.