Skip to content

Commit

Permalink
refactor: refine UdfMgr (#14589)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer authored Feb 4, 2024
1 parent ae9c884 commit 432ead0
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 48 deletions.
1 change: 1 addition & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use user_defined_file_format::UserDefinedFileFormat;
pub use user_defined_function::LambdaUDF;
pub use user_defined_function::UDFDefinition;
pub use user_defined_function::UDFServer;
pub use user_defined_function::UdfName;
pub use user_defined_function::UserDefinedFunction;
pub use user_grant::GrantEntry;
pub use user_grant::GrantObject;
Expand Down
50 changes: 50 additions & 0 deletions src/meta/app/src/principal/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ use std::fmt::Formatter;
use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::DataType;
use databend_common_meta_kvapi::kvapi::Key;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UdfName {
pub tenant: String,
pub name: String,
}

impl UdfName {
pub fn new(tenant: impl ToString, name: impl ToString) -> Self {
Self {
tenant: tenant.to_string(),
name: name.to_string(),
}
}

pub fn tenant_prefix(&self) -> String {
Self::new(&self.tenant, "").to_string_key()
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LambdaUDF {
Expand Down Expand Up @@ -128,3 +148,33 @@ impl Display for UDFDefinition {
Ok(())
}
}

mod kv_api_impl {
use databend_common_meta_kvapi::kvapi;

use super::UdfName;
use crate::principal::UserDefinedFunction;

impl kvapi::Key for UdfName {
const PREFIX: &'static str = "__fd_udfs";

type ValueType = UserDefinedFunction;

fn to_string_key(&self) -> String {
kvapi::KeyBuilder::new_prefixed(Self::PREFIX)
.push_str(&self.tenant)
.push_str(&self.name)
.done()
}

fn from_str_key(s: &str) -> Result<Self, kvapi::KeyError> {
let mut p = kvapi::KeyParser::new_prefixed(s, Self::PREFIX)?;

let tenant = p.next_str()?;
let name = p.next_str()?;
p.done()?;

Ok(UdfName { tenant, name })
}
}
}
17 changes: 16 additions & 1 deletion src/meta/app/src/schema/create_option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
use databend_common_meta_types::MatchSeq;

#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
pub enum CreateOption {
CreateIfNotExists(bool),
CreateOrReplace,
}

impl From<CreateOption> for MatchSeq {
/// Convert `CreateOption` to `MatchSeq`.
///
/// - If `CreateOption` is `CreateIfNotExists`, then to add a record only when it does not exist, i.e., `MatchSeq` is `Exact(0)`.
/// - If `CreateOption` is `CreateOrReplace`, then always to add a record, i.e., `MatchSeq` matches any value: `GE(0)`.
fn from(create_option: CreateOption) -> Self {
match create_option {
CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0),
CreateOption::CreateOrReplace => MatchSeq::GE(0),
}
}
}
66 changes: 29 additions & 37 deletions src/query/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,28 @@

use std::sync::Arc;

use databend_common_base::base::escape_for_key;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_functions::is_builtin_function;
use databend_common_meta_app::principal::UdfName;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::UpsertKV;
use databend_common_meta_types::With;

use crate::serde::deserialize_struct;
use crate::serde::serialize_struct;
use crate::udf::UdfApi;

static UDF_API_KEY_PREFIX: &str = "__fd_udfs";

pub struct UdfMgr {
kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
udf_prefix: String,
tenant: String,
}

impl UdfMgr {
Expand All @@ -49,7 +48,7 @@ impl UdfMgr {

Ok(UdfMgr {
kv_api,
udf_prefix: format!("{}/{}", UDF_API_KEY_PREFIX, escape_for_key(tenant)?),
tenant: tenant.to_string(),
})
}
}
Expand All @@ -65,19 +64,13 @@ impl UdfApi for UdfMgr {
info.name.as_str()
)));
}
let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?);

let val = Operation::Update(serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?);

let seq = match create_option {
CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0),
CreateOption::CreateOrReplace => MatchSeq::GE(0),
};
let seq = MatchSeq::from(*create_option);

let res = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None))
.await?;
let key = UdfName::new(&self.tenant, &info.name);
let value = serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?;
let req = UpsertKV::insert(key.to_string_key(), &value).with(seq);
let res = self.kv_api.upsert_kv(req).await?;

if let CreateOption::CreateIfNotExists(false) = create_option {
if res.prev.is_some() {
Expand All @@ -101,16 +94,17 @@ impl UdfApi for UdfMgr {
)));
}

// TODO: remove get_udf(), check if the UDF exists after upsert_kv()
// Check if UDF is defined
let _ = self.get_udf(info.name.as_str(), seq).await?;

let val = Operation::Update(serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?);
let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?);
let upsert_info = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None));
let key = UdfName::new(&self.tenant, &info.name);
// TODO: these logic are reppeated several times, consider to extract them.
// TODO: add a new trait PBKVApi for the common logic that saves pb values in kvapi.
let value = serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?;
let req = UpsertKV::update(key.to_string_key(), &value).with(seq);
let res = self.kv_api.upsert_kv(req).await?;

let res = upsert_info.await?;
match res.result {
Some(SeqV { seq: s, .. }) => Ok(s),
None => Err(ErrorCode::UnknownUDF(format!(
Expand All @@ -123,10 +117,10 @@ impl UdfApi for UdfMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn get_udf(&self, udf_name: &str, seq: MatchSeq) -> Result<SeqV<UserDefinedFunction>> {
let key = format!("{}/{}", self.udf_prefix, escape_for_key(udf_name)?);
let kv_api = self.kv_api.clone();
let get_kv = async move { kv_api.get_kv(&key).await };
let res = get_kv.await?;
// TODO: get() does not need seq
let key = UdfName::new(&self.tenant, udf_name);
let res = self.kv_api.get_kv(&key.to_string_key()).await?;

let seq_value = res
.ok_or_else(|| ErrorCode::UnknownUDF(format!("UDF '{}' does not exist.", udf_name)))?;

Expand All @@ -146,7 +140,9 @@ impl UdfApi for UdfMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn get_udfs(&self) -> Result<Vec<UserDefinedFunction>> {
let values = self.kv_api.prefix_list_kv(&self.udf_prefix).await?;
let key = UdfName::new(&self.tenant, "");
// TODO: use list_kv instead.
let values = self.kv_api.prefix_list_kv(&key.to_string_key()).await?;

let mut udfs = Vec::with_capacity(values.len());
for (name, value) in values {
Expand All @@ -164,14 +160,10 @@ impl UdfApi for UdfMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn drop_udf(&self, udf_name: &str, seq: MatchSeq) -> Result<()> {
let key = format!("{}/{}", self.udf_prefix, escape_for_key(udf_name)?);
let kv_api = self.kv_api.clone();
let upsert_kv = async move {
kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, Operation::Delete, None))
.await
};
let res = upsert_kv.await?;
let key = UdfName::new(&self.tenant, udf_name);
let req = UpsertKV::delete(key.to_string_key()).with(seq);
let res = self.kv_api.upsert_kv(req).await?;

if res.prev.is_some() && res.result.is_none() {
Ok(())
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl CreateTableInterpreter {
}

let req = CreateTableReq {
create_option: self.plan.create_option.clone(),
create_option: self.plan.create_option,
name_ident: TableNameIdent {
tenant: self.plan.tenant.to_string(),
db_name: self.plan.database.to_string(),
Expand Down Expand Up @@ -428,7 +428,7 @@ impl CreateTableInterpreter {
..Default::default()
};
let req = CreateTableReq {
create_option: self.plan.create_option.clone(),
create_option: self.plan.create_option,
name_ident: TableNameIdent {
tenant: self.plan.tenant.to_string(),
db_name: self.plan.database.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl Binder {
.await?;

let plan = CreateUserPlan {
create_option: create_option.clone(),
create_option: *create_option,
user: user.clone(),
auth_info: AuthInfo::create2(&auth_option.auth_type, &auth_option.password)?,
user_option,
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Binder {
);
parse_storage_params_from_uri(&mut location, None, "when CREATE CONNECTION").await?;
Ok(Plan::CreateConnection(Box::new(CreateConnectionPlan {
create_option: stmt.create_option.clone(),
create_option: stmt.create_option,
name: stmt.name.to_string(),
storage_type: stmt.storage_type.clone(),
storage_params: stmt.storage_params.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl Binder {
let meta = self.database_meta(engine, options, from_share)?;

Ok(Plan::CreateDatabase(Box::new(CreateDatabasePlan {
create_option: create_option.clone(),
create_option: *create_option,
tenant,
catalog,
database,
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Binder {
}

Ok(Plan::CreateStage(Box::new(CreateStagePlan {
create_option: create_option.clone(),
create_option: *create_option,
tenant: self.ctx.get_tenant(),
stage_info,
})))
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ impl Binder {
};

let plan = CreateTablePlan {
create_option: create_option.clone(),
create_option: *create_option,
tenant: self.ctx.get_tenant(),
catalog: catalog.clone(),
database: database.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Binder {
.bind_udf_definition(&stmt.udf_name, &stmt.description, &stmt.definition)
.await?;
Ok(Plan::CreateUDF(Box::new(CreateUDFPlan {
create_option: stmt.create_option.clone(),
create_option: stmt.create_option,
udf,
})))
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/plans/ddl/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct CreateDatabasePlan {
impl From<CreateDatabasePlan> for CreateDatabaseReq {
fn from(p: CreateDatabasePlan) -> Self {
CreateDatabaseReq {
create_option: p.create_option.clone(),
create_option: p.create_option,
name_ident: DatabaseNameIdent {
tenant: p.tenant,
db_name: p.database,
Expand All @@ -46,7 +46,7 @@ impl From<CreateDatabasePlan> for CreateDatabaseReq {
impl From<&CreateDatabasePlan> for CreateDatabaseReq {
fn from(p: &CreateDatabasePlan) -> Self {
CreateDatabaseReq {
create_option: p.create_option.clone(),
create_option: p.create_option,
name_ident: DatabaseNameIdent {
tenant: p.tenant.clone(),
db_name: p.database.clone(),
Expand Down

0 comments on commit 432ead0

Please sign in to comment.