Propagate gcs-connector options to GcsUtil #32769
Conversation
try {
  // Check if gcs-connector-hadoop is loaded into classpath
  Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration");
  Configuration config = new Configuration();
  return GoogleCloudStorageReadOptions.builder()
      .setFastFailOnNotFound(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE
              .get(config, config::getBoolean))
      .setSupportGzipEncoding(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE
              .get(config, config::getBoolean))
      .setInplaceSeekLimit(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(
              config, config::getLong))
      .setFadvise(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get(
              config, config::getEnum))
      .setMinRangeRequestSize(
          GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(
              config, config::getInt))
      .setGrpcChecksumsEnabled(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get(
              config, config::getBoolean))
      .setGrpcReadTimeoutMillis(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get(
              config, config::getLong))
      .setGrpcReadMessageTimeoutMillis(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get(
              config, config::getLong))
      .setGrpcReadMetadataTimeoutMillis(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get(
              config, config::getLong))
      .setGrpcReadZeroCopyEnabled(
          GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get(
              config, config::getBoolean))
      .setTraceLogEnabled(
          GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get(
              config, config::getBoolean))
      .setTraceLogTimeThreshold(
          GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(
              config, config::getLong))
      .build();
} catch (ClassNotFoundException e) {
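  // (Diff truncated here; per the discussion below, the catch branch presumably
  // falls back to the library defaults when the connector is not on the classpath.)
  return GoogleCloudStorageReadOptions.DEFAULT;
}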
Copy-pasted from here: https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.25/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java#L656-L677

I think we could make a case for making that method public in a future release so we're not pulling in Hadoop explicitly here. Or, I could omit this if/else branch entirely and always return GoogleCloudStorageReadOptions.DEFAULT, and leave it up to the user to supply a GoogleCloudStorageReadOptions instance (thus passing the Hadoop dependency down to the user end).
> Or, I could omit this if/else branch entirely and always return GoogleCloudStorageReadOptions.DEFAULT, and leave it up to the user to supply a GoogleCloudStorageReadOptions instance (thus passing the Hadoop dependency down to the user end).

I think this would be preferable, to avoid having to pull in the other packages, but I'm asking some other Beam maintainers more familiar with dep management etc. to take a look as well.
Agree with @scwhittle. It is also easier for maintainers, and if new options are added we don't need to change the code to support them.
Sounds good! I removed the Hadoop dep and this parsing block; this leaves it up to the user to pass in a GoogleCloudStorageReadOptions constructed however they prefer 👍
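For readers skimming the thread, user-side construction after this change might look roughly like the sketch below. The builder setters come from the gcsio library, as quoted in the diff above; the setGoogleCloudStorageReadOptions hook on GcsOptions is an assumed name for what this PR exposes, so verify against the merged API.

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ReadOptionsExample {
  public static void main(String[] args) {
    // Build the read options directly, instead of having Beam parse them out of a
    // Hadoop core-site.xml via GoogleHadoopFileSystemConfiguration.
    GoogleCloudStorageReadOptions readOptions =
        GoogleCloudStorageReadOptions.builder()
            .setFastFailOnNotFound(false) // redundant under default SDF empty-match treatment
            .setFadvise(Fadvise.RANDOM) // favor random-access reads, e.g. for Parquet
            .build();

    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
    // Assumed accessor name for the hook this PR adds; check the merged GcsOptions.
    options.setGoogleCloudStorageReadOptions(readOptions);
  }
}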
@@ -55,6 +55,8 @@ public void testGcpCoreApiSurface() throws Exception {
     classesInPackage("com.google.api.services.storage"),
     classesInPackage("com.google.auth"),
     classesInPackage("com.fasterxml.jackson.annotation"),
+    classesInPackage("com.google.cloud.hadoop.gcsio"),
+    classesInPackage("com.google.common.collect"), // via GoogleCloudStorageReadOptions
this is unfortunate 😓 GoogleCloudStorageReadOptions has one property that's strictly an ImmutableSet: https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.25/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java#L519
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers.

assign set of reviewers

Assigning reviewers. R: @damondouglas for label java. If you would like to opt out of this review, comment one of the available commands. The PR bot will only process comments in the main thread (not review comments).
Assigned reviewers since, at first glance, the GHA failures did not look related and might be transient. Could be wrong though.
bumping this PR -- cc @scwhittle since I saw you recently made changes to GcsUtil?
googleCloudStorageOptions =
    GoogleCloudStorageOptions.builder()
        .setAppName("Beam")
        .setReadChannelOptions(this.googleCloudStorageReadOptions)
This seems like a bug in the GoogleCloudStorageImpl.open method, in that it doesn't use these options. But you can remove the separate member variable and use googleCloudStorageOptions.getReadChannelOptions() in open below.
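A rough sketch of what that suggestion amounts to (illustrative class and field names, not the exact PR diff; it leans on gcsio's GoogleCloudStorage.open(StorageResourceId, GoogleCloudStorageReadOptions) overload):

import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.StorageResourceId;

class GcsUtilSketch {
  private final GoogleCloudStorage googleCloudStorage;
  private final GoogleCloudStorageOptions googleCloudStorageOptions;

  GcsUtilSketch(GoogleCloudStorage gcs, GoogleCloudStorageOptions options) {
    this.googleCloudStorage = gcs;
    this.googleCloudStorageOptions = options;
  }

  SeekableByteChannel open(String bucket, String object) throws IOException {
    // No separate googleCloudStorageReadOptions member: read the channel
    // options back off the stored GoogleCloudStorageOptions instead.
    return googleCloudStorage.open(
        new StorageResourceId(bucket, object),
        googleCloudStorageOptions.getReadChannelOptions());
  }
}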
yeah, GoogleCloudStorageImpl is set up in a slightly strange way -- it's constructed with a GoogleCloudStorageOptions arg but also accepts a separate GoogleCloudStorageOptions as an argument to open, while ignoring the former instance variable. Dropping the separate member variable here makes sense, will do 👍
updated!
Reminder, please take a look at this pr: @damondouglas
R: @shunping (XQ suggested you could help review this)
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment.
ack. will take a look today
@shunping, whenever you have a chance I'd appreciate any feedback on this!
Thanks for adding this option. I left some minor comments there, and overall it looks good. After you finish revising, could you please sync to the current HEAD so the tests are re-triggered? Thanks!
Force-pushed 57c0b50 to 661a826.
Running the failed precommit test again, though the failure seems unrelated to the code change here.
@@ -55,6 +55,7 @@ public void testGcpCoreApiSurface() throws Exception {
     classesInPackage("com.google.api.services.storage"),
     classesInPackage("com.google.auth"),
     classesInPackage("com.fasterxml.jackson.annotation"),
+    classesInPackage("com.google.cloud.hadoop.gcsio"),
Looks like this breaks a test:
https://ge.apache.org/s/5ubkbvlvrjzgi/console-log/task/:sdks:java:extensions:google-cloud-platform-core:test?anchor=5&page=1
Thanks for tracking that down! Pushed a fix. One of the precommit tests is still failing though:

org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarnessTest > testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers FAILED
    java.lang.AssertionError at GrpcCleanupRule.java:201

Not sure if/how this could be related to my PR.
Run Java PreCommit
LGTM. Thanks!
Codecov Report

All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@            Coverage Diff            @@
##             master   #32769   +/-   ##
=========================================
  Coverage     57.39%   57.39%
  Complexity     1474     1474
=========================================
  Files           970      970
  Lines        154426   154426
  Branches       1076     1076
=========================================
  Hits          88637    88637
  Misses        63585    63585
  Partials       2204     2204

☔ View full report in Codecov by Sentry.
Context: I was reading GCS Parquet files via SplittableDoFn and noticed that ReadableFile#openSeekable does not propagate any of the gcs-connector options specified in my core-site.xml file. In particular, I wanted to turn off fs.gs.inputstream.fast.fail.on.not.found.enable, which is redundant in an SDF with default empty-match treatment, and tweak fs.gs.inputstream.fadvise. It looks like these GoogleCloudStorageReadOptions need to be set explicitly in GcsUtil and passed to any GoogleCloudStorage#open calls (see reference).

The big downside of this PR is, of course, pulling in Hadoop :( The alternative is to copy-paste all the Configuration keys manually into GcsUtil, which seems harder to maintain. Or, I could omit the GcsReadOptionsFactory logic entirely and leave it 100% up to the user to construct GoogleCloudStorageReadOptions instances.
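The reading pattern described above looks roughly like this (a minimal sketch; the filepattern and the Parquet-parsing body are placeholders, not from this PR):

import java.nio.channels.SeekableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class SeekableReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(FileIO.match().filepattern("gs://my-bucket/data/*.parquet")) // placeholder path
        .apply(FileIO.readMatches())
        .apply(
            ParDo.of(
                new DoFn<FileIO.ReadableFile, Void>() {
                  @ProcessElement
                  public void process(@Element FileIO.ReadableFile file) throws Exception {
                    // openSeekable() is the call that (before this PR) ignored any
                    // gcs-connector options when reading from GCS.
                    try (SeekableByteChannel channel = file.openSeekable()) {
                      // ... parse the Parquet footer / row groups from the channel ...
                    }
                  }
                }));
    p.run();
  }
}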