From 103253348ec3f1e193bb124e52dffcc603c929d9 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Wed, 13 Sep 2023 15:34:59 +0000 Subject: [PATCH] Add file loads streaming integration tests (#28312) * file loads streaming integration tests * fix dynamic destinations copy jobs * disable for runnerV2 until pane index is fixed --- .../google-cloud-dataflow-java/build.gradle | 3 + .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 11 +- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 4 + .../io/gcp/bigquery/BigQueryUtilsTest.java | 27 +- .../io/gcp/bigquery/FileLoadsStreamingIT.java | 497 ++++++++++++++++++ 5 files changed, 532 insertions(+), 10 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index f6e2b9b147c5..2acc30455e22 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) { exclude '**/FhirIOLROIT.class' exclude '**/FhirIOSearchIT.class' exclude '**/FhirIOPatientEverythingIT.class' + // failing due to pane index not incrementing after Reshuffle: + // https://github.com/apache/beam/issues/28219 + exclude '**/FileLoadsStreamingIT.class' maxParallelForks 4 classpath = configurations.googleCloudPlatformIntegrationTest diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index f5f193aecb74..32ee29738bf8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -399,10 +398,12 @@ private WriteResult expandTriggered(PCollection> inpu "Window Into Global Windows", Window.>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) - .apply("Add Void Key", WithKeys.of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) - .apply("GroupByKey", GroupByKey.create()) - .apply("Extract Values", Values.create()) + // We use this and the following GBK to aggregate by final destination. + // This way, each destination has its own pane sequence + .apply("AddDestinationKeys", WithKeys.of(result -> result.getKey())) + .setCoder(KvCoder.of(destinationCoder, tempTables.getCoder())) + .apply("GroupTempTablesByFinalDestination", GroupByKey.create()) + .apply("ExtractTempTables", Values.create()) .apply( ParDo.of( new UpdateSchemaDestination( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 0063952d8b13..00ee815c3c93 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -689,6 +689,10 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso } } + if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == TypeName.BYTES) { + return jsonBQValue; + } + if (jsonBQValue instanceof List) { if (fieldType.getCollectionElementType() == null) { throw new IllegalArgumentException( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 9bff77a16588..f4074cc1a556 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -87,6 +87,7 @@ public class BigQueryUtilsTest { .addNullableField("time0s_0ns", Schema.FieldType.logicalType(SqlTypes.TIME)) .addNullableField("valid", Schema.FieldType.BOOLEAN) .addNullableField("binary", Schema.FieldType.BYTES) + .addNullableField("raw_bytes", Schema.FieldType.BYTES) .addNullableField("numeric", Schema.FieldType.DECIMAL) .addNullableField("boolean", Schema.FieldType.BOOLEAN) .addNullableField("long", Schema.FieldType.INT64) @@ -188,6 +189,9 @@ public class BigQueryUtilsTest { private static final TableFieldSchema BINARY = new TableFieldSchema().setName("binary").setType(StandardSQLTypeName.BYTES.toString()); + private static final TableFieldSchema RAW_BYTES = + new TableFieldSchema().setName("raw_bytes").setType(StandardSQLTypeName.BYTES.toString()); + private static final TableFieldSchema NUMERIC = new TableFieldSchema().setName("numeric").setType(StandardSQLTypeName.NUMERIC.toString()); @@ -246,6 +250,7 @@ public class BigQueryUtilsTest { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -276,6 +281,7 @@ public class BigQueryUtilsTest { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -316,6 +322,7 @@ public class BigQueryUtilsTest { LocalTime.parse("12:34"), false, Base64.getDecoder().decode("ABCD1234"), + Base64.getDecoder().decode("ABCD1234"), new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP), true, 123L, @@ -346,6 +353,7 @@ public class BigQueryUtilsTest { .set("time0s_0ns", "12:34:00") .set("valid", "false") .set("binary", "ABCD1234") + .set("raw_bytes", Base64.getDecoder().decode("ABCD1234")) .set("numeric", "123.456") .set("boolean", true) .set("long", 123L) @@ -355,7 +363,7 @@ public class BigQueryUtilsTest { Row.withSchema(FLAT_TYPE) .addValues( null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null) + null, null, null, null, null, null, null, null, null) .build(); private static final TableRow BQ_NULL_FLAT_ROW = @@ -378,6 +386,7 @@ public class BigQueryUtilsTest { .set("time0s_0ns", null) .set("valid", null) .set("binary", null) + .set("raw_bytes", null) .set("numeric", null) .set("boolean", null) .set("long", null) @@ -457,6 +466,7 @@ public class BigQueryUtilsTest { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -512,6 +522,7 @@ public void testToTableSchema_flat() { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -562,6 +573,7 @@ public void testToTableSchema_row() { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -598,6 +610,7 @@ public void testToTableSchema_array_row() { TIME_0S_0NS, VALID, BINARY, + RAW_BYTES, NUMERIC, BOOLEAN, LONG, @@ -620,7 +633,7 @@ public void testToTableSchema_map() { public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876")); @@ -635,6 +648,7 @@ public void testToTableRow_flat() { assertThat(row, hasEntry("name", "test")); assertThat(row, hasEntry("valid", "false")); assertThat(row, hasEntry("binary", "ABCD1234")); + assertThat(row, hasEntry("raw_bytes", "ABCD1234")); assertThat(row, hasEntry("numeric", "123.456")); assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); @@ -674,7 +688,7 @@ public void testToTableRow_row() { assertThat(row.size(), equalTo(1)); row = (TableRow) row.get("row"); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876")); @@ -689,6 +703,7 @@ public void testToTableRow_row() { assertThat(row, hasEntry("name", "test")); assertThat(row, hasEntry("valid", "false")); assertThat(row, hasEntry("binary", "ABCD1234")); + assertThat(row, hasEntry("raw_bytes", "ABCD1234")); assertThat(row, hasEntry("numeric", "123.456")); assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); @@ -701,7 +716,7 @@ public void testToTableRow_array_row() { assertThat(row.size(), equalTo(1)); row = ((List) row.get("rows")).get(0); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876")); @@ -716,6 +731,7 @@ public void testToTableRow_array_row() { assertThat(row, hasEntry("name", "test")); assertThat(row, hasEntry("valid", "false")); assertThat(row, hasEntry("binary", "ABCD1234")); + assertThat(row, hasEntry("raw_bytes", "ABCD1234")); assertThat(row, hasEntry("numeric", "123.456")); assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); @@ -726,7 +742,7 @@ public void testToTableRow_array_row() { public void testToTableRow_null_row() { TableRow row = toTableRow().apply(NULL_FLAT_ROW); - assertThat(row.size(), equalTo(22)); + assertThat(row.size(), equalTo(23)); assertThat(row, hasEntry("id", null)); assertThat(row, hasEntry("value", null)); assertThat(row, hasEntry("name", null)); @@ -745,6 +761,7 @@ public void testToTableRow_null_row() { assertThat(row, hasEntry("time0s_0ns", null)); assertThat(row, hasEntry("valid", null)); assertThat(row, hasEntry("binary", null)); + assertThat(row, hasEntry("raw_bytes", null)); assertThat(row, hasEntry("numeric", null)); assertThat(row, hasEntry("boolean", null)); assertThat(row, hasEntry("long", null)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java new file mode 100644 index 000000000000..012afed6fb43 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; + +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class FileLoadsStreamingIT { + private static final Logger LOG = LoggerFactory.getLogger(FileLoadsStreamingIT.class); + + @Parameterized.Parameters + public static Iterable data() { + return ImmutableList.of(new Object[] {false}, new Object[] {true}); + } + + @Parameterized.Parameter(0) + public boolean useInputSchema; + + @Rule public TestName testName = new TestName(); + + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("FileLoadsStreamingIT"); + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = "file_loads_streaming_it_" + System.nanoTime(); + + private static final String[] FIELDS = { + "BOOL", + "BOOLEAN", + "BYTES", + "INT64", + "INTEGER", + "FLOAT", + "FLOAT64", + "NUMERIC", + "STRING", + "DATE", + "TIMESTAMP" + }; + + private static final int TOTAL_N = 50; + + private final Random randomGenerator = new Random(); + + @BeforeClass + public static void setUpTestEnvironment() throws IOException, InterruptedException { + // Create one BQ dataset for all test cases. + cleanUp(); + BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @AfterClass + public static void cleanUp() { + BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + static class GenerateRowFunc implements SerializableFunction { + private final List fieldNames; + + public GenerateRowFunc(List fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public TableRow apply(Long rowId) { + TableRow row = new TableRow(); + row.set("id", rowId); + + for (String name : fieldNames) { + String type = Iterables.get(Splitter.on('_').split(name), 0); + switch (type) { + case "BOOL": + case "BOOLEAN": + if (rowId % 2 == 0) { + row.set(name, false); + } else { + row.set(name, true); + } + break; + case "BYTES": + row.set(name, String.format("test_blob_%s", rowId).getBytes(StandardCharsets.UTF_8)); + break; + case "INT64": + case "INTEGER": + row.set(name, String.valueOf(rowId + 10)); + break; + case "FLOAT": + case "FLOAT64": + row.set(name, String.valueOf(0.5 + rowId)); + break; + case "NUMERIC": + row.set(name, String.valueOf(rowId + 0.12345)); + break; + case "DATE": + row.set(name, "2022-01-01"); + break; + case "TIMESTAMP": + row.set(name, "2022-01-01 10:10:10.012 UTC"); + break; + case "STRING": + row.set(name, "test_string" + rowId); + break; + default: + row.set(name, "unknown" + rowId); + break; + } + } + return row; + } + } + + private static TableSchema makeTableSchemaFromTypes(List fieldNames) { + ImmutableList.Builder builder = ImmutableList.builder(); + + // Add an id field for verification of correctness + builder.add(new TableFieldSchema().setType("INTEGER").setName("id").setMode("REQUIRED")); + + // the name is prefix with type_. + for (String name : fieldNames) { + String mode = "REQUIRED"; + builder.add(new TableFieldSchema().setType(name).setName(name).setMode(mode)); + } + + return new TableSchema().setFields(builder.build()); + } + + private String maybeCreateTable(TableSchema tableSchema, String suffix) + throws IOException, InterruptedException { + String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0); + + BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId + suffix); + if (!useInputSchema) { + BQ_CLIENT.createNewTable( + PROJECT, + BIG_QUERY_DATASET_ID, + new Table() + .setSchema(tableSchema) + .setTableReference( + new TableReference() + .setTableId(tableId + suffix) + .setDatasetId(BIG_QUERY_DATASET_ID) + .setProjectId(PROJECT))); + } else { + tableId += "WithInputSchema"; + } + return String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, tableId + suffix); + } + + private void runStreaming(int numFileShards, boolean useCopyJobs) + throws IOException, InterruptedException { + TestPipelineOptions opts = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + opts.setTempLocation(opts.getTempRoot()); + Pipeline p = Pipeline.create(opts); + + // Only run the most relevant test case on Dataflow. + // Testing this dimension on DirectRunner is sufficient + if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { + assumeTrue("Skipping in favor of more relevant test case", useInputSchema); + // Need to manually enable streaming engine for legacy dataflow runner + ExperimentalOptions.addExperiment( + p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); + } + + List fieldNamesOrigin = Arrays.asList(FIELDS); + // Shuffle the fields in the write schema to do fuzz testing on field order + List fieldNamesShuffled = new ArrayList(fieldNamesOrigin); + Collections.shuffle(fieldNamesShuffled, randomGenerator); + + TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin); + TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled); + String tableSpec = maybeCreateTable(bqTableSchema, ""); + + // set up and build pipeline + Instant start = new Instant(0); + GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesShuffled); + PCollection instants = + p.apply( + "Generate Instants", + PeriodicImpulse.create() + .startAt(start) + .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1))) + .withInterval(Duration.standardSeconds(1)) + .catchUpToNow(false)); + PCollection rows = + instants.apply( + "Create TableRows", + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via(instant -> generateRowFunc.apply(instant.getMillis() / 1000))); + // build write transform + Write write = + BigQueryIO.writeTableRows() + .to(tableSpec) + .withMethod(Write.Method.FILE_LOADS) + .withTriggeringFrequency(Duration.standardSeconds(10)); + if (useCopyJobs) { + write = write.withMaxBytesPerPartition(250); + } + if (useInputSchema) { + // we're creating the table with the input schema + write = + write + .withSchema(inputSchema) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); + } else { + // table already exists with a schema, no need to create it + write = + write + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + } + write = numFileShards == 0 ? write.withAutoSharding() : write.withNumFileShards(numFileShards); + + rows.apply("Stream loads to BigQuery", write); + p.run().waitUntilFinish(); + + List expectedRows = new ArrayList<>(); + for (long i = 0; i < TOTAL_N; i++) { + expectedRows.add(generateRowFunc.apply(i)); + } + + // Perform checks + checkRowCompleteness(tableSpec, inputSchema, expectedRows); + } + + // Check that the expected rows reached the table. + private static void checkRowCompleteness( + String tableSpec, TableSchema schema, List expectedRows) + throws IOException, InterruptedException { + List actualTableRows = + BQ_CLIENT.queryUnflattened( + String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, false); + + Schema rowSchema = BigQueryUtils.fromTableSchema(schema); + List actualBeamRows = + actualTableRows.stream() + .map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow)) + .collect(Collectors.toList()); + List expectedBeamRows = + expectedRows.stream() + .map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow)) + .collect(Collectors.toList()); + LOG.info( + "Actual rows number: {}, expected: {}", actualBeamRows.size(), expectedBeamRows.size()); + + assertThat( + "Comparing expected rows with actual rows", + actualBeamRows, + containsInAnyOrder(expectedBeamRows.toArray())); + assertEquals( + "Checking there is no duplication", expectedBeamRows.size(), actualBeamRows.size()); + } + + @Test + public void testLoadWithFixedShards() throws IOException, InterruptedException { + runStreaming(5, false); + } + + @Test + public void testLoadWithAutoShardingAndCopyJobs() throws IOException, InterruptedException { + runStreaming(0, true); + } + + @Test + public void testDynamicDestinationsWithFixedShards() throws IOException, InterruptedException { + runStreamingToDynamicDestinations(6, false); + } + + @Test + public void testDynamicDestinationsWithAutoShardingAndCopyJobs() + throws IOException, InterruptedException { + runStreamingToDynamicDestinations(0, true); + } + + private void runStreamingToDynamicDestinations(int numFileShards, boolean useCopyJobs) + throws IOException, InterruptedException { + TestPipelineOptions opts = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + opts.setTempLocation(opts.getTempRoot()); + Pipeline p = Pipeline.create(opts); + // Only run the most relevant test cases on Dataflow. Testing this dimension on DirectRunner is + // sufficient + if (p.getOptions().getRunner().getName().contains("DataflowRunner")) { + assumeTrue("Skipping in favor of more relevant test case", useInputSchema); + // Need to manually enable streaming engine for legacy dataflow runner + ExperimentalOptions.addExperiment( + p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT); + } + + List allFields = Arrays.asList(FIELDS); + List subFields0 = new ArrayList<>(allFields.subList(0, 4)); + List subFields1 = new ArrayList<>(allFields.subList(4, 8)); + List subFields2 = new ArrayList<>(allFields.subList(8, 11)); + TableSchema table0Schema = makeTableSchemaFromTypes(subFields0); + TableSchema table1Schema = makeTableSchemaFromTypes(subFields1); + TableSchema table2Schema = makeTableSchemaFromTypes(subFields2); + String table0Id = maybeCreateTable(table0Schema, "-0"); + String table1Id = maybeCreateTable(table1Schema, "-1"); + String table2Id = maybeCreateTable(table2Schema, "-2"); + GenerateRowFunc generateRowFunc0 = new GenerateRowFunc(subFields0); + GenerateRowFunc generateRowFunc1 = new GenerateRowFunc(subFields1); + GenerateRowFunc generateRowFunc2 = new GenerateRowFunc(subFields2); + + String tablePrefix = table0Id.substring(0, table0Id.length() - 2); + + // set up and build pipeline + Instant start = new Instant(0); + PCollection instants = + p.apply( + "Generate Instants", + PeriodicImpulse.create() + .startAt(start) + .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1))) + .withInterval(Duration.standardSeconds(1)) + .catchUpToNow(false)); + PCollection longs = + instants.apply( + "Create TableRows", + MapElements.into(TypeDescriptors.longs()).via(instant -> instant.getMillis() / 1000)); + // build write transform + Write write = + BigQueryIO.write() + .to( + new TestDynamicDest( + tablePrefix, subFields0, subFields1, subFields2, useInputSchema)) + .withFormatFunction( + id -> { + long dest = id % 3; + TableRow row; + if (dest == 0) { + row = generateRowFunc0.apply(id); + } else if (dest == 1) { + row = generateRowFunc1.apply(id); + } else { + row = generateRowFunc2.apply(id); + } + return row; + }) + .withMethod(Write.Method.FILE_LOADS) + .withTriggeringFrequency(Duration.standardSeconds(10)); + if (useCopyJobs) { + write = write.withMaxBytesPerPartition(150); + } + if (useInputSchema) { + // we're creating the table with the input schema + write = + write + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); + } else { + // table already exists with a schema, no need to create it + write = + write + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + } + write = numFileShards == 0 ? write.withAutoSharding() : write.withNumFileShards(numFileShards); + + longs.apply("Stream loads to dynamic destinations", write); + p.run().waitUntilFinish(); + + List expectedRows0 = new ArrayList<>(); + List expectedRows1 = new ArrayList<>(); + List expectedRows2 = new ArrayList<>(); + for (long i = 0; i < TOTAL_N; i++) { + long dest = i % 3; + if (dest == 0) { + expectedRows0.add(generateRowFunc0.apply(i)); + } else if (dest == 1) { + expectedRows1.add(generateRowFunc1.apply(i)); + } else { + expectedRows2.add(generateRowFunc2.apply(i)); + } + } + // Perform checks + checkRowCompleteness(table0Id, makeTableSchemaFromTypes(subFields0), expectedRows0); + checkRowCompleteness(table1Id, makeTableSchemaFromTypes(subFields1), expectedRows1); + checkRowCompleteness(table2Id, makeTableSchemaFromTypes(subFields2), expectedRows2); + } + + static class TestDynamicDest extends DynamicDestinations { + String tablePrefix; + List table0Fields; + List table1Fields; + List table2Fields; + boolean useInputSchema; + + public TestDynamicDest( + String tablePrefix, + List table0Fields, + List table1Fields, + List table2Fields, + boolean useInputSchema) { + this.tablePrefix = tablePrefix; + this.table0Fields = table0Fields; + this.table1Fields = table1Fields; + this.table2Fields = table2Fields; + this.useInputSchema = useInputSchema; + } + + @Override + public Long getDestination(@Nullable ValueInSingleWindow element) { + return element.getValue() % 3; + } + + @Override + public TableDestination getTable(Long destination) { + return new TableDestination(tablePrefix + "-" + destination, null); + } + + @Override + public @Nullable TableSchema getSchema(Long destination) { + if (!useInputSchema) { + return null; + } + List fields; + if (destination == 0) { + fields = table0Fields; + } else if (destination == 1) { + fields = table1Fields; + } else { + fields = table2Fields; + } + List tableFields = + fields.stream() + .map(name -> new TableFieldSchema().setName(name).setType(name).setMode("REQUIRED")) + .collect(Collectors.toList()); + // we attach an ID to each row in addition to the existing schema fields + tableFields.add( + 0, new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED")); + return new TableSchema().setFields(tableFields); + } + } +}