Skip to content

Commit

Permalink
feat: add create or replace connection support (#14580)
Browse files Browse the repository at this point in the history
* feat: add create or replace connection support

* feat: add create or replace connection support

* feat: add create or replace connection support
  • Loading branch information
lichuang authored Feb 3, 2024
1 parent 9323427 commit 0e64c72
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 39 deletions.
15 changes: 11 additions & 4 deletions src/query/ast/src/ast/statements/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ use std::fmt::Display;
use std::fmt::Formatter;

use databend_common_base::base::mask_string;
use databend_common_meta_app::schema::CreateOption;

use crate::ast::Identifier;

#[derive(Debug, Clone, PartialEq)]
pub struct CreateConnectionStmt {
pub if_not_exists: bool,
pub name: Identifier,
pub storage_type: String,
pub storage_params: BTreeMap<String, String>,
pub create_option: CreateOption,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -44,9 +45,15 @@ pub struct ShowConnectionsStmt {}

impl Display for CreateConnectionStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "CREATE CONNECTION ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
write!(f, "CREATE")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, " OR REPLACE")?;
}
write!(f, " CONNECTION ")?;
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
if if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
}
write!(f, "{} ", self.name)?;
write!(f, "STORAGE_TYPE = {} ", self.storage_type)?;
Expand Down
20 changes: 16 additions & 4 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1306,14 +1306,26 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
let connection_opt = connection_opt("=");
let create_connection = map_res(
rule! {
CREATE ~ CONNECTION ~ ( IF ~ ^NOT ~ ^EXISTS )?
CREATE ~ (OR ~ REPLACE)? ~ CONNECTION ~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #ident ~ STORAGE_TYPE ~ "=" ~ #literal_string ~ #connection_opt*
},
|(_, _, opt_if_not_exists, connection_name, _, _, storage_type, options)| {
|(
_,
opt_or_replace,
_,
opt_if_not_exists,
connection_name,
_,
_,
storage_type,
options,
)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
let options =
BTreeMap::from_iter(options.iter().map(|(k, v)| (k.to_lowercase(), v.clone())));
Ok(Statement::CreateConnection(CreateConnectionStmt {
if_not_exists: opt_if_not_exists.is_some(),
create_option,
name: connection_name,
storage_type,
storage_params: options,
Expand Down Expand Up @@ -1970,7 +1982,7 @@ AS

),
rule!(
#create_connection: "`CREATE CONNECTION [IF NOT EXISTS] <connection_name> STORAGE_TYPE = <type> <storage_configs>`"
#create_connection: "`CREATE [OR REPLACE] CONNECTION [IF NOT EXISTS] <connection_name> STORAGE_TYPE = <type> <storage_configs>`"
| #drop_connection: "`DROP CONNECTION [IF EXISTS] <connection_name>`"
| #desc_connection: "`DESC | DESCRIBE CONNECTION <connection_name>`"
| #show_connections: "`SHOW CONNECTIONS`"
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ fn test_statement() {
r#"DESC TASK MyTask"#,
r#"CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE='s3'"#,
r#"CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE='s3' any_arg='any_value'"#,
r#"CREATE OR REPLACE CONNECTION my_conn STORAGE_TYPE='s3' any_arg='any_value'"#,
r#"DROP CONNECTION IF EXISTS my_conn;"#,
r#"DESC CONNECTION my_conn;"#,
r#"SHOW CONNECTIONS;"#,
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/tests/it/testdata/statement-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ error:
1 | CREATE CONNECTION IF NOT EXISTS my_conn
| ------ ^ unexpected end of line, expecting `STORAGE_TYPE`
| |
| while parsing `CREATE CONNECTION [IF NOT EXISTS] <connection_name> STORAGE_TYPE = <type> <storage_configs>`
| while parsing `CREATE [OR REPLACE] CONNECTION [IF NOT EXISTS] <connection_name> STORAGE_TYPE = <type> <storage_configs>`


---------- Input ----------
Expand Down
31 changes: 29 additions & 2 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14570,7 +14570,6 @@ CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE = s3
---------- AST ------------
CreateConnection(
CreateConnectionStmt {
if_not_exists: true,
name: Identifier {
name: "my_conn",
quote: None,
Expand All @@ -14580,6 +14579,9 @@ CreateConnection(
},
storage_type: "s3",
storage_params: {},
create_option: CreateIfNotExists(
true,
),
},
)

Expand All @@ -14591,7 +14593,6 @@ CREATE CONNECTION IF NOT EXISTS my_conn STORAGE_TYPE = s3 any_arg = ******lue
---------- AST ------------
CreateConnection(
CreateConnectionStmt {
if_not_exists: true,
name: Identifier {
name: "my_conn",
quote: None,
Expand All @@ -14603,6 +14604,32 @@ CreateConnection(
storage_params: {
"any_arg": "any_value",
},
create_option: CreateIfNotExists(
true,
),
},
)


---------- Input ----------
CREATE OR REPLACE CONNECTION my_conn STORAGE_TYPE='s3' any_arg='any_value'
---------- Output ---------
CREATE OR REPLACE CONNECTION my_conn STORAGE_TYPE = s3 any_arg = ******lue
---------- AST ------------
CreateConnection(
CreateConnectionStmt {
name: Identifier {
name: "my_conn",
quote: None,
span: Some(
29..36,
),
},
storage_type: "s3",
storage_params: {
"any_arg": "any_value",
},
create_option: CreateOrReplace,
},
)

Expand Down
7 changes: 6 additions & 1 deletion src/query/management/src/connection/connection_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@

use databend_common_exception::Result;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::SeqV;

#[async_trait::async_trait]
pub trait ConnectionApi: Sync + Send {
// Add a connection info to /tenant/connection-name.
async fn add_connection(&self, connection: UserDefinedConnection) -> Result<u64>;
async fn add_connection(
&self,
connection: UserDefinedConnection,
create_option: &CreateOption,
) -> Result<()>;

async fn get_connection(
&self,
Expand Down
34 changes: 23 additions & 11 deletions src/query/management/src/connection/connection_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_base::base::escape_for_key;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
Expand Down Expand Up @@ -60,26 +61,37 @@ impl ConnectionMgr {
impl ConnectionApi for ConnectionMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn add_connection(&self, info: UserDefinedConnection) -> Result<u64> {
let seq = MatchSeq::Exact(0);
async fn add_connection(
&self,
info: UserDefinedConnection,
create_option: &CreateOption,
) -> Result<()> {
let val = Operation::Update(serialize_struct(
&info,
ErrorCode::IllegalConnection,
|| "",
)?);
let key = format!("{}/{}", self.connection_prefix, escape_for_key(&info.name)?);
let upsert_info = self
let seq = match create_option {
CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0),
CreateOption::CreateOrReplace => MatchSeq::GE(0),
};

let res = self
.kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, val, None));
.upsert_kv(UpsertKVReq::new(&key, seq, val, None))
.await?;

let res_seq = upsert_info.await?.added_seq_or_else(|_v| {
ErrorCode::ConnectionAlreadyExists(format!(
"Connection '{}' already exists.",
info.name
))
})?;
if let CreateOption::CreateIfNotExists(false) = create_option {
if res.prev.is_some() {
return Err(ErrorCode::ConnectionAlreadyExists(format!(
"Connection '{}' already exists.",
info.name
)));
}
}

Ok(res_seq)
Ok(())
}

#[async_backtrace::framed]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Interpreter for CreateConnectionInterpreter {

let tenant = self.ctx.get_tenant();
let _create_file_format = user_mgr
.add_connection(&tenant, conn, plan.if_not_exists)
.add_connection(&tenant, conn, &plan.create_option)
.await?;

Ok(PipelineBuildResult::create())
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Binder {
);
parse_storage_params_from_uri(&mut location, None, "when CREATE CONNECTION").await?;
Ok(Plan::CreateConnection(Box::new(CreateConnectionPlan {
if_not_exists: stmt.if_not_exists,
create_option: stmt.create_option.clone(),
name: stmt.name.to_string(),
storage_type: stmt.storage_type.clone(),
storage_params: stmt.storage_params.clone(),
Expand Down
3 changes: 2 additions & 1 deletion src/query/sql/src/planner/plans/ddl/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use databend_common_expression::types::DataType;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_meta_app::schema::CreateOption;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateConnectionPlan {
pub if_not_exists: bool,
pub name: String,
pub storage_type: String,
pub storage_params: BTreeMap<String, String>,
pub create_option: CreateOption,
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
19 changes: 6 additions & 13 deletions src/query/users/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;

use crate::UserApiProvider;
Expand All @@ -27,20 +28,12 @@ impl UserApiProvider {
&self,
tenant: &str,
connection: UserDefinedConnection,
if_not_exists: bool,
) -> Result<u64> {
create_option: &CreateOption,
) -> Result<()> {
let connection_api_provider = self.get_connection_api_client(tenant)?;
let add_connection = connection_api_provider.add_connection(connection);
match add_connection.await {
Ok(res) => Ok(res),
Err(e) => {
if if_not_exists && e.code() == ErrorCode::CONNECTION_ALREADY_EXISTS {
Ok(u64::MIN)
} else {
Err(e)
}
}
}
connection_api_provider
.add_connection(connection, create_option)
.await
}

// Get one connection from by tenant.
Expand Down
15 changes: 15 additions & 0 deletions tests/sqllogictests/suites/base/05_ddl/05_0033_ddl_connection.test
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,18 @@ DESC CONNECTION test_conn
query TTT
SHOW CONNECTIONS
----

statement ok
DROP CONNECTION IF EXISTS test_replace_conn

statement error 1005
CREATE OR REPLACE CONNECTION IF NOT EXISTS test_replace_conn STORAGE_TYPE='azblob' ENDPOINT_URL='http://s3.amazonaws.com';

statement ok
CREATE OR REPLACE CONNECTION test_replace_conn STORAGE_TYPE='azblob' ENDPOINT_URL='http://s3.amazonaws.com';

statement ok
CREATE OR REPLACE CONNECTION test_replace_conn STORAGE_TYPE='S3';

statement ok
DROP CONNECTION IF EXISTS test_replace_conn

0 comments on commit 0e64c72

Please sign in to comment.