Skip to content

Commit

Permalink
feat(nimtable): support ban ddl for iceberg engine table (#18409)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored and chenzl25 committed Sep 26, 2024
1 parent a2cb5af commit 6877fbb
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ impl TableCatalog {
pub fn is_created(&self) -> bool {
self.stream_job_status == StreamJobStatus::Created
}

pub fn is_iceberg_engine_table(&self) -> bool {
self.engine == Engine::Iceberg
}
}

impl From<PbTable> for TableCatalog {
Expand Down
129 changes: 128 additions & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, S
use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult};
use pgwire::pg_server::BoxedError;
use pgwire::types::{Format, Row};
use risingwave_common::bail_not_implemented;
use risingwave_common::types::Fields;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_sqlparser::ast::*;
use util::get_table_catalog_by_table_name;

use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
use crate::catalog::table_catalog::TableType;
Expand Down Expand Up @@ -245,6 +246,8 @@ pub async fn handle(
let _guard = session.txn_begin_implicit();
let handler_args = HandlerArgs::new(session, &stmt, sql)?;

check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?;

match stmt {
Statement::Explain {
statement,
Expand Down Expand Up @@ -1029,3 +1032,127 @@ pub async fn handle(
_ => bail_not_implemented!("Unhandled statement: {}", stmt),
}
}

fn check_ban_ddl_for_iceberg_engine_table(
session: Arc<SessionImpl>,
stmt: &Statement,
) -> Result<()> {
match stmt {
Statement::CreateIndex { table_name, .. } => {
let (table, schema_name) =
get_table_catalog_by_table_name(session.as_ref(), table_name)?;
if table.is_iceberg_engine_table() {
bail!(
"CREATE INDEX is not supported for iceberg table: {}.{}",
schema_name,
table_name
);
}
}

Statement::AlterTable {
name,
operation:
operation @ (AlterTableOperation::AddColumn { .. }
| AlterTableOperation::DropColumn { .. }),
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE {} is not supported for iceberg table: {}.{}",
operation,
schema_name,
name
);
}
}

Statement::AlterTable {
name,
operation: AlterTableOperation::RenameTable { .. },
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE RENAME is not supported for iceberg table: {}.{}",
schema_name,
name
);
}
}

Statement::AlterTable {
name,
operation: AlterTableOperation::ChangeOwner { .. },
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}",
schema_name,
name
);
}
}

Statement::AlterTable {
name,
operation: AlterTableOperation::SetParallelism { .. },
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}",
schema_name,
name
);
}
}

Statement::AlterTable {
name,
operation: AlterTableOperation::SetSchema { .. },
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}",
schema_name,
name
);
}
}

Statement::AlterTable {
name,
operation: AlterTableOperation::RefreshSchema,
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}",
schema_name,
name
);
}
}

Statement::AlterTable {
name,
operation: AlterTableOperation::SetSourceRateLimit { .. },
} => {
let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?;
if table.is_iceberg_engine_table() {
bail!(
"ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}",
schema_name,
name
);
}
}

_ => {}
}

Ok(())
}
20 changes: 20 additions & 0 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ use risingwave_sqlparser::ast::{
TableFactor, TableWithJoins,
};

use crate::catalog::root_catalog::SchemaPath;
use crate::error::{ErrorCode, Result as RwResult};
use crate::session::{current, SessionImpl};
use crate::{Binder, TableCatalog};

pin_project! {
/// Wrapper struct that converts a stream of DataChunk to a stream of RowSet based on formatting
Expand Down Expand Up @@ -238,6 +240,24 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 {
Epoch::from(logstore_u64).as_unix_millis()
}

pub fn get_table_catalog_by_table_name(
session: &SessionImpl,
table_name: &ObjectName,
) -> RwResult<(Arc<TableCatalog>, String)> {
let db_name = session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;

Ok((table.clone(), schema_name.to_string()))
}

#[cfg(test)]
mod tests {
use postgres_types::{ToSql, Type};
Expand Down

0 comments on commit 6877fbb

Please sign in to comment.