From 20f6bfadbf84205068bf73486ffca7d1118c32f1 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Fri, 6 Sep 2024 17:12:13 +0800 Subject: [PATCH] check on encode json Signed-off-by: tabVersion --- src/frontend/src/handler/create_source.rs | 50 +++++++++++++++++------ src/frontend/src/handler/create_table.rs | 1 + 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 432f814cd4c4..2af7aa8b4b09 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -67,7 +67,7 @@ use risingwave_sqlparser::ast::{ get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format, ObjectName, ProtobufSchema, SourceWatermark, TableConstraint, }; -use risingwave_sqlparser::parser::IncludeOption; +use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem}; use thiserror_ext::AsReport; use super::RwPgResponse; @@ -595,8 +595,43 @@ fn bind_columns_from_source_for_cdc( Ok((Some(columns), stream_source_info)) } +// check the additional column compatibility with the format and encode +fn check_additional_column_compatibility( + column_def: &IncludeOptionItem, + source_schema: Option<&ConnectorSchema>, +) -> Result<()> { + // only allow header column have inner field + if column_def.inner_field.is_some() + && !column_def + .column_type + .real_value() + .eq_ignore_ascii_case("header") + { + return Err(RwError::from(ProtocolError(format!( + "Only header column can have inner field, but got {:?}", + column_def.column_type.real_value(), + )))); + } + + // Payload column only allowed when encode is JSON + if let Some(schema) = source_schema + && column_def + .column_type + .real_value() + .eq_ignore_ascii_case("payload") + && !matches!(schema.row_encode, Encode::Json) + { + return Err(RwError::from(ProtocolError(format!( + "Payload column is only allowed when row encode is JSON, but got {:?}", + schema.row_encode + )))); + } + Ok(()) +} + /// add connector-spec columns to the end of column catalog pub fn handle_addition_columns( + source_schema: Option<&ConnectorSchema>, with_properties: &BTreeMap, mut additional_columns: IncludeOption, columns: &mut Vec, @@ -620,17 +655,7 @@ pub fn handle_addition_columns( .unwrap(); // there must be at least one column in the column catalog while let Some(item) = additional_columns.pop() { - { - // only allow header column have inner field - if item.inner_field.is_some() - && !item.column_type.real_value().eq_ignore_ascii_case("header") - { - return Err(RwError::from(ProtocolError(format!( - "Only header column can have inner field, but got {:?}", - item.column_type.real_value(), - )))); - } - } + check_additional_column_compatibility(&item, source_schema)?; let data_type_name: Option = item .header_inner_expect_type @@ -1513,6 +1538,7 @@ pub async fn bind_create_source_or_table_with_connector( // add additional columns before bind pk, because `format upsert` requires the key column handle_addition_columns( + Some(&source_schema), &with_properties, include_column_options, &mut columns, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a10453a43ea4..e2ef048143cd 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -772,6 +772,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table( // append additional columns to the end handle_addition_columns( + None, &connect_properties, include_column_options, &mut columns,