Skip to content

Commit

Permalink
change sink into table
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Sep 24, 2024
1 parent 7fd27d4 commit ead91a0
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 7 deletions.
67 changes: 67 additions & 0 deletions e2e_test/sink/sink_into_table/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,73 @@ drop table t_primary_key;
statement ok
drop table t_s3;



# target table append only with primary key

statement ok
create table t_s3 (v1 int, v2 int) append only;

statement ok
insert into t_s3 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_primary_key_append_only (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2) APPEND ONLY;

statement error
create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3;

statement ok
create sink s3 into t_primary_key_append_only as select v1, v2 from t_s3 with (type = 'append-only');


statement ok
flush;

query IIII rowsort
select * from t_primary_key_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

statement ok
insert into t_s3 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_primary_key_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

statement ok
insert into t_primary_key_append_only values (100, 100);

query IIII
select * from t_primary_key_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22
100 100 1000 200

statement ok
drop sink s3;

statement ok
drop table t_primary_key_append_only;

statement ok
drop table t_s3;


# multi sinks

statement ok
Expand Down
18 changes: 11 additions & 7 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,19 @@ pub async fn gen_sink_plan(
}
}

let user_defined_primary_key_table =
!(table_catalog.append_only || table_catalog.row_id_index.is_some());
let user_defined_primary_key_table = !table_catalog.row_id_index.is_some();
let sink_is_append_only = sink_catalog.sink_type == SinkType::AppendOnly
|| sink_catalog.sink_type == SinkType::ForceAppendOnly;

if !(user_defined_primary_key_table
|| sink_catalog.sink_type == SinkType::AppendOnly
|| sink_catalog.sink_type == SinkType::ForceAppendOnly)
{
if !user_defined_primary_key_table && !sink_is_append_only {
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a table without primary keys.".to_string(),
"Only append-only sinks can sink to a table without primary keys. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
)));
}

if table_catalog.append_only && !sink_is_append_only {
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a append only table. please try to add type = 'append-only' in the with option. e.g. create sink s into t as select * from t1 with (type = 'append-only')".to_string(),
)));
}

Expand Down

0 comments on commit ead91a0

Please sign in to comment.