From b8e476c6ee9fd495e68799ab667b160457da76b4 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 27 Nov 2024 17:16:35 -0500 Subject: [PATCH] fix: use l2 rpc for block ref --- Cargo.lock | 1 + crates/driver/src/driver.rs | 1 + crates/engine/Cargo.toml | 3 ++ crates/engine/src/client.rs | 49 +++++++++++++++++++++------------ crates/engine/src/controller.rs | 34 +++++++++++++++++++---- crates/engine/src/traits.rs | 2 +- 6 files changed, 66 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2bd540..cb879e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2817,6 +2817,7 @@ dependencies = [ "alloy-rpc-types-eth", "alloy-transport-http", "async-trait", + "hilo-providers-alloy", "http-body-util", "kona-driver", "op-alloy-genesis", diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index 3bca675..8a6638a 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -80,6 +80,7 @@ where let pipeline = self.init_pipeline(cursor.clone()).await?; let exec = EngineController::new( self.cfg.l2_engine_url.clone(), + self.cfg.l2_rpc_url.clone(), self.cfg.jwt_secret, cursor.origin(), cursor.l2_safe_head().block_info.into(), diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index 5f82ae3..b5886c0 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -13,6 +13,9 @@ repository.workspace = true rust-version.workspace = true [dependencies] +# Hilo +hilo-providers-alloy.workspace = true + # Kona kona-driver.workspace = true diff --git a/crates/engine/src/client.rs b/crates/engine/src/client.rs index 3717e83..197f575 100644 --- a/crates/engine/src/client.rs +++ b/crates/engine/src/client.rs @@ -3,7 +3,7 @@ use alloy_eips::eip1898::BlockNumberOrTag; use alloy_network::AnyNetwork; use alloy_primitives::{Bytes, B256}; -use alloy_provider::RootProvider; +use alloy_provider::{ReqwestProvider, RootProvider}; use alloy_rpc_client::RpcClient; use alloy_rpc_types_engine::{ ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus, @@ -17,12 +17,16 @@ use alloy_transport_http::{ }; use async_trait::async_trait; use http_body_util::Full; -use op_alloy_protocol::L2BlockInfo; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::{BatchValidationProvider, L2BlockInfo}; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; +use std::sync::Arc; use tower::ServiceBuilder; use url::Url; +use hilo_providers_alloy::AlloyL2ChainProvider; + use crate::{Engine, EngineApiError}; /// A Hyper HTTP client with a JWT authentication layer. @@ -31,24 +35,28 @@ type HyperAuthClient> = HyperClient, AnyNetwork>, + /// The L2 engine provider. + engine: RootProvider, AnyNetwork>, + /// The L2 chain provider. + rpc: AlloyL2ChainProvider, } impl EngineClient { /// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret]. - pub fn new_http(url: Url, jwt: JwtSecret) -> Self { + pub fn new_http(engine: Url, rpc: Url, cfg: Arc, jwt: JwtSecret) -> Self { let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); let auth_layer = AuthLayer::new(jwt); let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client); let layer_transport = HyperClient::with_service(service); - let http_hyper = Http::with_client(layer_transport, url); + let http_hyper = Http::with_client(layer_transport, engine); let rpc_client = RpcClient::new(http_hyper, true); - let provider = RootProvider::<_, AnyNetwork>::new(rpc_client); + let engine = RootProvider::<_, AnyNetwork>::new(rpc_client); - Self { provider } + let rpc = ReqwestProvider::new_http(rpc); + let rpc = AlloyL2ChainProvider::new(rpc, cfg); + Self { engine, rpc } } } @@ -60,7 +68,7 @@ impl Engine for EngineClient { &self, payload_id: PayloadId, ) -> Result { - self.provider.get_payload_v3(payload_id).await.map_err(|_| EngineApiError::PayloadError) + self.engine.get_payload_v3(payload_id).await.map_err(|_| EngineApiError::PayloadError) } async fn forkchoice_update( @@ -68,7 +76,7 @@ impl Engine for EngineClient { state: ForkchoiceState, attr: Option, ) -> Result { - self.provider + self.engine .fork_choice_updated_v2(state, attr) .await .map_err(|_| EngineApiError::PayloadError) @@ -79,17 +87,24 @@ impl Engine for EngineClient { payload: ExecutionPayloadV3, parent_beacon_block_root: B256, ) -> Result { - self.provider + self.engine .new_payload_v3(payload, parent_beacon_block_root) .await .map_err(|_| EngineApiError::PayloadError) } - async fn l2_block_ref_by_label(&self, _: BlockNumberOrTag) -> Result { - // Convert the payload into an L2 block info. - // go impl uses an L2 client and fetches block by number, converting block to payload and - // payload to L2 block info. - todo!("implement l2_block_ref_by_label for the engine client") + async fn l2_block_ref_by_label( + &mut self, + numtag: BlockNumberOrTag, + ) -> Result { + let number = match numtag { + BlockNumberOrTag::Number(n) => n, + BlockNumberOrTag::Latest => { + self.rpc.latest_block_number().await.map_err(|_| EngineApiError::PayloadError)? + } + _ => return Err(EngineApiError::PayloadError), + }; + self.rpc.l2_block_info_by_number(number).await.map_err(|_| EngineApiError::PayloadError) } } @@ -97,6 +112,6 @@ impl std::ops::Deref for EngineClient { type Target = RootProvider, AnyNetwork>; fn deref(&self) -> &Self::Target { - &self.provider + &self.engine } } diff --git a/crates/engine/src/controller.rs b/crates/engine/src/controller.rs index 3778949..0dd8607 100644 --- a/crates/engine/src/controller.rs +++ b/crates/engine/src/controller.rs @@ -10,7 +10,7 @@ use kona_driver::Executor; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; use op_alloy_rpc_types_engine::OpPayloadAttributes; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tokio::time::sleep; use url::Url; @@ -56,12 +56,18 @@ impl EngineController { /// Creates a new engine controller. pub fn new( l2_engine_url: Url, + l2_rpc_url: Url, jwt_secret: JwtSecret, finalized_head: BlockInfo, finalized_epoch: Epoch, config: &RollupConfig, ) -> Self { - let client = EngineClient::new_http(l2_engine_url.clone(), jwt_secret); + let client = EngineClient::new_http( + l2_engine_url.clone(), + l2_rpc_url, + Arc::new(config.clone()), + jwt_secret, + ); Self { blocktime: config.block_time, unsafe_head: finalized_head, @@ -93,15 +99,31 @@ impl Executor for EngineController { /// Waits for the engine to be ready. async fn wait_until_ready(&mut self) { let forkchoice = self.create_forkchoice_state(); - // Loop until the forkchoice is updated - while !self.client.forkchoice_update(forkchoice, None).await.is_ok_and(|u| u.is_valid()) { + while self.client.forkchoice_update(forkchoice, None).await.is_err() { sleep(Duration::from_secs(1)).await; } } /// Updates the safe head. - fn update_safe_head(&mut self, _: Sealed
) { - todo!() + fn update_safe_head(&mut self, header: Sealed
) { + if self.safe_head.number < header.number { + self.safe_head = BlockInfo { + number: header.number, + hash: header.hash_slow(), + timestamp: header.timestamp, + parent_hash: header.parent_hash, + }; + self.safe_epoch = self.safe_head.into(); + } + + if header.number > self.unsafe_head.number { + self.unsafe_head = BlockInfo { + number: header.number, + hash: header.hash_slow(), + timestamp: header.timestamp, + parent_hash: header.parent_hash, + }; + } } /// Execute the given payload attributes. diff --git a/crates/engine/src/traits.rs b/crates/engine/src/traits.rs index 61d7082..ae923b9 100644 --- a/crates/engine/src/traits.rs +++ b/crates/engine/src/traits.rs @@ -38,7 +38,7 @@ pub trait Engine { /// Returns the [L2BlockInfo] for the given label. async fn l2_block_ref_by_label( - &self, + &mut self, label: BlockNumberOrTag, ) -> Result; }