refactor(test): move some tests from source_legacy to source_inline #18894

Merged
4 changes: 2 additions & 2 deletions ci/scripts/e2e-sink-test.sh
@@ -124,8 +124,8 @@ risedev ci-kill
echo "--- e2e, ci-1cn-1fe, nexmark endless"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-1cn-1fe
sqllogictest -p 4566 -d dev './e2e_test/source_legacy/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source_legacy/nexmark_endless_sinks/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/nexmark_endless_sinks/*.slt'

echo "--- Kill cluster"
risedev ci-kill
3 changes: 0 additions & 3 deletions ci/scripts/e2e-source-test.sh
@@ -89,9 +89,6 @@ export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
export SQLCMDSERVER=sqlserver-server SQLCMDUSER=SA SQLCMDPASSWORD="SomeTestOnly@SA" SQLCMDDBNAME=mydb SQLCMDPORT=1433
risedev slt './e2e_test/source_legacy/cdc_inline/**/*.slt'

echo "--- opendal source test"
risedev slt './e2e_test/source_legacy/opendal/**/*.slt'

echo "--- mysql & postgres cdc validate test"
risedev slt './e2e_test/source_legacy/cdc/cdc.validate.mysql.slt'
risedev slt './e2e_test/source_legacy/cdc/cdc.validate.postgres.slt'
@@ -10,7 +10,7 @@ CREATE TABLE diamonds (
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source_legacy/opendal/data',
posix_fs.root = 'e2e_test/source_inline/fs/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s
@@ -1,9 +1,24 @@
control substitution on

system ok
rpk topic delete json_timestamptz_handling_mode || true

system ok
rpk topic create json_timestamptz_handling_mode -p 1

system ok
cat <<EOF | rpk topic produce json_timestamptz_handling_mode -f "%v\n"
{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}}
{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}}
{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}}
{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}}
EOF

statement error unrecognized
create table t (
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'mili');

@@ -13,8 +28,7 @@ create table plain_guess (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mod = 'mili');

@@ -23,8 +37,7 @@ create table plain_milli (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'milli');

@@ -33,8 +46,7 @@ create table plain_micro (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'micro');

@@ -43,8 +55,7 @@ create table plain_utc (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_string');

@@ -53,17 +64,15 @@ create table plain_naive (
"case" varchar,
payload struct<after struct<"case" varchar, at timestamptz>>)
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format plain encode json (timestamptz.handling.mode = 'utc_without_suffix');

statement ok
create table debezium_milli (
"case" varchar, at timestamptz, primary key("case"))
with (
connector = 'kafka',
properties.bootstrap.server='message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'json_timestamptz_handling_mode')
format debezium encode json (timestamptz.handling.mode = 'milli');

@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_temporary_kafka_batch || true

system ok
rpk topic create test_temporary_kafka_batch -p 1

system ok
cat <<EOF | rpk topic produce test_temporary_kafka_batch -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create temporary source s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_temporary_kafka_batch',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

@@ -1,8 +1,23 @@
control substitution on

system ok
rpk topic delete test_ttl_table_with_con || true

system ok
rpk topic create test_ttl_table_with_con -p 1

system ok
cat <<EOF | rpk topic produce test_ttl_table_with_con -f "%v\n"
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
EOF

statement ok
create table t (v1 int, v2 varchar) APPEND ONLY with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_ttl_table_with_con',
scan.startup.mode = 'earliest',
retention_seconds = 5
) FORMAT PLAIN ENCODE JSON;
Member Author

dead test file

This file was deleted.

Member Author

changed to inline style

This file was deleted.
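For reference, the inline style adopted in this PR sets up its own Kafka topic inside the test file via `system ok` shell blocks and uses the shared ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON} substitution instead of a hard-coded bootstrap server. A minimal sketch of the pattern, following the hunks above (the topic name, table name, and columns here are hypothetical, not taken from this PR):

control substitution on

system ok
rpk topic delete inline_example_topic || true

system ok
rpk topic create inline_example_topic -p 1

system ok
cat <<EOF | rpk topic produce inline_example_topic -f "%v\n"
{"v1": 1, "v2": "a"}
EOF

statement ok
create table inline_example (v1 int, v2 varchar) with (
  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
  topic = 'inline_example_topic',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;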

1 change: 0 additions & 1 deletion e2e_test/source_legacy/basic/scripts/test_data/weiling.1
Member Author

dead test file

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
@@ -203,7 +203,7 @@ impl BackfillStage {
vis
}

/// Updates backfill states and returns whether the row from upstream `SourceExecutor` is visible.
/// Updates backfill states and returns whether the row backfilled from external system is visible.
fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
let state = self.states.get_mut(split_id).unwrap();
match state {