diff --git a/src/query/ast/src/ast/statements/network_policy.rs b/src/query/ast/src/ast/statements/network_policy.rs index 5cb9a7e90451..7f3b9fbea8f9 100644 --- a/src/query/ast/src/ast/statements/network_policy.rs +++ b/src/query/ast/src/ast/statements/network_policy.rs @@ -15,9 +15,11 @@ use std::fmt::Display; use std::fmt::Formatter; +use databend_common_meta_app::schema::CreateOption; + #[derive(Debug, Clone, PartialEq)] pub struct CreateNetworkPolicyStmt { - pub if_not_exists: bool, + pub create_option: CreateOption, pub name: String, pub allowed_ip_list: Vec, pub blocked_ip_list: Option>, @@ -26,9 +28,15 @@ pub struct CreateNetworkPolicyStmt { impl Display for CreateNetworkPolicyStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "CREATE NETWORK POLICY ")?; - if self.if_not_exists { - write!(f, "IF NOT EXISTS ")?; + write!(f, "CREATE ")?; + if let CreateOption::CreateOrReplace = self.create_option { + write!(f, "OR REPLACE ")?; + } + write!(f, "NETWORK POLICY ")?; + if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option { + if if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } } write!(f, "{}", self.name)?; write!(f, " ALLOWED_IP_LIST = (")?; diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index c08bf15777e9..ed6f3e734ef8 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1619,15 +1619,16 @@ pub fn statement(i: Input) -> IResult { }, ); - let create_network_policy = map( + let create_network_policy = map_res( rule! { - CREATE ~ NETWORK ~ ^POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^#ident + CREATE ~ (OR ~ REPLACE)? ~ NETWORK ~ ^POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^#ident ~ ALLOWED_IP_LIST ~ ^Eq ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")" ~ ( BLOCKED_IP_LIST ~ ^Eq ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")" ) ? ~ ( COMMENT ~ ^Eq ~ ^#literal_string)? }, |( _, + opt_or_replace, _, _, opt_if_not_exists, @@ -1640,8 +1641,10 @@ pub fn statement(i: Input) -> IResult { opt_blocked_ip_list, opt_comment, )| { + let create_option = + parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?; let stmt = CreateNetworkPolicyStmt { - if_not_exists: opt_if_not_exists.is_some(), + create_option, name: name.to_string(), allowed_ip_list, blocked_ip_list: match opt_blocked_ip_list { @@ -1653,7 +1656,7 @@ pub fn statement(i: Input) -> IResult { None => None, }, }; - Statement::CreateNetworkPolicy(stmt) + Ok(Statement::CreateNetworkPolicy(stmt)) }, ); let alter_network_policy = map( diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index a458f4bbe0af..f8f500837407 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -519,6 +519,7 @@ fn test_statement() { r#"DROP VIRTUAL COLUMN FOR t"#, r#"REFRESH VIRTUAL COLUMN FOR t"#, r#"CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#, + r#"CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#, r#"ALTER NETWORK POLICY mypolicy SET ALLOWED_IP_LIST=('192.168.10.0/24','192.168.255.1') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test'"#, // tasks r#"CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 15 MINUTE SUSPEND_TASK_AFTER_NUM_FAILURES = 3 COMMENT = 'This is test task 1' DATABASE = 'target', TIMEZONE = 'America/Los Angeles' AS SELECT * FROM MyTable1"#, diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index fee70a8ddc38..d7dd863b820f 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -14839,7 +14839,33 @@ CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_ ---------- AST ------------ CreateNetworkPolicy( CreateNetworkPolicyStmt { - if_not_exists: false, + create_option: CreateIfNotExists( + false, + ), + name: "mypolicy", + allowed_ip_list: [ + "192.168.10.0/24", + ], + blocked_ip_list: Some( + [ + "192.168.10.99", + ], + ), + comment: Some( + "test", + ), + }, +) + + +---------- Input ---------- +CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test' +---------- Output --------- +CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_LIST = ('192.168.10.99') COMMENT = 'test' +---------- AST ------------ +CreateNetworkPolicy( + CreateNetworkPolicyStmt { + create_option: CreateOrReplace, name: "mypolicy", allowed_ip_list: [ "192.168.10.0/24", diff --git a/src/query/management/src/network_policy/network_policy_api.rs b/src/query/management/src/network_policy/network_policy_api.rs index ed51bf17c982..cbabab5b718c 100644 --- a/src/query/management/src/network_policy/network_policy_api.rs +++ b/src/query/management/src/network_policy/network_policy_api.rs @@ -14,12 +14,17 @@ use databend_common_exception::Result; use databend_common_meta_app::principal::NetworkPolicy; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait NetworkPolicyApi: Sync + Send { - async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result; + async fn add_network_policy( + &self, + network_policy: NetworkPolicy, + create_option: &CreateOption, + ) -> Result<()>; async fn update_network_policy( &self, diff --git a/src/query/management/src/network_policy/network_policy_mgr.rs b/src/query/management/src/network_policy/network_policy_mgr.rs index f6ad997e6977..61f341584852 100644 --- a/src/query/management/src/network_policy/network_policy_mgr.rs +++ b/src/query/management/src/network_policy/network_policy_mgr.rs @@ -18,6 +18,7 @@ use databend_common_base::base::escape_for_key; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::NetworkPolicy; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::UpsertKVReq; use databend_common_meta_types::MatchSeq; @@ -67,8 +68,12 @@ impl NetworkPolicyMgr { impl NetworkPolicyApi for NetworkPolicyMgr { #[async_backtrace::framed] #[minitrace::trace] - async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result { - let match_seq = MatchSeq::Exact(0); + async fn add_network_policy( + &self, + network_policy: NetworkPolicy, + create_option: &CreateOption, + ) -> Result<()> { + let seq = MatchSeq::from(*create_option); let key = self.make_network_policy_key(network_policy.name.as_str())?; let value = Operation::Update(serialize_struct( &network_policy, @@ -77,16 +82,20 @@ impl NetworkPolicyApi for NetworkPolicyMgr { )?); let kv_api = self.kv_api.clone(); - let upsert_kv = kv_api.upsert_kv(UpsertKVReq::new(&key, match_seq, value, None)); + let res = kv_api + .upsert_kv(UpsertKVReq::new(&key, seq, value, None)) + .await?; - let res_seq = upsert_kv.await?.added_seq_or_else(|_v| { - ErrorCode::NetworkPolicyAlreadyExists(format!( - "Network policy '{}' already exists.", - network_policy.name - )) - })?; + if let CreateOption::CreateIfNotExists(false) = create_option { + if res.prev.is_some() { + return Err(ErrorCode::NetworkPolicyAlreadyExists(format!( + "Network policy '{}' already exists.", + network_policy.name + ))); + } + } - Ok(res_seq) + Ok(()) } #[async_backtrace::framed] diff --git a/src/query/service/src/interpreters/interpreter_network_policy_create.rs b/src/query/service/src/interpreters/interpreter_network_policy_create.rs index a289aa76d8c0..5fb646c930d5 100644 --- a/src/query/service/src/interpreters/interpreter_network_policy_create.rs +++ b/src/query/service/src/interpreters/interpreter_network_policy_create.rs @@ -62,7 +62,7 @@ impl Interpreter for CreateNetworkPolicyInterpreter { update_on: None, }; user_mgr - .add_network_policy(&tenant, network_policy, plan.if_not_exists) + .add_network_policy(&tenant, network_policy, &plan.create_option) .await?; Ok(PipelineBuildResult::create()) diff --git a/src/query/sql/src/planner/binder/ddl/network_policy.rs b/src/query/sql/src/planner/binder/ddl/network_policy.rs index d481e7a479d5..02dbc099261e 100644 --- a/src/query/sql/src/planner/binder/ddl/network_policy.rs +++ b/src/query/sql/src/planner/binder/ddl/network_policy.rs @@ -32,7 +32,7 @@ impl Binder { stmt: &CreateNetworkPolicyStmt, ) -> Result { let CreateNetworkPolicyStmt { - if_not_exists, + create_option, name, allowed_ip_list, blocked_ip_list, @@ -60,7 +60,7 @@ impl Binder { let tenant = self.ctx.get_tenant(); let plan = CreateNetworkPolicyPlan { - if_not_exists: *if_not_exists, + create_option: *create_option, tenant, name: name.to_string(), allowed_ip_list: allowed_ip_list.clone(), diff --git a/src/query/sql/src/planner/plans/ddl/account.rs b/src/query/sql/src/planner/plans/ddl/account.rs index b3402015f1b1..9c6c93377bb1 100644 --- a/src/query/sql/src/planner/plans/ddl/account.rs +++ b/src/query/sql/src/planner/plans/ddl/account.rs @@ -129,7 +129,7 @@ pub struct RevokePrivilegePlan { #[derive(Clone, Debug, PartialEq)] pub struct CreateNetworkPolicyPlan { - pub if_not_exists: bool, + pub create_option: CreateOption, pub tenant: String, pub name: String, pub allowed_ip_list: Vec, diff --git a/src/query/users/src/network_policy.rs b/src/query/users/src/network_policy.rs index aaf96e4e9d03..7a57df538dd6 100644 --- a/src/query/users/src/network_policy.rs +++ b/src/query/users/src/network_policy.rs @@ -17,6 +17,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_management::NetworkPolicyApi; use databend_common_meta_app::principal::NetworkPolicy; +use databend_common_meta_app::schema::CreateOption; use databend_common_meta_types::MatchSeq; use crate::UserApiProvider; @@ -28,28 +29,12 @@ impl UserApiProvider { &self, tenant: &str, network_policy: NetworkPolicy, - if_not_exists: bool, - ) -> Result { - if if_not_exists - && self - .exists_network_policy(tenant, network_policy.name.as_str()) - .await? - { - return Ok(0); - } - + create_option: &CreateOption, + ) -> Result<()> { let client = self.get_network_policy_api_client(tenant)?; - let add_network_policy = client.add_network_policy(network_policy); - match add_network_policy.await { - Ok(res) => Ok(res), - Err(e) => { - if if_not_exists && e.code() == ErrorCode::NETWORK_POLICY_ALREADY_EXISTS { - Ok(0) - } else { - Err(e.add_message_back("(while add network policy)")) - } - } - } + client + .add_network_policy(network_policy, create_option) + .await } // Update network policy. diff --git a/src/query/users/tests/it/network_policy.rs b/src/query/users/tests/it/network_policy.rs index 441bf1360964..6bac9fb40efb 100644 --- a/src/query/users/tests/it/network_policy.rs +++ b/src/query/users/tests/it/network_policy.rs @@ -50,7 +50,11 @@ async fn test_network_policy() -> Result<()> { update_on: None, }; user_mgr - .add_network_policy(tenant, network_policy, false) + .add_network_policy( + tenant, + network_policy, + &CreateOption::CreateIfNotExists(false), + ) .await?; // add user diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy.test b/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy.test index 72afb242f390..3b502841b413 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy.test +++ b/tests/sqllogictests/suites/base/05_ddl/05_0032_ddl_network_policy.test @@ -66,3 +66,15 @@ DROP NETWORK POLICY test_policy statement error 2207 DESC NETWORK POLICY test_policy + +statement ok +CREATE NETWORK POLICY replace_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test comment' + +statement error 1005 +CREATE OR REPLACE NETWORK POLICY IF NOT EXISTS replace_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.89') COMMENT='another test comment' + +statement ok +CREATE OR REPLACE NETWORK POLICY replace_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.89') COMMENT='another test comment' + +statement ok +DROP NETWORK POLICY replace_policy \ No newline at end of file