Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: move more source tests to source_inline #18870

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/scripts/deterministic-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ download-and-decompress-artifact risingwave_simulation .
chmod +x ./risingwave_simulation

echo "--- Extract data for Kafka"
pushd ./scripts/source/
pushd ./e2e_test/source_legacy/kafka/script
mkdir -p ./test_data
unzip -o test_data.zip -d .
popd
Expand Down Expand Up @@ -39,7 +39,7 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, batch"
seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, kafka source"
seq "$TEST_NUM" | parallel './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'
seq "$TEST_NUM" | parallel './risingwave_simulation --kafka-datadir=./e2e_test/source_legacy/kafka/script/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming"
seq "$TEST_NUM" | parallel './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,6 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, kafka source
seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--kafka-datadir=./scripts/source/test_data \
--kafka-datadir=./e2e_test/source_legacy/kafka/script/test_data \
${EXTRA_ARGS:-} \
./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
4 changes: 2 additions & 2 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ risedev ci-kill
echo "--- e2e, ci-1cn-1fe, nexmark endless"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_sinks/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source_inline/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source_inline/nexmark_endless_sinks/*.slt'

echo "--- Kill cluster"
risedev ci-kill
8 changes: 4 additions & 4 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ echo "--- Kill cluster"
risedev ci-kill
export RISINGWAVE_CI=true

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
echo "--- e2e, ci-kafka, legacy kafka tests"
export RUST_MIN_STACK=4194304
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-kafka
./scripts/source/prepare_ci_kafka.sh
risedev slt './e2e_test/source/basic/*.slt'
risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
./e2e_test/source_legacy/kafka/script/prepare_ci_kafka.sh
risedev slt './e2e_test/source_legacy/kafka/*.slt'
risedev slt './e2e_test/source_legacy/kafka/old_row_format_syntax/*.slt'

echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
Expand Down
46 changes: 0 additions & 46 deletions e2e_test/source/basic/nexmark/nexmark_endless_part1.slt.part

This file was deleted.

32 changes: 0 additions & 32 deletions e2e_test/source/basic/ttl_table_with_con.slt

This file was deleted.

6 changes: 3 additions & 3 deletions e2e_test/source_inline/kafka/avro/name_strategy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ create source s1 () with (
# Currently we are abusing this test case to also test data types.

system ok
python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro"
python3 e2e_test/source_legacy/kafka/script/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro"

statement ok
CREATE TABLE t_topic ( primary key (rw_key) )
Expand All @@ -44,7 +44,7 @@ FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

## topic: upsert_avro_json-record, key subject: string, value subject: CPLM.OBJ_ATTRIBUTE_VALUE
system ok
python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "record" "avro"
python3 e2e_test/source_legacy/kafka/script/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "record" "avro"


statement error key\.message
Expand Down Expand Up @@ -80,7 +80,7 @@ create table t_record_format_plain () with (
## key subject: upsert_avro_json-topic-record-string
## value subject: upsert_avro_json-topic-record-CPLM.OBJ_ATTRIBUTE_VALUE
system ok
python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic-record" "avro"
python3 e2e_test/source_legacy/kafka/script/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic-record" "avro"



Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
control substitution on

system ok
rpk topic delete json_timestamptz_handling_mode || true

system ok
rpk topic create json_timestamptz_handling_mode -p 1

system ok
cat <<EOF | rpk topic produce json_timestamptz_handling_mode -f "%v\n"
{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
EOF

statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

Expand All @@ -13,8 +28,7 @@ create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

Expand All @@ -23,8 +37,7 @@ create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

Expand All @@ -33,8 +46,7 @@ create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

Expand All @@ -43,8 +55,7 @@ create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

Expand All @@ -53,17 +64,15 @@ create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_temporary_kafka_batch || true

system ok
rpk topic create test_temporary_kafka_batch -p 1

system ok
cat <<EOF | rpk topic produce test_temporary_kafka_batch -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_temporary_kafka_batch',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

Expand Down
47 changes: 47 additions & 0 deletions e2e_test/source_inline/kafka/ttl_table_with_con.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
control substitution on

system ok
rpk topic delete test_ttl_table_with_con || true

system ok
rpk topic create test_ttl_table_with_con -p 1

system ok
cat <<EOF | rpk topic produce test_ttl_table_with_con -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create table t (v1 int, v2 varchar) APPEND ONLY with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_ttl_table_with_con',
scan.startup.mode = 'earliest',
retention_seconds = 5
) FORMAT PLAIN ENCODE JSON;

statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 1s

query IT rowsort
select * from t
----
1 1
2 22
3 333
4 4444

statement ok
select pg_sleep(10);

query I
select * from t;
----

statement ok
drop table t;
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE TABLE diamonds (
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal/data',
posix_fs.root = 'e2e_test/source_inline/posix_fs/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
> [!NOTE]
>
> Please write new tests according to the style in `e2e_test/source_inline`.
> Don't add new tests here.
> DO NOT ADD NEW TESTS HERE!
>
> See the [connector development guide](http://risingwavelabs.github.io/risingwave/connector/intro.html#end-to-end-tests) for more information about how to test.

Test in this directory needs some prior setup.

See also `ci/scripts/e2e-source-test.sh`, and `scripts/source`

## Kafka

`scripts/source/test_data` contains the data. Filename's convention is `<topic_name>.<n_partitions>`.
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ SELECT * FROM tt1;
1 2023-10-23 10:00:00+00:00

statement ok
drop table tt1;
drop table tt1;
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,4 @@ statement ok
drop table s8

statement ok
drop source s9
drop source s9
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
set -e

SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
cd "$SCRIPT_PATH/.." || exit 1
# cwd is /scripts
# SCRIPT_PATH is e2e_test/source_legacy/kafka/script/
# cwd is e2e_test/source_legacy/kafka/

echo "$SCRIPT_PATH"

source ../.risingwave/config/risedev-env
source ../../../.risingwave/config/risedev-env

if [ "$1" == "compress" ]; then
echo "Compress test_data/ into test_data.zip"
cd ./source
cd ./script
zip_file=test_data.zip
if [ -f "$zip_file" ]; then
rm "$zip_file"
Expand All @@ -23,7 +23,7 @@ if [ "$1" == "compress" ]; then
fi

echo "--- Extract data for Kafka"
cd ./source/
cd ./script/
mkdir -p ./test_data/ch_benchmark/
unzip -o test_data.zip -d .
cd ..
Expand Down
Loading
Loading