From ead91a0a3e0f2872c4d4a5d779486bad20c2dca5 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Tue, 24 Sep 2024 17:08:26 +0800 Subject: [PATCH] change sink into table --- e2e_test/sink/sink_into_table/basic.slt | 67 +++++++++++++++++++++++++ src/frontend/src/handler/create_sink.rs | 18 ++++--- 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/e2e_test/sink/sink_into_table/basic.slt b/e2e_test/sink/sink_into_table/basic.slt index e2a10d46fbf37..8e93672c083ae 100644 --- a/e2e_test/sink/sink_into_table/basic.slt +++ b/e2e_test/sink/sink_into_table/basic.slt @@ -323,6 +323,73 @@ drop table t_primary_key; statement ok drop table t_s3; + + +# target table append only with primary key + +statement ok +create table t_s3 (v1 int, v2 int) append only; + +statement ok +insert into t_s3 values (1, 11), (2, 12), (3, 13); + +statement ok +create table t_primary_key_append_only (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2) APPEND ONLY; + +statement error +create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3; + +statement ok +create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3 with (type = 'append-only'); + + +statement ok +flush; + +query IIII rowsort +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 + +statement ok +insert into t_s3 values (4, 14), (5, 15), (6, 16); + +query IIII rowsort +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 + +statement ok +insert into t_primary_key_append_only values (100, 100); + +query IIII +select * from t_primary_key_append_only order by v1; +---- +1 11 1000 12 +2 12 1000 14 +3 13 1000 16 +4 14 1000 18 +5 15 1000 20 +6 16 1000 22 +100 100 1000 200 + +statement ok +drop sink s3; + +statement ok +drop table t_primary_key_append_only; + +statement ok +drop table t_s3; + + # multi sinks statement ok diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f87fced6b02a2..7486046701a81 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -277,15 +277,19 @@ pub async fn gen_sink_plan( } } - let user_defined_primary_key_table = - !(table_catalog.append_only || table_catalog.row_id_index.is_some()); + let user_defined_primary_key_table = !table_catalog.row_id_index.is_some(); + let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly + || sink_catalog.sink_type == SinkType::ForceAppendOnly; - if !(user_defined_primary_key_table - || sink_catalog.sink_type == SinkType::AppendOnly - || sink_catalog.sink_type == SinkType::ForceAppendOnly) - { + if !user_defined_primary_key_table && !sink_is_append_only { return Err(RwError::from(ErrorCode::BindError( - "Only append-only sinks can sink to a table without primary keys.".to_string(), + "Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), + ))); + } + + if table_catalog.append_only && !sink_is_append_only { + return Err(RwError::from(ErrorCode::BindError( + "Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(), ))); }