diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java index 583747f3b2f3f..30e8230a6dc9b 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/catalog/JniCatalogWrapper.java @@ -22,8 +22,10 @@ import java.util.Objects; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.CatalogHandlers; +import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.LoadTableResponse; @@ -62,6 +64,38 @@ public String updateTable(String updateTableRequest) throws Exception { return RESTObjectMapper.mapper().writer().writeValueAsString(resp); } + /** + * Create table through this prox. + * + * @param namespaceStr String. + * @param createTableRequest Request serialized using json. + * @return Response serialized using json. + * @throws Exception + */ + public String createTable(String namespaceStr, String createTableRequest) throws Exception { + Namespace namespace; + if (namespaceStr == null) { + namespace = Namespace.empty(); + } else { + namespace = Namespace.of(namespaceStr); + } + CreateTableRequest req = + RESTObjectMapper.mapper().readValue(createTableRequest, CreateTableRequest.class); + LoadTableResponse resp = CatalogHandlers.createTable(catalog, namespace, req); + return RESTObjectMapper.mapper().writer().writeValueAsString(resp); + } + + /** + * Checks if a table exists in the catalog. + * + * @param tableIdentifier The identifier of the table to check. + * @return true if the table exists, false otherwise. + */ + public boolean tableExists(String tableIdentifier) { + TableIdentifier id = TableIdentifier.parse(tableIdentifier); + return catalog.tableExists(id); + } + /** * Create JniCatalogWrapper instance. * diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs index 6ef251878ff94..b80a6a305870f 100644 --- a/src/connector/src/sink/iceberg/jni_catalog.rs +++ b/src/connector/src/sink/iceberg/jni_catalog.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; use iceberg::io::FileIO; -use iceberg::spec::TableMetadata; +use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; use iceberg::table::Table as TableV2; use iceberg::{ Catalog as CatalogV2, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, @@ -34,9 +34,10 @@ use icelake::{ErrorKind, Table, TableIdentifier}; use itertools::Itertools; use jni::objects::{GlobalRef, JObject}; use jni::JavaVM; +use risingwave_common::bail; use risingwave_jni_core::call_method; use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use crate::error::ConnectorResult; @@ -48,6 +49,36 @@ struct LoadTableResponse { pub _config: Option>, } +#[derive(Debug, Serialize)] +#[serde(rename_all = "kebab-case")] +struct CreateTableRequest { + /// The name of the table. + pub name: String, + /// The location of the table. + pub location: Option, + /// The schema of the table. + pub schema: Schema, + /// The partition spec of the table, could be None. + pub partition_spec: Option, + /// The sort order of the table. + pub write_order: Option, + /// The properties of the table. + pub properties: HashMap, +} + +impl From<&TableCreation> for CreateTableRequest { + fn from(value: &TableCreation) -> Self { + Self { + name: value.name.clone(), + location: value.location.clone(), + schema: value.schema.clone(), + partition_spec: value.partition_spec.clone(), + write_order: value.sort_order.clone(), + properties: value.properties.clone(), + } + } +} + #[derive(Debug)] pub struct JniCatalog { java_catalog: GlobalRef, @@ -206,10 +237,58 @@ impl CatalogV2 for JniCatalog { /// Create a new table inside the namespace. async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> iceberg::Result { - todo!() + execute_with_jni_env(self.jvm, |env| { + let namespace_jstr = if namespace.is_empty() { + env.new_string("").unwrap() + } else { + if namespace.len() > 1 { + bail!("Namespace with more than one level is not supported!") + } + env.new_string(&namespace[0]).unwrap() + }; + + let creation_str = serde_json::to_string(&CreateTableRequest::from(&creation))?; + + let creation_jstr = env.new_string(&creation_str).unwrap(); + + let result_json = + call_method!(env, self.java_catalog.as_obj(), {String createTable(String, String)}, + &namespace_jstr, &creation_jstr) + .with_context(|| format!("Failed to create iceberg table: {}", creation.name))?; + + let rust_json_str = jobj_to_str(env, result_json)?; + + let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?; + + let metadata_location = resp.metadata_location.ok_or_else(|| { + iceberg::Error::new( + iceberg::ErrorKind::FeatureUnsupported, + "Loading uncommitted table is not supported!", + ) + })?; + + let table_metadata = resp.metadata; + + let file_io = FileIO::from_path(&metadata_location)? + .with_props(self.config.table_io_configs.iter()) + .build()?; + + Ok(TableV2::builder() + .file_io(file_io) + .identifier(TableIdent::new(namespace.clone(), creation.name)) + .metadata(table_metadata) + .build()) + }) + .map_err(|e| { + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + "Failed to crete iceberg table.", + ) + .with_source(e) + }) } /// Load table from the catalog. @@ -233,8 +312,8 @@ impl CatalogV2 for JniCatalog { let resp: LoadTableResponse = serde_json::from_str(&rust_json_str)?; let metadata_location = resp.metadata_location.ok_or_else(|| { - icelake::Error::new( - ErrorKind::IcebergFeatureUnsupported, + iceberg::Error::new( + iceberg::ErrorKind::FeatureUnsupported, "Loading uncommitted table is not supported!", ) })?; @@ -268,8 +347,32 @@ impl CatalogV2 for JniCatalog { } /// Check if a table exists in the catalog. - async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result { - todo!() + async fn table_exists(&self, table: &TableIdent) -> iceberg::Result { + execute_with_jni_env(self.jvm, |env| { + let table_name_str = format!( + "{}.{}", + table.namespace().clone().inner().into_iter().join("."), + table.name() + ); + + let table_name_jstr = env.new_string(&table_name_str).unwrap(); + + let exists = + call_method!(env, self.java_catalog.as_obj(), {boolean tableExists(String)}, + &table_name_jstr) + .with_context(|| { + format!("Failed to check iceberg table exists: {table_name_str}") + })?; + + Ok(exists) + }) + .map_err(|e| { + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + "Failed to load iceberg table.", + ) + .with_source(e) + }) } /// Rename a table in the catalog. @@ -326,7 +429,7 @@ impl JniCatalog { config: base_config, }) }) - .map_err(Into::into) + .map_err(Into::into) } pub fn build_catalog(