Skip to content

Commit

Permalink
check on encode json
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion committed Sep 6, 2024
1 parent 7f78492 commit 20f6bfa
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
50 changes: 38 additions & 12 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use risingwave_sqlparser::ast::{
get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format,
ObjectName, ProtobufSchema, SourceWatermark, TableConstraint,
};
use risingwave_sqlparser::parser::IncludeOption;
use risingwave_sqlparser::parser::{IncludeOption, IncludeOptionItem};
use thiserror_ext::AsReport;

use super::RwPgResponse;
Expand Down Expand Up @@ -595,8 +595,43 @@ fn bind_columns_from_source_for_cdc(
Ok((Some(columns), stream_source_info))
}

// check the additional column compatibility with the format and encode
fn check_additional_column_compatibility(
column_def: &IncludeOptionItem,
source_schema: Option<&ConnectorSchema>,
) -> Result<()> {
// only allow header column have inner field
if column_def.inner_field.is_some()
&& !column_def
.column_type
.real_value()
.eq_ignore_ascii_case("header")
{
return Err(RwError::from(ProtocolError(format!(
"Only header column can have inner field, but got {:?}",
column_def.column_type.real_value(),
))));
}

// Payload column only allowed when encode is JSON
if let Some(schema) = source_schema
&& column_def
.column_type
.real_value()
.eq_ignore_ascii_case("payload")
&& !matches!(schema.row_encode, Encode::Json)
{
return Err(RwError::from(ProtocolError(format!(
"Payload column is only allowed when row encode is JSON, but got {:?}",
schema.row_encode
))));
}
Ok(())
}

/// add connector-spec columns to the end of column catalog
pub fn handle_addition_columns(
source_schema: Option<&ConnectorSchema>,
with_properties: &BTreeMap<String, String>,
mut additional_columns: IncludeOption,
columns: &mut Vec<ColumnCatalog>,
Expand All @@ -620,17 +655,7 @@ pub fn handle_addition_columns(
.unwrap(); // there must be at least one column in the column catalog

while let Some(item) = additional_columns.pop() {
{
// only allow header column have inner field
if item.inner_field.is_some()
&& !item.column_type.real_value().eq_ignore_ascii_case("header")
{
return Err(RwError::from(ProtocolError(format!(
"Only header column can have inner field, but got {:?}",
item.column_type.real_value(),
))));
}
}
check_additional_column_compatibility(&item, source_schema)?;

let data_type_name: Option<String> = item
.header_inner_expect_type
Expand Down Expand Up @@ -1513,6 +1538,7 @@ pub async fn bind_create_source_or_table_with_connector(

// add additional columns before bind pk, because `format upsert` requires the key column
handle_addition_columns(
Some(&source_schema),
&with_properties,
include_column_options,
&mut columns,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(

// append additional columns to the end
handle_addition_columns(
None,
&connect_properties,
include_column_options,
&mut columns,
Expand Down

0 comments on commit 20f6bfa

Please sign in to comment.