From b970623edbcd2533477392e2a9ca31944de895c2 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 25 Aug 2024 00:32:50 -0700 Subject: [PATCH 1/7] Make changes --- .../sdk/io/gcp/bigquery/AppendClientInfo.java | 5 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 +++++++- .../StorageApiDynamicDestinationsProto.java | 3 +- .../sdk/io/gcp/bigquery/StorageApiLoads.java | 8 +++ .../StorageApiWriteRecordsInconsistent.java | 8 +++ .../StorageApiWriteUnshardedRecords.java | 49 ++++++++++++------- .../StorageApiWritesShardedRecords.java | 7 ++- .../bigquery/TableRowToStorageApiProto.java | 15 +++--- .../io/gcp/testing/FakeDatasetService.java | 3 +- 9 files changed, 88 insertions(+), 31 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 7505f77fb5b4..356cfa0f93ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -28,6 +28,7 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.beam.sdk.metrics.Counter; @@ -166,12 +167,12 @@ Descriptors.Descriptor getDescriptorIgnoreRequired() { } } - public TableRow toTableRow(ByteString protoBytes) { + public TableRow toTableRow(ByteString protoBytes, Predicate includeField) { try { return TableRowToStorageApiProto.tableRowFromMessage( DynamicMessage.parseFrom( TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes), - true); + true, includeField); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 71302339af1a..959850b04bf6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -60,6 +60,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.avro.generic.GenericDatumReader; @@ -2238,6 +2239,7 @@ public static Write write() { .setDeterministicRecordIdFn(null) .setMaxRetryJobs(1000) .setPropagateSuccessfulStorageApiWrites(false) + .setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue()) .setDirectWriteProtos(true) .setDefaultMissingValueInterpretation( AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) @@ -2300,7 +2302,7 @@ public static Write writeProtos(Class protoMessageClas throw new IllegalArgumentException("DynamicMessage is not supported."); } return BigQueryIO.write() - .withFormatFunction(m -> TableRowToStorageApiProto.tableRowFromMessage(m, false)) + .withFormatFunction(m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue())) .withWriteProtosClass(protoMessageClass); } @@ -2398,6 +2400,8 @@ public enum Method { abstract boolean getPropagateSuccessfulStorageApiWrites(); + abstract Predicate getPropagateSuccessfulStorageApiWritesPredicate(); + abstract int getMaxFilesPerPartition(); abstract long getMaxBytesPerPartition(); @@ -2508,6 +2512,9 @@ abstract Builder setAvroSchemaFactory( abstract Builder setPropagateSuccessfulStorageApiWrites( boolean propagateSuccessfulStorageApiWrites); + abstract Builder setPropagateSuccessfulStorageApiWritesPredicate( + Predicate columnsToPropagate); + abstract Builder setMaxFilesPerPartition(int maxFilesPerPartition); abstract Builder setMaxBytesPerPartition(long maxBytesPerPartition); @@ -3033,6 +3040,17 @@ public Write withPropagateSuccessfulStorageApiWrites( .build(); } + /** + * If set to true, then all successful writes will be propagated to {@link WriteResult} and + * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. + */ + public Write withPropagateSuccessfulStorageApiWrites(Predicate columnsToPropagate) { + return toBuilder() + .setPropagateSuccessfulStorageApiWrites(true) + .setPropagateSuccessfulStorageApiWritesPredicate(columnsToPropagate) + .build(); + } + /** * Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch * load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for discussion. @@ -3885,6 +3903,7 @@ private WriteResult continueExpandTyped( getAutoSchemaUpdate(), getIgnoreUnknownValues(), getPropagateSuccessfulStorageApiWrites(), + getPropagateSuccessfulStorageApiWritesPredicate(), getRowMutationInformationFn() != null, getDefaultMissingValueInterpretation(), getBadRecordRouter(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java index d7359f99b96d..a328ebb3cebf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.checkerframework.checker.nullness.qual.NonNull; /** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */ @@ -106,7 +107,7 @@ public TableRow toFailsafeTableRow(T element) { DynamicMessage.parseFrom( TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto), element.toByteArray()), - true); + true, Predicates.alwaysTrue()); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 62174b5c917a..78368a43c049 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -58,6 +59,8 @@ public class StorageApiLoads final TupleTag failedRowsTag = new TupleTag<>("failedRows"); @Nullable TupleTag successfulWrittenRowsTag; + @Nullable Predicate successfulRowsPredicate; + private final Coder destinationCoder; private final StorageApiDynamicDestinations dynamicDestinations; @@ -93,6 +96,7 @@ public StorageApiLoads( boolean autoUpdateSchema, boolean ignoreUnknownValues, boolean propagateSuccessfulStorageApiWrites, + Predicate propagateSuccessfulStorageApiWritesPredicate, boolean usesCdc, AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, BadRecordRouter badRecordRouter, @@ -111,6 +115,7 @@ public StorageApiLoads( this.ignoreUnknownValues = ignoreUnknownValues; if (propagateSuccessfulStorageApiWrites) { this.successfulWrittenRowsTag = new TupleTag<>("successfulPublishedRowsTag"); + this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate; } this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; @@ -174,6 +179,7 @@ public WriteResult expandInconsistent( bqServices, failedRowsTag, successfulWrittenRowsTag, + successfulRowsPredicate, BigQueryStorageApiInsertErrorCoder.of(), TableRowJsonCoder.of(), autoUpdateSchema, @@ -271,6 +277,7 @@ public WriteResult expandTriggered( TableRowJsonCoder.of(), failedRowsTag, successfulWrittenRowsTag, + successfulRowsPredicate, autoUpdateSchema, ignoreUnknownValues, defaultMissingValueInterpretation)); @@ -358,6 +365,7 @@ public WriteResult expandUntriggered( bqServices, failedRowsTag, successfulWrittenRowsTag, + successfulRowsPredicate, BigQueryStorageApiInsertErrorCoder.of(), TableRowJsonCoder.of(), autoUpdateSchema, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java index 389c749d4a4d..bc7a7f983143 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java @@ -29,6 +29,8 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import java.util.function.Predicate; + /** * A transform to write sharded records to BigQuery using the Storage API. This transform uses the * default stream to write the records. Records written will show up in BigQuery immediately, @@ -42,6 +44,9 @@ public class StorageApiWriteRecordsInconsistent private final BigQueryServices bqServices; private final TupleTag failedRowsTag; private final @Nullable TupleTag successfulRowsTag; + + private final @Nullable Predicate successfulRowsPredicate; + private final TupleTag> finalizeTag = new TupleTag<>("finalizeTag"); private final Coder failedRowsCoder; private final Coder successfulRowsCoder; @@ -57,6 +62,7 @@ public StorageApiWriteRecordsInconsistent( BigQueryServices bqServices, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, + @Nullable Predicate successfulRowsPredicate, Coder failedRowsCoder, Coder successfulRowsCoder, boolean autoUpdateSchema, @@ -71,6 +77,7 @@ public StorageApiWriteRecordsInconsistent( this.failedRowsCoder = failedRowsCoder; this.successfulRowsCoder = successfulRowsCoder; this.successfulRowsTag = successfulRowsTag; + this.successfulRowsPredicate = successfulRowsPredicate; this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; this.createDisposition = createDisposition; @@ -103,6 +110,7 @@ public PCollectionTuple expand(PCollection private final BigQueryServices bqServices; private final TupleTag failedRowsTag; private final @Nullable TupleTag successfulRowsTag; + private final @Nullable Predicate successfulRowsPredicate; private final TupleTag> finalizeTag = new TupleTag<>("finalizeTag"); private final Coder failedRowsCoder; private final Coder successfulRowsCoder; @@ -168,6 +171,7 @@ public StorageApiWriteUnshardedRecords( BigQueryServices bqServices, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, + @Nullable Predicate successfulRowsPredicate, Coder failedRowsCoder, Coder successfulRowsCoder, boolean autoUpdateSchema, @@ -180,6 +184,7 @@ public StorageApiWriteUnshardedRecords( this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; this.successfulRowsTag = successfulRowsTag; + this.successfulRowsPredicate = successfulRowsPredicate; this.failedRowsCoder = failedRowsCoder; this.successfulRowsCoder = successfulRowsCoder; this.autoUpdateSchema = autoUpdateSchema; @@ -216,6 +221,7 @@ public PCollectionTuple expand(PCollection private final TupleTag> finalizeTag; private final TupleTag failedRowsTag; private final @Nullable TupleTag successfulRowsTag; + + private final @Nullable Predicate successfulRowsPredicate; private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; private final BigQueryIO.Write.CreateDisposition createDisposition; @@ -641,7 +649,7 @@ long flush( TableRowToStorageApiProto.wrapDescriptorProto( getAppendClientInfo(true, null).getDescriptor()), rowBytes), - true); + true, successfulRowsPredicate); } org.joda.time.Instant timestamp = insertTimestamps.get(i); failedRowsReceiver.outputWithTimestamp( @@ -725,7 +733,7 @@ long flush( Preconditions.checkStateNotNull(appendClientInfo) .getDescriptor()), protoBytes), - true); + true, Predicates.alwaysTrue()); } element = new BigQueryStorageApiInsertError( @@ -875,7 +883,8 @@ long flush( try { TableRow row = TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(descriptor, rowBytes), true); + DynamicMessage.parseFrom(descriptor, rowBytes), true, + successfulRowsPredicate); org.joda.time.Instant timestamp = c.timestamps.get(i); successfulRowsReceiver.outputWithTimestamp(row, timestamp); } catch (Exception e) { @@ -942,23 +951,24 @@ void postFlush() { private int streamAppendClientCount; WriteRecordsDoFn( - String operationName, - StorageApiDynamicDestinations dynamicDestinations, - BigQueryServices bqServices, - boolean useDefaultStream, - int flushThresholdBytes, - int flushThresholdCount, - int streamAppendClientCount, - TupleTag> finalizeTag, - TupleTag failedRowsTag, - @Nullable TupleTag successfulRowsTag, + String operationName, + StorageApiDynamicDestinations dynamicDestinations, + BigQueryServices bqServices, + boolean useDefaultStream, + int flushThresholdBytes, + int flushThresholdCount, + int streamAppendClientCount, + TupleTag> finalizeTag, + TupleTag failedRowsTag, + @Nullable TupleTag successfulRowsTag, + @Nullable Predicate successfulRowsPredicate, boolean autoUpdateSchema, - boolean ignoreUnknownValues, - BigQueryIO.Write.CreateDisposition createDisposition, - @Nullable String kmsKey, - boolean usesCdc, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, - int maxRetries) { + boolean ignoreUnknownValues, + BigQueryIO.Write.CreateDisposition createDisposition, + @Nullable String kmsKey, + boolean usesCdc, + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + int maxRetries) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; @@ -969,6 +979,7 @@ void postFlush() { this.finalizeTag = finalizeTag; this.failedRowsTag = failedRowsTag; this.successfulRowsTag = successfulRowsTag; + this.successfulRowsPredicate = successfulRowsPredicate; this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; this.createDisposition = createDisposition; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 1ee001d9890f..7e9da8fdbe3c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -48,6 +48,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; @@ -133,6 +134,8 @@ public class StorageApiWritesShardedRecords failedRowsTag; private final @Nullable TupleTag successfulRowsTag; + + private final @Nullable Predicate successfulRowsPredicate; private final Coder succussfulRowsCoder; private final TupleTag> flushTag = new TupleTag<>("flushTag"); @@ -225,6 +228,7 @@ public StorageApiWritesShardedRecords( Coder successfulRowsCoder, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, + @Nullable Predicate successfulRowsPredicate, boolean autoUpdateSchema, boolean ignoreUnknownValues, AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { @@ -236,6 +240,7 @@ public StorageApiWritesShardedRecords( this.failedRowsCoder = failedRowsCoder; this.failedRowsTag = failedRowsTag; this.successfulRowsTag = successfulRowsTag; + this.successfulRowsPredicate = successfulRowsPredicate; this.succussfulRowsCoder = successfulRowsCoder; this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; @@ -833,7 +838,7 @@ public void process( ByteString protoBytes = context.protoRows.getSerializedRows(i); org.joda.time.Instant timestamp = context.timestamps.get(i); o.get(successfulRowsTag) - .outputWithTimestamp(appendClientInfo.get().toTableRow(protoBytes), timestamp); + .outputWithTimestamp(appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate), timestamp); } } }; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index fbc17fb59704..408a309c0b50 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -56,6 +56,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -1092,34 +1093,36 @@ private static long toEpochMicros(Instant timestamp) { } @VisibleForTesting - public static TableRow tableRowFromMessage(Message message, boolean includeCdcColumns) { + public static TableRow tableRowFromMessage(Message message, boolean includeCdcColumns, + Predicate includeField) { // TODO: Would be more correct to generate TableRows using setF. TableRow tableRow = new TableRow(); for (Map.Entry field : message.getAllFields().entrySet()) { FieldDescriptor fieldDescriptor = field.getKey(); Object fieldValue = field.getValue(); - if (includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fieldDescriptor.getName())) { + if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fieldDescriptor.getName())) + && includeField.test(fieldDescriptor.getName())) { tableRow.put( fieldDescriptor.getName(), - jsonValueFromMessageValue(fieldDescriptor, fieldValue, true)); + jsonValueFromMessageValue(fieldDescriptor, fieldValue, true, includeField)); } } return tableRow; } public static Object jsonValueFromMessageValue( - FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated) { + FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated, Predicate includeField) { if (expandRepeated && fieldDescriptor.isRepeated()) { List valueList = (List) fieldValue; return valueList.stream() - .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false)) + .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField)) .collect(toList()); } switch (fieldDescriptor.getType()) { case GROUP: case MESSAGE: - return tableRowFromMessage((Message) fieldValue, false); + return tableRowFromMessage((Message) fieldValue, false, includeField); case BYTES: return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray()); case ENUM: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 83c63d1c75ff..ba0933ca5094 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -76,6 +76,7 @@ import org.apache.beam.sdk.values.FailsafeValueInSingleWindow; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -648,7 +649,7 @@ public ApiFuture appendRows(long offset, ProtoRows rows) } TableRow tableRow = TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(protoDescriptor, bytes), false); + DynamicMessage.parseFrom(protoDescriptor, bytes), false, Predicates.alwaysTrue()); if (shouldFailRow.apply(tableRow)) { rowIndexToErrorMessage.put(i, "Failing row " + tableRow.toPrettyString()); } From f92e4fbb804cab7399681d8002d0d0ae068b5a62 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 25 Aug 2024 19:39:55 -0700 Subject: [PATCH 2/7] fix errors --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +- .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 5 ++--- .../bigquery/StorageApiWriteRecordsInconsistent.java | 4 ++-- .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 10 +++++----- .../gcp/bigquery/StorageApiWritesShardedRecords.java | 11 ++++++----- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 959850b04bf6..857e3f60650a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2239,7 +2239,7 @@ public static Write write() { .setDeterministicRecordIdFn(null) .setMaxRetryJobs(1000) .setPropagateSuccessfulStorageApiWrites(false) - .setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue()) + .setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue()) .setDirectWriteProtos(true) .setDefaultMissingValueInterpretation( AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 78368a43c049..4ca9d5035c81 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -59,8 +59,7 @@ public class StorageApiLoads final TupleTag failedRowsTag = new TupleTag<>("failedRows"); @Nullable TupleTag successfulWrittenRowsTag; - @Nullable Predicate successfulRowsPredicate; - + Predicate successfulRowsPredicate; private final Coder destinationCoder; private final StorageApiDynamicDestinations dynamicDestinations; @@ -115,8 +114,8 @@ public StorageApiLoads( this.ignoreUnknownValues = ignoreUnknownValues; if (propagateSuccessfulStorageApiWrites) { this.successfulWrittenRowsTag = new TupleTag<>("successfulPublishedRowsTag"); - this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate; } + this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate; this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; this.badRecordRouter = badRecordRouter; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java index bc7a7f983143..0e38fc42e74b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java @@ -45,7 +45,7 @@ public class StorageApiWriteRecordsInconsistent private final TupleTag failedRowsTag; private final @Nullable TupleTag successfulRowsTag; - private final @Nullable Predicate successfulRowsPredicate; + private final Predicate successfulRowsPredicate; private final TupleTag> finalizeTag = new TupleTag<>("finalizeTag"); private final Coder failedRowsCoder; @@ -62,7 +62,7 @@ public StorageApiWriteRecordsInconsistent( BigQueryServices bqServices, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, - @Nullable Predicate successfulRowsPredicate, + Predicate successfulRowsPredicate, Coder failedRowsCoder, Coder successfulRowsCoder, boolean autoUpdateSchema, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 1e4a10d6633d..1d9cc1f21c69 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -109,7 +109,7 @@ public class StorageApiWriteUnshardedRecords private final BigQueryServices bqServices; private final TupleTag failedRowsTag; private final @Nullable TupleTag successfulRowsTag; - private final @Nullable Predicate successfulRowsPredicate; + private final Predicate successfulRowsPredicate; private final TupleTag> finalizeTag = new TupleTag<>("finalizeTag"); private final Coder failedRowsCoder; private final Coder successfulRowsCoder; @@ -171,7 +171,7 @@ public StorageApiWriteUnshardedRecords( BigQueryServices bqServices, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, - @Nullable Predicate successfulRowsPredicate, + Predicate successfulRowsPredicate, Coder failedRowsCoder, Coder successfulRowsCoder, boolean autoUpdateSchema, @@ -254,7 +254,7 @@ static class WriteRecordsDoFn private final TupleTag failedRowsTag; private final @Nullable TupleTag successfulRowsTag; - private final @Nullable Predicate successfulRowsPredicate; + private final Predicate successfulRowsPredicate; private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; private final BigQueryIO.Write.CreateDisposition createDisposition; @@ -584,7 +584,7 @@ void addMessage( } catch (TableRowToStorageApiProto.SchemaConversionException e) { @Nullable TableRow tableRow = payload.getFailsafeTableRow(); if (tableRow == null) { - tableRow = checkNotNull(appendClientInfo).toTableRow(payloadBytes); + tableRow = checkNotNull(appendClientInfo).toTableRow(payloadBytes, Predicates.alwaysTrue()); } // TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we only // execute this @@ -961,7 +961,7 @@ void postFlush() { TupleTag> finalizeTag, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, - @Nullable Predicate successfulRowsPredicate, + Predicate successfulRowsPredicate, boolean autoUpdateSchema, boolean ignoreUnknownValues, BigQueryIO.Write.CreateDisposition createDisposition, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 7e9da8fdbe3c..f22a14aaf0da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -94,6 +94,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; @@ -135,7 +136,7 @@ public class StorageApiWritesShardedRecords failedRowsTag; private final @Nullable TupleTag successfulRowsTag; - private final @Nullable Predicate successfulRowsPredicate; + private final Predicate successfulRowsPredicate; private final Coder succussfulRowsCoder; private final TupleTag> flushTag = new TupleTag<>("flushTag"); @@ -228,7 +229,7 @@ public StorageApiWritesShardedRecords( Coder successfulRowsCoder, TupleTag failedRowsTag, @Nullable TupleTag successfulRowsTag, - @Nullable Predicate successfulRowsPredicate, + Predicate successfulRowsPredicate, boolean autoUpdateSchema, boolean ignoreUnknownValues, AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { @@ -582,7 +583,7 @@ public void process( element.getValue(), splitSize, (fields, ignore) -> appendClientInfo.get().encodeUnknownFields(fields, ignore), - bytes -> appendClientInfo.get().toTableRow(bytes), + bytes -> appendClientInfo.get().toTableRow(bytes, Predicates.alwaysTrue()), (failedRow, errorMessage) -> { o.get(failedRowsTag) .outputWithTimestamp( @@ -699,7 +700,7 @@ public void process( TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); if (failedRow == null) { ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - failedRow = appendClientInfo.get().toTableRow(protoBytes); + failedRow = appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue()); } org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); o.get(failedRowsTag) @@ -869,7 +870,7 @@ public void process( TableRow failedRow = splitValue.getFailsafeTableRows().get(i); if (failedRow == null) { ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i); - failedRow = appendClientInfo.get().toTableRow(rowBytes); + failedRow = appendClientInfo.get().toTableRow(rowBytes, Predicates.alwaysTrue()); } o.get(failedRowsTag) .outputWithTimestamp( From af724fb4135a849db0f1a75ca86fab130e8421ed Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 25 Aug 2024 19:41:55 -0700 Subject: [PATCH 3/7] spotless --- .../sdk/io/gcp/bigquery/AppendClientInfo.java | 3 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 29 +++++------ .../StorageApiDynamicDestinationsProto.java | 3 +- .../StorageApiWriteRecordsInconsistent.java | 3 +- .../StorageApiWriteUnshardedRecords.java | 49 ++++++++++--------- .../StorageApiWritesShardedRecords.java | 7 ++- .../bigquery/TableRowToStorageApiProto.java | 11 +++-- .../io/gcp/testing/FakeDatasetService.java | 4 +- 8 files changed, 62 insertions(+), 47 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 356cfa0f93ca..c5867cc7f522 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -172,7 +172,8 @@ public TableRow toTableRow(ByteString protoBytes, Predicate includeField return TableRowToStorageApiProto.tableRowFromMessage( DynamicMessage.parseFrom( TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes), - true, includeField); + true, + includeField); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 857e3f60650a..e309b23b54cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2239,7 +2239,7 @@ public static Write write() { .setDeterministicRecordIdFn(null) .setMaxRetryJobs(1000) .setPropagateSuccessfulStorageApiWrites(false) - .setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue()) + .setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue()) .setDirectWriteProtos(true) .setDefaultMissingValueInterpretation( AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) @@ -2302,7 +2302,8 @@ public static Write writeProtos(Class protoMessageClas throw new IllegalArgumentException("DynamicMessage is not supported."); } return BigQueryIO.write() - .withFormatFunction(m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue())) + .withFormatFunction( + m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue())) .withWriteProtosClass(protoMessageClass); } @@ -2512,8 +2513,8 @@ abstract Builder setAvroSchemaFactory( abstract Builder setPropagateSuccessfulStorageApiWrites( boolean propagateSuccessfulStorageApiWrites); - abstract Builder setPropagateSuccessfulStorageApiWritesPredicate( - Predicate columnsToPropagate); + abstract Builder setPropagateSuccessfulStorageApiWritesPredicate( + Predicate columnsToPropagate); abstract Builder setMaxFilesPerPartition(int maxFilesPerPartition); @@ -3040,16 +3041,16 @@ public Write withPropagateSuccessfulStorageApiWrites( .build(); } - /** - * If set to true, then all successful writes will be propagated to {@link WriteResult} and - * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. - */ - public Write withPropagateSuccessfulStorageApiWrites(Predicate columnsToPropagate) { - return toBuilder() - .setPropagateSuccessfulStorageApiWrites(true) - .setPropagateSuccessfulStorageApiWritesPredicate(columnsToPropagate) - .build(); - } + /** + * If set to true, then all successful writes will be propagated to {@link WriteResult} and + * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. + */ + public Write withPropagateSuccessfulStorageApiWrites(Predicate columnsToPropagate) { + return toBuilder() + .setPropagateSuccessfulStorageApiWrites(true) + .setPropagateSuccessfulStorageApiWritesPredicate(columnsToPropagate) + .build(); + } /** * Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java index a328ebb3cebf..7f4ec4a77d0b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java @@ -107,7 +107,8 @@ public TableRow toFailsafeTableRow(T element) { DynamicMessage.parseFrom( TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto), element.toByteArray()), - true, Predicates.alwaysTrue()); + true, + Predicates.alwaysTrue()); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java index 0e38fc42e74b..0860b4eda8a2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java @@ -19,6 +19,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; @@ -29,8 +30,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import java.util.function.Predicate; - /** * A transform to write sharded records to BigQuery using the Storage API. This transform uses the * default stream to write the records. Records written will show up in BigQuery immediately, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 1d9cc1f21c69..c4ba8a8600d9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -584,7 +584,9 @@ void addMessage( } catch (TableRowToStorageApiProto.SchemaConversionException e) { @Nullable TableRow tableRow = payload.getFailsafeTableRow(); if (tableRow == null) { - tableRow = checkNotNull(appendClientInfo).toTableRow(payloadBytes, Predicates.alwaysTrue()); + tableRow = + checkNotNull(appendClientInfo) + .toTableRow(payloadBytes, Predicates.alwaysTrue()); } // TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we only // execute this @@ -649,7 +651,8 @@ long flush( TableRowToStorageApiProto.wrapDescriptorProto( getAppendClientInfo(true, null).getDescriptor()), rowBytes), - true, successfulRowsPredicate); + true, + successfulRowsPredicate); } org.joda.time.Instant timestamp = insertTimestamps.get(i); failedRowsReceiver.outputWithTimestamp( @@ -733,7 +736,8 @@ long flush( Preconditions.checkStateNotNull(appendClientInfo) .getDescriptor()), protoBytes), - true, Predicates.alwaysTrue()); + true, + Predicates.alwaysTrue()); } element = new BigQueryStorageApiInsertError( @@ -883,8 +887,9 @@ long flush( try { TableRow row = TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(descriptor, rowBytes), true, - successfulRowsPredicate); + DynamicMessage.parseFrom(descriptor, rowBytes), + true, + successfulRowsPredicate); org.joda.time.Instant timestamp = c.timestamps.get(i); successfulRowsReceiver.outputWithTimestamp(row, timestamp); } catch (Exception e) { @@ -951,24 +956,24 @@ void postFlush() { private int streamAppendClientCount; WriteRecordsDoFn( - String operationName, - StorageApiDynamicDestinations dynamicDestinations, - BigQueryServices bqServices, - boolean useDefaultStream, - int flushThresholdBytes, - int flushThresholdCount, - int streamAppendClientCount, - TupleTag> finalizeTag, - TupleTag failedRowsTag, - @Nullable TupleTag successfulRowsTag, - Predicate successfulRowsPredicate, + String operationName, + StorageApiDynamicDestinations dynamicDestinations, + BigQueryServices bqServices, + boolean useDefaultStream, + int flushThresholdBytes, + int flushThresholdCount, + int streamAppendClientCount, + TupleTag> finalizeTag, + TupleTag failedRowsTag, + @Nullable TupleTag successfulRowsTag, + Predicate successfulRowsPredicate, boolean autoUpdateSchema, - boolean ignoreUnknownValues, - BigQueryIO.Write.CreateDisposition createDisposition, - @Nullable String kmsKey, - boolean usesCdc, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, - int maxRetries) { + boolean ignoreUnknownValues, + BigQueryIO.Write.CreateDisposition createDisposition, + @Nullable String kmsKey, + boolean usesCdc, + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + int maxRetries) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index f22a14aaf0da..e2674fe34f2e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -700,7 +700,8 @@ public void process( TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); if (failedRow == null) { ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - failedRow = appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue()); + failedRow = + appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue()); } org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); o.get(failedRowsTag) @@ -839,7 +840,9 @@ public void process( ByteString protoBytes = context.protoRows.getSerializedRows(i); org.joda.time.Instant timestamp = context.timestamps.get(i); o.get(successfulRowsTag) - .outputWithTimestamp(appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate), timestamp); + .outputWithTimestamp( + appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate), + timestamp); } } }; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 408a309c0b50..a18b52dd834c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -1093,15 +1093,15 @@ private static long toEpochMicros(Instant timestamp) { } @VisibleForTesting - public static TableRow tableRowFromMessage(Message message, boolean includeCdcColumns, - Predicate includeField) { + public static TableRow tableRowFromMessage( + Message message, boolean includeCdcColumns, Predicate includeField) { // TODO: Would be more correct to generate TableRows using setF. TableRow tableRow = new TableRow(); for (Map.Entry field : message.getAllFields().entrySet()) { FieldDescriptor fieldDescriptor = field.getKey(); Object fieldValue = field.getValue(); if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fieldDescriptor.getName())) - && includeField.test(fieldDescriptor.getName())) { + && includeField.test(fieldDescriptor.getName())) { tableRow.put( fieldDescriptor.getName(), jsonValueFromMessageValue(fieldDescriptor, fieldValue, true, includeField)); @@ -1111,7 +1111,10 @@ public static TableRow tableRowFromMessage(Message message, boolean includeCdcCo } public static Object jsonValueFromMessageValue( - FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated, Predicate includeField) { + FieldDescriptor fieldDescriptor, + Object fieldValue, + boolean expandRepeated, + Predicate includeField) { if (expandRepeated && fieldDescriptor.isRepeated()) { List valueList = (List) fieldValue; return valueList.stream() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index ba0933ca5094..a99e4bea37a7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -649,7 +649,9 @@ public ApiFuture appendRows(long offset, ProtoRows rows) } TableRow tableRow = TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom(protoDescriptor, bytes), false, Predicates.alwaysTrue()); + DynamicMessage.parseFrom(protoDescriptor, bytes), + false, + Predicates.alwaysTrue()); if (shouldFailRow.apply(tableRow)) { rowIndexToErrorMessage.put(i, "Failing row " + tableRow.toPrettyString()); } From 56bfec6e513c2931bc2b0902483242e69799feb6 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 27 Aug 2024 13:36:02 -0700 Subject: [PATCH 4/7] add test --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 +- .../bigquery/TableRowToStorageApiProto.java | 22 +++-- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 87 +++++++++++++++++++ 3 files changed, 107 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e309b23b54cc..42529ca9eefb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3042,8 +3042,9 @@ public Write withPropagateSuccessfulStorageApiWrites( } /** - * If set to true, then all successful writes will be propagated to {@link WriteResult} and - * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. + * If called, then all successful writes will be propagated to {@link WriteResult} and + * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. The predicate + * allows filtering out columns from appearing in the resulting PCollection. */ public Write withPropagateSuccessfulStorageApiWrites(Predicate columnsToPropagate) { return toBuilder() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index a18b52dd834c..8c2c035f2b19 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -1095,16 +1095,27 @@ private static long toEpochMicros(Instant timestamp) { @VisibleForTesting public static TableRow tableRowFromMessage( Message message, boolean includeCdcColumns, Predicate includeField) { + return tableRowFromMessage(message, includeCdcColumns, includeField, ""); + } + + public static TableRow tableRowFromMessage( + Message message, + boolean includeCdcColumns, + Predicate includeField, + String namePrefix) { // TODO: Would be more correct to generate TableRows using setF. TableRow tableRow = new TableRow(); for (Map.Entry field : message.getAllFields().entrySet()) { + StringBuilder fullName = new StringBuilder(); FieldDescriptor fieldDescriptor = field.getKey(); + fullName = fullName.append(namePrefix).append(fieldDescriptor.getName()); Object fieldValue = field.getValue(); - if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fieldDescriptor.getName())) + if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString())) && includeField.test(fieldDescriptor.getName())) { tableRow.put( fieldDescriptor.getName(), - jsonValueFromMessageValue(fieldDescriptor, fieldValue, true, includeField)); + jsonValueFromMessageValue( + fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString())); } } return tableRow; @@ -1114,18 +1125,19 @@ public static Object jsonValueFromMessageValue( FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated, - Predicate includeField) { + Predicate includeField, + String prefix) { if (expandRepeated && fieldDescriptor.isRepeated()) { List valueList = (List) fieldValue; return valueList.stream() - .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField)) + .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix)) .collect(toList()); } switch (fieldDescriptor.getType()) { case GROUP: case MESSAGE: - return tableRowFromMessage((Message) fieldValue, false, includeField); + return tableRowFromMessage((Message) fieldValue, false, includeField, prefix); case BYTES: return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray()); case ENUM: diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 2736ed7beb88..58b6746ea5c4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -35,6 +35,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeTrue; import com.google.api.core.ApiFuture; @@ -80,6 +81,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.function.LongFunction; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1593,6 +1595,91 @@ public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() thro storageWriteWithErrorHandling(true); } + private void storageWriteWithSuccessHandling(boolean columnSubset) throws Exception { + assumeTrue(useStorageApi); + if (!useStreaming) { + assumeFalse(useStorageApiApproximate); + } + List elements = + IntStream.range(0, 30) + .mapToObj(Integer::toString) + .map(i -> new TableRow().set("number", i).set("string", i)) + .collect(Collectors.toList()); + + List expectedSuccessElements = elements; + if (columnSubset) { + expectedSuccessElements = + elements.stream() + .map(tr -> new TableRow().set("number", tr.get("number"))) + .collect(Collectors.toList()); + } + + TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"), + new TableFieldSchema().setName("string").setType("STRING"))); + + TestStream testStream = + TestStream.create(TableRowJsonCoder.of()) + .addElements( + elements.get(0), Iterables.toArray(elements.subList(1, 10), TableRow.class)) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + elements.get(10), Iterables.toArray(elements.subList(11, 20), TableRow.class)) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + elements.get(20), Iterables.toArray(elements.subList(21, 30), TableRow.class)) + .advanceWatermarkToInfinity(); + + BigQueryIO.Write write = + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema(tableSchema) + .withMethod(Method.STORAGE_WRITE_API) + .withTestServices(fakeBqServices) + .withPropagateSuccessfulStorageApiWrites(true) + .withoutValidation(); + if (columnSubset) { + write = + write.withPropagateSuccessfulStorageApiWrites( + (Serializable & Predicate) s -> s.equals("number")); + } + if (useStreaming) { + if (useStorageApiApproximate) { + write = write.withMethod(Method.STORAGE_API_AT_LEAST_ONCE); + } else { + write = write.withAutoSharding(); + } + } + + PTransform> source = + useStreaming ? testStream : Create.of(elements).withCoder(TableRowJsonCoder.of()); + PCollection success = + p.apply(source).apply("WriteToBQ", write).getSuccessfulStorageApiInserts(); + + PAssert.that(success) + .containsInAnyOrder(Iterables.toArray(expectedSuccessElements, TableRow.class)); + + p.run().waitUntilFinish(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder(Iterables.toArray(elements, TableRow.class))); + } + + @Test + public void testStorageApiWriteWithSuccessfulRows() throws Exception { + storageWriteWithSuccessHandling(false); + } + + @Test + public void testStorageApiWriteWithSuccessfulRowsColumnSubset() throws Exception { + storageWriteWithSuccessHandling(true); + } + @DefaultSchema(JavaFieldSchema.class) static class SchemaPojo { final String name; From 303c62b70431aa7b3864a233442cc0cfd445d43d Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 27 Aug 2024 16:10:56 -0700 Subject: [PATCH 5/7] add test and comment --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 ++++++- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 24 +++++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 42529ca9eefb..74a5042a5c11 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3044,7 +3044,13 @@ public Write withPropagateSuccessfulStorageApiWrites( /** * If called, then all successful writes will be propagated to {@link WriteResult} and * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. The predicate - * allows filtering out columns from appearing in the resulting PCollection. + * allows filtering out columns from appearing in the resulting PCollection. The argument to the + * predicate is the name of the field to potentially be included in the output. Nested fields + * will be presented using . notation - e.g. a.b.c. + * + *

The predicate will be invoked repeatedly for every field in every message, so it is + * recommended that it be as lightweight as possible. e.g. looking up fields in a hash table + * instead of searching a list of field names. */ public Write withPropagateSuccessfulStorageApiWrites(Predicate columnsToPropagate) { return toBuilder() diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 58b6746ea5c4..d96e22f84907 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1603,14 +1603,23 @@ private void storageWriteWithSuccessHandling(boolean columnSubset) throws Except List elements = IntStream.range(0, 30) .mapToObj(Integer::toString) - .map(i -> new TableRow().set("number", i).set("string", i)) + .map( + i -> + new TableRow() + .set("number", i) + .set("string", i) + .set("nested", new TableRow().set("number", i))) .collect(Collectors.toList()); List expectedSuccessElements = elements; if (columnSubset) { expectedSuccessElements = elements.stream() - .map(tr -> new TableRow().set("number", tr.get("number"))) + .map( + tr -> + new TableRow() + .set("number", tr.get("number")) + .set("nested", new TableRow().set("number", tr.get("number")))) .collect(Collectors.toList()); } @@ -1619,7 +1628,13 @@ private void storageWriteWithSuccessHandling(boolean columnSubset) throws Except .setFields( ImmutableList.of( new TableFieldSchema().setName("number").setType("INTEGER"), - new TableFieldSchema().setName("string").setType("STRING"))); + new TableFieldSchema().setName("string").setType("STRING"), + new TableFieldSchema() + .setName("nested") + .setType("RECORD") + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"))))); TestStream testStream = TestStream.create(TableRowJsonCoder.of()) @@ -1645,7 +1660,8 @@ private void storageWriteWithSuccessHandling(boolean columnSubset) throws Except if (columnSubset) { write = write.withPropagateSuccessfulStorageApiWrites( - (Serializable & Predicate) s -> s.equals("number")); + (Serializable & Predicate) + s -> s.equals("number") || s.equals("nested") || s.equals("nested.number")); } if (useStreaming) { if (useStorageApiApproximate) { From 868dbaa442aaf71f25530c84815a93608866ec19 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 27 Aug 2024 16:25:48 -0700 Subject: [PATCH 6/7] add clarifying comment --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 74a5042a5c11..88006017a5df 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3046,7 +3046,9 @@ public Write withPropagateSuccessfulStorageApiWrites( * accessible via the {@link WriteResult#getSuccessfulStorageApiInserts} method. The predicate * allows filtering out columns from appearing in the resulting PCollection. The argument to the * predicate is the name of the field to potentially be included in the output. Nested fields - * will be presented using . notation - e.g. a.b.c. + * will be presented using . notation - e.g. a.b.c. If you want a nested field included, you + * must ensure that the predicate returns true for every parent field. e.g. if you want field + * "a.b.c" included, the predicate must return true for "a" for "a.b" and for "a.b.c". * *

The predicate will be invoked repeatedly for every field in every message, so it is * recommended that it be as lightweight as possible. e.g. looking up fields in a hash table From ff84ea5023a4d2906154993596a73b132ddfea1b Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 28 Aug 2024 09:34:29 -0700 Subject: [PATCH 7/7] fix translation --- .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 11 +++++++++++ .../io/gcp/bigquery/BigQueryIOTranslationTest.java | 3 +++ 2 files changed, 14 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 5b9b80071ecd..efbe97f36522 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; @@ -398,6 +399,7 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator transform) { fieldValues.put( "propagate_successful_storage_api_writes", transform.getPropagateSuccessfulStorageApiWrites()); + fieldValues.put( + "propagate_successful_storage_api_writes_predicate", + toByteArray(transform.getPropagateSuccessfulStorageApiWritesPredicate())); fieldValues.put("max_files_per_partition", transform.getMaxFilesPerPartition()); fieldValues.put("max_bytes_per_partition", transform.getMaxBytesPerPartition()); if (transform.getTriggeringFrequency() != null) { @@ -752,6 +757,12 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { builder = builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites); } + byte[] predicate = configRow.getBytes("propagate_successful_storage_api_writes_predicate"); + if (predicate != null) { + builder = + builder.setPropagateSuccessfulStorageApiWritesPredicate( + (Predicate) fromByteArray(predicate)); + } Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition"); if (maxFilesPerPartition != null) { builder = builder.setMaxFilesPerPartition(maxFilesPerPartition); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 950ebcaafe45..e15258e6ab40 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -105,6 +105,9 @@ public class BigQueryIOTranslationTest { "getNumStorageWriteApiStreams", "num_storage_write_api_streams"); WRITE_TRANSFORM_SCHEMA_MAPPING.put( "getPropagateSuccessfulStorageApiWrites", "propagate_successful_storage_api_writes"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put( + "getPropagateSuccessfulStorageApiWritesPredicate", + "propagate_successful_storage_api_writes_predicate"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxFilesPerPartition", "max_files_per_partition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxBytesPerPartition", "max_bytes_per_partition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getTriggeringFrequency", "triggering_frequency");