diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index b506178..3605a27 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -1,14 +1,32 @@ //! Contains the core `HiloDriver`. use alloy_transport::TransportResult; +use kona_derive::{errors::PipelineErrorKind, traits::SignalReceiver, types::ResetSignal}; +use kona_driver::{Driver, DriverPipeline, PipelineCursor, TipCursor}; use std::sync::Arc; use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider}; use crate::{ - Config, ConfigError, Context, HiloExecutorConstructor, HiloPipeline, StandaloneContext, + ChainNotification, Config, ConfigError, Context, HiloDerivationPipeline, HiloExecutor, + HiloExecutorConstructor, HiloPipeline, StandaloneContext, }; +/// A driver from [kona_driver] that uses hilo-types. +pub type KonaDriver = + Driver; + +/// An error that can happen when running the driver. +#[derive(Debug, thiserror::Error)] +pub enum DriverError { + /// An error thrown from a method on the [Config]. + #[error("config error: {0}")] + Config(#[from] ConfigError), + /// A pipeline reset failed. + #[error("pipeline reset error: {0}")] + PipelineReset(#[from] PipelineErrorKind), +} + /// HiloDriver is a wrapper around the `Driver` that /// provides methods of constructing the driver. #[derive(Debug)] @@ -18,7 +36,7 @@ pub struct HiloDriver { /// The driver config. pub cfg: Config, /// A constructor for execution. - pub exec: HiloExecutorConstructor, + pub exec: Option, } impl HiloDriver { @@ -35,47 +53,83 @@ where { /// Constructs a new [HiloDriver]. pub fn new(cfg: Config, ctx: C, exec: HiloExecutorConstructor) -> Self { - Self { cfg, ctx, exec } + Self { cfg, ctx, exec: Some(exec) } } - /// Initializes the pipeline. - pub async fn init_pipeline(&self) -> Result { - let cursor = self.cfg.tip_cursor().await?; + /// Initializes the [HiloPipeline]. + pub async fn init_pipeline(&self, cursor: PipelineCursor) -> Result { let chain_provider = InMemoryChainProvider::with_capacity(self.cfg.cache_size); let l2_chain_provider = InMemoryL2ChainProvider::with_capacity(self.cfg.cache_size); Ok(HiloPipeline::new( Arc::new(self.cfg.rollup_config.clone()), - cursor.clone(), + cursor, self.cfg.blob_provider().await?, chain_provider.clone(), l2_chain_provider, )) } + /// Initializes a [Driver] using the [HiloPipeline]. + pub async fn init_driver(&mut self) -> Result { + let cursor = self.cfg.tip_cursor().await?; + let pipeline = self.init_pipeline(cursor.clone()).await?; + let exec = self.exec.take().expect("Executor not set"); + Ok(Driver::new(cursor, exec, pipeline)) + } + + /// Handle a chain notification from the driver context. + async fn handle_notification( + &mut self, + notification: ChainNotification, + driver: &mut KonaDriver, + ) -> Result<(), DriverError> { + if let Some(reverted_chain) = notification.reverted_chain() { + // The reverted chain contains the list of blocks that were invalidated by the + // reorg. we need to reset the cursor to the last canonical block, which corresponds + // to the block before the reorg happened. + let fork_block = reverted_chain.fork_block_number(); + + // Find the last known L2 block that is still valid after the reorg, + // and reset the cursor and pipeline to it. + let (TipCursor { l2_safe_head, .. }, l1_origin) = driver.cursor.reset(fork_block); + + warn!("Resetting derivation pipeline to L2 block: {}", l2_safe_head.block_info.number); + let reset_signal = ResetSignal { l1_origin, l2_safe_head, ..Default::default() }; + if let Err(e) = driver.pipeline.signal(reset_signal.signal()).await { + return Err(DriverError::PipelineReset(e)); + } + } + + if let Some(new_chain) = notification.new_chain() { + let tip = new_chain.tip(); + self.ctx.send_processed_tip_event(tip); + } + + Ok(()) + } + /// Continuously run the [HiloDriver]. - pub async fn start(&mut self) -> Result<(), ConfigError> { + pub async fn start(&mut self) -> Result<(), DriverError> { // Step 1: Wait for the L2 origin block to be available self.wait_for_l2_genesis_l1_block().await; info!("L1 chain synced to the rollup genesis block"); - // Step 2: Initialize the rollup pipeline - let _ = self.init_pipeline().await?; - info!("Derivation pipeline initialized"); + // Step 2: Initialize the kona driver + let mut driver = self.init_driver().await?; + info!("Driver initialized"); // Step 3: Start the processing loop - // loop { - // // Try to advance the pipeline until there's no more data to process - // if self.step(&mut pipeline).await { - // continue; - // } - // - // // Handle any incoming notifications from the context - // if let Some(notification) = self.ctx.recv_notification().await { - // self.handle_notification(notification, &mut pipeline).await?; - // } - // } - - Ok(()) + loop { + tokio::select! { + Ok(_) = driver.pipeline.produce_payload(*driver.cursor.l2_safe_head()) => { + info!("Produced payload"); + // todo + } + Some(notification) = self.ctx.recv_notification() => { + self.handle_notification(notification, &mut driver).await?; + } + } + } } /// Wait for the L2 genesis' corresponding L1 block to be available in the L1 chain. diff --git a/crates/driver/src/lib.rs b/crates/driver/src/lib.rs index 9941898..d59f34b 100644 --- a/crates/driver/src/lib.rs +++ b/crates/driver/src/lib.rs @@ -13,10 +13,10 @@ mod executor; pub use executor::{HiloExecutor, HiloExecutorConstructor}; mod driver; -pub use driver::HiloDriver; +pub use driver::{DriverError, HiloDriver}; mod context; -pub use context::{Context, StandaloneContext}; +pub use context::{ChainNotification, Context, StandaloneContext}; mod pipeline; pub use pipeline::{ diff --git a/crates/node/src/errors.rs b/crates/node/src/errors.rs index f78e8bb..96d6f61 100644 --- a/crates/node/src/errors.rs +++ b/crates/node/src/errors.rs @@ -1,6 +1,7 @@ //! Node error types. use crate::ConfigError; +use hilo_driver::DriverError; /// A high-level `Node`error. #[derive(Debug, thiserror::Error)] @@ -14,6 +15,9 @@ pub enum NodeError { /// An error thrown by a [crate::Config] operation. #[error("config error: {0}")] Beacon(#[from] ConfigError), + /// An error thrown by the driver. + #[error("driver error: {0}")] + Driver(#[from] DriverError), } impl From for NodeError {