Skip to content

Commit

Permalink
Roll forward "Read API Source v2 (#25392)" fix data loss (#28778)
Browse files Browse the repository at this point in the history
* Revert "Read API Source v2 (#25392)"

This reverts commit 4ce8eed.

* Preserve the ablility to use read API backend feature

* Add log about num of stream

* Add to CHANGES.md
  • Loading branch information
Abacn authored Feb 27, 2024
1 parent 5af76b0 commit 549faba
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 2,768 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* Full Support for Storage Read and Write APIs
* Partial Support for File Loads (Failures writing to files supported, failures loading files to BQ unsupported)
* No Support for Extract or Streaming Inserts
* `--enableBundling` pipeline option for BigQueryIO DIRECT_READ is replaced by `--enableStorageReadApiV2`. Both were considered experimental and may subject to change (Java) ([#26354](https://github.com/apache/beam/issues/26354)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
Expand Down Expand Up @@ -113,9 +112,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -1546,7 +1543,6 @@ private PCollection<T> expandForDirectRead(
//

PCollectionView<String> jobIdTokenView;
PCollectionTuple tuple;
PCollection<T> rows;

if (!getWithTemplateCompatibility()
Expand Down Expand Up @@ -1578,46 +1574,24 @@ public String apply(String input) {
jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());

TupleTag<ReadStream> readStreamsTag = new TupleTag<>();
TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
TupleTag<ReadSession> readSessionTag = new TupleTag<>();
TupleTag<String> tableSchemaTag = new TupleTag<>();

if (!bqOptions.getEnableBundling()) {
tuple =
createTupleForDirectRead(
jobIdTokenCollection,
outputCoder,
readStreamsTag,
readSessionTag,
tableSchemaTag);
tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
} else {
tuple =
createTupleForDirectReadWithStreamBundle(
jobIdTokenCollection,
outputCoder,
listReadStreamsTag,
readSessionTag,
tableSchemaTag);
tuple.get(listReadStreamsTag).setCoder(ListCoder.of(ProtoCoder.of(ReadStream.class)));
}

PCollectionTuple tuple =
createTupleForDirectRead(
jobIdTokenCollection, outputCoder, readStreamsTag, readSessionTag, tableSchemaTag);
tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());

PCollectionView<ReadSession> readSessionView =
tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());
PCollectionView<String> tableSchemaView =
tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());

if (!bqOptions.getEnableBundling()) {
rows =
createPCollectionForDirectRead(
tuple, outputCoder, readStreamsTag, readSessionView, tableSchemaView);
} else {
rows =
createPCollectionForDirectReadWithStreamBundle(
tuple, outputCoder, listReadStreamsTag, readSessionView, tableSchemaView);
}
rows =
createPCollectionForDirectRead(
tuple, outputCoder, readStreamsTag, readSessionView, tableSchemaView);
}

PassThroughThenCleanup.CleanupOperation cleanupOperation =
Expand Down Expand Up @@ -1691,13 +1665,9 @@ public void processElement(
if (boundedSource instanceof BigQueryStorageStreamSource) {
sourceWithErrorHandlingParseFn =
((BigQueryStorageStreamSource<T>) boundedSource).fromExisting(errorHandlingParseFn);
} else if (boundedSource instanceof BigQueryStorageStreamBundleSource) {
sourceWithErrorHandlingParseFn =
((BigQueryStorageStreamBundleSource<T>) boundedSource)
.fromExisting(errorHandlingParseFn);
} else {
throw new RuntimeException(
"Bounded Source is not BigQueryStorageStreamSource or BigQueryStorageStreamBundleSource, unable to read");
"Bounded Source is not BigQueryStorageStreamSource, unable to read");
}
readSource(
options,
Expand Down Expand Up @@ -1772,79 +1742,6 @@ public void processElement(ProcessContext c) throws Exception {
return tuple;
}

private PCollectionTuple createTupleForDirectReadWithStreamBundle(
PCollection<String> jobIdTokenCollection,
Coder<T> outputCoder,
TupleTag<List<ReadStream>> listReadStreamsTag,
TupleTag<ReadSession> readSessionTag,
TupleTag<String> tableSchemaTag) {

PCollectionTuple tuple =
jobIdTokenCollection.apply(
"RunQueryJob",
ParDo.of(
new DoFn<String, List<ReadStream>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
String jobUuid = c.element();
// Execute the query and get the destination table holding the results.
// The getTargetTable call runs a new instance of the query and returns
// the destination table created to hold the results.
BigQueryStorageQuerySource<T> querySource =
createStorageQuerySource(jobUuid, outputCoder);
Table queryResultTable = querySource.getTargetTable(options);

// Create a read session without specifying a desired stream count and
// let the BigQuery storage server pick the number of streams.
CreateReadSessionRequest request =
CreateReadSessionRequest.newBuilder()
.setParent(
BigQueryHelpers.toProjectResourceName(
options.getBigQueryProject() == null
? options.getProject()
: options.getBigQueryProject()))
.setReadSession(
ReadSession.newBuilder()
.setTable(
BigQueryHelpers.toTableResourceName(
queryResultTable.getTableReference()))
.setDataFormat(DataFormat.AVRO))
.setMaxStreamCount(0)
.build();

ReadSession readSession;
try (StorageClient storageClient =
getBigQueryServices().getStorageClient(options)) {
readSession = storageClient.createReadSession(request);
}
int streamIndex = 0;
int streamsPerBundle = 10;
List<ReadStream> streamBundle = Lists.newArrayList();
for (ReadStream readStream : readSession.getStreamsList()) {
streamIndex++;
streamBundle.add(readStream);
if (streamIndex % streamsPerBundle == 0) {
c.output(streamBundle);
streamBundle = Lists.newArrayList();
}
}
if (streamIndex % streamsPerBundle != 0) {
c.output(streamBundle);
}
c.output(readSessionTag, readSession);
c.output(
tableSchemaTag,
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
}
})
.withOutputTags(
listReadStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));

return tuple;
}

private static class ErrorHandlingParseFn<T>
implements SerializableFunction<SchemaAndRecord, T> {
private final SerializableFunction<SchemaAndRecord, T> parseFn;
Expand Down Expand Up @@ -1931,62 +1828,6 @@ public void processElement(
return resultTuple.get(rowTag).setCoder(outputCoder);
}

private PCollection<T> createPCollectionForDirectReadWithStreamBundle(
PCollectionTuple tuple,
Coder<T> outputCoder,
TupleTag<List<ReadStream>> listReadStreamsTag,
PCollectionView<ReadSession> readSessionView,
PCollectionView<String> tableSchemaView) {
TupleTag<T> rowTag = new TupleTag<>();
PCollectionTuple resultTuple =
tuple
.get(listReadStreamsTag)
.apply(Reshuffle.viaRandomKey())
.apply(
ParDo.of(
new DoFn<List<ReadStream>, T>() {
@ProcessElement
public void processElement(
ProcessContext c, MultiOutputReceiver outputReceiver)
throws Exception {
ReadSession readSession = c.sideInput(readSessionView);
TableSchema tableSchema =
BigQueryHelpers.fromJsonString(
c.sideInput(tableSchemaView), TableSchema.class);
List<ReadStream> streamBundle = c.element();

ErrorHandlingParseFn<T> errorHandlingParseFn =
new ErrorHandlingParseFn<T>(getParseFn());

BigQueryStorageStreamBundleSource<T> streamSource =
BigQueryStorageStreamBundleSource.create(
readSession,
streamBundle,
tableSchema,
errorHandlingParseFn,
outputCoder,
getBigQueryServices(),
1L);

readSource(
c.getPipelineOptions(),
rowTag,
outputReceiver,
streamSource,
errorHandlingParseFn,
getBadRecordRouter());
}
})
.withSideInputs(readSessionView, tableSchemaView)
.withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG)));

getBadRecordErrorHandler()
.addErrorCollection(
resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline())));

return resultTuple.get(rowTag).setCoder(outputCoder);
}

public static <T> void readSource(
PipelineOptions options,
TupleTag<T> rowTag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ public interface BigQueryOptions
void setStorageWriteApiMaxRequestSize(Long value);

@Description(
"If set, BigQueryIO.Read will use the StreamBundle based"
+ "implementation of the Read API Source")
"If set, BigQueryIO.Read will rely on the Read API backends to surface the appropriate"
+ " number of streams for read")
@Default.Boolean(false)
Boolean getEnableBundling();
Boolean getEnableStorageReadApiV2();

void setEnableBundling(Boolean value);
void setEnableStorageReadApiV2(Boolean value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Coder<T> getOutputCoder() {
}

@Override
public List<? extends BoundedSource<T>> split(
public List<BigQueryStorageStreamSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
@Nullable Table targetTable = getTargetTable(bqOptions);
Expand Down Expand Up @@ -137,7 +137,7 @@ public List<? extends BoundedSource<T>> split(
// an appropriate number of streams for the Session to produce reasonable throughput.
// This is required when using the Read API Source V2.
int streamCount = 0;
if (!bqOptions.getEnableBundling()) {
if (!bqOptions.getEnableStorageReadApiV2()) {
if (desiredBundleSizeBytes > 0) {
long tableSizeBytes = (targetTable != null) ? targetTable.getNumBytes() : 0;
streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, MAX_SPLIT_COUNT);
Expand Down Expand Up @@ -167,27 +167,11 @@ public List<? extends BoundedSource<T>> split(
}

if (readSession.getStreamsList().isEmpty()) {
// The underlying table is empty or all rows have been pruned.
LOG.info(
"Returned stream list is empty. The underlying table is empty or all rows have been pruned.");
return ImmutableList.of();
}

streamCount = readSession.getStreamsList().size();
int streamsPerBundle = 0;
double bytesPerStream = 0;
LOG.info(
"Estimated bytes this ReadSession will scan when all Streams are consumed: '{}'",
readSession.getEstimatedTotalBytesScanned());
if (bqOptions.getEnableBundling()) {
if (desiredBundleSizeBytes > 0) {
bytesPerStream =
(double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
LOG.info("Estimated bytes each Stream will consume: '{}'", bytesPerStream);
streamsPerBundle = (int) Math.ceil(desiredBundleSizeBytes / bytesPerStream);
} else {
streamsPerBundle = (int) Math.ceil((double) streamCount / 10);
}
streamsPerBundle = Math.min(streamCount, streamsPerBundle);
LOG.info("Distributing '{}' Streams per StreamBundle.", streamsPerBundle);
} else {
LOG.info("Read session returned {} streams", readSession.getStreamsList().size());
}

Schema sessionSchema;
Expand All @@ -204,37 +188,18 @@ public List<? extends BoundedSource<T>> split(
throw new IllegalArgumentException(
"data is not in a supported dataFormat: " + readSession.getDataFormat());
}
int streamIndex = 0;

Preconditions.checkStateNotNull(
targetTable); // TODO: this is inconsistent with method above, where it can be null
TableSchema trimmedSchema =
BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema);
if (!bqOptions.getEnableBundling()) {
List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
for (ReadStream readStream : readSession.getStreamsList()) {
sources.add(
BigQueryStorageStreamSource.create(
readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
}
return ImmutableList.copyOf(sources);
}
List<ReadStream> streamBundle = Lists.newArrayList();
List<BigQueryStorageStreamBundleSource<T>> sources = Lists.newArrayList();
List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
for (ReadStream readStream : readSession.getStreamsList()) {
streamIndex++;
streamBundle.add(readStream);
if (streamIndex % streamsPerBundle == 0) {
sources.add(
BigQueryStorageStreamBundleSource.create(
readSession, streamBundle, trimmedSchema, parseFn, outputCoder, bqServices, 1L));
streamBundle = Lists.newArrayList();
}
}
if (streamIndex % streamsPerBundle != 0) {
sources.add(
BigQueryStorageStreamBundleSource.create(
readSession, streamBundle, trimmedSchema, parseFn, outputCoder, bqServices, 1L));
BigQueryStorageStreamSource.create(
readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices));
}

return ImmutableList.copyOf(sources);
}

Expand Down
Loading

0 comments on commit 549faba

Please sign in to comment.