Re-add iceberg bounded source; test splitting #30805
Conversation
R: @chamikaramj
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
The failure is in the basic reading of data - none is read. I'll try to grok things and see about that. Still interested in commentary on the approaches here.
Thanks!
};
}

public static class Write<ElementT, DestinationT>
Noting that this also has Write stuff but I'm ignoring those for this review.
Thanks for noticing. I will revert any added write stuff in this PR.
import org.apache.iceberg.data.Record;
import org.checkerframework.checker.nullness.qual.NonNull;

public class IcebergIO {
Seems like we are missing the Read transform?
Will do soon. I realize all the tests are a vanilla Read.from(IcebergBoundedSource).
done
import org.apache.iceberg.io.CloseableIterable;
import org.checkerframework.checker.nullness.qual.Nullable;

public class IcebergBoundedSource extends BoundedSource<Row> {
Ideally we should use SDF but I think it would suffice to add a TODO to convert this to an SDF in the future.
Agree
TableScan tableScan = table().newScan();

if (desiredBundleSizeBytes > 0) {
  tableScan = tableScan.option(TableProperties.SPLIT_SIZE, "" + desiredBundleSizeBytes);
String.valueOf(desiredBundleSizeBytes)
Done
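For illustration, both spellings produce the same string; String.valueOf just states the intent directly. A minimal standalone sketch (the 64 MiB value is made up for the example; it is not from the PR):

```java
public class SplitSizeOption {
    public static void main(String[] args) {
        long desiredBundleSizeBytes = 67_108_864L; // illustrative 64 MiB value

        // Concatenating with "" works but obscures the intent:
        String viaConcat = "" + desiredBundleSizeBytes;

        // String.valueOf makes the long-to-String conversion explicit:
        String viaValueOf = String.valueOf(desiredBundleSizeBytes);

        System.out.println(viaConcat.equals(viaValueOf)); // true
    }
}
```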
}
break;
case BATCH:
// TODO: Add batch scan
We should fail here to prevent data loss.
Done
InputFilesDecryptor decryptor =
    checkStateNotNull(this.decryptor, "decryptor null in advance() - did you call start()?");

// This is a lie, but the most expedient way to work with IcebergIO's
Update/remove comment ?
Actually I added this comment to explain why the type was fake. I reworded it.
@Nullable FileScanTask fileTask = null;
while (!files.isEmpty() && fileTask == null) {
  fileTask = files.remove();
  if (fileTask.isDataTask()) {
Why are we skipping data tasks? Was this supposed to be if (!fileTask.isDataTask())?
Seems like DataTasks contain actual data: https://iceberg.apache.org/javadoc/0.11.0/org/apache/iceberg/DataTask.html
I think this was obsolete code. The issue is that ScanTask is a oneof, and this was checking for an arm that it is not.
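The shape of the fix can be sketched with hypothetical stand-in types (these are not the real Iceberg classes): treat the task hierarchy as a oneof and skip the arm the reader cannot handle, rather than the one it can.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ScanTaskQueue {
    // Hypothetical stand-ins for Iceberg's ScanTask "oneof": a task is either
    // backed by a data file we can read, or an inline DataTask handled elsewhere.
    interface ScanTask {}
    static class FileTask implements ScanTask {}
    static class DataTask implements ScanTask {}

    /** Pop tasks until a file-backed one is found; skip the DataTask arm. */
    static FileTask nextFileTask(Queue<ScanTask> tasks) {
        while (!tasks.isEmpty()) {
            ScanTask t = tasks.remove();
            if (t instanceof FileTask) { // the arm this reader handles
                return (FileTask) t;
            }
            // DataTask (inline rows) would need separate handling; skipped here.
        }
        return null;
    }

    public static void main(String[] args) {
        Queue<ScanTask> q = new ArrayDeque<>();
        q.add(new DataTask());
        q.add(new FileTask());
        System.out.println(nextFileTask(q) != null); // true
    }
}
```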
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/CombinedScanReader.java (outdated, resolved)
private Row convert(Record record) {
  Row.Builder b = Row.withSchema(schema);
  for (int i = 0; i < schema.getFieldCount(); i++) {
    // TODO: A lot obviously
What does this mean ?
Haha it means that this was a fake implementation. I added conversions.
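The per-field copy loop being discussed has roughly this shape. This is a hypothetical standalone stand-in, not the PR's code: a LinkedHashMap plays the role of the Beam Row built via Row.withSchema, and the field names and values are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RecordToRowSketch {
    // Stand-in for converting a positional record into a named-field row.
    // The real conversion must also map Iceberg value types (timestamps,
    // decimals, nested structs) to Beam field types; primitives pass through.
    static Map<String, Object> convert(List<String> fieldNames, Object[] recordValues) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            row.put(fieldNames.get(i), recordValues[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> row = convert(List.of("id", "name"), new Object[] {1L, "a"});
        System.out.println(row); // {id=1, name=a}
    }
}
```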
Force-pushed 21e0a47 to 177aea2 (compare)
Codecov Report: all modified and coverable lines are covered by tests ✅

Additional details and impacted files:
@@ Coverage Diff @@
## master #30805 +/- ##
============================================
+ Coverage 70.96% 71.47% +0.51%
============================================
Files 1257 710 -547
Lines 140931 104815 -36116
Branches 4307 0 -4307
============================================
- Hits 100007 74915 -25092
+ Misses 37444 28268 -9176
+ Partials 3480 1632 -1848

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Force-pushed 177aea2 to e26cacd (compare)
Force-pushed b53e022 to 0b5e391 (compare)
Looks like something is broken in the GHA workflow, FYI, just so you don't wait for that to go green. I pushed some possible fixes for that too, but I'm not sure whether in this case the workflow comes from master or the PR.
Force-pushed 0b5e391 to 37b23f7 (compare)
Thanks.
case BATCH:
  throw new UnsupportedOperationException("BATCH scan not supported");
}
return splits;
What will happen if the sources produced at line 104 above get re-split by the runner?
If such sources cannot be re-split, we should have a trivial case where we just return the original source to prevent data loss.
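The safe fallback the reviewer describes can be sketched with a hypothetical minimal source class (not the PR's IcebergBoundedSource): when a source cannot be subdivided further, split() returns the source itself, so the runner reads it as one bundle and no data is dropped.

```java
import java.util.Collections;
import java.util.List;

public class NonSplittableSource {
    // Hypothetical stand-in for a bounded source wrapping a single scan task
    // that cannot be subdivided further.
    List<NonSplittableSource> split(long desiredBundleSizeBytes) {
        // Returning the original source is always a correct answer to a
        // re-split request: nothing is duplicated and nothing is lost.
        return Collections.singletonList(this);
    }

    public static void main(String[] args) {
        NonSplittableSource src = new NonSplittableSource();
        List<NonSplittableSource> splits = src.split(1 << 20);
        System.out.println(splits.size() == 1 && splits.get(0) == src); // true
    }
}
```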
}

@Override
public BoundedSource<Row> getCurrentSource() {
Is it possible to also implement getFractionConsumed to support progress reporting in a meaningful way? I think it will be very useful for autoscaling when using this with Dataflow.
This can be in a future PR since we want to get this in by release cut.
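One possible shape for such a getFractionConsumed, reduced here to plain arithmetic over a hypothetical progress model (task counts and the helper name are invented for the example; this is not the eventual implementation): fraction = tasks fully read, plus progress within the current task, over total tasks.

```java
public class FractionConsumedSketch {
    // Hypothetical progress model: a reader over `totalTasks` scan tasks has
    // fully consumed `completedTasks` of them and is `inTaskFraction` of the
    // way through the current one.
    static double fractionConsumed(int completedTasks, double inTaskFraction, int totalTasks) {
        if (totalTasks == 0) {
            return 1.0; // an empty source is trivially done
        }
        return (completedTasks + inTaskFraction) / totalTasks;
    }

    public static void main(String[] args) {
        // 2 of 4 tasks done, halfway through the third:
        System.out.println(fractionConsumed(2, 0.5, 4)); // 0.625
    }
}
```

A refinement would weight tasks by byte size rather than counting them equally, since Iceberg scan tasks can vary widely in size.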

testPipeline.run();
}
Let's also add a test for splitting using SourceTestUtils.assertSourcesEqualReferenceSource.
done
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaAndRowConversions.java (resolved)
Force-pushed 19037fb to a5b995a (compare)
}
break;
case BATCH:
  throw new UnsupportedOperationException("BATCH scan not supported");
Also fail for the default path to prevent data loss (or just return the original source if it can be read directly).
done
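The point of also failing on the default path: if a new scan type is added later, a silent fall-through would read nothing and look like data loss. A minimal illustration with a hypothetical ScanType enum (not the PR's actual enum):

```java
public class ScanDispatch {
    // Hypothetical stand-in for the source's scan-type enum.
    enum ScanType { TABLE, BATCH }

    static String plan(ScanType type) {
        switch (type) {
            case TABLE:
                return "table scan";
            case BATCH:
                throw new UnsupportedOperationException("BATCH scan not supported");
            default:
                // Failing loudly here prevents silent data loss if a new
                // scan type is added but not yet handled.
                throw new UnsupportedOperationException("Unknown scan type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(plan(ScanType.TABLE)); // table scan
    }
}
```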
LGTM other than handling the re-splitting case above.
Force-pushed a5b995a to a169e6a (compare)
Added a test for double-splitting. To make splitting behavior clearer, I factored it into two different kinds of sources.
}
break;
case BATCH:
  throw new UnsupportedOperationException("BATCH scan not supported");
done
Thanks. LGTM.
Currently the exhaustive splitting test is failing, but I would also like early feedback.
This is exactly #30797 plus one commit, so if you click on just the last commit you should be able to see just the read/source diff.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.
- See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.