Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: meta-service: re-implement export with Stream #13097

Merged
merged 1 commit into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ lazy_static = "1.4.0"
# future and async
futures = "0.3.24"
futures-util = "0.3.24"
futures-async-stream = { version = "0.2.7" }
stream-more = "0.1.3"
bytes = "1.5.0"

Expand Down
10 changes: 7 additions & 3 deletions src/binaries/metactl/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
use common_base::base::tokio;
Expand Down Expand Up @@ -55,6 +56,7 @@ use common_meta_types::NodeId;
use common_meta_types::StoredMembership;
use databend_meta::store::RaftStore;
use databend_meta::store::StoreInner;
use futures::TryStreamExt;
use tokio::net::TcpSocket;
use url::Url;

Expand Down Expand Up @@ -467,7 +469,7 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
let raft_config: RaftConfig = config.raft_config.clone().into();

let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
let lines = sto_inn.export().await?;
let mut lines = Arc::new(sto_inn).export();

eprintln!(" From: {}", raft_config.raft_dir);

Expand All @@ -479,9 +481,11 @@ async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
None
};

let cnt = lines.len();
let mut cnt = 0;

while let Some(line) = lines.try_next().await? {
cnt += 1;

for line in lines {
if file.as_ref().is_none() {
println!("{}", line);
} else {
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ backon = "0.4"
clap = { workspace = true }
derive_more = { workspace = true }
futures = "0.3.24"
futures-async-stream = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
Expand Down
31 changes: 19 additions & 12 deletions src/meta/service/src/api/grpc/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
Expand Down Expand Up @@ -40,7 +41,9 @@ use common_meta_types::TxnReply;
use common_meta_types::TxnRequest;
use common_metrics::counter::Count;
use common_tracing::func_name;
use futures::stream::TryChunksError;
use futures::StreamExt;
use futures::TryStreamExt;
use log::debug;
use log::info;
use minitrace::prelude::*;
Expand Down Expand Up @@ -194,7 +197,7 @@ impl MetaService for MetaServiceImpl {
let elapsed = t0.elapsed();
info!("Handled(elapsed: {:?}) MetaGrpcReq: {:?}", elapsed, req);

Ok::<_, tonic::Status>(reply)
Ok::<_, Status>(reply)
}
.in_span(root)
.await?;
Expand Down Expand Up @@ -243,30 +246,34 @@ impl MetaService for MetaServiceImpl {
.await
}

type ExportStream =
Pin<Box<dyn Stream<Item = Result<ExportedChunk, tonic::Status>> + Send + Sync + 'static>>;
type ExportStream = Pin<Box<dyn Stream<Item = Result<ExportedChunk, Status>> + Send + 'static>>;

// Export all meta data.
//
// Including raft hard state, logs and state machine.
// The exported data is a list of json strings in form of `(tree_name, sub_tree_prefix, key, value)`.
/// Export all meta data.
///
/// Including header, raft state, logs and state machine.
/// The exported data is a series of JSON encoded strings of `RaftStoreEntry`.
async fn export(
&self,
_request: Request<common_meta_types::protobuf::Empty>,
) -> Result<Response<Self::ExportStream>, Status> {
let _guard = RequestInFlight::guard();

let meta_node = &self.meta_node;
let res = meta_node.sto.export().await?;
let strm = meta_node.sto.inner().export();

let stream = ExportStream { data: res };
let s = stream.map(|strings| Ok(ExportedChunk { data: strings }));
let chunk_size = 32;
// - Chunk up upto 32 Ok items inside a Vec<String>;
// - Convert Vec<String> to ExportedChunk;
// - Convert TryChunkError<_, io::Error> to Status;
let s = strm
.try_chunks(chunk_size)
.map_ok(|chunk: Vec<String>| ExportedChunk { data: chunk })
.map_err(|e: TryChunksError<_, io::Error>| Status::internal(e.1.to_string()));

Ok(Response::new(Box::pin(s)))
}

type WatchStream =
Pin<Box<dyn Stream<Item = Result<WatchResponse, tonic::Status>> + Send + Sync + 'static>>;
type WatchStream = Pin<Box<dyn Stream<Item = Result<WatchResponse, Status>> + Send + 'static>>;

#[minitrace::trace]
async fn watch(
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(generators)]
#![allow(clippy::uninlined_format_args)]

pub mod api;
Expand Down
4 changes: 4 additions & 0 deletions src/meta/service/src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ impl RaftStore {
let sto = StoreInner::open_create(config, open, create).await?;
Ok(Self::new(sto))
}

pub fn inner(&self) -> Arc<StoreInner> {
self.inner.clone()
}
}

impl Deref for RaftStore {
Expand Down
35 changes: 14 additions & 21 deletions src/meta/service/src/store/store_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,27 +387,27 @@ impl StoreInner {
}

/// Export data that can be used to restore a meta-service node.
#[minitrace::trace]
pub async fn export(&self) -> Result<Vec<String>, io::Error> {
// Convert to io::Error(InvalidData).
///
/// Returns a `BoxStream<'a, Result<String, io::Error>>` that yields a series of JSON strings.
#[futures_async_stream::try_stream(boxed, ok = String, error = io::Error)]
pub async fn export(self: Arc<StoreInner>) {
// Convert an error occurred during export to `io::Error(InvalidData)`.
fn invalid_data(e: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(ErrorKind::InvalidData, e)
}

// Lock all components so that we have a consistent view.
// Lock all data components so that we have a consistent view.
//
// Hold the snapshot lock to prevent snapshot from being replaced until exporting finished.
// Holding this lock prevent logs from being purged.
//
// Although vote and log must be consistent,
// it is OK to export RaftState and logs without transaction protection,
// it is OK to export RaftState and logs without transaction protection(i.e. they do not share a lock),
// if it guarantees no logs have a greater `vote` than `RaftState.HardState`.
let current_snapshot = self.current_snapshot.read().await;
let raft_state = self.raft_state.read().await;
let log = self.log.read().await;

let mut res = vec![];

// Export data header first
{
let header_tree = SledTree::open(&self.db, TREE_HEADER, false).map_err(invalid_data)?;
Expand All @@ -416,7 +416,7 @@ impl StoreInner {

for kv in header_kvs.iter() {
let line = vec_kv_to_json(TREE_HEADER, kv)?;
res.push(line);
yield line;
}
}

Expand All @@ -435,8 +435,7 @@ impl StoreInner {
};

let s = serde_json::to_string(&(tree_name, ent_id)).map_err(invalid_data)?;

res.push(s);
yield s;
}

let vote = ks.get(&RaftStateKey::HardState)?.map(Vote::from);
Expand All @@ -448,8 +447,7 @@ impl StoreInner {
};

let s = serde_json::to_string(&(tree_name, ent_vote)).map_err(invalid_data)?;

res.push(s);
yield s;
}

let committed = ks
Expand All @@ -462,8 +460,7 @@ impl StoreInner {
};

let s = serde_json::to_string(&(tree_name, ent_committed)).map_err(invalid_data)?;

res.push(s);
yield s;
};

// Export logs that has smaller or equal leader id as `vote`
Expand All @@ -477,8 +474,7 @@ impl StoreInner {

let tree_kv = (tree_name, kv_entry);
let line = serde_json::to_string(&tree_kv).map_err(invalid_data)?;

res.push(line);
yield line;
}
}

Expand Down Expand Up @@ -507,14 +503,11 @@ impl StoreInner {

let named_entry = (tree_name, ent);

let l = serde_json::to_string(&named_entry).map_err(invalid_data)?;

res.push(l);
let line = serde_json::to_string(&named_entry).map_err(invalid_data)?;
yield line;
}
}
}

Ok(res)
}

pub async fn get_node(&self, node_id: &NodeId) -> Option<Node> {
Expand Down
Loading