Skip to content

Commit

Permalink
feat: handle retry info so client respect the delay server sets
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 7, 2023
1 parent 41c5355 commit 3a6040c
Show file tree
Hide file tree
Showing 5 changed files with 463 additions and 20 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,9 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable</className>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm</className>
<field>*</field>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand Down Expand Up @@ -107,6 +109,8 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryingCallable;
import com.google.cloud.bigtable.gaxx.retrying.RetryingServerStreamingCallable;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -485,7 +489,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
createRetryingStreamingCallable(retrying1, innerSettings);

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -568,7 +572,8 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
createRetryingUnaryCallable(
withBigtableTracer, settings.sampleRowKeysSettings().getRetrySettings());

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -607,7 +612,8 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
createRetryingUnaryCallable(
withBigtableTracer, settings.mutateRowSettings().getRetrySettings());

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -810,7 +816,8 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
createRetryingUnaryCallable(
withBigtableTracer, settings.checkAndMutateRowSettings().getRetrySettings());

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -851,8 +858,8 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
createRetryingUnaryCallable(
withBigtableTracer, settings.readModifyWriteRowSettings().getRetrySettings());

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -1062,6 +1069,34 @@ public Map<String, String> extract(PingAndWarmRequest request) {
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createRetryingUnaryCallable(
UnaryCallable<RequestT, ResponseT> innerCallable, RetrySettings retrySettings) {
RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(retrySettings, clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new RetryingCallable<>(
clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);
}

private <RequestT, ResponseT>
ServerStreamingCallable<RequestT, ResponseT> createRetryingStreamingCallable(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> settings) {
RetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new RetryingServerStreamingCallable<RequestT, ResponseT>(
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -235,7 +236,8 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!failedMutation.getError().isRetryable()) {
if (ApiResultRetryAlgorithm.extractRetryDelay(failedMutation.getError()) == null
&& !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Google LLC
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,33 +19,60 @@
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/** For internal use, public for technical reasons. */
/**
* For internal use, public for technical reasons. This retry algorithm checks the metadata of an
* exception for additional error details. If the metadata has a RetryInfo field, use the retry
* delay to set the wait time between attempts.
*/
@InternalApi
public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<ResponseT> {
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null && prevThrowable instanceof DeadlineExceededException) {
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
.setRpcTimeout(prevSettings.getRpcTimeout())
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
Duration retryDelay = extractRetryDelay(prevThrowable);
if (retryDelay != null) {
return prevSettings
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
}
return null;
}

@Override
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
return extractRetryDelay(prevThrowable) != null
|| ((prevThrowable instanceof ApiException)
&& ((ApiException) prevThrowable).isRetryable());
}

public static Duration extractRetryDelay(Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
return null;
}
RetryInfo retryInfo = trailers.get(KEY_RETRY_INFO);
if (retryInfo == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
return null;
}
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
Loading

0 comments on commit 3a6040c

Please sign in to comment.