diff --git a/prdoc/pr_7021.prdoc b/prdoc/pr_7021.prdoc new file mode 100644 index 000000000000..5443579bbd92 --- /dev/null +++ b/prdoc/pr_7021.prdoc @@ -0,0 +1,8 @@ +title: Improve remote externalities logging +doc: +- audience: Node Dev + description: |- + Automatically detect if current env is tty. If not disable the spinner logging. +crates: +- name: frame-remote-externalities + bump: patch diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs index 75a2ac2aef41..4c49663260bb 100644 --- a/substrate/utils/frame/remote-externalities/src/lib.rs +++ b/substrate/utils/frame/remote-externalities/src/lib.rs @@ -20,6 +20,8 @@ //! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate //! based chain, or a local state snapshot file. +mod logging; + use codec::{Compact, Decode, Encode}; use indicatif::{ProgressBar, ProgressStyle}; use jsonrpsee::{core::params::ArrayParams, http_client::HttpClient}; @@ -37,7 +39,6 @@ use sp_runtime::{ StateVersion, }; use sp_state_machine::TestExternalities; -use spinners::{Spinner, Spinners}; use std::{ cmp::{max, min}, fs, @@ -49,6 +50,8 @@ use std::{ use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi}; use tokio_retry::{strategy::FixedInterval, Retry}; +type Result = std::result::Result; + type KeyValue = (StorageKey, StorageData); type TopKeyValues = Vec; type ChildKeyValues = Vec<(ChildInfo, Vec)>; @@ -87,7 +90,7 @@ impl Snapshot { } } - fn load(path: &PathBuf) -> Result, &'static str> { + fn load(path: &PathBuf) -> Result> { let bytes = fs::read(path).map_err(|_| "fs::read failed.")?; // The first item in the SCALE encoded struct bytes is the snapshot version. We decode and // check that first, before proceeding to decode the rest of the snapshot. @@ -168,9 +171,9 @@ impl Transport { } // Build an HttpClient from a URI. - async fn init(&mut self) -> Result<(), &'static str> { + async fn init(&mut self) -> Result<()> { if let Self::Uri(uri) = self { - log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri); + debug!(target: LOG_TARGET, "initializing remote client to {uri:?}"); // If we have a ws uri, try to convert it to an http uri. // We use an HTTP client rather than WS because WS starts to choke with "accumulated @@ -178,11 +181,11 @@ impl Transport { // from a node running a default configuration. let uri = if uri.starts_with("ws://") { let uri = uri.replace("ws://", "http://"); - log::info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri); + info!(target: LOG_TARGET, "replacing ws:// in uri with http://: {uri:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)"); uri } else if uri.starts_with("wss://") { let uri = uri.replace("wss://", "https://"); - log::info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)", uri); + info!(target: LOG_TARGET, "replacing wss:// in uri with https://: {uri:?} (ws is currently unstable for fetching remote storage, for more see https://github.com/paritytech/jsonrpsee/issues/1086)"); uri } else { uri.clone() @@ -193,7 +196,7 @@ impl Transport { .request_timeout(std::time::Duration::from_secs(60 * 5)) .build(uri) .map_err(|e| { - log::error!(target: LOG_TARGET, "error: {:?}", e); + error!(target: LOG_TARGET, "error: {e:?}"); "failed to build http client" })?; @@ -364,23 +367,23 @@ where &self, key: StorageKey, maybe_at: Option, - ) -> Result, &'static str> { + ) -> Result> { trace!(target: LOG_TARGET, "rpc: get_storage"); self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); + error!(target: LOG_TARGET, "Error = {e:?}"); "rpc get_storage failed." }) } /// Get the latest finalized head. - async fn rpc_get_head(&self) -> Result { + async fn rpc_get_head(&self) -> Result { trace!(target: LOG_TARGET, "rpc: finalized_head"); // sadly this pretty much unreadable... ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client()) .await .map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); + error!(target: LOG_TARGET, "Error = {e:?}"); "rpc finalized_head failed." }) } @@ -390,13 +393,13 @@ where prefix: Option, start_key: Option, at: B::Hash, - ) -> Result, &'static str> { + ) -> Result> { self.as_online() .rpc_client() .storage_keys_paged(prefix, Self::DEFAULT_KEY_DOWNLOAD_PAGE, start_key, Some(at)) .await .map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); + error!(target: LOG_TARGET, "Error = {e:?}"); "rpc get_keys failed" }) } @@ -407,7 +410,7 @@ where prefix: &StorageKey, block: B::Hash, parallel: usize, - ) -> Result, &'static str> { + ) -> Result> { /// Divide the workload and return the start key of each chunks. Guaranteed to return a /// non-empty list. fn gen_start_keys(prefix: &StorageKey) -> Vec { @@ -491,7 +494,7 @@ where block: B::Hash, start_key: Option<&StorageKey>, end_key: Option<&StorageKey>, - ) -> Result, &'static str> { + ) -> Result> { let mut last_key: Option<&StorageKey> = start_key; let mut keys: Vec = vec![]; @@ -518,11 +521,11 @@ where // scraping out of range or no more matches, // we are done either way if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize { - log::debug!(target: LOG_TARGET, "last page received: {}", page_len); + debug!(target: LOG_TARGET, "last page received: {page_len}"); break } - log::debug!( + debug!( target: LOG_TARGET, "new total = {}, full page received: {}", keys.len(), @@ -589,11 +592,10 @@ where let total_payloads = payloads.len(); while start_index < total_payloads { - log::debug!( + debug!( target: LOG_TARGET, - "Remaining payloads: {} Batch request size: {}", + "Remaining payloads: {} Batch request size: {batch_size}", total_payloads - start_index, - batch_size, ); let end_index = usize::min(start_index + batch_size, total_payloads); @@ -620,18 +622,16 @@ where retries += 1; let failure_log = format!( - "Batch request failed ({}/{} retries). Error: {}", - retries, - Self::MAX_RETRIES, - e + "Batch request failed ({retries}/{} retries). Error: {e}", + Self::MAX_RETRIES ); // after 2 subsequent failures something very wrong is happening. log a warning // and reset the batch size down to 1. if retries >= 2 { - log::warn!("{}", failure_log); + warn!("{failure_log}"); batch_size = 1; } else { - log::debug!("{}", failure_log); + debug!("{failure_log}"); // Decrease batch size by DECREASE_FACTOR batch_size = (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize; @@ -655,13 +655,11 @@ where ) }; - log::debug!( + debug!( target: LOG_TARGET, - "Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}", - request_duration, + "Request duration: {request_duration:?} Target duration: {:?} Last batch size: {} Next batch size: {batch_size}", Self::REQUEST_DURATION_TARGET, end_index - start_index, - batch_size ); let batch_response_len = batch_response.len(); @@ -689,21 +687,24 @@ where prefix: StorageKey, at: B::Hash, pending_ext: &mut TestExternalities>, - ) -> Result, &'static str> { - let start = Instant::now(); - let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into()); - // TODO We could start downloading when having collected the first batch of keys - // https://github.com/paritytech/polkadot-sdk/issues/2494 - let keys = self - .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS) - .await? - .into_iter() - .collect::>(); - sp.stop_with_message(format!( - "✅ Found {} keys ({:.2}s)", - keys.len(), - start.elapsed().as_secs_f32() - )); + ) -> Result> { + let keys = logging::with_elapsed_async( + || async { + // TODO: We could start downloading when having collected the first batch of keys. + // https://github.com/paritytech/polkadot-sdk/issues/2494 + let keys = self + .rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS) + .await? + .into_iter() + .collect::>(); + + Ok(keys) + }, + "Scraping keys...", + |keys| format!("Found {} keys", keys.len()), + ) + .await?; + if keys.is_empty() { return Ok(Default::default()) } @@ -735,7 +736,7 @@ where let storage_data = match storage_data_result { Ok(storage_data) => storage_data.into_iter().flatten().collect::>(), Err(e) => { - log::error!(target: LOG_TARGET, "Error while getting storage data: {}", e); + error!(target: LOG_TARGET, "Error while getting storage data: {e}"); return Err("Error while getting storage data") }, }; @@ -751,27 +752,31 @@ where .map(|(key, maybe_value)| match maybe_value { Some(data) => (key.clone(), data), None => { - log::warn!(target: LOG_TARGET, "key {:?} had none corresponding value.", &key); + warn!(target: LOG_TARGET, "key {key:?} had none corresponding value."); let data = StorageData(vec![]); (key.clone(), data) }, }) .collect::>(); - let mut sp = Spinner::with_timer(Spinners::Dots, "Inserting keys into DB...".into()); - let start = Instant::now(); - pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| { - // Don't insert the child keys here, they need to be inserted separately with all their - // data in the load_child_remote function. - match is_default_child_storage_key(&k.0) { - true => None, - false => Some((k.0, v.0)), - } - })); - sp.stop_with_message(format!( - "✅ Inserted keys into DB ({:.2}s)", - start.elapsed().as_secs_f32() - )); + logging::with_elapsed( + || { + pending_ext.batch_insert(key_values.clone().into_iter().filter_map(|(k, v)| { + // Don't insert the child keys here, they need to be inserted separately with + // all their data in the load_child_remote function. + match is_default_child_storage_key(&k.0) { + true => None, + false => Some((k.0, v.0)), + } + })); + + Ok(()) + }, + "Inserting keys into DB...", + |_| "Inserted keys into DB".into(), + ) + .expect("must succeed; qed"); + Ok(key_values) } @@ -781,7 +786,7 @@ where prefixed_top_key: &StorageKey, child_keys: Vec, at: B::Hash, - ) -> Result, &'static str> { + ) -> Result> { let child_keys_len = child_keys.len(); let payloads = child_keys @@ -803,7 +808,7 @@ where match Self::get_storage_data_dynamic_batch_size(client, payloads, &bar).await { Ok(storage_data) => storage_data, Err(e) => { - log::error!(target: LOG_TARGET, "batch processing failed: {:?}", e); + error!(target: LOG_TARGET, "batch processing failed: {e:?}"); return Err("batch processing failed") }, }; @@ -816,7 +821,7 @@ where .map(|(key, maybe_value)| match maybe_value { Some(v) => (key.clone(), v), None => { - log::warn!(target: LOG_TARGET, "key {:?} had no corresponding value.", &key); + warn!(target: LOG_TARGET, "key {key:?} had no corresponding value."); (key.clone(), StorageData(vec![])) }, }) @@ -828,7 +833,7 @@ where prefixed_top_key: &StorageKey, child_prefix: StorageKey, at: B::Hash, - ) -> Result, &'static str> { + ) -> Result> { let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); let mut all_child_keys = Vec::new(); @@ -850,7 +855,7 @@ where let child_keys = Retry::spawn(retry_strategy.clone(), get_child_keys_closure) .await .map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); + error!(target: LOG_TARGET, "Error = {e:?}"); "rpc child_get_keys failed." })?; @@ -896,7 +901,7 @@ where &self, top_kv: &[KeyValue], pending_ext: &mut TestExternalities>, - ) -> Result { + ) -> Result { let child_roots = top_kv .iter() .filter(|(k, _)| is_default_child_storage_key(k.as_ref())) @@ -904,7 +909,7 @@ where .collect::>(); if child_roots.is_empty() { - info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape",); + info!(target: LOG_TARGET, "👩‍👦 no child roots found to scrape"); return Ok(Default::default()) } @@ -930,7 +935,7 @@ where let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) { Some((ChildType::ParentKeyId, storage_key)) => storage_key, None => { - log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key); + error!(target: LOG_TARGET, "invalid key: {prefixed_top_key:?}"); return Err("Invalid child key") }, }; @@ -954,13 +959,13 @@ where async fn load_top_remote( &self, pending_ext: &mut TestExternalities>, - ) -> Result { + ) -> Result { let config = self.as_online(); let at = self .as_online() .at .expect("online config must be initialized by this point; qed."); - log::info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {:?}", at); + info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {at:?}"); let mut keys_and_values = Vec::new(); for prefix in &config.hashed_prefixes { @@ -968,7 +973,7 @@ where let additional_key_values = self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?; let elapsed = now.elapsed(); - log::info!( + info!( target: LOG_TARGET, "adding data for hashed prefix: {:?}, took {:.2}s", HexDisplay::from(prefix), @@ -979,7 +984,7 @@ where for key in &config.hashed_keys { let key = StorageKey(key.to_vec()); - log::info!( + info!( target: LOG_TARGET, "adding data for hashed key: {:?}", HexDisplay::from(&key) @@ -990,7 +995,7 @@ where keys_and_values.push((key, value)); }, None => { - log::warn!( + warn!( target: LOG_TARGET, "no data found for hashed key: {:?}", HexDisplay::from(&key) @@ -1005,17 +1010,16 @@ where /// The entry point of execution, if `mode` is online. /// /// initializes the remote client in `transport`, and sets the `at` field, if not specified. - async fn init_remote_client(&mut self) -> Result<(), &'static str> { + async fn init_remote_client(&mut self) -> Result<()> { // First, initialize the http client. self.as_online_mut().transport.init().await?; // Then, if `at` is not set, set it. if self.as_online().at.is_none() { let at = self.rpc_get_head().await?; - log::info!( + info!( target: LOG_TARGET, - "since no at is provided, setting it to latest finalized head, {:?}", - at + "since no at is provided, setting it to latest finalized head, {at:?}", ); self.as_online_mut().at = Some(at); } @@ -1040,7 +1044,7 @@ where .filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX) .count() == 0 { - log::info!( + info!( target: LOG_TARGET, "since no prefix is filtered, the data for all pallets will be downloaded" ); @@ -1050,7 +1054,7 @@ where Ok(()) } - async fn load_header(&self) -> Result { + async fn load_header(&self) -> Result { let retry_strategy = FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES); let get_header_closure = || { @@ -1069,14 +1073,12 @@ where /// `load_child_remote`. /// /// Must be called after `init_remote_client`. - async fn load_remote_and_maybe_save( - &mut self, - ) -> Result>, &'static str> { + async fn load_remote_and_maybe_save(&mut self) -> Result>> { let state_version = StateApi::::runtime_version(self.as_online().rpc_client(), None) .await .map_err(|e| { - error!(target: LOG_TARGET, "Error = {:?}", e); + error!(target: LOG_TARGET, "Error = {e:?}"); "rpc runtime_version failed." }) .map(|v| v.state_version())?; @@ -1100,11 +1102,10 @@ where self.load_header().await?, ); let encoded = snapshot.encode(); - log::info!( + info!( target: LOG_TARGET, - "writing snapshot of {} bytes to {:?}", + "writing snapshot of {} bytes to {path:?}", encoded.len(), - path ); std::fs::write(path, encoded).map_err(|_| "fs::write failed")?; @@ -1119,33 +1120,35 @@ where Ok(pending_ext) } - async fn do_load_remote(&mut self) -> Result, &'static str> { + async fn do_load_remote(&mut self) -> Result> { self.init_remote_client().await?; let inner_ext = self.load_remote_and_maybe_save().await?; Ok(RemoteExternalities { header: self.load_header().await?, inner_ext }) } - fn do_load_offline( - &mut self, - config: OfflineConfig, - ) -> Result, &'static str> { - let mut sp = Spinner::with_timer(Spinners::Dots, "Loading snapshot...".into()); - let start = Instant::now(); - info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path); - let Snapshot { snapshot_version: _, header, state_version, raw_storage, storage_root } = - Snapshot::::load(&config.state_snapshot.path)?; - - let inner_ext = TestExternalities::from_raw_snapshot( - raw_storage, - storage_root, - self.overwrite_state_version.unwrap_or(state_version), - ); - sp.stop_with_message(format!("✅ Loaded snapshot ({:.2}s)", start.elapsed().as_secs_f32())); + fn do_load_offline(&mut self, config: OfflineConfig) -> Result> { + let (header, inner_ext) = logging::with_elapsed( + || { + info!(target: LOG_TARGET, "Loading snapshot from {:?}", &config.state_snapshot.path); + + let Snapshot { header, state_version, raw_storage, storage_root, .. } = + Snapshot::::load(&config.state_snapshot.path)?; + let inner_ext = TestExternalities::from_raw_snapshot( + raw_storage, + storage_root, + self.overwrite_state_version.unwrap_or(state_version), + ); + + Ok((header, inner_ext)) + }, + "Loading snapshot...", + |_| "Loaded snapshot".into(), + )?; Ok(RemoteExternalities { inner_ext, header }) } - pub(crate) async fn pre_build(mut self) -> Result, &'static str> { + pub(crate) async fn pre_build(mut self) -> Result> { let mut ext = match self.mode.clone() { Mode::Offline(config) => self.do_load_offline(config)?, Mode::Online(_) => self.do_load_remote().await?, @@ -1159,7 +1162,7 @@ where // inject manual key values. if !self.hashed_key_values.is_empty() { - log::info!( + info!( target: LOG_TARGET, "extending externalities with {} manually injected key-values", self.hashed_key_values.len() @@ -1169,7 +1172,7 @@ where // exclude manual key values. if !self.hashed_blacklist.is_empty() { - log::info!( + info!( target: LOG_TARGET, "excluding externalities from {} keys", self.hashed_blacklist.len() @@ -1221,7 +1224,7 @@ where self } - pub async fn build(self) -> Result, &'static str> { + pub async fn build(self) -> Result> { let mut ext = self.pre_build().await?; ext.commit_all().unwrap(); diff --git a/substrate/utils/frame/remote-externalities/src/logging.rs b/substrate/utils/frame/remote-externalities/src/logging.rs new file mode 100644 index 000000000000..7ab901c004de --- /dev/null +++ b/substrate/utils/frame/remote-externalities/src/logging.rs @@ -0,0 +1,86 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + future::Future, + io::{self, IsTerminal}, + time::Instant, +}; + +use spinners::{Spinner, Spinners}; + +use super::Result; + +// A simple helper to time a operation with a nice spinner, start message, and end message. +// +// The spinner is only displayed when stdout is a terminal. +pub(super) fn with_elapsed(f: F, start_msg: &str, end_msg: EndMsg) -> Result +where + F: FnOnce() -> Result, + EndMsg: FnOnce(&R) -> String, +{ + let timer = Instant::now(); + let mut maybe_sp = start(start_msg); + + Ok(end(f()?, timer, maybe_sp.as_mut(), end_msg)) +} + +// A simple helper to time an async operation with a nice spinner, start message, and end message. +// +// The spinner is only displayed when stdout is a terminal. +pub(super) async fn with_elapsed_async( + f: F, + start_msg: &str, + end_msg: EndMsg, +) -> Result +where + F: FnOnce() -> Fut, + Fut: Future>, + EndMsg: FnOnce(&R) -> String, +{ + let timer = Instant::now(); + let mut maybe_sp = start(start_msg); + + Ok(end(f().await?, timer, maybe_sp.as_mut(), end_msg)) +} + +fn start(start_msg: &str) -> Option { + let msg = format!("⏳ {start_msg}"); + + if io::stdout().is_terminal() { + Some(Spinner::new(Spinners::Dots, msg)) + } else { + println!("{msg}"); + + None + } +} + +fn end(val: T, timer: Instant, maybe_sp: Option<&mut Spinner>, end_msg: EndMsg) -> T +where + EndMsg: FnOnce(&T) -> String, +{ + let msg = format!("✅ {} in {:.2}s", end_msg(&val), timer.elapsed().as_secs_f32()); + + if let Some(sp) = maybe_sp { + sp.stop_with_message(msg); + } else { + println!("{msg}"); + } + + val +}