From 9fcd0c9184479e5d3211d82747fd4f56e708cbe5 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Tue, 31 Oct 2023 10:45:30 -0400 Subject: [PATCH] Add Cloud Bigtable Change Stream integration tests (#29127) * Add Cloud Bigtable Change Stream integration tests Change-Id: I68a877d5686f1898686b18491c6b4aff5e699862 * Add default value to bigtable environment endpoint Change-Id: I490fca7ba2f24b15faa288d6f2c3f209db59f948 * Move Bigtable options to main and register it automatically so we can set instanceId Change-Id: I4d5e5347dbba09a0e1ee170b8aa911d2f0a772ef * Remove bigtableProject from BigtableTestOptions. There was no way to set it explicitly prior to this Change-Id: I5ee3c663d3120ee85970ae5f24a962b9535323b3 * Add comment explaining why we build the test pipeline in a different package Change-Id: I228dc61ca0b27131cd38a3ab24f136d0f924d9f7 * Change instanceID to bigtableInstanceId to clarify the value we're specifying Change-Id: Ic66c4c061ed2f5979f6a530905e3cbbddd238f18 * Change BigtableChangeStreamTestOptions to use more specific field names to avoid conflicts Change-Id: I489850e07812058e8c7ebb3c9878eae9d4bc9f06 --- .../io/google-cloud-platform/build.gradle | 2 + .../BigtableChangeStreamTestOptions.java | 30 ++ .../common/GcpIoPipelineOptionsRegistrar.java | 2 + .../io/gcp/bigtable/BigtableTestUtils.java | 27 +- .../it/BigtableChangeStreamIT.java | 361 ++++++++++++++++++ ...BigtableClientIntegrationTestOverride.java | 83 ++++ 6 files changed, 504 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java diff --git 
a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index d66122e4d107..b0122035a015 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -187,12 +187,14 @@ task integrationTest(type: Test, dependsOn: processTestResources) { def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests' def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb' def firestoreHost = project.findProperty('firestoreHost') ?: 'batch-firestore.googleapis.com:443' + def bigtableChangeStreamInstanceId = project.findProperty('bigtableChangeStreamInstanceId') ?: 'beam-test' systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--runner=DirectRunner", "--project=${gcpProject}", "--tempRoot=${gcpTempRoot}", "--firestoreDb=${firestoreDb}", "--firestoreHost=${firestoreHost}", + "--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}", ]) // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date" diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java new file mode 100644 index 000000000000..71303a0e84ac --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.testing.TestPipelineOptions; + +public interface BigtableChangeStreamTestOptions extends TestPipelineOptions { + @Description("Instance ID for Bigtable Change Stream") + @Default.String("beam-test") + String getBigtableChangeStreamInstanceId(); + + void setBigtableChangeStreamInstanceId(String value); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java index f1ff827fc633..6cfc03c9eaa7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.BigtableChangeStreamTestOptions; import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions; import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -38,6 +39,7 @@ public Iterable> getPipelineOptions() { .add(PubsubOptions.class) .add(FirestoreOptions.class) .add(TestBigQueryOptions.class) + .add(BigtableChangeStreamTestOptions.class) .build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java index c35b7c54c4d9..6bd2f3b25b3c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java @@ -31,12 +31,14 @@ import com.google.bigtable.v2.Mutation; import com.google.protobuf.ByteString; import java.util.List; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; +import org.joda.time.Instant; -class BigtableTestUtils { +public class BigtableTestUtils { static final String BOOL_COLUMN = "boolColumn"; static final String LONG_COLUMN = "longColumn"; @@ -144,4 +146,27 @@ private static Cell createCell(ByteString value, long timestamp, String... label } return builder.build(); } + + // We have to build the pipeline at this package level and not changestreams package because + // endTime is package private and we can only create a pipeline with endTime here. Setting endTime + // allows the tests to predictably terminate. 
+ public static BigtableIO.ReadChangeStream buildTestPipelineInput( + String projectId, + String instanceId, + String tableId, + String appProfileId, + String metadataTableName, + Instant startTime, + Instant endTime, + BigtableClientOverride clientOverride) { + return BigtableIO.readChangeStream() + .withProjectId(projectId) + .withInstanceId(instanceId) + .withTableId(tableId) + .withAppProfileId(appProfileId) + .withMetadataTableTableId(metadataTableName) + .withStartTime(startTime) + .withEndTime(endTime) + .withBigtableClientOverride(clientOverride); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java new file mode 100644 index 000000000000..e6455cbfd581 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.it; + +import com.google.api.gax.batching.Batcher; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; +import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.Range; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.common.IOITHelper; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.BigtableChangeStreamTestOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** End-to-end tests of Bigtable Change Stream. 
*/ +@SuppressWarnings("FutureReturnValueIgnored") +@RunWith(JUnit4.class) +public class BigtableChangeStreamIT { + private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamIT.class); + private static final String COLUMN_FAMILY1 = "CF"; + private static final String COLUMN_FAMILY2 = "CF2"; + private static final String COLUMN_QUALIFIER = "CQ"; + private static String projectId; + private static String instanceId; + private static String tableId; + private static String appProfileId; + private static String metadataTableId; + private static BigtableTableAdminClient adminClient; + private static BigtableDataClient dataClient; + private static BigtableClientIntegrationTestOverride bigtableClientOverride; + private static Batcher mutationBatcher; + private static BigtableChangeStreamTestOptions options; + private transient TestPipeline pipeline; + + @BeforeClass + public static void beforeClass() throws IOException { + options = IOITHelper.readIOTestPipelineOptions(BigtableChangeStreamTestOptions.class); + LOG.info("Pipeline options: {}", options); + projectId = options.as(GcpOptions.class).getProject(); + instanceId = options.getBigtableChangeStreamInstanceId(); + + long randomId = Instant.now().getMillis(); + tableId = "beam-change-stream-test-" + randomId; + metadataTableId = "beam-change-stream-test-md-" + randomId; + appProfileId = "default"; + + bigtableClientOverride = new BigtableClientIntegrationTestOverride(); + LOG.info(bigtableClientOverride.toString()); + + BigtableDataSettings.Builder dataSettingsBuilder = BigtableDataSettings.newBuilder(); + BigtableTableAdminSettings.Builder tableAdminSettingsBuilder = + BigtableTableAdminSettings.newBuilder(); + dataSettingsBuilder.setProjectId(projectId); + tableAdminSettingsBuilder.setProjectId(projectId); + dataSettingsBuilder.setInstanceId(instanceId); + tableAdminSettingsBuilder.setInstanceId(instanceId); + dataSettingsBuilder.setAppProfileId(appProfileId); + // TODO: Remove this later. 
But for now, disable direct path. + dataSettingsBuilder + .stubSettings() + .setTransportChannelProvider( + EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder() + .setAttemptDirectPath(false) + .build()); + + bigtableClientOverride.updateDataClientSettings(dataSettingsBuilder); + bigtableClientOverride.updateTableAdminClientSettings(tableAdminSettingsBuilder); + + // These clients are used to modify the table and write to it + dataClient = BigtableDataClient.create(dataSettingsBuilder.build()); + adminClient = BigtableTableAdminClient.create(tableAdminSettingsBuilder.build()); + + // Create change stream enabled table + adminClient.createTable( + CreateTableRequest.of(tableId) + .addChangeStreamRetention(org.threeten.bp.Duration.ofDays(1)) + .addFamily(COLUMN_FAMILY1) + .addFamily(COLUMN_FAMILY2)); + + mutationBatcher = dataClient.newBulkMutationBatcher(tableId); + } + + @Before + public void before() { + pipeline = TestPipeline.fromOptions(options).enableAbandonedNodeEnforcement(false); + } + + @AfterClass + public static void afterClass() { + if (adminClient != null) { + if (adminClient.exists(tableId)) { + adminClient.updateTable(UpdateTableRequest.of(tableId).disableChangeStreamRetention()); + adminClient.deleteTable(tableId); + adminClient.deleteTable(metadataTableId); + } + adminClient.close(); + } + if (dataClient != null) { + dataClient.close(); + } + } + + @Test + public void testReadBigtableChangeStream() throws InterruptedException { + Instant startTime = Instant.now(); + String rowKey = "rowKeySetCell"; + RowMutationEntry setCellEntry = + RowMutationEntry.create(rowKey).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1"); + mutationBatcher.add(setCellEntry); + mutationBatcher.flush(); + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + PAssert.that(changeStream).containsInAnyOrder(setCellEntry.toProto()); + pipeline.run().waitUntilFinish(); + } + + 
@Test + public void testDeleteRow() throws InterruptedException { + Instant startTime = Instant.now(); + String rowKeyToDelete = "rowKeyToDelete"; + RowMutationEntry setCellMutationToDelete = + RowMutationEntry.create(rowKeyToDelete) + .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1"); + RowMutationEntry deleteRowMutation = RowMutationEntry.create(rowKeyToDelete).deleteRow(); + mutationBatcher.add(setCellMutationToDelete); + mutationBatcher.flush(); + mutationBatcher.add(deleteRowMutation); + mutationBatcher.flush(); + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + PAssert.that(changeStream) + .containsInAnyOrder( + setCellMutationToDelete.toProto(), + // Delete row becomes one deleteFamily per family + RowMutationEntry.create(rowKeyToDelete) + .deleteFamily(COLUMN_FAMILY1) + .deleteFamily(COLUMN_FAMILY2) + .toProto()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testDeleteColumnFamily() throws InterruptedException { + Instant startTime = Instant.now(); + String cellValue = "cell value 1"; + String rowKeyMultiFamily = "rowKeyMultiFamily"; + RowMutationEntry setCells = + RowMutationEntry.create(rowKeyMultiFamily) + .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, cellValue) + .setCell(COLUMN_FAMILY2, COLUMN_QUALIFIER, cellValue); + mutationBatcher.add(setCells); + mutationBatcher.flush(); + RowMutationEntry deleteCF2 = + RowMutationEntry.create(rowKeyMultiFamily).deleteFamily(COLUMN_FAMILY2); + mutationBatcher.add(deleteCF2); + mutationBatcher.flush(); + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + PAssert.that(changeStream).containsInAnyOrder(setCells.toProto(), deleteCF2.toProto()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testDeleteCell() throws InterruptedException { + Instant startTime = Instant.now(); + String cellValue = "cell value 1"; + 
String rowKeyMultiCell = "rowKeyMultiCell"; + RowMutationEntry setCells = + RowMutationEntry.create(rowKeyMultiCell) + .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, cellValue) + .setCell(COLUMN_FAMILY1, "CQ2", cellValue); + mutationBatcher.add(setCells); + mutationBatcher.flush(); + RowMutationEntry deleteCQ2 = + RowMutationEntry.create(rowKeyMultiCell) + // need to set timestamp range to make change stream output match + .deleteCells( + COLUMN_FAMILY1, + ByteString.copyFromUtf8("CQ2"), + Range.TimestampRange.create( + startTime.getMillis() * 1000, + startTime.plus(Duration.standardMinutes(2)).getMillis() * 1000)); + mutationBatcher.add(deleteCQ2); + mutationBatcher.flush(); + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + PAssert.that(changeStream).containsInAnyOrder(setCells.toProto(), deleteCQ2.toProto()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testComplexMutation() throws InterruptedException { + Instant startTime = Instant.now(); + String rowKey = "rowKeyComplex"; + // We'll delete this in the next mutation + RowMutationEntry setCell = + RowMutationEntry.create(rowKey).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1"); + mutationBatcher.add(setCell); + mutationBatcher.flush(); + RowMutationEntry complexMutation = + RowMutationEntry.create(rowKey) + .setCell(COLUMN_FAMILY1, "CQ2", "cell value 2") + .setCell(COLUMN_FAMILY1, "CQ3", "cell value 3") + // need to set timestamp range to make change stream output match + .deleteCells( + COLUMN_FAMILY1, + ByteString.copyFromUtf8(COLUMN_QUALIFIER), + Range.TimestampRange.create( + startTime.getMillis() * 1000, + startTime.plus(Duration.standardMinutes(2)).getMillis() * 1000)); + mutationBatcher.add(complexMutation); + mutationBatcher.flush(); + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + 
PAssert.that(changeStream).containsInAnyOrder(setCell.toProto(), complexMutation.toProto()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testLargeMutation() throws InterruptedException { + Instant startTime = Instant.now(); + // test set cell w size > 1MB so it triggers chunking + char[] chars = new char[1024 * 1500]; + Arrays.fill(chars, '\u200B'); // zero-width space + String largeString = String.valueOf(chars); + String rowKeyLargeCell = "rowKeyLargeCell"; + RowMutationEntry setLargeCell = + RowMutationEntry.create(rowKeyLargeCell) + .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, largeString); + mutationBatcher.add(setLargeCell); + mutationBatcher.flush(); + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + PAssert.that(changeStream).containsInAnyOrder(setLargeCell.toProto()); + pipeline.run().waitUntilFinish(); + } + + @Test + public void testManyMutations() throws InterruptedException { + Instant startTime = Instant.now(); + // small cell value (3K chars, far below 1MB): this test covers many mutations, not chunking + char[] chars = new char[1024 * 3]; + Arrays.fill(chars, '\u200B'); // zero-width space + String largeString = String.valueOf(chars); + + ImmutableList.Builder originalWrites = ImmutableList.builder(); + for (int i = 0; i < 100; ++i) { + String rowKey = "rowKey" + i; + // SetCell. + RowMutationEntry setLargeCell = + RowMutationEntry.create(rowKey).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, largeString); + // DeleteFamily. + RowMutationEntry deleteFamily = RowMutationEntry.create(rowKey).deleteFamily(COLUMN_FAMILY1); + // DeleteCells.
+ RowMutationEntry deleteCells = + RowMutationEntry.create(rowKey) + // need to set timestamp range to make change stream output match + .deleteCells( + COLUMN_FAMILY1, + ByteString.copyFromUtf8(COLUMN_QUALIFIER), + Range.TimestampRange.create( + startTime.getMillis() * 1000, + startTime.plus(Duration.standardMinutes(2)).getMillis() * 1000)); + // Apply the mutations. + originalWrites.add(setLargeCell); + mutationBatcher.add(setLargeCell); + mutationBatcher.flush(); + + originalWrites.add(deleteFamily); + mutationBatcher.add(deleteFamily); + mutationBatcher.flush(); + + originalWrites.add(deleteCells); + mutationBatcher.add(deleteCells); + mutationBatcher.flush(); + } + Instant endTime = Instant.now().plus(Duration.standardSeconds(1)); + + PCollection changeStream = buildPipeline(startTime, endTime); + PAssert.that(changeStream) + .containsInAnyOrder( + originalWrites.build().stream() + .map(RowMutationEntry::toProto) + .collect(Collectors.toList())); + pipeline.run().waitUntilFinish(); + } + + private PCollection buildPipeline(Instant startTime, Instant endTime) { + return pipeline + .apply( + BigtableTestUtils.buildTestPipelineInput( + projectId, + instanceId, + tableId, + appProfileId, + metadataTableId, + startTime, + endTime, + bigtableClientOverride)) + .apply(ParDo.of(new ConvertToEntry())); + } + + private static class ConvertToEntry + extends DoFn, MutateRowsRequest.Entry> { + @ProcessElement + public void processElement( + @Element KV element, + OutputReceiver out) { + out.output(element.getValue().toRowMutationEntry().toProto()); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java new file mode 100644 index 000000000000..0d6766aa20df --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.it; + +import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings; +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.errorprone.annotations.CheckReturnValue; +import java.io.Serializable; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride; + +/** Implements BigtableClientOverride to override data and admin endpoints. */ +@CheckReturnValue +final class BigtableClientIntegrationTestOverride implements Serializable, BigtableClientOverride { + private static final long serialVersionUID = 4188505491566837311L; + + // The address of the admin API endpoint. + private static final String ADMIN_ENDPOINT_ENV_VAR = + getenv("BIGTABLE_ENV_ADMIN_ENDPOINT", "bigtableadmin.googleapis.com:443"); + // The address of the data API endpoint. 
+ private static final String DATA_ENDPOINT_ENV_VAR = + getenv("BIGTABLE_ENV_DATA_ENDPOINT", "bigtable.googleapis.com:443"); + + private final String adminEndpoint; + private final String dataEndpoint; + + @Override + public String toString() { + return "BigtableClientIntegrationTestOverride{" + + "adminEndpoint=" + + adminEndpoint + + ", dataEndpoint=" + + dataEndpoint + + "}"; + } + + /** Applies the test environment settings to the builder. */ + @Override + public void updateInstanceAdminClientSettings(BigtableInstanceAdminSettings.Builder builder) { + builder.stubSettings().setEndpoint(adminEndpoint); + } + + /** Applies the test environment settings to the builder. */ + @Override + public void updateTableAdminClientSettings(BigtableTableAdminSettings.Builder builder) { + builder.stubSettings().setEndpoint(adminEndpoint); + } + + /** Applies the test environment settings to the builder. */ + @Override + public void updateDataClientSettings(BigtableDataSettings.Builder builder) { + builder.stubSettings().setEndpoint(dataEndpoint); + } + + /** Returns the value of the environment variable, or default string if not found. */ + private static String getenv(String name, String defaultValue) { + final String value = System.getenv(name); + if (value != null) { + return value; + } + return defaultValue; + } + + BigtableClientIntegrationTestOverride() { + adminEndpoint = ADMIN_ENDPOINT_ENV_VAR; + dataEndpoint = DATA_ENDPOINT_ENV_VAR; + } +}