Skip to content

Commit

Permalink
feat(rbac): drop role will transfer role owns object to account_admin (
Browse files Browse the repository at this point in the history
…#15154)

* feature(rbac): drop role will transfer role owns object to account_admin

* use Txn optimize upsert

* fix clippy

* 1. use txn_backoff replace while
2. add add condition to match seq and key

* in loop try get ownerships

* into loop first check retry times

* add compat test
  • Loading branch information
TCeason authored Apr 9, 2024
1 parent 23ea5bd commit d72f6aa
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .github/actions/fuse_compat/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ runs:
bash ./tests/fuse-compat/test-fuse-compat.sh 1.1.46 base
bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.241 revoke
bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.306 rbac
bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.307 rbac
bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.318 rbac
bash ./tests/fuse-compat/test-fuse-forward-compat.sh 1.2.307 rbac
bash ./tests/fuse-compat/test-fuse-forward-compat.sh 1.2.318 rbac
- name: Upload failure
Expand Down
7 changes: 7 additions & 0 deletions src/query/management/src/role/role_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ pub trait RoleApi: Sync + Send {
async fn update_role_with<F>(&self, role: &String, seq: MatchSeq, f: F) -> Result<Option<u64>>
where F: FnOnce(&mut RoleInfo) + Send;

/// Only drop role will call transfer.
///
/// If a role is dropped, but the owner object is exists,
///
/// The owner role need to update to account_admin.
async fn transfer_ownership_to_admin(&self, role: &str) -> Result<()>;

/// Grant ownership would transfer ownership of a object from one role to another role
///
///
Expand Down
68 changes: 68 additions & 0 deletions src/query/management/src/role/role_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_meta_api::reply::txn_reply_to_api_result;
use databend_common_meta_api::txn_backoff::txn_backoff;
use databend_common_meta_api::txn_cond_seq;
use databend_common_meta_api::txn_op_del;
use databend_common_meta_api::txn_op_put;
Expand All @@ -41,6 +42,7 @@ use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use enumflags2::make_bitflags;
use log::debug;
use minitrace::func_name;

use crate::role::role_api::RoleApi;
Expand Down Expand Up @@ -247,6 +249,72 @@ impl RoleApi for RoleMgr {
Ok(Some(seq))
}

/// Only drop role will call transfer.
///
/// If a role is dropped, but the owner object is exists,
///
/// The owner role need to update to account_admin.
///
/// get_ownerships use prefix_list_kv that will generate once meta call
///
/// According to Txn reduce meta call. If role own n objects, will generate once meta call.
#[async_backtrace::framed]
#[minitrace::trace]
async fn transfer_ownership_to_admin(
&self,
role: &str,
) -> databend_common_exception::Result<()> {
let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;
let mut if_then = vec![];
let mut condition = vec![];
let seq_owns = self.get_ownerships().await.map_err(|e| {
e.add_message_back("(while in transfer_ownership_to_admin get ownerships).")
})?;
let mut need_transfer = false;
for own in seq_owns {
if own.data.role == *role {
need_transfer = true;
let object = own.data.object;
let owner_key = self.ownership_object_key(&object);
let owner_value = serialize_struct(
&OwnershipInfo {
object,
role: BUILTIN_ROLE_ACCOUNT_ADMIN.to_string(),
},
ErrorCode::IllegalUserInfoFormat,
|| "",
)?;
// Ensure accurate matching of a key
condition.push(txn_cond_seq(&owner_key, Eq, own.seq));
if_then.push(txn_op_put(&owner_key, owner_value))
}
}

if need_transfer {
let txn_req = TxnRequest {
condition: condition.clone(),
if_then: if_then.clone(),
else_then: vec![],
};
let tx_reply = self.kv_api.transaction(txn_req.clone()).await?;
let (succ, _) = txn_reply_to_api_result(tx_reply)?;
debug!(
succ = succ;
"transfer_ownership_to_admin"
);
if succ {
break;
}
} else {
break;
}
}

Ok(())
}

#[async_backtrace::framed]
#[minitrace::trace]
async fn grant_ownership(
Expand Down
3 changes: 3 additions & 0 deletions src/query/users/src/role_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ impl UserApiProvider {
#[async_backtrace::framed]
pub async fn drop_role(&self, tenant: &Tenant, role: String, if_exists: bool) -> Result<()> {
let client = self.role_api(tenant);
// If the dropped role owns objects, transfer objects owner to account_admin role.
client.transfer_ownership_to_admin(&role).await?;

let drop_role = client.drop_role(role, MatchSeq::GE(1));
match drop_role.await {
Ok(res) => Ok(res),
Expand Down
9 changes: 9 additions & 0 deletions tests/fuse-compat/compat-logictest/rbac/fuse_compat_read
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,12 @@ show grants for role 'role1';

statement ok
show grants for role 'role2';

statement ok
drop role role1;

statement ok
drop role role2;

statement ok
show roles;
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ t1
a drop_role
t drop_role
a account_admin
t drop_role
t account_admin
a drop_role1
t drop_role1
OWNERSHIP a ROLE drop_role1 GRANT OWNERSHIP ON 'default'.'a'.* TO ROLE `drop_role1`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
1
2
3
=== test drop role ===
table1 d20_0014_owner
table2 d20_0014_owner
d20_0014 d20_0014_owner
table1 account_admin
table2 account_admin
d20_0014 account_admin
table1 account_admin
table2 account_admin
d20_0014 account_admin
15 changes: 13 additions & 2 deletions tests/suites/0_stateless/18_rbac/18_0008_privilege_ownership.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,25 @@ echo "create database d20_0014" | $BENDSQL_CLIENT_CONNECT
echo "GRANT OWNERSHIP ON d20_0014.* TO ROLE 'd20_0014_owner'" | $BENDSQL_CLIENT_CONNECT

echo "GRANT ROLE 'd20_0014_owner' TO '${TEST_USER_NAME}'" | $BENDSQL_CLIENT_CONNECT
echo "ALTER USER '${TEST_USER_NAME}' WITH DEFAULT_ROLE='20_0014_owner'" | $BENDSQL_CLIENT_CONNECT
echo "ALTER USER '${TEST_USER_NAME}' WITH DEFAULT_ROLE='d20_0014_owner'" | $BENDSQL_CLIENT_CONNECT

## owner should have all privileges on the table
echo "create table d20_0014.table1(i int);" | $TEST_USER_CONNECT
echo "create table d20_0014.table2(i int);" | $TEST_USER_CONNECT
echo "insert into d20_0014.table1 values(1),(2),(3);" | $TEST_USER_CONNECT
echo "select * from d20_0014.table1;" | $TEST_USER_CONNECT

echo "=== test drop role ==="
echo "select name, owner from system.tables where name in ('table1', 'table2') and database='d20_0014' order by name" | $BENDSQL_CLIENT_CONNECT
echo "select name, owner from system.databases where name='d20_0014'" | $BENDSQL_CLIENT_CONNECT
echo "drop role 'd20_0014_owner'" | $BENDSQL_CLIENT_CONNECT
echo "select name, owner from system.tables where name in ('table1', 'table2') and database='d20_0014' order by name" | $BENDSQL_CLIENT_CONNECT
echo "select name, owner from system.databases where name='d20_0014'" | $BENDSQL_CLIENT_CONNECT
echo "create role 'd20_0014_owner'" | $BENDSQL_CLIENT_CONNECT
echo "select name, owner from system.tables where name in ('table1', 'table2') and database='d20_0014' order by name" | $BENDSQL_CLIENT_CONNECT
echo "select name, owner from system.databases where name='d20_0014'" | $BENDSQL_CLIENT_CONNECT

## cleanup
echo "drop role 'd20_0014_owner'" | $BENDSQL_CLIENT_CONNECT
echo "drop database d20_0014;" | $BENDSQL_CLIENT_CONNECT
echo "drop user '${TEST_USER_NAME}'" | $BENDSQL_CLIENT_CONNECT
echo "drop role 'd20_0014_owner'" | $BENDSQL_CLIENT_CONNECT

0 comments on commit d72f6aa

Please sign in to comment.