From f5199920524e72be38397f3aa56bc5f4684f9eb2 Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 4 Sep 2024 15:36:40 +0800 Subject: [PATCH] feat(iceberg): support create table with primary key (#18384) --- proto/catalog.proto | 9 + src/common/src/catalog/mod.rs | 32 +++ src/frontend/planner_test/src/lib.rs | 2 + src/frontend/src/catalog/table_catalog.rs | 14 +- .../src/handler/alter_table_column.rs | 9 +- src/frontend/src/handler/create_sink.rs | 7 + src/frontend/src/handler/create_table.rs | 224 +++++++++++++++++- src/frontend/src/handler/create_table_as.rs | 7 + src/frontend/src/handler/explain.rs | 1 + src/frontend/src/handler/mod.rs | 3 + src/frontend/src/optimizer/mod.rs | 4 +- .../optimizer/plan_node/stream_materialize.rs | 14 +- src/frontend/src/optimizer/plan_node/utils.rs | 3 +- .../src/scheduler/distributed/query.rs | 3 +- src/meta/model_v2/src/table.rs | 33 ++- src/meta/src/controller/mod.rs | 3 +- src/sqlparser/src/ast/mod.rs | 26 ++ src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 16 ++ src/storage/src/filter_key_extractor.rs | 3 +- src/tests/sqlsmith/src/lib.rs | 7 + 21 files changed, 400 insertions(+), 21 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 2bbfa39c10350..01ba0c82c4e17 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -333,6 +333,12 @@ message Table { int32 next_column_id = 2; } + enum Engine { + ENGINE_UNSPECIFIED = 0; + HUMMOCK = 1; + ICEBERG = 2; + } + uint32 id = 1; uint32 schema_id = 2; uint32 database_id = 3; @@ -435,6 +441,9 @@ message Table { // tables and tests. Not to be confused with the global catalog version for // notification service. TableVersion version = 100; + + // Table Engine, currently only support hummock and iceberg + Engine engine = 200; } enum HandleConflictBehavior { diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 86c6e8895c066..bab0d22b7cbc9 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -27,6 +27,7 @@ use futures::stream::BoxStream; pub use internal_table::*; use parse_display::Display; pub use physical_table::*; +use risingwave_pb::catalog::table::PbEngine; use risingwave_pb::catalog::{ CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior, StreamJobStatus as PbStreamJobStatus, @@ -528,6 +529,37 @@ impl ConflictBehavior { } } +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Engine { + /// TODO(nimtable): use iceberg engine as default. + #[default] + Hummock, + Iceberg, +} + +impl Engine { + pub fn from_protobuf(engine: &PbEngine) -> Self { + match engine { + PbEngine::Hummock | PbEngine::Unspecified => Engine::Hummock, + PbEngine::Iceberg => Engine::Iceberg, + } + } + + pub fn to_protobuf(self) -> PbEngine { + match self { + Engine::Hummock => PbEngine::Hummock, + Engine::Iceberg => PbEngine::Iceberg, + } + } + + pub fn debug_to_string(self) -> String { + match self { + Engine::Hummock => "Hummock".to_string(), + Engine::Iceberg => "Iceberg".to_string(), + } + } +} + #[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)] pub enum StreamJobStatus { #[default] diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 675b99ad0e145..eb1e726779f09 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -435,6 +435,7 @@ impl TestCase { cdc_table_info, include_column_options, wildcard_idx, + engine, .. } => { let source_schema = source_schema.map(|schema| schema.into_v2_with_warning()); @@ -453,6 +454,7 @@ impl TestCase { with_version_column, cdc_table_info, include_column_options, + engine, ) .await?; } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 8c33e55a0b164..c99267015abac 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet}; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{ - ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc, + ColumnCatalog, ConflictBehavior, CreateType, Engine, Field, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId, }; use risingwave_common::hash::VnodeCountCompat; @@ -187,8 +187,13 @@ pub struct TableCatalog { /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build pub vnode_count: Option, + + pub engine: Engine, } +pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_"; +pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_"; + #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum TableType { /// Tables created by `CREATE TABLE`. @@ -458,6 +463,7 @@ impl TableCatalog { retention_seconds: self.retention_seconds, cdc_table_id: self.cdc_table_id.clone(), maybe_vnode_count: self.vnode_count.map(|v| v as _), + engine: self.engine.to_protobuf().into(), } } @@ -554,6 +560,7 @@ impl From for TableCatalog { fn from(tb: PbTable) -> Self { let id = tb.id; let tb_conflict_behavior = tb.handle_pk_conflict_behavior(); + let tb_engine = tb.engine(); let table_type = tb.get_table_type().unwrap(); let stream_job_status = tb .get_stream_job_status() @@ -586,6 +593,7 @@ impl From for TableCatalog { for idx in &tb.watermark_indices { watermark_columns.insert(*idx as _); } + let engine = Engine::from_protobuf(&tb_engine); Self { id: id.into(), @@ -635,6 +643,7 @@ impl From for TableCatalog { .collect_vec(), cdc_table_id: tb.cdc_table_id, vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */ + engine, } } } @@ -658,6 +667,7 @@ mod tests { use risingwave_common::test_prelude::*; use risingwave_common::types::*; use risingwave_common::util::sort_util::OrderType; + use risingwave_pb::catalog::table::PbEngine; use risingwave_pb::plan_common::{ AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, }; @@ -726,6 +736,7 @@ mod tests { version_column_index: None, cdc_table_id: None, maybe_vnode_count: Some(233), + engine: PbEngine::Hummock.into(), } .into(); @@ -790,6 +801,7 @@ mod tests { version_column_index: None, cdc_table_id: None, vnode_count: Some(233), + engine: Engine::Hummock, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index e9ecb5713cb10..987a4a189d3e0 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::catalog::{ColumnCatalog, Engine}; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::{bail, bail_not_implemented}; @@ -185,12 +185,18 @@ pub async fn get_replace_table_plan( with_version_column, wildcard_idx, cdc_table_info, + engine, .. } = definition else { panic!("unexpected statement type: {:?}", definition); }; + let engine = match engine { + risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock, + risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg, + }; + let (mut graph, table, source, job_type) = generate_stream_graph_for_table( session, table_name, @@ -207,6 +213,7 @@ pub async fn get_replace_table_plan( with_version_column, cdc_table_info, new_version_columns, + engine, ) .await?; diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 1bbdfccdd4034..049e46ddaa188 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -673,12 +673,18 @@ pub(crate) async fn reparse_table_for_sink( append_only, on_conflict, with_version_column, + engine, .. } = definition else { panic!("unexpected statement type: {:?}", definition); }; + let engine = match engine { + risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock, + risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg, + }; + let (graph, table, source, _) = generate_stream_graph_for_table( session, table_name, @@ -695,6 +701,7 @@ pub(crate) async fn reparse_table_for_sink( with_version_column, None, None, + engine, ) .await?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 620206898cf97..19b360995e50f 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; use std::sync::Arc; @@ -23,7 +23,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{ - CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, + CdcTableDesc, ColumnCatalog, ColumnDesc, Engine, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_TABLE_VERSION_ID, }; use risingwave_common::license::Feature; @@ -43,17 +43,18 @@ use risingwave_pb::plan_common::{ }; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ - CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, - ExplainOptions, Format, ObjectName, OnConflict, SourceWatermark, TableConstraint, + CdcTableInfo, ColumnDef, ColumnOption, CompatibleSourceSchema, ConnectorSchema, CreateSink, + CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format, + Ident, ObjectName, OnConflict, SourceWatermark, TableConstraint, WithProperties, }; use risingwave_sqlparser::parser::IncludeOption; use thiserror_ext::AsReport; -use super::RwPgResponse; +use super::{create_sink, create_source, RwPgResponse}; use crate::binder::{bind_data_type, bind_struct_field, Clause}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; -use crate::catalog::table_catalog::TableVersion; +use crate::catalog::table_catalog::{TableVersion, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX}; use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; @@ -475,6 +476,7 @@ pub(crate) async fn gen_create_table_plan_with_source( on_conflict: Option, with_version_column: Option, include_column_options: IncludeOption, + engine: Engine, ) -> Result<(PlanRef, Option, PbTable)> { if append_only && source_schema.format != Format::Plain @@ -526,6 +528,7 @@ pub(crate) async fn gen_create_table_plan_with_source( Some(col_id_gen.into_version()), database_id, schema_id, + engine, )?; Ok((plan, Some(pb_source), table)) @@ -543,6 +546,7 @@ pub(crate) fn gen_create_table_plan( append_only: bool, on_conflict: Option, with_version_column: Option, + engine: Engine, ) -> Result<(PlanRef, PbTable)> { let definition = context.normalized_sql().to_owned(); let mut columns = bind_sql_columns(&column_defs)?; @@ -567,6 +571,7 @@ pub(crate) fn gen_create_table_plan( on_conflict, with_version_column, Some(col_id_gen.into_version()), + engine, ) } @@ -583,6 +588,7 @@ pub(crate) fn gen_create_table_plan_without_source( on_conflict: Option, with_version_column: Option, version: Option, + engine: Engine, ) -> Result<(PlanRef, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; @@ -625,6 +631,7 @@ pub(crate) fn gen_create_table_plan_without_source( None, database_id, schema_id, + engine, ) } @@ -638,6 +645,7 @@ fn gen_table_plan_with_source( * TABLE` for `CREATE TABLE AS`. */ database_id: DatabaseId, schema_id: SchemaId, + engine: Engine, ) -> Result<(PlanRef, PbTable)> { let cloned_source_catalog = source_catalog.clone(); gen_table_plan_inner( @@ -655,6 +663,7 @@ fn gen_table_plan_with_source( Some(cloned_source_catalog), database_id, schema_id, + engine, ) } @@ -675,6 +684,7 @@ fn gen_table_plan_inner( source_catalog: Option, database_id: DatabaseId, schema_id: SchemaId, + engine: Engine, ) -> Result<(PlanRef, PbTable)> { let session = context.session_ctx().clone(); let retention_seconds = context.with_options().retention_seconds(); @@ -745,6 +755,7 @@ fn gen_table_plan_inner( is_external_source, retention_seconds, None, + engine, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -774,6 +785,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( database_id: DatabaseId, schema_id: SchemaId, table_id: TableId, + engine: Engine, ) -> Result<(PlanRef, PbTable)> { let session = context.session_ctx().clone(); @@ -870,6 +882,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( true, None, Some(cdc_table_id), + engine, )?; let mut table = materialize.table().to_prost(schema_id, database_id); @@ -945,6 +958,7 @@ pub(super) async fn handle_create_table_plan( on_conflict: Option, with_version_column: Option, include_column_options: IncludeOption, + engine: Engine, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { let col_id_gen = ColumnIdGenerator::new_initial(); let source_schema = check_create_table_with_source( @@ -971,6 +985,7 @@ pub(super) async fn handle_create_table_plan( on_conflict, with_version_column, include_column_options, + engine, ) .await?, TableJobType::General, @@ -987,6 +1002,7 @@ pub(super) async fn handle_create_table_plan( append_only, on_conflict, with_version_column, + engine, )?; ((plan, None, table), TableJobType::General) @@ -1057,6 +1073,7 @@ pub(super) async fn handle_create_table_plan( database_id, schema_id, TableId::placeholder(), + engine, )?; ((plan, None, table), TableJobType::SharedCdcSource) @@ -1236,6 +1253,7 @@ pub async fn handle_create_table( with_version_column: Option, cdc_table_info: Option, include_column_options: IncludeOption, + ast_engine: risingwave_sqlparser::ast::Engine, ) -> Result { let session = handler_args.session.clone(); @@ -1245,6 +1263,11 @@ pub async fn handle_create_table( session.check_cluster_limits().await?; + let engine = match ast_engine { + risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock, + risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg, + }; + if let Either::Right(resp) = session.check_relation_name_duplicated( table_name.clone(), StatementType::CREATE_TABLE, @@ -1255,19 +1278,20 @@ pub async fn handle_create_table( let (graph, source, table, job_type) = { let (plan, source, table, job_type) = handle_create_table_plan( - handler_args, + handler_args.clone(), ExplainOptions::default(), source_schema, cdc_table_info, table_name.clone(), - column_defs, + column_defs.clone(), wildcard_idx, - constraints, + constraints.clone(), source_watermarks, append_only, on_conflict, with_version_column, include_column_options, + engine, ) .await?; @@ -1283,9 +1307,181 @@ pub async fn handle_create_table( ); let catalog_writer = session.catalog_writer()?; - catalog_writer - .create_table(source, table, graph, job_type) - .await?; + + // Handle engine + match engine { + Engine::Hummock => { + catalog_writer + .create_table(source, table, graph, job_type) + .await?; + } + Engine::Iceberg => { + let with_properties = WithProperties(vec![]); + let create_sink_stmt = CreateSinkStatement { + if_not_exists: false, + sink_name: ObjectName::from(vec![Ident::from( + (ICEBERG_SINK_PREFIX.to_string() + &table_name.real_value()).as_str(), + )]), + with_properties, + sink_from: CreateSink::From(table_name.clone()), + columns: vec![], + emit_mode: None, + sink_schema: None, + into_table_name: None, + }; + + // export AWS_REGION=your_region + // export AWS_ACCESS_KEY_ID=your_access_key + // export AWS_SECRET_ACCESS_KEY=your_secret_key + // export RW_S3_ENDPOINT=your_endpoint + // export META_STORE_URI=your_meta_store_uri + // export META_STORE_USER=your_meta_store_user + // export META_STORE_PASSWORD=your_meta_store_password + + let s3_endpoint = if let Ok(endpoint) = std::env::var("RW_S3_ENDPOINT") { + endpoint + } else { + "http://127.0.0.1:9301".to_string() + }; + + let s3_region = if let Ok(region) = std::env::var("AWS_REGION") { + region + } else { + "us-east-1".to_string() + }; + + let s3_ak = if let Ok(ak) = std::env::var("AWS_ACCESS_KEY_ID") { + ak + } else { + "hummockadmin".to_string() + }; + + let s3_sk = if let Ok(sk) = std::env::var("AWS_SECRET_ACCESS_KEY") { + sk + } else { + "hummockadmin".to_string() + }; + + let meta_store_uri = if let Ok(uri) = std::env::var("META_STORE_URI") { + uri + } else { + "jdbc:sqlite:/tmp/sqlite/iceberg.db".to_string() + }; + let meta_store_user = if let Ok(user) = std::env::var("META_STORE_USER") { + user + } else { + "xxx".to_string() + }; + let meta_store_password = if let Ok(password) = std::env::var("META_STORE_PASSWORD") { + password + } else { + "xxx".to_string() + }; + let catalog_name = "nimtable".to_string(); + let database_name = "nimtable_db".to_string(); + + let mut sink_handler_args = handler_args.clone(); + let mut with = BTreeMap::new(); + with.insert("connector".to_string(), "iceberg".to_string()); + // TODO: don't hard code primary key + let mut pks = column_defs + .into_iter() + .filter(|c| { + c.options + .iter() + .any(|o| matches!(o.option, ColumnOption::Unique { is_primary: true })) + }) + .map(|c| c.name.to_string()) + .collect::>(); + if pks.is_empty() { + pks = constraints + .into_iter() + .filter(|c| { + matches!( + c, + TableConstraint::Unique { + is_primary: true, + .. + } + ) + }) + .flat_map(|c| match c { + TableConstraint::Unique { columns, .. } => columns + .into_iter() + .map(|c| c.to_string()) + .collect::>(), + _ => vec![], + }) + .collect::>(); + } + if pks.is_empty() { + return Err(ErrorCode::InvalidInputSyntax( + "Primary key is required for iceberg table".to_string(), + ) + .into()); + } + with.insert("primary_key".to_string(), pks.join(",")); + with.insert("type".to_string(), "upsert".to_string()); + with.insert("catalog.type".to_string(), "jdbc".to_string()); + with.insert("warehouse.path".to_string(), "s3://hummock001".to_string()); + with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); + with.insert("s3.access.key".to_string(), s3_ak.clone()); + with.insert("s3.secret.key".to_string(), s3_sk.clone()); + with.insert("s3.region".to_string(), s3_region.clone()); + with.insert("catalog.uri".to_string(), meta_store_uri.clone()); + with.insert("catalog.jdbc.user".to_string(), meta_store_user.clone()); + with.insert( + "catalog.jdbc.password".to_string(), + meta_store_password.clone(), + ); + with.insert("catalog.name".to_string(), catalog_name.clone()); + with.insert("database.name".to_string(), database_name.clone()); + with.insert("table.name".to_string(), table_name.to_string()); + with.insert("create_table_if_not_exists".to_string(), "true".to_string()); + sink_handler_args.with_options = WithOptions::new_with_options(with); + + let create_source_stmt = CreateSourceStatement { + temporary: false, + if_not_exists: false, + columns: vec![], + source_name: ObjectName::from(vec![Ident::from( + (ICEBERG_SOURCE_PREFIX.to_string() + &table_name.real_value()).as_str(), + )]), + wildcard_idx: None, + constraints: vec![], + with_properties: WithProperties(vec![]), + source_schema: CompatibleSourceSchema::V2(ConnectorSchema::none()), + source_watermarks: vec![], + include_column_options: vec![], + }; + + let mut source_handler_args = handler_args.clone(); + let mut with = BTreeMap::new(); + with.insert("connector".to_string(), "iceberg".to_string()); + with.insert("catalog.type".to_string(), "jdbc".to_string()); + with.insert("warehouse.path".to_string(), "s3://hummock001".to_string()); + with.insert("s3.endpoint".to_string(), s3_endpoint.clone()); + with.insert("s3.access.key".to_string(), s3_ak.clone()); + with.insert("s3.secret.key".to_string(), s3_sk.clone()); + with.insert("s3.region".to_string(), s3_region.clone()); + with.insert("catalog.uri".to_string(), meta_store_uri.clone()); + with.insert("catalog.jdbc.user".to_string(), meta_store_user.clone()); + with.insert( + "catalog.jdbc.password".to_string(), + meta_store_password.clone(), + ); + with.insert("catalog.name".to_string(), catalog_name.clone()); + with.insert("database.name".to_string(), database_name.clone()); + with.insert("table.name".to_string(), table_name.to_string()); + source_handler_args.with_options = WithOptions::new_with_options(with); + + catalog_writer + .create_table(source, table, graph, job_type) + .await?; + create_sink::handle_create_sink(sink_handler_args, create_sink_stmt).await?; + create_source::handle_create_source(source_handler_args, create_source_stmt).await?; + } + } Ok(PgResponse::empty_result(StatementType::CREATE_TABLE)) } @@ -1332,6 +1528,7 @@ pub async fn generate_stream_graph_for_table( with_version_column: Option, cdc_table_info: Option, new_version_columns: Option>, + engine: Engine, ) -> Result<(StreamFragmentGraph, Table, Option, TableJobType)> { use risingwave_pb::catalog::table::OptionalAssociatedSourceId; @@ -1351,6 +1548,7 @@ pub async fn generate_stream_graph_for_table( on_conflict, with_version_column, vec![], + engine, ) .await?, TableJobType::General, @@ -1367,6 +1565,7 @@ pub async fn generate_stream_graph_for_table( append_only, on_conflict, with_version_column, + engine, )?; ((plan, None, table), TableJobType::General) } @@ -1411,6 +1610,7 @@ pub async fn generate_stream_graph_for_table( database_id, schema_id, original_catalog.id(), + engine, )?; ((plan, None, table), TableJobType::SharedCdcSource) diff --git a/src/frontend/src/handler/create_table_as.rs b/src/frontend/src/handler/create_table_as.rs index 27c527969f9b2..3795fb3d90b62 100644 --- a/src/frontend/src/handler/create_table_as.rs +++ b/src/frontend/src/handler/create_table_as.rs @@ -33,6 +33,7 @@ pub async fn handle_create_as( append_only: bool, on_conflict: Option, with_version_column: Option, + ast_engine: risingwave_sqlparser::ast::Engine, ) -> Result { if column_defs.iter().any(|column| column.data_type.is_some()) { return Err(ErrorCode::InvalidInputSyntax( @@ -40,6 +41,11 @@ pub async fn handle_create_as( ) .into()); } + let engine = match ast_engine { + risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock, + risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg, + }; + let session = handler_args.session.clone(); if let Either::Right(resp) = session.check_relation_name_duplicated( @@ -108,6 +114,7 @@ pub async fn handle_create_as( on_conflict, with_version_column, Some(col_id_gen.into_version()), + engine, )?; let graph = build_graph(plan)?; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index ed22462a66888..680449e17da38 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -81,6 +81,7 @@ async fn do_handle_explain( on_conflict, with_version_column, include_column_options, + risingwave_common::catalog::Engine::Hummock, ) .await?; let context = plan.ctx(); diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 42de3a116d0b6..dd40a4edea19f 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -343,6 +343,7 @@ pub async fn handle( with_version_column, cdc_table_info, include_column_options, + engine, } => { if or_replace { bail_not_implemented!("CREATE OR REPLACE TABLE"); @@ -360,6 +361,7 @@ pub async fn handle( append_only, on_conflict, with_version_column, + engine, ) .await; } @@ -378,6 +380,7 @@ pub async fn handle( with_version_column, cdc_table_info, include_column_options, + engine, ) .await } diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 1fc77046c6d55..fb829de0cf718 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -51,7 +51,7 @@ use plan_expr_rewriter::ConstEvalRewriter; use property::Order; use risingwave_common::bail; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId, + ColumnCatalog, ColumnDesc, ColumnId, ConflictBehavior, Engine, Field, Schema, TableId, }; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -639,6 +639,7 @@ impl PlanRoot { with_external_source: bool, retention_seconds: Option, cdc_table_id: Option, + engine: Engine, ) -> Result { assert_eq!(self.phase, PlanPhase::Logical); assert_eq!(self.plan.convention(), Convention::Logical); @@ -873,6 +874,7 @@ impl PlanRoot { version, retention_seconds, cdc_table_id, + engine, ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index c78f07f1270e8..9f801301ef592 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -19,7 +19,8 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ - ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER, + ColumnCatalog, ConflictBehavior, CreateType, Engine, StreamJobStatus, TableId, + OBJECT_ID_PLACEHOLDER, }; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; @@ -109,6 +110,7 @@ impl StreamMaterialize { cardinality, retention_seconds, create_type, + Engine::Hummock, )?; Ok(Self::new(input, table)) @@ -134,6 +136,7 @@ impl StreamMaterialize { version: Option, retention_seconds: Option, cdc_table_id: Option, + engine: Engine, ) -> Result { let input = Self::rewrite_input(input, user_distributed_by, TableType::Table)?; @@ -152,6 +155,7 @@ impl StreamMaterialize { Cardinality::unknown(), // unknown cardinality for tables retention_seconds, CreateType::Foreground, + engine, )?; table.cdc_table_id = cdc_table_id; @@ -226,6 +230,7 @@ impl StreamMaterialize { cardinality: Cardinality, retention_seconds: Option, create_type: CreateType, + engine: Engine, ) -> Result { let input = rewritten_input; @@ -284,6 +289,13 @@ impl StreamMaterialize { retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, vnode_count: None, // will be filled in by the meta service later + engine: match table_type { + TableType::Table => engine, + TableType::MaterializedView | TableType::Index | TableType::Internal => { + assert_eq!(engine, Engine::Hummock); + engine + } + }, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index d25aee9d20c8b..63f98c63219b0 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -21,7 +21,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode}; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Field, FieldDisplay, Schema, + ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, Field, FieldDisplay, Schema, StreamJobStatus, OBJECT_ID_PLACEHOLDER, }; use risingwave_common::constants::log_store::v2::{ @@ -180,6 +180,7 @@ impl TableCatalogBuilder { retention_seconds: None, cdc_table_id: None, vnode_count: None, // will be filled in by the meta service later + engine: Engine::Hummock, } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index d169874d5b3f2..68e816eda7ad3 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -470,7 +470,7 @@ pub(crate) mod tests { WorkerNodeManager, WorkerNodeSelector, }; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, + ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Engine, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; @@ -590,6 +590,7 @@ pub(crate) mod tests { created_at_cluster_version: None, cdc_table_id: None, vnode_count: Some(vnode_count), + engine: Engine::Hummock, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 0a47208ff7351..9cbf9a4a7278e 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -14,7 +14,7 @@ use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER; use risingwave_common::hash::VnodeCountCompat; -use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType}; +use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbEngine, PbTableType}; use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; @@ -102,6 +102,34 @@ impl From for HandleConflictBehavior { } } +#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "String(None)")] +pub enum Engine { + #[sea_orm(string_value = "HUMMOCK")] + Hummock, + #[sea_orm(string_value = "ICEBERG")] + Iceberg, +} + +impl From for PbEngine { + fn from(engine: Engine) -> Self { + match engine { + Engine::Hummock => Self::Hummock, + Engine::Iceberg => Self::Iceberg, + } + } +} + +impl From for Engine { + fn from(engine: PbEngine) -> Self { + match engine { + PbEngine::Hummock => Self::Hummock, + PbEngine::Iceberg => Self::Iceberg, + PbEngine::Unspecified => unreachable!("Unspecified engine"), + } + } +} + #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "table")] pub struct Model { @@ -135,6 +163,7 @@ pub struct Model { pub incoming_sinks: I32Array, pub cdc_table_id: Option, pub vnode_count: i32, + pub engine: Engine, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -210,6 +239,7 @@ impl From for ActiveModel { let table_type = pb_table.table_type(); let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); let vnode_count = pb_table.vnode_count(); + let engine = pb_table.engine(); let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { NotSet @@ -259,6 +289,7 @@ impl From for ActiveModel { incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), vnode_count: Set(vnode_count as _), + engine: Set(engine.into()), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index bfad20f668bcd..9a44aa2056a80 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -23,7 +23,7 @@ use risingwave_meta_model_v2::{ use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::subscription::PbSubscriptionState; -use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableType}; +use risingwave_pb::catalog::table::{PbEngine, PbOptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{ PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex, PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable, @@ -164,6 +164,7 @@ impl From> for PbTable { retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, maybe_vnode_count: Some(value.0.vnode_count as _), + engine: PbEngine::from(value.0.engine) as _, } } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 44624f5a6a0a5..874c3880e704e 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1280,6 +1280,8 @@ pub enum Statement { cdc_table_info: Option, /// `INCLUDE a AS b INCLUDE c` include_column_options: IncludeOption, + + engine: Engine, }, /// CREATE INDEX CreateIndex { @@ -1808,6 +1810,7 @@ impl fmt::Display for Statement { query, cdc_table_info, include_column_options, + engine, } => { // We want to allow the following options // Empty column list, allowed by PostgreSQL: @@ -1873,6 +1876,13 @@ impl fmt::Display for Statement { write!(f, " FROM {}", info.source_name)?; write!(f, " TABLE '{}'", info.external_table_name)?; } + // TODO(nimtable): change the default engine to iceberg + match engine { + Engine::Hummock => {}, + Engine::Iceberg => { + write!(f, " ENGINE = {}", engine)?; + }, + } Ok(()) } Statement::CreateIndex { @@ -2750,6 +2760,22 @@ impl fmt::Display for OnConflict { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum Engine { + Hummock, + Iceberg, +} + +impl fmt::Display for crate::ast::Engine { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + crate::ast::Engine::Hummock => "HUMMOCK", + crate::ast::Engine::Iceberg => "ICEBERG", + }) + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum SetTimeZoneValue { diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 014d100b1f954..0f537fc772d06 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -215,6 +215,7 @@ define_keywords!( END_EXEC = "END-EXEC", END_FRAME, END_PARTITION, + ENGINE, EQUALS, ERROR, ESCAPE, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 061446e588d38..98dd5df046fb1 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2603,6 +2603,21 @@ impl Parser<'_> { None }; + let engine = if self.parse_keyword(Keyword::ENGINE) { + self.expect_token(&Token::Eq)?; + let engine_name = self.parse_object_name()?; + if "iceberg".eq_ignore_ascii_case(&engine_name.real_value()) { + Engine::Iceberg + } else if "hummock".eq_ignore_ascii_case(&engine_name.real_value()) { + Engine::Hummock + } else { + parser_err!("Unsupported engine: {}", engine_name); + } + } else { + // TODO(nimtable): default to hummock, later we can change it to iceberg + Engine::Hummock + }; + Ok(Statement::CreateTable { name: table_name, temporary, @@ -2620,6 +2635,7 @@ impl Parser<'_> { query, cdc_table_info, include_column_options: include_options, + engine, }) } diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index e9326d37dcd8c..22cdcb60c7789 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -447,7 +447,7 @@ mod tests { use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; - use risingwave_pb::catalog::table::TableType; + use risingwave_pb::catalog::table::{PbEngine, TableType}; use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; use risingwave_pb::plan_common::PbColumnCatalog; @@ -555,6 +555,7 @@ mod tests { created_at_cluster_version: None, cdc_table_id: None, maybe_vnode_count: None, + engine: PbEngine::Hummock.into(), } } diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index 2eb9c59d1bf38..9f34623d557e1 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -284,6 +284,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, CreateTable { or_replace: false, @@ -332,6 +333,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, CreateTable { or_replace: false, @@ -391,6 +393,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, ], )"#]], @@ -526,6 +529,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, CreateTable { or_replace: false, @@ -581,6 +585,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, CreateTable { or_replace: false, @@ -643,6 +648,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, CreateTable { or_replace: false, @@ -723,6 +729,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY query: None, cdc_table_info: None, include_column_options: [], + engine: Hummock, }, ], )"#]],