Skip to content

Commit

Permalink
refactor: Replace KVApi::get() and KVApi::mget_kv() with `get_kv_…
Browse files Browse the repository at this point in the history
…stream()`

- **Compatibility Update**: Starting with this version of
  databend-query, the minimum required version of the meta-service has
  been updated from 1.1.32 to 1.2.226 as of 2023-11-26.

  Note that the min compatible is 1.2.163 but `[1.2.163, 1.2.226)` are
  removed from release download, due to some known bugs found in these
  versions.

- The `KVApi::get()` and `KVApi::mget_kv()` functions, which previously
  performed similar operations, have been merged into `get_kv_stream()`,
  which returns a stream of key-value pairs.

  In trait `KVApi`, `get_kv()` and `mget_kv()` are replaced with default
  implementation for backward compatibility.

  Additionally, we have introduced a new error type, `IncompleteStream`,
  to handle cases where the stream does not complete as expected.
  • Loading branch information
drmingdrmer committed Jan 7, 2024
1 parent 989fb49 commit bfa7591
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 155 deletions.
20 changes: 4 additions & 16 deletions src/meta/client/src/kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
// limitations under the License.

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::GetKVReq;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::ListKVReq;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::MGetKVReq;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
Expand All @@ -41,20 +38,11 @@ impl kvapi::KVApi for ClientHandle {
}

#[minitrace::trace]
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let reply = self
.request(GetKVReq {
key: key.to_string(),
})
.await?;
Ok(reply)
}

#[minitrace::trace]
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let keys = keys.to_vec();
let reply = self.request(MGetKVReq { keys }).await?;
Ok(reply)
let strm = self.request(Streamed(MGetKVReq { keys })).await?;
let strm = strm.map_err(MetaError::from);
Ok(strm.boxed())
}

#[minitrace::trace]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// ```
pub static MIN_METASRV_SEMVER: Version = Version {
major: 1,
minor: 1,
patch: 32,
minor: 2,
patch: 163,
pre: Prerelease::EMPTY,
build: BuildMetadata::EMPTY,
};
Expand Down
12 changes: 2 additions & 10 deletions src/meta/embedded/src/kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

use async_trait::async_trait;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
pub use databend_common_meta_sled_store::init_temp_sled_db;
Expand All @@ -37,15 +35,9 @@ impl kvapi::KVApi for MetaEmbedded {
}

#[minitrace::trace]
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let sm = self.inner.lock().await;
sm.get_kv(key).await
}

#[minitrace::trace]
async fn mget_kv(&self, key: &[String]) -> Result<MGetKVReply, Self::Error> {
let sm = self.inner.lock().await;
sm.mget_kv(key).await
sm.get_kv_stream(keys).await
}

#[minitrace::trace]
Expand Down
61 changes: 44 additions & 17 deletions src/meta/kvapi/src/kvapi/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use std::ops::Deref;

use async_trait::async_trait;
use databend_common_meta_types::errors;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::debug;

Expand Down Expand Up @@ -51,24 +53,57 @@ pub trait KVApi: Send + Sync {
/// Depends on the implementation the error could be different.
/// E.g., a remove kvapi::KVApi impl returns network error or remote storage error.
/// A local kvapi::KVApi impl just returns storage error.
type Error: std::error::Error + Send + Sync + 'static;
type Error: std::error::Error + From<errors::IncompleteStream> + Send + Sync + 'static;

/// Update or insert a key-value record.
async fn upsert_kv(&self, req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error>;

/// Get a key-value record by key.
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error>;
// TODO: #[deprecated(note = "use get_kv_stream() instead")]
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let mut strm = self.get_kv_stream(&[key.to_string()]).await?;

let strm_item = strm
.next()
.await
.ok_or_else(|| errors::IncompleteStream::new(1, 0).context(" while get_kv"))??;

let reply = strm_item.value.map(SeqV::from);

Ok(reply)
}

/// Get several key-values by keys.
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error>;
// TODO: #[deprecated(note = "use get_kv_stream() instead")]
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
let n = keys.len();
let mut strm = self.get_kv_stream(keys).await?;
let mut seq_values = Vec::with_capacity(n);

while let Some(item) = strm.try_next().await? {
let item = item.value.map(SeqV::from);
seq_values.push(item);
}
if seq_values.len() != n {
return Err(
errors::IncompleteStream::new(n as u64, seq_values.len() as u64)
.context(" while mget_kv")
.into(),
);
}

Ok(seq_values)
}

/// Get key-values by keys.
///
/// 2024-01-06: since: TODO
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error>;

/// List key-value records that are starts with the specified prefix.
///
/// Same as `prefix_list_kv()`, except it returns a stream.
async fn list_kv(
&self,
prefix: &str,
) -> Result<BoxStream<'static, Result<StreamItem, Self::Error>>, Self::Error>;
async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error>;

// TODO: deprecate it:
// #[deprecated(note = "use list_kv() instead")]
Expand Down Expand Up @@ -104,22 +139,14 @@ impl<U: kvapi::KVApi, T: Deref<Target = U> + Send + Sync> kvapi::KVApi for T {
self.deref().upsert_kv(act).await
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
self.deref().get_kv(key).await
}

async fn mget_kv(&self, key: &[String]) -> Result<MGetKVReply, Self::Error> {
self.deref().mget_kv(key).await
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
self.deref().get_kv_stream(keys).await
}

async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
self.deref().list_kv(prefix).await
}

async fn prefix_list_kv(&self, prefix: &str) -> Result<ListKVReply, Self::Error> {
self.deref().prefix_list_kv(prefix).await
}

async fn transaction(&self, txn: TxnRequest) -> Result<TxnReply, Self::Error> {
self.deref().transaction(txn).await
}
Expand Down
18 changes: 4 additions & 14 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use std::io;
use std::sync::Arc;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
Expand Down Expand Up @@ -80,26 +78,18 @@ impl<'a> kvapi::KVApi for SMV002KVApi<'a> {
unreachable!("write operation SM2KVApi::upsert_kv is disabled")
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let got = self.sm.get_maybe_expired_kv(key).await?;

let local_now_ms = SeqV::<()>::now_ms();
let got = Self::non_expired(got, local_now_ms);
Ok(got)
}

async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let local_now_ms = SeqV::<()>::now_ms();

let mut values = Vec::with_capacity(keys.len());
let mut items = Vec::with_capacity(keys.len());

for k in keys {
let got = self.sm.get_maybe_expired_kv(k.as_str()).await?;
let v = Self::non_expired(got, local_now_ms);
values.push(v);
items.push(Ok(StreamItem::from((k.clone(), v))));
}

Ok(values)
Ok(futures::stream::iter(items).boxed())
}

async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
Expand Down
24 changes: 6 additions & 18 deletions src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// limitations under the License.

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
Expand All @@ -27,7 +25,6 @@ use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use futures_util::StreamExt;
use log::debug;

use crate::state_machine::StateMachine;

Expand Down Expand Up @@ -74,28 +71,19 @@ impl kvapi::KVApi for StateMachine {
}
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let sv = self.kvs().get(&key.to_string())?;
debug!("get_kv sv:{:?}", sv);

let local_now_ms = SeqV::<()>::now_ms();
let (_expired, res) = Self::expire_seq_v(sv, local_now_ms);
Ok(res)
}

async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let kvs = self.kvs();
let mut res = vec![];
let mut items = vec![];

let local_now_ms = SeqV::<()>::now_ms();

for x in keys.iter() {
let v = kvs.get(x)?;
for k in keys.iter() {
let v = kvs.get(k)?;
let (_, v) = Self::expire_seq_v(v, local_now_ms);
res.push(v)
items.push(Ok(StreamItem::from((k.clone(), v))))
}

Ok(res)
Ok(futures::stream::iter(items).boxed())
}

async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
Expand Down
7 changes: 7 additions & 0 deletions src/meta/service/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ pub struct ForwardRequest<T> {
}

impl<T> ForwardRequest<T> {
pub fn new(forward_to_leader: u64, body: T) -> Self {
Self {
forward_to_leader,
body,
}
}

pub fn decr_forward(&mut self) {
self.forward_to_leader -= 1;
}
Expand Down
36 changes: 0 additions & 36 deletions src/meta/service/src/meta_service/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
Expand Down Expand Up @@ -50,7 +49,6 @@ use databend_common_meta_types::Endpoint;
use databend_common_meta_types::ForwardRPCError;
use databend_common_meta_types::ForwardToLeader;
use databend_common_meta_types::GrpcConfig;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::LogEntry;
use databend_common_meta_types::LogId;
use databend_common_meta_types::MembershipNode;
Expand Down Expand Up @@ -977,40 +975,6 @@ impl MetaNode {
endpoints
}

#[minitrace::trace]
pub async fn consistent_read<Request, Reply>(&self, req: Request) -> Result<Reply, MetaAPIError>
where
Request: Into<ForwardRequestBody> + Debug,
ForwardResponse: TryInto<Reply>,
<ForwardResponse as TryInto<Reply>>::Error: std::fmt::Display,
{
let res = self
.handle_forwardable_request(ForwardRequest {
forward_to_leader: 1,
body: req.into(),
})
.await;

match res {
Err(e) => {
server_metrics::incr_read_failed();
Err(e)
}
Ok(res) => {
let res: Reply = res.try_into().map_err(|e| {
server_metrics::incr_read_failed();
let invalid_reply = InvalidReply::new(
format!("expect reply type to be {}", std::any::type_name::<Reply>(),),
&AnyError::error(e),
);
MetaNetworkError::from(invalid_reply)
})?;

Ok(res)
}
}
}

#[minitrace::trace]
pub async fn handle_forwardable_request<Req>(
&self,
Expand Down
Loading

0 comments on commit bfa7591

Please sign in to comment.