Skip to content

Commit

Permalink
feat: add create or replace network policy support (#14658)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichuang authored Feb 10, 2024
1 parent baed42d commit 8c85ace
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 46 deletions.
16 changes: 12 additions & 4 deletions src/query/ast/src/ast/statements/network_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::fmt::Display;
use std::fmt::Formatter;

use databend_common_meta_app::schema::CreateOption;

#[derive(Debug, Clone, PartialEq)]
pub struct CreateNetworkPolicyStmt {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub name: String,
pub allowed_ip_list: Vec<String>,
pub blocked_ip_list: Option<Vec<String>>,
Expand All @@ -26,9 +28,15 @@ pub struct CreateNetworkPolicyStmt {

impl Display for CreateNetworkPolicyStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "CREATE NETWORK POLICY ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
write!(f, "CREATE ")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, "OR REPLACE ")?;
}
write!(f, "NETWORK POLICY ")?;
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
if if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
}
write!(f, "{}", self.name)?;
write!(f, " ALLOWED_IP_LIST = (")?;
Expand Down
11 changes: 7 additions & 4 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1619,15 +1619,16 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
},
);

let create_network_policy = map(
let create_network_policy = map_res(
rule! {
CREATE ~ NETWORK ~ ^POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^#ident
CREATE ~ (OR ~ REPLACE)? ~ NETWORK ~ ^POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^#ident
~ ALLOWED_IP_LIST ~ ^Eq ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")"
~ ( BLOCKED_IP_LIST ~ ^Eq ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")" ) ?
~ ( COMMENT ~ ^Eq ~ ^#literal_string)?
},
|(
_,
opt_or_replace,
_,
_,
opt_if_not_exists,
Expand All @@ -1640,8 +1641,10 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
opt_blocked_ip_list,
opt_comment,
)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
let stmt = CreateNetworkPolicyStmt {
if_not_exists: opt_if_not_exists.is_some(),
create_option,
name: name.to_string(),
allowed_ip_list,
blocked_ip_list: match opt_blocked_ip_list {
Expand All @@ -1653,7 +1656,7 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
None => None,
},
};
Statement::CreateNetworkPolicy(stmt)
Ok(Statement::CreateNetworkPolicy(stmt))
},
);
let alter_network_policy = map(
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ fn test_statement() {
r#"DROP VIRTUAL COLUMN FOR t"#,
r#"REFRESH VIRTUAL COLUMN FOR t"#,
r#"CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#,
r#"CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'"#,
r#"ALTER NETWORK POLICY mypolicy SET ALLOWED_IP_LIST=('192.168.10.0/24','192.168.255.1') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test'"#,
// tasks
r#"CREATE TASK IF NOT EXISTS MyTask1 WAREHOUSE = 'MyWarehouse' SCHEDULE = 15 MINUTE SUSPEND_TASK_AFTER_NUM_FAILURES = 3 COMMENT = 'This is test task 1' DATABASE = 'target', TIMEZONE = 'America/Los Angeles' AS SELECT * FROM MyTable1"#,
Expand Down
28 changes: 27 additions & 1 deletion src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14839,7 +14839,33 @@ CREATE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_
---------- AST ------------
CreateNetworkPolicy(
CreateNetworkPolicyStmt {
if_not_exists: false,
create_option: CreateIfNotExists(
false,
),
name: "mypolicy",
allowed_ip_list: [
"192.168.10.0/24",
],
blocked_ip_list: Some(
[
"192.168.10.99",
],
),
comment: Some(
"test",
),
},
)


---------- Input ----------
CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST=('192.168.10.0/24') BLOCKED_IP_LIST=('192.168.10.99') COMMENT='test'
---------- Output ---------
CREATE OR REPLACE NETWORK POLICY mypolicy ALLOWED_IP_LIST = ('192.168.10.0/24') BLOCKED_IP_LIST = ('192.168.10.99') COMMENT = 'test'
---------- AST ------------
CreateNetworkPolicy(
CreateNetworkPolicyStmt {
create_option: CreateOrReplace,
name: "mypolicy",
allowed_ip_list: [
"192.168.10.0/24",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@

use databend_common_exception::Result;
use databend_common_meta_app::principal::NetworkPolicy;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::SeqV;

#[async_trait::async_trait]
pub trait NetworkPolicyApi: Sync + Send {
async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result<u64>;
async fn add_network_policy(
&self,
network_policy: NetworkPolicy,
create_option: &CreateOption,
) -> Result<()>;

async fn update_network_policy(
&self,
Expand Down
29 changes: 19 additions & 10 deletions src/query/management/src/network_policy/network_policy_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_base::base::escape_for_key;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::NetworkPolicy;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
Expand Down Expand Up @@ -67,8 +68,12 @@ impl NetworkPolicyMgr {
impl NetworkPolicyApi for NetworkPolicyMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn add_network_policy(&self, network_policy: NetworkPolicy) -> Result<u64> {
let match_seq = MatchSeq::Exact(0);
async fn add_network_policy(
&self,
network_policy: NetworkPolicy,
create_option: &CreateOption,
) -> Result<()> {
let seq = MatchSeq::from(*create_option);
let key = self.make_network_policy_key(network_policy.name.as_str())?;
let value = Operation::Update(serialize_struct(
&network_policy,
Expand All @@ -77,16 +82,20 @@ impl NetworkPolicyApi for NetworkPolicyMgr {
)?);

let kv_api = self.kv_api.clone();
let upsert_kv = kv_api.upsert_kv(UpsertKVReq::new(&key, match_seq, value, None));
let res = kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, value, None))
.await?;

let res_seq = upsert_kv.await?.added_seq_or_else(|_v| {
ErrorCode::NetworkPolicyAlreadyExists(format!(
"Network policy '{}' already exists.",
network_policy.name
))
})?;
if let CreateOption::CreateIfNotExists(false) = create_option {
if res.prev.is_some() {
return Err(ErrorCode::NetworkPolicyAlreadyExists(format!(
"Network policy '{}' already exists.",
network_policy.name
)));
}
}

Ok(res_seq)
Ok(())
}

#[async_backtrace::framed]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Interpreter for CreateNetworkPolicyInterpreter {
update_on: None,
};
user_mgr
.add_network_policy(&tenant, network_policy, plan.if_not_exists)
.add_network_policy(&tenant, network_policy, &plan.create_option)
.await?;

Ok(PipelineBuildResult::create())
Expand Down
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/binder/ddl/network_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Binder {
stmt: &CreateNetworkPolicyStmt,
) -> Result<Plan> {
let CreateNetworkPolicyStmt {
if_not_exists,
create_option,
name,
allowed_ip_list,
blocked_ip_list,
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Binder {

let tenant = self.ctx.get_tenant();
let plan = CreateNetworkPolicyPlan {
if_not_exists: *if_not_exists,
create_option: *create_option,
tenant,
name: name.to_string(),
allowed_ip_list: allowed_ip_list.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/plans/ddl/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub struct RevokePrivilegePlan {

#[derive(Clone, Debug, PartialEq)]
pub struct CreateNetworkPolicyPlan {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub tenant: String,
pub name: String,
pub allowed_ip_list: Vec<String>,
Expand Down
27 changes: 6 additions & 21 deletions src/query/users/src/network_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_management::NetworkPolicyApi;
use databend_common_meta_app::principal::NetworkPolicy;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;

use crate::UserApiProvider;
Expand All @@ -28,28 +29,12 @@ impl UserApiProvider {
&self,
tenant: &str,
network_policy: NetworkPolicy,
if_not_exists: bool,
) -> Result<u64> {
if if_not_exists
&& self
.exists_network_policy(tenant, network_policy.name.as_str())
.await?
{
return Ok(0);
}

create_option: &CreateOption,
) -> Result<()> {
let client = self.get_network_policy_api_client(tenant)?;
let add_network_policy = client.add_network_policy(network_policy);
match add_network_policy.await {
Ok(res) => Ok(res),
Err(e) => {
if if_not_exists && e.code() == ErrorCode::NETWORK_POLICY_ALREADY_EXISTS {
Ok(0)
} else {
Err(e.add_message_back("(while add network policy)"))
}
}
}
client
.add_network_policy(network_policy, create_option)
.await
}

// Update network policy.
Expand Down
6 changes: 5 additions & 1 deletion src/query/users/tests/it/network_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ async fn test_network_policy() -> Result<()> {
update_on: None,
};
user_mgr
.add_network_policy(tenant, network_policy, false)
.add_network_policy(
tenant,
network_policy,
&CreateOption::CreateIfNotExists(false),
)
.await?;

// add user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,15 @@ DROP NETWORK POLICY test_policy

statement error 2207
DESC NETWORK POLICY test_policy

statement ok
CREATE NETWORK POLICY replace_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.99') COMMENT='test comment'

statement error 1005
CREATE OR REPLACE NETWORK POLICY IF NOT EXISTS replace_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.89') COMMENT='another test comment'

statement ok
CREATE OR REPLACE NETWORK POLICY replace_policy ALLOWED_IP_LIST=('192.168.1.0/24') BLOCKED_IP_LIST=('192.168.1.89') COMMENT='another test comment'

statement ok
DROP NETWORK POLICY replace_policy

0 comments on commit 8c85ace

Please sign in to comment.