diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index bf8762c7136b..d2acf17c37bc 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -43,8 +43,8 @@ use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc, }; use risingwave_pb::stream_plan::StreamFragmentGraph; -use risingwave_sqlparser::ast::{CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, CreateSink, CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format, FormatEncodeOptions, Ident, ObjectName, OnConflict, SourceWatermark, TableConstraint, WithProperties}; -use risingwave_sqlparser::parser::IncludeOption; +use risingwave_sqlparser::ast::{CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, CreateSink, CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format, FormatEncodeOptions, Ident, ObjectName, OnConflict, SourceWatermark, Statement, TableConstraint, WithProperties}; +use risingwave_sqlparser::parser::{IncludeOption, Parser}; use thiserror_ext::AsReport; use super::{create_sink, create_source, RwPgResponse}; @@ -1290,20 +1290,6 @@ pub async fn handle_create_table( .await?; } Engine::Iceberg => { - let with_properties = WithProperties(vec![]); - let create_sink_stmt = CreateSinkStatement { - if_not_exists: false, - sink_name: ObjectName::from(vec![Ident::from( - (ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(), - )]), - with_properties, - sink_from: CreateSink::From(table_name.clone()), - columns: vec![], - emit_mode: None, - sink_schema: None, - into_table_name: None, - }; - // export AWS_REGION=your_region // export AWS_ACCESS_KEY_ID=your_access_key // export AWS_SECRET_ACCESS_KEY=your_secret_key @@ -1341,23 +1327,21 @@ pub async fn handle_create_table( } else { "jdbc:sqlite:/tmp/sqlite/iceberg.db".to_string() }; + let meta_store_user = if let Ok(user) = std::env::var("META_STORE_USER") { user } else { "xxx".to_string() }; + let meta_store_password = if let Ok(password) = std::env::var("META_STORE_PASSWORD") { password } else { "xxx".to_string() }; - let catalog_name = "nimtable".to_string(); - let database_name = "nimtable_db".to_string(); - let mut sink_handler_args = handler_args.clone(); - let mut with = BTreeMap::new(); - with.insert("connector".to_string(), "iceberg".to_string()); - // TODO: don't hard code primary key + // Iceberg sinks require a primary key, if none is provided, we will use the _row_id column + // Fetch primary key from columns let mut pks = column_defs .into_iter() .filter(|c| { @@ -1367,6 +1351,8 @@ pub async fn handle_create_table( }) .map(|c| c.name.to_string()) .collect::>(); + + // Fetch primary key from constraints if pks.is_empty() { pks = constraints .into_iter() @@ -1388,12 +1374,45 @@ pub async fn handle_create_table( }) .collect::>(); } - if pks.is_empty() { - return Err(ErrorCode::InvalidInputSyntax( - "Primary key is required for iceberg table".to_string(), - ) - .into()); - } + + // There is a table without primary key. We will use _row_id as primary key + let sink_from = if pks.is_empty() { + pks = vec!["_row_id".to_string()]; + let [stmt]: [_; 1] = + Parser::parse_sql(&format!("select _row_id, * from {}", table_name)) + .context("unable to parse query")? + .try_into() + .unwrap(); + + let Statement::Query(query) = &stmt else { + panic!("unexpected statement: {:?}", stmt); + }; + CreateSink::AsQuery(query.clone()) + } else { + CreateSink::From(table_name.clone()) + }; + + let with_properties = WithProperties(vec![]); + let create_sink_stmt = CreateSinkStatement { + if_not_exists: false, + sink_name: ObjectName::from(vec![Ident::from( + (ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(), + )]), + with_properties, + sink_from, + columns: vec![], + emit_mode: None, + sink_schema: None, + into_table_name: None, + }; + + let catalog_name = "nimtable".to_string(); + let database_name = "nimtable_db".to_string(); + + let mut sink_handler_args = handler_args.clone(); + let mut with = BTreeMap::new(); + with.insert("connector".to_string(), "iceberg".to_string()); + with.insert("primary_key".to_string(), pks.join(",")); with.insert("type".to_string(), "upsert".to_string()); with.insert("catalog.type".to_string(), "jdbc".to_string());