Skip to content

Commit

Permalink
Merge pull request #129 from anton-rs/rf/chore/rework-driver
Browse files Browse the repository at this point in the history
chore(driver): Rework
  • Loading branch information
refcell authored Nov 26, 2024
2 parents c4c7bcc + 2a7c3e1 commit 73eab8a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 26 deletions.
102 changes: 78 additions & 24 deletions crates/driver/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
//! Contains the core `HiloDriver`.
use alloy_transport::TransportResult;
use kona_derive::{errors::PipelineErrorKind, traits::SignalReceiver, types::ResetSignal};
use kona_driver::{Driver, DriverPipeline, PipelineCursor, TipCursor};
use std::sync::Arc;

use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider};

use crate::{
Config, ConfigError, Context, HiloExecutorConstructor, HiloPipeline, StandaloneContext,
ChainNotification, Config, ConfigError, Context, HiloDerivationPipeline, HiloExecutor,
HiloExecutorConstructor, HiloPipeline, StandaloneContext,
};

/// A driver from [kona_driver] that uses hilo-types.
pub type KonaDriver =
Driver<HiloExecutor, HiloExecutorConstructor, HiloPipeline, HiloDerivationPipeline>;

/// An error that can happen when running the driver.
#[derive(Debug, thiserror::Error)]
pub enum DriverError {
/// An error thrown from a method on the [Config].
#[error("config error: {0}")]
Config(#[from] ConfigError),
/// A pipeline reset failed.
#[error("pipeline reset error: {0}")]
PipelineReset(#[from] PipelineErrorKind),
}

/// HiloDriver is a wrapper around the `Driver` that
/// provides methods of constructing the driver.
#[derive(Debug)]
Expand All @@ -18,7 +36,7 @@ pub struct HiloDriver<C: Context> {
/// The driver config.
pub cfg: Config,
/// A constructor for execution.
pub exec: HiloExecutorConstructor,
pub exec: Option<HiloExecutorConstructor>,
}

impl HiloDriver<StandaloneContext> {
Expand All @@ -35,47 +53,83 @@ where
{
/// Constructs a new [HiloDriver].
pub fn new(cfg: Config, ctx: C, exec: HiloExecutorConstructor) -> Self {
Self { cfg, ctx, exec }
Self { cfg, ctx, exec: Some(exec) }
}

/// Initializes the pipeline.
pub async fn init_pipeline(&self) -> Result<HiloPipeline, ConfigError> {
let cursor = self.cfg.tip_cursor().await?;
/// Initializes the [HiloPipeline].
pub async fn init_pipeline(&self, cursor: PipelineCursor) -> Result<HiloPipeline, ConfigError> {
let chain_provider = InMemoryChainProvider::with_capacity(self.cfg.cache_size);
let l2_chain_provider = InMemoryL2ChainProvider::with_capacity(self.cfg.cache_size);
Ok(HiloPipeline::new(
Arc::new(self.cfg.rollup_config.clone()),
cursor.clone(),
cursor,
self.cfg.blob_provider().await?,
chain_provider.clone(),
l2_chain_provider,
))
}

/// Initializes a [Driver] using the [HiloPipeline].
pub async fn init_driver(&mut self) -> Result<KonaDriver, ConfigError> {
let cursor = self.cfg.tip_cursor().await?;
let pipeline = self.init_pipeline(cursor.clone()).await?;
let exec = self.exec.take().expect("Executor not set");
Ok(Driver::new(cursor, exec, pipeline))
}

/// Handle a chain notification from the driver context.
async fn handle_notification(
&mut self,
notification: ChainNotification,
driver: &mut KonaDriver,
) -> Result<(), DriverError> {
if let Some(reverted_chain) = notification.reverted_chain() {
// The reverted chain contains the list of blocks that were invalidated by the
// reorg. we need to reset the cursor to the last canonical block, which corresponds
// to the block before the reorg happened.
let fork_block = reverted_chain.fork_block_number();

// Find the last known L2 block that is still valid after the reorg,
// and reset the cursor and pipeline to it.
let (TipCursor { l2_safe_head, .. }, l1_origin) = driver.cursor.reset(fork_block);

warn!("Resetting derivation pipeline to L2 block: {}", l2_safe_head.block_info.number);
let reset_signal = ResetSignal { l1_origin, l2_safe_head, ..Default::default() };
if let Err(e) = driver.pipeline.signal(reset_signal.signal()).await {
return Err(DriverError::PipelineReset(e));
}
}

if let Some(new_chain) = notification.new_chain() {
let tip = new_chain.tip();
self.ctx.send_processed_tip_event(tip);
}

Ok(())
}

/// Continuously run the [HiloDriver].
pub async fn start(&mut self) -> Result<(), ConfigError> {
pub async fn start(&mut self) -> Result<(), DriverError> {
// Step 1: Wait for the L2 origin block to be available
self.wait_for_l2_genesis_l1_block().await;
info!("L1 chain synced to the rollup genesis block");

// Step 2: Initialize the rollup pipeline
let _ = self.init_pipeline().await?;
info!("Derivation pipeline initialized");
// Step 2: Initialize the kona driver
let mut driver = self.init_driver().await?;
info!("Driver initialized");

// Step 3: Start the processing loop
// loop {
// // Try to advance the pipeline until there's no more data to process
// if self.step(&mut pipeline).await {
// continue;
// }
//
// // Handle any incoming notifications from the context
// if let Some(notification) = self.ctx.recv_notification().await {
// self.handle_notification(notification, &mut pipeline).await?;
// }
// }

Ok(())
loop {
tokio::select! {
Ok(_) = driver.pipeline.produce_payload(*driver.cursor.l2_safe_head()) => {
info!("Produced payload");
// todo
}
Some(notification) = self.ctx.recv_notification() => {
self.handle_notification(notification, &mut driver).await?;
}
}
}
}

/// Wait for the L2 genesis' corresponding L1 block to be available in the L1 chain.
Expand Down
4 changes: 2 additions & 2 deletions crates/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ mod executor;
pub use executor::{HiloExecutor, HiloExecutorConstructor};

mod driver;
pub use driver::HiloDriver;
pub use driver::{DriverError, HiloDriver};

mod context;
pub use context::{Context, StandaloneContext};
pub use context::{ChainNotification, Context, StandaloneContext};

mod pipeline;
pub use pipeline::{
Expand Down
4 changes: 4 additions & 0 deletions crates/node/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Node error types.
use crate::ConfigError;
use hilo_driver::DriverError;

/// A high-level `Node`error.
#[derive(Debug, thiserror::Error)]
Expand All @@ -14,6 +15,9 @@ pub enum NodeError {
/// An error thrown by a [crate::Config] operation.
#[error("config error: {0}")]
Beacon(#[from] ConfigError),
/// An error thrown by the driver.
#[error("driver error: {0}")]
Driver(#[from] DriverError),
}

impl From<alloy_transport::TransportError> for NodeError {
Expand Down

0 comments on commit 73eab8a

Please sign in to comment.