From 86a545e6f5768ada7f5274c57218b37c82c4e706 Mon Sep 17 00:00:00 2001 From: 0xOsiris Date: Thu, 10 Oct 2024 08:56:44 -0700 Subject: [PATCH 1/3] feat: map l2 -> builder process ids --- src/server.rs | 59 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/src/server.rs b/src/server.rs index 3190e2f..c10ce47 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,7 +13,8 @@ use op_alloy_rpc_types_engine::{ AsInnerPayload, OptimismExecutionPayloadEnvelopeV3, OptimismPayloadAttributes, }; use reth_rpc_layer::AuthClientService; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; +use tokio::{join, sync::RwLock}; use tracing::{error, info}; #[rpc(server, client, namespace = "engine")] @@ -41,9 +42,14 @@ pub trait EngineApi { } pub struct EthEngineApi> { + /// HTTP client for the L2 execution engine. l2_client: Arc>, + /// HTTP client for the builder execution engine. builder_client: Arc>, + /// Whether to sync the builder using the proposer. boost_sync: bool, + /// Ephemeral cache to map l2 process id to builder process id. + process_id_cache: Arc>>, } impl EthEngineApi { @@ -56,6 +62,7 @@ impl EthEngineApi { l2_client, builder_client, boost_sync, + process_id_cache: Arc::new(RwLock::new(HashMap::new())), } } } @@ -88,18 +95,32 @@ impl EngineApiServer for EthEngineApi { // async call to builder to trigger payload building and sync let builder = self.builder_client.clone(); let attr = payload_attributes.clone(); - tokio::spawn(async move { - builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| { - let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default(); - if response.is_invalid() { - error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status); - } else { - info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str); + let f = async { + let fcu_builder = builder.fork_choice_updated_v3(fork_choice_state, attr.clone()); + let fcu_l2 = self + .l2_client + .fork_choice_updated_v3(fork_choice_state, attr); + + join!(fcu_builder, fcu_l2) + }; + + // Always return `ErrorCode::InternalError` if the l2_client fails + match f.await { + (Ok(fcu_builder), Ok(fcu_l2)) => { + if let (Some(v), Some(p)) = (fcu_builder.payload_id, fcu_l2.payload_id) { + self.process_id_cache.write().await.insert(p, v); } - }).map_err(|e| { + return Ok(fcu_l2); + } + (Err(e), Ok(fcu_l2)) => { error!(message = "error calling fork_choice_updated_v3 to builder", "error" = %e, "head_block_hash" = %fork_choice_state.head_block_hash); - }) - }); + return Ok(fcu_l2); + } + _ => { + error!(message = "error calling fork_choice_updated_v3", "head_block_hash" = %fork_choice_state.head_block_hash); + return Err(ErrorCode::InternalError.into()); + } + } } else { info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); } @@ -126,30 +147,34 @@ impl EngineApiServer for EthEngineApi { ) -> RpcResult { info!(message = "received get_payload_v3", "payload_id" = %payload_id); let l2_client_future = self.l2_client.get_payload_v3(payload_id); + // Grab the builder process id from the latest fcu. + let mut lock = self.process_id_cache.write().await; + let id_builder = lock.get(&payload_id).cloned().unwrap_or(payload_id); + lock.clear(); let builder_client_future = Box::pin(async { - let payload = self.builder_client.get_payload_v3(payload_id).await.map_err(|e| { - error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id); + let payload = self.builder_client.get_payload_v3(id_builder).await.map_err(|e| { + error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %id_builder); e })?; - info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash); + info!(message = "received payload from builder", "payload_id" = %id_builder, "block_hash" = %payload.as_v1_payload().block_hash); // Send the payload to the local execution engine with engine_newPayload to validate the block from the builder. // Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block. // If validation fails, return the local block since that one has already been validated. let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| { - error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id); + error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %id_builder); e })?; if payload_status.is_invalid() { - error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id); + error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %id_builder); Err(ClientError::Call(ErrorObject::owned( INVALID_REQUEST_CODE, "Builder payload was not valid", None::, ))) } else { - info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id); + info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %id_builder); Ok(payload) } }); From dac7b6b3f26c6dd1469959f00f197bf0f9bcd91c Mon Sep 17 00:00:00 2001 From: 0xOsiris Date: Thu, 10 Oct 2024 09:12:19 -0700 Subject: [PATCH 2/3] chore: tracing verbosity --- src/server.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/server.rs b/src/server.rs index c10ce47..7d70cd2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -48,7 +48,8 @@ pub struct EthEngineApi> { builder_client: Arc>, /// Whether to sync the builder using the proposer. boost_sync: bool, - /// Ephemeral cache to map l2 process id to builder process id. + /// Ephemeral cache to map l2 process id to builder process id between + /// a FCU, and subsequent retrieval of the payload. process_id_cache: Arc>>, } @@ -107,8 +108,17 @@ impl EngineApiServer for EthEngineApi { // Always return `ErrorCode::InternalError` if the l2_client fails match f.await { (Ok(fcu_builder), Ok(fcu_l2)) => { - if let (Some(v), Some(p)) = (fcu_builder.payload_id, fcu_l2.payload_id) { - self.process_id_cache.write().await.insert(p, v); + let payload_id_str = fcu_builder + .payload_id + .map(|id| id.to_string()) + .unwrap_or_default(); + if fcu_builder.is_invalid() { + error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %fcu_builder.payload_status.status); + } else { + info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %fcu_builder.payload_status.status, "payload_id" = payload_id_str); + if let (Some(v), Some(p)) = (fcu_builder.payload_id, fcu_l2.payload_id) { + self.process_id_cache.write().await.insert(p, v); + } } return Ok(fcu_l2); } From 403b01851875b488f2f2d57b8b27692dd45da277 Mon Sep 17 00:00:00 2001 From: 0xOsiris Date: Thu, 10 Oct 2024 09:23:17 -0700 Subject: [PATCH 3/3] chore: make comments reflective of new logic --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 7d70cd2..e03c550 100644 --- a/src/server.rs +++ b/src/server.rs @@ -93,7 +93,7 @@ impl EngineApiServer for EthEngineApi { }; if should_send_to_builder { - // async call to builder to trigger payload building and sync + // async call to the builder, and l2_client to start the payload building process let builder = self.builder_client.clone(); let attr = payload_attributes.clone(); let f = async {