Skip to content

Commit

Permalink
test: move more Kafka tests to source_inline
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 11, 2024
1 parent 5201a1a commit d4391f0
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 53 deletions.
32 changes: 0 additions & 32 deletions e2e_test/source/basic/ttl_table_with_con.slt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
control substitution on

system ok
rpk topic delete json_timestamptz_handling_mode || true

system ok
rpk topic create json_timestamptz_handling_mode -p 1

system ok
cat <<EOF | rpk topic produce json_timestamptz_handling_mode -f "%v\n"
{"case":"0 number small","payloade2e_test/source_inline/kafka/handling_mode.slt":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
EOF

statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

Expand All @@ -13,8 +28,7 @@ create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

Expand All @@ -23,8 +37,7 @@ create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

Expand All @@ -33,8 +46,7 @@ create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

Expand All @@ -43,8 +55,7 @@ create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

Expand All @@ -53,17 +64,15 @@ create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_temporary_kafka_batch || true

system ok
rpk topic create test_temporary_kafka_batch -p 1

system ok
cat <<EOF | rpk topic produce test_temporary_kafka_batch -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_temporary_kafka_batch',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

Expand Down
47 changes: 47 additions & 0 deletions e2e_test/source_inline/kafka/ttl_table_with_con.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
control substitution on

system ok
rpk topic delete test_ttl_table_with_con || true

system ok
rpk topic create test_ttl_table_with_con -p 1

system ok
cat <<EOF | rpk topic produce test_ttl_table_with_con -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create table t (v1 int, v2 varchar) APPEND ONLY with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_ttl_table_with_con',
scan.startup.mode = 'earliest',
retention_seconds = 5
) FORMAT PLAIN ENCODE JSON;

statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 1s

query IT rowsort
select * from t
----
1 1
2 22
3 333
4 4444

statement ok
select pg_sleep(10);

query I
select * from t;
----

statement ok
drop table t;
4 changes: 0 additions & 4 deletions scripts/source/test_data/json_timestamptz_handling_mode.1

This file was deleted.

0 comments on commit d4391f0

Please sign in to comment.