From b482337b82ebb5b9929ad854064c7d4b8385c289 Mon Sep 17 00:00:00 2001
From: Vitaly Terentyev
Date: Wed, 26 Jun 2024 23:13:43 +0400
Subject: [PATCH] Add Storage API streaming max retries parameter for
 BigQueryOptions (#31683)

---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java |  8 ++++++++
 .../StorageApiWriteRecordsInconsistent.java       |  3 ++-
 .../bigquery/StorageApiWriteUnshardedRecords.java | 10 +++++++---
 .../bigquery/StorageApiWritesShardedRecords.java  | 15 ++++++++++++---
 4 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 118db7a3711b..cd1fc6d3842c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -133,6 +133,14 @@ public interface BigQueryOptions
 
   void setStorageWriteApiTriggeringFrequencySec(Integer value);
 
+  @Description(
+      "Maximum number of retries for Storage Write API writes. "
+          + "Currently it is only applicable for streaming pipeline.")
+  @Default.Integer(500)
+  Integer getStorageWriteApiMaxRetries();
+
+  void setStorageWriteApiMaxRetries(Integer value);
+
   @Description(
       "When auto-sharding is used, the maximum duration in milliseconds the input records are"
           + " allowed to be buffered before being written to BigQuery.")
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index 022ee1fbed08..389c749d4a4d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -108,7 +108,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePayl
                     createDisposition,
                     kmsKey,
                     usesCdc,
-                    defaultMissingValueInterpretation))
+                    defaultMissingValueInterpretation,
+                    bigQueryOptions.getStorageWriteApiMaxRetries()))
                 .withOutputTags(finalizeTag, tupleTagList)
                 .withSideInputs(dynamicDestinations.getSideInputs()));
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -190,7 +190,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePayl
                         createDisposition,
                         kmsKey,
                         usesCdc,
-                        defaultMissingValueInterpretation))
+                        defaultMissingValueInterpretation,
+                        options.getStorageWriteApiMaxRetries()))
                     .withOutputTags(finalizeTag, tupleTagList)
                     .withSideInputs(dynamicDestinations.getSideInputs()));
 
@@ -869,6 +870,7 @@ void postFlush() {
     }
 
     private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
+    private final int maxRetries;
     private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
     private final BigQueryServices bqServices;
     private final boolean useDefaultStream;
@@ -910,7 +912,8 @@ void postFlush() {
         BigQueryIO.Write.CreateDisposition createDisposition,
         @Nullable String kmsKey,
         boolean usesCdc,
-        AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
+        AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
+        int maxRetries) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.dynamicDestinations = dynamicDestinations;
       this.bqServices = bqServices;
@@ -927,6 +930,7 @@ void postFlush() {
       this.kmsKey = kmsKey;
      this.usesCdc = usesCdc;
       this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
+      this.maxRetries = maxRetries;
     }
 
     boolean shouldFlush() {
@@ -960,7 +964,7 @@ void flushAll(
             new RetryManager<>(
                 Duration.standardSeconds(1),
                 Duration.standardSeconds(20),
-                500,
+                maxRetries,
                 BigQuerySinkMetrics.throttledTimeCounter(
                     BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
         retryManagers.add(retryManager);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index bb7c3b640dc1..1232e1a7097c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -242,6 +242,7 @@ public PCollectionTuple expand(
     BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
     final long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes();
     final long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
+    final int maxRetries = bigQueryOptions.getStorageWriteApiMaxRetries();
 
     String operationName = input.getName() + "/" + getName();
     TupleTagList tupleTagList = TupleTagList.of(failedRowsTag);
@@ -252,7 +253,9 @@ public PCollectionTuple expand(
     PCollectionTuple writeRecordsResult =
         input.apply(
             "Write Records",
-            ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime, splitSize, maxRequestSize))
+            ParDo.of(
+                new WriteRecordsDoFn(
+                    operationName, streamIdleTime, splitSize, maxRequestSize, maxRetries))
                 .withSideInputs(dynamicDestinations.getSideInputs())
                 .withOutputTags(flushTag, tupleTagList));
 
@@ -340,13 +343,19 @@ class WriteRecordsDoFn
     private final Duration streamIdleTime;
     private final long splitSize;
     private final long maxRequestSize;
+    private final int maxRetries;
 
     public WriteRecordsDoFn(
-        String operationName, Duration streamIdleTime, long splitSize, long maxRequestSize) {
+        String operationName,
+        Duration streamIdleTime,
+        long splitSize,
+        long maxRequestSize,
+        int maxRetries) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.streamIdleTime = streamIdleTime;
       this.splitSize = splitSize;
       this.maxRequestSize = maxRequestSize;
+      this.maxRetries = maxRetries;
     }
 
     @StartBundle
@@ -819,7 +828,7 @@ public void process(
         new RetryManager<>(
             Duration.standardSeconds(1),
             Duration.standardSeconds(20),
-            500,
+            maxRetries,
             BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
     int numAppends = 0;
     for (SplittingIterable.Value splitValue : messages) {
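
Usage note (illustrative, not part of the commit): with this patch applied, the
retry budget is an ordinary pipeline option on BigQueryOptions, so it can be
set via --storageWriteApiMaxRetries=<n> or programmatically. The sketch below
is hypothetical boilerplate; only getStorageWriteApiMaxRetries and
setStorageWriteApiMaxRetries (default 500) come from this change.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class StorageWriteMaxRetriesExample {
      public static void main(String[] args) {
        // Picks up --storageWriteApiMaxRetries=<n> from the command line.
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

        // Programmatic equivalent; if unset, @Default.Integer(500) applies.
        options.setStorageWriteApiMaxRetries(100);

        Pipeline pipeline = Pipeline.create(options);
        // ... attach a streaming BigQueryIO.write() using the Storage Write API here ...
        pipeline.run();
      }
    }

Both the sharded and unsharded Storage API sinks previously hard-coded 500 as
the attempt count handed to RetryManager; threading the option through makes
that budget tunable per pipeline while leaving the existing
Duration.standardSeconds(1)/Duration.standardSeconds(20) backoff arguments
unchanged.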