Skip to content

Commit

Permalink
feat(iceberg): support drop table for nimtable (#18404)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 20, 2024
1 parent 1aa5580 commit 31361bb
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ public boolean tableExists(String tableIdentifier) {
return catalog.tableExists(id);
}

/**
* Drop a table from the catalog.
*
* @param tableIdentifier The identifier of the table to drop.
* @return true if the table was dropped, false otherwise.
*/
public boolean dropTable(String tableIdentifier) {
TableIdentifier id = TableIdentifier.parse(tableIdentifier);
return catalog.dropTable(id);
}

/**
* Create JniCatalogWrapper instance.
*
Expand Down
25 changes: 23 additions & 2 deletions src/connector/src/sink/iceberg/jni_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,29 @@ impl CatalogV2 for JniCatalog {
}

/// Drop a table from the catalog.
async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
todo!()
async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
execute_with_jni_env(self.jvm, |env| {
let table_name_str = format!(
"{}.{}",
table.namespace().clone().inner().into_iter().join("."),
table.name()
);

let table_name_jstr = env.new_string(&table_name_str).unwrap();

call_method!(env, self.java_catalog.as_obj(), {boolean dropTable(String)},
&table_name_jstr)
.with_context(|| format!("Failed to drop iceberg table: {table_name_str}"))?;

Ok(())
})
.map_err(|e| {
iceberg::Error::new(
iceberg::ErrorKind::Unexpected,
"Failed to load iceberg table.",
)
.with_source(e)
})
}

/// Check if a table exists in the catalog.
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ impl IcebergConfig {
}

impl IcebergConfig {
fn full_table_name_v2(&self) -> Result<TableIdent> {
pub fn full_table_name_v2(&self) -> Result<TableIdent> {
let ret = if let Some(database_name) = &self.database_name {
TableIdent::from_strs(vec![database_name, &self.table_name])
} else {
Expand All @@ -528,7 +528,7 @@ impl IcebergConfig {
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
}

async fn create_catalog_v2(&self) -> ConnectorResult<Arc<dyn CatalogV2>> {
pub async fn create_catalog_v2(&self) -> ConnectorResult<Arc<dyn CatalogV2>> {
match self.catalog_type() {
"storage" => {
let config = StorageCatalogConfig::builder()
Expand Down
8 changes: 6 additions & 2 deletions src/connector/src/sink/iceberg/storage_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,12 @@ impl Catalog for StorageCatalog {
}

/// Drop a table from the catalog.
async fn drop_table(&self, _table: &TableIdent) -> iceberg::Result<()> {
todo!()
async fn drop_table(&self, table: &TableIdent) -> iceberg::Result<()> {
let table = self.load_table(table).await?;
table
.file_io()
.remove_all(table.metadata().location())
.await
}

/// Check if a table exists in the catalog.
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ impl TableCatalog {
self.table_type
}

pub fn engine(&self) -> Engine {
self.engine
}

pub fn is_table(&self) -> bool {
self.table_type == TableType::Table
}
Expand Down
81 changes: 75 additions & 6 deletions src/frontend/src/handler/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_sqlparser::ast::ObjectName;
use risingwave_common::catalog::Engine;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::ConnectorProperties;
use risingwave_sqlparser::ast::{Ident, ObjectName};

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::catalog::table_catalog::{TableType, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
use crate::error::Result;
use crate::handler::HandlerArgs;

Expand All @@ -28,15 +32,15 @@ pub async fn handle_drop_table(
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let session = handler_args.session.clone();
let db_name = session.database();
let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (source_id, table_id) = {
let (source_id, table_id, engine) = {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
Expand All @@ -57,10 +61,75 @@ pub async fn handle_drop_table(
if table.table_type() != TableType::Table {
return Err(table.bad_drop_error());
}

(table.associated_source_id(), table.id())
(table.associated_source_id(), table.id(), table.engine)
};

match engine {
Engine::Iceberg => {
let source = session
.env()
.catalog_reader()
.read_guard()
.get_source_by_name(
db_name,
schema_path,
&(ICEBERG_SOURCE_PREFIX.to_string() + &table_name),
)
.map(|(source, _)| source.clone())?;
// TODO(nimtable): handle drop table failures in the middle.
// Drop sink
// Drop iceberg table
// - Purge table from warehouse
// - Drop table from catalog
// Drop source
crate::handler::drop_sink::handle_drop_sink(
handler_args.clone(),
ObjectName::from(vec![Ident::from(
(ICEBERG_SINK_PREFIX.to_string() + &table_name).as_str(),
)]),
true,
false,
)
.await?;

let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config();
let iceberg_catalog = iceberg_config
.create_catalog_v2()
.await
.context("Unable to load iceberg catalog")?;
let table_id = iceberg_config
.full_table_name_v2()
.context("Unable to parse table name")?;
let table = iceberg_catalog
.load_table(&table_id)
.await
.context("failed to load iceberg table")?;
table
.file_io()
.remove_all(table.metadata().location())
.await
.context("failed to purge iceberg table")?;
iceberg_catalog
.drop_table(&table_id)
.await
.context("failed to drop iceberg table")?;
}

crate::handler::drop_source::handle_drop_source(
handler_args.clone(),
ObjectName::from(vec![Ident::from(
(ICEBERG_SOURCE_PREFIX.to_string() + &table_name).as_str(),
)]),
true,
false,
)
.await?;
}
Engine::Hummock => {}
}

let catalog_writer = session.catalog_writer()?;
catalog_writer
.drop_table(source_id.map(|id| id.table_id), table_id, cascade)
Expand Down

0 comments on commit 31361bb

Please sign in to comment.