diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java index 018e538f552c..15230c8adef9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -49,6 +50,9 @@ public abstract class BigtableConfig implements Serializable { /** Returns the app profile being read from. */ public abstract @Nullable ValueProvider getAppProfileId(); + /** Returns the Bigtable client override. */ + public abstract @Nullable BigtableClientOverride getBigtableClientOverride(); + /** * Returns the Google Cloud Bigtable instance being written to, and other parameters. * @@ -113,6 +117,8 @@ abstract Builder setBigtableOptionsConfigurator( abstract Builder setChannelCount(int count); + abstract Builder setBigtableClientOverride(BigtableClientOverride clientOverride); + abstract BigtableConfig build(); } @@ -156,6 +162,12 @@ public BigtableConfig withEmulator(String emulatorHost) { return toBuilder().setEmulatorHost(emulatorHost).build(); } + @VisibleForTesting + BigtableConfig withBigtableClientOverride(BigtableClientOverride clientOverride) { + checkArgument(clientOverride != null, "clientOverride can not be null"); + return toBuilder().setBigtableClientOverride(clientOverride).build(); + } + void validate() { checkArgument( (getProjectId() != null diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index a655a29e92b2..9f3c627a89ef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn; @@ -1988,6 +1989,24 @@ public ReadChangeStream withMetadataTableAppProfileId(String appProfileId) { .build(); } + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that overrides the config of data and/or + * admin client for streaming changes and for managing the metadata. For testing purposes only. + * Not intended for use. + * + *

Does not modify this object. + */ + @VisibleForTesting + ReadChangeStream withBigtableClientOverride(BigtableClientOverride clientOverride) { + BigtableConfig config = getBigtableConfig(); + BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig(); + return toBuilder() + .setBigtableConfig(config.withBigtableClientOverride(clientOverride)) + .setMetadataTableBigtableConfig( + metadataTableConfig.withBigtableClientOverride(clientOverride)) + .build(); + } + /** * Returns a new {@link BigtableIO.ReadChangeStream} that, if set to true, will create or update * metadata table before launching pipeline. Otherwise, it is expected that a metadata table diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java index ecf9a7039598..cb296aef6c28 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java @@ -210,6 +210,13 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf .setMaxAttempts(10) .build()); + final BigtableClientOverride clientOverride = bigtableConfig.getBigtableClientOverride(); + if (clientOverride != null) { + clientOverride.updateTableAdminClientSettings(tableAdminSettingsBuilder); + clientOverride.updateInstanceAdminClientSettings(instanceAdminSettingsBuilder); + clientOverride.updateDataClientSettings(dataSettingsBuilder); + } + BigtableDataClient dataClient = BigtableDataClient.create(dataSettingsBuilder.build()); BigtableTableAdminClient tableAdminClient = BigtableTableAdminClient.create(tableAdminSettingsBuilder.build()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableClientOverride.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableClientOverride.java new file mode 100644 index 000000000000..72b3e39871ef --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableClientOverride.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao; + +import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import java.io.IOException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; + +/** Override the configuration of Cloud Bigtable data and admin client. */ +@VisibleForTesting +public interface BigtableClientOverride { + /** + * Update {@link BigtableInstanceAdminSettings.Builder} with custom configurations. + * + *

For example, to update the admin api endpoint. + * + * @param builder builds the instance admin client + * @throws IOException when dependency initialization fails + */ + void updateInstanceAdminClientSettings(BigtableInstanceAdminSettings.Builder builder) + throws IOException; + + /** + * Update {@link BigtableTableAdminSettings.Builder} with custom configurations. + * + *

For example, to update the admin api endpoint. + * + * @param builder builds the table admin client + * @throws IOException when dependency initialization fails + */ + void updateTableAdminClientSettings(BigtableTableAdminSettings.Builder builder) + throws IOException; + + /** + * Update {@link BigtableDataSettings.Builder} with custom configurations. + * + * @param builder builds the data client + * @throws IOException when dependency initialization fails + */ + void updateDataClientSettings(BigtableDataSettings.Builder builder) throws IOException; +}