-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[YAML] - Kafka Proto String schema #29835
[YAML] - Kafka Proto String schema #29835
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @riteshghorse for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
@brucearctor this PR contains the String Proto schema feature for Kafka. There is a test check that fails because of I've tested this with Dataflow as well using different configurations and it seems to be working fine. |
A bit more info about that failed test:
|
I still want to dig more closely into the code, only have superficially skimmed at this point... Rereading @ffernandez92 comments -- this seems likely to be an issue with EnricoMi/publish-unit-test-result-action@v2 ... and not the test itself. So, that's a positive! |
Reminder, please take a look at this pr: @riteshghorse @damondouglas @damondouglas |
Will be curious anyone else's thoughts. As I understand it, this PR is fine - from a code perspective. BUT, introduces an issue due to some of our testing infrastructure. Not that a test 'fails' but rather a limitation in something we rely on. I'm inclined to merge the PR, and then address the limitations in the testing infra afterwards [ if it were to persist ]. Thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this! I gave a few suggestions, mostly nits on formatting and organization.
...va/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
...va/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
...xtensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
Outdated
Show resolved
Hide resolved
...a/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
Show resolved
Hide resolved
...a/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
...va/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
} else { | ||
throw new IllegalArgumentException( | ||
"Expecting both descriptorPath and messageName to be non-null."); | ||
"At least a descriptorPath or a proto Schema is required."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More for my understanding - why is a schema provided by the Configuration required here? The other data formats use the schema from the incoming PCollectionRowTuple input to create the schema for the outgoing PCollectionRowTuple output. Can the output Proto schema not be constructed from the input Row schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Providing a separate schema in the Configuration offers flexibility and explicit control over the translation process, particularly when addressing variations in field mapping, data types, nested structures, default values, and schema evolution. Other alternatives, such as using the StorageApiProto, were considered, but this approach could potentially prevent the resulting output from matching the expected Proto schema for the subsequent reader. Another option explored was similar to the approach used in Scio (https://spotify.github.io/scio/io/Protobuf.html#write-protobuf-files), where a wrapper is created. However, this method introduces a layer of abstraction, potentially resulting in the output not precisely aligning with the user's desired schema. I remain open to suggestions for alternative approaches in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting -- I didn't realize a translation function existed from beam row to proto, I imagine there are things around: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.html
We'd definitely need to understand the translation much more, to ensure sufficiently deterministic. Passing the information explicitly removes all doubt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, in that case I think providing an explicit schema, at least optionally, makes sense. Perhaps adding support for an implicit schema could be provided in a future FR.
...o/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
Outdated
Show resolved
Hide resolved
I agree with this. @damccorm do you have any reservations on merging? |
Where is the failed check? If the test infra is flaky then I agree we shouldn't block on it. If we are turning a meaningful suite permared then I think we should address that before proceeding. Looking at the current pr, I only see the failing Kafka check which looks like it is running into a timeout (maybe its stuck)? We likely should not ignore that |
I added #29964 to address the timeout issue |
Also see --> #29835 (comment)
Seems to be an issue with a limitation on Since an issue filed, it also seems like we can proceed, and see whether this is a persistent or flaky problem, and then prioritize fixing if warranted -- rather than being a blocker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving, assuming the conversation over the failing test is resolved before merging.
Oh I see - this silently failed and the workflow still succeeded. Yeah, I think this is fine to ignore. Its actually not a new issue (e.g. a scheduled run on master ran into this earlier today - https://github.com/apache/beam/actions/runs/7458106150)
Have we actually filed the issue? I don't see one referenced in the comments and couldn't find one |
Merged ... And filed: #29966 ... |
addresses [Feature Request][YAML]: KafkaIO for YAML #28664
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.