From 4dcc291d47174e5ff61e1fb3b9c7d6625b853f30 Mon Sep 17 00:00:00 2001 From: Luis Herasme Date: Fri, 19 Jul 2024 16:16:05 -0400 Subject: [PATCH] Refactor LatestBlockManager --- ghost-crab/src/block_handler.rs | 12 ++++++++-- ghost-crab/src/latest_block_manager.rs | 31 +++++++++++++------------- ghost-crab/src/process_logs.rs | 12 ++++++++-- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/ghost-crab/src/block_handler.rs b/ghost-crab/src/block_handler.rs index c689353..bf2d530 100644 --- a/ghost-crab/src/block_handler.rs +++ b/ghost-crab/src/block_handler.rs @@ -8,6 +8,7 @@ use alloy::transports::http::{Client, Http}; use async_trait::async_trait; use ghost_crab_common::config::ExecutionMode; use std::sync::Arc; +use std::time::Duration; pub struct BlockContext { pub provider: RootProvider>, @@ -38,10 +39,17 @@ pub async fn process_logs_block(BlockConfig { handler, provider, templates }: Bl let execution_mode = handler.execution_mode(); let mut current_block = start_block; - let mut latest_block_manager = LatestBlockManager::new(1000, provider.clone()); + let mut latest_block_manager = + LatestBlockManager::new(provider.clone(), Duration::from_secs(10)); loop { - let latest_block = latest_block_manager.get().await; + let latest_block = match latest_block_manager.get().await { + Ok(block_number) => block_number, + Err(error) => { + println!("Error fetching block number: {error}"); + continue; + } + }; if current_block >= latest_block { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; diff --git a/ghost-crab/src/latest_block_manager.rs b/ghost-crab/src/latest_block_manager.rs index e66b69c..71c65f9 100644 --- a/ghost-crab/src/latest_block_manager.rs +++ b/ghost-crab/src/latest_block_manager.rs @@ -1,31 +1,32 @@ use alloy::providers::{Provider, RootProvider}; use alloy::transports::http::{Client, Http}; -use std::time::{SystemTime, UNIX_EPOCH}; +use alloy::transports::TransportError; +use std::time::{Duration, Instant}; pub struct LatestBlockManager { - value: u64, - cache_duration_ms: u128, - last_fetch_ms: u128, provider: RootProvider>, + cache_duration: Duration, + block_number: Option, + last_fetch: Instant, } impl LatestBlockManager { - pub fn new(cache_duration_ms: u128, provider: RootProvider>) -> Self { - Self { value: 0, cache_duration_ms, last_fetch_ms: 0, provider } + pub fn new(provider: RootProvider>, cache_duration: Duration) -> Self { + Self { provider, cache_duration, block_number: None, last_fetch: Instant::now() } } - pub async fn get(&mut self) -> u64 { - let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(); - - if (now_ms - self.last_fetch_ms) < self.cache_duration_ms { - return self.value; + pub async fn get(&mut self) -> Result { + if let Some(block_number) = self.block_number { + if self.last_fetch.elapsed() < self.cache_duration { + return Ok(block_number); + } } - let result = self.provider.get_block_number().await.unwrap(); - self.value = result; + let block_number = self.provider.get_block_number().await?; - self.last_fetch_ms = now_ms; + self.block_number = Some(block_number); + self.last_fetch = Instant::now(); - result + Ok(block_number) } } diff --git a/ghost-crab/src/process_logs.rs b/ghost-crab/src/process_logs.rs index 628db8f..321754f 100644 --- a/ghost-crab/src/process_logs.rs +++ b/ghost-crab/src/process_logs.rs @@ -4,6 +4,7 @@ use alloy::primitives::Address; use alloy::providers::Provider; use alloy::rpc::types::eth::Filter; use ghost_crab_common::config::ExecutionMode; +use std::time::Duration; pub async fn process_logs( HandlerConfig { start_block, step, address, handler, provider, templates }: HandlerConfig, @@ -13,11 +14,18 @@ pub async fn process_logs( let address = address.parse::
().unwrap(); let execution_mode = handler.execution_mode(); - let mut block_manager = LatestBlockManager::new(1000, provider.clone()); + let mut latest_block_manager = + LatestBlockManager::new(provider.clone(), Duration::from_secs(10)); loop { let mut end_block = current_block + step; - let latest_block = block_manager.get().await; + let latest_block = match latest_block_manager.get().await { + Ok(block_number) => block_number, + Err(error) => { + println!("Error fetching block number: {error}"); + continue; + } + }; if end_block > latest_block { end_block = latest_block;