diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index 6e6b624c4c7e..84bf4b9eb68c 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -47,6 +47,7 @@ pub use user_defined_file_format::UserDefinedFileFormat; pub use user_defined_function::LambdaUDF; pub use user_defined_function::UDFDefinition; pub use user_defined_function::UDFServer; +pub use user_defined_function::UdfName; pub use user_defined_function::UserDefinedFunction; pub use user_grant::GrantEntry; pub use user_grant::GrantObject; diff --git a/src/meta/app/src/principal/user_defined_function.rs b/src/meta/app/src/principal/user_defined_function.rs index 526d3c2e9040..9f19981c715d 100644 --- a/src/meta/app/src/principal/user_defined_function.rs +++ b/src/meta/app/src/principal/user_defined_function.rs @@ -18,6 +18,26 @@ use std::fmt::Formatter; use chrono::DateTime; use chrono::Utc; use databend_common_expression::types::DataType; +use databend_common_meta_kvapi::kvapi::Key; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct UdfName { + pub tenant: String, + pub name: String, +} + +impl UdfName { + pub fn new(tenant: impl ToString, name: impl ToString) -> Self { + Self { + tenant: tenant.to_string(), + name: name.to_string(), + } + } + + pub fn tenant_prefix(&self) -> String { + Self::new(&self.tenant, "").to_string_key() + } +} #[derive(Clone, Debug, Eq, PartialEq)] pub struct LambdaUDF { @@ -128,3 +148,33 @@ impl Display for UDFDefinition { Ok(()) } } + +mod kv_api_impl { + use databend_common_meta_kvapi::kvapi; + + use super::UdfName; + use crate::principal::UserDefinedFunction; + + impl kvapi::Key for UdfName { + const PREFIX: &'static str = "__fd_udfs"; + + type ValueType = UserDefinedFunction; + + fn to_string_key(&self) -> String { + kvapi::KeyBuilder::new_prefixed(Self::PREFIX) + .push_str(&self.tenant) + .push_str(&self.name) + .done() + } + + fn from_str_key(s: &str) -> Result { + let mut p = kvapi::KeyParser::new_prefixed(s, Self::PREFIX)?; + + let tenant = p.next_str()?; + let name = p.next_str()?; + p.done()?; + + Ok(UdfName { tenant, name }) + } + } +} diff --git a/src/meta/app/src/schema/create_option.rs b/src/meta/app/src/schema/create_option.rs index 86c7aa2d3bf4..c46a927c2b8f 100644 --- a/src/meta/app/src/schema/create_option.rs +++ b/src/meta/app/src/schema/create_option.rs @@ -12,8 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +use databend_common_meta_types::MatchSeq; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq)] pub enum CreateOption { CreateIfNotExists(bool), CreateOrReplace, } + +impl From for MatchSeq { + /// Convert `CreateOption` to `MatchSeq`. + /// + /// - If `CreateOption` is `CreateIfNotExists`, then to add a record only when it does not exist, i.e., `MatchSeq` is `Exact(0)`. + /// - If `CreateOption` is `CreateOrReplace`, then always to add a record, i.e., `MatchSeq` matches any value: `GE(0)`. + fn from(create_option: CreateOption) -> Self { + match create_option { + CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0), + CreateOption::CreateOrReplace => MatchSeq::GE(0), + } + } +} diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index 4082e2592d19..82c6d8dc9265 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -14,29 +14,28 @@ use std::sync::Arc; -use databend_common_base::base::escape_for_key; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_functions::is_builtin_function; +use databend_common_meta_app::principal::UdfName; use databend_common_meta_app::principal::UserDefinedFunction; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi; -use databend_common_meta_kvapi::kvapi::UpsertKVReq; +use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; -use databend_common_meta_types::Operation; use databend_common_meta_types::SeqV; +use databend_common_meta_types::UpsertKV; +use databend_common_meta_types::With; use crate::serde::deserialize_struct; use crate::serde::serialize_struct; use crate::udf::UdfApi; -static UDF_API_KEY_PREFIX: &str = "__fd_udfs"; - pub struct UdfMgr { kv_api: Arc>, - udf_prefix: String, + tenant: String, } impl UdfMgr { @@ -49,7 +48,7 @@ impl UdfMgr { Ok(UdfMgr { kv_api, - udf_prefix: format!("{}/{}", UDF_API_KEY_PREFIX, escape_for_key(tenant)?), + tenant: tenant.to_string(), }) } } @@ -65,19 +64,13 @@ impl UdfApi for UdfMgr { info.name.as_str() ))); } - let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?); - - let val = Operation::Update(serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?); - let seq = match create_option { - CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0), - CreateOption::CreateOrReplace => MatchSeq::GE(0), - }; + let seq = MatchSeq::from(*create_option); - let res = self - .kv_api - .upsert_kv(UpsertKVReq::new(&key, seq, val, None)) - .await?; + let key = UdfName::new(&self.tenant, &info.name); + let value = serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?; + let req = UpsertKV::insert(key.to_string_key(), &value).with(seq); + let res = self.kv_api.upsert_kv(req).await?; if let CreateOption::CreateIfNotExists(false) = create_option { if res.prev.is_some() { @@ -101,16 +94,17 @@ impl UdfApi for UdfMgr { ))); } + // TODO: remove get_udf(), check if the UDF exists after upsert_kv() // Check if UDF is defined let _ = self.get_udf(info.name.as_str(), seq).await?; - let val = Operation::Update(serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?); - let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?); - let upsert_info = self - .kv_api - .upsert_kv(UpsertKVReq::new(&key, seq, val, None)); + let key = UdfName::new(&self.tenant, &info.name); + // TODO: these logic are reppeated several times, consider to extract them. + // TODO: add a new trait PBKVApi for the common logic that saves pb values in kvapi. + let value = serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?; + let req = UpsertKV::update(key.to_string_key(), &value).with(seq); + let res = self.kv_api.upsert_kv(req).await?; - let res = upsert_info.await?; match res.result { Some(SeqV { seq: s, .. }) => Ok(s), None => Err(ErrorCode::UnknownUDF(format!( @@ -123,10 +117,10 @@ impl UdfApi for UdfMgr { #[async_backtrace::framed] #[minitrace::trace] async fn get_udf(&self, udf_name: &str, seq: MatchSeq) -> Result> { - let key = format!("{}/{}", self.udf_prefix, escape_for_key(udf_name)?); - let kv_api = self.kv_api.clone(); - let get_kv = async move { kv_api.get_kv(&key).await }; - let res = get_kv.await?; + // TODO: get() does not need seq + let key = UdfName::new(&self.tenant, udf_name); + let res = self.kv_api.get_kv(&key.to_string_key()).await?; + let seq_value = res .ok_or_else(|| ErrorCode::UnknownUDF(format!("UDF '{}' does not exist.", udf_name)))?; @@ -146,7 +140,9 @@ impl UdfApi for UdfMgr { #[async_backtrace::framed] #[minitrace::trace] async fn get_udfs(&self) -> Result> { - let values = self.kv_api.prefix_list_kv(&self.udf_prefix).await?; + let key = UdfName::new(&self.tenant, ""); + // TODO: use list_kv instead. + let values = self.kv_api.prefix_list_kv(&key.to_string_key()).await?; let mut udfs = Vec::with_capacity(values.len()); for (name, value) in values { @@ -164,14 +160,10 @@ impl UdfApi for UdfMgr { #[async_backtrace::framed] #[minitrace::trace] async fn drop_udf(&self, udf_name: &str, seq: MatchSeq) -> Result<()> { - let key = format!("{}/{}", self.udf_prefix, escape_for_key(udf_name)?); - let kv_api = self.kv_api.clone(); - let upsert_kv = async move { - kv_api - .upsert_kv(UpsertKVReq::new(&key, seq, Operation::Delete, None)) - .await - }; - let res = upsert_kv.await?; + let key = UdfName::new(&self.tenant, udf_name); + let req = UpsertKV::delete(key.to_string_key()).with(seq); + let res = self.kv_api.upsert_kv(req).await?; + if res.prev.is_some() && res.result.is_none() { Ok(()) } else { diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs index 79089d902d62..036b65d7adda 100644 --- a/src/query/service/src/interpreters/interpreter_table_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_create.rs @@ -362,7 +362,7 @@ impl CreateTableInterpreter { } let req = CreateTableReq { - create_option: self.plan.create_option.clone(), + create_option: self.plan.create_option, name_ident: TableNameIdent { tenant: self.plan.tenant.to_string(), db_name: self.plan.database.to_string(), @@ -428,7 +428,7 @@ impl CreateTableInterpreter { ..Default::default() }; let req = CreateTableReq { - create_option: self.plan.create_option.clone(), + create_option: self.plan.create_option, name_ident: TableNameIdent { tenant: self.plan.tenant.to_string(), db_name: self.plan.database.to_string(), diff --git a/src/query/sql/src/planner/binder/ddl/account.rs b/src/query/sql/src/planner/binder/ddl/account.rs index 935ba8d13235..1cbfcbbc1fc8 100644 --- a/src/query/sql/src/planner/binder/ddl/account.rs +++ b/src/query/sql/src/planner/binder/ddl/account.rs @@ -248,7 +248,7 @@ impl Binder { .await?; let plan = CreateUserPlan { - create_option: create_option.clone(), + create_option: *create_option, user: user.clone(), auth_info: AuthInfo::create2(&auth_option.auth_type, &auth_option.password)?, user_option, diff --git a/src/query/sql/src/planner/binder/ddl/connection.rs b/src/query/sql/src/planner/binder/ddl/connection.rs index be3412f0ed18..c9e7c84e50be 100644 --- a/src/query/sql/src/planner/binder/ddl/connection.rs +++ b/src/query/sql/src/planner/binder/ddl/connection.rs @@ -36,7 +36,7 @@ impl Binder { ); parse_storage_params_from_uri(&mut location, None, "when CREATE CONNECTION").await?; Ok(Plan::CreateConnection(Box::new(CreateConnectionPlan { - create_option: stmt.create_option.clone(), + create_option: stmt.create_option, name: stmt.name.to_string(), storage_type: stmt.storage_type.clone(), storage_params: stmt.storage_params.clone(), diff --git a/src/query/sql/src/planner/binder/ddl/database.rs b/src/query/sql/src/planner/binder/ddl/database.rs index 6ad083bb4c70..e3ccc0c0a9de 100644 --- a/src/query/sql/src/planner/binder/ddl/database.rs +++ b/src/query/sql/src/planner/binder/ddl/database.rs @@ -230,7 +230,7 @@ impl Binder { let meta = self.database_meta(engine, options, from_share)?; Ok(Plan::CreateDatabase(Box::new(CreateDatabasePlan { - create_option: create_option.clone(), + create_option: *create_option, tenant, catalog, database, diff --git a/src/query/sql/src/planner/binder/ddl/stage.rs b/src/query/sql/src/planner/binder/ddl/stage.rs index 4eb4163abdc0..b0e31e8cdbe4 100644 --- a/src/query/sql/src/planner/binder/ddl/stage.rs +++ b/src/query/sql/src/planner/binder/ddl/stage.rs @@ -116,7 +116,7 @@ impl Binder { } Ok(Plan::CreateStage(Box::new(CreateStagePlan { - create_option: create_option.clone(), + create_option: *create_option, tenant: self.ctx.get_tenant(), stage_info, }))) diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 632a93bf559a..b52eab41ba2e 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -627,7 +627,7 @@ impl Binder { }; let plan = CreateTablePlan { - create_option: create_option.clone(), + create_option: *create_option, tenant: self.ctx.get_tenant(), catalog: catalog.clone(), database: database.clone(), diff --git a/src/query/sql/src/planner/binder/udf.rs b/src/query/sql/src/planner/binder/udf.rs index e2549d2e9056..26e9217a1e5d 100644 --- a/src/query/sql/src/planner/binder/udf.rs +++ b/src/query/sql/src/planner/binder/udf.rs @@ -129,7 +129,7 @@ impl Binder { .bind_udf_definition(&stmt.udf_name, &stmt.description, &stmt.definition) .await?; Ok(Plan::CreateUDF(Box::new(CreateUDFPlan { - create_option: stmt.create_option.clone(), + create_option: stmt.create_option, udf, }))) } diff --git a/src/query/sql/src/planner/plans/ddl/database.rs b/src/query/sql/src/planner/plans/ddl/database.rs index 7b8638bc7ce4..ecb625a20e44 100644 --- a/src/query/sql/src/planner/plans/ddl/database.rs +++ b/src/query/sql/src/planner/plans/ddl/database.rs @@ -33,7 +33,7 @@ pub struct CreateDatabasePlan { impl From for CreateDatabaseReq { fn from(p: CreateDatabasePlan) -> Self { CreateDatabaseReq { - create_option: p.create_option.clone(), + create_option: p.create_option, name_ident: DatabaseNameIdent { tenant: p.tenant, db_name: p.database, @@ -46,7 +46,7 @@ impl From for CreateDatabaseReq { impl From<&CreateDatabasePlan> for CreateDatabaseReq { fn from(p: &CreateDatabasePlan) -> Self { CreateDatabaseReq { - create_option: p.create_option.clone(), + create_option: p.create_option, name_ident: DatabaseNameIdent { tenant: p.tenant.clone(), db_name: p.database.clone(),