Skip to content

Commit

Permalink
refactor: Add KVPbApi::list_pb(), simplify UdfMgr::get_udfs()
Browse files Browse the repository at this point in the history
`KVPbApi::list_pb()` lists key value pairs by a prefix `DirName<K>`, and
decodes key back to `kvapi::Key` and values back to `kvapi::Key::ValueType`.

`UdfMgr::get_udfs()` is simplified by using `list_pb()`.
  • Loading branch information
drmingdrmer committed Feb 6, 2024
1 parent d8713c6 commit ce4b569
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 102 additions & 1 deletion src/meta/api/src/kv_pb_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@
use std::future::Future;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::NonEmptyItem;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::SeqV;
use databend_common_proto_conv::FromToProto;
use databend_common_proto_conv::Incompatible;
use futures::future::FutureExt;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::TryStreamExt;
use PbApiReadError::KvApiError;

/// An error occurs when decoding protobuf encoded value.
Expand All @@ -35,12 +42,30 @@ pub enum PbDecodeError {
Incompatible(#[from] Incompatible),
}

/// An error occurs when found an unexpected None value.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("NoneValue: unexpected None value of key: '{key}'")]
pub struct NoneValue {
key: String,
}

impl NoneValue {
pub fn new(key: impl ToString) -> Self {
NoneValue {
key: key.to_string(),
}
}
}

/// An error occurs when reading protobuf encoded value from kv store.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("PbApiReadError: {0}")]
pub enum PbApiReadError<E> {
DecodeError(#[from] prost::DecodeError),
Incompatible(#[from] Incompatible),
KeyError(#[from] kvapi::KeyError),
NoneValue(#[from] NoneValue),
/// Error returned from KVApi.
KvApiError(E),
}

Expand Down Expand Up @@ -70,6 +95,16 @@ impl From<PbApiReadError<MetaError>> for MetaError {
let net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(net_err)
}
PbApiReadError::KeyError(e) => {
let inv = InvalidReply::new("", &e);
let net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(net_err)
}
PbApiReadError::NoneValue(e) => {
let inv = InvalidReply::new("", &e);
let net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(net_err)
}
KvApiError(e) => e,
}
}
Expand Down Expand Up @@ -110,11 +145,77 @@ pub trait KVPbApi: KVApi {
Ok(v)
}
}
// TODO: add list

/// List protobuf encoded values by prefix and returns a stream.
///
/// The returned value is decoded by `FromToProto`.
/// It returns the same error as `KVApi::Error`,
/// thus it requires KVApi::Error can describe a decoding error, i.e., `impl From<PbApiReadError>`.
fn list_pb<K>(
&self,
prefix: &DirName<K>,
) -> impl Future<
Output = Result<BoxStream<'static, Result<NonEmptyItem<K>, Self::Error>>, Self::Error>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
Self::Error: From<PbApiReadError<Self::Error>>,
{
self.list_pb_low(prefix).map(|r| match r {
Ok(strm) => Ok(strm.map_err(Self::Error::from).boxed()),
Err(e) => Err(Self::Error::from(e)),
})
}

/// Same as `list_pb` but returns [`PbApiReadError`]. No require of `From<PbApiReadError>` for `Self::Error`.
fn list_pb_low<K>(
&self,
prefix: &DirName<K>,
) -> impl Future<
Output = Result<
BoxStream<'static, Result<NonEmptyItem<K>, PbApiReadError<Self::Error>>>,
PbApiReadError<Self::Error>,
>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
{
let prefix = prefix.to_string_key();
async move {
let strm = self.list_kv(&prefix).await.map_err(KvApiError)?;
let strm = strm.map(decode_non_empty_item::<K, Self::Error>);
Ok(strm.boxed())
}
}
}

impl<T> KVPbApi for T where T: KVApi + ?Sized {}

/// Decode key and protobuf encoded value from `StreamItem`.
///
/// It requires K to be static because it is used in a static stream map()
fn decode_non_empty_item<K, E>(
r: Result<StreamItem, E>,
) -> Result<NonEmptyItem<K>, PbApiReadError<E>>
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
{
match r {
Ok(item) => {
let k = K::from_str_key(&item.key)?;

let raw = item.value.ok_or_else(|| NoneValue::new(item.key))?;
let v = decode_seqv::<K::ValueType>(SeqV::from(raw))?;

Ok(NonEmptyItem::new(k, v))
}
Err(e) => Err(KvApiError(e)),
}
}

/// Deserialize SeqV<Vec<u8>> into SeqV<T>, with FromToProto.
fn decode_seqv<T>(seqv: SeqV) -> Result<SeqV<T>, PbDecodeError>
where T: FromToProto {
Expand Down
43 changes: 43 additions & 0 deletions src/meta/kvapi/src/kvapi/item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_types::SeqV;

use crate::kvapi::Key;

/// Key-Value item contains key and optional value with seq number.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Item<K: Key> {
pub key: K,
pub seqv: Option<SeqV<K::ValueType>>,
}

impl<K: Key> Item<K> {
pub fn new(key: K, seqv: Option<SeqV<K::ValueType>>) -> Self {
Item { key, seqv }
}
}

/// Key-Value item contains key and non-optional value with seq number.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NonEmptyItem<K: Key> {
pub key: K,
pub seqv: SeqV<K::ValueType>,
}

impl<K: Key> NonEmptyItem<K> {
pub fn new(key: K, seqv: SeqV<K::ValueType>) -> Self {
NonEmptyItem { key, seqv }
}
}
140 changes: 140 additions & 0 deletions src/meta/kvapi/src/kvapi/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,143 @@ impl kvapi::Key for String {
Ok(s.to_string())
}
}

/// The dir name of a key.
///
/// For example, the dir name of a key `a/b/c` is `a/b`.
///
/// Note that the dir name of `a` is still `a`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DirName<K> {
key: K,
level: usize,
}

impl<K> DirName<K> {
pub fn new(key: K) -> Self {
DirName { key, level: 1 }
}

pub fn new_with_level(key: K, level: usize) -> Self {
DirName { key, level }
}

pub fn with_level(&mut self, level: usize) -> &mut Self {
self.level = level;
self
}

pub fn key(&self) -> &K {
&self.key
}

pub fn into_key(self) -> K {
self.key
}
}

impl<K: Key> Key for DirName<K> {
const PREFIX: &'static str = K::PREFIX;
type ValueType = K::ValueType;

fn to_string_key(&self) -> String {
let k = self.key.to_string_key();
k.rsplitn(self.level + 1, '/').last().unwrap().to_string()
}

fn from_str_key(s: &str) -> Result<Self, KeyError> {
let d = DirName::new_with_level(K::from_str_key(s)?, 0);
Ok(d)
}
}

#[cfg(test)]
mod tests {
use super::DirName;
use crate::kvapi::Key;
use crate::kvapi::KeyError;

#[derive(Debug, Clone, PartialEq, Eq)]
struct FooKey {
a: u64,
b: String,
c: u64,
}

impl Key for FooKey {
const PREFIX: &'static str = "pref";
type ValueType = ();

fn to_string_key(&self) -> String {
format!("{}/{}/{}/{}", Self::PREFIX, self.a, self.b, self.c)
}

fn from_str_key(s: &str) -> Result<Self, KeyError> {
// dummy impl
let k = FooKey {
a: 9,
b: "x".to_string(),
c: 8,
};
Ok(k)
}
}

#[test]
fn test_dir_name_from_key() {
let d = DirName::<FooKey>::from_str_key("").unwrap();
assert_eq!(
FooKey {
a: 9,
b: "x".to_string(),
c: 8,
},
d.into_key()
);
}

#[test]
fn test_dir_name() {
let k = FooKey {
a: 1,
b: "b".to_string(),
c: 2,
};

let dir = DirName::new(k);
assert_eq!("pref/1/b", dir.to_string_key());

let dir = DirName::new(dir);
assert_eq!("pref/1", dir.to_string_key());

let dir = DirName::new(dir);
assert_eq!("pref", dir.to_string_key());

let dir = DirName::new(dir);
assert_eq!("pref", dir.to_string_key(), "root dir should be the same");
}

#[test]
fn test_dir_name_with_level() {
let k = FooKey {
a: 1,
b: "b".to_string(),
c: 2,
};

let mut dir = DirName::new(k);
assert_eq!("pref/1/b", dir.to_string_key());

dir.with_level(0);
assert_eq!("pref/1/b/2", dir.to_string_key());

dir.with_level(2);
assert_eq!("pref/1", dir.to_string_key());

dir.with_level(3);
assert_eq!("pref", dir.to_string_key());

dir.with_level(4);
assert_eq!("pref", dir.to_string_key(), "root dir should be the same");
}
}
4 changes: 4 additions & 0 deletions src/meta/kvapi/src/kvapi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod api;
mod helper;
mod item;
mod key;
mod key_builder;
mod key_parser;
Expand All @@ -25,6 +26,9 @@ pub use api::ApiBuilder;
pub use api::AsKVApi;
pub use api::KVApi;
pub use api::KVStream;
pub use item::Item;
pub use item::NonEmptyItem;
pub use key::DirName;
pub use key::Key;
pub use key::KeyError;
pub use key_builder::KeyBuilder;
Expand Down
1 change: 1 addition & 0 deletions src/meta/types/src/proto_ext/seq_v_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl From<SeqV> for pb::SeqV {
}

impl From<pb::SeqV> for SeqV {
/// Convert from protobuf SeqV to the native SeqV we defined.
fn from(sv: pb::SeqV) -> Self {
Self {
seq: sv.seq,
Expand Down
1 change: 1 addition & 0 deletions src/query/management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ databend-common-proto-conv = { path = "../../meta/proto-conv" }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
enumflags2 = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
minitrace = { workspace = true }
prost = { workspace = true }
Expand Down
Loading

0 comments on commit ce4b569

Please sign in to comment.