From be4e33cfcca691586964d126a5856424f1dced53 Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:25:03 +0100 Subject: [PATCH] fix(p2p): cache responses to serve without roundtrip to db (#2352) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Linked Issues/PRs - the intermittent outages on testnet ## Description ~When we request transactions for a given block range, we shouldn't only keep using the same peer causing pressure on it. we should pick a random one with the same height and try to get the transactions from that instead.~ This PR caches p2p responses (TTL 10 seconds by default) and serves matching requests from the cache, falling back to the db for the others. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? 
--------- Co-authored-by: Mårten Blankfors Co-authored-by: Rafał Chabowski <88321181+rafal-ch@users.noreply.github.com> Co-authored-by: green --- CHANGELOG.md | 4 + Cargo.lock | 13 + crates/metrics/src/p2p_metrics.rs | 26 ++ crates/services/p2p/Cargo.toml | 1 + crates/services/p2p/src/cached_view.rs | 373 +++++++++++++++++++++++++ crates/services/p2p/src/lib.rs | 2 + crates/services/p2p/src/service.rs | 96 +++++-- 7 files changed, 489 insertions(+), 26 deletions(-) create mode 100644 crates/services/p2p/src/cached_view.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index bf175fbf613..d9072f0b3b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2310](https://github.com/FuelLabs/fuel-core/pull/2310): The `metrics` command-line parameter has been replaced with `disable-metrics`. Metrics are now enabled by default, with the option to disable them entirely or on a per-module basis. - [2341](https://github.com/FuelLabs/fuel-core/pull/2341): The maximum number of processed coins from the `coins_to_spend` query is limited to `max_inputs`. +### Fixed + +- [2352](https://github.com/FuelLabs/fuel-core/pull/2352): Cache p2p responses to serve without roundtrip to db. 
+ ## [Version 0.39.0] ### Added diff --git a/Cargo.lock b/Cargo.lock index 5cd13642f4d..a3d5b110deb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3548,6 +3548,7 @@ dependencies = [ "prometheus-client", "quick-protobuf", "quick-protobuf-codec 0.3.1", + "quick_cache", "rand", "rayon", "serde", @@ -7548,6 +7549,18 @@ dependencies = [ "unsigned-varint 0.8.0", ] +[[package]] +name = "quick_cache" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d7c94f8935a9df96bb6380e8592c70edf497a643f94bd23b2f76b399385dbf4" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.5", + "parking_lot", +] + [[package]] name = "quinn" version = "0.11.6" diff --git a/crates/metrics/src/p2p_metrics.rs b/crates/metrics/src/p2p_metrics.rs index a139cfb6ee3..b535bcccb4b 100644 --- a/crates/metrics/src/p2p_metrics.rs +++ b/crates/metrics/src/p2p_metrics.rs @@ -8,16 +8,22 @@ use std::sync::OnceLock; pub struct P2PMetrics { pub unique_peers: Counter, pub blocks_requested: Gauge, + pub p2p_req_res_cache_hits: Counter, + pub p2p_req_res_cache_misses: Counter, } impl P2PMetrics { fn new() -> Self { let unique_peers = Counter::default(); let blocks_requested = Gauge::default(); + let p2p_req_res_cache_hits = Counter::default(); + let p2p_req_res_cache_misses = Counter::default(); let metrics = P2PMetrics { unique_peers, blocks_requested, + p2p_req_res_cache_hits, + p2p_req_res_cache_misses, }; let mut registry = global_registry().registry.lock(); @@ -33,6 +39,18 @@ impl P2PMetrics { metrics.blocks_requested.clone() ); + registry.register( + "P2p_Req_Res_Cache_Hits", + "A Counter which keeps track of the number of cache hits for the p2p req/res protocol", + metrics.p2p_req_res_cache_hits.clone() + ); + + registry.register( + "P2p_Req_Res_Cache_Misses", + "A Counter which keeps track of the number of cache misses for the p2p req/res protocol", + metrics.p2p_req_res_cache_misses.clone() + ); + metrics } } @@ -50,3 +68,11 @@ pub fn 
increment_unique_peers() { pub fn set_blocks_requested(count: usize) { p2p_metrics().blocks_requested.set(count as i64); } + +pub fn increment_p2p_req_res_cache_hits() { + p2p_metrics().p2p_req_res_cache_hits.inc(); +} + +pub fn increment_p2p_req_res_cache_misses() { + p2p_metrics().p2p_req_res_cache_misses.inc(); +} diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 09662d26311..158f16f836f 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -43,6 +43,7 @@ postcard = { workspace = true, features = ["use-std"] } prometheus-client = { workspace = true } quick-protobuf = "0.8.1" quick-protobuf-codec = "0.3.0" +quick_cache = "0.6.9" rand = { workspace = true } rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/crates/services/p2p/src/cached_view.rs b/crates/services/p2p/src/cached_view.rs new file mode 100644 index 00000000000..1bef869ca01 --- /dev/null +++ b/crates/services/p2p/src/cached_view.rs @@ -0,0 +1,373 @@ +use crate::ports::P2pDb; +use fuel_core_metrics::p2p_metrics::{ + increment_p2p_req_res_cache_hits, + increment_p2p_req_res_cache_misses, +}; +use fuel_core_storage::Result as StorageResult; +use fuel_core_types::{ + blockchain::SealedBlockHeader, + services::p2p::Transactions, +}; +use quick_cache::sync::Cache; +use std::ops::Range; + +type BlockHeight = u32; + +pub(super) struct CachedView { + sealed_block_headers: Cache, + transactions_on_blocks: Cache, + metrics: bool, +} + +impl CachedView { + pub fn new(capacity: usize, metrics: bool) -> Self { + Self { + sealed_block_headers: Cache::new(capacity), + transactions_on_blocks: Cache::new(capacity), + metrics, + } + } + + fn update_metrics(&self, update_fn: U) + where + U: FnOnce(), + { + if self.metrics { + update_fn() + } + } + + fn get_from_cache_or_db( + &self, + cache: &Cache, + view: &V, + range: Range, + fetch_fn: F, + ) -> StorageResult>> + where + V: P2pDb, + T: Clone, + F: Fn(&V, Range) -> 
StorageResult>>, + { + let mut items = Vec::new(); + let mut missing_start = None; + + for height in range.clone() { + if let Some(item) = cache.get(&height) { + items.push(item); + } else { + missing_start = Some(height); + break; + } + } + + let Some(missing_start) = missing_start else { + self.update_metrics(increment_p2p_req_res_cache_hits); + return Ok(Some(items)); + }; + + let missing_range = missing_start..range.end; + + self.update_metrics(increment_p2p_req_res_cache_misses); + if let Some(fetched_items) = fetch_fn(view, missing_range.clone())? { + for (height, item) in missing_range.zip(fetched_items.into_iter()) { + cache.insert(height, item.clone()); + items.push(item); + } + + return Ok(Some(items)); + } + + Ok(None) + } + + pub(crate) fn get_sealed_headers( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + self.get_from_cache_or_db( + &self.sealed_block_headers, + view, + block_height_range, + V::get_sealed_headers, + ) + } + + pub(crate) fn get_transactions( + &self, + view: &V, + block_height_range: Range, + ) -> StorageResult>> + where + V: P2pDb, + { + self.get_from_cache_or_db( + &self.transactions_on_blocks, + view, + block_height_range, + V::get_transactions, + ) + } +} + +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use super::*; + use fuel_core_types::blockchain::consensus::Genesis; + use std::sync::Arc; + use tokio::sync::Notify; + + struct FakeDb { + sender: Arc, + values: bool, + } + + #[inline] + fn default_sealed_headers(range: Range) -> Vec { + vec![SealedBlockHeader::default(); range.len()] + } + + #[inline] + fn default_transactions(range: Range) -> Vec { + vec![Transactions::default(); range.len()] + } + + impl P2pDb for FakeDb { + fn get_sealed_headers( + &self, + range: Range, + ) -> StorageResult>> { + self.sender.notify_waiters(); + if !self.values { + return Ok(None); + } + let headers = default_sealed_headers(range); + Ok(Some(headers)) + } + + fn get_transactions( + 
&self, + range: Range, + ) -> StorageResult>> { + self.sender.notify_waiters(); + if !self.values { + return Ok(None); + } + let transactions = default_transactions(range); + Ok(Some(transactions)) + } + + fn get_genesis(&self) -> StorageResult { + self.sender.notify_waiters(); + Ok(Genesis::default()) + } + } + + #[tokio::test] + async fn cached_view__get_sealed_headers__cache_hit() { + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(10, false); + + let block_height_range = 0..100; + let sealed_headers = default_sealed_headers(block_height_range.clone()); + for (block_height, header) in + block_height_range.clone().zip(sealed_headers.iter()) + { + cached_view + .sealed_block_headers + .insert(block_height, header.clone()); + } + + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + assert_eq!(result, Some(sealed_headers)); + } + + #[tokio::test] + async fn cached_view__get_sealed_headers__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(10, false); + + // when + let notified = sender.notified(); + let block_height_range = 0..100; + let sealed_headers = default_sealed_headers(block_height_range.clone()); + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert_eq!(result, Some(sealed_headers)); + } + + #[tokio::test] + async fn cached_view__when_response_is_none__get_sealed_headers__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: false, + }; + let cached_view = CachedView::new(10, false); + + // when + let notified = sender.notified(); + let block_height_range = 0..100; + let result = cached_view + .get_sealed_headers(&db, block_height_range.clone()) + .unwrap(); + 
+ // then + notified.await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn cached_view__get_transactions__cache_hit() { + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(10, false); + + let block_height_range = 0..100; + let transactions = default_transactions(block_height_range.clone()); + + for (block_height, transactions) in + block_height_range.clone().zip(transactions.iter()) + { + cached_view + .transactions_on_blocks + .insert(block_height, transactions.clone()); + } + + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) { + assert_eq!(expected.0, actual.0); + } + } + + #[tokio::test] + async fn cached_view__get_transactions__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(10, false); + + // when + let notified = sender.notified(); + let block_height_range = 0..100; + let transactions = default_transactions(block_height_range.clone()); + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + for (expected, actual) in transactions.iter().zip(result.unwrap().iter()) { + assert_eq!(expected.0, actual.0); + } + } + + #[tokio::test] + async fn cached_view__when_response_is_none__get_transactions__cache_miss() { + // given + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: false, + }; + let cached_view = CachedView::new(10, false); + + // when + let notified = sender.notified(); + let block_height_range = 0..100; + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_none()); + } + + #[tokio::test] + async fn 
cached_view__when_lru_is_full_it_makes_call_to_db() { + // given + let cache_capacity = 10; + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(cache_capacity, false); + + // when + let block_height_range = 0..u32::try_from(cache_capacity).unwrap() + 1; + let _ = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + let notified = sender.notified(); + let result = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + notified.await; + assert!(result.is_some()); + } + + #[tokio::test] + async fn cached_view__when_lru_is_partially_full_it_does_not_make_call_to_db() { + // given + let cache_capacity = 100; + let sender = Arc::new(Notify::new()); + let db = FakeDb { + sender: sender.clone(), + values: true, + }; + let cached_view = CachedView::new(cache_capacity, false); + + // when + let block_height_range = 0..10; + let _ = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + let notified = sender.notified(); + let _ = cached_view + .get_transactions(&db, block_height_range.clone()) + .unwrap(); + + // then + assert!( + tokio::time::timeout(std::time::Duration::from_millis(50), notified) + .await + .is_err() + ) + } +} diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs index fbd82c25453..375eeb7351c 100644 --- a/crates/services/p2p/src/lib.rs +++ b/crates/services/p2p/src/lib.rs @@ -16,6 +16,8 @@ pub mod request_response; pub mod service; mod utils; +mod cached_view; + pub use gossipsub::config as gossipsub_config; pub use heartbeat::Config; diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3a33ef5f4e1..9e59f8a8f74 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,4 +1,5 @@ use crate::{ + cached_view::CachedView, codecs::postcard::PostcardCodec, config::{ Config, @@ -434,6 
+435,8 @@ pub struct Task { heartbeat_max_time_since_last: Duration, next_check_time: Instant, heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig, + // cached view + cached_view: Arc, } #[derive(Default, Clone)] @@ -469,7 +472,12 @@ impl UninitializedTask { } } -impl Task { +impl Task +where + P: TaskP2PService, + V: AtomicView, + B: Broadcast, +{ fn peer_heartbeat_reputation_checks(&self) -> anyhow::Result<()> { for (peer_id, peer_info) in self.p2p_service.get_all_peer_info() { if peer_info.heartbeat_data.duration_since_last_heartbeat() @@ -518,6 +526,7 @@ where V: AtomicView + 'static, V::LatestView: P2pDb, T: TxPool + 'static, + B: Send, { fn update_metrics(&self, update_fn: U) where @@ -557,8 +566,9 @@ where max_len: usize, ) -> anyhow::Result<()> where - DbLookUpFn: - Fn(&V::LatestView, Range) -> anyhow::Result> + Send + 'static, + DbLookUpFn: Fn(&V::LatestView, &Arc, Range) -> anyhow::Result> + + Send + + 'static, ResponseSenderFn: Fn(Result) -> V2ResponseMessage + Send + 'static, TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest @@ -587,20 +597,23 @@ where } let view = self.view_provider.latest_view()?; - let result = self.db_heavy_task_processor.try_spawn(move || { - if instant.elapsed() > timeout { - tracing::warn!("Request timed out"); - return; - } + let result = self.db_heavy_task_processor.try_spawn({ + let cached_view = self.cached_view.clone(); + move || { + if instant.elapsed() > timeout { + tracing::warn!("Request timed out"); + return; + } - let response = db_lookup(&view, range.clone()) - .ok() - .flatten() - .ok_or(ResponseMessageErrorCode::Timeout); + let response = db_lookup(&view, &cached_view, range.clone()) + .ok() + .flatten() + .ok_or(ResponseMessageErrorCode::Timeout); - let _ = response_channel - .try_send(task_request(response, request_id)) - .trace_err("Failed to send response to the request channel"); + let _ = response_channel + .try_send(task_request(response, request_id)) + .trace_err("Failed to send response 
to the request channel"); + } }); if result.is_err() { @@ -622,7 +635,11 @@ where range, request_id, V2ResponseMessage::Transactions, - |view, range| view.get_transactions(range).map_err(anyhow::Error::from), + |view, cached_view, range| { + cached_view + .get_transactions(view, range) + .map_err(anyhow::Error::from) + }, |response, request_id| TaskRequest::DatabaseTransactionsLookUp { response, request_id, @@ -640,7 +657,11 @@ where range, request_id, V2ResponseMessage::SealedHeaders, - |view, range| view.get_sealed_headers(range).map_err(anyhow::Error::from), + |view, cached_view, range| { + cached_view + .get_sealed_headers(view, range) + .map_err(anyhow::Error::from) + }, |response, request_id| TaskRequest::DatabaseHeaderLookUp { response, request_id, @@ -787,6 +808,7 @@ where heartbeat_max_time_since_last, database_read_threads, tx_pool_threads, + metrics, .. } = config; @@ -839,6 +861,7 @@ where heartbeat_max_time_since_last, next_check_time, heartbeat_peer_reputation_config, + cached_view: Arc::new(CachedView::new(614 * 10, metrics)), }; Ok(task) } @@ -1024,7 +1047,7 @@ pub struct SharedState { reserved_peers_broadcast: broadcast::Sender, /// Used for communicating with the `Task`. 
request_sender: mpsc::Sender, - /// Sender of p2p blopck height data + /// Sender of p2p block height data block_height_broadcast: broadcast::Sender, /// Max txs per request max_txs_per_request: usize, @@ -1686,17 +1709,20 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), + cached_view: Arc::new(CachedView::new(100, false)), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); // when - task.run(&mut watcher).await.unwrap(); + let (report_peer_id, report, reporting_service) = tokio::time::timeout( + Duration::from_secs(1), + wait_until_report_received(&mut report_receiver, &mut task, &mut watcher), + ) + .await + .unwrap(); // then - let (report_peer_id, report, reporting_service) = - report_receiver.recv().await.unwrap(); - watch_sender.send(State::Stopped).unwrap(); assert_eq!( @@ -1776,17 +1802,21 @@ pub mod tests { heartbeat_max_time_since_last, next_check_time: Instant::now(), heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(), + cached_view: Arc::new(CachedView::new(100, false)), }; let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started); let mut watcher = StateWatcher::from(watch_receiver); // when - task.run(&mut watcher).await.unwrap(); + // we run this in a loop to ensure that the task is run until it reports + let (report_peer_id, report, reporting_service) = tokio::time::timeout( + Duration::from_secs(1), + wait_until_report_received(&mut report_receiver, &mut task, &mut watcher), + ) + .await + .unwrap(); // then - let (report_peer_id, report, reporting_service) = - report_receiver.recv().await.unwrap(); - watch_sender.send(State::Stopped).unwrap(); assert_eq!( @@ -1800,6 +1830,19 @@ pub mod tests { assert_eq!(reporting_service, "p2p"); } + async fn wait_until_report_received( + report_receiver: &mut 
Receiver<(FuelPeerId, AppScore, String)>, + task: &mut Task, + watcher: &mut StateWatcher, + ) -> (FuelPeerId, AppScore, String) { + loop { + task.run(watcher).await.unwrap(); + if let Ok((peer_id, recv_report, service)) = report_receiver.try_recv() { + return (peer_id, recv_report, service); + } + } + } + #[tokio::test] async fn should_process_all_imported_block_under_infinite_events_from_p2p() { // Given @@ -1838,6 +1881,7 @@ pub mod tests { heartbeat_max_time_since_last: Default::default(), next_check_time: Instant::now(), heartbeat_peer_reputation_config: Default::default(), + cached_view: Arc::new(CachedView::new(100, false)), }; let mut watcher = StateWatcher::started(); // End of initialization