Skip to content

Commit

Permalink
refactor(source): move protobuf to codec crate, and refactor tests (#…
Browse files Browse the repository at this point in the history
…18507)

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Sep 13, 2024
1 parent 870bcde commit c219218
Show file tree
Hide file tree
Showing 36 changed files with 1,235 additions and 1,150 deletions.
12 changes: 9 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ risedev ci-kill
echo "--- Prepare data"
cp src/connector/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc
cp src/connector/src/test_data/complex-schema.avsc ./avro-complex-schema.avsc
cp src/connector/src/test_data/complex-schema ./proto-complex-schema
cp src/connector/src/test_data/complex-schema.json ./json-complex-schema


Expand Down
29 changes: 13 additions & 16 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ set sink_decouple = false;
system ok
rpk topic create test-rw-sink-append-only-protobuf

system ok
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive

statement ok
create table from_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

system ok
rpk topic create test-rw-sink-append-only-protobuf-csr-a
Expand Down Expand Up @@ -91,8 +88,8 @@ create sink sink0 from into_kafka with (
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

statement ok
create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
Expand Down Expand Up @@ -121,8 +118,8 @@ create sink sink_upsert from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'string_field')
format upsert encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');
----
db error: ERROR: Failed to run the query

Expand All @@ -140,8 +137,8 @@ create sink sink_upsert from into_kafka with (
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'string_field')
format upsert encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes')
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes')
key encode text;

# Shall be ignored by force_append_only sinks but processed by upsert sinks.
Expand Down Expand Up @@ -196,7 +193,7 @@ create sink sink_err from into_kafka with (
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursiv',
message = 'recursive.AllTypes');
message = 'all_types.AllTypes');

statement error field not in proto
create sink sink_err as select 1 as extra_column with (
Expand All @@ -205,8 +202,8 @@ create sink sink_err as select 1 as extra_column with (
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 'file:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

statement error s3 URL not supported yet
create sink sink_err from into_kafka with (
Expand All @@ -215,8 +212,8 @@ create sink sink_err from into_kafka with (
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
schema.location = 's3:///risingwave/src/connector/codec/tests/test_data/all-types.pb',
message = 'all_types.AllTypes');

statement ok
drop table from_kafka cascade;
Expand Down
36 changes: 0 additions & 36 deletions e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,6 @@ create table s10 with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE AVRO (schema.location = 'file:///risingwave/avro-complex-schema.avsc', with_deprecated_file_header = true);

statement ok
create table s11 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT PLAIN ENCODE PROTOBUF (
message = 'test.User',
schema.location = 'file:///risingwave/proto-complex-schema'
);

statement ok
CREATE TABLE s12(
id int,
Expand Down Expand Up @@ -273,17 +262,6 @@ create table s16 (v1 int, v2 varchar) with (
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON

statement ok
create source s17 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest')
FORMAT PLAIN ENCODE PROTOBUF (
message = 'test.User',
schema.location = 'file:///risingwave/proto-complex-schema'
);

statement ok
create source s18 with (
connector = 'kafka',
Expand Down Expand Up @@ -696,11 +674,6 @@ select id, code, timestamp, xfas, contacts, sex from s10;
----
100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, sex from s11;
----
0 abc 1473305798 {"(0,200,127.0.0.1)","(1,400,127.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, jsonb from s12;
----
Expand Down Expand Up @@ -730,9 +703,6 @@ select count(*) from s16
statement error Not supported: alter source with schema registry
alter source s18 add column v10 int;

statement error Not supported: alter source with schema registry
alter source s17 add column v10 int;

query III rowsort
select * from s21;
----
Expand Down Expand Up @@ -875,9 +845,6 @@ drop table s9
statement ok
drop table s10

statement ok
drop table s11

statement ok
drop table s12

Expand All @@ -893,9 +860,6 @@ drop table s15
statement ok
drop table s16

statement ok
drop source s17

statement ok
drop source s18

Expand Down
27 changes: 0 additions & 27 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,6 @@ create table s10 with (
scan.startup.mode = 'earliest'
) row format avro row schema location 'file:///risingwave/avro-complex-schema.avsc'

statement ok
create table s11 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement ok
CREATE TABLE s12(
id int,
Expand Down Expand Up @@ -254,14 +246,6 @@ create table s16 (v1 int, v2 varchar) with (
scan.startup.mode = 'latest'
) ROW FORMAT JSON

statement ok
create source s17 with (
connector = 'kafka',
topic = 'proto_c_bin',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) row format protobuf message 'test.User' row schema location 'file:///risingwave/proto-complex-schema'

statement error without schema registry
create source s18 with (
connector = 'kafka',
Expand Down Expand Up @@ -570,11 +554,6 @@ select id, first_name, last_name, email from s8_no_schema_field;
# ----
# 100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, sex from s11;
----
0 abc 1473305798 {"(0,200,127.0.0.1)","(1,400,127.0.0.2)"} ("{1xxx,2xxx}","{1xxx,2xxx}") MALE

query ITITT
select id, code, timestamp, xfas, contacts, jsonb from s12;
----
Expand Down Expand Up @@ -712,9 +691,6 @@ drop table s8_no_schema_field
# statement ok
# drop table s10

statement ok
drop table s11

statement ok
drop table s12

Expand All @@ -730,9 +706,6 @@ drop table s15
statement ok
drop table s16

statement ok
drop source s17

# statement ok
# drop source s18

Expand Down
6 changes: 1 addition & 5 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ prometheus = { version = "0.13", features = ["process"] }
prost = { workspace = true, features = ["no-recursion-limit"] }
prost-reflect = { version = "0.14", features = ["serde"] }
prost-types = "0.13"
protobuf-native = "0.2.2"
pulsar = { version = "6.3", default-features = false, features = [
"tokio-runtime",
"telemetry",
Expand Down Expand Up @@ -194,6 +193,7 @@ assert_matches = "1"
criterion = { workspace = true, features = ["async_tokio", "async"] }
deltalake = { workspace = true, features = ["datafusion"] }
expect-test = "1"
fs-err = "2"
paste = "1"
pretty_assertions = "1"
quote = "1"
Expand All @@ -206,10 +206,6 @@ tracing-subscriber = "0.3"
tracing-test = "0.2"
walkdir = "2"

[build-dependencies]
prost-build = "0.12"
protobuf-src = "1"

[[bench]]
name = "debezium_json_parser"
harness = false
Expand Down
10 changes: 10 additions & 0 deletions src/connector/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ itertools = { workspace = true }
jsonbb = { workspace = true }
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
num-bigint = "0.4"
prost = { workspace = true, features = ["no-recursion-limit"] }
prost-reflect = { version = "0.14", features = ["serde"] }
prost-types = "0.13"
protobuf-native = "0.2.2"
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = "1"
Expand All @@ -37,7 +41,13 @@ tracing = "0.1"

[dev-dependencies]
expect-test = "1"
fs-err = "2"
hex = "0.4"
tokio = { version = "0.2", package = "madsim-tokio" }

[build-dependencies]
prost-build = "0.12"
protobuf-src = "1"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../../workspace-hack" }
Expand Down
6 changes: 3 additions & 3 deletions src/connector/build.rs → src/connector/codec/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
// limitations under the License.

fn main() {
let proto_dir = "./src/test_data/proto_recursive";
let proto_dir = "./tests/test_data/";

println!("cargo:rerun-if-changed={}", proto_dir);

let proto_files = ["recursive"];
let proto_files = ["recursive", "all-types"];
let protos: Vec<String> = proto_files
.iter()
.map(|f| format!("{}/{}.proto", proto_dir, f))
.collect();
prost_build::Config::new()
.out_dir("./src/parser/protobuf")
.out_dir("./tests/integration_tests/protobuf")
.compile_protos(&protos, &Vec::<String>::new())
.unwrap();

Expand Down
15 changes: 15 additions & 0 deletions src/connector/codec/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod protobuf;
Loading

0 comments on commit c219218

Please sign in to comment.