diff --git a/src/config.rs b/src/config.rs index 6966e4c..fbebd07 100644 --- a/src/config.rs +++ b/src/config.rs @@ -53,6 +53,12 @@ pub struct Config { /// the behaviour of a full execution engine. #[arg(long, value_name = "MILLIS", default_value = "2000")] pub new_payload_wait_millis: u128, + /// Maximum age of a payload that will trigger a wait on `newPayload` + /// + /// Payloads older than this age receive an instant SYNCING response. See docs for + /// `--new-payload-wait-millis` for the purpose of this wait. + #[arg(long, value_name = "NUM_BLOCKS", default_value = "64")] + pub new_payload_wait_cutoff: u64, /// Maximum time that a consensus node should wait for a forkchoiceUpdated response from the /// cache. /// diff --git a/src/multiplexer.rs b/src/multiplexer.rs index 728ff75..13a8081 100644 --- a/src/multiplexer.rs +++ b/src/multiplexer.rs @@ -19,7 +19,7 @@ use tokio::sync::Mutex; pub struct Multiplexer { pub engine: Engine, pub fcu_cache: Mutex>, - pub new_payload_cache: Mutex>, + pub new_payload_cache: Mutex>, pub justified_block_cache: Mutex>, pub finalized_block_cache: Mutex>, pub payload_builder: Mutex>, @@ -30,6 +30,11 @@ pub struct Multiplexer { _phantom: PhantomData, } +pub struct NewPayloadCacheEntry { + pub status: JsonPayloadStatusV1, + pub block_number: u64, +} + impl Multiplexer { pub fn new(config: Config, executor: TaskExecutor, log: Logger) -> Result { let engine: Engine = { diff --git a/src/new_payload.rs b/src/new_payload.rs index dac338d..34d8b25 100644 --- a/src/new_payload.rs +++ b/src/new_payload.rs @@ -1,6 +1,6 @@ //! Handler for new payload. use crate::{ - multiplexer::Multiplexer, + multiplexer::{Multiplexer, NewPayloadCacheEntry}, types::{ ErrorResponse, JsonExecutionPayload, JsonPayloadStatusV1, JsonPayloadStatusV1Status, JsonValue, QuantityU64, Request, Response, @@ -20,6 +20,7 @@ impl Multiplexer { // TODO: verify block hash let block_hash = *json_execution_payload.block_hash(); + let block_number = *json_execution_payload.block_number(); let status = if let Some(status) = self.get_cached_payload_status(&block_hash, true).await { status @@ -31,10 +32,13 @@ impl Multiplexer { let json_status = JsonPayloadStatusV1::from(status); // Update newPayload cache. - self.new_payload_cache - .lock() - .await - .put(block_hash, json_status.clone()); + self.new_payload_cache.lock().await.put( + block_hash, + NewPayloadCacheEntry { + status: json_status.clone(), + block_number, + }, + ); // Update payload builder. self.register_canonical_payload(&execution_payload, json_status.status) @@ -63,15 +67,19 @@ impl Multiplexer { // TODO: verify block hash let block_hash = execution_payload.block_hash(); - - // Wait a short time for a definite response from the EL. Chances are it's busy processing - // the payload sent by the controlling BN. - let start = Instant::now(); - while start.elapsed().as_millis() < self.config.new_payload_wait_millis { - if let Some(status) = self.get_cached_payload_status(block_hash, true).await { - return Response::new(id, status); + let block_number = *execution_payload.block_number(); + + // If this is a *recent* payload, wait a short time for a definite response from the EL. + // Chances are it's busy processing the payload sent by the controlling BN. + let is_recent = self.is_recent_payload(block_number).await; + if is_recent { + let start = Instant::now(); + while start.elapsed().as_millis() < self.config.new_payload_wait_millis { + if let Some(status) = self.get_cached_payload_status(block_hash, true).await { + return Response::new(id, status); + } + tokio::time::sleep(Duration::from_millis(50)).await; } - tokio::time::sleep(Duration::from_millis(50)).await; } // Try again to get any status from the cache, or fall back on a SYNCING response. @@ -81,7 +89,11 @@ impl Multiplexer { } status } else { - tracing::info!("sending SYNCING response on newPayload"); + if is_recent { + tracing::info!("sending SYNCING response on recent newPayload"); + } else { + tracing::info!("sending instant SYNCING response for old newPayload"); + } // Synthetic syncing response. JsonPayloadStatusV1 { status: JsonPayloadStatusV1Status::Syncing, @@ -145,11 +157,32 @@ impl Multiplexer { definite_only: bool, ) -> Option { let mut cache = self.new_payload_cache.lock().await; - if let Some(existing_status) = cache.get(execution_block_hash) { - if !definite_only || Self::is_definite(existing_status) { - return Some(existing_status.clone()); + if let Some(existing) = cache.get(execution_block_hash) { + if !definite_only || Self::is_definite(&existing.status) { + return Some(existing.status.clone()); } } None } + + /// Return the highest `block_number` of any cached payload, or 0 if none is cached. + /// + /// This is useful for approximately time-based cutoffs & heuristics. + pub async fn highest_cached_payload_number(&self) -> u64 { + let cache = self.new_payload_cache.lock().await; + cache + .iter() + .map(|(_, entry)| entry.block_number) + .max() + .unwrap_or(0) + } + + /// Check if the given block number is recent based on the `highest_cached_payload_number`. + pub async fn is_recent_payload(&self, block_number: u64) -> bool { + let cutoff = self + .highest_cached_payload_number() + .await + .saturating_sub(self.config.new_payload_wait_cutoff); + block_number >= cutoff + } }