
Commit

test: move more Kafka tests to source_inline
xxchan committed Oct 11, 2024
1 parent 9ef40ba commit b03b9a6
Showing 100 changed files with 120 additions and 136 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/deterministic-e2e-test.sh
@@ -11,7 +11,7 @@ download-and-decompress-artifact risingwave_simulation .
chmod +x ./risingwave_simulation

echo "--- Extract data for Kafka"
-pushd ./scripts/source/
+pushd ./e2e_test/source_legacy/kafka/script
mkdir -p ./test_data
unzip -o test_data.zip -d .
popd
@@ -39,7 +39,7 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, batch"
seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, kafka source"
seq "$TEST_NUM" | parallel './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'
seq "$TEST_NUM" | parallel './risingwave_simulation --kafka-datadir=./e2e_test/source_legacy/kafka/script/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming"
seq "$TEST_NUM" | parallel './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'
4 changes: 2 additions & 2 deletions ci/scripts/deterministic-recovery-test.sh
@@ -93,6 +93,6 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, kafka source
seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
---kafka-datadir=./scripts/source/test_data \
+--kafka-datadir=./e2e_test/source_legacy/kafka/script/test_data \
${EXTRA_ARGS:-} \
-./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
+./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
8 changes: 4 additions & 4 deletions ci/scripts/e2e-source-test.sh
@@ -148,13 +148,13 @@ echo "--- Kill cluster"
risedev ci-kill
export RISINGWAVE_CI=true

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
echo "--- e2e, ci-kafka, legacy kafka tests"
export RUST_MIN_STACK=4194304
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-kafka
-./scripts/source/prepare_ci_kafka.sh
-risedev slt './e2e_test/source/basic/*.slt'
-risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
+./e2e_test/source_legacy/kafka/script/prepare_ci_kafka.sh
+risedev slt './e2e_test/source_legacy/kafka/*.slt'
+risedev slt './e2e_test/source_legacy/kafka/old_row_format_syntax/*.slt'

echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
46 changes: 0 additions & 46 deletions e2e_test/source/basic/nexmark/nexmark_endless_part1.slt.part

This file was deleted.

32 changes: 0 additions & 32 deletions e2e_test/source/basic/ttl_table_with_con.slt

This file was deleted.

File renamed without changes.
e2e_test/source_inline/kafka/handling_mode.slt
@@ -1,9 +1,24 @@
control substitution on

system ok
rpk topic delete json_timestamptz_handling_mode || true

system ok
rpk topic create json_timestamptz_handling_mode -p 1

system ok
cat <<EOF | rpk topic produce json_timestamptz_handling_mode -f "%v\n"
{"case":"0 number small","payloade2e_test/source_inline/kafka/handling_mode.slt":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
EOF

statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

@@ -13,8 +28,7 @@ create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

@@ -23,8 +37,7 @@ create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

@@ -33,8 +46,7 @@ create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

@@ -43,8 +55,7 @@ create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

@@ -53,17 +64,15 @@ create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
-connector = 'kafka',
-properties.bootstrap.server='message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');
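Note on the `${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}` substitution used in the hunks above: with `control substitution on`, the slt runner expands `${...}` variables from the risedev environment before executing each statement. The exact value is generated by risedev and is not part of this diff; the lines below are only an assumed sketch of what it could look like.

# Hypothetical entries in .risingwave/config/risedev-env (illustrative only;
# the real values are generated by risedev and may differ):
RISEDEV_KAFKA_BOOTSTRAP_SERVERS="message_queue:29092"
RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka', properties.bootstrap.server='${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}'"

Under that assumption, each hunk above replaces the hard-coded `connector = 'kafka'` and `properties.bootstrap.server='message_queue:29092'` pair with a single line resolved from the environment.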

File renamed without changes.
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_temporary_kafka_batch || true

system ok
rpk topic create test_temporary_kafka_batch -p 1

system ok
cat <<EOF | rpk topic produce test_temporary_kafka_batch -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create temporary source s1 (v1 int, v2 varchar) with (
-connector = 'kafka',
-topic = 'kafka_1_partition_topic',
-properties.bootstrap.server = 'message_queue:29092',
+${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+topic = 'test_temporary_kafka_batch',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

47 changes: 47 additions & 0 deletions e2e_test/source_inline/kafka/ttl_table_with_con.slt
@@ -0,0 +1,47 @@
control substitution on

system ok
rpk topic delete test_ttl_table_with_con || true

system ok
rpk topic create test_ttl_table_with_con -p 1

system ok
cat <<EOF | rpk topic produce test_ttl_table_with_con -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create table t (v1 int, v2 varchar) APPEND ONLY with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_ttl_table_with_con',
scan.startup.mode = 'earliest',
retention_seconds = 5
) FORMAT PLAIN ENCODE JSON;

statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 1s

query IT rowsort
select * from t
----
1 1
2 22
3 333
4 4444

statement ok
select pg_sleep(10);

query I
select * from t;
----

statement ok
drop table t;
File renamed without changes.
File renamed without changes.
@@ -10,7 +10,7 @@ CREATE TABLE diamonds (
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
-posix_fs.root = 'e2e_test/source/opendal/data',
+posix_fs.root = 'e2e_test/source_inline/posix_fs/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s
@@ -1,14 +1,10 @@
> [!NOTE]
>
> Please write new tests according to the style in `e2e_test/source_inline`.
> Don't add new tests here.
> DO NOT ADD NEW TESTS HERE!
>
> See the [connector development guide](http://risingwavelabs.github.io/risingwave/connector/intro.html#end-to-end-tests) for more information about how to test.
Test in this directory needs some prior setup.

See also `ci/scripts/e2e-source-test.sh`, and `scripts/source`

## Kafka

`scripts/source/test_data` contains the data. Filename's convention is `<topic_name>.<n_partitions>`.
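For contrast with the legacy layout described above, here is a minimal sketch of the `source_inline` style that the note points to, assembled from the tests added in this commit (the topic and table names below are made up for illustration): each test provisions its own topic with `rpk`, produces its own data inline, and relies on the risedev substitution instead of a hard-coded broker address.

control substitution on

system ok
rpk topic delete demo_inline_topic || true

system ok
rpk topic create demo_inline_topic -p 1

system ok
cat <<EOF | rpk topic produce demo_inline_topic -f "%v\n"
{"v1": 1, "v2": "one"}
EOF

statement ok
create table t_demo (v1 int, v2 varchar) with (
  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
  topic = 'demo_inline_topic',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
drop table t_demo;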
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -57,4 +57,4 @@ SELECT * FROM tt1;
1 2023-10-23 10:00:00+00:00

statement ok
-drop table tt1;
+drop table tt1;
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -201,4 +201,4 @@ statement ok
drop table s8

statement ok
-drop source s9
+drop source s9
File renamed without changes.
@@ -4,16 +4,16 @@
set -e

SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
cd "$SCRIPT_PATH/.." || exit 1
-# cwd is /scripts
+# SCRIPT_PATH is e2e_test/source_legacy/kafka/script/
+# cwd is e2e_test/source_legacy/kafka/

echo "$SCRIPT_PATH"

-source ../.risingwave/config/risedev-env
+source ../../../.risingwave/config/risedev-env

if [ "$1" == "compress" ]; then
echo "Compress test_data/ into test_data.zip"
-cd ./source
+cd ./script
zip_file=test_data.zip
if [ -f "$zip_file" ]; then
rm "$zip_file"
@@ -23,7 +23,7 @@ if [ "$1" == "compress" ]; then
fi

echo "--- Extract data for Kafka"
-cd ./source/
+cd ./script/
mkdir -p ./test_data/ch_benchmark/
unzip -o test_data.zip -d .
cd ..
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -1,4 +1,4 @@
{"data":[{"id":"1","name":"mike","is_adult":"0","balance":"1000.62","reg_time":"2018-01-01 00:00:01","win_rate":"0.65"}],"database":"demo","es":1668673394000,"id":5,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673394788,"type":"INSERT"}
{"data":[{"id":"2","name":"alice","is_adult":"1","balance":"2000.62","reg_time":"2020-01-01 00:00:01","win_rate":"0.55"}],"database":"demo","es":1668673417000,"id":6,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673417265,"type":"INSERT"}
{"data":[{"id":"1","name":"mike","is_adult":"0","balance":"1500.62","reg_time":"2018-01-01 00:00:01","win_rate":"0.65"}],"database":"demo","es":1668673476000,"id":7,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":[{"balance":"1000.62"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673476732,"type":"UPDATE"}
{"data":[{"id":"2","name":"alice","is_adult":"1","balance":"2000.62","reg_time":"2020-01-01 00:00:01","win_rate":"0.55"}],"database":"demo","es":1668673493000,"id":8,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673493638,"type":"DELETE"}
{"data":[{"id":"2","name":"alice","is_adult":"1","balance":"2000.62","reg_time":"2020-01-01 00:00:01","win_rate":"0.55"}],"database":"demo","es":1668673493000,"id":8,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(40)","is_adult":"boolean","balance":"decimal(10,2)","reg_time":"timestamp","win_rate":"double"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"is_adult":-6,"balance":3,"reg_time":93,"win_rate":8},"table":"demo","ts":1668673493638,"type":"DELETE"}
File renamed without changes.
File renamed without changes.
@@ -1 +1 @@
{"id": 100, "code": "abc", "timestamp": 1473305798, "xfas": [{"device_model_id": 0, "device_make_id": 200, "ip": "10.0.0.1"}, {"device_model_id": 1, "device_make_id": 400, "ip": "10.0.0.2"}], "contacts": {"phones": ["1xxx", "2xxx"], "emails": ["1xxx", "2xxx"]}, "jsonb": {"blockNumber": 16938734}}
{"id": 100, "code": "abc", "timestamp": 1473305798, "xfas": [{"device_model_id": 0, "device_make_id": 200, "ip": "10.0.0.1"}, {"device_model_id": 1, "device_make_id": 400, "ip": "10.0.0.2"}], "contacts": {"phones": ["1xxx", "2xxx"], "emails": ["1xxx", "2xxx"]}, "jsonb": {"blockNumber": 16938734}}
@@ -1 +1 @@
{"productId":1,"productName":"An ice sculpture","price":12.5,"tags":["cold","ice"],"dimensions":{"length":7,"width":12,"height":9.5}}
{"productId":1,"productName":"An ice sculpture","price":12.5,"tags":["cold","ice"],"dimensions":{"length":7,"width":12,"height":9.5}}
@@ -1,4 +1,4 @@
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
{"v1": 4, "v2": "4444"}
@@ -1,4 +1,4 @@
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
{"v1": 4, "v2": "4444"}
@@ -1,4 +1,4 @@
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
{"v1": 4, "v2": "4444"}
