Skip to content

Commit

Permalink
refactor: make ListKV non-blocking by returning stream directly (#16868)
Browse files Browse the repository at this point in the history
perf: make ListKV non-blocking by returning stream directly

Return Stream to client instead of collecting results into Vec first.
This change eliminates unnecessary buffering of ListKV results in
databend-meta server, reducing memory usage and improving response
times. The direct streaming approach better matches current usage
patterns and removes legacy collection behavior.

Although a large ListKV response body won't block the databend-meta
server, but it will still block on the client side.
  • Loading branch information
drmingdrmer authored Nov 19, 2024
1 parent ed6a0c7 commit 357404c
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions src/meta/raft-store/src/state_machine_api_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ pub trait StateMachineApiExt: StateMachineApi {
future::ready(Ok(res))
});

// Make it static

let vs = strm.collect::<Vec<_>>().await;
let strm = futures::stream::iter(vs);

Ok(strm.boxed())
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ semver = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serfig = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
Expand Down
16 changes: 10 additions & 6 deletions src/meta/service/src/meta_service/meta_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ use databend_common_meta_types::MetaOperationError;
use databend_common_meta_types::Node;
use databend_common_metrics::count::Count;
use futures::StreamExt;
use futures::TryStreamExt;
use log::debug;
use log::info;
use maplit::btreemap;
use maplit::btreeset;
use tonic::codegen::BoxStream;
use tonic::Status;

use crate::message::ForwardRequest;
use crate::message::ForwardRequestBody;
Expand Down Expand Up @@ -147,12 +149,14 @@ impl<'a> Handler<MetaGrpcReadReq> for MetaLeader<'a> {
}

MetaGrpcReadReq::ListKV(req) => {
// safe unwrap(): Infallible
let kvs = kv_api.prefix_list_kv(&req.prefix).await.unwrap();

let kv_iter = kvs.into_iter().map(|kv| Ok(StreamItem::from(kv)));

let strm = futures::stream::iter(kv_iter);
let strm =
kv_api.list_kv(&req.prefix).await.map_err(|e| {
MetaOperationError::DataError(MetaDataError::ReadError(
MetaDataReadError::new("list_kv", &req.prefix, &e),
))
})?;

let strm = strm.map_err(|e| Status::internal(e.to_string()));

Ok(strm.boxed())
}
Expand Down

0 comments on commit 357404c

Please sign in to comment.