Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add predicate to control which columns are propagated when propagating successful rows #32312

Merged
merged 7 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
Expand Down Expand Up @@ -166,12 +167,13 @@ Descriptors.Descriptor getDescriptorIgnoreRequired() {
}
}

public TableRow toTableRow(ByteString protoBytes) {
public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField) {
try {
return TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes),
true);
true,
includeField);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericDatumReader;
Expand Down Expand Up @@ -2238,6 +2239,7 @@ public static <T> Write<T> write() {
.setDeterministicRecordIdFn(null)
.setMaxRetryJobs(1000)
.setPropagateSuccessfulStorageApiWrites(false)
.setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue())
.setDirectWriteProtos(true)
.setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
Expand Down Expand Up @@ -2300,7 +2302,8 @@ public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClas
throw new IllegalArgumentException("DynamicMessage is not supported.");
}
return BigQueryIO.<T>write()
.withFormatFunction(m -> TableRowToStorageApiProto.tableRowFromMessage(m, false))
.withFormatFunction(
m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue()))
.withWriteProtosClass(protoMessageClass);
}

Expand Down Expand Up @@ -2398,6 +2401,8 @@ public enum Method {

abstract boolean getPropagateSuccessfulStorageApiWrites();

abstract Predicate<String> getPropagateSuccessfulStorageApiWritesPredicate();

abstract int getMaxFilesPerPartition();

abstract long getMaxBytesPerPartition();
Expand Down Expand Up @@ -2508,6 +2513,9 @@ abstract Builder<T> setAvroSchemaFactory(
abstract Builder<T> setPropagateSuccessfulStorageApiWrites(
boolean propagateSuccessfulStorageApiWrites);

abstract Builder<T> setPropagateSuccessfulStorageApiWritesPredicate(
Predicate<String> columnsToPropagate);

abstract Builder<T> setMaxFilesPerPartition(int maxFilesPerPartition);

abstract Builder<T> setMaxBytesPerPartition(long maxBytesPerPartition);
Expand Down Expand Up @@ -3033,6 +3041,26 @@ public Write<T> withPropagateSuccessfulStorageApiWrites(
.build();
}

/**
 * If called, all successful writes will be propagated to {@link WriteResult} and are accessible
 * via the {@link WriteResult#getSuccessfulStorageApiInserts} method. The supplied predicate
 * controls which columns appear in the resulting PCollection: it is given the name of each
 * candidate field, with nested fields rendered in dotted form (e.g. a.b.c). For a nested field
 * to be included, the predicate must also accept every ancestor field — e.g. to include
 * "a.b.c", it must return true for "a", for "a.b", and for "a.b.c".
 *
 * <p>The predicate is evaluated repeatedly, for every field of every message, so it should be
 * as lightweight as possible — e.g. a hash-table lookup of field names rather than a linear
 * search through a list.
 */
public Write<T> withPropagateSuccessfulStorageApiWrites(Predicate<String> columnsToPropagate) {
  // Enabling the predicate implies enabling propagation itself; set both on one builder pass.
  Builder<T> updated = toBuilder();
  updated.setPropagateSuccessfulStorageApiWrites(true);
  updated.setPropagateSuccessfulStorageApiWritesPredicate(columnsToPropagate);
  return updated.build();
}

/**
* Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch
* load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for discussion.
Expand Down Expand Up @@ -3885,6 +3913,7 @@ private <DestinationT> WriteResult continueExpandTyped(
getAutoSchemaUpdate(),
getIgnoreUnknownValues(),
getPropagateSuccessfulStorageApiWrites(),
getPropagateSuccessfulStorageApiWritesPredicate(),
getRowMutationInformationFn() != null,
getDefaultMissingValueInterpretation(),
getBadRecordRouter(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */
Expand Down Expand Up @@ -106,7 +107,8 @@ public TableRow toFailsafeTableRow(T element) {
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto),
element.toByteArray()),
true);
true,
Predicates.alwaysTrue());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class StorageApiLoads<DestinationT, ElementT>
final TupleTag<BigQueryStorageApiInsertError> failedRowsTag = new TupleTag<>("failedRows");

@Nullable TupleTag<TableRow> successfulWrittenRowsTag;
Predicate<String> successfulRowsPredicate;
private final Coder<DestinationT> destinationCoder;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;

Expand Down Expand Up @@ -93,6 +95,7 @@ public StorageApiLoads(
boolean autoUpdateSchema,
boolean ignoreUnknownValues,
boolean propagateSuccessfulStorageApiWrites,
Predicate<String> propagateSuccessfulStorageApiWritesPredicate,
boolean usesCdc,
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
BadRecordRouter badRecordRouter,
Expand All @@ -112,6 +115,7 @@ public StorageApiLoads(
if (propagateSuccessfulStorageApiWrites) {
this.successfulWrittenRowsTag = new TupleTag<>("successfulPublishedRowsTag");
}
this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate;
this.usesCdc = usesCdc;
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this.badRecordRouter = badRecordRouter;
Expand Down Expand Up @@ -174,6 +178,7 @@ public WriteResult expandInconsistent(
bqServices,
failedRowsTag,
successfulWrittenRowsTag,
successfulRowsPredicate,
BigQueryStorageApiInsertErrorCoder.of(),
TableRowJsonCoder.of(),
autoUpdateSchema,
Expand Down Expand Up @@ -271,6 +276,7 @@ public WriteResult expandTriggered(
TableRowJsonCoder.of(),
failedRowsTag,
successfulWrittenRowsTag,
successfulRowsPredicate,
autoUpdateSchema,
ignoreUnknownValues,
defaultMissingValueInterpretation));
Expand Down Expand Up @@ -358,6 +364,7 @@ public WriteResult expandUntriggered(
bqServices,
failedRowsTag,
successfulWrittenRowsTag,
successfulRowsPredicate,
BigQueryStorageApiInsertErrorCoder.of(),
TableRowJsonCoder.of(),
autoUpdateSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -42,6 +43,9 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
private final BigQueryServices bqServices;
private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
private final @Nullable TupleTag<TableRow> successfulRowsTag;

private final Predicate<String> successfulRowsPredicate;

private final TupleTag<KV<String, String>> finalizeTag = new TupleTag<>("finalizeTag");
private final Coder<BigQueryStorageApiInsertError> failedRowsCoder;
private final Coder<TableRow> successfulRowsCoder;
Expand All @@ -57,6 +61,7 @@ public StorageApiWriteRecordsInconsistent(
BigQueryServices bqServices,
TupleTag<BigQueryStorageApiInsertError> failedRowsTag,
@Nullable TupleTag<TableRow> successfulRowsTag,
Predicate<String> successfulRowsPredicate,
Coder<BigQueryStorageApiInsertError> failedRowsCoder,
Coder<TableRow> successfulRowsCoder,
boolean autoUpdateSchema,
Expand All @@ -71,6 +76,7 @@ public StorageApiWriteRecordsInconsistent(
this.failedRowsCoder = failedRowsCoder;
this.successfulRowsCoder = successfulRowsCoder;
this.successfulRowsTag = successfulRowsTag;
this.successfulRowsPredicate = successfulRowsPredicate;
this.autoUpdateSchema = autoUpdateSchema;
this.ignoreUnknownValues = ignoreUnknownValues;
this.createDisposition = createDisposition;
Expand Down Expand Up @@ -103,6 +109,7 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
finalizeTag,
failedRowsTag,
successfulRowsTag,
successfulRowsPredicate,
autoUpdateSchema,
ignoreUnknownValues,
createDisposition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -107,6 +109,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final BigQueryServices bqServices;
private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
private final @Nullable TupleTag<TableRow> successfulRowsTag;
private final Predicate<String> successfulRowsPredicate;
private final TupleTag<KV<String, String>> finalizeTag = new TupleTag<>("finalizeTag");
private final Coder<BigQueryStorageApiInsertError> failedRowsCoder;
private final Coder<TableRow> successfulRowsCoder;
Expand Down Expand Up @@ -168,6 +171,7 @@ public StorageApiWriteUnshardedRecords(
BigQueryServices bqServices,
TupleTag<BigQueryStorageApiInsertError> failedRowsTag,
@Nullable TupleTag<TableRow> successfulRowsTag,
Predicate<String> successfulRowsPredicate,
Coder<BigQueryStorageApiInsertError> failedRowsCoder,
Coder<TableRow> successfulRowsCoder,
boolean autoUpdateSchema,
Expand All @@ -180,6 +184,7 @@ public StorageApiWriteUnshardedRecords(
this.bqServices = bqServices;
this.failedRowsTag = failedRowsTag;
this.successfulRowsTag = successfulRowsTag;
this.successfulRowsPredicate = successfulRowsPredicate;
this.failedRowsCoder = failedRowsCoder;
this.successfulRowsCoder = successfulRowsCoder;
this.autoUpdateSchema = autoUpdateSchema;
Expand Down Expand Up @@ -216,6 +221,7 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
finalizeTag,
failedRowsTag,
successfulRowsTag,
successfulRowsPredicate,
autoUpdateSchema,
ignoreUnknownValues,
createDisposition,
Expand Down Expand Up @@ -247,6 +253,8 @@ static class WriteRecordsDoFn<DestinationT extends @NonNull Object, ElementT>
private final TupleTag<KV<String, String>> finalizeTag;
private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
private final @Nullable TupleTag<TableRow> successfulRowsTag;

private final Predicate<String> successfulRowsPredicate;
private final boolean autoUpdateSchema;
private final boolean ignoreUnknownValues;
private final BigQueryIO.Write.CreateDisposition createDisposition;
Expand Down Expand Up @@ -576,7 +584,9 @@ void addMessage(
} catch (TableRowToStorageApiProto.SchemaConversionException e) {
@Nullable TableRow tableRow = payload.getFailsafeTableRow();
if (tableRow == null) {
tableRow = checkNotNull(appendClientInfo).toTableRow(payloadBytes);
tableRow =
checkNotNull(appendClientInfo)
.toTableRow(payloadBytes, Predicates.alwaysTrue());
}
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we only
// execute this
Expand Down Expand Up @@ -641,7 +651,8 @@ long flush(
TableRowToStorageApiProto.wrapDescriptorProto(
getAppendClientInfo(true, null).getDescriptor()),
rowBytes),
true);
true,
successfulRowsPredicate);
}
org.joda.time.Instant timestamp = insertTimestamps.get(i);
failedRowsReceiver.outputWithTimestamp(
Expand Down Expand Up @@ -725,7 +736,8 @@ long flush(
Preconditions.checkStateNotNull(appendClientInfo)
.getDescriptor()),
protoBytes),
true);
true,
Predicates.alwaysTrue());
}
element =
new BigQueryStorageApiInsertError(
Expand Down Expand Up @@ -875,7 +887,9 @@ long flush(
try {
TableRow row =
TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(descriptor, rowBytes), true);
DynamicMessage.parseFrom(descriptor, rowBytes),
true,
successfulRowsPredicate);
org.joda.time.Instant timestamp = c.timestamps.get(i);
successfulRowsReceiver.outputWithTimestamp(row, timestamp);
} catch (Exception e) {
Expand Down Expand Up @@ -952,6 +966,7 @@ void postFlush() {
TupleTag<KV<String, String>> finalizeTag,
TupleTag<BigQueryStorageApiInsertError> failedRowsTag,
@Nullable TupleTag<TableRow> successfulRowsTag,
Predicate<String> successfulRowsPredicate,
boolean autoUpdateSchema,
boolean ignoreUnknownValues,
BigQueryIO.Write.CreateDisposition createDisposition,
Expand All @@ -969,6 +984,7 @@ void postFlush() {
this.finalizeTag = finalizeTag;
this.failedRowsTag = failedRowsTag;
this.successfulRowsTag = successfulRowsTag;
this.successfulRowsPredicate = successfulRowsPredicate;
this.autoUpdateSchema = autoUpdateSchema;
this.ignoreUnknownValues = ignoreUnknownValues;
this.createDisposition = createDisposition;
Expand Down
Loading
Loading