-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka SchemaTransform translation #31362
Kafka SchemaTransform translation #31362
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Assigning reviewers. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
R: @chamikaramj |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
…a_schematransform_translation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM.
return SchemaRegistry.createDefault() | ||
.getToRowFunction(KafkaReadSchemaTransformConfiguration.class) | ||
.apply(configuration) | ||
.sorted() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify why this sorting is needed ? Do we need to do this for every implementation ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just to keep in line with what TypedSchemaTransformProvider does when producing a config schema:
Line 96 in f67f95c
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase(); |
This is due to the SchemaProvider not always producing a consistent schema (#24361). So we sort to keep it consistent
Do we need to do this for every implementation
Right now unfortunately yes. I'm working on adding some things to SchemaTransform (#30943) to avoid having to copy this everywhere. My hope is this change will make SchemaTransformTranslation sufficient for all and help avoid needing a SchemaTransformTranslation for each IO.
...va/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
Show resolved
Hide resolved
* kafka schematransform translation and tests * cleanup * spotless * address failing tests * switch existing schematransform tests to use Managed API * fix nullness * add some more mappings * fix mapping * typo * more accurate test name * cleanup after merging snake_case PR * spotless
Adding SchemaTransform translation and tests for KafkaIO