Skip to content

Commit

Permalink
Add Cloud Bigtable Change Stream integration tests (#29127)
Browse files Browse the repository at this point in the history
* Add Cloud Bigtable Change Stream integration tests

Change-Id: I68a877d5686f1898686b18491c6b4aff5e699862

* Add default value to bigtable environment endpoint

Change-Id: I490fca7ba2f24b15faa288d6f2c3f209db59f948

* Move Bigtable options to main and register it automatically so we can set instanceId

Change-Id: I4d5e5347dbba09a0e1ee170b8aa911d2f0a772ef

* Remove bigtableProject from BigtableTestOptions. There was no way to set it explicitly prior to this

Change-Id: I5ee3c663d3120ee85970ae5f24a962b9535323b3

* Add comment explaining why we build the test pipeline in a different package

Change-Id: I228dc61ca0b27131cd38a3ab24f136d0f924d9f7

* Change instanceID to bigtableInstanceId to clarify the value we're specifying

Change-Id: Ic66c4c061ed2f5979f6a530905e3cbbddd238f18

* Change BigtableChangeStreamTestOptions to use more specific field names to avoid conflicts

Change-Id: I489850e07812058e8c7ebb3c9878eae9d4bc9f06
  • Loading branch information
tonytanger authored Oct 31, 2023
1 parent 9a97ec8 commit 9fcd0c9
Show file tree
Hide file tree
Showing 6 changed files with 504 additions and 1 deletion.
2 changes: 2 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb'
def firestoreHost = project.findProperty('firestoreHost') ?: 'batch-firestore.googleapis.com:443'
def bigtableChangeStreamInstanceId = project.findProperty('bigtableChangeStreamInstanceId') ?: 'beam-test'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--firestoreDb=${firestoreDb}",
"--firestoreHost=${firestoreHost}",
"--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testing.TestPipelineOptions;

public interface BigtableChangeStreamTestOptions extends TestPipelineOptions {
@Description("Instance ID for Bigtable Change Stream")
@Default.String("beam-test")
String getBigtableChangeStreamInstanceId();

void setBigtableChangeStreamInstanceId(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.BigtableChangeStreamTestOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -38,6 +39,7 @@ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
.add(PubsubOptions.class)
.add(FirestoreOptions.class)
.add(TestBigQueryOptions.class)
.add(BigtableChangeStreamTestOptions.class)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.joda.time.Instant;

class BigtableTestUtils {
public class BigtableTestUtils {

static final String BOOL_COLUMN = "boolColumn";
static final String LONG_COLUMN = "longColumn";
Expand Down Expand Up @@ -144,4 +146,27 @@ private static Cell createCell(ByteString value, long timestamp, String... label
}
return builder.build();
}

// We have to build the pipeline at this package level and not changestreams package because
// endTime is package private and we can only create a pipeline with endTime here. Setting endTime
// allows the tests to predictably terminate.
public static BigtableIO.ReadChangeStream buildTestPipelineInput(
String projectId,
String instanceId,
String tableId,
String appProfileId,
String metadataTableName,
Instant startTime,
Instant endTime,
BigtableClientOverride clientOverride) {
return BigtableIO.readChangeStream()
.withProjectId(projectId)
.withInstanceId(instanceId)
.withTableId(tableId)
.withAppProfileId(appProfileId)
.withMetadataTableTableId(metadataTableName)
.withStartTime(startTime)
.withEndTime(endTime)
.withBigtableClientOverride(clientOverride);
}
}
Loading

0 comments on commit 9fcd0c9

Please sign in to comment.