diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml
index 4bb4684c38..42b7c77725 100644
--- a/google-cloud-bigtable/clirr-ignored-differences.xml
+++ b/google-cloud-bigtable/clirr-ignored-differences.xml
@@ -150,4 +150,9 @@
8001
com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable
+
+ 6001
+ com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm
+ *
+
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index 705b3027ed..0e3177fe50 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -30,8 +30,10 @@
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
+import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
+import com.google.api.gax.retrying.StreamingRetryAlgorithm;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
@@ -107,6 +109,8 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
+import com.google.cloud.bigtable.gaxx.retrying.RetryingCallable;
+import com.google.cloud.bigtable.gaxx.retrying.RetryingServerStreamingCallable;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -485,7 +489,7 @@ public Map extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);
ServerStreamingCallable retrying2 =
- Callables.retrying(retrying1, innerSettings, clientContext);
+ createRetryingStreamingCallable(retrying1, innerSettings);
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
@@ -568,7 +572,8 @@ public Map extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable> retryable =
- Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
+ createRetryingUnaryCallable(
+ withBigtableTracer, settings.sampleRowKeysSettings().getRetrySettings());
return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
@@ -607,7 +612,8 @@ public Map extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable retrying =
- Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
+ createRetryingUnaryCallable(
+ withBigtableTracer, settings.mutateRowSettings().getRetrySettings());
return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
@@ -810,7 +816,8 @@ public Map extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable retrying =
- Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
+ createRetryingUnaryCallable(
+ withBigtableTracer, settings.checkAndMutateRowSettings().getRetrySettings());
return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
@@ -851,8 +858,8 @@ public Map extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);
UnaryCallable retrying =
- Callables.retrying(
- withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
+ createRetryingUnaryCallable(
+ withBigtableTracer, settings.readModifyWriteRowSettings().getRetrySettings());
return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
@@ -1062,6 +1069,34 @@ public Map extract(PingAndWarmRequest request) {
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}
+
+ private UnaryCallable createRetryingUnaryCallable(
+ UnaryCallable innerCallable, RetrySettings retrySettings) {
+ RetryAlgorithm retryAlgorithm =
+ new RetryAlgorithm<>(
+ new ApiResultRetryAlgorithm<>(),
+ new ExponentialRetryAlgorithm(retrySettings, clientContext.getClock()));
+ ScheduledRetryingExecutor retryingExecutor =
+ new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
+
+ return new RetryingCallable<>(
+ clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);
+ }
+
+ private
+ ServerStreamingCallable createRetryingStreamingCallable(
+ ServerStreamingCallable innerCallable,
+ ServerStreamingCallSettings settings) {
+ RetryAlgorithm retryAlgorithm =
+ new StreamingRetryAlgorithm<>(
+ new ApiResultRetryAlgorithm<>(),
+ new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
+ ScheduledRetryingExecutor retryingExecutor =
+ new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
+
+ return new RetryingServerStreamingCallable(
+ innerCallable, retryingExecutor, settings.getResumptionStrategy());
+ }
//
//
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
index b049219a95..b6184bca44 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
@@ -31,6 +31,7 @@
import com.google.bigtable.v2.MutateRowsResponse.Entry;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException;
import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation;
+import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -235,7 +236,8 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);
- if (!failedMutation.getError().isRetryable()) {
+ if (ApiResultRetryAlgorithm.extractRetryDelay(failedMutation.getError()) == null
+ && !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java
index c7f3d18b62..f809a30881 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Google LLC
+ * Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,26 +19,33 @@
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.DeadlineExceededException;
+import com.google.protobuf.util.Durations;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;
-/** For internal use, public for technical reasons. */
+/**
+ * For internal use, public for technical reasons. This retry algorithm checks the metadata of an
+ * exception for additional error details. If the metadata has a RetryInfo field, use the retry
+ * delay to set the wait time between attempts.
+ */
@InternalApi
public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm {
- // Duration to sleep on if the error is DEADLINE_EXCEEDED.
- public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);
+
+ private static final Metadata.Key KEY_RETRY_INFO =
+ ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
- if (prevThrowable != null && prevThrowable instanceof DeadlineExceededException) {
- return TimedAttemptSettings.newBuilder()
- .setGlobalSettings(prevSettings.getGlobalSettings())
- .setRetryDelay(prevSettings.getRetryDelay())
- .setRpcTimeout(prevSettings.getRpcTimeout())
- .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
+ Duration retryDelay = extractRetryDelay(prevThrowable);
+ if (retryDelay != null) {
+ return prevSettings
+ .toBuilder()
+ .setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
- .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
}
return null;
@@ -46,6 +53,26 @@ public TimedAttemptSettings createNextAttempt(
@Override
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
- return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
+ return extractRetryDelay(prevThrowable) != null
+ || ((prevThrowable instanceof ApiException)
+ && ((ApiException) prevThrowable).isRetryable());
+ }
+
+ public static Duration extractRetryDelay(Throwable throwable) {
+ if (throwable == null) {
+ return null;
+ }
+ Metadata trailers = Status.trailersFromThrowable(throwable);
+ if (trailers == null) {
+ return null;
+ }
+ RetryInfo retryInfo = trailers.get(KEY_RETRY_INFO);
+ if (retryInfo == null) {
+ return null;
+ }
+ if (!retryInfo.hasRetryDelay()) {
+ return null;
+ }
+ return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java
new file mode 100644
index 0000000000..b335e7f842
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java
@@ -0,0 +1,374 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.InternalException;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.Filters;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Queues;
+import com.google.protobuf.Duration;
+import com.google.rpc.RetryInfo;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.ProtoUtils;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class RetryInfoTest {
+
+ @Rule public GrpcServerRule serverRule = new GrpcServerRule();
+
+ private static final Metadata.Key RETRY_INFO_KEY =
+ ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
+
+ private FakeBigtableService service;
+ private BigtableDataClient client;
+
+ private AtomicInteger attemptCounter = new AtomicInteger();
+ private Duration delay = Duration.newBuilder().setSeconds(1).setNanos(0).build();
+
+ @Before
+ public void setUp() throws IOException {
+ service = new FakeBigtableService();
+ serverRule.getServiceRegistry().addService(service);
+
+ BigtableDataSettings.Builder settings =
+ BigtableDataSettings.newBuilder()
+ .setProjectId("fake-project")
+ .setInstanceId("fake-instance")
+ .setCredentialsProvider(NoCredentialsProvider.create());
+
+ settings
+ .stubSettings()
+ .setTransportChannelProvider(
+ FixedTransportChannelProvider.create(
+ GrpcTransportChannel.create(serverRule.getChannel())))
+ // channel priming doesn't work with FixedTransportChannelProvider. Disable it for the test
+ .setRefreshingChannel(false)
+ .build();
+
+ this.client = BigtableDataClient.create(settings.build());
+ }
+
+ @Test
+ public void testReadRow() {
+ createRetryableExceptionWithDelay(delay);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRow("table", "row");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadRowNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRow("table", "row");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadRows() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRows(Query.create("table")).iterator().hasNext();
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadRowsNonRetraybleErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readRows(Query.create("table")).iterator().hasNext();
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testMutateRows() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.bulkMutateRows(
+ BulkMutation.create("fake-table")
+ .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testMutateRowsNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.bulkMutateRows(
+ BulkMutation.create("fake-table")
+ .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testMutateRow() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1));
+ }
+
+ @Test
+ public void testMutateRowNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1));
+ }
+
+ @Test
+ public void testSampleRowKeys() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.sampleRowKeys("table");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testSampleRowKeysNonRetryableErrorWithRetryInfo() {
+ createNonRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ client.sampleRowKeys("table");
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testCheckAndMutateRow() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.checkAndMutateRow(
+ ConditionalRowMutation.create("table", "key")
+ .condition(Filters.FILTERS.value().regex("old-value"))
+ .then(Mutation.create().setCell("cf", "q", "v")));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ @Test
+ public void testReadModifyWrite() {
+ createRetryableExceptionWithDelay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ client.readModifyWriteRow(ReadModifyWriteRow.create("table", "row").append("cf", "q", "v"));
+ stopwatch.stop();
+
+ assertThat(attemptCounter.get()).isEqualTo(2);
+ assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds()));
+ }
+
+ private void createRetryableExceptionWithDelay(Duration delay) {
+ Metadata trailers = new Metadata();
+ RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build();
+ trailers.put(RETRY_INFO_KEY, retryInfo);
+
+ ApiException exception =
+ new UnavailableException(
+ new StatusRuntimeException(Status.UNAVAILABLE, trailers),
+ GrpcStatusCode.of(Status.Code.UNAVAILABLE),
+ true);
+
+ service.expectations.add(exception);
+ }
+
+ private void createNonRetryableExceptionWithDelay(Duration delay) {
+ Metadata trailers = new Metadata();
+ RetryInfo retryInfo =
+ RetryInfo.newBuilder()
+ .setRetryDelay(Duration.newBuilder().setSeconds(1).setNanos(0).build())
+ .build();
+ trailers.put(RETRY_INFO_KEY, retryInfo);
+
+ ApiException exception =
+ new InternalException(
+ new StatusRuntimeException(Status.INTERNAL, trailers),
+ GrpcStatusCode.of(Status.Code.INTERNAL),
+ false);
+
+ service.expectations.add(exception);
+ }
+
+ private class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
+ Queue expectations = Queues.newArrayDeque();
+
+ @Override
+ public void readRows(
+ ReadRowsRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(ReadRowsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void mutateRow(
+ MutateRowRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(MutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void mutateRows(
+ MutateRowsRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ builder.addEntriesBuilder().setIndex(i);
+ }
+ responseObserver.onNext(builder.build());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void sampleRowKeys(
+ SampleRowKeysRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void checkAndMutateRow(
+ CheckAndMutateRowRequest request,
+ StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(CheckAndMutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+
+ @Override
+ public void readModifyWriteRow(
+ ReadModifyWriteRowRequest request,
+ StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(ReadModifyWriteRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+ }
+}