From 6877fbbfa2cd8f93de355408587fdaf8b96786bf Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 23:17:21 +0800 Subject: [PATCH] feat(nimtable): support ban ddl for iceberg engine table (#18409) --- src/frontend/src/catalog/table_catalog.rs | 4 + src/frontend/src/handler/mod.rs | 129 +++++++++++++++++++++- src/frontend/src/handler/util.rs | 20 ++++ 3 files changed, 152 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 7bf9ea889de03..82464e847f85c 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -572,6 +572,10 @@ impl TableCatalog { pub fn is_created(&self) -> bool { self.stream_job_status == StreamJobStatus::Created } + + pub fn is_iceberg_engine_table(&self) -> bool { + self.engine == Engine::Iceberg + } } impl From for TableCatalog { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index dd40a4edea19f..a826d01179b32 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -24,11 +24,12 @@ use pgwire::pg_response::StatementType::{self, ABORT, BEGIN, COMMIT, ROLLBACK, S use pgwire::pg_response::{PgResponse, PgResponseBuilder, RowSetResult}; use pgwire::pg_server::BoxedError; use pgwire::types::{Format, Row}; -use risingwave_common::bail_not_implemented; use risingwave_common::types::Fields; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::{bail, bail_not_implemented}; use risingwave_pb::meta::PbThrottleTarget; use risingwave_sqlparser::ast::*; +use util::get_table_catalog_by_table_name; use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt}; use crate::catalog::table_catalog::TableType; @@ -245,6 +246,8 @@ pub async fn handle( let _guard = session.txn_begin_implicit(); let handler_args = HandlerArgs::new(session, &stmt, sql)?; + check_ban_ddl_for_iceberg_engine_table(handler_args.session.clone(), &stmt)?; + match stmt { Statement::Explain { statement, @@ -1029,3 +1032,127 @@ pub async fn handle( _ => bail_not_implemented!("Unhandled statement: {}", stmt), } } + +fn check_ban_ddl_for_iceberg_engine_table( + session: Arc, + stmt: &Statement, +) -> Result<()> { + match stmt { + Statement::CreateIndex { table_name, .. } => { + let (table, schema_name) = + get_table_catalog_by_table_name(session.as_ref(), table_name)?; + if table.is_iceberg_engine_table() { + bail!( + "CREATE INDEX is not supported for iceberg table: {}.{}", + schema_name, + table_name + ); + } + } + + Statement::AlterTable { + name, + operation: + operation @ (AlterTableOperation::AddColumn { .. } + | AlterTableOperation::DropColumn { .. }), + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE {} is not supported for iceberg table: {}.{}", + operation, + schema_name, + name + ); + } + } + + Statement::AlterTable { + name, + operation: AlterTableOperation::RenameTable { .. }, + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE RENAME is not supported for iceberg table: {}.{}", + schema_name, + name + ); + } + } + + Statement::AlterTable { + name, + operation: AlterTableOperation::ChangeOwner { .. }, + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE CHANGE OWNER is not supported for iceberg table: {}.{}", + schema_name, + name + ); + } + } + + Statement::AlterTable { + name, + operation: AlterTableOperation::SetParallelism { .. }, + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE SET PARALLELISM is not supported for iceberg table: {}.{}", + schema_name, + name + ); + } + } + + Statement::AlterTable { + name, + operation: AlterTableOperation::SetSchema { .. }, + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE SET SCHEMA is not supported for iceberg table: {}.{}", + schema_name, + name + ); + } + } + + Statement::AlterTable { + name, + operation: AlterTableOperation::RefreshSchema, + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE REFRESH SCHEMA is not supported for iceberg table: {}.{}", + schema_name, + name + ); + } + } + + Statement::AlterTable { + name, + operation: AlterTableOperation::SetSourceRateLimit { .. }, + } => { + let (table, schema_name) = get_table_catalog_by_table_name(session.as_ref(), name)?; + if table.is_iceberg_engine_table() { + bail!( + "ALTER TABLE SET SOURCE RATE LIMIT is not supported for iceberg table: {}.{}", + schema_name, + name + ); + } + } + + _ => {} + } + + Ok(()) +} diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 0531ce5a65284..9c745bab68ab7 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -36,8 +36,10 @@ use risingwave_sqlparser::ast::{ TableFactor, TableWithJoins, }; +use crate::catalog::root_catalog::SchemaPath; use crate::error::{ErrorCode, Result as RwResult}; use crate::session::{current, SessionImpl}; +use crate::{Binder, TableCatalog}; pin_project! { /// Wrapper struct that converts a stream of DataChunk to a stream of RowSet based on formatting @@ -238,6 +240,24 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 { Epoch::from(logstore_u64).as_unix_millis() } +pub fn get_table_catalog_by_table_name( + session: &SessionImpl, + table_name: &ObjectName, +) -> RwResult<(Arc, String)> { + let db_name = session.database(); + let (schema_name, real_table_name) = + Binder::resolve_schema_qualified_name(db_name, table_name.clone())?; + let search_path = session.config().search_path(); + let user_name = &session.auth_context().user_name; + + let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); + let reader = session.env().catalog_reader().read_guard(); + let (table, schema_name) = + reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?; + + Ok((table.clone(), schema_name.to_string())) +} + #[cfg(test)] mod tests { use postgres_types::{ToSql, Type};