From dfc441a03a7f6e5b6ee2306eda2b80936cac1ad2 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 12 Dec 2024 14:46:51 -0500 Subject: [PATCH] Parameterize GoogleCloudStorage provider in GcsUtil to unblock gcs-connector 3.x --- .../extensions/gcp/options/GcsOptions.java | 23 ++++++++++ .../beam/sdk/extensions/gcp/util/GcsUtil.java | 43 +++++++++++++------ .../extensions/gcp/GcpCoreApiSurfaceTest.java | 1 + .../sdk/extensions/gcp/util/GcsUtilTest.java | 25 ++++++----- 4 files changed, 69 insertions(+), 23 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java index 1285b88663e7..177713bdcbee 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -18,6 +18,11 @@ package org.apache.beam.sdk.extensions.gcp.options; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.services.storage.Storage; +import com.google.auth.Credentials; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; import java.util.concurrent.ExecutorService; @@ -54,6 +59,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value); + @JsonIgnore + @Description( + "The GoogleCloudStorageProvider instance that should be used to instantiate a GoogleCloudStorage client.") + @Default.InstanceFactory(GcsUtil.GoogleCloudStorageProviderFactory.class) + @Hidden + GoogleCloudStorageProvider getGoogleCloudStorageProvider(); + + void setGoogleCloudStorageProvider(GoogleCloudStorageProvider value); + /** * The ExecutorService instance to use to create threads, can be overridden to specify an * ExecutorService that is compatible with the user's environment. If unset, the default is to use @@ -208,4 +222,13 @@ public PathValidator create(PipelineOptions options) { .build(); } } + + @FunctionalInterface + public interface GoogleCloudStorageProvider { + GoogleCloudStorage get( + GoogleCloudStorageOptions options, + Storage storage, + Credentials credentials, + HttpRequestInitializer httpRequestInitializer); + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index d58154132a72..885e0aec532a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -162,7 +162,8 @@ public GcsUtil create(PipelineOptions options) { gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() : null), - gcsOptions.getGoogleCloudStorageReadOptions()); + gcsOptions.getGoogleCloudStorageReadOptions(), + gcsOptions.getGoogleCloudStorageProvider()); } /** Returns an instance of {@link GcsUtil} based on the given parameters. */ @@ -174,7 +175,8 @@ public static GcsUtil create( Credentials credentials, @Nullable Integer uploadBufferSizeBytes, GcsCountersOptions gcsCountersOptions, - GoogleCloudStorageReadOptions gcsReadOptions) { + GoogleCloudStorageReadOptions gcsReadOptions, + GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider) { return new GcsUtil( storageClient, httpRequestInitializer, @@ -184,7 +186,17 @@ public static GcsUtil create( uploadBufferSizeBytes, null, gcsCountersOptions, - gcsReadOptions); + gcsReadOptions, + googleCloudStorageProvider); + } + } + + public static class GoogleCloudStorageProviderFactory + implements DefaultValueFactory { + @Override + public GcsOptions.GoogleCloudStorageProvider create(PipelineOptions options) { + return (storageOptions, storage, credentials, httpRequestInitializer) -> + new GoogleCloudStorageImpl(storageOptions, storage, credentials); } } @@ -228,6 +240,7 @@ public static GcsUtil create( private final Credentials credentials; private GoogleCloudStorage googleCloudStorage; + private GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider; private GoogleCloudStorageOptions googleCloudStorageOptions; private final int rewriteDataOpBatchLimit; @@ -261,7 +274,8 @@ public static boolean isWildcard(GcsPath spec) { @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, GcsCountersOptions gcsCountersOptions, - GoogleCloudStorageReadOptions gcsReadOptions) { + GoogleCloudStorageReadOptions gcsReadOptions, + GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider) { this.storageClient = storageClient; this.httpRequestInitializer = httpRequestInitializer; this.uploadBufferSizeBytes = uploadBufferSizeBytes; @@ -269,6 +283,7 @@ public static boolean isWildcard(GcsPath spec) { this.credentials = credentials; this.maxBytesRewrittenPerCall = null; this.numRewriteTokensUsed = null; + this.googleCloudStorageProvider = googleCloudStorageProvider; googleCloudStorageOptions = GoogleCloudStorageOptions.builder() .setAppName("Beam") @@ -276,7 +291,8 @@ public static boolean isWildcard(GcsPath spec) { .setGrpcEnabled(shouldUseGrpc) .build(); googleCloudStorage = - createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); + googleCloudStorageProvider.get( + googleCloudStorageOptions, storageClient, credentials, httpRequestInitializer); this.batchRequestSupplier = () -> { // Capture reference to this so that the most recent storageClient and initializer @@ -509,6 +525,11 @@ void setCloudStorageImpl(GoogleCloudStorageOptions g) { googleCloudStorageOptions = g; } + @VisibleForTesting + void setCloudStorageProviderImpl(GcsOptions.GoogleCloudStorageProvider p) { + googleCloudStorageProvider = p; + } + /** * Create an integer consumer that updates the counter identified by a prefix and a bucket name. */ @@ -695,8 +716,11 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO GoogleCloudStorageOptions newGoogleCloudStorageOptions = googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build(); GoogleCloudStorage gcpStorage = - createGoogleCloudStorage( - newGoogleCloudStorageOptions, this.storageClient, this.credentials); + googleCloudStorageProvider.get( + newGoogleCloudStorageOptions, + this.storageClient, + this.credentials, + httpRequestInitializer); StorageResourceId resourceId = new StorageResourceId( path.getBucket(), @@ -737,11 +761,6 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO } } - GoogleCloudStorage createGoogleCloudStorage( - GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { - return new GoogleCloudStorageImpl(options, storage, credentials); - } - /** * Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws * exception if the bucket is inaccessible due to permissions or does not exist. diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index 26d98125a3af..600aacb7077d 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -52,6 +52,7 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.google.api.client.http"), classesInPackage("com.google.api.client.json"), classesInPackage("com.google.api.client.util"), + classesInPackage("com.google.cloud.hadoop.util"), classesInPackage("com.google.api.services.storage"), classesInPackage("com.google.auth"), classesInPackage("com.fasterxml.jackson.annotation"), diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index 97082572ce41..fc2325566239 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -1626,6 +1626,8 @@ public static GcsUtilMock createMockWithMockStorage(PipelineOptions options, byt gcsUtilMock.googleCloudStorage = googleCloudStorageMock; // set the mock in the super object as well gcsUtilMock.setCloudStorageImpl(gcsUtilMock.googleCloudStorage); + gcsUtilMock.setCloudStorageProviderImpl( + (storageOpts, storage, credentials, httpRequestInitializer) -> googleCloudStorageMock); if (readPayload == null) { Mockito.when(googleCloudStorageMock.create(Mockito.any(), Mockito.any())) @@ -1657,7 +1659,8 @@ public static GcsUtilMock createMock(PipelineOptions options) { gcsOptions.getEnableBucketWriteMetricCounter() ? gcsOptions.getGcsWriteCounterPrefix() : null), - gcsOptions.getGoogleCloudStorageReadOptions()); + gcsOptions.getGoogleCloudStorageReadOptions(), + gcsOptions.getGoogleCloudStorageProvider()); } private GcsUtilMock( @@ -1669,7 +1672,8 @@ private GcsUtilMock( @Nullable Integer uploadBufferSizeBytes, @Nullable Integer rewriteDataOpBatchLimit, GcsCountersOptions gcsCountersOptions, - GoogleCloudStorageReadOptions gcsReadOptions) { + GoogleCloudStorageReadOptions gcsReadOptions, + GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider) { super( storageClient, httpRequestInitializer, @@ -1679,13 +1683,8 @@ private GcsUtilMock( uploadBufferSizeBytes, rewriteDataOpBatchLimit, gcsCountersOptions, - gcsReadOptions); - } - - @Override - GoogleCloudStorage createGoogleCloudStorage( - GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { - return googleCloudStorage; + gcsReadOptions, + googleCloudStorageProvider); } } @@ -1698,7 +1697,9 @@ public void testCreate() throws IOException { GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class); WritableByteChannel mockChannel = Mockito.mock(WritableByteChannel.class); - gcsUtil.googleCloudStorage = mockStorage; + gcsUtil.setCloudStorageImpl(mockStorage); + gcsUtil.setCloudStorageProviderImpl( + (options, storage, credentials, httpRequestInitializer) -> mockStorage); when(mockStorage.create(any(), any())).thenReturn(mockChannel); @@ -1716,7 +1717,9 @@ public void testCreateWithException() throws IOException { GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class); - gcsUtil.googleCloudStorage = mockStorage; + gcsUtil.setCloudStorageImpl(mockStorage); + gcsUtil.setCloudStorageProviderImpl( + (options, storage, credentials, httpRequestInitializer) -> mockStorage); when(mockStorage.create(any(), any())).thenThrow(new RuntimeException("testException"));