Skip to content

Commit

Permalink
Add Storage API streaming max retries parameter for BigQueryOptions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Amar3tto authored Jun 26, 2024
1 parent 90d3f8a commit b482337
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public interface BigQueryOptions

void setStorageWriteApiTriggeringFrequencySec(Integer value);

@Description(
"Maximum number of retries for Storage Write API writes. "
+ "Currently it is only applicable for streaming pipeline.")
@Default.Integer(500)
Integer getStorageWriteApiMaxRetries();

void setStorageWriteApiMaxRetries(Integer value);

@Description(
"When auto-sharding is used, the maximum duration in milliseconds the input records are"
+ " allowed to be buffered before being written to BigQuery.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
createDisposition,
kmsKey,
usesCdc,
defaultMissingValueInterpretation))
defaultMissingValueInterpretation,
bigQueryOptions.getStorageWriteApiMaxRetries()))
.withOutputTags(finalizeTag, tupleTagList)
.withSideInputs(dynamicDestinations.getSideInputs()));
result.get(failedRowsTag).setCoder(failedRowsCoder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
createDisposition,
kmsKey,
usesCdc,
defaultMissingValueInterpretation))
defaultMissingValueInterpretation,
options.getStorageWriteApiMaxRetries()))
.withOutputTags(finalizeTag, tupleTagList)
.withSideInputs(dynamicDestinations.getSideInputs()));

Expand Down Expand Up @@ -889,6 +890,7 @@ void postFlush() {
private int numPendingRecordBytes = 0;
private final int flushThresholdBytes;
private final int flushThresholdCount;
private final int maxRetries;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
Expand All @@ -910,7 +912,8 @@ void postFlush() {
BigQueryIO.Write.CreateDisposition createDisposition,
@Nullable String kmsKey,
boolean usesCdc,
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
int maxRetries) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
Expand All @@ -927,6 +930,7 @@ void postFlush() {
this.kmsKey = kmsKey;
this.usesCdc = usesCdc;
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this.maxRetries = maxRetries;
}

boolean shouldFlush() {
Expand Down Expand Up @@ -960,7 +964,7 @@ void flushAll(
new RetryManager<>(
Duration.standardSeconds(1),
Duration.standardSeconds(20),
500,
maxRetries,
BigQuerySinkMetrics.throttledTimeCounter(
BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
retryManagers.add(retryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ public PCollectionTuple expand(
BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
final long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes();
final long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
final int maxRetries = bigQueryOptions.getStorageWriteApiMaxRetries();

String operationName = input.getName() + "/" + getName();
TupleTagList tupleTagList = TupleTagList.of(failedRowsTag);
Expand All @@ -252,7 +253,9 @@ public PCollectionTuple expand(
PCollectionTuple writeRecordsResult =
input.apply(
"Write Records",
ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime, splitSize, maxRequestSize))
ParDo.of(
new WriteRecordsDoFn(
operationName, streamIdleTime, splitSize, maxRequestSize, maxRetries))
.withSideInputs(dynamicDestinations.getSideInputs())
.withOutputTags(flushTag, tupleTagList));

Expand Down Expand Up @@ -340,13 +343,19 @@ class WriteRecordsDoFn
private final Duration streamIdleTime;
private final long splitSize;
private final long maxRequestSize;
private final int maxRetries;

public WriteRecordsDoFn(
String operationName, Duration streamIdleTime, long splitSize, long maxRequestSize) {
String operationName,
Duration streamIdleTime,
long splitSize,
long maxRequestSize,
int maxRetries) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.streamIdleTime = streamIdleTime;
this.splitSize = splitSize;
this.maxRequestSize = maxRequestSize;
this.maxRetries = maxRetries;
}

@StartBundle
Expand Down Expand Up @@ -819,7 +828,7 @@ public void process(
new RetryManager<>(
Duration.standardSeconds(1),
Duration.standardSeconds(20),
500,
maxRetries,
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
int numAppends = 0;
for (SplittingIterable.Value splitValue : messages) {
Expand Down

0 comments on commit b482337

Please sign in to comment.