From 6a021c8bbc0cdf1e38805e6140a509fc4e27a56d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 25 Sep 2023 10:32:58 +0300 Subject: [PATCH] Get best and finalized head updates from subscription (#2166) * read best + best finalized headers from subscription * spelling * clippy --- .../client-substrate/src/client/caching.rs | 158 ++++++++++++++++-- .../client-substrate/src/client/client.rs | 5 + .../relays/client-substrate/src/client/mod.rs | 2 +- .../relays/client-substrate/src/client/rpc.rs | 42 +++++ .../client-substrate/src/client/rpc_api.rs | 14 ++ bridges/relays/client-substrate/src/error.rs | 52 ++++++ 6 files changed, 260 insertions(+), 13 deletions(-) diff --git a/bridges/relays/client-substrate/src/client/caching.rs b/bridges/relays/client-substrate/src/client/caching.rs index e6dc798a542e..a6116988a490 100644 --- a/bridges/relays/client-substrate/src/client/caching.rs +++ b/bridges/relays/client-substrate/src/client/caching.rs @@ -24,19 +24,23 @@ use crate::{ HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription, TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD, }; -use std::future::Future; +use std::{cmp::Ordering, future::Future, task::Poll}; -use async_std::sync::{Arc, Mutex, RwLock}; +use async_std::{ + sync::{Arc, Mutex, RwLock}, + task::JoinHandle, +}; use async_trait::async_trait; use codec::Encode; use frame_support::weights::Weight; +use futures::{FutureExt, StreamExt}; use quick_cache::unsync::Cache; use sp_consensus_grandpa::{AuthorityId, OpaqueKeyOwnershipProof, SetId}; use sp_core::{ storage::{StorageData, StorageKey}, Bytes, Pair, }; -use sp_runtime::transaction_validity::TransactionValidity; +use sp_runtime::{traits::Header as _, transaction_validity::TransactionValidity}; use sp_trie::StorageProof; use sp_version::RuntimeVersion; @@ -57,6 +61,9 @@ pub struct CachingClient> { struct ClientData { grandpa_justifications: Arc>>>, beefy_justifications: Arc>>>, + background_task_handle: Arc>>>, + best_header: Arc>>>, + best_finalized_header: Arc>>>, // `quick_cache::sync::Cache` has the `get_or_insert_async` method, which fits our needs, // but it uses synchronization primitives that are not aware of async execution. They // can block the executor threads and cause deadlocks => let's use primitives from @@ -70,19 +77,32 @@ struct ClientData { impl> CachingClient { /// Creates new `CachingClient` on top of given `backend`. - pub fn new(backend: B) -> Self { + pub async fn new(backend: B) -> Self { // most of relayer operations will never touch more than `ANCIENT_BLOCK_THRESHOLD` // headers, so we'll use this as a cache capacity for all chain-related caches let chain_state_capacity = ANCIENT_BLOCK_THRESHOLD as usize; + let best_header = Arc::new(RwLock::new(None)); + let best_finalized_header = Arc::new(RwLock::new(None)); + let header_by_hash_cache = Arc::new(RwLock::new(Cache::new(chain_state_capacity))); + let background_task_handle = Self::start_background_task( + backend.clone(), + best_header.clone(), + best_finalized_header.clone(), + header_by_hash_cache.clone(), + ) + .await; CachingClient { backend, data: Arc::new(ClientData { grandpa_justifications: Arc::new(Mutex::new(None)), beefy_justifications: Arc::new(Mutex::new(None)), + background_task_handle: Arc::new(Mutex::new(background_task_handle)), + best_header, + best_finalized_header, header_hash_by_number_cache: Arc::new(RwLock::new(Cache::new( chain_state_capacity, ))), - header_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))), + header_by_hash_cache, block_by_hash_cache: Arc::new(RwLock::new(Cache::new(chain_state_capacity))), raw_storage_value_cache: Arc::new(RwLock::new(Cache::new(1_024))), state_call_cache: Arc::new(RwLock::new(Cache::new(1_024))), @@ -114,6 +134,7 @@ impl> CachingClient { Ok(value) } + /// Subscribe to finality justifications, trying to reuse existing subscription. async fn subscribe_finality_justifications<'a>( &'a self, maybe_broadcaster: &Mutex>>, @@ -133,6 +154,98 @@ impl> CachingClient { broadcaster.subscribe().await } + + /// Start background task that reads best (and best finalized) headers from subscriptions. + async fn start_background_task( + backend: B, + best_header: Arc>>>, + best_finalized_header: Arc>>>, + header_by_hash_cache: SyncCache, HeaderOf>, + ) -> JoinHandle> { + async_std::task::spawn(async move { + // initialize by reading headers directly from backend to avoid doing that in the + // high-level code + let mut last_finalized_header = + backend.header_by_hash(backend.best_finalized_header_hash().await?).await?; + *best_header.write().await = Some(backend.best_header().await?); + *best_finalized_header.write().await = Some(last_finalized_header.clone()); + + // ...and then continue with subscriptions + let mut best_headers = backend.subscribe_best_headers().await?; + let mut finalized_headers = backend.subscribe_finalized_headers().await?; + loop { + futures::select! { + new_best_header = best_headers.next().fuse() => { + // we assume that the best header is always the actual best header, even if its + // number is lower than the number of previous-best-header (chain may use its own + // best header selection algorithms) + let new_best_header = new_best_header + .ok_or_else(|| Error::ChannelError(format!("Mandatory best headers subscription for {} has finished", C::NAME)))?; + let new_best_header_hash = new_best_header.hash(); + header_by_hash_cache.write().await.insert(new_best_header_hash, new_best_header.clone()); + *best_header.write().await = Some(new_best_header); + }, + new_finalized_header = finalized_headers.next().fuse() => { + // in theory we'll always get finalized headers in order, but let's double check + let new_finalized_header = new_finalized_header. + ok_or_else(|| Error::ChannelError(format!("Finalized headers subscription for {} has finished", C::NAME)))?; + let new_finalized_header_number = *new_finalized_header.number(); + let last_finalized_header_number = *last_finalized_header.number(); + match new_finalized_header_number.cmp(&last_finalized_header_number) { + Ordering::Greater => { + let new_finalized_header_hash = new_finalized_header.hash(); + header_by_hash_cache.write().await.insert(new_finalized_header_hash, new_finalized_header.clone()); + *best_finalized_header.write().await = Some(new_finalized_header.clone()); + last_finalized_header = new_finalized_header; + }, + Ordering::Less => { + return Err(Error::unordered_finalized_headers::( + new_finalized_header_number, + last_finalized_header_number, + )); + }, + _ => (), + } + }, + } + } + }) + } + + /// Ensure that the background task is active. + async fn ensure_background_task_active(&self) -> Result<()> { + let mut background_task_handle = self.data.background_task_handle.lock().await; + if let Poll::Ready(result) = futures::poll!(&mut *background_task_handle) { + return Err(Error::ChannelError(format!( + "Background task of {} client has exited with result: {:?}", + C::NAME, + result + ))) + } + + Ok(()) + } + + /// Try to get header, read elsewhere by background task through subscription. + async fn read_header_from_background<'a>( + &'a self, + header: &Arc>>>, + read_header_from_backend: impl Future>> + 'a, + ) -> Result> { + // ensure that the background task is active + self.ensure_background_task_active().await?; + + // now we know that the background task is active, so we could trust that the + // `header` has the most recent updates from it + match header.read().await.clone() { + Some(header) => Ok(header), + None => { + // header has not yet been read from the subscription, which means that + // we are just starting - let's read header directly from backend this time + read_header_from_backend.await + }, + } + } } impl> std::fmt::Debug for CachingClient { @@ -162,6 +275,16 @@ impl> Client for CachingClient { // since we have new underlying client, we need to restart subscriptions too *self.data.grandpa_justifications.lock().await = None; *self.data.beefy_justifications.lock().await = None; + // also restart background task too + *self.data.best_header.write().await = None; + *self.data.best_finalized_header.write().await = None; + *self.data.background_task_handle.lock().await = Self::start_background_task( + self.backend.clone(), + self.data.best_header.clone(), + self.data.best_finalized_header.clone(), + self.data.header_by_hash_cache.clone(), + ) + .await; Ok(()) } @@ -197,16 +320,27 @@ impl> Client for CachingClient { } async fn best_finalized_header_hash(&self) -> Result> { - // TODO: after https://github.com/paritytech/parity-bridges-common/issues/2074 we may - // use single-value-cache here, but for now let's just call the backend - self.backend.best_finalized_header_hash().await + self.read_header_from_background( + &self.data.best_finalized_header, + self.backend.best_finalized_header(), + ) + .await + .map(|h| h.hash()) } async fn best_header(&self) -> Result> { - // TODO: if after https://github.com/paritytech/parity-bridges-common/issues/2074 we'll - // be using subscriptions to get best blocks, we may use single-value-cache here, but for - // now let's just call the backend - self.backend.best_header().await + self.read_header_from_background(&self.data.best_header, self.backend.best_header()) + .await + } + + async fn subscribe_best_headers(&self) -> Result>> { + // we may share the sunbscription here, but atm there's no callers of this method + self.backend.subscribe_best_headers().await + } + + async fn subscribe_finalized_headers(&self) -> Result>> { + // we may share the sunbscription here, but atm there's no callers of this method + self.backend.subscribe_finalized_headers().await } async fn subscribe_grandpa_finality_justifications(&self) -> Result> diff --git a/bridges/relays/client-substrate/src/client/client.rs b/bridges/relays/client-substrate/src/client/client.rs index 9bc843d6e1ab..49f5c001c3f7 100644 --- a/bridges/relays/client-substrate/src/client/client.rs +++ b/bridges/relays/client-substrate/src/client/client.rs @@ -75,6 +75,11 @@ pub trait Client: 'static + Send + Sync + Clone + Debug { Ok(self.best_header().await?.hash()) } + /// Subscribe to new best headers. + async fn subscribe_best_headers(&self) -> Result>>; + /// Subscribe to new finalized headers. + async fn subscribe_finalized_headers(&self) -> Result>>; + /// Subscribe to GRANDPA finality justifications. async fn subscribe_grandpa_finality_justifications(&self) -> Result> where diff --git a/bridges/relays/client-substrate/src/client/mod.rs b/bridges/relays/client-substrate/src/client/mod.rs index b60a75868724..8c34bcdcd046 100644 --- a/bridges/relays/client-substrate/src/client/mod.rs +++ b/bridges/relays/client-substrate/src/client/mod.rs @@ -41,7 +41,7 @@ pub type RpcWithCachingClient = CachingClient>; /// Creates new RPC client with caching support. pub async fn rpc_with_caching(params: ConnectionParams) -> RpcWithCachingClient { let rpc = rpc::RpcClient::::new(params).await; - caching::CachingClient::new(rpc) + caching::CachingClient::new(rpc).await } /// The difference between best block number and number of its ancestor, that is enough diff --git a/bridges/relays/client-substrate/src/client/rpc.rs b/bridges/relays/client-substrate/src/client/rpc.rs index 381946c614ff..bf7442a95141 100644 --- a/bridges/relays/client-substrate/src/client/rpc.rs +++ b/bridges/relays/client-substrate/src/client/rpc.rs @@ -254,6 +254,7 @@ impl RpcClient { .await } + /// Subscribe to finality justifications. async fn subscribe_finality_justifications( &self, gadget_name: &str, @@ -272,6 +273,27 @@ impl RpcClient { subscription, )) } + + /// Subscribe to headers stream. + async fn subscribe_headers( + &self, + stream_name: &str, + do_subscribe: impl FnOnce(Arc) -> Fut + Send + 'static, + map_err: impl FnOnce(Error) -> Error, + ) -> Result>> + where + Fut: Future>, ClientError>> + Send, + { + let subscription = self + .jsonrpsee_execute(move |client| async move { Ok(do_subscribe(client).await?) }) + .map_err(map_err) + .await?; + + Ok(Subscription::new_forwarded( + StreamDescription::new(format!("{} headers", stream_name), C::NAME.into()), + subscription, + )) + } } impl Clone for RpcClient { @@ -356,6 +378,26 @@ impl Client for RpcClient { .map_err(|e| Error::failed_to_read_best_header::(e)) } + async fn subscribe_best_headers(&self) -> Result>> { + self.subscribe_headers( + "best headers", + move |client| async move { SubstrateChainClient::::subscribe_new_heads(&*client).await }, + |e| Error::failed_to_subscribe_best_headers::(e), + ) + .await + } + + async fn subscribe_finalized_headers(&self) -> Result>> { + self.subscribe_headers( + "best finalized headers", + move |client| async move { + SubstrateChainClient::::subscribe_finalized_heads(&*client).await + }, + |e| Error::failed_to_subscribe_finalized_headers::(e), + ) + .await + } + async fn subscribe_grandpa_finality_justifications(&self) -> Result> where C: ChainWithGrandpa, diff --git a/bridges/relays/client-substrate/src/client/rpc_api.rs b/bridges/relays/client-substrate/src/client/rpc_api.rs index 5af403406426..9cac69f7a13d 100644 --- a/bridges/relays/client-substrate/src/client/rpc_api.rs +++ b/bridges/relays/client-substrate/src/client/rpc_api.rs @@ -54,6 +54,20 @@ pub(crate) trait SubstrateChain { /// Return signed block (with justifications) by its hash. #[method(name = "getBlock")] async fn block(&self, block_hash: Option) -> RpcResult; + /// Subscribe to best headers. + #[subscription( + name = "subscribeNewHeads" => "newHead", + unsubscribe = "unsubscribeNewHeads", + item = C::Header + )] + async fn subscribe_new_heads(&self); + /// Subscribe to finalized headers. + #[subscription( + name = "subscribeFinalizedHeads" => "finalizedHead", + unsubscribe = "unsubscribeFinalizedHeads", + item = C::Header + )] + async fn subscribe_finalized_heads(&self); } /// RPC methods of Substrate `author` namespace, that we are using. diff --git a/bridges/relays/client-substrate/src/error.rs b/bridges/relays/client-substrate/src/error.rs index 4be65864eb44..f4c093a0be00 100644 --- a/bridges/relays/client-substrate/src/error.rs +++ b/bridges/relays/client-substrate/src/error.rs @@ -173,6 +173,22 @@ pub enum Error { error: Box, }, /// Failed to subscribe to GRANDPA justifications stream. + #[error("Failed to subscribe to {chain} best headers: {error:?}.")] + FailedToSubscribeBestHeaders { + /// Name of the chain where the error has happened. + chain: String, + /// Underlying error. + error: Box, + }, + /// Failed to subscribe to GRANDPA justifications stream. + #[error("Failed to subscribe to {chain} finalized headers: {error:?}.")] + FailedToSubscribeFinalizedHeaders { + /// Name of the chain where the error has happened. + chain: String, + /// Underlying error. + error: Box, + }, + /// Failed to subscribe to GRANDPA justifications stream. #[error("Failed to subscribe to {chain} justifications: {error:?}.")] FailedToSubscribeJustifications { /// Name of the chain where the error has happened. @@ -180,6 +196,17 @@ pub enum Error { /// Underlying error. error: Box, }, + /// Headers of the chain are finalized out of order. Maybe chain has been + /// restarted? + #[error("Finalized headers of {chain} are unordered: previously finalized {prev_number} vs new {next_number}")] + UnorderedFinalizedHeaders { + /// Name of the chain where the error has happened. + chain: String, + /// Previously finalized header number. + prev_number: String, + /// New finalized header number. + next_number: String, + }, /// The bridge pallet is halted and all transactions will be rejected. #[error("Bridge pallet is halted.")] BridgePalletIsHalted, @@ -247,6 +274,8 @@ impl Error { Self::FailedStateCall { ref error, .. } => Some(&**error), Self::FailedToProveStorage { ref error, .. } => Some(&**error), Self::FailedToGetSystemHealth { ref error, .. } => Some(&**error), + Self::FailedToSubscribeBestHeaders { ref error, .. } => Some(&**error), + Self::FailedToSubscribeFinalizedHeaders { ref error, .. } => Some(&**error), Self::FailedToSubscribeJustifications { ref error, .. } => Some(&**error), _ => None, } @@ -356,10 +385,32 @@ impl Error { Error::FailedToGetSystemHealth { chain: C::NAME.into(), error: e.boxed() } } + /// Constructs `FailedToSubscribeBestHeaders` variant. + pub fn failed_to_subscribe_best_headers(e: Error) -> Self { + Error::FailedToSubscribeBestHeaders { chain: C::NAME.into(), error: e.boxed() } + } + + /// Constructs `FailedToSubscribeFinalizedHeaders` variant. + pub fn failed_to_subscribe_finalized_headers(e: Error) -> Self { + Error::FailedToSubscribeFinalizedHeaders { chain: C::NAME.into(), error: e.boxed() } + } + /// Constructs `FailedToSubscribeJustifications` variant. pub fn failed_to_subscribe_justification(e: Error) -> Self { Error::FailedToSubscribeJustifications { chain: C::NAME.into(), error: e.boxed() } } + + /// Constructs `Un` + pub fn unordered_finalized_headers( + prev_number: BlockNumberOf, + next_number: BlockNumberOf, + ) -> Self { + Error::UnorderedFinalizedHeaders { + chain: C::NAME.into(), + prev_number: format!("{}", prev_number), + next_number: format!("{}", next_number), + } + } } impl MaybeConnectionError for Error { @@ -369,6 +420,7 @@ impl MaybeConnectionError for Error { Error::RpcError(ref e) => matches!(*e, RpcError::Transport(_) | RpcError::RestartNeeded(_),), Error::ClientNotSynced(_) => true, + Error::UnorderedFinalizedHeaders { .. } => true, _ => self.nested().map(|e| e.is_connection_error()).unwrap_or(false), } }