From 9f6140c63aba0f1b055d0f89bc31d8d3f0dbc038 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 20 Sep 2024 15:41:59 +0800
Subject: [PATCH] resolve conflicts with iceberg properties refactor

---
 .../test_case/iceberg_source_all_delete.slt   |   3 +
 .../iceberg_source_position_delete.slt        |   3 +
 .../src/connector_common/iceberg/mod.rs       | 141 ++++++++++++++++++
 src/connector/src/sink/iceberg/mod.rs         |  83 +----------
 src/connector/src/source/iceberg/mod.rs       |  24 ++-
 src/connector/with_options_sink.yaml          |  10 +-
 src/connector/with_options_source.yaml        |   9 +-
 src/frontend/src/handler/drop_table.rs        |  35 +++--
 src/meta/model_v2/migration/src/lib.rs        |   2 +-
 9 files changed, 199 insertions(+), 111 deletions(-)

diff --git a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
index 6b4f984dabdc..49501fcbca0e 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
@@ -1,4 +1,7 @@
 # Deletions in a single commit are position delete, deletions across multiple commits are equality delete. sink_decouple = default(true), so we'll commit every 10s.
+statement ok
+set sink_decouple=true;
+
 statement ok
 set streaming_parallelism=4;
 
diff --git a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
index 40d665d7144e..812214c181b4 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
@@ -1,4 +1,7 @@
 # Deletions in a single commit are position delete, deletions across multiple commits are equality delete. sink_decouple = default(true), so we'll commit every 10s.
+statement ok
+set sink_decouple=true;
+
 statement ok
 set streaming_parallelism=4;
 
diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs
index 37d4e5e6f5a0..95c141361671 100644
--- a/src/connector/src/connector_common/iceberg/mod.rs
+++ b/src/connector/src/connector_common/iceberg/mod.rs
@@ -20,16 +20,19 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use anyhow::Context;
+use clap::ValueEnum;
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
 use icelake::catalog::{
     load_iceberg_base_catalog_config, BaseCatalogConfig, CATALOG_NAME, CATALOG_TYPE,
 };
 use risingwave_common::bail;
+use risingwave_common::config::MetaBackend;
 use serde_derive::Deserialize;
 use serde_with::serde_as;
 use url::Url;
 use with_options::WithOptions;
 
+use crate::deserialize_optional_bool_from_string;
 use crate::error::ConnectorResult;
 
 #[serde_as]
@@ -62,6 +65,10 @@ pub struct IcebergCommon {
     /// Full name of table, must include schema name.
     #[serde(rename = "table.name")]
     pub table_name: String,
+
+    /// enable config load is currently used by nimtable, so it only supports jdbc catalog.
+    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
+    pub enable_config_load: Option<bool>,
 }
 
 impl IcebergCommon {
@@ -76,12 +83,146 @@ impl IcebergCommon {
             .unwrap_or_else(|| "risingwave".to_string())
     }
 
+    fn build_jni_catalog_configs_for_config_load(
+        &self,
+        path_style_access: &Option<bool>,
+        java_catalog_props: &HashMap<String, String>,
+    ) -> ConnectorResult<(BaseCatalogConfig, HashMap<String, String>)> {
+        if self.catalog_type.as_deref() != Some("jdbc") {
+            bail!("enable_config_load only supports jdbc catalog right now");
+        }
+
+        let mut iceberg_configs = HashMap::new();
+
+        let base_catalog_config = {
+            let catalog_type = self.catalog_type().to_string();
+
+            iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
+            iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());
+
+            let Ok(s3_region) = std::env::var("AWS_REGION") else {
+                bail!("To create an iceberg engine table, AWS_REGION needs to be set");
+            };
+
+            // icelake
+            iceberg_configs.insert(
+                "iceberg.table.io.region".to_string(),
+                s3_region.clone().to_string(),
+            );
+            // iceberg-rust
+            iceberg_configs.insert(
+                ("iceberg.table.io.".to_string() + S3_REGION).to_string(),
+                s3_region.clone().to_string(),
+            );
+
+            let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else {
+                bail!("To create an iceberg engine table, AWS_S3_BUCKET needs to be set");
+            };
+
+            let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else {
+                bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needs to be set");
+            };
+            let warehouse_path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory);
+
+            let (bucket, _) = {
+                let url = Url::parse(&warehouse_path)
+                    .with_context(|| format!("Invalid warehouse path: {}", warehouse_path))?;
+                let bucket = url
+                    .host_str()
+                    .with_context(|| {
+                        format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
+                    })?
+                    .to_string();
+                let root = url.path().trim_start_matches('/').to_string();
+                (bucket, root)
+            };
+
+            iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
+
+            load_iceberg_base_catalog_config(&iceberg_configs)?
+        };
+
+        // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
+        let mut java_catalog_configs = HashMap::new();
+        {
+            let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else {
+                bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needs to be set");
+            };
+
+            let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else {
+                bail!("To create an iceberg engine table, RW_SQL_DATABASE needs to be set");
+            };
+
+            let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else {
+                bail!("To create an iceberg engine table, RW_BACKEND needs to be set");
+            };
+            let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
+                bail!("failed to parse meta backend: {}", meta_store_backend);
+            };
+
+            let catalog_uri = match meta_backend {
+                MetaBackend::Postgres => format!(
+                    "jdbc:postgresql://{}/{}",
+                    meta_store_endpoint.clone(),
+                    meta_store_database.clone()
+                ),
+                MetaBackend::Mysql => format!(
+                    "jdbc:mysql://{}/{}",
+                    meta_store_endpoint.clone(),
+                    meta_store_database.clone()
+                ),
+                MetaBackend::Sqlite | MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => {
+                    bail!(
+                        "Unsupported meta backend for iceberg engine table: {}",
+                        meta_store_backend
+                    );
+                }
+            };
+
+            java_catalog_configs.insert("uri".to_string(), catalog_uri.to_string());
+
+            java_catalog_configs.insert("warehouse".to_string(), self.warehouse_path.clone());
+            java_catalog_configs.extend(java_catalog_props.clone());
+
+            // Currently we only support s3, so let's set it to s3
+            java_catalog_configs.insert(
+                "io-impl".to_string(),
+                "org.apache.iceberg.aws.s3.S3FileIO".to_string(),
+            );
+
+            if let Some(path_style_access) = path_style_access {
+                java_catalog_configs.insert(
+                    "s3.path-style-access".to_string(),
+                    path_style_access.to_string(),
+                );
+            }
+
+            let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else {
+                bail!("To create an iceberg engine table, RW_SQL_USERNAME needs to be set");
+            };
+
+            let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else {
+                bail!("To create an iceberg engine table, RW_SQL_PASSWORD needs to be set");
+            };
+            java_catalog_configs.insert("jdbc.user".to_string(), meta_store_user);
+            java_catalog_configs.insert("jdbc.password".to_string(), meta_store_password);
+        }
+
+        Ok((base_catalog_config, java_catalog_configs))
+    }
+
     /// For both V1 and V2.
     fn build_jni_catalog_configs(
         &self,
         path_style_access: &Option<bool>,
         java_catalog_props: &HashMap<String, String>,
     ) -> ConnectorResult<(BaseCatalogConfig, HashMap<String, String>)> {
+        if let Some(enable_config_load) = self.enable_config_load
+            && enable_config_load
+        {
+            return self
+                .build_jni_catalog_configs_for_config_load(path_style_access, java_catalog_props);
+        }
         let mut iceberg_configs = HashMap::new();
 
         let base_catalog_config = {
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 60838222febc..69457e564258 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -22,10 +22,6 @@ use std::sync::Arc;
 
 use anyhow::{anyhow, Context};
 use async_trait::async_trait;
-use clap::ValueEnum;
-use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
-use iceberg::spec::TableMetadata;
-use iceberg::table::Table as TableV2;
 use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent};
 use icelake::catalog::CatalogRef;
 use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter};
@@ -45,7 +41,6 @@ use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::bail;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::Schema;
-use risingwave_common::config::MetaBackend;
 use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
 use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
 use risingwave_pb::connector_service::SinkMetadata;
@@ -109,10 +104,6 @@ pub struct IcebergConfig {
 
     #[serde(default, deserialize_with = "deserialize_bool_from_string")]
     pub create_table_if_not_exists: bool,
-
-    /// enable config load currently is used by nimtable, so it only support jdbc catalog.
-    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
-    pub enable_config_load: bool,
 }
 
 impl IcebergConfig {
@@ -171,82 +162,10 @@ impl IcebergConfig {
                 "`commit_checkpoint_interval` must be greater than 0"
             )));
         }
-        config = config.fill_for_config_load()?;
 
         Ok(config)
     }
 
-    pub fn fill_for_config_load(mut self) -> Result<Self> {
-        if self.enable_config_load {
-            if self.catalog_type.as_deref() != Some("jdbc") {
-                return Err(SinkError::Config(anyhow!(
-                    "enable_config_load only support jdbc catalog right now"
-                )));
-            }
-
-            let Ok(s3_region) = std::env::var("AWS_REGION") else {
-                bail!("To create an iceberg engine table, AWS_REGION needed to be set");
-            };
-            self.region = Some(s3_region);
-
-            let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else {
-                bail!("To create an iceberg engine table, AWS_S3_BUCKET needed to be set");
-            };
-
-            let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else {
-                bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needed to be set");
-            };
-            self.path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory);
-
-            let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else {
-                bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needed to be set");
-            };
-
-            let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else {
-                bail!("To create an iceberg engine table, RW_SQL_DATABASE needed to be set");
-            };
-
-            let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else {
-                bail!("To create an iceberg engine table, RW_BACKEND needed to be set");
-            };
-            let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
-                bail!("failed to parse meta backend: {}", meta_store_backend);
-            };
-
-            self.uri = match meta_backend {
-                MetaBackend::Postgres => Some(format!(
"jdbc:postgresql://{}/{}", - meta_store_endpoint.clone(), - meta_store_database.clone() - )), - MetaBackend::Mysql => Some(format!( - "jdbc:mysql://{}/{}", - meta_store_endpoint.clone(), - meta_store_database.clone() - )), - MetaBackend::Sqlite | MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => { - bail!( - "Unsupported meta backend for iceberg engine table: {}", - meta_store_backend - ); - } - }; - - let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else { - bail!("To create an iceberg engine table, RW_SQL_USERNAME needed to be set"); - }; - - let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else { - bail!("To create an iceberg engine table, RW_SQL_PASSWORD needed to be set"); - }; - - let java_catalog_props = &mut self.java_catalog_props; - java_catalog_props.insert("jdbc.user".to_string(), meta_store_user); - java_catalog_props.insert("jdbc.password".to_string(), meta_store_password); - } - Ok(self) - } - pub fn catalog_type(&self) -> &str { self.common.catalog_type() } @@ -1038,6 +957,7 @@ mod test { catalog_name: Some("demo".to_string()), database_name: Some("demo_db".to_string()), table_name: "demo_table".to_string(), + enable_config_load: None, }, r#type: "upsert".to_string(), force_append_only: false, @@ -1049,7 +969,6 @@ mod test { .collect(), commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE, create_table_if_not_exists: false, - enable_config_load: false, }; assert_eq!(iceberg_config, expected_iceberg_config); diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 2385b3c4c11b..f5ee75d9c733 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -15,6 +15,7 @@ pub mod parquet_file_reader; use std::collections::HashMap; +use std::sync::Arc; use anyhow::anyhow; use async_trait::async_trait; @@ -30,7 +31,6 @@ use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; use crate::connector_common::IcebergCommon; -use crate::deserialize_optional_bool_from_string; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; use crate::source::{ @@ -50,20 +50,28 @@ pub struct IcebergProperties { #[serde(rename = "catalog.jdbc.password")] pub jdbc_password: Option, - #[serde( - rename = "enable_config_load", - default, - deserialize_with = "deserialize_optional_bool_from_string" - )] - pub enable_config_load: Option, - #[serde(flatten)] pub unknown_fields: HashMap, } use iceberg::table::Table as TableV2; +use iceberg::Catalog as CatalogV2; impl IcebergProperties { + pub async fn create_catalog_v2(&self) -> ConnectorResult> { + let mut java_catalog_props = HashMap::new(); + if let Some(jdbc_user) = self.jdbc_user.clone() { + java_catalog_props.insert("jdbc.user".to_string(), jdbc_user); + } + if let Some(jdbc_password) = self.jdbc_password.clone() { + java_catalog_props.insert("jdbc.password".to_string(), jdbc_password); + } + // TODO: support path_style_access and java_catalog_props for iceberg source + self.common + .create_catalog_v2(&None, &java_catalog_props) + .await + } + pub async fn load_table_v2(&self) -> ConnectorResult { let mut java_catalog_props = HashMap::new(); if let Some(jdbc_user) = self.jdbc_user.clone() { diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 200af203a742..8934854c22b4 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -327,6 +327,11 @@ IcebergConfig: field_type: String 
     comments: Full name of table, must include schema name.
     required: true
+  - name: enable_config_load
+    field_type: bool
+    comments: enable config load is currently used by nimtable, so it only supports jdbc catalog.
+    required: false
+    default: Default::default
   - name: s3.path.style.access
     field_type: bool
     required: false
@@ -347,11 +352,6 @@ IcebergConfig:
     field_type: bool
     required: false
     default: Default::default
-  - name: enable_config_load
-    field_type: bool
-    comments: enable config load currently is used by nimtable, so it only support jdbc catalog.
-    required: false
-    default: Default::default
 KafkaConfig:
   fields:
   - name: properties.bootstrap.server
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index c6a8fd0f8a85..dc8135bbea32 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -117,16 +117,17 @@ IcebergProperties:
     field_type: String
     comments: Full name of table, must include schema name.
     required: true
+  - name: enable_config_load
+    field_type: bool
+    comments: enable config load is currently used by nimtable, so it only supports jdbc catalog.
+    required: false
+    default: Default::default
   - name: catalog.jdbc.user
     field_type: String
     required: false
   - name: catalog.jdbc.password
     field_type: String
     required: false
-  - name: enable_config_load
-    field_type: bool
-    required: false
-    default: Default::default
 KafkaProperties:
   fields:
   - name: bytes.per.second
diff --git a/src/frontend/src/handler/drop_table.rs b/src/frontend/src/handler/drop_table.rs
index 936f5e9def31..f2618b0d523a 100644
--- a/src/frontend/src/handler/drop_table.rs
+++ b/src/frontend/src/handler/drop_table.rs
@@ -15,6 +15,7 @@
 use anyhow::Context;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::catalog::Engine;
+use risingwave_common::util::tokio_util::either::Either;
 use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::ConnectorProperties;
 use risingwave_sqlparser::ast::{Ident, ObjectName};
@@ -67,7 +68,7 @@ pub async fn handle_drop_table(
 
     match engine {
         Engine::Iceberg => {
-            let iceberg_config = if let Ok(source) = session
+            let either = if let Ok(source) = session
                 .env()
                 .catalog_reader()
                 .read_guard()
@@ -80,7 +81,7 @@ pub async fn handle_drop_table(
             {
                 let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
                 if let ConnectorProperties::Iceberg(iceberg_properties) = config {
-                    Some(iceberg_properties.to_iceberg_config())
+                    Some(Either::Left(iceberg_properties))
                 } else {
                     unreachable!("must be iceberg source");
                 }
@@ -96,7 +97,8 @@ pub async fn handle_drop_table(
                 .map(|(sink, _)| sink.clone())
             {
                 // If iceberg source does not exist, use iceberg sink to load iceberg table
-                Some(IcebergConfig::from_btreemap(sink.properties.clone())?)
+                let iceberg_config = IcebergConfig::from_btreemap(sink.properties.clone())?;
+                Some(Either::Right(iceberg_config))
             } else {
                 None
             };
@@ -117,14 +119,25 @@ pub async fn handle_drop_table(
         )
         .await?;
 
-        if let Some(iceberg_config) = iceberg_config {
-            let iceberg_catalog = iceberg_config
-                .create_catalog_v2()
-                .await
-                .context("Unable to load iceberg catalog")?;
-            let table_id = iceberg_config
-                .full_table_name_v2()
-                .context("Unable to parse table name")?;
+        if let Some(either) = either {
+            let (iceberg_catalog, table_id) = match either {
+                Either::Left(iceberg_properties) => {
+                    let catalog = iceberg_properties.create_catalog_v2().await?;
+                    let table_id = iceberg_properties
+                        .common
+                        .full_table_name_v2()
+                        .context("Unable to parse table name")?;
+                    (catalog, table_id)
+                }
+                Either::Right(iceberg_config) => {
+                    let catalog = iceberg_config.create_catalog_v2().await?;
+                    let table_id = iceberg_config
+                        .full_table_name_v2()
+                        .context("Unable to parse table name")?;
+                    (catalog, table_id)
+                }
+            };
+
             if let Ok(table) = iceberg_catalog
                 .load_table(&table_id)
                 .await
diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs
index 8351a2a00dda..91d081cec59f 100644
--- a/src/meta/model_v2/migration/src/lib.rs
+++ b/src/meta/model_v2/migration/src/lib.rs
@@ -21,8 +21,8 @@ mod m20240702_084927_unnecessary_fk;
 mod m20240726_063833_auto_schema_change;
 mod m20240806_143329_add_rate_limit_to_source_catalog;
 mod m20240820_081248_add_time_travel_per_table_epoch;
-mod m20240911_083152_variable_vnode_count;
 mod m20240909_101830_nimtable_dev;
+mod m20240911_083152_variable_vnode_count;
 
 pub struct Migrator;
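
Reviewer note: the config-load path introduced above derives the entire JDBC catalog
configuration from environment variables. The following is a minimal, self-contained
sketch of that env-var-to-config mapping, not part of the patch. The `read_env` helper,
the plain-`String` error type, and the lowercase string match on RW_BACKEND are
illustrative stand-ins; the patch itself uses `std::env::var` with `bail!` and parses
the backend via `MetaBackend::from_str` inside `build_jni_catalog_configs_for_config_load`.

use std::collections::HashMap;
use std::env;

// Hypothetical helper (not in the patch): read a required env var or fail with
// the same style of message the patch emits.
fn read_env(key: &str) -> Result<String, String> {
    env::var(key)
        .map_err(|_| format!("To create an iceberg engine table, {} needs to be set", key))
}

// Sketch of what build_jni_catalog_configs_for_config_load assembles for the
// JDBC catalog, reduced to the environment-to-config mapping.
fn sketch_jdbc_catalog_configs() -> Result<HashMap<String, String>, String> {
    let s3_bucket = read_env("AWS_S3_BUCKET")?;
    let data_directory = read_env("RW_DATA_DIRECTORY")?;
    // The warehouse layout is fixed to s3://<bucket>/<data directory>/nimtable.
    let warehouse = format!("s3://{}/{}/nimtable", s3_bucket, data_directory);

    let endpoint = read_env("RW_SQL_ENDPOINT")?;
    let database = read_env("RW_SQL_DATABASE")?;
    // Only the postgres and mysql meta backends are accepted; everything else bails.
    let uri = match read_env("RW_BACKEND")?.to_lowercase().as_str() {
        "postgres" => format!("jdbc:postgresql://{}/{}", endpoint, database),
        "mysql" => format!("jdbc:mysql://{}/{}", endpoint, database),
        other => {
            return Err(format!(
                "Unsupported meta backend for iceberg engine table: {}",
                other
            ))
        }
    };

    let mut configs = HashMap::new();
    configs.insert("uri".to_string(), uri);
    configs.insert("warehouse".to_string(), warehouse);
    // Only s3 is supported as the file IO implementation.
    configs.insert(
        "io-impl".to_string(),
        "org.apache.iceberg.aws.s3.S3FileIO".to_string(),
    );
    configs.insert("jdbc.user".to_string(), read_env("RW_SQL_USERNAME")?);
    configs.insert("jdbc.password".to_string(), read_env("RW_SQL_PASSWORD")?);
    Ok(configs)
}

fn main() {
    match sketch_jdbc_catalog_configs() {
        Ok(configs) => println!("{:#?}", configs),
        Err(e) => eprintln!("{}", e),
    }
}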