retryingFuture =
+ this.executor.createFuture(retryCallable, inputContext);
+ retryCallable.setExternalFuture(retryingFuture);
+ retryCallable.call();
+ return retryingFuture;
+ }
+
+ public String toString() {
+ return String.format("retrying(%s)", this.callable);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java
new file mode 100644
index 0000000000..504cf4f2b7
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/RetryingServerStreamingCallable.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.retrying;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.retrying.ScheduledRetryingExecutor;
+import com.google.api.gax.retrying.ServerStreamingAttemptException;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+
+// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
+/**
+ * A ServerStreamingCallable that implements resumable retries.
+ *
+ * Wraps a request, a {@link ResponseObserver} and an inner {@link ServerStreamingCallable} and
+ * coordinates retries between them. When the inner callable throws an error, this class will
+ * schedule retries using the configured {@link ScheduledRetryingExecutor}.
+ *
+ *
Streams can be resumed using a {@link StreamResumptionStrategy}. The {@link
+ * StreamResumptionStrategy} is notified of incoming responses and is expected to track the progress
+ * of the stream. Upon receiving an error, the {@link StreamResumptionStrategy} is asked to modify
+ * the original request to resume the stream.
+ */
+@InternalApi
+public final class RetryingServerStreamingCallable
+ extends ServerStreamingCallable {
+
+ private final ServerStreamingCallable innerCallable;
+ private final ScheduledRetryingExecutor executor;
+ private final StreamResumptionStrategy resumptionStrategyPrototype;
+
+ public RetryingServerStreamingCallable(
+ ServerStreamingCallable innerCallable,
+ ScheduledRetryingExecutor executor,
+ StreamResumptionStrategy resumptionStrategyPrototype) {
+ this.innerCallable = innerCallable;
+ this.executor = executor;
+ this.resumptionStrategyPrototype = resumptionStrategyPrototype;
+ }
+
+ @Override
+ public void call(
+ RequestT request,
+ final ResponseObserver responseObserver,
+ ApiCallContext context) {
+
+ ServerStreamingAttemptCallable attemptCallable =
+ new ServerStreamingAttemptCallable<>(
+ innerCallable,
+ resumptionStrategyPrototype.createNew(),
+ request,
+ context,
+ responseObserver);
+
+ RetryingFuture retryingFuture = executor.createFuture(attemptCallable, context);
+ attemptCallable.setExternalFuture(retryingFuture);
+ attemptCallable.start();
+
+ // Bridge the future result back to the external responseObserver
+ ApiFutures.addCallback(
+ retryingFuture,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ // Make sure to unwrap the underlying ApiException
+ if (throwable instanceof ServerStreamingAttemptException) {
+ throwable = throwable.getCause();
+ }
+ responseObserver.onError(throwable);
+ }
+
+ @Override
+ public void onSuccess(Void ignored) {
+ responseObserver.onComplete();
+ }
+ },
+ directExecutor());
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
new file mode 100644
index 0000000000..793cf2e91c
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ServerStreamingAttemptCallable.java
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.gaxx.retrying;
+
+import com.google.api.core.InternalApi;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.retrying.ServerStreamingAttemptException;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StateCheckingResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+
+// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
+/**
+ * A callable that generates Server Streaming attempts. At any one time, it is responsible for at
+ * most a single outstanding attempt. During an attempt, it proxies all incoming message to the
+ * outer {@link ResponseObserver} and the {@link StreamResumptionStrategy}. Once the attempt
+ * completes, the external {@link RetryingFuture} future is notified. If the {@link RetryingFuture}
+ * decides to retry the attempt, it will invoke {@link #call()}.
+ *
+ * The lifecycle of this class is:
+ *
+ *
+ * - The caller instantiates this class.
+ *
- The caller sets the {@link RetryingFuture} via {@link #setExternalFuture(RetryingFuture)}.
+ * The {@link RetryingFuture} will be responsible for scheduling future attempts.
+ *
- The caller calls {@link #start()}. This notifies the outer {@link ResponseObserver} that
+ * call is about to start.
+ *
- The outer {@link ResponseObserver} configures inbound flow control via the {@link
+ * StreamController} that it received in {@link ResponseObserver#onStart(StreamController)}.
+ *
- The attempt call is sent via the inner/upstream {@link ServerStreamingCallable}.
+ *
- A future representing the end state of the inner attempt is passed to the outer {@link
+ * RetryingFuture}.
+ *
- All messages received from the inner {@link ServerStreamingCallable} are recorded by the
+ * {@link StreamResumptionStrategy}.
+ *
- All messages received from the inner {@link ServerStreamingCallable} are forwarded to the
+ * outer {@link ResponseObserver}.
+ *
- Upon attempt completion (either success or failure) are communicated to the outer {@link
+ * RetryingFuture}.
+ *
- If the {@link RetryingFuture} decides to resume the RPC, it will invoke {@link #call()},
+ * which will consult the {@link StreamResumptionStrategy} for the resuming request and
+ * restart the process at step 5.
+ *
- Once the {@link RetryingFuture} decides to stop the retry loop, it will notify the outer
+ * {@link ResponseObserver}.
+ *
+ *
+ * This class is meant to be used as middleware between an outer {@link ResponseObserver} and an
+ * inner {@link ServerStreamingCallable}. As such it follows the general threading model of {@link
+ * ServerStreamingCallable}s:
+ *
+ *
+ * - {@code onStart} must be called in the same thread that invoked {@code call()}
+ *
- The outer {@link ResponseObserver} can call {@code request()} and {@code cancel()} on this
+ * class' {@link StreamController} from any thread
+ *
- The inner callable will serialize calls to {@code onResponse()}, {@code onError()} and
+ * {@code onComplete}
+ *
+ *
+ * With this model in mind, this class only needs to synchronize access data that is shared
+ * between: the outer {@link ResponseObserver} (via this class' {@link StreamController}) and the
+ * inner {@link ServerStreamingCallable}: pendingRequests, cancellationCause and the current
+ * innerController.
+ *
+ * @param request type
+ * @param response type
+ */
+@InternalApi
+public final class ServerStreamingAttemptCallable implements Callable {
+ private final Object lock = new Object();
+
+ private final ServerStreamingCallable innerCallable;
+ private final StreamResumptionStrategy resumptionStrategy;
+ private final RequestT initialRequest;
+ private ApiCallContext context;
+ private final ResponseObserver outerObserver;
+
+ // Start state
+ private boolean autoFlowControl = true;
+ private boolean isStarted;
+
+ // Outer state
+ private Throwable cancellationCause;
+
+ private int pendingRequests;
+
+ private RetryingFuture outerRetryingFuture;
+
+ // Internal retry state
+ private int numAttempts;
+
+ private StreamController innerController;
+
+ private boolean seenSuccessSinceLastError;
+ private SettableApiFuture innerAttemptFuture;
+
+ public ServerStreamingAttemptCallable(
+ ServerStreamingCallable innerCallable,
+ StreamResumptionStrategy resumptionStrategy,
+ RequestT initialRequest,
+ ApiCallContext context,
+ ResponseObserver outerObserver) {
+ this.innerCallable = innerCallable;
+ this.resumptionStrategy = resumptionStrategy;
+ this.initialRequest = initialRequest;
+ this.context = context;
+ this.outerObserver = outerObserver;
+ }
+
+ /** Sets controlling {@link RetryingFuture}. Must be called be before {@link #start()}. */
+ void setExternalFuture(RetryingFuture retryingFuture) {
+ Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start");
+ Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null");
+
+ this.outerRetryingFuture = retryingFuture;
+ }
+
+ /**
+ * Starts the initial call. The call is attempted on the caller's thread. Further call attempts
+ * will be scheduled by the {@link RetryingFuture}.
+ */
+ public void start() {
+ Preconditions.checkState(!isStarted, "Already started");
+
+ // Initialize the outer observer
+ outerObserver.onStart(
+ new StreamController() {
+ @Override
+ public void disableAutoInboundFlowControl() {
+ Preconditions.checkState(
+ !isStarted, "Can't disable auto flow control once the stream is started");
+ autoFlowControl = false;
+ }
+
+ @Override
+ public void request(int count) {
+ onRequest(count);
+ }
+
+ @Override
+ public void cancel() {
+ onCancel();
+ }
+ });
+
+ if (autoFlowControl) {
+ synchronized (lock) {
+ pendingRequests = Integer.MAX_VALUE;
+ }
+ }
+ isStarted = true;
+
+ // Call the inner callable
+ call();
+ }
+
+ /**
+ * Sends the actual RPC. The request being sent will first be transformed by the {@link
+ * StreamResumptionStrategy}.
+ *
+ * This method expects to be called by one thread at a time. Furthermore, it expects that the
+ * current RPC finished before the next time it's called.
+ */
+ @Override
+ public Void call() {
+ Preconditions.checkState(isStarted, "Must be started first");
+
+ RequestT request =
+ (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest);
+
+ // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume
+ // request,
+ // which the RetryingFuture/StreamResumptionStrategy should respect.
+ Preconditions.checkState(request != null, "ResumptionStrategy returned a null request.");
+
+ innerAttemptFuture = SettableApiFuture.create();
+ seenSuccessSinceLastError = false;
+
+ ApiCallContext attemptContext = context;
+
+ if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
+ && attemptContext.getTimeout() == null) {
+ attemptContext =
+ attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout());
+ }
+
+ attemptContext
+ .getTracer()
+ .attemptStarted(request, outerRetryingFuture.getAttemptSettings().getOverallAttemptCount());
+
+ innerCallable.call(
+ request,
+ new StateCheckingResponseObserver() {
+ @Override
+ public void onStartImpl(StreamController controller) {
+ onAttemptStart(controller);
+ }
+
+ @Override
+ public void onResponseImpl(ResponseT response) {
+ onAttemptResponse(response);
+ }
+
+ @Override
+ public void onErrorImpl(Throwable t) {
+ onAttemptError(t);
+ }
+
+ @Override
+ public void onCompleteImpl() {
+ onAttemptComplete();
+ }
+ },
+ attemptContext);
+
+ outerRetryingFuture.setAttemptFuture(innerAttemptFuture);
+
+ return null;
+ }
+
+ /**
+ * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will
+ * transfer unfinished state from the previous attempt.
+ *
+ * @see ResponseObserver#onStart(StreamController)
+ */
+ private void onAttemptStart(StreamController controller) {
+ if (!autoFlowControl) {
+ controller.disableAutoInboundFlowControl();
+ }
+
+ Throwable localCancellationCause;
+ int numToRequest = 0;
+
+ synchronized (lock) {
+ innerController = controller;
+
+ localCancellationCause = this.cancellationCause;
+
+ if (!autoFlowControl) {
+ numToRequest = pendingRequests;
+ }
+ }
+
+ if (localCancellationCause != null) {
+ controller.cancel();
+ } else if (numToRequest > 0) {
+ controller.request(numToRequest);
+ }
+ }
+
+ /**
+ * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream.
+ *
+ * @see StreamController#cancel()
+ */
+ private void onCancel() {
+ StreamController localInnerController;
+
+ synchronized (lock) {
+ if (cancellationCause != null) {
+ return;
+ }
+ // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own,
+ // which will not have the current stacktrace, so a special wrapper has be used here.
+ cancellationCause =
+ new ServerStreamingAttemptException(
+ new CancellationException("User cancelled stream"),
+ resumptionStrategy.canResume(),
+ seenSuccessSinceLastError);
+ localInnerController = innerController;
+ }
+
+ if (localInnerController != null) {
+ localInnerController.cancel();
+ }
+ }
+
+ /**
+ * Called when the outer {@link ResponseObserver} is ready for more data.
+ *
+ * @see StreamController#request(int)
+ */
+ private void onRequest(int count) {
+ Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled");
+ Preconditions.checkArgument(count > 0, "Count must be > 0");
+
+ final StreamController localInnerController;
+
+ synchronized (lock) {
+ int maxInc = Integer.MAX_VALUE - pendingRequests;
+ count = Math.min(maxInc, count);
+
+ pendingRequests += count;
+ localInnerController = this.innerController;
+ }
+
+ // Note: there is a race condition here where the count might go to the previous attempt's
+ // StreamController after it failed. But it doesn't matter, because the controller will just
+ // ignore it and the current controller will pick it up onStart.
+ if (localInnerController != null) {
+ localInnerController.request(count);
+ }
+ }
+
+ /** Called when the inner callable has responses to deliver. */
+ private void onAttemptResponse(ResponseT message) {
+ if (!autoFlowControl) {
+ synchronized (lock) {
+ pendingRequests--;
+ }
+ }
+ // Update local state to allow for future resume.
+ seenSuccessSinceLastError = true;
+ message = resumptionStrategy.processResponse(message);
+ // Notify the outer observer.
+ outerObserver.onResponse(message);
+ }
+
+ /**
+ * Called when the current RPC fails. The error will be bubbled up to the outer {@link
+ * RetryingFuture} via the {@link #innerAttemptFuture}.
+ */
+ private void onAttemptError(Throwable throwable) {
+ Throwable localCancellationCause;
+ synchronized (lock) {
+ localCancellationCause = cancellationCause;
+ }
+
+ if (localCancellationCause != null) {
+ // Take special care to preserve the cancellation's stack trace.
+ innerAttemptFuture.setException(localCancellationCause);
+ } else {
+ // Wrap the original exception and provide more context for StreamingRetryAlgorithm.
+ innerAttemptFuture.setException(
+ new ServerStreamingAttemptException(
+ throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError));
+ }
+ }
+
+ /**
+ * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture}
+ * via {@link #innerAttemptFuture}.
+ */
+ private void onAttemptComplete() {
+ innerAttemptFuture.set(null);
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java
new file mode 100644
index 0000000000..b335e7f842
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java
@@ -0,0 +1,374 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.InternalException;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.Filters;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Queues;
+import com.google.protobuf.Duration;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.ProtoUtils;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class RetryInfoTest {
+
+ @Rule public GrpcServerRule serverRule = new GrpcServerRule();
+
+ private static final Metadata.Key RETRY_INFO_KEY =
+ ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
+
+ private FakeBigtableService service;
+ private BigtableDataClient client;
+
+ private AtomicInteger attemptCounter = new AtomicInteger();
+ private Duration delay = Duration.newBuilder().setSeconds(1).setNanos(0).build();
+
+ @Before
+ public void setUp() throws IOException {
+ service = new FakeBigtableService();
+ serverRule.getServiceRegistry().addService(service);
+
+ BigtableDataSettings.Builder settings =
+ BigtableDataSettings.newBuilder()
+ .setProjectId("fake-project")
+ .setInstanceId("fake-instance")
+ .setCredentialsProvider(NoCredentialsProvider.create());
+
+ settings
+ .stubSettings()
+ .setTransportChannelProvider(
+ FixedTransportChannelProvider.create(
+ GrpcTransportChannel.create(serverRule.getChannel())))
+ // channel priming doesn't work with FixedTransportChannelProvider. Disable it for the test
+ .setRefreshingChannel(false)
+ .build();
+
+ this.client = BigtableDataClient.create(settings.build());
+ }
+
+ @Test
+ public void testReadRow() {
+ createRetryableExceptionWithDelay(delay);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRow("table", "row");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadRowNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRow("table", "row");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadRows() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRows(Query.create("table")).iterator().hasNext();
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadRowsNonRetraybleErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRows(Query.create("table")).iterator().hasNext();
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testMutateRows() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.bulkMutateRows(
+ BulkMutation.create("fake-table")
+ .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testMutateRowsNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.bulkMutateRows(
+ BulkMutation.create("fake-table")
+ .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testMutateRow() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1));
+ }
+
+ @Test
+ public void testMutateRowNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1));
+ }
+
+ @Test
+ public void testSampleRowKeys() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.sampleRowKeys("table");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testSampleRowKeysNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.sampleRowKeys("table");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testCheckAndMutateRow() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.checkAndMutateRow(
+ ConditionalRowMutation.create("table", "key")
+ .condition(Filters.FILTERS.value().regex("old-value"))
+ .then(Mutation.create().setCell("cf", "q", "v")));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadModifyWrite() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readModifyWriteRow(ReadModifyWriteRow.create("table", "row").append("cf", "q", "v"));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ private void createRetryableExceptionWithDelay(Duration delay) {
+ Metadata trailers = new Metadata();
+ RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
+ trailers.put(RETRY_INFO_KEY, retryInfo);
+
+ ApiException exception =
+ new UnavailableException(
+ new StatusRuntimeException(Status.UNAVAILABLE, trailers),
+ GrpcStatusCode.of(Status.Code.UNAVAILABLE),
+ true);
+
+ service.expectations.add(exception);
+ }
+
+ private void createNonRetryableExceptionWithDelay(Duration delay) {
+ Metadata trailers = new Metadata();
+ RetryInfo retryInfo =
+ RetryInfo.newBuilder()
+ .setRetryDelay(Duration.newBuilder().setSeconds(1).setNanos(0).build())
+ .build();
+ trailers.put(RETRY_INFO_KEY, retryInfo);
+
+ ApiException exception =
+ new InternalException(
+ new StatusRuntimeException(Status.INTERNAL, trailers),
+ GrpcStatusCode.of(Status.Code.INTERNAL),
+ false);
+
+ service.expectations.add(exception);
+ }
+
+ private class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
+ Queue expectations = Queues.newArrayDeque();
+
+ @Override
+ public void readRows(
+ ReadRowsRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(ReadRowsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void mutateRow(
+ MutateRowRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(MutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void mutateRows(
+ MutateRowsRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ builder.addEntriesBuilder().setIndex(i);
+ }
+ responseObserver.onNext(builder.build());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void sampleRowKeys(
+ SampleRowKeysRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void checkAndMutateRow(
+ CheckAndMutateRowRequest request,
+ StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(CheckAndMutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void readModifyWriteRow(
+ ReadModifyWriteRowRequest request,
+ StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(ReadModifyWriteRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+ }
+}