diff --git a/src/query/ast/src/ast/statements/connection.rs b/src/query/ast/src/ast/statements/connection.rs index d63094ddd5a7..5d02535ac499 100644 --- a/src/query/ast/src/ast/statements/connection.rs +++ b/src/query/ast/src/ast/statements/connection.rs @@ -17,15 +17,16 @@ use std::fmt::Display; use std::fmt::Formatter; use databend_common_base::base::mask_string; +use databend_common_meta_app::schema::CreateOption; use crate::ast::Identifier; #[derive(Debug, Clone, PartialEq)] pub struct CreateConnectionStmt { - pub if_not_exists: bool, pub name: Identifier, pub storage_type: String, pub storage_params: BTreeMap, + pub create_option: CreateOption, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -44,9 +45,15 @@ pub struct ShowConnectionsStmt {} impl Display for CreateConnectionStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "CREATE CONNECTION ")?; - if self.if_not_exists { - write!(f, "IF NOT EXISTS ")?; + write!(f, "CREATE")?; + if let CreateOption::CreateOrReplace = self.create_option { + write!(f, " OR REPLACE")?; + } + write!(f, " CONNECTION ")?; + if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option { + if if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } } write!(f, "{} ", self.name)?; write!(f, "STORAGE_TYPE = {} ", self.storage_type)?; diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 0089c7cbca26..46dad9763790 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1306,14 +1306,26 @@ pub fn statement(i: Input) -> IResult { let connection_opt = connection_opt("="); let create_connection = map_res( rule! { - CREATE ~ CONNECTION ~ ( IF ~ ^NOT ~ ^EXISTS )? + CREATE ~ (OR ~ REPLACE)? ~ CONNECTION ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ STORAGE_TYPE ~ "=" ~ #literal_string ~ #connection_opt* }, - |(_, _, opt_if_not_exists, connection_name, _, _, storage_type, options)| { + |( + _, + opt_or_replace, + _, + opt_if_not_exists, + connection_name, + _, + _, + storage_type, + options, + )| { + let create_option = + parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?; let options = BTreeMap::from_iter(options.iter().map(|(k, v)| (k.to_lowercase(), v.clone()))); Ok(Statement::CreateConnection(CreateConnectionStmt { - if_not_exists: opt_if_not_exists.is_some(), + create_option, name: connection_name, storage_type, storage_params: options, @@ -1970,7 +1982,7 @@ AS ), rule!( - #create_connection: "`CREATE CONNECTION [IF NOT EXISTS] STORAGE_TYPE = `" + #create_connection: "`CREATE [OR REPLACE] CONNECTION [IF NOT EXISTS] STORAGE_TYPE = `" | #drop_connection: "`DROP CONNECTION [IF EXISTS] `" | #desc_connection: "`DESC | DESCRIBE CONNECTION `" | #show_connections: "`SHOW CONNECTIONS`" diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index cbccf4695f9a..2eadbb53774b 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -536,6 +536,7 @@ fn test_statement() { r#"DESC TASK MyTask"#, r#"CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE='s3'"#, r#"CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE='s3' any_arg='any_value'"#, + r#"CREATE OR REPLACE CONNECTION my_conn STORAGE_TYPE='s3' any_arg='any_value'"#, r#"DROP CONNECTION IF EXISTS my_conn;"#, r#"DESC CONNECTION my_conn;"#, r#"SHOW CONNECTIONS;"#, diff --git a/src/query/ast/tests/it/testdata/statement-error.txt b/src/query/ast/tests/it/testdata/statement-error.txt index e96694461f17..915e23a6aa52 100644 --- a/src/query/ast/tests/it/testdata/statement-error.txt +++ b/src/query/ast/tests/it/testdata/statement-error.txt @@ -739,7 +739,7 @@ error: 1 | CREATE CONNECTION IF NOT EXISTS my_conn | ------ ^ unexpected end of line, expecting `STORAGE_TYPE` | | - | while parsing `CREATE CONNECTION [IF NOT EXISTS] STORAGE_TYPE = ` + | while parsing `CREATE [OR REPLACE] CONNECTION [IF NOT EXISTS] STORAGE_TYPE = ` ---------- Input ---------- diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index debf50412ae8..e7e53f9f3981 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -14570,7 +14570,6 @@ CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE = s3 ---------- AST ------------ CreateConnection( CreateConnectionStmt { - if_not_exists: true, name: Identifier { name: "my_conn", quote: None, @@ -14580,6 +14579,9 @@ CreateConnection( }, storage_type: "s3", storage_params: {}, + create_option: CreateIfNotExists( + true, + ), }, ) @@ -14591,7 +14593,6 @@ CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE = s3 any_arg = ******lue ---------- AST ------------ CreateConnection( CreateConnectionStmt { - if_not_exists: true, name: Identifier { name: "my_conn", quote: None, @@ -14603,6 +14604,32 @@ CreateConnection( storage_params: { "any_arg": "any_value", }, + create_option: CreateIfNotExists( + true, + ), + }, +) + + +---------- Input ---------- +CREATE OR REPLACE CONNECTION my_conn STORAGE_TYPE='s3' any_arg='any_value' +---------- Output --------- +CREATE OR REPLACE CONNECTION my_conn STORAGE_TYPE = s3 any_arg = ******lue +---------- AST ------------ +CreateConnection( + CreateConnectionStmt { + name: Identifier { + name: "my_conn", + quote: None, + span: Some( + 29..36, + ), + }, + storage_type: "s3", + storage_params: { + "any_arg": "any_value", + }, + create_option: CreateOrReplace, }, ) diff --git a/src/query/management/src/connection/connection_api.rs b/src/query/management/src/connection/connection_api.rs index 53961925c55d..390763d82fbc 100644 --- a/src/query/management/src/connection/connection_api.rs +++ b/src/query/management/src/connection/connection_api.rs @@ -14,13 +14,18 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::UserDefinedConnection; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait ConnectionApi: Sync + Send { // Add a connection info to /tenant/connection-name. - async fn add_connection(&self, connection: UserDefinedConnection) -> Result; + async fn add_connection( + &self, + connection: UserDefinedConnection, + create_option: &CreateOption, + ) -> Result<()>; async fn get_connection( &self, diff --git a/src/query/management/src/connection/connection_mgr.rs b/src/query/management/src/connection/connection_mgr.rs index 6548419de044..78ef35715dfa 100644 --- a/src/query/management/src/connection/connection_mgr.rs +++ b/src/query/management/src/connection/connection_mgr.rs @@ -18,6 +18,7 @@ use databend_common_base::base::escape_for_key; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::UserDefinedConnection; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::MatchSeq; @@ -60,26 +61,37 @@ impl ConnectionMgr { impl ConnectionApi for ConnectionMgr { #[async_backtrace::framed] #[minitrace::trace] - async fn add_connection(&self, info: UserDefinedConnection) -> Result { - let seq = MatchSeq::Exact(0); + async fn add_connection( + &self, + info: UserDefinedConnection, + create_option: &CreateOption, + ) -> Result<()> { let val = Operation::Update(serialize_struct( &info, ErrorCode::IllegalConnection, || "", )?); let key = format!("{}/{}", self.connection_prefix, escape_for_key(&info.name)?); - let upsert_info = self + let seq = match create_option { + CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0), + CreateOption::CreateOrReplace => MatchSeq::GE(0), + }; + + let res = self .kv_api - .upsert_kv(UpsertKVReq::new(&key, seq, val, None)); + .upsert_kv(UpsertKVReq::new(&key, seq, val, None)) + .await?; - let res_seq = upsert_info.await?.added_seq_or_else(|_v| { - ErrorCode::ConnectionAlreadyExists(format!( - "Connection '{}' already exists.", - info.name - )) - })?; + if let CreateOption::CreateIfNotExists(false) = create_option { + if res.prev.is_some() { + return Err(ErrorCode::ConnectionAlreadyExists(format!( + "Connection '{}' already exists.", + info.name + ))); + } + } - Ok(res_seq) + Ok(()) } #[async_backtrace::framed] diff --git a/src/query/service/src/interpreters/interpreter_connection_create.rs b/src/query/service/src/interpreters/interpreter_connection_create.rs index 85fc8392ff63..eb5cadf6340c 100644 --- a/src/query/service/src/interpreters/interpreter_connection_create.rs +++ b/src/query/service/src/interpreters/interpreter_connection_create.rs @@ -58,7 +58,7 @@ impl Interpreter for CreateConnectionInterpreter { let tenant = self.ctx.get_tenant(); let _create_file_format = user_mgr - .add_connection(&tenant, conn, plan.if_not_exists) + .add_connection(&tenant, conn, &plan.create_option) .await?; Ok(PipelineBuildResult::create()) diff --git a/src/query/sql/src/planner/binder/ddl/connection.rs b/src/query/sql/src/planner/binder/ddl/connection.rs index da2e1b62afbc..be3412f0ed18 100644 --- a/src/query/sql/src/planner/binder/ddl/connection.rs +++ b/src/query/sql/src/planner/binder/ddl/connection.rs @@ -36,7 +36,7 @@ impl Binder { ); parse_storage_params_from_uri(&mut location, None, "when CREATE CONNECTION").await?; Ok(Plan::CreateConnection(Box::new(CreateConnectionPlan { - if_not_exists: stmt.if_not_exists, + create_option: stmt.create_option.clone(), name: stmt.name.to_string(), storage_type: stmt.storage_type.clone(), storage_params: stmt.storage_params.clone(), diff --git a/src/query/sql/src/planner/plans/ddl/connection.rs b/src/query/sql/src/planner/plans/ddl/connection.rs index c6a301fe63b1..03429f4bcb7e 100644 --- a/src/query/sql/src/planner/plans/ddl/connection.rs +++ b/src/query/sql/src/planner/plans/ddl/connection.rs @@ -19,13 +19,14 @@ use databend_common_expression::types::DataType; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_meta_app::schema::CreateOption; #[derive(Clone, Debug, PartialEq, Eq)] pub struct CreateConnectionPlan { - pub if_not_exists: bool, pub name: String, pub storage_type: String, pub storage_params: BTreeMap, + pub create_option: CreateOption, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/src/query/users/src/connection.rs b/src/query/users/src/connection.rs index 416a835d3471..b781d812e461 100644 --- a/src/query/users/src/connection.rs +++ b/src/query/users/src/connection.rs @@ -15,6 +15,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::UserDefinedConnection; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_types::MatchSeq; use crate::UserApiProvider; @@ -27,20 +28,12 @@ impl UserApiProvider { &self, tenant: &str, connection: UserDefinedConnection, - if_not_exists: bool, - ) -> Result { + create_option: &CreateOption, + ) -> Result<()> { let connection_api_provider = self.get_connection_api_client(tenant)?; - let add_connection = connection_api_provider.add_connection(connection); - match add_connection.await { - Ok(res) => Ok(res), - Err(e) => { - if if_not_exists && e.code() == ErrorCode::CONNECTION_ALREADY_EXISTS { - Ok(u64::MIN) - } else { - Err(e) - } - } - } + connection_api_provider + .add_connection(connection, create_option) + .await } // Get one connection from by tenant. diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0033_ddl_connection.test b/tests/sqllogictests/suites/base/05_ddl/05_0033_ddl_connection.test index 19348ca72b53..0976ddf294f0 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0033_ddl_connection.test +++ b/tests/sqllogictests/suites/base/05_ddl/05_0033_ddl_connection.test @@ -47,3 +47,18 @@ DESC CONNECTION test_conn query TTT SHOW CONNECTIONS ---- + +statement ok +DROP CONNECTION IF EXISTS test_replace_conn + +statement error 1005 +CREATE OR REPLACE CONNECTION IF NOT EXISTS test_replace_conn STORAGE_TYPE='azblob' ENDPOINT_URL='http://s3.amazonaws.com'; + +statement ok +CREATE OR REPLACE CONNECTION test_replace_conn STORAGE_TYPE='azblob' ENDPOINT_URL='http://s3.amazonaws.com'; + +statement ok +CREATE OR REPLACE CONNECTION test_replace_conn STORAGE_TYPE='S3'; + +statement ok +DROP CONNECTION IF EXISTS test_replace_conn