diff --git a/src/server.rs b/src/server.rs index 3190e2f..e03c550 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,7 +13,8 @@ use op_alloy_rpc_types_engine::{ AsInnerPayload, OptimismExecutionPayloadEnvelopeV3, OptimismPayloadAttributes, }; use reth_rpc_layer::AuthClientService; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; +use tokio::{join, sync::RwLock}; use tracing::{error, info}; #[rpc(server, client, namespace = "engine")] @@ -41,9 +42,15 @@ pub trait EngineApi { } pub struct EthEngineApi> { + /// HTTP client for the L2 execution engine. l2_client: Arc>, + /// HTTP client for the builder execution engine. builder_client: Arc>, + /// Whether to sync the builder using the proposer. boost_sync: bool, + /// Ephemeral cache to map l2 process id to builder process id between + /// a FCU, and subsequent retrieval of the payload. + process_id_cache: Arc>>, } impl EthEngineApi { @@ -56,6 +63,7 @@ impl EthEngineApi { l2_client, builder_client, boost_sync, + process_id_cache: Arc::new(RwLock::new(HashMap::new())), } } } @@ -85,21 +93,44 @@ impl EngineApiServer for EthEngineApi { }; if should_send_to_builder { - // async call to builder to trigger payload building and sync + // async call to the builder, and l2_client to start the payload building process let builder = self.builder_client.clone(); let attr = payload_attributes.clone(); - tokio::spawn(async move { - builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| { - let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default(); - if response.is_invalid() { - error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status); + let f = async { + let fcu_builder = builder.fork_choice_updated_v3(fork_choice_state, attr.clone()); + let fcu_l2 = self + .l2_client + .fork_choice_updated_v3(fork_choice_state, attr); + + join!(fcu_builder, fcu_l2) + }; + + // Always return `ErrorCode::InternalError` if the l2_client fails + match f.await { + (Ok(fcu_builder), Ok(fcu_l2)) => { + let payload_id_str = fcu_builder + .payload_id + .map(|id| id.to_string()) + .unwrap_or_default(); + if fcu_builder.is_invalid() { + error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %fcu_builder.payload_status.status); } else { - info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str); + info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %fcu_builder.payload_status.status, "payload_id" = payload_id_str); + if let (Some(v), Some(p)) = (fcu_builder.payload_id, fcu_l2.payload_id) { + self.process_id_cache.write().await.insert(p, v); + } } - }).map_err(|e| { + return Ok(fcu_l2); + } + (Err(e), Ok(fcu_l2)) => { error!(message = "error calling fork_choice_updated_v3 to builder", "error" = %e, "head_block_hash" = %fork_choice_state.head_block_hash); - }) - }); + return Ok(fcu_l2); + } + _ => { + error!(message = "error calling fork_choice_updated_v3", "head_block_hash" = %fork_choice_state.head_block_hash); + return Err(ErrorCode::InternalError.into()); + } + } } else { info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); } @@ -126,30 +157,34 @@ impl EngineApiServer for EthEngineApi { ) -> RpcResult { info!(message = "received get_payload_v3", "payload_id" = %payload_id); let l2_client_future = self.l2_client.get_payload_v3(payload_id); + // Grab the builder process id from the latest fcu. + let mut lock = self.process_id_cache.write().await; + let id_builder = lock.get(&payload_id).cloned().unwrap_or(payload_id); + lock.clear(); let builder_client_future = Box::pin(async { - let payload = self.builder_client.get_payload_v3(payload_id).await.map_err(|e| { - error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id); + let payload = self.builder_client.get_payload_v3(id_builder).await.map_err(|e| { + error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %id_builder); e })?; - info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash); + info!(message = "received payload from builder", "payload_id" = %id_builder, "block_hash" = %payload.as_v1_payload().block_hash); // Send the payload to the local execution engine with engine_newPayload to validate the block from the builder. // Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block. // If validation fails, return the local block since that one has already been validated. let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| { - error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id); + error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %id_builder); e })?; if payload_status.is_invalid() { - error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id); + error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %id_builder); Err(ClientError::Call(ErrorObject::owned( INVALID_REQUEST_CODE, "Builder payload was not valid", None::, ))) } else { - info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id); + info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %id_builder); Ok(payload) } });