Skip to content

Commit

Permalink
feat: add support for create or replace udf
Browse files Browse the repository at this point in the history
Signed-off-by: Chojan Shang <[email protected]>
  • Loading branch information
PsiACE committed Feb 3, 2024
1 parent b3d942c commit 234f1af
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 55 deletions.
16 changes: 12 additions & 4 deletions src/query/ast/src/ast/statements/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Display;
use std::fmt::Formatter;

use databend_common_meta_app::schema::CreateOption;

use crate::ast::write_comma_separated_list;
use crate::ast::Expr;
use crate::ast::Identifier;
Expand All @@ -37,7 +39,7 @@ pub enum UDFDefinition {

#[derive(Debug, Clone, PartialEq)]
pub struct CreateUDFStmt {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub udf_name: Identifier,
pub description: Option<String>,
pub definition: UDFDefinition,
Expand Down Expand Up @@ -82,9 +84,15 @@ impl Display for UDFDefinition {

impl Display for CreateUDFStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "CREATE FUNCTION")?;
if self.if_not_exists {
write!(f, " IF NOT EXISTS")?;
write!(f, "CREATE")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, " OR REPLACE")?;
}
write!(f, " FUNCTION")?;
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
if if_not_exists {
write!(f, " IF NOT EXISTS")?;
}
}
write!(f, " {} {}", self.udf_name, self.definition)?;
if let Some(description) = &self.description {
Expand Down
16 changes: 9 additions & 7 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1182,19 +1182,21 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
})
},
);
let create_udf = map(
let create_udf = map_res(
rule! {
CREATE ~ FUNCTION ~ ( IF ~ ^NOT ~ ^EXISTS )?
CREATE ~ (OR ~ REPLACE)? ~ FUNCTION ~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #ident ~ #udf_definition
~ ( DESC ~ ^"=" ~ ^#literal_string )?
},
|(_, _, opt_if_not_exists, udf_name, definition, opt_description)| {
Statement::CreateUDF(CreateUDFStmt {
if_not_exists: opt_if_not_exists.is_some(),
|(_, opt_or_replace, _, opt_if_not_exists, udf_name, definition, opt_description)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
Ok(Statement::CreateUDF(CreateUDFStmt {
create_option,
udf_name,
description: opt_description.map(|(_, _, description)| description),
definition,
})
}))
},
);
let drop_udf = map(
Expand Down Expand Up @@ -1881,7 +1883,7 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
| #show_roles : "`SHOW ROLES`"
| #create_role : "`CREATE ROLE [IF NOT EXISTS] <role_name>`"
| #drop_role : "`DROP ROLE [IF EXISTS] <role_name>`"
| #create_udf : "`CREATE FUNCTION [IF NOT EXISTS] <name> {AS (<parameter>, ...) -> <definition expr> | (<arg_type>, ...) RETURNS <return_type> LANGUAGE <language> HANDLER=<handler> ADDRESS=<udf_server_address>} [DESC = <description>]`"
| #create_udf : "`CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] <name> {AS (<parameter>, ...) -> <definition expr> | (<arg_type>, ...) RETURNS <return_type> LANGUAGE <language> HANDLER=<handler> ADDRESS=<udf_server_address>} [DESC = <description>]`"
| #drop_udf : "`DROP FUNCTION [IF EXISTS] <udf_name>`"
| #alter_udf : "`ALTER FUNCTION <udf_name> (<parameter>, ...) -> <definition_expr> [DESC = <description>]`"
| #set_role: "`SET [DEFAULT] ROLE <role>`"
Expand Down
3 changes: 2 additions & 1 deletion src/query/management/src/udf/udf_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

use databend_common_exception::Result;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::SeqV;

#[async_trait::async_trait]
pub trait UdfApi: Sync + Send {
// Add a UDF to /tenant/udf-name.
async fn add_udf(&self, udf: UserDefinedFunction) -> Result<u64>;
async fn add_udf(&self, udf: UserDefinedFunction, create_option: &CreateOption) -> Result<()>;

// Update a UDF to /tenant/udf-name.
async fn update_udf(&self, udf: UserDefinedFunction, seq: MatchSeq) -> Result<u64>;
Expand Down
32 changes: 22 additions & 10 deletions src/query/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_functions::is_builtin_function;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
Expand Down Expand Up @@ -57,26 +58,37 @@ impl UdfMgr {
impl UdfApi for UdfMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn add_udf(&self, info: UserDefinedFunction) -> Result<u64> {
async fn add_udf(&self, info: UserDefinedFunction, create_option: &CreateOption) -> Result<()> {
if is_builtin_function(info.name.as_str()) {
return Err(ErrorCode::UdfAlreadyExists(format!(
"It's a builtin function: {}",
info.name.as_str()
)));
}
let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?);

let seq = MatchSeq::Exact(0);
let val = Operation::Update(serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?);
let key = format!("{}/{}", self.udf_prefix, escape_for_key(&info.name)?);
let upsert_info = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None));

let res_seq = upsert_info.await?.added_seq_or_else(|_v| {
ErrorCode::UdfAlreadyExists(format!("UDF '{}' already exists.", info.name))
})?;
let seq = match create_option {
CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0),
CreateOption::CreateOrReplace => MatchSeq::GE(0),
};

let res = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None))
.await?;

if let CreateOption::CreateIfNotExists(false) = create_option {
if res.prev.is_some() {
return Err(ErrorCode::UdfAlreadyExists(format!(
"UDF '{}' already exists.",
info.name
)));
}
}

Ok(res_seq)
Ok(())
}

#[async_backtrace::framed]
Expand Down
21 changes: 11 additions & 10 deletions src/query/management/tests/it/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_management::*;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_embedded::MetaEmbedded;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::MatchSeq;
Expand All @@ -32,7 +33,7 @@ async fn test_add_udf() -> Result<()> {

// lambda udf
let udf = create_test_lambda_udf();
udf_api.add_udf(udf.clone()).await?;
udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?;
let value = kv_api
.get_kv(format!("__fd_udfs/admin/{}", udf.name).as_str())
.await?;
Expand All @@ -52,7 +53,7 @@ async fn test_add_udf() -> Result<()> {
}
// udf server
let udf = create_test_udf_server();
udf_api.add_udf(udf.clone()).await?;
udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?;
let value = kv_api
.get_kv(format!("__fd_udfs/admin/{}", udf.name).as_str())
.await?;
Expand Down Expand Up @@ -80,16 +81,16 @@ async fn test_already_exists_add_udf() -> Result<()> {

// lambda udf
let udf = create_test_lambda_udf();
udf_api.add_udf(udf.clone()).await?;
match udf_api.add_udf(udf.clone()).await {
udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?;
match udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await {
Ok(_) => panic!("Already exists add udf must be return Err."),
Err(cause) => assert_eq!(cause.code(), 2603),
}

// udf server
let udf = create_test_udf_server();
udf_api.add_udf(udf.clone()).await?;
match udf_api.add_udf(udf.clone()).await {
udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await?;
match udf_api.add_udf(udf.clone(), &CreateOption::CreateIfNotExists(false)).await {
Ok(_) => panic!("Already exists add udf must be return Err."),
Err(cause) => assert_eq!(cause.code(), 2603),
}
Expand All @@ -107,8 +108,8 @@ async fn test_successfully_get_udfs() -> Result<()> {
let lambda_udf = create_test_lambda_udf();
let udf_server = create_test_udf_server();

udf_api.add_udf(lambda_udf.clone()).await?;
udf_api.add_udf(udf_server.clone()).await?;
udf_api.add_udf(lambda_udf.clone(), &CreateOption::CreateIfNotExists(false)).await?;
udf_api.add_udf(udf_server.clone(), &CreateOption::CreateIfNotExists(false)).await?;

let udfs = udf_api.get_udfs().await?;
assert_eq!(udfs, vec![lambda_udf, udf_server]);
Expand All @@ -122,8 +123,8 @@ async fn test_successfully_drop_udf() -> Result<()> {
let lambda_udf = create_test_lambda_udf();
let udf_server = create_test_udf_server();

udf_api.add_udf(lambda_udf.clone()).await?;
udf_api.add_udf(udf_server.clone()).await?;
udf_api.add_udf(lambda_udf.clone(), &CreateOption::CreateIfNotExists(false)).await?;
udf_api.add_udf(udf_server.clone(), &CreateOption::CreateIfNotExists(false)).await?;

let udfs = udf_api.get_udfs().await?;
assert_eq!(udfs, vec![lambda_udf.clone(), udf_server.clone()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Interpreter for CreateUserUDFInterpreter {
let tenant = self.ctx.get_tenant();
let udf = plan.udf;
let _ = UserApiProvider::instance()
.add_udf(&tenant, udf, plan.if_not_exists)
.add_udf(&tenant, udf, &plan.create_option)
.await?;

// Grant ownership as the current role
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Binder {
.bind_udf_definition(&stmt.udf_name, &stmt.description, &stmt.definition)
.await?;
Ok(Plan::CreateUDF(Box::new(CreateUDFPlan {
if_not_exists: stmt.if_not_exists,
create_option: stmt.create_option.clone(),
udf,
})))
}
Expand Down
3 changes: 2 additions & 1 deletion src/query/sql/src/planner/plans/ddl/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::schema::CreateOption;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateUDFPlan {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub udf: UserDefinedFunction,
}

Expand Down
17 changes: 4 additions & 13 deletions src/query/users/src/user_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_app::schema::CreateOption;

use crate::UserApiProvider;

Expand All @@ -27,20 +28,10 @@ impl UserApiProvider {
&self,
tenant: &str,
info: UserDefinedFunction,
if_not_exists: bool,
) -> Result<u64> {
create_option: &CreateOption,
) -> Result<()> {
let udf_api_client = self.get_udf_api_client(tenant)?;
let add_udf = udf_api_client.add_udf(info);
match add_udf.await {
Ok(res) => Ok(res),
Err(e) => {
if if_not_exists && e.code() == ErrorCode::UDF_ALREADY_EXISTS {
Ok(u64::MIN)
} else {
Err(e)
}
}
}
udf_api_client.add_udf(info, create_option).await
}

// Update a UDF.
Expand Down
12 changes: 5 additions & 7 deletions src/query/users/tests/it/user_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_grpc::RpcClientConf;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::{principal::UserDefinedFunction, schema::CreateOption};
use databend_common_users::UserApiProvider;
use pretty_assertions::assert_eq;

Expand All @@ -29,7 +29,6 @@ async fn test_user_lambda_udf() -> Result<()> {
let description = "this is a description";
let isempty = "isempty";
let isnotempty = "isnotempty";
let if_not_exists = false;

// add isempty.
let isempty_udf = UserDefinedFunction::create_lambda_udf(
Expand All @@ -39,7 +38,7 @@ async fn test_user_lambda_udf() -> Result<()> {
description,
);
user_mgr
.add_udf(tenant, isempty_udf.clone(), if_not_exists)
.add_udf(tenant, isempty_udf.clone(), &CreateOption::CreateIfNotExists(false))
.await?;

// add isnotempty.
Expand All @@ -50,7 +49,7 @@ async fn test_user_lambda_udf() -> Result<()> {
description,
);
user_mgr
.add_udf(tenant, isnotempty_udf.clone(), if_not_exists)
.add_udf(tenant, isnotempty_udf.clone(), &CreateOption::CreateIfNotExists(false))
.await?;

// get all.
Expand Down Expand Up @@ -99,7 +98,6 @@ async fn test_user_udf_server() -> Result<()> {
let description = "this is a description";
let isempty = "isempty";
let isnotempty = "isnotempty";
let if_not_exists = false;

// add isempty.
let isempty_udf = UserDefinedFunction::create_udf_server(
Expand All @@ -112,7 +110,7 @@ async fn test_user_udf_server() -> Result<()> {
description,
);
user_mgr
.add_udf(tenant, isempty_udf.clone(), if_not_exists)
.add_udf(tenant, isempty_udf.clone(), &CreateOption::CreateIfNotExists(false))
.await?;

// add isnotempty.
Expand All @@ -126,7 +124,7 @@ async fn test_user_udf_server() -> Result<()> {
description,
);
user_mgr
.add_udf(tenant, isnotempty_udf.clone(), if_not_exists)
.add_udf(tenant, isnotempty_udf.clone(), &CreateOption::CreateIfNotExists(false))
.await?;

// get all.
Expand Down

0 comments on commit 234f1af

Please sign in to comment.