diff --git a/CHANGES.md b/CHANGES.md index 3bb490cc83c4..fc32398a7a5a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,6 +62,7 @@ ## I/Os +* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)). * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## New Features / Improvements diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 18d637254115..1285b88663e7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.gcp.options; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator; @@ -44,6 +45,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setGcsUtil(GcsUtil value); + @JsonIgnore + @Description( + "The GoogleCloudStorageReadOptions instance that should be used to read from Google Cloud Storage.") + @Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class) + @Hidden + GoogleCloudStorageReadOptions getGoogleCloudStorageReadOptions(); + + void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value); + /** * The ExecutorService instance to use to create threads, can be overridden to specify an * ExecutorService that is compatible with the user's environment. 
If unset, the default is to use diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 8d3596f17b3b..d58154132a72 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -123,6 +123,14 @@ public static GcsCountersOptions create( } } + public static class GcsReadOptionsFactory + implements DefaultValueFactory<GoogleCloudStorageReadOptions> { + @Override + public GoogleCloudStorageReadOptions create(PipelineOptions options) { + return GoogleCloudStorageReadOptions.DEFAULT; + } + } + /** * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport * flags specified on the {@link PipelineOptions}. @@ -153,7 +161,8 @@ public GcsUtil create(PipelineOptions options) { : null, gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() - : null)); + : null), + gcsOptions.getGoogleCloudStorageReadOptions()); } /** Returns an instance of {@link GcsUtil} based on the given parameters. 
*/ @@ -164,7 +173,8 @@ public static GcsUtil create( ExecutorService executorService, Credentials credentials, @Nullable Integer uploadBufferSizeBytes, - GcsCountersOptions gcsCountersOptions) { + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { return new GcsUtil( storageClient, httpRequestInitializer, @@ -173,7 +183,8 @@ public static GcsUtil create( credentials, uploadBufferSizeBytes, null, - gcsCountersOptions); + gcsCountersOptions, + gcsReadOptions); } } @@ -249,7 +260,8 @@ public static boolean isWildcard(GcsPath spec) { Credentials credentials, @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, - GcsCountersOptions gcsCountersOptions) { + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { this.storageClient = storageClient; this.httpRequestInitializer = httpRequestInitializer; this.uploadBufferSizeBytes = uploadBufferSizeBytes; @@ -260,6 +272,7 @@ public static boolean isWildcard(GcsPath spec) { googleCloudStorageOptions = GoogleCloudStorageOptions.builder() .setAppName("Beam") + .setReadChannelOptions(gcsReadOptions) .setGrpcEnabled(shouldUseGrpc) .build(); googleCloudStorage = @@ -565,7 +578,9 @@ private SeekableByteChannel wrapInCounting( public SeekableByteChannel open(GcsPath path) throws IOException { String bucket = path.getBucket(); SeekableByteChannel channel = - googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject())); + googleCloudStorage.open( + new StorageResourceId(path.getBucket(), path.getObject()), + this.googleCloudStorageOptions.getReadChannelOptions()); return wrapInCounting(channel, bucket); } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index f5075a3f2c55..26d98125a3af 100644 --- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -55,6 +55,8 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.api.services.storage"), classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), + classesInPackage("com.google.cloud.hadoop.gcsio"), + classesInPackage("com.google.common.collect"), // Via gcs-connector ReadOptions builder classesInPackage("java"), classesInPackage("javax"), classesInPackage("org.apache.beam.sdk"), diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index bd7f46ec8951..97082572ce41 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -177,6 +177,32 @@ public void testCreationWithGcsUtilProvided() { assertSame(gcsUtil, pipelineOptions.getGcsUtil()); } + @Test + public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception { + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.builder() + .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) + .setSupportGzipEncoding(true) + .setFastFailOnNotFound(false) + .build(); + + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + pipelineOptions.setGoogleCloudStorageReadOptions(readOptions); + + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); + 
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(SeekableByteChannel.class)); + gcsUtil.setCloudStorageImpl(googleCloudStorageMock); + + assertEquals(readOptions, pipelineOptions.getGoogleCloudStorageReadOptions()); + + // Assert read options are passed to GCS calls + pipelineOptions.getGcsUtil().open(GcsPath.fromUri("gs://bucket/path")); + Mockito.verify(googleCloudStorageMock, Mockito.times(1)) + .open(StorageResourceId.fromStringPath("gs://bucket/path"), readOptions); + } + @Test public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); @@ -1630,7 +1656,8 @@ public static GcsUtilMock createMock(PipelineOptions options) { : null, gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() - : null)); + : null), + gcsOptions.getGoogleCloudStorageReadOptions()); } private GcsUtilMock( @@ -1641,7 +1668,8 @@ private GcsUtilMock( Credentials credentials, @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, - GcsCountersOptions gcsCountersOptions) { + GcsCountersOptions gcsCountersOptions, + GoogleCloudStorageReadOptions gcsReadOptions) { super( storageClient, httpRequestInitializer, @@ -1650,7 +1678,8 @@ private GcsUtilMock( credentials, uploadBufferSizeBytes, rewriteDataOpBatchLimit, - gcsCountersOptions); + gcsCountersOptions, + gcsReadOptions); } @Override