Remove Avro-related code from Java SDK "core" module #27851
Conversation
@@ -485,8 +486,8 @@ public long getSplitBacklogBytes() {
   * The checkpoint for an unbounded {@link CountingSource} is simply the last value produced. The
   * associated source object encapsulates the information needed to produce the next value.
   */
- @DefaultCoder(AvroCoder.class)
+ @DefaultCoder(SerializableCoder.class)
  public static class CounterMark implements UnboundedSource.CheckpointMark {
I don't think this should be a breaking change, should it?
This is certainly a breaking change. Any checkpoint that was serialized and persisted with a previous version of Beam will fail on deserialization once upgrading Beam :/
Right, but I don't think Beam supports a hot upgrade. I don't see a way to avoid such a change.
We should make a note about this (as well as the larger change) in CHANGES.md.
Serialization isn't guaranteed to be stable either; if we're switching coders, should we switch to a custom (or Beam-schema-based) coder here?
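For example, a schema-based checkpoint could look roughly like this (a sketch only, not the actual change in this PR; the field names, layout, and use of JavaFieldSchema are illustrative assumptions):

import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// Sketch: lets Beam infer a SchemaCoder from the public fields, which is
// stable across Java versions, unlike Java serialization.
@DefaultSchema(JavaFieldSchema.class)
public static class CounterMark implements UnboundedSource.CheckpointMark {
  public final long value;           // last value produced
  public final long startTimeMillis; // source start time, as epoch millis

  @SchemaCreate
  public CounterMark(long value, long startTimeMillis) {
    this.value = value;
    this.startTimeMillis = startTimeMillis;
  }

  @Override
  public void finalizeCheckpoint() {} // nothing to finalize for a counter
}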
It worries me a bit that there is no workaround. I suppose we could advise copying the old CountingSource into your own project and using that directly.
The serialization might not be stable when upgrading a pipeline between Java versions (or, technically, if this class ever gets changed or moved).
Ahh, right. I guess the use case for that would be a streaming pipeline restarted with a different (upgraded) version of Beam. Do we (as Beam) commit to supporting this?
I don't know if it's a solid commitment (e.g. this is itself a breaking change), but we really try hard where feasible.
I would recommend creating a custom coder that is binary-compatible with the Avro-based one. This is actually rather trivial: Avro does zigzag varint encoding, and the bytes of both longs in CounterMark are just written one after another without anything added.
Zigzag encoding is also trivial to implement, or you can use org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.CodedOutputStream.encodeZigZag64 (and Protobuf will always remain a core dependency):
// Maps signed values to unsigned so small magnitudes stay small on the wire.
public static long encodeZigZag64(final long n) {
  return (n << 1) ^ (n >> 63);
}

// Inverse mapping: recovers the signed value from its zigzag form.
public static long decodeZigZag64(final long n) {
  return (n >>> 1) ^ -(n & 1);
}
And varint encoding/decoding is already implemented in org.apache.beam.sdk.util.VarInt.
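Putting those pieces together, a binary-compatible replacement could look roughly like this (a sketch under the assumptions above; the CounterMark accessor names and constructor are hypothetical):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.VarInt;

// Sketch: reproduces Avro's wire format for two longs, i.e. each long is
// zigzag-encoded and then written as an unsigned varint, back to back.
class CounterMarkCoder extends CustomCoder<CounterMark> {

  @Override
  public void encode(CounterMark mark, OutputStream out) throws IOException {
    VarInt.encode(encodeZigZag64(mark.getValue()), out);           // assumed accessor
    VarInt.encode(encodeZigZag64(mark.getStartTimeMillis()), out); // assumed accessor
  }

  @Override
  public CounterMark decode(InputStream in) throws IOException {
    long value = decodeZigZag64(VarInt.decodeLong(in));
    long startTimeMillis = decodeZigZag64(VarInt.decodeLong(in));
    return new CounterMark(value, startTimeMillis); // assumed constructor
  }

  private static long encodeZigZag64(final long n) {
    return (n << 1) ^ (n >> 63);
  }

  private static long decodeZigZag64(final long n) {
    return (n >>> 1) ^ -(n & 1);
  }
}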
@mosche I think you have a point, but it's overkill in this case. I'd stick with SchemaCoder as Robert suggested.
Force-pushed from 49832e7 to 0b63672
Force-pushed from 0b63672 to a2e6321
Force-pushed from 4a194de to 57f7e9e
Force-pushed from 57f7e9e to d981472
Force-pushed from d981472 to f5ee525
Test Results

1 257 files  +1 111   1 257 suites  +1 111   2h 52m 43s ⏱️ +2h 13m 31s

For more details on these failures, see this check.
Results for commit f5ee525a. ± Comparison against base commit cf0cf3b.
This pull request removes 9 and adds 9105 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
retest this please
@mosche Could you take a look, please? I think it's ready for review.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
Force-pushed from f5ee525 to 07ce6ce
Run Java_Pulsar_IO_Direct PreCommit
Force-pushed from 07ce6ce to 46e75c7
A couple of comments, @aromanenko-dev.
Are you sure enough time has passed to remove this from core? It hasn't been deprecated for very long, has it?
@@ -472,7 +472,7 @@ public void testCoderPrecedence() throws Exception {
     assertEquals(SerializableCoder.of(MyValueC.class), registry.getCoder(MyValueC.class));
   }

- @DefaultCoder(AvroCoder.class)
+ @DefaultCoder(MockAvroCoder.class)
Just use SerializableCoder here if the intention is only to test the precedence of coders; any mention of Avro will just confuse people.
We need this class for tests where SerializableCoder is used implicitly and we need another default coder to test correct coder precedence. So I use MockDefaultCoder for that.
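Roughly, the shape of such a stand-in coder (a sketch, not the actual MockDefaultCoder from this PR; it assumes @DefaultCoder resolves the named coder class through a static of(Class) factory, as with AvroCoder.of and SerializableCoder.of):

import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;

// Sketch: a do-nothing coder that exists only so @DefaultCoder(MockDefaultCoder.class)
// can compete against other coders in registry-precedence tests.
public class MockDefaultCoder<T> extends CustomCoder<T> {
  private static final MockDefaultCoder<?> INSTANCE = new MockDefaultCoder<>();

  @SuppressWarnings("unchecked")
  public static <T> MockDefaultCoder<T> of(Class<T> clazz) {
    return (MockDefaultCoder<T>) INSTANCE;
  }

  @Override
  public void encode(T value, OutputStream out) {
    throw new UnsupportedOperationException("precedence-test stub; never encodes");
  }

  @Override
  public T decode(InputStream in) {
    throw new UnsupportedOperationException("precedence-test stub; never decodes");
  }
}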
@@ -39,7 +39,7 @@ public class DefaultCoderTest {

   @Rule public ExpectedException thrown = ExpectedException.none();

-  @DefaultCoder(AvroCoder.class)
+  @DefaultCoder(MockAvroCoder.class)
Please rename the record class and coder so they don't contain "Avro". Also, I'd recommend inlining the coder implementation into the test class to limit its scope.
I renamed it to MockDefaultCoder, and it's used in other tests as well, so let's keep it as a separate file.
Regarding whether enough time has passed: I am OK with it. Users have an easy migration path. Thanks for following through with this very difficult change!
Force-pushed from 451a504 to f22a161
Thanks for finishing up this huge effort!
Run Java_Examples_Dataflow_Java11 PreCommit
Run Java_Examples_Dataflow_Java17 PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java_Examples_Dataflow_Java11 PreCommit
Seems like all tests passed; the failed jobs look unrelated. @mosche @robertwb @kennknowles
Force-pushed from f22a161 to 43a1d7d
The precommit timed out and I can't quite tell, but maybe try locally: I don't think this requires cloud.
run java precommit
retest this please
Force-pushed from 43a1d7d to 9d175ec
Run Java_GCP_IO_Direct PreCommit
Run Java PostCommit
Force-pushed from 9d175ec to 0850a6c
Run Java PreCommit
Run Java_Amazon-Web-Services2_IO_Direct PreCommit
Run Java PostCommit
Yes, it passed locally
There are two failing PreCommit actions but I don't think they are related. So, if there are no objections or any other comments, I'd merge this PR.
I think you can go ahead and merge.
🎉 🎉 🎉 🎉
This is the final step to remove the Avro dependency from the Java SDK "core" module after it was deprecated in the 2.46.0 release.
Closes #25252
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.