diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 2acce3e94cc2..b8e71e289827 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -198,6 +198,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) { "--runner=DirectRunner", "--project=${gcpProject}", "--tempRoot=${gcpTempRoot}", + "--tempLocation=${gcpTempRoot}", "--firestoreDb=${firestoreDb}", "--firestoreHost=${firestoreHost}", "--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java index 7fdcabf5c695..3ba91b92a904 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java @@ -27,6 +27,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration; +import org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -88,19 +90,20 @@ protected static class BigQueryWriteSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection rowPCollection = input.getSinglePCollection(); - BigQueryIO.Write write = toWrite(); + BigQueryIO.Write write = toWrite(rowPCollection.getSchema()); rowPCollection.apply(write); return PCollectionRowTuple.empty(input.getPipeline()); } - BigQueryIO.Write toWrite() { + BigQueryIO.Write toWrite(Schema schema) { + PortableBigQueryDestinations dynamicDestinations = + new PortableBigQueryDestinations(schema, configuration); BigQueryIO.Write write = BigQueryIO.write() - .to(configuration.getTable()) + .to(dynamicDestinations) .withMethod(BigQueryIO.Write.Method.FILE_LOADS) - .withFormatFunction(BigQueryUtils.toTableRow()) - .useBeamSchema(); + .withFormatFunction(dynamicDestinations.getFilterFormatFunction(false)); if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index eba9dd61d510..59ecaee48bc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -21,13 +21,10 @@ import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import com.google.api.services.bigquery.model.TableConstraints; -import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -36,9 +33,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation; -import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -58,7 +53,6 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.joda.time.Duration; @@ -180,52 +174,6 @@ private static class NoOutputDoFn extends DoFn { public void process(ProcessContext c) {} } - private static class RowDynamicDestinations extends DynamicDestinations { - final Schema schema; - final String fixedDestination; - final List primaryKey; - - RowDynamicDestinations(Schema schema) { - this.schema = schema; - this.fixedDestination = null; - this.primaryKey = null; - } - - public RowDynamicDestinations( - Schema schema, String fixedDestination, List primaryKey) { - this.schema = schema; - this.fixedDestination = fixedDestination; - this.primaryKey = primaryKey; - } - - @Override - public String getDestination(ValueInSingleWindow element) { - return Optional.ofNullable(fixedDestination) - .orElseGet(() -> element.getValue().getString("destination")); - } - - @Override - public TableDestination getTable(String destination) { - return new TableDestination(destination, null); - } - - @Override - public TableSchema getSchema(String destination) { - return BigQueryUtils.toTableSchema(schema); - } - - @Override - public TableConstraints getTableConstraints(String destination) { - return Optional.ofNullable(this.primaryKey) - .filter(pk -> !pk.isEmpty()) - .map( - pk -> - new TableConstraints() - .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk))) - .orElse(null); - } - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // Check that the input exists @@ -327,13 +275,6 @@ public Row getConfigurationRow() { } } - void validateDynamicDestinationsExpectedSchema(Schema schema) { - checkArgument( - schema.getFieldNames().containsAll(Arrays.asList("destination", "record")), - "When writing to dynamic destinations, we expect Row Schema with a " - + "\"destination\" string field and a \"record\" Row field."); - } - BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { Method writeMethod = configuration.getUseAtLeastOnceSemantics() != null @@ -344,21 +285,37 @@ BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { BigQueryIO.Write write = BigQueryIO.write() .withMethod(writeMethod) - .withFormatFunction(BigQueryUtils.toTableRow()) .withWriteDisposition(WriteDisposition.WRITE_APPEND); - // in case CDC writes are configured we validate and include them in the configuration - if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) { - write = validateAndIncludeCDCInformation(write, schema); - } else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { - validateDynamicDestinationsExpectedSchema(schema); + Schema rowSchema = schema; + boolean fetchNestedRecord = false; + if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { + validateDynamicDestinationsSchema(schema); + rowSchema = schema.getField("record").getType().getRowSchema(); + fetchNestedRecord = true; + } + if (Boolean.TRUE.equals(configuration.getUseCdcWrites())) { + validateCdcSchema(schema); + rowSchema = schema.getField("record").getType().getRowSchema(); + fetchNestedRecord = true; write = write - .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())) - .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record"))); - } else { - write = write.to(configuration.getTable()).useBeamSchema(); + .withPrimaryKey(configuration.getPrimaryKey()) + .withRowMutationInformationFn( + row -> + RowMutationInformation.of( + RowMutationInformation.MutationType.valueOf( + row.getRow(ROW_PROPERTY_MUTATION_INFO) + .getString(ROW_PROPERTY_MUTATION_TYPE)), + row.getRow(ROW_PROPERTY_MUTATION_INFO) + .getString(ROW_PROPERTY_MUTATION_SQN))); } + PortableBigQueryDestinations dynamicDestinations = + new PortableBigQueryDestinations(rowSchema, configuration); + write = + write + .to(dynamicDestinations) + .withFormatFunction(dynamicDestinations.getFilterFormatFunction(fetchNestedRecord)); if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = @@ -381,8 +338,14 @@ BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { return write; } - BigQueryIO.Write validateAndIncludeCDCInformation( - BigQueryIO.Write write, Schema schema) { + void validateDynamicDestinationsSchema(Schema schema) { + checkArgument( + schema.getFieldNames().containsAll(Arrays.asList("destination", "record")), + "When writing to dynamic destinations, we expect Row Schema with a " + + "\"destination\" string field and a \"record\" Row field."); + } + + private void validateCdcSchema(Schema schema) { checkArgument( schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")), "When writing using CDC functionality, we expect Row Schema with a " @@ -390,10 +353,10 @@ BigQueryIO.Write validateAndIncludeCDCInformation( + ROW_PROPERTY_MUTATION_INFO + "\" Row field and a \"record\" Row field."); - Schema rowSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema(); + Schema mutationSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema(); checkArgument( - rowSchema.equals(ROW_SCHEMA_MUTATION_INFO), + mutationSchema != null && mutationSchema.equals(ROW_SCHEMA_MUTATION_INFO), "When writing using CDC functionality, we expect a \"" + ROW_PROPERTY_MUTATION_INFO + "\" field of Row type with schema:\n" @@ -402,31 +365,7 @@ BigQueryIO.Write validateAndIncludeCDCInformation( + "Received \"" + ROW_PROPERTY_MUTATION_INFO + "\" field with schema:\n" - + rowSchema.toString()); - - String tableDestination = null; - - if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { - validateDynamicDestinationsExpectedSchema(schema); - } else { - tableDestination = configuration.getTable(); - } - - return write - .to( - new RowDynamicDestinations( - schema.getField("record").getType().getRowSchema(), - tableDestination, - configuration.getPrimaryKey())) - .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record"))) - .withPrimaryKey(configuration.getPrimaryKey()) - .withRowMutationInformationFn( - row -> - RowMutationInformation.of( - RowMutationInformation.MutationType.valueOf( - row.getRow(ROW_PROPERTY_MUTATION_INFO) - .getString(ROW_PROPERTY_MUTATION_TYPE)), - row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN))); + + mutationSchema); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index acc5b1ff6ea4..0578d893c112 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -18,20 +18,18 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link @@ -68,11 +66,6 @@ public void validate() { !Strings.isNullOrEmpty(this.getTable()), invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); - // if we have an input table spec, validate it - if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) { - checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); - } - // validate create and write dispositions String createDisposition = getCreateDisposition(); if (createDisposition != null && !createDisposition.isEmpty()) { @@ -186,6 +179,21 @@ public static Builder builder() { @Nullable public abstract List getPrimaryKey(); + @SchemaFieldDescription( + "A list of field names to keep in the input record. All other fields are dropped before writing. " + + "Is mutually exclusive with 'drop' and 'only'.") + public abstract @Nullable List getKeep(); + + @SchemaFieldDescription( + "A list of field names to drop from the input record before writing. " + + "Is mutually exclusive with 'keep' and 'only'.") + public abstract @Nullable List getDrop(); + + @SchemaFieldDescription( + "The name of a single record field that should be written. " + + "Is mutually exclusive with 'keep' and 'drop'.") + public abstract @Nullable String getOnly(); + /** Builder for {@link BigQueryWriteConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -212,6 +220,12 @@ public abstract static class Builder { public abstract Builder setPrimaryKey(List pkColumns); + public abstract Builder setKeep(List keep); + + public abstract Builder setDrop(List drop); + + public abstract Builder setOnly(String only); + /** Builds a {@link BigQueryWriteConfiguration} instance. */ public abstract BigQueryWriteConfiguration build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java new file mode 100644 index 000000000000..f92f0085c77c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery.providers; + +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.api.services.bigquery.model.TableConstraints; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.RowFilter; +import org.apache.beam.sdk.util.RowStringInterpolator; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +@Internal +public class PortableBigQueryDestinations extends DynamicDestinations { + private @MonotonicNonNull RowStringInterpolator interpolator = null; + private final @Nullable List primaryKey; + private final RowFilter rowFilter; + + public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) { + // DYNAMIC_DESTINATIONS magic string is the old way of doing it for cross-language. + // In that case, we do no interpolation + if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { + this.interpolator = new RowStringInterpolator(configuration.getTable(), rowSchema); + } + this.primaryKey = configuration.getPrimaryKey(); + RowFilter rf = new RowFilter(rowSchema); + if (configuration.getDrop() != null) { + rf = rf.drop(checkStateNotNull(configuration.getDrop())); + } + if (configuration.getKeep() != null) { + rf = rf.keep(checkStateNotNull(configuration.getKeep())); + } + if (configuration.getOnly() != null) { + rf = rf.only(checkStateNotNull(configuration.getOnly())); + } + this.rowFilter = rf; + } + + @Override + public String getDestination(@Nullable ValueInSingleWindow element) { + if (interpolator != null) { + return interpolator.interpolate(checkArgumentNotNull(element)); + } + return checkStateNotNull(checkStateNotNull(element).getValue().getString("destination")); + } + + @Override + public TableDestination getTable(String destination) { + return new TableDestination(destination, null); + } + + @Override + public @Nullable TableSchema getSchema(String destination) { + return BigQueryUtils.toTableSchema(rowFilter.outputSchema()); + } + + @Override + public @Nullable TableConstraints getTableConstraints(String destination) { + if (primaryKey != null) { + return new TableConstraints() + .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(primaryKey)); + } + return null; + } + + public SerializableFunction getFilterFormatFunction(boolean fetchNestedRecord) { + return row -> { + if (fetchNestedRecord) { + row = checkStateNotNull(row.getRow("record")); + } + Row filtered = rowFilter.filter(row); + return BigQueryUtils.toTableRow(filtered); + }; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java index 1ae97aaeee2e..b9921c2b78ca 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java @@ -17,7 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -29,10 +34,11 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.RowFilter; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -58,17 +64,14 @@ public class BigQueryManagedIT { private static final Schema SCHEMA = Schema.of( Schema.Field.of("str", Schema.FieldType.STRING), - Schema.Field.of("number", Schema.FieldType.INT64)); + Schema.Field.of("number", Schema.FieldType.INT64), + Schema.Field.of("dest", Schema.FieldType.INT64)); + + private static final SerializableFunction ROW_FUNC = + l -> Row.withSchema(SCHEMA).addValue(Long.toString(l)).addValue(l).addValue(l % 3).build(); private static final List ROWS = - LongStream.range(0, 20) - .mapToObj( - i -> - Row.withSchema(SCHEMA) - .withFieldValue("str", Long.toString(i)) - .withFieldValue("number", i) - .build()) - .collect(Collectors.toList()); + LongStream.range(0, 20).mapToObj(ROW_FUNC::apply).collect(Collectors.toList()); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryManagedIT"); @@ -86,16 +89,13 @@ public static void setUpTestEnvironment() throws IOException, InterruptedExcepti public static void cleanup() { BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); } + @Test public void testBatchFileLoadsWriteRead() { String table = String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); Map config = ImmutableMap.of("table", table); - // file loads requires a GCS temp location - String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); - writePipeline.getOptions().setTempLocation(tempLocation); - // batch write PCollectionRowTuple.of("input", getInput(writePipeline, false)) .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); @@ -132,6 +132,53 @@ public void testStreamingStorageWriteRead() { readPipeline.run().waitUntilFinish(); } + public void testDynamicDestinations(boolean streaming) throws IOException, InterruptedException { + String baseTableName = + String.format("%s:%s.dynamic_" + System.nanoTime(), PROJECT, BIG_QUERY_DATASET_ID); + String destinationTemplate = baseTableName + "_{dest}"; + Map config = + ImmutableMap.of("table", destinationTemplate, "drop", Collections.singletonList("dest")); + + // write + PCollectionRowTuple.of("input", getInput(writePipeline, streaming)) + .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); + writePipeline.run().waitUntilFinish(); + + List destinations = + Arrays.asList(baseTableName + "_0", baseTableName + "_1", baseTableName + "_2"); + + // read and validate each table destination + RowFilter rowFilter = new RowFilter(SCHEMA).drop(Collections.singletonList("dest")); + for (int i = 0; i < destinations.size(); i++) { + long mod = i; + String dest = destinations.get(i); + List writtenRows = + BQ_CLIENT + .queryUnflattened(String.format("SELECT * FROM [%s]", dest), PROJECT, true, false) + .stream() + .map(tableRow -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tableRow)) + .collect(Collectors.toList()); + + List expectedRecords = + ROWS.stream() + .filter(row -> row.getInt64("dest") == mod) + .map(rowFilter::filter) + .collect(Collectors.toList()); + + assertThat(writtenRows, containsInAnyOrder(expectedRecords.toArray())); + } + } + + @Test + public void testStreamingDynamicDestinations() throws IOException, InterruptedException { + testDynamicDestinations(true); + } + + @Test + public void testBatchDynamicDestinations() throws IOException, InterruptedException { + testDynamicDestinations(false); + } + public PCollection getInput(Pipeline p, boolean isStreaming) { if (isStreaming) { return p.apply( @@ -139,14 +186,7 @@ public PCollection getInput(Pipeline p, boolean isStreaming) { .startAt(new Instant(0)) .stopAt(new Instant(19)) .withInterval(Duration.millis(1))) - .apply( - MapElements.into(TypeDescriptors.rows()) - .via( - i -> - Row.withSchema(SCHEMA) - .withFieldValue("str", Long.toString(i.getMillis())) - .withFieldValue("number", i.getMillis()) - .build())) + .apply(MapElements.into(TypeDescriptors.rows()).via(i -> ROW_FUNC.apply(i.getMillis()))) .setRowSchema(SCHEMA); } return p.apply(Create.of(ROWS)).setRowSchema(SCHEMA); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 3a23f5a3205a..b845df738fd6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -34,6 +34,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; @@ -51,6 +52,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.RowFilter; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -109,7 +111,6 @@ public void setUp() throws Exception { public void testInvalidConfig() { List invalidConfigs = Arrays.asList( - BigQueryWriteConfiguration.builder().setTable("not_a_valid_table_spec"), BigQueryWriteConfiguration.builder() .setTable("project:dataset.table") .setCreateDisposition("INVALID_DISPOSITION")); @@ -163,10 +164,7 @@ public Boolean rowsEquals(List expectedRows, List actualRows) { } public boolean rowEquals(Row expectedRow, TableRow actualRow) { - return expectedRow.getValue("name").equals(actualRow.get("name")) - && expectedRow - .getValue("number") - .equals(Long.parseLong(actualRow.get("number").toString())); + return expectedRow.equals(BigQueryUtils.toBeamRow(expectedRow.getSchema(), actualRow)); } @Test @@ -220,6 +218,33 @@ public void testWriteToDynamicDestinations() throws Exception { fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_3").get(0))); } + @Test + public void testWriteToPortableDynamicDestinations() throws Exception { + String destinationTemplate = "project:dataset.dynamic_write_{name}_{number}"; + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() + .setTable(destinationTemplate) + .setKeep(Arrays.asList("number", "dt")) + .build(); + + runWithConfig(config); + p.run().waitUntilFinish(); + + RowFilter rowFilter = new RowFilter(SCHEMA).keep(Arrays.asList("number", "dt")); + assertTrue( + rowEquals( + rowFilter.filter(ROWS.get(0)), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_a_1").get(0))); + assertTrue( + rowEquals( + rowFilter.filter(ROWS.get(1)), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_b_2").get(0))); + assertTrue( + rowEquals( + rowFilter.filter(ROWS.get(2)), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_c_3").get(0))); + } + List createCDCUpsertRows(List rows, boolean dynamicDestination, String tablePrefix) { Schema.Builder schemaBuilder =