Skip to content

Commit

Permalink
feat(query): privilege access check need consider if exists (#14700)
Browse files Browse the repository at this point in the history
* feat(query): drop table need table level drop privilege

* remove tmp var if_exists

* privilege access check need consider if exists

* delete println

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
TCeason and BohuTANG authored Feb 21, 2024
1 parent 926ac0a commit bdad505
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 65 deletions.
173 changes: 109 additions & 64 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl PrivilegeAccess {
catalog_name: &str,
db_name: &str,
privileges: Vec<UserPrivilegeType>,
if_exists: bool,
) -> Result<()> {
let tenant = self.ctx.get_tenant();

Expand All @@ -159,18 +160,32 @@ impl PrivilegeAccess {
return Ok(());
}
Err(_err) => {
let (db_id, _) = match self
match self
.convert_to_id(&tenant, catalog_name, db_name, None)
.await?
.await
{
ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)),
ObjectId::Database(db_id) => (db_id, None),
};
self.validate_access(
&GrantObject::DatabaseById(catalog_name.to_string(), db_id),
privileges,
)
.await?
Ok(obj) => {
let (db_id, _) = match obj {
ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)),
ObjectId::Database(db_id) => (db_id, None),
};
self.validate_access(
&GrantObject::DatabaseById(catalog_name.to_string(), db_id),
privileges,
)
.await?
}
Err(e) => match e.code() {
ErrorCode::UNKNOWN_DATABASE
| ErrorCode::UNKNOWN_TABLE
| ErrorCode::UNKNOWN_CATALOG
if if_exists =>
{
return Ok(());
}
_ => return Err(e.add_message("error on validating database access")),
},
}
}
}
Ok(())
Expand All @@ -182,13 +197,25 @@ impl PrivilegeAccess {
db_name: &str,
table_name: &str,
privileges: Vec<UserPrivilegeType>,
if_exists: bool,
) -> Result<()> {
let tenant = self.ctx.get_tenant();

let catalog = self.ctx.get_catalog(catalog_name).await?;
if catalog.exists_table_function(table_name) {
return Ok(());
match self.ctx.get_catalog(catalog_name).await {
Ok(catalog) => {
if catalog.exists_table_function(table_name) {
return Ok(());
}
}
Err(error) => {
if error.code() == ErrorCode::UNKNOWN_CATALOG && if_exists {
return Ok(());
} else {
return Err(error);
}
}
}

// to keep compatibility with the legacy privileges which granted by table name,
// we'd both check the privileges by name and id.
// we'll completely move to the id side in the future.
Expand All @@ -203,22 +230,38 @@ impl PrivilegeAccess {
)
.await
{
Ok(_) => {
return Ok(());
}
Ok(_) => return Ok(()),
Err(_err) => {
let (db_id, table_id) = match self
match self
.convert_to_id(&tenant, catalog_name, db_name, Some(table_name))
.await?
.await
{
ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)),
ObjectId::Database(db_id) => (db_id, None),
};
self.validate_access(
&GrantObject::TableById(catalog_name.to_string(), db_id, table_id.unwrap()),
privileges,
)
.await?
Ok(obj) => {
let (db_id, table_id) = match obj {
ObjectId::Table(db_id, table_id) => (db_id, Some(table_id)),
ObjectId::Database(db_id) => (db_id, None),
};
self.validate_access(
&GrantObject::TableById(
catalog_name.to_string(),
db_id,
table_id.unwrap(),
),
privileges,
)
.await?
}
Err(e) => match e.code() {
ErrorCode::UNKNOWN_DATABASE
| ErrorCode::UNKNOWN_TABLE
| ErrorCode::UNKNOWN_CATALOG
if if_exists =>
{
return Ok(());
}
_ => return Err(e.add_message("error on validating table access")),
},
}
}
}
Ok(())
Expand Down Expand Up @@ -549,7 +592,7 @@ impl AccessChecker for PrivilegeAccess {
// like this sql: copy into t from (select * from @s3); will bind a mock table with name `system.read_parquet(s3)`
// this is no means to check table `system.read_parquet(s3)` privilege
if !table.is_source_of_stage() {
self.validate_table_access(catalog_name, table.database(), table.name(), vec![UserPrivilegeType::Select]).await?
self.validate_table_access(catalog_name, table.database(), table.name(), vec![UserPrivilegeType::Select], false).await?
}
}
}
Expand All @@ -559,18 +602,19 @@ impl AccessChecker for PrivilegeAccess {

// Database.
Plan::ShowCreateDatabase(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Select]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Select], false).await?
}
Plan::CreateDatabase(_) => {
self.validate_access(&GrantObject::Global, vec![UserPrivilegeType::CreateDatabase])
.await?;
}
Plan::DropDatabase(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop]).await?;
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop], plan.if_exists).await?;
}
Plan::UndropDatabase(_)
| Plan::DropUDF(_)
| Plan::DropIndex(_) => {
// undroptable/db need convert name to id. But because of drop, can not find the id. Upgrade Object to Database.
self.validate_access(&GrantObject::Global, vec![UserPrivilegeType::Drop])
.await?;
}
Expand Down Expand Up @@ -599,64 +643,65 @@ impl AccessChecker for PrivilegeAccess {

// Virtual Column.
Plan::CreateVirtualColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Create]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Create], false).await?
}
Plan::AlterVirtualColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], plan.if_exists).await?
}
Plan::DropVirtualColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Drop]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Drop], plan.if_exists).await?
}
Plan::RefreshVirtualColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super], false).await?
}

// Table.
Plan::ShowCreateTable(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Select]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Select], false).await?
}
Plan::DescribeTable(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Select]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Select], false).await?
}
Plan::CreateTable(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Create]).await?;
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Create], false).await?;
if let Some(query) = &plan.as_select {
self.check(ctx, query).await?;
}
}
Plan::DropTable(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop]).await?;
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Drop], plan.if_exists).await?;
}
Plan::UndropTable(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop]).await?;
// undroptable/db need convert name to id. But because of drop, can not find the id. Upgrade Object to Database.
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop], false).await?;

}
Plan::RenameTable(plan) => {
// You must have ALTER and DROP privileges for the original table,
// and CREATE for the new db.
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter, UserPrivilegeType::Drop]).await?;
self.validate_db_access(&plan.catalog, &plan.new_database, vec![UserPrivilegeType::Create]).await?;
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter, UserPrivilegeType::Drop], plan.if_exists).await?;
self.validate_db_access(&plan.catalog, &plan.new_database, vec![UserPrivilegeType::Create], false).await?;
}
Plan::SetOptions(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::AddTableColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::RenameTableColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::ModifyTableColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::DropTableColumn(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::AlterTableClusterKey(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::DropTableClusterKey(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Drop]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Drop], false).await?
}
Plan::ReclusterTable(plan) => {
if enable_experimental_rbac_check {
Expand All @@ -665,26 +710,26 @@ impl AccessChecker for PrivilegeAccess {
self.validate_udf_access(udf).await?;
}
}
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Alter], false).await?
}
Plan::TruncateTable(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Delete]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Delete], false).await?
}
Plan::OptimizeTable(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super], false).await?
}
Plan::VacuumTable(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super], false).await?
}
Plan::VacuumDropTable(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Super]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Super], false).await?
}
Plan::AnalyzeTable(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super]).await?
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Super], false).await?
}
// Others.
Plan::Insert(plan) => {
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Insert]).await?;
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Insert], false).await?;
match &plan.source {
InsertInputSource::SelectPlan(plan) => {
self.check(ctx, plan).await?;
Expand All @@ -699,7 +744,7 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::Replace(plan) => {
//plan.delete_when is Expr no need to check privileges.
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]).await?;
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete], false).await?;
match &plan.source {
InsertInputSource::SelectPlan(plan) => {
self.check(ctx, plan).await?;
Expand Down Expand Up @@ -750,7 +795,7 @@ impl AccessChecker for PrivilegeAccess {
}
}
}
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete]).await?;
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete], false).await?;
}
Plan::Delete(plan) => {
if enable_experimental_rbac_check {
Expand All @@ -771,7 +816,7 @@ impl AccessChecker for PrivilegeAccess {
}
}
}
self.validate_table_access(&plan.catalog_name, &plan.database_name, &plan.table_name, vec![UserPrivilegeType::Delete]).await?
self.validate_table_access(&plan.catalog_name, &plan.database_name, &plan.table_name, vec![UserPrivilegeType::Delete], false).await?
}
Plan::Update(plan) => {
if enable_experimental_rbac_check {
Expand All @@ -796,22 +841,22 @@ impl AccessChecker for PrivilegeAccess {
}
}
}
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Update]).await?;
self.validate_table_access(&plan.catalog, &plan.database, &plan.table, vec![UserPrivilegeType::Update], false).await?;
}
Plan::CreateView(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Create]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Create], false).await?
}
Plan::AlterView(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Alter]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Alter], false).await?
}
Plan::DropView(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop], plan.if_exists).await?
}
Plan::CreateStream(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Create]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Create], false).await?
}
Plan::DropStream(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop]).await?
self.validate_db_access(&plan.catalog, &plan.database, vec![UserPrivilegeType::Drop], plan.if_exists).await?
}
Plan::CreateUser(_) => {
self.validate_access(
Expand Down Expand Up @@ -868,7 +913,7 @@ impl AccessChecker for PrivilegeAccess {
}
Plan::CopyIntoTable(plan) => {
self.validate_stage_access(&plan.stage_table_info.stage_info, UserPrivilegeType::Read).await?;
self.validate_table_access(plan.catalog_info.catalog_name(), &plan.database_name, &plan.table_name, vec![UserPrivilegeType::Insert]).await?;
self.validate_table_access(plan.catalog_info.catalog_name(), &plan.database_name, &plan.table_name, vec![UserPrivilegeType::Insert], false).await?;
if let Some(query) = &plan.query {
self.check(ctx, query).await?;
}
Expand Down
Loading

0 comments on commit bdad505

Please sign in to comment.