Skip to content

Commit

Permalink
feat: add create or replace user support
Browse files Browse the repository at this point in the history
  • Loading branch information
lichuang committed Feb 3, 2024
1 parent c4d0aad commit 68d12c9
Show file tree
Hide file tree
Showing 17 changed files with 151 additions and 61 deletions.
15 changes: 11 additions & 4 deletions src/query/ast/src/ast/statements/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@ use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserOption;
use databend_common_meta_app::principal::UserOptionFlag;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::CreateOption;

use crate::ast::write_comma_separated_list;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateUserStmt {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub user: UserIdentity,
pub auth_option: AuthOption,
pub user_options: Vec<UserOptionItem>,
}

impl Display for CreateUserStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "CREATE USER")?;
if self.if_not_exists {
write!(f, " IF NOT EXISTS")?;
write!(f, "CREATE")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, " OR REPLACE")?;
}
write!(f, " USER")?;
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
if if_not_exists {
write!(f, " IF NOT EXISTS")?;
}
}
write!(f, " {} IDENTIFIED", self.user)?;
write!(f, " {}", self.auth_option)?;
Expand Down
26 changes: 19 additions & 7 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,16 +1061,28 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
);

let show_users = value(Statement::ShowUsers, rule! { SHOW ~ USERS });
let create_user = map(
let create_user = map_res(
rule! {
CREATE ~ USER ~ ( IF ~ ^NOT ~ ^EXISTS )?
CREATE ~ (OR ~ REPLACE)? ~ USER ~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #user_identity
~ IDENTIFIED ~ ( WITH ~ ^#auth_type )? ~ ( BY ~ ^#literal_string )?
~ ( WITH ~ ^#comma_separated_list1(user_option))?
},
|(_, _, opt_if_not_exists, user, _, opt_auth_type, opt_password, opt_user_option)| {
Statement::CreateUser(CreateUserStmt {
if_not_exists: opt_if_not_exists.is_some(),
|(
_,
opt_or_replace,
_,
opt_if_not_exists,
user,
_,
opt_auth_type,
opt_password,
opt_user_option,
)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
Ok(Statement::CreateUser(CreateUserStmt {
create_option,
user,
auth_option: AuthOption {
auth_type: opt_auth_type.map(|(_, auth_type)| auth_type),
Expand All @@ -1079,7 +1091,7 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
user_options: opt_user_option
.map(|(_, user_options)| user_options)
.unwrap_or_default(),
})
}))
},
);
let alter_user = map(
Expand Down Expand Up @@ -1887,7 +1899,7 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
),
rule!(
#show_users : "`SHOW USERS`"
| #create_user : "`CREATE USER [IF NOT EXISTS] '<username>'@'hostname' IDENTIFIED [WITH <auth_type>] [BY <password>] [WITH <user_option>, ...]`"
| #create_user : "`CREATE [OR REPLACE] USER [IF NOT EXISTS] '<username>'@'hostname' IDENTIFIED [WITH <auth_type>] [BY <password>] [WITH <user_option>, ...]`"
| #alter_user : "`ALTER USER ('<username>'@'hostname' | USER()) [IDENTIFIED [WITH <auth_type>] [BY <password>]] [WITH <user_option>, ...]`"
| #drop_user : "`DROP USER [IF EXISTS] '<username>'@'hostname'`"
| #show_roles : "`SHOW ROLES`"
Expand Down
12 changes: 9 additions & 3 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3370,7 +3370,9 @@ CREATE USER 'u1'@'%' IDENTIFIED BY '123456' WITH DEFAULT_ROLE = 'role123' TENANT
---------- AST ------------
CreateUser(
CreateUserStmt {
if_not_exists: false,
create_option: CreateIfNotExists(
false,
),
user: UserIdentity {
username: "u1",
hostname: "%",
Expand Down Expand Up @@ -3400,7 +3402,9 @@ CREATE USER 'u1'@'%' IDENTIFIED BY '123456' WITH SET NETWORK POLICY = 'policy1'
---------- AST ------------
CreateUser(
CreateUserStmt {
if_not_exists: false,
create_option: CreateIfNotExists(
false,
),
user: UserIdentity {
username: "u1",
hostname: "%",
Expand Down Expand Up @@ -7719,7 +7723,9 @@ CREATE USER 'test-e'@'%' IDENTIFIED BY 'password'
---------- AST ------------
CreateUser(
CreateUserStmt {
if_not_exists: false,
create_option: CreateIfNotExists(
false,
),
user: UserIdentity {
username: "test-e",
hostname: "%",
Expand Down
3 changes: 2 additions & 1 deletion src/query/management/src/user/user_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use databend_common_exception::Result;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::SeqV;

#[async_trait::async_trait]
pub trait UserApi: Sync + Send {
async fn add_user(&self, user_info: UserInfo) -> Result<u64>;
async fn add_user(&self, user_info: UserInfo, create_option: &CreateOption) -> Result<()>;

async fn get_user(&self, user: UserIdentity, seq: MatchSeq) -> Result<SeqV<UserInfo>>;

Expand Down
40 changes: 25 additions & 15 deletions src/query/management/src/user/user_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::MatchSeq;
Expand Down Expand Up @@ -81,25 +82,34 @@ impl UserMgr {
impl UserApi for UserMgr {
#[async_backtrace::framed]
#[minitrace::trace]
async fn add_user(&self, user_info: UserInfo) -> databend_common_exception::Result<u64> {
let match_seq = MatchSeq::Exact(0);
async fn add_user(
&self,
user_info: UserInfo,
create_option: &CreateOption,
) -> databend_common_exception::Result<()> {
let user_key = format_user_key(&user_info.name, &user_info.hostname);
let key = format!("{}/{}", self.user_prefix, escape_for_key(&user_key)?);
let value = serialize_struct(&user_info, ErrorCode::IllegalUserInfoFormat, || "")?;

let kv_api = self.kv_api.clone();
let upsert_kv = kv_api.upsert_kv(UpsertKVReq::new(
&key,
match_seq,
Operation::Update(value),
None,
));

let res_seq = upsert_kv.await?.added_seq_or_else(|_v| {
ErrorCode::UserAlreadyExists(format!("User {} already exists.", user_key))
})?;

Ok(res_seq)
let kv_api = &self.kv_api;
let seq = match create_option {
CreateOption::CreateIfNotExists(_) => MatchSeq::Exact(0),
CreateOption::CreateOrReplace => MatchSeq::GE(0),
};
let res = kv_api
.upsert_kv(UpsertKVReq::new(&key, seq, Operation::Update(value), None))
.await?;

if let CreateOption::CreateIfNotExists(false) = create_option {
if res.prev.is_some() {
return Err(ErrorCode::UserAlreadyExists(format!(
"User {} already exists.",
user_key
)));
}
}

Ok(())
}

#[async_backtrace::framed]
Expand Down
7 changes: 5 additions & 2 deletions src/query/management/tests/it/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn default_test_auth_info() -> AuthInfo {

mod add {
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::Operation;

use super::*;
Expand Down Expand Up @@ -119,7 +120,7 @@ mod add {
.return_once(|_u| Ok(UpsertKVReply::new(None, Some(SeqV::new(1, v)))));
let api = Arc::new(api);
let user_mgr = UserMgr::create(api, "tenant1")?;
let res = user_mgr.add_user(user_info);
let res = user_mgr.add_user(user_info, &CreateOption::CreateIfNotExists(false));

assert!(res.await.is_ok());
}
Expand Down Expand Up @@ -148,7 +149,9 @@ mod add {

let user_info = UserInfo::new(test_user_name, test_hostname, default_test_auth_info());

let res = user_mgr.add_user(user_info).await;
let res = user_mgr
.add_user(user_info, &CreateOption::CreateIfNotExists(false))
.await;

assert_eq!(
res.unwrap_err().code(),
Expand Down
9 changes: 8 additions & 1 deletion src/query/service/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_meta_app::principal::AuthInfo;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::schema::CreateOption;
use databend_common_users::JwtAuthenticator;
use databend_common_users::UserApiProvider;

Expand Down Expand Up @@ -116,7 +117,13 @@ impl AuthMgr {
user_info.grants.grant_role(role);
}
}
user_api.add_user(&tenant, user_info.clone(), true).await?;
user_api
.add_user(
&tenant,
user_info.clone(),
&CreateOption::CreateIfNotExists(true),
)
.await?;
user_info
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Interpreter for CreateUserInterpreter {
lockout_time: None,
};
user_mgr
.add_user(&tenant, user_info, plan.if_not_exists)
.add_user(&tenant, user_info, &plan.create_option)
.await?;

Ok(PipelineBuildResult::create())
Expand Down
7 changes: 6 additions & 1 deletion src/query/service/tests/it/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use databend_common_base::base::tokio;
use databend_common_exception::Result;
use databend_common_meta_app::principal::AuthInfo;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::schema::CreateOption;
use databend_common_users::CustomClaims;
use databend_common_users::EnsureUser;
use databend_common_users::UserApiProvider;
Expand Down Expand Up @@ -152,7 +153,11 @@ async fn test_auth_mgr_with_jwt_multi_sources() -> Result<()> {
let tenant = ctx.get_current_session().get_current_tenant();
let user2_info = UserInfo::new(user2, "%", AuthInfo::JWT);
UserApiProvider::instance()
.add_user(tenant.as_str(), user2_info.clone(), true)
.add_user(
tenant.as_str(),
user2_info.clone(),
&CreateOption::CreateIfNotExists(true),
)
.await?;
let res2 = auth_mgr
.auth(ctx.get_current_session(), &Credential::Jwt {
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/tests/it/storages/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_meta_app::principal::UserGrantSet;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserOption;
use databend_common_meta_app::principal::UserQuota;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::storage::StorageFsConfig;
use databend_common_meta_app::storage::StorageParams;
use databend_common_meta_app::storage::StorageS3Config;
Expand Down Expand Up @@ -398,7 +399,7 @@ async fn test_users_table() -> Result<()> {
password_update_on: None,
lockout_time: None,
},
false,
&CreateOption::CreateIfNotExists(false),
)
.await?;
let auth_data = AuthInfo::new(AuthType::Sha256Password, &Some("123456789".to_string()));
Expand All @@ -418,7 +419,7 @@ async fn test_users_table() -> Result<()> {
password_update_on: None,
lockout_time: None,
},
false,
&CreateOption::CreateIfNotExists(false),
)
.await?;

Expand Down
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/binder/ddl/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl Binder {
stmt: &CreateUserStmt,
) -> Result<Plan> {
let CreateUserStmt {
if_not_exists,
create_option,
user,
auth_option,
user_options,
Expand All @@ -248,7 +248,7 @@ impl Binder {
.await?;

let plan = CreateUserPlan {
if_not_exists: *if_not_exists,
create_option: create_option.clone(),
user: user.clone(),
auth_info: AuthInfo::create2(&auth_option.auth_type, &auth_option.password)?,
user_option,
Expand Down
3 changes: 2 additions & 1 deletion src/query/sql/src/planner/plans/ddl/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ use databend_common_meta_app::principal::PrincipalIdentity;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserOption;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::schema::CreateOption;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateUserPlan {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub user: UserIdentity,
pub auth_info: AuthInfo,
pub user_option: UserOption,
Expand Down
17 changes: 4 additions & 13 deletions src/query/users/src/user_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserOption;
use databend_common_meta_app::principal::UserPrivilegeSet;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_types::MatchSeq;

use crate::role_mgr::BUILTIN_ROLE_ACCOUNT_ADMIN;
Expand Down Expand Up @@ -128,8 +129,8 @@ impl UserApiProvider {
&self,
tenant: &str,
user_info: UserInfo,
if_not_exists: bool,
) -> Result<u64> {
create_option: &CreateOption,
) -> Result<()> {
if let Some(name) = user_info.option.network_policy() {
if self.get_network_policy(tenant, name).await.is_err() {
return Err(ErrorCode::UnknownNetworkPolicy(format!(
Expand All @@ -153,17 +154,7 @@ impl UserApiProvider {
)));
}
let client = self.get_user_api_client(tenant)?;
let add_user = client.add_user(user_info);
match add_user.await {
Ok(res) => Ok(res),
Err(e) => {
if if_not_exists && e.code() == ErrorCode::USER_ALREADY_EXISTS {
Ok(0)
} else {
Err(e.add_message_back("(while add user)"))
}
}
}
client.add_user(user_info, create_option).await
}

#[async_backtrace::framed]
Expand Down
5 changes: 4 additions & 1 deletion src/query/users/tests/it/network_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_meta_app::principal::PasswordHashMethod;
use databend_common_meta_app::principal::UserIdentity;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserOption;
use databend_common_meta_app::schema::CreateOption;
use databend_common_users::UserApiProvider;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -62,7 +63,9 @@ async fn test_network_policy() -> Result<()> {
let mut option = UserOption::empty();
option = option.with_network_policy(Some(policy_name.clone()));
user_info.update_auth_option(None, Some(option));
user_mgr.add_user(tenant, user_info, false).await?;
user_mgr
.add_user(tenant, user_info, &CreateOption::CreateIfNotExists(false))
.await?;

let user = UserIdentity::new(username, hostname);

Expand Down
Loading

0 comments on commit 68d12c9

Please sign in to comment.