Skip to content

Commit

Permalink
feat(iceberg): support create table with primary key (#18384)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 26, 2024
1 parent 5e2cfac commit f519992
Show file tree
Hide file tree
Showing 21 changed files with 400 additions and 21 deletions.
9 changes: 9 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ message Table {
int32 next_column_id = 2;
}

// Engine selected for a table (see the `engine` field of `Table`).
enum Engine {
// Zero value: what readers observe when the field was never set.
ENGINE_UNSPECIFIED = 0;
HUMMOCK = 1;
ICEBERG = 2;
}

uint32 id = 1;
uint32 schema_id = 2;
uint32 database_id = 3;
Expand Down Expand Up @@ -435,6 +441,9 @@ message Table {
// tables and tests. Not to be confused with the global catalog version for
// notification service.
TableVersion version = 100;

// Table engine. Currently only Hummock and Iceberg are supported.
Engine engine = 200;
}

enum HandleConflictBehavior {
Expand Down
32 changes: 32 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::stream::BoxStream;
pub use internal_table::*;
use parse_display::Display;
pub use physical_table::*;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::catalog::{
CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
StreamJobStatus as PbStreamJobStatus,
Expand Down Expand Up @@ -528,6 +529,37 @@ impl ConflictBehavior {
}
}

/// Table engine as seen by the catalog.
///
/// [`Engine::Hummock`] is the default variant.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum Engine {
    /// TODO(nimtable): use iceberg engine as default.
    #[default]
    Hummock,
    Iceberg,
}

impl Engine {
    /// Converts the protobuf engine tag into the catalog-level [`Engine`].
    ///
    /// An unspecified tag is mapped to [`Engine::Hummock`], the same as an
    /// explicit Hummock tag.
    pub fn from_protobuf(engine: &PbEngine) -> Self {
        match *engine {
            PbEngine::Iceberg => Self::Iceberg,
            PbEngine::Unspecified | PbEngine::Hummock => Self::Hummock,
        }
    }

    /// Converts this engine into its protobuf counterpart.
    ///
    /// Never yields `PbEngine::Unspecified`.
    pub fn to_protobuf(self) -> PbEngine {
        match self {
            Self::Iceberg => PbEngine::Iceberg,
            Self::Hummock => PbEngine::Hummock,
        }
    }

    /// Returns the variant name (`"Hummock"` or `"Iceberg"`) as an owned string.
    pub fn debug_to_string(self) -> String {
        let name = match self {
            Self::Hummock => "Hummock",
            Self::Iceberg => "Iceberg",
        };
        String::from(name)
    }
}

#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum StreamJobStatus {
#[default]
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ impl TestCase {
cdc_table_info,
include_column_options,
wildcard_idx,
engine,
..
} => {
let source_schema = source_schema.map(|schema| schema.into_v2_with_warning());
Expand All @@ -453,6 +454,7 @@ impl TestCase {
with_version_column,
cdc_table_info,
include_column_options,
engine,
)
.await?;
}
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc,
ColumnCatalog, ConflictBehavior, CreateType, Engine, Field, Schema, StreamJobStatus, TableDesc,
TableId, TableVersionId,
};
use risingwave_common::hash::VnodeCountCompat;
Expand Down Expand Up @@ -187,8 +187,13 @@ pub struct TableCatalog {
/// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
/// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
pub vnode_count: Option<usize>,

pub engine: Engine,
}

/// Name prefix `__iceberg_source_`.
/// NOTE(review): presumably prepended to the name of the internal source created for an
/// iceberg-engine table; the use site is not visible here — confirm.
pub const ICEBERG_SOURCE_PREFIX: &str = "__iceberg_source_";
/// Name prefix `__iceberg_sink_`.
/// NOTE(review): presumably prepended to the name of the internal sink created for an
/// iceberg-engine table; the use site is not visible here — confirm.
pub const ICEBERG_SINK_PREFIX: &str = "__iceberg_sink_";

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum TableType {
/// Tables created by `CREATE TABLE`.
Expand Down Expand Up @@ -458,6 +463,7 @@ impl TableCatalog {
retention_seconds: self.retention_seconds,
cdc_table_id: self.cdc_table_id.clone(),
maybe_vnode_count: self.vnode_count.map(|v| v as _),
engine: self.engine.to_protobuf().into(),
}
}

Expand Down Expand Up @@ -554,6 +560,7 @@ impl From<PbTable> for TableCatalog {
fn from(tb: PbTable) -> Self {
let id = tb.id;
let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
let tb_engine = tb.engine();
let table_type = tb.get_table_type().unwrap();
let stream_job_status = tb
.get_stream_job_status()
Expand Down Expand Up @@ -586,6 +593,7 @@ impl From<PbTable> for TableCatalog {
for idx in &tb.watermark_indices {
watermark_columns.insert(*idx as _);
}
let engine = Engine::from_protobuf(&tb_engine);

Self {
id: id.into(),
Expand Down Expand Up @@ -635,6 +643,7 @@ impl From<PbTable> for TableCatalog {
.collect_vec(),
cdc_table_id: tb.cdc_table_id,
vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */
engine,
}
}
}
Expand All @@ -658,6 +667,7 @@ mod tests {
use risingwave_common::test_prelude::*;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
};
Expand Down Expand Up @@ -726,6 +736,7 @@ mod tests {
version_column_index: None,
cdc_table_id: None,
maybe_vnode_count: Some(233),
engine: PbEngine::Hummock.into(),
}
.into();

Expand Down Expand Up @@ -790,6 +801,7 @@ mod tests {
version_column_index: None,
cdc_table_id: None,
vnode_count: Some(233),
engine: Engine::Hummock,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context};
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::catalog::{ColumnCatalog, Engine};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -185,12 +185,18 @@ pub async fn get_replace_table_plan(
with_version_column,
wildcard_idx,
cdc_table_info,
engine,
..
} = definition
else {
panic!("unexpected statement type: {:?}", definition);
};

let engine = match engine {
risingwave_sqlparser::ast::Engine::Hummock => Engine::Hummock,
risingwave_sqlparser::ast::Engine::Iceberg => Engine::Iceberg,
};

let (mut graph, table, source, job_type) = generate_stream_graph_for_table(
session,
table_name,
Expand All @@ -207,6 +213,7 @@ pub async fn get_replace_table_plan(
with_version_column,
cdc_table_info,
new_version_columns,
engine,
)
.await?;

Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,18 @@ pub(crate) async fn reparse_table_for_sink(
append_only,
on_conflict,
with_version_column,
engine,
..
} = definition
else {
panic!("unexpected statement type: {:?}", definition);
};

let engine = match engine {
risingwave_sqlparser::ast::Engine::Hummock => risingwave_common::catalog::Engine::Hummock,
risingwave_sqlparser::ast::Engine::Iceberg => risingwave_common::catalog::Engine::Iceberg,
};

let (graph, table, source, _) = generate_stream_graph_for_table(
session,
table_name,
Expand All @@ -695,6 +701,7 @@ pub(crate) async fn reparse_table_for_sink(
with_version_column,
None,
None,
engine,
)
.await?;

Expand Down
Loading

0 comments on commit f519992

Please sign in to comment.