From 0ded016f43f18de310132ab99856867108666c3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 6 Oct 2023 22:30:03 +0800 Subject: [PATCH] refactor: meta-service: re-implement export with Stream - Updated the `export()` function in `grpc_service.rs` to use crate `futures-async-stream`, changed return type of the function to use a `BoxStream` that yields a series of JSON strings, instead of returning a big `Vec`. This allows for more efficient handling of asynchronous streams, which can provide performance improvements. Note that `export(self: Arc)` takes an `Arc` as argument because the generated `gRPC` stream signature requires a `'static` lifetime. As a result, the `export(&self)` can not be used: ``` type ExportStream: Stream> + Send + 'static; ``` - Add dependency crate `futures-async-stream`; - Enable unstable feature `#![feature(generators)]` for building a `Stream` from a generator(`yield ...`); --- Cargo.lock | 23 ++++++++++++ Cargo.toml | 1 + src/binaries/metactl/snapshot.rs | 10 ++++-- src/meta/service/Cargo.toml | 1 + src/meta/service/src/api/grpc/grpc_service.rs | 31 +++++++++------- src/meta/service/src/lib.rs | 1 + src/meta/service/src/store/store.rs | 4 +++ src/meta/service/src/store/store_inner.rs | 35 ++++++++----------- 8 files changed, 70 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81609667f029..4a8e2597b69b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3768,6 +3768,7 @@ dependencies = [ "derive_more", "env_logger", "futures", + "futures-async-stream", "itertools 0.10.5", "lazy_static", "log", @@ -4816,6 +4817,28 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-async-stream" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f529ccdeacfa2446a9577041686cf1abb839b1b3e15fee4c1b1232ab3b7d799f" +dependencies = [ + "futures-async-stream-macro", + "futures-core", + "pin-project", +] + +[[package]] +name = "futures-async-stream-macro" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b48ee06dc8d2808ba5ebad075d06c3406085bb19deaac33be64c39113bf80" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "futures-channel" version = "0.3.28" diff --git a/Cargo.toml b/Cargo.toml index 1ccf7dfaa6fa..1ea78215fb42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -133,6 +133,7 @@ lazy_static = "1.4.0" # future and async futures = "0.3.24" futures-util = "0.3.24" +futures-async-stream = { version = "0.2.7" } stream-more = "0.1.3" bytes = "1.5.0" diff --git a/src/binaries/metactl/snapshot.rs b/src/binaries/metactl/snapshot.rs index 0b4691608975..62ee1430618a 100644 --- a/src/binaries/metactl/snapshot.rs +++ b/src/binaries/metactl/snapshot.rs @@ -25,6 +25,7 @@ use std::net::SocketAddr; use std::net::ToSocketAddrs; use std::path::Path; use std::str::FromStr; +use std::sync::Arc; use anyhow::anyhow; use common_base::base::tokio; @@ -55,6 +56,7 @@ use common_meta_types::NodeId; use common_meta_types::StoredMembership; use databend_meta::store::RaftStore; use databend_meta::store::StoreInner; +use futures::TryStreamExt; use tokio::net::TcpSocket; use url::Url; @@ -467,7 +469,7 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> { let raft_config: RaftConfig = config.raft_config.clone().into(); let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?; - let lines = sto_inn.export().await?; + let mut lines = Arc::new(sto_inn).export(); eprintln!(" From: {}", raft_config.raft_dir); @@ -479,9 +481,11 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> { None }; - let cnt = lines.len(); + let mut cnt = 0; + + while let Some(line) = lines.try_next().await? { + cnt += 1; - for line in lines { if file.as_ref().is_none() { println!("{}", line); } else { diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index 6821b383c6f2..3ae589475f45 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -51,6 +51,7 @@ backon = "0.4" clap = { workspace = true } derive_more = { workspace = true } futures = "0.3.24" +futures-async-stream = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index aeace275074a..25148c7bedd9 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -40,7 +41,9 @@ use common_meta_types::TxnReply; use common_meta_types::TxnRequest; use common_metrics::counter::Count; use common_tracing::func_name; +use futures::stream::TryChunksError; use futures::StreamExt; +use futures::TryStreamExt; use log::debug; use log::info; use minitrace::prelude::*; @@ -194,7 +197,7 @@ impl MetaService for MetaServiceImpl { let elapsed = t0.elapsed(); info!("Handled(elapsed: {:?}) MetaGrpcReq: {:?}", elapsed, req); - Ok::<_, tonic::Status>(reply) + Ok::<_, Status>(reply) } .in_span(root) .await?; @@ -243,13 +246,12 @@ impl MetaService for MetaServiceImpl { .await } - type ExportStream = - Pin> + Send + Sync + 'static>>; + type ExportStream = Pin> + Send + 'static>>; - // Export all meta data. - // - // Including raft hard state, logs and state machine. - // The exported data is a list of json strings in form of `(tree_name, sub_tree_prefix, key, value)`. + /// Export all meta data. + /// + /// Including header, raft state, logs and state machine. + /// The exported data is a series of JSON encoded strings of `RaftStoreEntry`. async fn export( &self, _request: Request, @@ -257,16 +259,21 @@ impl MetaService for MetaServiceImpl { let _guard = RequestInFlight::guard(); let meta_node = &self.meta_node; - let res = meta_node.sto.export().await?; + let strm = meta_node.sto.inner().export(); - let stream = ExportStream { data: res }; - let s = stream.map(|strings| Ok(ExportedChunk { data: strings })); + let chunk_size = 32; + // - Chunk up upto 32 Ok items inside a Vec; + // - Convert Vec to ExportedChunk; + // - Convert TryChunkError<_, io::Error> to Status; + let s = strm + .try_chunks(chunk_size) + .map_ok(|chunk: Vec| ExportedChunk { data: chunk }) + .map_err(|e: TryChunksError<_, io::Error>| Status::internal(e.1.to_string())); Ok(Response::new(Box::pin(s))) } - type WatchStream = - Pin> + Send + Sync + 'static>>; + type WatchStream = Pin> + Send + 'static>>; #[minitrace::trace] async fn watch( diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index e0c5c56ca852..ee367ffd0eb0 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(generators)] #![allow(clippy::uninlined_format_args)] pub mod api; diff --git a/src/meta/service/src/store/store.rs b/src/meta/service/src/store/store.rs index 728d8c8bca18..5cb4aab72019 100644 --- a/src/meta/service/src/store/store.rs +++ b/src/meta/service/src/store/store.rs @@ -73,6 +73,10 @@ impl RaftStore { let sto = StoreInner::open_create(config, open, create).await?; Ok(Self::new(sto)) } + + pub fn inner(&self) -> Arc { + self.inner.clone() + } } impl Deref for RaftStore { diff --git a/src/meta/service/src/store/store_inner.rs b/src/meta/service/src/store/store_inner.rs index a424c0cc44a7..d01a79bd0cd6 100644 --- a/src/meta/service/src/store/store_inner.rs +++ b/src/meta/service/src/store/store_inner.rs @@ -387,27 +387,27 @@ impl StoreInner { } /// Export data that can be used to restore a meta-service node. - #[minitrace::trace] - pub async fn export(&self) -> Result, io::Error> { - // Convert to io::Error(InvalidData). + /// + /// Returns a `BoxStream<'a, Result>` that yields a series of JSON strings. + #[futures_async_stream::try_stream(boxed, ok = String, error = io::Error)] + pub async fn export(self: Arc) { + // Convert an error occurred during export to `io::Error(InvalidData)`. fn invalid_data(e: impl std::error::Error + Send + Sync + 'static) -> io::Error { io::Error::new(ErrorKind::InvalidData, e) } - // Lock all components so that we have a consistent view. + // Lock all data components so that we have a consistent view. // // Hold the snapshot lock to prevent snapshot from being replaced until exporting finished. // Holding this lock prevent logs from being purged. // // Although vote and log must be consistent, - // it is OK to export RaftState and logs without transaction protection, + // it is OK to export RaftState and logs without transaction protection(i.e. they do not share a lock), // if it guarantees no logs have a greater `vote` than `RaftState.HardState`. let current_snapshot = self.current_snapshot.read().await; let raft_state = self.raft_state.read().await; let log = self.log.read().await; - let mut res = vec![]; - // Export data header first { let header_tree = SledTree::open(&self.db, TREE_HEADER, false).map_err(invalid_data)?; @@ -416,7 +416,7 @@ impl StoreInner { for kv in header_kvs.iter() { let line = vec_kv_to_json(TREE_HEADER, kv)?; - res.push(line); + yield line; } } @@ -435,8 +435,7 @@ impl StoreInner { }; let s = serde_json::to_string(&(tree_name, ent_id)).map_err(invalid_data)?; - - res.push(s); + yield s; } let vote = ks.get(&RaftStateKey::HardState)?.map(Vote::from); @@ -448,8 +447,7 @@ impl StoreInner { }; let s = serde_json::to_string(&(tree_name, ent_vote)).map_err(invalid_data)?; - - res.push(s); + yield s; } let committed = ks @@ -462,8 +460,7 @@ impl StoreInner { }; let s = serde_json::to_string(&(tree_name, ent_committed)).map_err(invalid_data)?; - - res.push(s); + yield s; }; // Export logs that has smaller or equal leader id as `vote` @@ -477,8 +474,7 @@ impl StoreInner { let tree_kv = (tree_name, kv_entry); let line = serde_json::to_string(&tree_kv).map_err(invalid_data)?; - - res.push(line); + yield line; } } @@ -507,14 +503,11 @@ impl StoreInner { let named_entry = (tree_name, ent); - let l = serde_json::to_string(&named_entry).map_err(invalid_data)?; - - res.push(l); + let line = serde_json::to_string(&named_entry).map_err(invalid_data)?; + yield line; } } } - - Ok(res) } pub async fn get_node(&self, node_id: &NodeId) -> Option {