-
Notifications
You must be signed in to change notification settings - Fork 589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support encode protobuf
with confluent schema registry
#15546
Conversation
buf.reserve(1 + 4); | ||
buf.put_u8(0); | ||
buf.put_i32(schema_id); | ||
MessageIndexes::from(self.message.descriptor()).encode(&mut buf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be pre-computed and is same for all messages.
56acf85
to
63bddef
Compare
9c94e58
to
12acefe
Compare
7b2c44a
to
b41d537
Compare
b41d537
to
30b9477
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally LGTM, thanks
let schema_location = format_options.get(SCHEMA_LOCATION_KEY); | ||
let schema_registry = format_options.get(SCHEMA_REGISTRY_KEY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we also need to handle the auth for schema registry here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to be an involved process. Leaving it as a separate issue.
https://docs.redpanda.com/23.2/manage/security/authentication/#configure-basic-authentication
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
When creating kafka sink with
format plain encode protobuf
, the schema definition can now come from confluent schema registry, in addition to previously supportedhttp[s]://
andfile://
.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Support confluent schema registry for kafka sink using
format plain encode protobuf
.The following options are same as in protobuf/avro source and avro sink.
schema.registry
(instead ofschema.location
): requiredschema.registry.username
schema.registry.password
schema.registry.name.strategy
: optional and defaults totopic_name_strategy
The following options are same as protobuf source, but different from avro source/sink:
message
record_name_strategy
ortopic_record_name_strategy
)key.message
format plain
; there's noformat upsert
support)record_name_strategy
ortopic_record_name_strategy
)Example: