refactor: meta-service: re-implement export with Stream
- Updated the `export()` function in `grpc_service.rs` to use the crate
  `futures-async-stream`, changing the function's return type from one big
  `Vec<String>` to a `BoxStream` that yields a series of JSON strings.

  This lets exported entries be streamed to the client as they are
  produced, instead of buffering the entire export in memory.

  Note that `export(self: Arc<Self>)` takes an `Arc<Self>` as its argument
  because the generated gRPC stream signature requires a `'static`
  lifetime; as a result, `export(&self)` cannot be used:

  ```
  type ExportStream: Stream<Item = Result<_, _>> + Send + 'static;
  ```

- Added the dependency crate `futures-async-stream`.

- Enabled the unstable feature `#![feature(generators)]`, needed for
  building a `Stream` from a generator (`yield ...`); a minimal sketch of
  this pattern follows this list.
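
For context, here is a minimal, self-contained sketch of the pattern described above. It is not code from this commit: it assumes nightly Rust plus the `futures`, `futures-async-stream`, and `tokio` crates, and the `Demo` type with its `lines` field is invented purely for illustration.

```
#![feature(generators)]

use std::io;
use std::sync::Arc;

use futures::TryStreamExt;

struct Demo {
    lines: Vec<String>,
}

impl Demo {
    // `boxed` makes the macro return a `BoxStream<'static, Result<String, io::Error>>`.
    // Taking `self: Arc<Self>` instead of `&self` keeps the stream free of borrows,
    // so it can satisfy the `'static` bound on the generated gRPC stream type.
    #[futures_async_stream::try_stream(boxed, ok = String, error = io::Error)]
    async fn export(self: Arc<Self>) {
        for line in &self.lines {
            // `yield` turns this body into a generator, hence `#![feature(generators)]`.
            yield line.clone();
        }
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let demo = Arc::new(Demo {
        lines: vec!["a".to_string(), "b".to_string()],
    });

    // Consume the stream entry by entry instead of collecting a `Vec<String>`.
    let mut strm = demo.export();
    while let Some(line) = strm.try_next().await? {
        println!("{line}");
    }
    Ok(())
}
```

The consumer side drives the stream with `TryStreamExt::try_next`, which mirrors the change to `export_from_dir` in `src/binaries/metactl/snapshot.rs` below.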
drmingdrmer committed Oct 7, 2023
1 parent e8c1aeb commit 0ded016
Showing 8 changed files with 70 additions and 36 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -133,6 +133,7 @@ lazy_static = "1.4.0"
# future and async
futures = "0.3.24"
futures-util = "0.3.24"
futures-async-stream = { version = "0.2.7" }
stream-more = "0.1.3"
bytes = "1.5.0"

10 changes: 7 additions & 3 deletions src/binaries/metactl/snapshot.rs
@@ -25,6 +25,7 @@ use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
use common_base::base::tokio;
@@ -55,6 +56,7 @@ use common_meta_types::NodeId;
use common_meta_types::StoredMembership;
use databend_meta::store::RaftStore;
use databend_meta::store::StoreInner;
use futures::TryStreamExt;
use tokio::net::TcpSocket;
use url::Url;

@@ -467,7 +469,7 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
let raft_config: RaftConfig = config.raft_config.clone().into();

let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
let lines = sto_inn.export().await?;
let mut lines = Arc::new(sto_inn).export();

eprintln!(" From: {}", raft_config.raft_dir);

@@ -479,9 +481,11 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
None
};

let cnt = lines.len();
let mut cnt = 0;

while let Some(line) = lines.try_next().await? {
cnt += 1;

for line in lines {
if file.as_ref().is_none() {
println!("{}", line);
} else {
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
@@ -51,6 +51,7 @@ backon = "0.4"
clap = { workspace = true }
derive_more = { workspace = true }
futures = "0.3.24"
futures-async-stream = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
31 changes: 19 additions & 12 deletions src/meta/service/src/api/grpc/grpc_service.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
@@ -40,7 +41,9 @@ use common_meta_types::TxnReply;
use common_meta_types::TxnRequest;
use common_metrics::counter::Count;
use common_tracing::func_name;
use futures::stream::TryChunksError;
use futures::StreamExt;
use futures::TryStreamExt;
use log::debug;
use log::info;
use minitrace::prelude::*;
@@ -194,7 +197,7 @@ impl MetaService for MetaServiceImpl {
let elapsed = t0.elapsed();
info!("Handled(elapsed: {:?}) MetaGrpcReq: {:?}", elapsed, req);

Ok::<_, tonic::Status>(reply)
Ok::<_, Status>(reply)
}
.in_span(root)
.await?;
@@ -243,30 +246,34 @@ impl MetaService for MetaServiceImpl {
.await
}

type ExportStream =
Pin<Box<dyn Stream<Item = Result<ExportedChunk, tonic::Status>> + Send + Sync + 'static>>;
type ExportStream = Pin<Box<dyn Stream<Item = Result<ExportedChunk, Status>> + Send + 'static>>;

// Export all meta data.
//
// Including raft hard state, logs and state machine.
// The exported data is a list of json strings in form of `(tree_name, sub_tree_prefix, key, value)`.
/// Export all meta data.
///
/// Including header, raft state, logs and state machine.
/// The exported data is a series of JSON encoded strings of `RaftStoreEntry`.
async fn export(
&self,
_request: Request<common_meta_types::protobuf::Empty>,
) -> Result<Response<Self::ExportStream>, Status> {
let _guard = RequestInFlight::guard();

let meta_node = &self.meta_node;
let res = meta_node.sto.export().await?;
let strm = meta_node.sto.inner().export();

let stream = ExportStream { data: res };
let s = stream.map(|strings| Ok(ExportedChunk { data: strings }));
let chunk_size = 32;
// - Chunk up to 32 Ok items into a Vec<String>;
// - Convert Vec<String> to ExportedChunk;
// - Convert TryChunksError<_, io::Error> to Status;
let s = strm
.try_chunks(chunk_size)
.map_ok(|chunk: Vec<String>| ExportedChunk { data: chunk })
.map_err(|e: TryChunksError<_, io::Error>| Status::internal(e.1.to_string()));

Ok(Response::new(Box::pin(s)))
}

type WatchStream =
Pin<Box<dyn Stream<Item = Result<WatchResponse, tonic::Status>> + Send + Sync + 'static>>;
type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, Status>> + Send + 'static>>;

#[minitrace::trace]
async fn watch(
1 change: 1 addition & 0 deletions src/meta/service/src/lib.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(generators)]
#![allow(clippy::uninlined_format_args)]

pub mod api;
4 changes: 4 additions & 0 deletions src/meta/service/src/store/store.rs
@@ -73,6 +73,10 @@ impl RaftStore {
let sto = StoreInner::open_create(config, open, create).await?;
Ok(Self::new(sto))
}

pub fn inner(&self) -> Arc<StoreInner> {
self.inner.clone()
}
}

impl Deref for RaftStore {
35 changes: 14 additions & 21 deletions src/meta/service/src/store/store_inner.rs
@@ -387,27 +387,27 @@ impl StoreInner {
}

/// Export data that can be used to restore a meta-service node.
#[minitrace::trace]
pub async fn export(&self) -> Result<Vec<String>, io::Error> {
// Convert to io::Error(InvalidData).
///
/// Returns a `BoxStream<'a, Result<String, io::Error>>` that yields a series of JSON strings.
#[futures_async_stream::try_stream(boxed, ok = String, error = io::Error)]
pub async fn export(self: Arc<StoreInner>) {
// Convert an error that occurred during export to `io::Error(InvalidData)`.
fn invalid_data(e: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(ErrorKind::InvalidData, e)
}

// Lock all components so that we have a consistent view.
// Lock all data components so that we have a consistent view.
//
// Hold the snapshot lock to prevent the snapshot from being replaced until exporting is finished.
// Holding this lock prevents logs from being purged.
//
// Although vote and log must be consistent,
// it is OK to export RaftState and logs without transaction protection,
// it is OK to export RaftState and logs without transaction protection (i.e. they do not share a lock),
// if it guarantees no logs have a greater `vote` than `RaftState.HardState`.
let current_snapshot = self.current_snapshot.read().await;
let raft_state = self.raft_state.read().await;
let log = self.log.read().await;

let mut res = vec![];

// Export data header first
{
let header_tree = SledTree::open(&self.db, TREE_HEADER, false).map_err(invalid_data)?;
@@ -416,7 +416,7 @@

for kv in header_kvs.iter() {
let line = vec_kv_to_json(TREE_HEADER, kv)?;
res.push(line);
yield line;
}
}

@@ -435,8 +435,7 @@
};

let s = serde_json::to_string(&(tree_name, ent_id)).map_err(invalid_data)?;

res.push(s);
yield s;
}

let vote = ks.get(&RaftStateKey::HardState)?.map(Vote::from);
@@ -448,8 +447,7 @@
};

let s = serde_json::to_string(&(tree_name, ent_vote)).map_err(invalid_data)?;

res.push(s);
yield s;
}

let committed = ks
@@ -462,8 +460,7 @@
};

let s = serde_json::to_string(&(tree_name, ent_committed)).map_err(invalid_data)?;

res.push(s);
yield s;
};

// Export logs whose leader id is smaller than or equal to `vote`
@@ -477,8 +474,7 @@

let tree_kv = (tree_name, kv_entry);
let line = serde_json::to_string(&tree_kv).map_err(invalid_data)?;

res.push(line);
yield line;
}
}

@@ -507,14 +503,11 @@

let named_entry = (tree_name, ent);

let l = serde_json::to_string(&named_entry).map_err(invalid_data)?;

res.push(l);
let line = serde_json::to_string(&named_entry).map_err(invalid_data)?;
yield line;
}
}
}

Ok(res)
}

pub async fn get_node(&self, node_id: &NodeId) -> Option<Node> {
