[YAML] Kafka Read Provider #28865
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master   #28865       +/-   ##
===========================================
- Coverage   72.23%   38.36%    -33.87%
===========================================
  Files         684      686         +2
  Lines      101241   101673       +432
===========================================
- Hits        73129    39010     -34119
- Misses      26535    61088     +34553
+ Partials     1577     1575         -2
Flags with carried forward coverage won't be shown.
... and 307 files with indirect coverage changes.
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @riteshghorse for label python.
Available commands: ...
The PR bot will only process comments in the main thread (not review comments).
R: @brucearctor
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control.
public static final String VALID_FORMATS_STR = "raw,avro,json";
public static final Set<String> VALID_DATA_FORMATS =
    Sets.newHashSet(VALID_FORMATS_STR.split(","));
Wanting to look more at this:
- Is there a reason for the convention of UPPERCASE, vs lower?
- Why use a string that we need to then split on the subsequent line, instead of adding to a new set directly?
unless that is more of a Java convention, to not have to construct the Hash/Set.
Context on the first point --> I'm used to FINALS being UPPERCASE
Agreed. I tried to match:
beam/sdks/python/apache_beam/yaml/yaml_io.py
Line 195 in 7531501
    - raw: Produces records with a single `payload` field whose contents
Regarding:
"unless that is more of a Java convention, to not have to construct the Hash/Set."
I'll do some research, but I don't think there is one. I imagine it was written this way to prevent mismatches between the printed list of valid data formats and the Set of formats.
+1 to ensuring the two remain consistent (though one could just as easily join the set for the error message).
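A minimal sketch of that alternative, assuming Guava is available (the snippet above already uses Sets; the class name here is illustrative, not the PR's code). The set is defined once and the display string is derived from it, so the two can't drift apart:

```java
import java.util.Set;
import com.google.common.collect.ImmutableSet;

public class Formats {
  // Single source of truth: the set itself.
  public static final Set<String> VALID_DATA_FORMATS =
      ImmutableSet.of("raw", "avro", "json");

  // Derive the human-readable list from the set, so error messages
  // can never fall out of sync with the actual membership check.
  public static final String VALID_FORMATS_STR =
      String.join(",", VALID_DATA_FORMATS);
}
```

Either direction works; the point is that one of the two should be derived from the other.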
assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
    : "Valid data formats are " + VALID_DATA_FORMATS;
assert dataFormat == null || isValidDataFormat(dataFormat)
    : "Valid data formats are " + VALID_FORMATS_STR;
TBD on returning the STR vs the SET. I see pros/cons, so this is mostly to call out the thought process and the choice/difference rather than a problem.
On rereading, I guess I don't see the value of the changes in this file other than adding RAW on line 42? Unless directly creating a set.
private boolean isValidDataFormat(String dataFormat) {
  // Convert the input dataFormat to lowercase for case-insensitive comparison
  String lowercaseDataFormat = dataFormat.toLowerCase();
  return VALID_DATA_FORMATS.contains(lowercaseDataFormat);
}
Why allow case insensitivity? I'd err towards being explicit [ and sensitive on case ]. But, curious thoughts.
Well, I actually considered that too. But I'm not entirely sure whether someone else might be using it through the expansion service. So my thought was to keep it compatible with older versions and not cause any disruptions. That's why I went for the case-insensitive approach.
I'd actually aim to err toward one or the other [ either upper or lower ]. Otherwise, we need to be really careful and dig in here to ensure there isn't a case where a value passes the check here but then fails later. I'm not looking at the code this second, but would there be problems with 'RaW' or 'jSOn' as the data format? It would pass that check, but what if that value is piped through elsewhere?
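A self-contained sketch of the hazard described above (the set and helper names are stand-ins, not the PR's code):

```java
import java.util.Set;

public class CaseSensitivityHazard {
  static final Set<String> VALID_DATA_FORMATS = Set.of("raw", "avro", "json");

  // Case-insensitive membership check, like the PR's isValidDataFormat.
  static boolean passesCheck(String format) {
    return VALID_DATA_FORMATS.contains(format.toLowerCase());
  }

  // A case-sensitive consumer further down the pipeline.
  static void downstream(String format) {
    switch (format) {
      case "raw":
      case "avro":
      case "json":
        break; // handled
      default:
        throw new IllegalArgumentException("Unknown format: " + format);
    }
  }

  public static void main(String[] args) {
    String format = "RaW";
    System.out.println(passesCheck(format)); // true: validation accepts it
    downstream(format); // throws: the original string was piped through unchanged
  }
}
```

Normalizing once at the boundary (e.g. `format = format.toLowerCase()`) rather than only comparing case-insensitively would close this gap.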
String format = configuration.getFormat();

if (format == null || format.isEmpty()) {
  format = "raw";
}
Do we want to add some sort of log for the user, so that they know that RAW wound up auto-set?
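A hedged sketch of that suggestion (the logger setup and class name are assumptions; Beam's Java SDK generally uses SLF4J):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FormatDefaulting {
  private static final Logger LOG = LoggerFactory.getLogger(FormatDefaulting.class);

  // Resolve the configured format, warning the user when the default kicks in.
  static String resolveFormat(String configuredFormat) {
    if (configuredFormat == null || configuredFormat.isEmpty()) {
      LOG.warn("No 'format' specified for the Kafka read; defaulting to 'raw'.");
      return "raw";
    }
    return configuredFormat;
  }
}
```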
Do we think that RAW is the ideal default?
I'm thinking ahead to a future where we might be able to pick up information from the shared schema and use it to choose the format. That's probably outside the scope of this PR, but it's worth thinking of a future where that's possible.
I think we should require the user to set this. In the future we may want to pull the appropriate schema from metadata (in that case we could add an explicit "infer" value if we wanted, but if there is a default that should be it). We should leave ourselves options--we can always set a default later if we really want but we can't retract one.
"To read from Kafka in raw format, you can't provide a schema."); | ||
} | ||
Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); | ||
SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema); |
Wanting to wrap my head around why we'd have bytes with Row. Row tends to imply schemas.
Same as before:
beam/sdks/python/apache_beam/yaml/yaml_io.py
Line 195 in 7531501
    - raw: Produces records with a single `payload` field whose contents
It does have a schema; it's just that this schema has only one attribute, `payload`, and the values are raw bytes.
I do agree that this feature might not see widespread use. Ideally, users should use schemas for data processing. However, there could be scenarios where this feature comes in handy. For instance, imagine you have events in a format like a,b,c in Kafka, resembling CSV data. In such cases, you might receive raw bytes in the payload, and it becomes your responsibility to parse them downstream in the correct way.
Yeah. Ideally users should be publishing schema'd data, but technically Kafka is just a plumber of bytes and doesn't otherwise impose restrictions. This allows users to at least get at the data and do what they want with it. (Also, perhaps they're using a schema we don't support yet, like protobuf or Cap'n Proto or something.)
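A minimal sketch of what that bytes-to-Row mapping can look like (the shape of it, not the PR's exact getRawBytesToRowFunction):

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

public class RawMapping {
  // The single-field schema discussed above: one BYTES field named "payload".
  static final Schema RAW_SCHEMA =
      Schema.builder().addField("payload", Schema.FieldType.BYTES).build();

  // Wrap each raw Kafka payload in a Row; downstream transforms decide
  // how to parse the bytes (CSV, protobuf, Cap'n Proto, ...).
  static SerializableFunction<byte[], Row> rawBytesToRow() {
    return bytes -> Row.withSchema(RAW_SCHEMA).withFieldValue("payload", bytes).build();
  }
}
```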
'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
Do we need the naming confluent_schema_registry_...? What if someone is using a different schema registry, like Redpanda's?
assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
    : "To read from Kafka, a schema must be provided directly or through Confluent "
        + "Schema Registry, but not both.";
Ah, getConfluentSchemaRegistry ... So we might just use that more generically, OR we might look [ not now ] at refactoring that.
Exactly, that was my idea here. Again, it's a bit unclear to me whether this is used by someone somewhere, so I think we still want to support that functionality. That said, it makes total sense to include other schema registry technologies as we keep refactoring this class.
My rule of thumb: if there isn't a test, then either it isn't important or nobody is using it. So the lack of a test effectively tells me that nobody is using it [ otherwise, if they found it important, they would have written a test ].
Please keep this in mind, as it also helps reinforce the importance of writing tests. We need tests in place to ensure others don't break things we intend to use.
Naturally, we need to be careful, but tests help us with that.
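In that spirit, a hypothetical JUnit sketch of the kind of test that would pin this behavior down (validateFormat here is a stand-in for the provider's validation, not the PR's actual API):

```java
import static org.junit.Assert.assertThrows;

import java.util.Set;
import org.junit.Test;

public class FormatValidationTest {
  // Stand-in for the provider's format validation.
  static void validateFormat(String format) {
    if (!Set.of("raw", "avro", "json").contains(format)) {
      throw new IllegalArgumentException("Valid data formats are raw,avro,json");
    }
  }

  @Test
  public void rejectsUnknownFormat() {
    assertThrows(IllegalArgumentException.class, () -> validateFormat("protobuf"));
  }

  @Test
  public void rejectsMixedCase() {
    // Pins down the case-sensitivity decision debated earlier in the thread.
    assertThrows(IllegalArgumentException.class, () -> validateFormat("RaW"));
  }
}
```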
Run Python_Coverage PreCommit
@robertwb -- I believe case sensitivity is important [ I've seen insensitivity cause more problems, and make things harder to debug, than the user benefits of being insensitive, but ... ]. Is it time to choose whether we want YAML to generally use upper or lower case? We probably want go-to conventions.

beam/sdks/python/apache_beam/yaml/yaml_io.py
Line 140 in 7531501
^ currently uppercase

^ wants to use lowercase

I'll look into these a little deeper, in case we want to map upper to lower, etc. ... but thought this might be worth calling out [ unless I'm just missing a connection in my mental model here ]. Related: I don't recall seeing this in the syntax doc: https://docs.google.com/document/d/10tzBd6yeElucqLN07OI8MguSQYqvRCYhqgVIChjn67w/edit

Thoughts?
Yeah, case sensitivity is a messy question. Here we're trying to represent an enum value (so there's no risk that ...). Generally the conventions we have so far are that yaml keys (type, name, input, config, transforms, etc.) are (case-sensitive) snake_case (though preferring to go with one word), transform types are (again case-sensitive) CamelCase, and config parameters (like other keys) are also snake_case. For enum-style values, the one precedent we have is the "language" parameter for MapToFields (Filter, etc.), which is lower case (java, python, javascript, etc.). My preference would be to be consistent between that and this format parameter if we require specific casing there. I could see this boiling down to a matter of preference though. Perhaps something to ask on the list?
Run Python_Runners PreCommit
Run Python_Coverage PreCommit
Run Python PreCommit
if (format != null && format.equals("RAW")) {
  if (inputSchema != null) {
    throw new IllegalArgumentException(
-       "To read from Kafka in raw format, you can't provide a schema.");
+       "To read from Kafka in RAW format, you can't provide a schema.");
Are there other error scenarios that we should include? I'm trying to think about this one.
I couldn't think of any others, but I'm open to suggestions.
@@ -120,7 +120,6 @@ public void testBuildTransformWithAvroSchema() {
     KafkaReadSchemaTransformConfiguration.builder()
         .setTopic("anytopic")
         .setBootstrapServers("anybootstrap")
-        .setFormat("avro")
         .setSchema(AVRO_SCHEMA)
Is .setSchema sufficient [ if AVRO_SCHEMA ], such that we don't need AVRO in setFormat? [ I probably need to read this class to see if this gets auto-set. ]
Or, is the expectation that this test will now fail without setFormat?
The original test didn't have setFormat, so I left the test as it was before to ensure compatibility.
I don't fully understand this ... since the diff of the PR shows removing the line. BUT, since tests aren't failing, I guess this isn't causing a problem. I would like to add even more test cases, BUT I don't think that's required or needed to hold things up.
I would like to understand the value or importance of the codecov workflows here. I think it's comparing things that are not even affected by this PR, so it's confusing to me why those fail. I wonder if it has to do with the fact that my "other" files (the ones I haven't changed) have x% diff from master, but it's hard to understand how that can be used or why it's preventing the PR from merging.
CodeCov seems broken -- it is failing, but this did not have that much change in test coverage, so I'm going to ignore those for now.
I think it's fine for now.
Let's continue to think about [ and not hesitate to add ] tests.
Next up: proto, other ...
@ffernandez92 Congrats on your first PR into Beam!
It appears that CodeCov is comparing against an old commit [ from when the branch was originally forked ] ... it didn't seem worth digging in to understand that further, for now. If it recurs [ due to a long-open PR ], I suggest we figure out how to get CodeCov to compare the PR commit against current main.
addresses #28664
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.