BigQueryIO : control StorageWrite parallelism in batch, by reshuffling before write on the number of streams set for BigQueryIO.write() using .withNumStorageWriteApiStreams(numStorageWriteApiStreams) (#32805)

* BigQueryIO : control StorageWrite parallelism in batch, by reshuffling before write on the number of streams set for BigQueryIO.write() using .withNumStorageWriteApiStreams(numStorageWriteApiStreams)

* fix unused dep and comment

* spotlessApply

* spotlessApply

* fix typo
razvanculea authored Dec 27, 2024
1 parent 067deed commit 5944a30
Showing 3 changed files with 68 additions and 8 deletions.
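
For context, a minimal sketch of how a batch pipeline would use this knob. The destination table, class name, and placeholder rows are assumptions for illustration only; the BigQueryIO builder calls (including the `.withNumStorageWriteApiStreams()` setter this commit extends to batch) are the existing API.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class StorageWriteBatchSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply(
            "CreateRows",
            Create.of(new TableRow().set("name", "a"), new TableRow().set("name", "b"))
                .withCoder(TableRowJsonCoder.of()))
        .apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical destination
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                // With this change, a non-zero value in a batch pipeline inserts a redistribute
                // so the write stage runs on this many streams; zero keeps upstream parallelism.
                .withNumStorageWriteApiStreams(4)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run().waitUntilFinish();
  }
}
```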
@@ -79,11 +79,20 @@
*
* <p>Example trigger command for specific test running on Dataflow runner:
*
* <p><b>Maven</b>
*
* <pre>
* mvn test -pl it/google-cloud-platform -am -Dtest="BigQueryIOLT#testAvroFileLoadsWriteThenRead" \
* -Dconfiguration=medium -Dproject=[gcpProject] -DartifactBucket=[temp bucket] -DfailIfNoTests=false
* </pre>
*
* <p><b>Gradle</b>
*
* <pre>
* ./gradlew :it:google-cloud-platform:BigQueryPerformanceTest --tests='BigQueryIOLT.testAvroFileLoadsWriteThenRead' \
* -Dconfiguration=medium -Dproject=[gcpProject] -DartifactBucket=[temp bucket] -DfailIfNoTests=false
* </pre>
*
* <p>Example trigger command for specific test and custom data configuration:
*
* <pre>mvn test -pl it/google-cloud-platform -am \
@@ -172,11 +181,11 @@ public static void tearDownClass() {
Configuration.class), // 1 MB
"medium",
Configuration.fromJsonString(
"{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}",
"{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\",\"workerMachineType\":\"e2-standard-2\",\"experiments\":\"disable_runner_v2\",\"numWorkers\":\"1\",\"maxNumWorkers\":\"1\"}",
Configuration.class), // 10 GB
"large",
Configuration.fromJsonString(
"{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}",
"{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\",\"workerMachineType\":\"e2-standard-2\",\"experiments\":\"disable_runner_v2\",\"numWorkers\":\"1\",\"maxNumWorkers\":\"1\",\"numStorageWriteApiStreams\":4,\"storageWriteApiTriggeringFrequencySec\":20}",
Configuration.class) // 100 GB
);
} catch (IOException e) {
@@ -230,16 +239,19 @@ public void testWriteAndRead() throws IOException {
writeIO =
BigQueryIO.<byte[]>write()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withNumStorageWriteApiStreams(
configuration.numStorageWriteApiStreams) // control the number of streams
.withAvroFormatFunction(
new AvroFormatFn(
configuration.numColumns,
!("STORAGE_WRITE_API".equalsIgnoreCase(configuration.writeMethod))));

break;
case JSON:
writeIO =
BigQueryIO.<byte[]>write()
.withSuccessfulInsertsPropagation(false)
.withNumStorageWriteApiStreams(
configuration.numStorageWriteApiStreams) // control the number of streams
.withFormatFunction(new JsonFormatFn(configuration.numColumns));
break;
}
@@ -268,6 +280,10 @@ private void testWrite(BigQueryIO.Write<byte[]> writeIO) throws IOException {
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(writePipeline)
.addParameter("runner", configuration.runner)
.addParameter("workerMachineType", configuration.workerMachineType)
.addParameter("experiments", configuration.experiments)
.addParameter("numWorkers", configuration.numWorkers)
.addParameter("maxNumWorkers", configuration.maxNumWorkers)
.build();

PipelineLauncher.LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options);
@@ -304,6 +320,10 @@ private void testRead() throws IOException {
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(readPipeline)
.addParameter("runner", configuration.runner)
.addParameter("workerMachineType", configuration.workerMachineType)
.addParameter("experiments", configuration.experiments)
.addParameter("numWorkers", configuration.numWorkers)
.addParameter("maxNumWorkers", configuration.maxNumWorkers)
.build();

PipelineLauncher.LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options);
@@ -445,12 +465,36 @@ static class Configuration extends SyntheticSourceOptions {
/** Runner specified to run the pipeline. */
@JsonProperty public String runner = "DirectRunner";

/** Worker machine type specified to run the pipeline with Dataflow Runner. */
@JsonProperty public String workerMachineType = "";

/** Experiments specified to run the pipeline. */
@JsonProperty public String experiments = "";

/** Number of workers to start the pipeline. Must be a positive value. */
@JsonProperty public String numWorkers = "1";

/** Maximum number of workers for the pipeline. Must be a positive value. */
@JsonProperty public String maxNumWorkers = "1";

/** BigQuery read method: DEFAULT/DIRECT_READ/EXPORT. */
@JsonProperty public String readMethod = "DEFAULT";

/** BigQuery write method: DEFAULT/FILE_LOADS/STREAMING_INSERTS/STORAGE_WRITE_API. */
@JsonProperty public String writeMethod = "DEFAULT";

/**
* BigQuery number of streams for write method STORAGE_WRITE_API. 0 lets the runner determine
* the number of streams. Note: the limit on open connections per hour is 10K streams.
*/
@JsonProperty public int numStorageWriteApiStreams = 0;

/**
* BigQuery triggering frequency, in seconds, used in combination with the number of streams for
* the STORAGE_WRITE_API write method.
*/
@JsonProperty public int storageWriteApiTriggeringFrequencySec = 20;

/** BigQuery write format: AVRO/JSON. */
@JsonProperty public String writeFormat = "AVRO";
}
@@ -3040,9 +3040,14 @@ public Write<T> withNumFileShards(int numFileShards) {
}

/**
* Control how many parallel streams are used when using Storage API writes. Applicable only for
* streaming pipelines, and when {@link #withTriggeringFrequency} is also set. To let runner
* determine the sharding at runtime, set this to zero, or {@link #withAutoSharding()} instead.
* Control how many parallel streams are used when using Storage API writes.
*
* <p>For streaming pipelines, this applies only when {@link #withTriggeringFrequency} is also
* set. To let the runner determine the sharding at runtime, set this to zero, or use {@link
* #withAutoSharding()} instead.
*
* <p>For batch pipelines, this inserts a redistribute before the write. To avoid reshuffling and
* keep the pipeline parallelism as is, set this to zero.
*/
public Write<T> withNumStorageWriteApiStreams(int numStorageWriteApiStreams) {
return toBuilder().setNumStorageWriteApiStreams(numStorageWriteApiStreams).build();
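
For contrast with the new batch behavior, a hedged sketch of the streaming usage that the unchanged part of the Javadoc above describes; the class name and destination table are placeholders, while the builder calls are the existing BigQueryIO API.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

class StorageWriteStreamingSketch {
  // Streaming case from the Javadoc above: the stream count takes effect together with a
  // triggering frequency; zero (or withAutoSharding()) lets the runner choose the sharding.
  static BigQueryIO.Write<TableRow> streamingWrite() {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // hypothetical destination
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(20))
        .withNumStorageWriteApiStreams(4);
  }
}
```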
@@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
@@ -360,9 +361,19 @@ public WriteResult expandUntriggered(
rowUpdateFn,
badRecordRouter));

PCollection<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRows =
convertMessagesResult.get(successfulConvertedRowsTag);

if (numShards > 0) {
successfulConvertedRows =
successfulConvertedRows.apply(
"ResdistibuteNumShards",
Redistribute.<KV<DestinationT, StorageApiWritePayload>>arbitrarily()
.withNumBuckets(numShards));
}

PCollectionTuple writeRecordsResult =
convertMessagesResult
.get(successfulConvertedRowsTag)
successfulConvertedRows
.apply(
"StorageApiWriteUnsharded",
new StorageApiWriteUnshardedRecords<>(
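
To illustrate the transform the new batch branch applies, a standalone sketch of Redistribute capping parallelism on a simple PCollection. The element values, bucket count, and class name are illustrative; `Redistribute.arbitrarily().withNumBuckets()` is the same pattern used in the diff above.

```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.values.PCollection;

public class RedistributeSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> input =
        p.apply("CreateElements", Create.of(Arrays.asList("a", "b", "c", "d")));

    // Same pattern as the new batch branch above: redistribute arbitrarily into a fixed
    // number of buckets so the following stage runs with that degree of parallelism.
    PCollection<String> redistributed =
        input.apply(
            "RedistributeNumBuckets",
            Redistribute.<String>arbitrarily().withNumBuckets(4));

    // ...subsequent writes would consume `redistributed` here...
    p.run().waitUntilFinish();
  }
}
```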
