Skip to content

Commit

Permalink
Propagate gcs-connector options to GcsUtil (#32769)
Browse files Browse the repository at this point in the history
* Propagate gcs-connector options to GcsUtil

* newline

* Update CHANGES.md

* Remove Hadoop dependency

* Remove unused deps

* Drop googleCloudStorageReadOptions member variable

* add missing package

* fixup CHANGES.md
  • Loading branch information
clairemcginty authored Dec 3, 2024
1 parent 532880a commit 63d89cd
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.gcp.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import java.util.concurrent.ExecutorService;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
Expand All @@ -44,6 +45,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline

void setGcsUtil(GcsUtil value);

/**
 * The {@link GoogleCloudStorageReadOptions} instance that is handed to {@link GcsUtil} when it is
 * created from these options. If unset, the default is the value produced by {@link
 * GcsUtil.GcsReadOptionsFactory}.
 *
 * <p>This option is marked {@code @Hidden} and {@code @JsonIgnore}.
 */
@JsonIgnore
@Description(
"The GoogleCloudStorageReadOptions instance that should be used to read from Google Cloud Storage.")
@Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class)
@Hidden
GoogleCloudStorageReadOptions getGoogleCloudStorageReadOptions();

/** Overrides the {@link GoogleCloudStorageReadOptions} returned by the getter above. */
void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value);

/**
* The ExecutorService instance to use to create threads, can be overridden to specify an
* ExecutorService that is compatible with the user's environment. If unset, the default is to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ public static GcsCountersOptions create(
}
}

/**
 * A {@link DefaultValueFactory} that supplies {@link GoogleCloudStorageReadOptions#DEFAULT}. It is
 * registered as the default-value factory for {@code GcsOptions#getGoogleCloudStorageReadOptions}.
 */
public static class GcsReadOptionsFactory
implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
/**
 * Returns the default read options.
 *
 * @param options the pipeline options being populated; not consulted by this factory
 * @return {@link GoogleCloudStorageReadOptions#DEFAULT}
 */
@Override
public GoogleCloudStorageReadOptions create(PipelineOptions options) {
return GoogleCloudStorageReadOptions.DEFAULT;
}
}

/**
* This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport
* flags specified on the {@link PipelineOptions}.
Expand Down Expand Up @@ -153,7 +161,8 @@ public GcsUtil create(PipelineOptions options) {
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null));
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
}

/** Returns an instance of {@link GcsUtil} based on the given parameters. */
Expand All @@ -164,7 +173,8 @@ public static GcsUtil create(
ExecutorService executorService,
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
return new GcsUtil(
storageClient,
httpRequestInitializer,
Expand All @@ -173,7 +183,8 @@ public static GcsUtil create(
credentials,
uploadBufferSizeBytes,
null,
gcsCountersOptions);
gcsCountersOptions,
gcsReadOptions);
}
}

Expand Down Expand Up @@ -249,7 +260,8 @@ public static boolean isWildcard(GcsPath spec) {
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
this.storageClient = storageClient;
this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
Expand All @@ -260,6 +272,7 @@ public static boolean isWildcard(GcsPath spec) {
googleCloudStorageOptions =
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
.setReadChannelOptions(gcsReadOptions)
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage =
Expand Down Expand Up @@ -565,7 +578,9 @@ private SeekableByteChannel wrapInCounting(
public SeekableByteChannel open(GcsPath path) throws IOException {
String bucket = path.getBucket();
SeekableByteChannel channel =
googleCloudStorage.open(new StorageResourceId(path.getBucket(), path.getObject()));
googleCloudStorage.open(
new StorageResourceId(path.getBucket(), path.getObject()),
this.googleCloudStorageOptions.getReadChannelOptions());
return wrapInCounting(channel, bucket);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public void testGcpCoreApiSurface() throws Exception {
classesInPackage("com.google.api.services.storage"),
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.cloud.hadoop.gcsio"),
classesInPackage("com.google.common.collect"), // Via gcs-connector ReadOptions builder
classesInPackage("java"),
classesInPackage("javax"),
classesInPackage("org.apache.beam.sdk"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,32 @@ public void testCreationWithGcsUtilProvided() {
assertSame(gcsUtil, pipelineOptions.getGcsUtil());
}

/**
 * Verifies that {@link GoogleCloudStorageReadOptions} set explicitly on {@link GcsOptions} are
 * returned unchanged by the getter and are forwarded to the underlying {@link GoogleCloudStorage}
 * when a path is opened through {@link GcsUtil}.
 */
@Test
public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Exception {
  GoogleCloudStorageReadOptions readOptions =
      GoogleCloudStorageReadOptions.builder()
          .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
          .setSupportGzipEncoding(true)
          .setFastFailOnNotFound(false)
          .build();

  GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
  pipelineOptions.setGoogleCloudStorageReadOptions(readOptions);

  GcsUtil gcsUtil = pipelineOptions.getGcsUtil();

  // Use a plain mock rather than a spy: GoogleCloudStorage is only a collaborator here, and
  // stubbing a spy with when(...) would invoke the real method while the stub is being set up.
  GoogleCloudStorage googleCloudStorageMock = Mockito.mock(GoogleCloudStorage.class);
  Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
      .thenReturn(Mockito.mock(SeekableByteChannel.class));
  gcsUtil.setCloudStorageImpl(googleCloudStorageMock);

  assertEquals(readOptions, pipelineOptions.getGoogleCloudStorageReadOptions());

  // Assert read options are passed to GCS calls. The same GcsUtil instance that received the
  // mocked storage is exercised, so the check does not depend on getGcsUtil() caching.
  gcsUtil.open(GcsPath.fromUri("gs://bucket/path"));
  Mockito.verify(googleCloudStorageMock, Mockito.times(1))
      .open(StorageResourceId.fromStringPath("gs://bucket/path"), readOptions);
}

@Test
public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
Expand Down Expand Up @@ -1630,7 +1656,8 @@ public static GcsUtilMock createMock(PipelineOptions options) {
: null,
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null));
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
}

private GcsUtilMock(
Expand All @@ -1641,7 +1668,8 @@ private GcsUtilMock(
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions) {
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
super(
storageClient,
httpRequestInitializer,
Expand All @@ -1650,7 +1678,8 @@ private GcsUtilMock(
credentials,
uploadBufferSizeBytes,
rewriteDataOpBatchLimit,
gcsCountersOptions);
gcsCountersOptions,
gcsReadOptions);
}

@Override
Expand Down

0 comments on commit 63d89cd

Please sign in to comment.