Skip to content

Commit

Permalink
Merge pull request #28374: Add mechanism to override bigtable client …
Browse files Browse the repository at this point in the history
…for testing
  • Loading branch information
kennknowles authored Sep 15, 2023
2 parents a0f14a8 + 13b12da commit 23b583b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
Expand All @@ -49,6 +50,9 @@ public abstract class BigtableConfig implements Serializable {
/** Returns the app profile being read from. */
public abstract @Nullable ValueProvider<String> getAppProfileId();

/** Returns the Bigtable client override. */
public abstract @Nullable BigtableClientOverride getBigtableClientOverride();

/**
* Returns the Google Cloud Bigtable instance being written to, and other parameters.
*
Expand Down Expand Up @@ -113,6 +117,8 @@ abstract Builder setBigtableOptionsConfigurator(

abstract Builder setChannelCount(int count);

abstract Builder setBigtableClientOverride(BigtableClientOverride clientOverride);

abstract BigtableConfig build();
}

Expand Down Expand Up @@ -156,6 +162,12 @@ public BigtableConfig withEmulator(String emulatorHost) {
return toBuilder().setEmulatorHost(emulatorHost).build();
}

@VisibleForTesting
BigtableConfig withBigtableClientOverride(BigtableClientOverride clientOverride) {
checkArgument(clientOverride != null, "clientOverride can not be null");
return toBuilder().setBigtableClientOverride(clientOverride).build();
}

void validate() {
checkArgument(
(getProjectId() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
Expand Down Expand Up @@ -1988,6 +1989,24 @@ public ReadChangeStream withMetadataTableAppProfileId(String appProfileId) {
.build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that overrides the config of data and/or
* admin client for streaming changes and for managing the metadata. For testing purposes only.
* Not intended for use.
*
* <p>Does not modify this object.
*/
@VisibleForTesting
ReadChangeStream withBigtableClientOverride(BigtableClientOverride clientOverride) {
BigtableConfig config = getBigtableConfig();
BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
return toBuilder()
.setBigtableConfig(config.withBigtableClientOverride(clientOverride))
.setMetadataTableBigtableConfig(
metadataTableConfig.withBigtableClientOverride(clientOverride))
.build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that, if set to true, will create or update
* metadata table before launching pipeline. Otherwise, it is expected that a metadata table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
.setMaxAttempts(10)
.build());

final BigtableClientOverride clientOverride = bigtableConfig.getBigtableClientOverride();
if (clientOverride != null) {
clientOverride.updateTableAdminClientSettings(tableAdminSettingsBuilder);
clientOverride.updateInstanceAdminClientSettings(instanceAdminSettingsBuilder);
clientOverride.updateDataClientSettings(dataSettingsBuilder);
}

BigtableDataClient dataClient = BigtableDataClient.create(dataSettingsBuilder.build());
BigtableTableAdminClient tableAdminClient =
BigtableTableAdminClient.create(tableAdminSettingsBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;

import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import java.io.IOException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;

/** Override the configuration of Cloud Bigtable data and admin client. */
@VisibleForTesting
public interface BigtableClientOverride {
/**
* Update {@link BigtableInstanceAdminSettings.Builder} with custom configurations.
*
* <p>For example, to update the admin api endpoint.
*
* @param builder builds the instance admin client
* @throws IOException when dependency initialization fails
*/
void updateInstanceAdminClientSettings(BigtableInstanceAdminSettings.Builder builder)
throws IOException;

/**
* Update {@link BigtableTableAdminSettings.Builder} with custom configurations.
*
* <p>For example, to update the admin api endpoint.
*
* @param builder builds the table admin client
* @throws IOException when dependency initialization fails
*/
void updateTableAdminClientSettings(BigtableTableAdminSettings.Builder builder)
throws IOException;

/**
* Update {@link BigtableDataSettings.Builder} with custom configurations.
*
* @param builder builds the data client
* @throws IOException when dependency initialization fails
*/
void updateDataClientSettings(BigtableDataSettings.Builder builder) throws IOException;
}

0 comments on commit 23b583b

Please sign in to comment.