From 350af077ed53d9f5d9553aa6a10662565071053b Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 5 Mar 2024 18:44:44 +0800 Subject: [PATCH] fix sr sink array error fix --- integration_tests/starrocks-sink/README.md | 4 ++-- src/connector/src/sink/starrocks.rs | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md index eb9d57ef4c260..660fcf962a6d9 100644 --- a/integration_tests/starrocks-sink/README.md +++ b/integration_tests/starrocks-sink/README.md @@ -23,7 +23,7 @@ Run the following queries to create database and table. You also use other starr CREATE database demo; use demo; -CREATE table demo_bhv_table( +CREATE table upsert_table( user_id int, target_id text, event_timestamp_local datetime @@ -46,5 +46,5 @@ We only support `upsert` with starrocks' `PRIMARY KEY` Run the following query ```sql -select user_id, count(*) from demo.demo_bhv_table group by user_id; +select user_id, count(*) from demo.upsert_table group by user_id; ``` diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index c5a0740b0736f..edefaf7aa1201 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -139,7 +139,7 @@ impl StarrocksSink { i.name )) })?; - if !Self::check_and_correct_column_type(&i.data_type, value.to_string())? { + if !Self::check_and_correct_column_type(&i.data_type, value)? { return Err(SinkError::Starrocks(format!( "Column type don't match, column name is {:?}. starrocks type is {:?} risingwave type is {:?} ",i.name,value,i.data_type ))); @@ -150,7 +150,7 @@ impl StarrocksSink { fn check_and_correct_column_type( rw_data_type: &DataType, - starrocks_data_type: String, + starrocks_data_type: &String, ) -> Result { match rw_data_type { risingwave_common::types::DataType::Boolean => { @@ -186,12 +186,16 @@ impl StarrocksSink { risingwave_common::types::DataType::Interval => Err(SinkError::Starrocks( "INTERVAL is not supported for Starrocks sink. Please convert to VARCHAR or other supported types.".to_string(), )), - // todo! Validate the type struct and list risingwave_common::types::DataType::Struct(_) => Err(SinkError::Starrocks( "STRUCT is not supported for Starrocks sink.".to_string(), )), - risingwave_common::types::DataType::List(_) => { - Ok(starrocks_data_type.contains("unknown")) + risingwave_common::types::DataType::List(list) => { + // For compatibility with older versions starrocks + if starrocks_data_type.contains("unknown") { + return Ok(true); + } + let check_result = Self::check_and_correct_column_type(list.as_ref(), starrocks_data_type)?; + Ok(check_result && starrocks_data_type.contains("array")) } risingwave_common::types::DataType::Bytea => Err(SinkError::Starrocks( "BYTEA is not supported for Starrocks sink. Please convert to VARCHAR or other supported types.".to_string(),