Skip to content

Commit

Permalink
e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed Mar 12, 2024
1 parent 9b0a7a2 commit b41d537
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 6 deletions.
10 changes: 9 additions & 1 deletion ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,22 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1

# test different encoding
echo "preparing confluent schema registry"
python3 -m pip install requests confluent-kafka

echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1

echo "testing avro"
python3 -m pip install requests confluent-kafka
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
Expand Down
56 changes: 56 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
create table from_kafka_csr_trivial with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-a',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageA');

statement ok
create table from_kafka_csr_nested with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageH.MessageI');

statement ok
create table into_kafka (
bool_field bool,
Expand Down Expand Up @@ -43,6 +61,26 @@ format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-a',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageA');

statement ok
create sink sink_csr_nested as select sint32_field as field_i from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf-csr-hi',
properties.bootstrap.server = 'message_queue:29092')
format plain encode protobuf (
force_append_only = true,
schema.registry = 'http://message_queue:8081',
message = 'test.package.MessageH.MessageI');

sleep 2s

query TTTRRIIIIIITTTI
Expand All @@ -66,6 +104,18 @@ select
t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42
f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0

query T
select field_a from from_kafka_csr_trivial order by 1;
----
Rising
Wave

query I
select field_i from from_kafka_csr_nested order by 1;
----
13
24

statement error No such file
create sink sink_err from into_kafka with (
connector = 'kafka',
Expand Down Expand Up @@ -96,6 +146,12 @@ format plain encode protobuf (
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
drop sink sink_csr_nested;

statement ok
drop sink sink_csr_trivial;

statement ok
drop sink sink0;

Expand Down
16 changes: 11 additions & 5 deletions e2e_test/sink/kafka/register_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
def main():
url = sys.argv[1]
subject = sys.argv[2]
with open(sys.argv[3]) as f:
local_path = sys.argv[3]
with open(local_path) as f:
schema_str = f.read()
if 4 < len(sys.argv):
keys = sys.argv[4].split(',')
Expand All @@ -14,11 +15,16 @@ def main():

client = SchemaRegistryClient({"url": url})

if keys:
schema_str = select_keys(schema_str, keys)
if local_path.endswith('.avsc'):
if keys:
schema_str = select_keys(schema_str, keys)
else:
schema_str = remove_unsupported(schema_str)
schema = Schema(schema_str, 'AVRO')
elif local_path.endswith('.proto'):
schema = Schema(schema_str, 'PROTOBUF')
else:
schema_str = remove_unsupported(schema_str)
schema = Schema(schema_str, 'AVRO')
raise ValueError('{} shall end with .avsc or .proto'.format(local_path))
client.register_schema(subject, schema)


Expand Down
39 changes: 39 additions & 0 deletions src/connector/src/test_data/test-index-array.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Example taken from https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// `test.package.MessageH.MessageI` `[1, 0]` `2, 1, 0` 0x040200
// `test.package.MessageA.MessageE.MessageG` `[0, 2, 1]` `3, 0, 2, 1` 0x06000402
// `test.package.MessageA` `[0]` `1, 0`/`0` 0x00

syntax = "proto3";
package test.package;

message MessageA {
string field_a = 1;

message MessageB {
double field_b = 1;

message MessageC {
sint32 field_c = 1;
}
}
message MessageD {
sint32 field_d = 1;
}
message MessageE {
string field_e = 1;

message MessageF {
double field_f = 1;
}
message MessageG {
sint32 field_g = 1;
}
}
}
message MessageH {
double field_h = 1;

message MessageI {
sint32 field_i = 1;
}
}

0 comments on commit b41d537

Please sign in to comment.