From 234f1af19f3010bd6fe81204d217166cd409a4dd Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sat, 3 Feb 2024 15:45:55 +0800 Subject: [PATCH] feat: add support for create or replace udf Signed-off-by: Chojan Shang --- src/query/ast/src/ast/statements/udf.rs | 16 +++++++--- src/query/ast/src/parser/statement.rs | 16 ++++++---- src/query/management/src/udf/udf_api.rs | 3 +- src/query/management/src/udf/udf_mgr.rs | 32 +++++++++++++------ src/query/management/tests/it/udf.rs | 21 ++++++------ .../interpreter_user_udf_create.rs | 2 +- src/query/sql/src/planner/binder/udf.rs | 2 +- src/query/sql/src/planner/plans/ddl/udf.rs | 3 +- src/query/users/src/user_udf.rs | 17 +++------- src/query/users/tests/it/user_udf.rs | 12 +++---- 10 files changed, 69 insertions(+), 55 deletions(-) diff --git a/src/query/ast/src/ast/statements/udf.rs b/src/query/ast/src/ast/statements/udf.rs index e634fd541b5b..8511a97254fa 100644 --- a/src/query/ast/src/ast/statements/udf.rs +++ b/src/query/ast/src/ast/statements/udf.rs @@ -15,6 +15,8 @@ use std::fmt::Display; use std::fmt::Formatter; +use databend_common_meta_app::schema::CreateOption; + use crate::ast::write_comma_separated_list; use crate::ast::Expr; use crate::ast::Identifier; @@ -37,7 +39,7 @@ pub enum UDFDefinition { #[derive(Debug, Clone, PartialEq)] pub struct CreateUDFStmt { - pub if_not_exists: bool, + pub create_option: CreateOption, pub udf_name: Identifier, pub description: Option, pub definition: UDFDefinition, @@ -82,9 +84,15 @@ impl Display for UDFDefinition { impl Display for CreateUDFStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "CREATE FUNCTION")?; - if self.if_not_exists { - write!(f, " IF NOT EXISTS")?; + write!(f, "CREATE")?; + if let CreateOption::CreateOrReplace = self.create_option { + write!(f, " OR REPLACE")?; + } + write!(f, " FUNCTION")?; + if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option { + if if_not_exists { + write!(f, " IF NOT EXISTS")?; + } } write!(f, " {} {}", self.udf_name, self.definition)?; if let Some(description) = &self.description { diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 0089c7cbca26..874044387e24 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1182,19 +1182,21 @@ pub fn statement(i: Input) -> IResult { }) }, ); - let create_udf = map( + let create_udf = map_res( rule! { - CREATE ~ FUNCTION ~ ( IF ~ ^NOT ~ ^EXISTS )? + CREATE ~ (OR ~ REPLACE)? ~ FUNCTION ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #udf_definition ~ ( DESC ~ ^"=" ~ ^#literal_string )? }, - |(_, _, opt_if_not_exists, udf_name, definition, opt_description)| { - Statement::CreateUDF(CreateUDFStmt { - if_not_exists: opt_if_not_exists.is_some(), + |(_, opt_or_replace, _, opt_if_not_exists, udf_name, definition, opt_description)| { + let create_option = + parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?; + Ok(Statement::CreateUDF(CreateUDFStmt { + create_option, udf_name, description: opt_description.map(|(_, _, description)| description), definition, - }) + })) }, ); let drop_udf = map( @@ -1881,7 +1883,7 @@ pub fn statement(i: Input) -> IResult { | #show_roles : "`SHOW ROLES`" | #create_role : "`CREATE ROLE [IF NOT EXISTS] `" | #drop_role : "`DROP ROLE [IF EXISTS] `" - | #create_udf : "`CREATE FUNCTION [IF NOT EXISTS] {AS (, ...) -> | (, ...) RETURNS LANGUAGE HANDLER= ADDRESS=} [DESC = ]`" + | #create_udf : "`CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] {AS (, ...) -> | (, ...) RETURNS LANGUAGE HANDLER= ADDRESS=} [DESC = ]`" | #drop_udf : "`DROP FUNCTION [IF EXISTS] `" | #alter_udf : "`ALTER FUNCTION (, ...) -> [DESC = ]`" | #set_role: "`SET [DEFAULT] ROLE `" diff --git a/src/query/management/src/udf/udf_api.rs b/src/query/management/src/udf/udf_api.rs index 5e5fa4ff44df..23ce0fcc0d32 100644 --- a/src/query/management/src/udf/udf_api.rs +++ b/src/query/management/src/udf/udf_api.rs @@ -14,13 +14,14 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait UdfApi: Sync + Send { // Add a UDF to /tenant/udf-name. - async fn add_udf(&self, udf: UserDefinedFunction) -> Result; + async fn add_udf(&self, udf: UserDefinedFunction, create_option: &CreateOption) -> Result<()>; // Update a UDF to /tenant/udf-name. async fn update_udf(&self, udf: UserDefinedFunction, seq: MatchSeq) -> Result; diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index ffc7c4f0aaec..4082e2592d19 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -19,6 +19,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_functions::is_builtin_function; use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::MatchSeq; @@ -57,26 +58,37 @@ impl UdfMgr { impl UdfApi for UdfMgr { #[async_backtrace::framed] #[minitrace::trace] - async fn add_udf(&self, info: UserDefinedFunction) -> Result { + async fn add_udf(&self, info: UserDefinedFunction, create_option: &CreateOption) -> Result<()> { if is_builtin_function(info.name.as_str()) { return Err(ErrorCode::UdfAlreadyExists(format!( "It's a builtin function: {}", info.name.as_str() ))); } + let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?); - let seq = MatchSeq::Exact(0); let val = Operation::Update(serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?); - let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?); - let upsert_info = self - .kv_api - .upsert_kv(UpsertKVReq::new(&key, seq, val, None)); - let res_seq = upsert_info.await?.added_seq_or_else(|_v| { - ErrorCode::UdfAlreadyExists(format!("UDF '{}' already exists.", info.name)) - })?; + let seq = match create_option { + CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0), + CreateOption::CreateOrReplace => MatchSeq::GE(0), + }; + + let res = self + .kv_api + .upsert_kv(UpsertKVReq::new(&key, seq, val, None)) + .await?; + + if let CreateOption::CreateIfNotExists(false) = create_option { + if res.prev.is_some() { + return Err(ErrorCode::UdfAlreadyExists(format!( + "UDF '{}' already exists.", + info.name + ))); + } + } - Ok(res_seq) + Ok(()) } #[async_backtrace::framed] diff --git a/src/query/management/tests/it/udf.rs b/src/query/management/tests/it/udf.rs index 5adafd95d0f1..89446da38df1 100644 --- a/src/query/management/tests/it/udf.rs +++ b/src/query/management/tests/it/udf.rs @@ -21,6 +21,7 @@ use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; use databend_common_management::*; use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_embedded::MetaEmbedded; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_types::MatchSeq; @@ -32,7 +33,7 @@ async fn test_add_udf() -> Result<()> { // lambda udf let udf = create_test_lambda_udf(); - udf_api.add_udf(udf.clone()).await?; + udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?; let value = kv_api .get_kv(format!("__fd_udfs/admin/{}", udf.name).as_str()) .await?; @@ -52,7 +53,7 @@ async fn test_add_udf() -> Result<()> { } // udf server let udf = create_test_udf_server(); - udf_api.add_udf(udf.clone()).await?; + udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?; let value = kv_api .get_kv(format!("__fd_udfs/admin/{}", udf.name).as_str()) .await?; @@ -80,16 +81,16 @@ async fn test_already_exists_add_udf() -> Result<()> { // lambda udf let udf = create_test_lambda_udf(); - udf_api.add_udf(udf.clone()).await?; - match udf_api.add_udf(udf.clone()).await { + udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?; + match udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await { Ok(_) => panic!("Already exists add udf must be return Err."), Err(cause) => assert_eq!(cause.code(), 2603), } // udf server let udf = create_test_udf_server(); - udf_api.add_udf(udf.clone()).await?; - match udf_api.add_udf(udf.clone()).await { + udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?; + match udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await { Ok(_) => panic!("Already exists add udf must be return Err."), Err(cause) => assert_eq!(cause.code(), 2603), } @@ -107,8 +108,8 @@ async fn test_successfully_get_udfs() -> Result<()> { let lambda_udf = create_test_lambda_udf(); let udf_server = create_test_udf_server(); - udf_api.add_udf(lambda_udf.clone()).await?; - udf_api.add_udf(udf_server.clone()).await?; + udf_api.add_udf(lambda_udf.clone(), &CreateOption::CreateIfNotExists(false)).await?; + udf_api.add_udf(udf_server.clone(), &CreateOption::CreateIfNotExists(false)).await?; let udfs = udf_api.get_udfs().await?; assert_eq!(udfs, vec![lambda_udf, udf_server]); @@ -122,8 +123,8 @@ async fn test_successfully_drop_udf() -> Result<()> { let lambda_udf = create_test_lambda_udf(); let udf_server = create_test_udf_server(); - udf_api.add_udf(lambda_udf.clone()).await?; - udf_api.add_udf(udf_server.clone()).await?; + udf_api.add_udf(lambda_udf.clone(), &CreateOption::CreateIfNotExists(false)).await?; + udf_api.add_udf(udf_server.clone(), &CreateOption::CreateIfNotExists(false)).await?; let udfs = udf_api.get_udfs().await?; assert_eq!(udfs, vec![lambda_udf.clone(), udf_server.clone()]); diff --git a/src/query/service/src/interpreters/interpreter_user_udf_create.rs b/src/query/service/src/interpreters/interpreter_user_udf_create.rs index 3a6424807ef3..e1ca922fc341 100644 --- a/src/query/service/src/interpreters/interpreter_user_udf_create.rs +++ b/src/query/service/src/interpreters/interpreter_user_udf_create.rs @@ -54,7 +54,7 @@ impl Interpreter for CreateUserUDFInterpreter { let tenant = self.ctx.get_tenant(); let udf = plan.udf; let _ = UserApiProvider::instance() - .add_udf(&tenant, udf, plan.if_not_exists) + .add_udf(&tenant, udf, &plan.create_option) .await?; // Grant ownership as the current role diff --git a/src/query/sql/src/planner/binder/udf.rs b/src/query/sql/src/planner/binder/udf.rs index bde37f442b75..e2549d2e9056 100644 --- a/src/query/sql/src/planner/binder/udf.rs +++ b/src/query/sql/src/planner/binder/udf.rs @@ -129,7 +129,7 @@ impl Binder { .bind_udf_definition(&stmt.udf_name, &stmt.description, &stmt.definition) .await?; Ok(Plan::CreateUDF(Box::new(CreateUDFPlan { - if_not_exists: stmt.if_not_exists, + create_option: stmt.create_option.clone(), udf, }))) } diff --git a/src/query/sql/src/planner/plans/ddl/udf.rs b/src/query/sql/src/planner/plans/ddl/udf.rs index 3c03e5f86932..eaf53fa7d6ce 100644 --- a/src/query/sql/src/planner/plans/ddl/udf.rs +++ b/src/query/sql/src/planner/plans/ddl/udf.rs @@ -13,10 +13,11 @@ // limitations under the License. use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::schema::CreateOption; #[derive(Clone, Debug, PartialEq, Eq)] pub struct CreateUDFPlan { - pub if_not_exists: bool, + pub create_option: CreateOption, pub udf: UserDefinedFunction, } diff --git a/src/query/users/src/user_udf.rs b/src/query/users/src/user_udf.rs index 81289ba581df..749b35b9730f 100644 --- a/src/query/users/src/user_udf.rs +++ b/src/query/users/src/user_udf.rs @@ -16,6 +16,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::UserDefinedFunction; use databend_common_meta_types::MatchSeq; +use databend_common_meta_app::schema::CreateOption; use crate::UserApiProvider; @@ -27,20 +28,10 @@ impl UserApiProvider { &self, tenant: &str, info: UserDefinedFunction, - if_not_exists: bool, - ) -> Result { + create_option: &CreateOption, + ) -> Result<()> { let udf_api_client = self.get_udf_api_client(tenant)?; - let add_udf = udf_api_client.add_udf(info); - match add_udf.await { - Ok(res) => Ok(res), - Err(e) => { - if if_not_exists && e.code() == ErrorCode::UDF_ALREADY_EXISTS { - Ok(u64::MIN) - } else { - Err(e) - } - } - } + udf_api_client.add_udf(info, create_option).await } // Update a UDF. diff --git a/src/query/users/tests/it/user_udf.rs b/src/query/users/tests/it/user_udf.rs index 0840383a3cef..0859c47bcf55 100644 --- a/src/query/users/tests/it/user_udf.rs +++ b/src/query/users/tests/it/user_udf.rs @@ -16,7 +16,7 @@ use databend_common_base::base::tokio; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_grpc::RpcClientConf; -use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::{principal::UserDefinedFunction, schema::CreateOption}; use databend_common_users::UserApiProvider; use pretty_assertions::assert_eq; @@ -29,7 +29,6 @@ async fn test_user_lambda_udf() -> Result<()> { let description = "this is a description"; let isempty = "isempty"; let isnotempty = "isnotempty"; - let if_not_exists = false; // add isempty. let isempty_udf = UserDefinedFunction::create_lambda_udf( @@ -39,7 +38,7 @@ async fn test_user_lambda_udf() -> Result<()> { description, ); user_mgr - .add_udf(tenant, isempty_udf.clone(), if_not_exists) + .add_udf(tenant, isempty_udf.clone(), &CreateOption::CreateIfNotExists(false)) .await?; // add isnotempty. @@ -50,7 +49,7 @@ async fn test_user_lambda_udf() -> Result<()> { description, ); user_mgr - .add_udf(tenant, isnotempty_udf.clone(), if_not_exists) + .add_udf(tenant, isnotempty_udf.clone(), &CreateOption::CreateIfNotExists(false)) .await?; // get all. @@ -99,7 +98,6 @@ async fn test_user_udf_server() -> Result<()> { let description = "this is a description"; let isempty = "isempty"; let isnotempty = "isnotempty"; - let if_not_exists = false; // add isempty. let isempty_udf = UserDefinedFunction::create_udf_server( @@ -112,7 +110,7 @@ async fn test_user_udf_server() -> Result<()> { description, ); user_mgr - .add_udf(tenant, isempty_udf.clone(), if_not_exists) + .add_udf(tenant, isempty_udf.clone(), &CreateOption::CreateIfNotExists(false)) .await?; // add isnotempty. @@ -126,7 +124,7 @@ async fn test_user_udf_server() -> Result<()> { description, ); user_mgr - .add_udf(tenant, isnotempty_udf.clone(), if_not_exists) + .add_udf(tenant, isnotempty_udf.clone(), &CreateOption::CreateIfNotExists(false)) .await?; // get all.