diff --git a/Cargo.lock b/Cargo.lock index dc0d5a26d909..85167ec39fbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2877,6 +2877,7 @@ dependencies = [ "databend-common-proto-conv", "databend-common-storage", "enumflags2", + "futures", "log", "minitrace", "mockall", diff --git a/src/meta/api/src/kv_pb_api.rs b/src/meta/api/src/kv_pb_api.rs index ec44df0d6866..3df1dd610813 100644 --- a/src/meta/api/src/kv_pb_api.rs +++ b/src/meta/api/src/kv_pb_api.rs @@ -17,7 +17,11 @@ use std::future::Future; use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_kvapi::kvapi::NonEmptyItem; +use databend_common_meta_types::protobuf::StreamItem; use databend_common_meta_types::InvalidReply; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaNetworkError; @@ -25,6 +29,9 @@ use databend_common_meta_types::SeqV; use databend_common_proto_conv::FromToProto; use databend_common_proto_conv::Incompatible; use futures::future::FutureExt; +use futures::stream::BoxStream; +use futures::stream::StreamExt; +use futures::TryStreamExt; use PbApiReadError::KvApiError; /// An error occurs when decoding protobuf encoded value. @@ -35,12 +42,30 @@ pub enum PbDecodeError { Incompatible(#[from] Incompatible), } +/// An error occurs when found an unexpected None value. +#[derive(Clone, Debug, PartialEq, thiserror::Error)] +#[error("NoneValue: unexpected None value of key: '{key}'")] +pub struct NoneValue { + key: String, +} + +impl NoneValue { + pub fn new(key: impl ToString) -> Self { + NoneValue { + key: key.to_string(), + } + } +} + /// An error occurs when reading protobuf encoded value from kv store. #[derive(Clone, Debug, PartialEq, thiserror::Error)] #[error("PbApiReadError: {0}")] pub enum PbApiReadError { DecodeError(#[from] prost::DecodeError), Incompatible(#[from] Incompatible), + KeyError(#[from] kvapi::KeyError), + NoneValue(#[from] NoneValue), + /// Error returned from KVApi. KvApiError(E), } @@ -70,6 +95,16 @@ impl From> for MetaError { let net_err = MetaNetworkError::InvalidReply(inv); MetaError::NetworkError(net_err) } + PbApiReadError::KeyError(e) => { + let inv = InvalidReply::new("", &e); + let net_err = MetaNetworkError::InvalidReply(inv); + MetaError::NetworkError(net_err) + } + PbApiReadError::NoneValue(e) => { + let inv = InvalidReply::new("", &e); + let net_err = MetaNetworkError::InvalidReply(inv); + MetaError::NetworkError(net_err) + } KvApiError(e) => e, } } @@ -110,11 +145,77 @@ pub trait KVPbApi: KVApi { Ok(v) } } - // TODO: add list + + /// List protobuf encoded values by prefix and returns a stream. + /// + /// The returned value is decoded by `FromToProto`. + /// It returns the same error as `KVApi::Error`, + /// thus it requires KVApi::Error can describe a decoding error, i.e., `impl From`. + fn list_pb( + &self, + prefix: &DirName, + ) -> impl Future< + Output = Result, Self::Error>>, Self::Error>, + > + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto, + Self::Error: From>, + { + self.list_pb_low(prefix).map(|r| match r { + Ok(strm) => Ok(strm.map_err(Self::Error::from).boxed()), + Err(e) => Err(Self::Error::from(e)), + }) + } + + /// Same as `list_pb` but returns [`PbApiReadError`]. No require of `From` for `Self::Error`. + fn list_pb_low( + &self, + prefix: &DirName, + ) -> impl Future< + Output = Result< + BoxStream<'static, Result, PbApiReadError>>, + PbApiReadError, + >, + > + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto, + { + let prefix = prefix.to_string_key(); + async move { + let strm = self.list_kv(&prefix).await.map_err(KvApiError)?; + let strm = strm.map(decode_non_empty_item::); + Ok(strm.boxed()) + } + } } impl KVPbApi for T where T: KVApi + ?Sized {} +/// Decode key and protobuf encoded value from `StreamItem`. +/// +/// It requires K to be static because it is used in a static stream map() +fn decode_non_empty_item( + r: Result, +) -> Result, PbApiReadError> +where + K: kvapi::Key + 'static, + K::ValueType: FromToProto, +{ + match r { + Ok(item) => { + let k = K::from_str_key(&item.key)?; + + let raw = item.value.ok_or_else(|| NoneValue::new(item.key))?; + let v = decode_seqv::(SeqV::from(raw))?; + + Ok(NonEmptyItem::new(k, v)) + } + Err(e) => Err(KvApiError(e)), + } +} + /// Deserialize SeqV> into SeqV, with FromToProto. fn decode_seqv(seqv: SeqV) -> Result, PbDecodeError> where T: FromToProto { diff --git a/src/meta/kvapi/src/kvapi/item.rs b/src/meta/kvapi/src/kvapi/item.rs new file mode 100644 index 000000000000..6f01b1cfd8e7 --- /dev/null +++ b/src/meta/kvapi/src/kvapi/item.rs @@ -0,0 +1,43 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_types::SeqV; + +use crate::kvapi::Key; + +/// Key-Value item contains key and optional value with seq number. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Item { + pub key: K, + pub seqv: Option>, +} + +impl Item { + pub fn new(key: K, seqv: Option>) -> Self { + Item { key, seqv } + } +} + +/// Key-Value item contains key and non-optional value with seq number. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct NonEmptyItem { + pub key: K, + pub seqv: SeqV, +} + +impl NonEmptyItem { + pub fn new(key: K, seqv: SeqV) -> Self { + NonEmptyItem { key, seqv } + } +} diff --git a/src/meta/kvapi/src/kvapi/key.rs b/src/meta/kvapi/src/kvapi/key.rs index 175486cb556e..39160adb2882 100644 --- a/src/meta/kvapi/src/kvapi/key.rs +++ b/src/meta/kvapi/src/kvapi/key.rs @@ -74,3 +74,143 @@ impl kvapi::Key for String { Ok(s.to_string()) } } + +/// The dir name of a key. +/// +/// For example, the dir name of a key `a/b/c` is `a/b`. +/// +/// Note that the dir name of `a` is still `a`. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DirName { + key: K, + level: usize, +} + +impl DirName { + pub fn new(key: K) -> Self { + DirName { key, level: 1 } + } + + pub fn new_with_level(key: K, level: usize) -> Self { + DirName { key, level } + } + + pub fn with_level(&mut self, level: usize) -> &mut Self { + self.level = level; + self + } + + pub fn key(&self) -> &K { + &self.key + } + + pub fn into_key(self) -> K { + self.key + } +} + +impl Key for DirName { + const PREFIX: &'static str = K::PREFIX; + type ValueType = K::ValueType; + + fn to_string_key(&self) -> String { + let k = self.key.to_string_key(); + k.rsplitn(self.level + 1, '/').last().unwrap().to_string() + } + + fn from_str_key(s: &str) -> Result { + let d = DirName::new_with_level(K::from_str_key(s)?, 0); + Ok(d) + } +} + +#[cfg(test)] +mod tests { + use super::DirName; + use crate::kvapi::Key; + use crate::kvapi::KeyError; + + #[derive(Debug, Clone, PartialEq, Eq)] + struct FooKey { + a: u64, + b: String, + c: u64, + } + + impl Key for FooKey { + const PREFIX: &'static str = "pref"; + type ValueType = (); + + fn to_string_key(&self) -> String { + format!("{}/{}/{}/{}", Self::PREFIX, self.a, self.b, self.c) + } + + fn from_str_key(s: &str) -> Result { + // dummy impl + let k = FooKey { + a: 9, + b: "x".to_string(), + c: 8, + }; + Ok(k) + } + } + + #[test] + fn test_dir_name_from_key() { + let d = DirName::::from_str_key("").unwrap(); + assert_eq!( + FooKey { + a: 9, + b: "x".to_string(), + c: 8, + }, + d.into_key() + ); + } + + #[test] + fn test_dir_name() { + let k = FooKey { + a: 1, + b: "b".to_string(), + c: 2, + }; + + let dir = DirName::new(k); + assert_eq!("pref/1/b", dir.to_string_key()); + + let dir = DirName::new(dir); + assert_eq!("pref/1", dir.to_string_key()); + + let dir = DirName::new(dir); + assert_eq!("pref", dir.to_string_key()); + + let dir = DirName::new(dir); + assert_eq!("pref", dir.to_string_key(), "root dir should be the same"); + } + + #[test] + fn test_dir_name_with_level() { + let k = FooKey { + a: 1, + b: "b".to_string(), + c: 2, + }; + + let mut dir = DirName::new(k); + assert_eq!("pref/1/b", dir.to_string_key()); + + dir.with_level(0); + assert_eq!("pref/1/b/2", dir.to_string_key()); + + dir.with_level(2); + assert_eq!("pref/1", dir.to_string_key()); + + dir.with_level(3); + assert_eq!("pref", dir.to_string_key()); + + dir.with_level(4); + assert_eq!("pref", dir.to_string_key(), "root dir should be the same"); + } +} diff --git a/src/meta/kvapi/src/kvapi/mod.rs b/src/meta/kvapi/src/kvapi/mod.rs index 054bfe4b0594..7f87eec5d37f 100644 --- a/src/meta/kvapi/src/kvapi/mod.rs +++ b/src/meta/kvapi/src/kvapi/mod.rs @@ -14,6 +14,7 @@ mod api; mod helper; +mod item; mod key; mod key_builder; mod key_parser; @@ -25,6 +26,9 @@ pub use api::ApiBuilder; pub use api::AsKVApi; pub use api::KVApi; pub use api::KVStream; +pub use item::Item; +pub use item::NonEmptyItem; +pub use key::DirName; pub use key::Key; pub use key::KeyError; pub use key_builder::KeyBuilder; diff --git a/src/meta/types/src/proto_ext/seq_v_ext.rs b/src/meta/types/src/proto_ext/seq_v_ext.rs index 3c6dded06e23..093e725c23c3 100644 --- a/src/meta/types/src/proto_ext/seq_v_ext.rs +++ b/src/meta/types/src/proto_ext/seq_v_ext.rs @@ -65,6 +65,7 @@ impl From for pb::SeqV { } impl From for SeqV { + /// Convert from protobuf SeqV to the native SeqV we defined. fn from(sv: pb::SeqV) -> Self { Self { seq: sv.seq, diff --git a/src/query/management/Cargo.toml b/src/query/management/Cargo.toml index 09d383a35358..443230ed33fa 100644 --- a/src/query/management/Cargo.toml +++ b/src/query/management/Cargo.toml @@ -26,6 +26,7 @@ databend-common-proto-conv = { path = "../../meta/proto-conv" } async-backtrace = { workspace = true } async-trait = { workspace = true } enumflags2 = { workspace = true } +futures = { workspace = true } log = { workspace = true } minitrace = { workspace = true } prost = { workspace = true } diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index 08a5a72d41b9..07b10b95fcdd 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -22,6 +22,7 @@ use databend_common_meta_app::principal::UdfName; use databend_common_meta_app::principal::UserDefinedFunction; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; @@ -29,8 +30,8 @@ use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; +use futures::stream::TryStreamExt; -use crate::serde::deserialize_struct; use crate::serde::serialize_struct; use crate::udf::UdfApi; @@ -142,20 +143,10 @@ impl UdfApi for UdfMgr { #[async_backtrace::framed] #[minitrace::trace] async fn get_udfs(&self) -> Result> { - let key = UdfName::new(&self.tenant, ""); - // TODO: use list_kv instead. - let values = self.kv_api.prefix_list_kv(&key.to_string_key()).await?; - - let mut udfs = Vec::with_capacity(values.len()); - for (name, value) in values { - let udf = deserialize_struct(&value.data, ErrorCode::IllegalUDFFormat, || { - format!( - "Failed to deserialize UDF '{}': data format is corrupt or invalid.", - name - ) - })?; - udfs.push(udf); - } + let key = DirName::new(UdfName::new(&self.tenant, "")); + let strm = self.kv_api.list_pb(&key).await?; + let strm = strm.map_ok(|item| item.seqv.data); + let udfs = strm.try_collect().await?; Ok(udfs) }