feat(sink): support encode protobuf with confluent schema registry (#…
xiangjinwu authored and jetjinser committed Mar 14, 2024
1 parent a61404a commit 4b03d00
Showing 9 changed files with 251 additions and 26 deletions.
10 changes: 9 additions & 1 deletion ci/scripts/e2e-kafka-sink-test.sh
@@ -143,14 +143,22 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
 
 # test different encoding
+echo "preparing confluent schema registry"
+python3 -m pip install requests confluent-kafka
+
 echo "testing protobuf"
 cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
+python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
+python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
 
 echo "testing avro"
-python3 -m pip install requests confluent-kafka
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
56 changes: 56 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
@@ -7,6 +7,24 @@ format plain encode protobuf (
   schema.location = 'file:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
 
+statement ok
+create table from_kafka_csr_trivial with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-a',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageA');
+
+statement ok
+create table from_kafka_csr_nested with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-hi',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageH.MessageI');
+
 statement ok
 create table into_kafka (
   bool_field bool,
@@ -43,6 +61,26 @@ format plain encode protobuf (
   schema.location = 'file:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
 
+statement ok
+create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-a',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  force_append_only = true,
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageA');
+
+statement ok
+create sink sink_csr_nested as select sint32_field as field_i from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-hi',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  force_append_only = true,
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageH.MessageI');
+
 sleep 2s
 
 query TTTRRIIIIIITTTI
@@ -66,6 +104,18 @@ select
 t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42
 f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0
 
+query T
+select field_a from from_kafka_csr_trivial order by 1;
+----
+Rising
+Wave
+
+query I
+select field_i from from_kafka_csr_nested order by 1;
+----
+13
+24
+
 statement error No such file
 create sink sink_err from into_kafka with (
   connector = 'kafka',
@@ -96,6 +146,12 @@ format plain encode protobuf (
   schema.location = 's3:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
 
+statement ok
+drop sink sink_csr_nested;
+
+statement ok
+drop sink sink_csr_trivial;
+
 statement ok
 drop sink sink0;
 
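The `sink_csr_*` sinks above publish Confluent-framed messages, which is what lets the `from_kafka_csr_*` tables (and any registry-aware consumer) resolve the schema by ID before decoding. A minimal sketch of splitting that frame on the consumer side, assuming only the wire format implemented in `encoder/proto.rs` below; the function is illustrative, not part of this commit:

```rust
/// Splits a Confluent-framed Kafka payload into (schema ID, remaining bytes).
/// The remainder still carries the zigzag-varint message indexes, followed by
/// the protobuf-encoded message itself.
fn parse_confluent_header(buf: &[u8]) -> Option<(i32, &[u8])> {
    if buf.len() < 5 || buf[0] != 0 {
        return None; // too short, or wrong magic byte
    }
    let schema_id = i32::from_be_bytes(buf[1..5].try_into().ok()?);
    Some((schema_id, &buf[5..]))
}
```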
16 changes: 11 additions & 5 deletions e2e_test/sink/kafka/register_schema.py
@@ -5,7 +5,8 @@
 def main():
     url = sys.argv[1]
     subject = sys.argv[2]
-    with open(sys.argv[3]) as f:
+    local_path = sys.argv[3]
+    with open(local_path) as f:
         schema_str = f.read()
     if 4 < len(sys.argv):
         keys = sys.argv[4].split(',')
@@ -14,11 +15,16 @@ def main():
 
     client = SchemaRegistryClient({"url": url})
 
-    if keys:
-        schema_str = select_keys(schema_str, keys)
+    if local_path.endswith('.avsc'):
+        if keys:
+            schema_str = select_keys(schema_str, keys)
+        else:
+            schema_str = remove_unsupported(schema_str)
+        schema = Schema(schema_str, 'AVRO')
+    elif local_path.endswith('.proto'):
+        schema = Schema(schema_str, 'PROTOBUF')
     else:
-        schema_str = remove_unsupported(schema_str)
-    schema = Schema(schema_str, 'AVRO')
+        raise ValueError('{} shall end with .avsc or .proto'.format(local_path))
     client.register_schema(subject, schema)


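The script boils down to one REST call per subject. A hedged Rust equivalent of the `.proto` branch, assuming the standard Schema Registry endpoint `POST /subjects/{subject}/versions`; `reqwest` and `serde_json` here are illustrative choices, not dependencies of this change:

```rust
/// Registers protobuf source text under `subject` and returns the schema ID,
/// mirroring what register_schema.py does via confluent-kafka.
async fn register_proto_schema(
    base_url: &str,
    subject: &str,
    proto_src: &str,
) -> reqwest::Result<i32> {
    let body = serde_json::json!({ "schema": proto_src, "schemaType": "PROTOBUF" });
    let resp: serde_json::Value = reqwest::Client::new()
        .post(format!("{base_url}/subjects/{subject}/versions"))
        .json(&body)
        .send()
        .await?
        .json()
        .await?;
    Ok(resp["id"].as_i64().unwrap_or_default() as i32) // registry replies {"id": ...}
}
```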
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![expect(dead_code)]
 #![allow(clippy::derive_partial_eq_without_eq)]
+#![feature(array_chunks)]
 #![feature(coroutines)]
 #![feature(proc_macro_hygiene)]
 #![feature(stmt_expr_attributes)]
49 changes: 41 additions & 8 deletions src/connector/src/schema/protobuf.rs
@@ -17,28 +17,46 @@ use std::collections::BTreeMap;
 use itertools::Itertools as _;
 use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};
 
-use super::loader::LoadedSchema;
+use super::loader::{LoadedSchema, SchemaLoader};
 use super::schema_registry::Subject;
 use super::{
     invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
-    SCHEMA_LOCATION_KEY,
+    SCHEMA_LOCATION_KEY, SCHEMA_REGISTRY_KEY,
 };
 use crate::common::AwsAuthProps;
 use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};
 
 /// `aws_auth_props` is only required when reading `s3://` URL.
 pub async fn fetch_descriptor(
     format_options: &BTreeMap<String, String>,
+    topic: &str,
     aws_auth_props: Option<&AwsAuthProps>,
-) -> Result<MessageDescriptor, SchemaFetchError> {
-    let row_schema_location = format_options
-        .get(SCHEMA_LOCATION_KEY)
-        .ok_or_else(|| invalid_option_error!("{SCHEMA_LOCATION_KEY} required"))?
-        .clone();
+) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
     let message_name = format_options
         .get(MESSAGE_NAME_KEY)
         .ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
         .clone();
+    let schema_location = format_options.get(SCHEMA_LOCATION_KEY);
+    let schema_registry = format_options.get(SCHEMA_REGISTRY_KEY);
+    let row_schema_location = match (schema_location, schema_registry) {
+        (Some(_), Some(_)) => {
+            return Err(invalid_option_error!(
+                "cannot use {SCHEMA_LOCATION_KEY} and {SCHEMA_REGISTRY_KEY} together"
+            )
+            .into())
+        }
+        (None, None) => {
+            return Err(invalid_option_error!(
+                "requires one of {SCHEMA_LOCATION_KEY} or {SCHEMA_REGISTRY_KEY}"
+            )
+            .into())
+        }
+        (None, Some(_)) => {
+            let (md, sid) = fetch_from_registry(&message_name, format_options, topic).await?;
+            return Ok((md, Some(sid)));
+        }
+        (Some(url), None) => url.clone(),
+    };
 
     if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
         return Err(invalid_option_error!("s3 URL not supported yet").into());
@@ -59,7 +77,22 @@ pub async fn fetch_descriptor(
     let conf = ProtobufParserConfig::new(enc)
         .await
         .map_err(SchemaFetchError::YetToMigrate)?;
-    Ok(conf.message_descriptor)
+    Ok((conf.message_descriptor, None))
 }
+
+pub async fn fetch_from_registry(
+    message_name: &str,
+    format_options: &BTreeMap<String, String>,
+    topic: &str,
+) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
+    let loader = SchemaLoader::from_format_options(topic, format_options)?;
+
+    let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;
+
+    Ok((
+        vpb.parent_pool().get_message_by_name(message_name).unwrap(),
+        vid,
+    ))
+}
 
 impl LoadedSchema for FileDescriptor {
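With both paths in place, `fetch_descriptor` has three outcomes: an error when `schema.location` and `schema.registry` are both set or both missing, `(descriptor, None)` for a file-based schema, and `(descriptor, Some(schema_id))` via `fetch_from_registry`. A sketch of the registry path with option values borrowed from the e2e test above; the calling code is hypothetical, not from this commit:

```rust
use std::collections::BTreeMap;

// Inside some async context where `?` works:
let mut format_options = BTreeMap::new();
format_options.insert("schema.registry".to_owned(), "http://message_queue:8081".to_owned());
format_options.insert("message".to_owned(), "test.package.MessageA".to_owned());

// `aws_auth_props` stays `None`: it only matters for `schema.location = 's3://...'`.
let (descriptor, schema_id) =
    fetch_descriptor(&format_options, "test-rw-sink-append-only-protobuf-csr-a", None).await?;
assert_eq!(descriptor.full_name(), "test.package.MessageA");
assert!(schema_id.is_some()); // later embedded in each message by ProtoEncoder
```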
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/mod.rs
@@ -27,7 +27,7 @@ pub mod template;
 
 pub use avro::{AvroEncoder, AvroHeader};
 pub use json::JsonEncoder;
-pub use proto::ProtoEncoder;
+pub use proto::{ProtoEncoder, ProtoHeader};
 
 /// Encode a row of a relation into
 /// * an object in json
85 changes: 80 additions & 5 deletions src/connector/src/sink/encoder/proto.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
 use prost::Message;
 use prost_reflect::{
     DynamicMessage, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value,
@@ -30,13 +30,25 @@ pub struct ProtoEncoder {
     schema: Schema,
     col_indices: Option<Vec<usize>>,
     descriptor: MessageDescriptor,
+    header: ProtoHeader,
 }
 
+#[derive(Debug, Clone, Copy)]
+pub enum ProtoHeader {
+    None,
+    /// <https://docs.confluent.io/platform/7.5/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format>
+    ///
+    /// * magic byte `0x00`
+    /// * 4-byte big-endian schema ID
+    ConfluentSchemaRegistry(i32),
+}
+
 impl ProtoEncoder {
     pub fn new(
         schema: Schema,
         col_indices: Option<Vec<usize>>,
         descriptor: MessageDescriptor,
+        header: ProtoHeader,
     ) -> SinkResult<Self> {
         match &col_indices {
             Some(col_indices) => validate_fields(
@@ -59,12 +71,18 @@ impl ProtoEncoder {
             schema,
             col_indices,
             descriptor,
+            header,
         })
     }
 }
 
+pub struct ProtoEncoded {
+    message: DynamicMessage,
+    header: ProtoHeader,
+}
+
 impl RowEncoder for ProtoEncoder {
-    type Output = DynamicMessage;
+    type Output = ProtoEncoded;
 
     fn schema(&self) -> &Schema {
         &self.schema
@@ -87,12 +105,68 @@ impl RowEncoder for ProtoEncoder {
             &self.descriptor,
         )
         .map_err(Into::into)
+        .map(|m| ProtoEncoded {
+            message: m,
+            header: self.header,
+        })
     }
 }
 
-impl SerTo<Vec<u8>> for DynamicMessage {
+impl SerTo<Vec<u8>> for ProtoEncoded {
     fn ser_to(self) -> SinkResult<Vec<u8>> {
-        Ok(self.encode_to_vec())
+        let mut buf = Vec::new();
+        match self.header {
+            ProtoHeader::None => { /* noop */ }
+            ProtoHeader::ConfluentSchemaRegistry(schema_id) => {
+                buf.reserve(1 + 4);
+                buf.put_u8(0);
+                buf.put_i32(schema_id);
+                MessageIndexes::from(self.message.descriptor()).encode(&mut buf);
+            }
+        }
+        self.message.encode(&mut buf).unwrap();
+        Ok(buf)
     }
 }
+
+struct MessageIndexes(Vec<i32>);
+
+impl MessageIndexes {
+    fn from(desc: MessageDescriptor) -> Self {
+        // https://github.com/protocolbuffers/protobuf/blob/v25.1/src/google/protobuf/descriptor.proto
+        // https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/tag.rs.html
+        // https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/build/visit.rs.html#125
+        // `FileDescriptorProto` field #4 is `repeated DescriptorProto message_type`
+        const TAG_FILE_MESSAGE: i32 = 4;
+        // `DescriptorProto` field #3 is `repeated DescriptorProto nested_type`
+        const TAG_MESSAGE_NESTED: i32 = 3;
+
+        let mut indexes = vec![];
+        let mut path = desc.path().array_chunks();
+        let &[tag, idx] = path.next().unwrap();
+        assert_eq!(tag, TAG_FILE_MESSAGE);
+        indexes.push(idx);
+        for &[tag, idx] in path {
+            assert_eq!(tag, TAG_MESSAGE_NESTED);
+            indexes.push(idx);
+        }
+        Self(indexes)
+    }
+
+    fn zig_i32(value: i32, buf: &mut impl BufMut) {
+        let unsigned = ((value << 1) ^ (value >> 31)) as u32 as u64;
+        prost::encoding::encode_varint(unsigned, buf);
+    }
+
+    fn encode(&self, buf: &mut impl BufMut) {
+        if self.0 == [0] {
+            buf.put_u8(0);
+            return;
+        }
+        Self::zig_i32(self.0.len().try_into().unwrap(), buf);
+        for &idx in &self.0 {
+            Self::zig_i32(idx, buf);
+        }
+    }
+}
+
@@ -367,7 +441,8 @@ mod tests {
             Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))),
         ]);
 
-        let encoder = ProtoEncoder::new(schema, None, descriptor.clone()).unwrap();
+        let encoder =
+            ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap();
         let m = encoder.encode(row).unwrap();
         let encoded: Vec<u8> = m.ser_to().unwrap();
         assert_eq!(
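End to end, `ser_to` emits: magic byte `0x00`, a 4-byte big-endian schema ID, zigzag-varint message indexes, then the protobuf payload. A standalone sketch of that framing under the same Confluent wire-format assumption (`zigzag32` and `frame_confluent` are illustrative, not part of this commit):

```rust
/// ZigZag keeps small magnitudes short: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4.
/// This is the same mapping `MessageIndexes::zig_i32` feeds into the varint encoder.
fn zigzag32(value: i32) -> u32 {
    ((value << 1) ^ (value >> 31)) as u32
}

/// Frames an already-encoded protobuf `payload` for Confluent consumers, assuming
/// the message is the first top-level message of its .proto file, so its index
/// list [0] collapses to the single-byte shorthand 0x00.
fn frame_confluent(schema_id: i32, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(1 + 4 + 1 + payload.len());
    buf.push(0x00); // magic byte
    buf.extend_from_slice(&schema_id.to_be_bytes()); // big-endian schema ID
    buf.push(0x00); // message indexes: shorthand for [0]
    buf.extend_from_slice(payload); // the protobuf-encoded message
    buf
}
```

For a nested message such as `test.package.MessageH.MessageI` the shorthand does not apply: the encoder writes the zigzag-encoded list length followed by each index, exactly as `MessageIndexes::encode` does above.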