From b2230a55c2643adc9809df48dc005ef26871a71a Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 4 Sep 2024 16:45:00 +0800 Subject: [PATCH] feat(iceberg): support drop table for nimtable (#18404) --- .../connector/catalog/JniCatalogWrapper.java | 11 +++ .../connector_common/iceberg/jni_catalog.rs | 25 +++++- .../iceberg/storage_catalog.rs | 8 +- src/frontend/src/catalog/table_catalog.rs | 4 + src/frontend/src/handler/drop_table.rs | 81 +++++++++++++++++-- 5 files changed, 119 insertions(+), 10 deletions(-) diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java index 30e8230a6dc9b..9c3633ef578c8 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java @@ -96,6 +96,17 @@ public boolean tableExists(String tableIdentifier) { return catalog.tableExists(id); } + /** + * Drop a table from the catalog. + * + * @param tableIdentifier The identifier of the table to drop. + * @return true if the table was dropped, false otherwise. + */ + public boolean dropTable(String tableIdentifier) { + TableIdentifier id = TableIdentifier.parse(tableIdentifier); + return catalog.dropTable(id); + } + /** * Create JniCatalogWrapper instance. * diff --git a/src/connector/src/connector_common/iceberg/jni_catalog.rs b/src/connector/src/connector_common/iceberg/jni_catalog.rs index 6529ea733428d..3fedfac70c72d 100644 --- a/src/connector/src/connector_common/iceberg/jni_catalog.rs +++ b/src/connector/src/connector_common/iceberg/jni_catalog.rs @@ -342,8 +342,29 @@ impl CatalogV2 for JniCatalog { } /// Drop a table from the catalog. - async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> { - todo!() + async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> { + execute_with_jni_env(self.jvm, |env| { + let table_name_str = format!( + "{}.{}", + table.namespace().clone().inner().into_iter().join("."), + table.name() + ); + + let table_name_jstr = env.new_string(&table_name_str).unwrap(); + + call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)}, + &table_name_jstr) + .with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?; + + Ok(()) + }) + .map_err(|e| { + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + "Failed to load iceberg table.", + ) + .with_source(e) + }) } /// Check if a table exists in the catalog. diff --git a/src/connector/src/connector_common/iceberg/storage_catalog.rs b/src/connector/src/connector_common/iceberg/storage_catalog.rs index cd7bb2ca4ba0a..08c4e48f538f0 100644 --- a/src/connector/src/connector_common/iceberg/storage_catalog.rs +++ b/src/connector/src/connector_common/iceberg/storage_catalog.rs @@ -292,8 +292,12 @@ impl Catalog for StorageCatalog { } /// Drop a table from the catalog. - async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> { - todo!() + async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> { + let table = self.load_table(table).await?; + table + .file_io() + .remove_all(table.metadata().location()) + .await } /// Check if a table exists in the catalog. diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index c99267015abac..11be5c2e5d1bf 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -292,6 +292,10 @@ impl TableCatalog { self.table_type } + pub fn engine(&self) -> Engine { + self.engine + } + pub fn is_table(&self) -> bool { self.table_type == TableType::Table } diff --git a/src/frontend/src/handler/drop_table.rs b/src/frontend/src/handler/drop_table.rs index 2f85eb98d704b..dc471414f7e4c 100644 --- a/src/frontend/src/handler/drop_table.rs +++ b/src/frontend/src/handler/drop_table.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::Context; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_sqlparser::ast::ObjectName; +use risingwave_common::catalog::Engine; +use risingwave_connector::sink::iceberg::IcebergConfig; +use risingwave_connector::source::ConnectorProperties; +use risingwave_sqlparser::ast::{Ident, ObjectName}; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::root_catalog::SchemaPath; -use crate::catalog::table_catalog::TableType; +use crate::catalog::table_catalog::{TableType, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX}; use crate::error::Result; use crate::handler::HandlerArgs; @@ -28,7 +32,7 @@ pub async fn handle_drop_table( if_exists: bool, cascade: bool, ) -> Result { - let session = handler_args.session; + let session = handler_args.session.clone(); let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; let search_path = session.config().search_path(); @@ -36,7 +40,7 @@ pub async fn handle_drop_table( let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name); - let (source_id, table_id) = { + let (source_id, table_id, engine) = { let reader = session.env().catalog_reader().read_guard(); let (table, schema_name) = match reader.get_created_table_by_name(db_name, schema_path, &table_name) { @@ -57,10 +61,75 @@ pub async fn handle_drop_table( if table.table_type() != TableType::Table { return Err(table.bad_drop_error()); } - - (table.associated_source_id(), table.id()) + (table.associated_source_id(), table.id(), table.engine) }; + match engine { + Engine::Iceberg => { + let source = session + .env() + .catalog_reader() + .read_guard() + .get_source_by_name( + db_name, + schema_path, + &(ICEBERG_SOURCE_PREFIX.to_string() + &table_name), + ) + .map(|(source, _)| source.clone())?; + // TODO(nimtable): handle drop table failures in the middle. + // Drop sink + // Drop iceberg table + // - Purge table from warehouse + // - Drop table from catalog + // Drop source + crate::handler::drop_sink::handle_drop_sink( + handler_args.clone(), + ObjectName::from(vec![Ident::from( + (ICEBERG_SINK_PREFIX.to_string() + &table_name).as_str(), + )]), + true, + false, + ) + .await?; + + let config = ConnectorProperties::extract(source.with_properties.clone(), false)?; + if let ConnectorProperties::Iceberg(iceberg_properties) = config { + let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); + let iceberg_catalog = iceberg_config + .create_catalog_v2() + .await + .context("Unable to load iceberg catalog")?; + let table_id = iceberg_config + .full_table_name_v2() + .context("Unable to parse table name")?; + let table = iceberg_catalog + .load_table(&table_id) + .await + .context("failed to load iceberg table")?; + table + .file_io() + .remove_all(table.metadata().location()) + .await + .context("failed to purge iceberg table")?; + iceberg_catalog + .drop_table(&table_id) + .await + .context("failed to drop iceberg table")?; + } + + crate::handler::drop_source::handle_drop_source( + handler_args.clone(), + ObjectName::from(vec![Ident::from( + (ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(), + )]), + true, + false, + ) + .await?; + } + Engine::Hummock => {} + } + let catalog_writer = session.catalog_writer()?; catalog_writer .drop_table(source_id.map(|id| id.table_id), table_id, cascade)