diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 1a319975f32ca..b1d1f19c8f54d 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -143,14 +143,22 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt' ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1 # test different encoding +echo "preparing confluent schema registry" +python3 -m pip install requests confluent-kafka + echo "testing protobuf" cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1 +python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto +python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt' +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1 +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1 echo "testing avro" -python3 -m pip install requests confluent-kafka python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field' ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1 diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 15e24bde59e94..4aa14e0840087 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -7,6 +7,24 @@ format plain encode protobuf ( schema.location = 'file:///risingwave/proto-recursive', message = 'recursive.AllTypes'); +statement ok +create table from_kafka_csr_trivial with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf-csr-a', + properties.bootstrap.server = 'message_queue:29092') +format plain encode protobuf ( + schema.registry = 'http://message_queue:8081', + message = 'test.package.MessageA'); + +statement ok +create table from_kafka_csr_nested with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf-csr-hi', + properties.bootstrap.server = 'message_queue:29092') +format plain encode protobuf ( + schema.registry = 'http://message_queue:8081', + message = 'test.package.MessageH.MessageI'); + statement ok create table into_kafka ( bool_field bool, @@ -43,6 +61,26 @@ format plain encode protobuf ( schema.location = 'file:///risingwave/proto-recursive', message = 'recursive.AllTypes'); +statement ok +create sink sink_csr_trivial as select string_field as field_a from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf-csr-a', + properties.bootstrap.server = 'message_queue:29092') +format plain encode protobuf ( + force_append_only = true, + schema.registry = 'http://message_queue:8081', + message = 'test.package.MessageA'); + +statement ok +create sink sink_csr_nested as select sint32_field as field_i from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf-csr-hi', + properties.bootstrap.server = 'message_queue:29092') +format plain encode protobuf ( + force_append_only = true, + schema.registry = 'http://message_queue:8081', + message = 'test.package.MessageH.MessageI'); + sleep 2s query TTTRRIIIIIITTTI @@ -66,6 +104,18 @@ select t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42 f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0 +query T +select field_a from from_kafka_csr_trivial order by 1; +---- +Rising +Wave + +query I +select field_i from from_kafka_csr_nested order by 1; +---- +24 +13 + statement error No such file create sink sink_err from into_kafka with ( connector = 'kafka', diff --git a/e2e_test/sink/kafka/register_schema.py b/e2e_test/sink/kafka/register_schema.py index 2606e07bcb89b..7385725ca8d34 100644 --- a/e2e_test/sink/kafka/register_schema.py +++ b/e2e_test/sink/kafka/register_schema.py @@ -5,7 +5,8 @@ def main(): url = sys.argv[1] subject = sys.argv[2] - with open(sys.argv[3]) as f: + local_path = sys.argv[3] + with open(local_path) as f: schema_str = f.read() if 4 < len(sys.argv): keys = sys.argv[4].split(',') @@ -14,11 +15,16 @@ def main(): client = SchemaRegistryClient({"url": url}) - if keys: - schema_str = select_keys(schema_str, keys) + if local_path.endswith('.avsc'): + if keys: + schema_str = select_keys(schema_str, keys) + else: + schema_str = remove_unsupported(schema_str) + schema = Schema(schema_str, 'AVRO') + elif local_path.endswith('.proto'): + schema = Schema(schema_str, 'PROTOBUF') else: - schema_str = remove_unsupported(schema_str) - schema = Schema(schema_str, 'AVRO') + raise ValueError('{} shall end with .avsc or .proto'.format(local_path)) client.register_schema(subject, schema) diff --git a/src/connector/src/test_data/test-index-array.proto b/src/connector/src/test_data/test-index-array.proto new file mode 100644 index 0000000000000..f5119819ce3cb --- /dev/null +++ b/src/connector/src/test_data/test-index-array.proto @@ -0,0 +1,39 @@ +// Example taken from https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format +// `test.package.MessageH.MessageI` `[1, 0]` `2, 1, 0` 0x040200 +// `test.package.MessageA.MessageE.MessageG` `[0, 2, 1]` `3, 0, 2, 1` 0x06000402 +// `test.package.MessageA` `[0]` `1, 0`/`0` 0x00 + +syntax = "proto3"; +package test.package; + +message MessageA { + string field_a = 1; + + message MessageB { + double field_b = 1; + + message MessageC { + sint32 field_c = 1; + } + } + message MessageD { + sint32 field_d = 1; + } + message MessageE { + string field_e = 1; + + message MessageF { + double field_f = 1; + } + message MessageG { + sint32 field_g = 1; + } + } +} +message MessageH { + double field_h = 1; + + message MessageI { + sint32 field_i = 1; + } +}