Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add KVPbApi::list_pb(), simplify UdfMgr::get_udfs() #14623

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 102 additions & 1 deletion src/meta/api/src/kv_pb_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@
use std::future::Future;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::NonEmptyItem;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::SeqV;
use databend_common_proto_conv::FromToProto;
use databend_common_proto_conv::Incompatible;
use futures::future::FutureExt;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::TryStreamExt;
use PbApiReadError::KvApiError;

/// An error occurs when decoding protobuf encoded value.
Expand All @@ -35,12 +42,30 @@ pub enum PbDecodeError {
Incompatible(#[from] Incompatible),
}

/// An error occurs when found an unexpected None value.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("NoneValue: unexpected None value of key: '{key}'")]
pub struct NoneValue {
key: String,
}

impl NoneValue {
pub fn new(key: impl ToString) -> Self {
NoneValue {
key: key.to_string(),
}
}
}

/// An error occurs when reading protobuf encoded value from kv store.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("PbApiReadError: {0}")]
pub enum PbApiReadError<E> {
DecodeError(#[from] prost::DecodeError),
Incompatible(#[from] Incompatible),
KeyError(#[from] kvapi::KeyError),
NoneValue(#[from] NoneValue),
/// Error returned from KVApi.
KvApiError(E),
}

Expand Down Expand Up @@ -70,6 +95,16 @@ impl From<PbApiReadError<MetaError>> for MetaError {
let net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(net_err)
}
PbApiReadError::KeyError(e) => {
let inv = InvalidReply::new("", &e);
let net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(net_err)
}
PbApiReadError::NoneValue(e) => {
let inv = InvalidReply::new("", &e);
let net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(net_err)
}
KvApiError(e) => e,
}
}
Expand Down Expand Up @@ -110,11 +145,77 @@ pub trait KVPbApi: KVApi {
Ok(v)
}
}
// TODO: add list

/// List protobuf encoded values by prefix and returns a stream.
///
/// The returned value is decoded by `FromToProto`.
/// It returns the same error as `KVApi::Error`,
/// thus it requires KVApi::Error can describe a decoding error, i.e., `impl From<PbApiReadError>`.
fn list_pb<K>(
&self,
prefix: &DirName<K>,
) -> impl Future<
Output = Result<BoxStream<'static, Result<NonEmptyItem<K>, Self::Error>>, Self::Error>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
Self::Error: From<PbApiReadError<Self::Error>>,
{
self.list_pb_low(prefix).map(|r| match r {
Ok(strm) => Ok(strm.map_err(Self::Error::from).boxed()),
Err(e) => Err(Self::Error::from(e)),
})
}

/// Same as `list_pb` but returns [`PbApiReadError`]. No require of `From<PbApiReadError>` for `Self::Error`.
fn list_pb_low<K>(
&self,
prefix: &DirName<K>,
) -> impl Future<
Output = Result<
BoxStream<'static, Result<NonEmptyItem<K>, PbApiReadError<Self::Error>>>,
PbApiReadError<Self::Error>,
>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
{
let prefix = prefix.to_string_key();
async move {
let strm = self.list_kv(&prefix).await.map_err(KvApiError)?;
let strm = strm.map(decode_non_empty_item::<K, Self::Error>);
Ok(strm.boxed())
}
}
}

impl<T> KVPbApi for T where T: KVApi + ?Sized {}

/// Decode key and protobuf encoded value from `StreamItem`.
///
/// It requires K to be static because it is used in a static stream map()
fn decode_non_empty_item<K, E>(
r: Result<StreamItem, E>,
) -> Result<NonEmptyItem<K>, PbApiReadError<E>>
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
{
match r {
Ok(item) => {
let k = K::from_str_key(&item.key)?;

let raw = item.value.ok_or_else(|| NoneValue::new(item.key))?;
let v = decode_seqv::<K::ValueType>(SeqV::from(raw))?;

Ok(NonEmptyItem::new(k, v))
}
Err(e) => Err(KvApiError(e)),
}
}

/// Deserialize SeqV<Vec<u8>> into SeqV<T>, with FromToProto.
fn decode_seqv<T>(seqv: SeqV) -> Result<SeqV<T>, PbDecodeError>
where T: FromToProto {
Expand Down
43 changes: 43 additions & 0 deletions src/meta/kvapi/src/kvapi/item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_types::SeqV;

use crate::kvapi::Key;

/// Key-Value item contains key and optional value with seq number.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Item<K: Key> {
pub key: K,
pub seqv: Option<SeqV<K::ValueType>>,
}

impl<K: Key> Item<K> {
pub fn new(key: K, seqv: Option<SeqV<K::ValueType>>) -> Self {
Item { key, seqv }
}
}

/// Key-Value item contains key and non-optional value with seq number.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NonEmptyItem<K: Key> {
pub key: K,
pub seqv: SeqV<K::ValueType>,
}

impl<K: Key> NonEmptyItem<K> {
pub fn new(key: K, seqv: SeqV<K::ValueType>) -> Self {
NonEmptyItem { key, seqv }
}
}
140 changes: 140 additions & 0 deletions src/meta/kvapi/src/kvapi/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,143 @@ impl kvapi::Key for String {
Ok(s.to_string())
}
}

/// The dir name of a key.
///
/// For example, the dir name of a key `a/b/c` is `a/b`.
///
/// Note that the dir name of `a` is still `a`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DirName<K> {
key: K,
level: usize,
}

impl<K> DirName<K> {
pub fn new(key: K) -> Self {
DirName { key, level: 1 }
}

pub fn new_with_level(key: K, level: usize) -> Self {
DirName { key, level }
}

pub fn with_level(&mut self, level: usize) -> &mut Self {
self.level = level;
self
}

pub fn key(&self) -> &K {
&self.key
}

pub fn into_key(self) -> K {
self.key
}
}

impl<K: Key> Key for DirName<K> {
const PREFIX: &'static str = K::PREFIX;
type ValueType = K::ValueType;

fn to_string_key(&self) -> String {
let k = self.key.to_string_key();
k.rsplitn(self.level + 1, '/').last().unwrap().to_string()
}

fn from_str_key(s: &str) -> Result<Self, KeyError> {
let d = DirName::new_with_level(K::from_str_key(s)?, 0);
Ok(d)
}
}

#[cfg(test)]
mod tests {
use super::DirName;
use crate::kvapi::Key;
use crate::kvapi::KeyError;

#[derive(Debug, Clone, PartialEq, Eq)]
struct FooKey {
a: u64,
b: String,
c: u64,
}

impl Key for FooKey {
const PREFIX: &'static str = "pref";
type ValueType = ();

fn to_string_key(&self) -> String {
format!("{}/{}/{}/{}", Self::PREFIX, self.a, self.b, self.c)
}

fn from_str_key(_s: &str) -> Result<Self, KeyError> {
// dummy impl
let k = FooKey {
a: 9,
b: "x".to_string(),
c: 8,
};
Ok(k)
}
}

#[test]
fn test_dir_name_from_key() {
let d = DirName::<FooKey>::from_str_key("").unwrap();
assert_eq!(
FooKey {
a: 9,
b: "x".to_string(),
c: 8,
},
d.into_key()
);
}

#[test]
fn test_dir_name() {
let k = FooKey {
a: 1,
b: "b".to_string(),
c: 2,
};

let dir = DirName::new(k);
assert_eq!("pref/1/b", dir.to_string_key());

let dir = DirName::new(dir);
assert_eq!("pref/1", dir.to_string_key());

let dir = DirName::new(dir);
assert_eq!("pref", dir.to_string_key());

let dir = DirName::new(dir);
assert_eq!("pref", dir.to_string_key(), "root dir should be the same");
}

#[test]
fn test_dir_name_with_level() {
let k = FooKey {
a: 1,
b: "b".to_string(),
c: 2,
};

let mut dir = DirName::new(k);
assert_eq!("pref/1/b", dir.to_string_key());

dir.with_level(0);
assert_eq!("pref/1/b/2", dir.to_string_key());

dir.with_level(2);
assert_eq!("pref/1", dir.to_string_key());

dir.with_level(3);
assert_eq!("pref", dir.to_string_key());

dir.with_level(4);
assert_eq!("pref", dir.to_string_key(), "root dir should be the same");
}
}
4 changes: 4 additions & 0 deletions src/meta/kvapi/src/kvapi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod api;
mod helper;
mod item;
mod key;
mod key_builder;
mod key_parser;
Expand All @@ -25,6 +26,9 @@ pub use api::ApiBuilder;
pub use api::AsKVApi;
pub use api::KVApi;
pub use api::KVStream;
pub use item::Item;
pub use item::NonEmptyItem;
pub use key::DirName;
pub use key::Key;
pub use key::KeyError;
pub use key_builder::KeyBuilder;
Expand Down
1 change: 1 addition & 0 deletions src/meta/types/src/proto_ext/seq_v_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl From<SeqV> for pb::SeqV {
}

impl From<pb::SeqV> for SeqV {
/// Convert from protobuf SeqV to the native SeqV we defined.
fn from(sv: pb::SeqV) -> Self {
Self {
seq: sv.seq,
Expand Down
1 change: 1 addition & 0 deletions src/query/management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ databend-common-proto-conv = { path = "../../meta/proto-conv" }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
enumflags2 = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
minitrace = { workspace = true }
prost = { workspace = true }
Expand Down
Loading
Loading