Skip to content

Commit

Permalink
feat(iceberg): support position delete for iceberg source (#18579)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Sep 20, 2024
1 parent 57f01de commit 759efe3
Show file tree
Hide file tree
Showing 10 changed files with 531 additions and 136 deletions.
4 changes: 3 additions & 1 deletion ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml
poetry run python main.py -t ./test_case/range_partition_upsert.toml
poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml
poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml


echo "--- Kill cluster"
Expand Down
101 changes: 101 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = default(true), so we'll commit every 10s.
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 'test_all_delete',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://hummock001/iceberg-data',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = 5,
primary_key = 'i1,i2',
);

statement ok
INSERT INTO s1 (i1, i2, i3)
SELECT s, s::text, s::text FROM generate_series(1, 10000) s;

statement ok
flush

statement ok
DELETE FROM s1
WHERE i1 IN (
SELECT s
FROM generate_series(1, 10000, 2) s
);

sleep 10s

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://hummock001/iceberg-data',
database.name = 'demo_db',
table.name = 'test_all_delete',
);

statement ok
DELETE FROM s1
WHERE i1 IN (
SELECT s
FROM generate_series(1, 10000, 3) s
);

statement ok
flush

sleep 15s

query I
select * from iceberg_t1_source order by i1 limit 5;
----
2 2 2
6 6 6
8 8 8
12 12 12
14 14 14

query I
select * from iceberg_t1_source order by i1 desc limit 5;
----
9998 9998 9998
9996 9996 9996
9992 9992 9992
9990 9990 9990
9986 9986 9986

query I
select count(*) from iceberg_t1_source
----
3333

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_all_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_all_delete',
]

slt = 'test_case/iceberg_source_all_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_all_delete',
'DROP SCHEMA IF EXISTS demo_db',
]
11 changes: 0 additions & 11 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = false, so we'll commit every 1s.
statement ok
set sink_decouple = false;

Expand All @@ -15,7 +16,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 't1',
table.name = 'test_equality_delete',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
Expand Down Expand Up @@ -58,7 +59,7 @@ WITH (
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
table.name = 'test_equality_delete',
);

query I
Expand Down
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_equality_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_equality_delete',
]

slt = 'test_case/iceberg_source_equality_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_equality_delete',
'DROP SCHEMA IF EXISTS demo_db',
]
89 changes: 89 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = default(true), so we'll commit every 10s.
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 'test_position_delete',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true',
commit_checkpoint_interval = 5,
primary_key = 'i1,i2',
);

statement ok
INSERT INTO s1 (i1, i2, i3)
SELECT s, s::text, s::text FROM generate_series(1, 10000) s;

statement ok
flush

statement ok
DELETE FROM s1
WHERE i1 IN (
SELECT s
FROM generate_series(1, 10000, 2) s
);

sleep 15s

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 'test_position_delete',
);

query I
select * from iceberg_t1_source order by i1 limit 5;
----
2 2 2
4 4 4
6 6 6
8 8 8
10 10 10

query I
select * from iceberg_t1_source order by i1 desc limit 5;
----
10000 10000 10000
9998 9998 9998
9996 9996 9996
9994 9994 9994
9992 9992 9992

query I
select count(*) from iceberg_t1_source
----
5000

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_position_delete.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_position_delete',
]

slt = 'test_case/iceberg_source_position_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_position_delete',
'DROP SCHEMA IF EXISTS demo_db',
]
Loading

0 comments on commit 759efe3

Please sign in to comment.