Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Replace KVApi::get() and KVApi::mget_kv() with get_kv_stream() #14255

Merged
merged 2 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 4 additions & 16 deletions src/meta/client/src/kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
// limitations under the License.

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::GetKVReq;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::ListKVReq;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::MGetKVReq;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
Expand All @@ -41,20 +38,11 @@ impl kvapi::KVApi for ClientHandle {
}

#[minitrace::trace]
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let reply = self
.request(GetKVReq {
key: key.to_string(),
})
.await?;
Ok(reply)
}

#[minitrace::trace]
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let keys = keys.to_vec();
let reply = self.request(MGetKVReq { keys }).await?;
Ok(reply)
let strm = self.request(Streamed(MGetKVReq { keys })).await?;
let strm = strm.map_err(MetaError::from);
Ok(strm.boxed())
}

#[minitrace::trace]
Expand Down
5 changes: 3 additions & 2 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// ```
pub static MIN_METASRV_SEMVER: Version = Version {
major: 1,
minor: 1,
patch: 32,
minor: 2,
// [1.2.163, 1.2.226) are removed from release download, due to some known bugs found in these versions.
patch: 226,
pre: Prerelease::EMPTY,
build: BuildMetadata::EMPTY,
};
Expand Down
12 changes: 2 additions & 10 deletions src/meta/embedded/src/kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

use async_trait::async_trait;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
pub use databend_common_meta_sled_store::init_temp_sled_db;
Expand All @@ -37,15 +35,9 @@ impl kvapi::KVApi for MetaEmbedded {
}

#[minitrace::trace]
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let sm = self.inner.lock().await;
sm.get_kv(key).await
}

#[minitrace::trace]
async fn mget_kv(&self, key: &[String]) -> Result<MGetKVReply, Self::Error> {
let sm = self.inner.lock().await;
sm.mget_kv(key).await
sm.get_kv_stream(keys).await
}

#[minitrace::trace]
Expand Down
61 changes: 44 additions & 17 deletions src/meta/kvapi/src/kvapi/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use std::ops::Deref;

use async_trait::async_trait;
use databend_common_meta_types::errors;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::debug;

Expand Down Expand Up @@ -51,24 +53,57 @@ pub trait KVApi: Send + Sync {
/// Depends on the implementation the error could be different.
/// E.g., a remove kvapi::KVApi impl returns network error or remote storage error.
/// A local kvapi::KVApi impl just returns storage error.
type Error: std::error::Error + Send + Sync + 'static;
type Error: std::error::Error + From<errors::IncompleteStream> + Send + Sync + 'static;

/// Update or insert a key-value record.
async fn upsert_kv(&self, req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error>;

/// Get a key-value record by key.
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error>;
// TODO: #[deprecated(note = "use get_kv_stream() instead")]
async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let mut strm = self.get_kv_stream(&[key.to_string()]).await?;

let strm_item = strm
.next()
.await
.ok_or_else(|| errors::IncompleteStream::new(1, 0).context(" while get_kv"))??;

let reply = strm_item.value.map(SeqV::from);

Ok(reply)
}

/// Get several key-values by keys.
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error>;
// TODO: #[deprecated(note = "use get_kv_stream() instead")]
async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
let n = keys.len();
let mut strm = self.get_kv_stream(keys).await?;
let mut seq_values = Vec::with_capacity(n);

while let Some(item) = strm.try_next().await? {
let item = item.value.map(SeqV::from);
seq_values.push(item);
}
if seq_values.len() != n {
return Err(
errors::IncompleteStream::new(n as u64, seq_values.len() as u64)
.context(" while mget_kv")
.into(),
);
}

Ok(seq_values)
}

/// Get key-values by keys.
///
/// 2024-01-06: since: TODO
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error>;

/// List key-value records that are starts with the specified prefix.
///
/// Same as `prefix_list_kv()`, except it returns a stream.
async fn list_kv(
&self,
prefix: &str,
) -> Result<BoxStream<'static, Result<StreamItem, Self::Error>>, Self::Error>;
async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error>;

// TODO: deprecate it:
// #[deprecated(note = "use list_kv() instead")]
Expand Down Expand Up @@ -104,22 +139,14 @@ impl<U: kvapi::KVApi, T: Deref<Target = U> + Send + Sync> kvapi::KVApi for T {
self.deref().upsert_kv(act).await
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
self.deref().get_kv(key).await
}

async fn mget_kv(&self, key: &[String]) -> Result<MGetKVReply, Self::Error> {
self.deref().mget_kv(key).await
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
self.deref().get_kv_stream(keys).await
}

async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
self.deref().list_kv(prefix).await
}

async fn prefix_list_kv(&self, prefix: &str) -> Result<ListKVReply, Self::Error> {
self.deref().prefix_list_kv(prefix).await
}

async fn transaction(&self, txn: TxnRequest) -> Result<TxnReply, Self::Error> {
self.deref().transaction(txn).await
}
Expand Down
18 changes: 4 additions & 14 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use std::io;
use std::sync::Arc;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
Expand Down Expand Up @@ -80,26 +78,18 @@ impl<'a> kvapi::KVApi for SMV002KVApi<'a> {
unreachable!("write operation SM2KVApi::upsert_kv is disabled")
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let got = self.sm.get_maybe_expired_kv(key).await?;

let local_now_ms = SeqV::<()>::now_ms();
let got = Self::non_expired(got, local_now_ms);
Ok(got)
}

async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let local_now_ms = SeqV::<()>::now_ms();

let mut values = Vec::with_capacity(keys.len());
let mut items = Vec::with_capacity(keys.len());

for k in keys {
let got = self.sm.get_maybe_expired_kv(k.as_str()).await?;
let v = Self::non_expired(got, local_now_ms);
values.push(v);
items.push(Ok(StreamItem::from((k.clone(), v))));
}

Ok(values)
Ok(futures::stream::iter(items).boxed())
}

async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
Expand Down
24 changes: 6 additions & 18 deletions src/meta/raft-store/src/state_machine/sm_kv_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// limitations under the License.

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::GetKVReply;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::MGetKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
Expand All @@ -27,7 +25,6 @@ use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::UpsertKV;
use futures_util::StreamExt;
use log::debug;

use crate::state_machine::StateMachine;

Expand Down Expand Up @@ -74,28 +71,19 @@ impl kvapi::KVApi for StateMachine {
}
}

async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
let sv = self.kvs().get(&key.to_string())?;
debug!("get_kv sv:{:?}", sv);

let local_now_ms = SeqV::<()>::now_ms();
let (_expired, res) = Self::expire_seq_v(sv, local_now_ms);
Ok(res)
}

async fn mget_kv(&self, keys: &[String]) -> Result<MGetKVReply, Self::Error> {
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
let kvs = self.kvs();
let mut res = vec![];
let mut items = vec![];

let local_now_ms = SeqV::<()>::now_ms();

for x in keys.iter() {
let v = kvs.get(x)?;
for k in keys.iter() {
let v = kvs.get(k)?;
let (_, v) = Self::expire_seq_v(v, local_now_ms);
res.push(v)
items.push(Ok(StreamItem::from((k.clone(), v))))
}

Ok(res)
Ok(futures::stream::iter(items).boxed())
}

async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
Expand Down
7 changes: 7 additions & 0 deletions src/meta/service/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ pub struct ForwardRequest<T> {
}

impl<T> ForwardRequest<T> {
pub fn new(forward_to_leader: u64, body: T) -> Self {
Self {
forward_to_leader,
body,
}
}

pub fn decr_forward(&mut self) {
self.forward_to_leader -= 1;
}
Expand Down
36 changes: 0 additions & 36 deletions src/meta/service/src/meta_service/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
Expand Down Expand Up @@ -50,7 +49,6 @@ use databend_common_meta_types::Endpoint;
use databend_common_meta_types::ForwardRPCError;
use databend_common_meta_types::ForwardToLeader;
use databend_common_meta_types::GrpcConfig;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::LogEntry;
use databend_common_meta_types::LogId;
use databend_common_meta_types::MembershipNode;
Expand Down Expand Up @@ -977,40 +975,6 @@ impl MetaNode {
endpoints
}

#[minitrace::trace]
pub async fn consistent_read<Request, Reply>(&self, req: Request) -> Result<Reply, MetaAPIError>
where
Request: Into<ForwardRequestBody> + Debug,
ForwardResponse: TryInto<Reply>,
<ForwardResponse as TryInto<Reply>>::Error: std::fmt::Display,
{
let res = self
.handle_forwardable_request(ForwardRequest {
forward_to_leader: 1,
body: req.into(),
})
.await;

match res {
Err(e) => {
server_metrics::incr_read_failed();
Err(e)
}
Ok(res) => {
let res: Reply = res.try_into().map_err(|e| {
server_metrics::incr_read_failed();
let invalid_reply = InvalidReply::new(
format!("expect reply type to be {}", std::any::type_name::<Reply>(),),
&AnyError::error(e),
);
MetaNetworkError::from(invalid_reply)
})?;

Ok(res)
}
}
}

#[minitrace::trace]
pub async fn handle_forwardable_request<Req>(
&self,
Expand Down
Loading
Loading