diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 4bb4684c38..42b7c77725 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -150,4 +150,9 @@ 8001 com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerBatchedUnaryCallable + + 6001 + com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm + * + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 705b3027ed..0e3177fe50 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -30,8 +30,10 @@ import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.RetryingExecutorWithContext; import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.StreamingRetryAlgorithm; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.RequestParamsExtractor; @@ -107,6 +109,8 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; +import com.google.cloud.bigtable.gaxx.retrying.RetryingCallable; +import com.google.cloud.bigtable.gaxx.retrying.RetryingServerStreamingCallable; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -485,7 +489,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { new ReadRowsRetryCompletedCallable<>(withBigtableTracer); ServerStreamingCallable retrying2 = - Callables.retrying(retrying1, innerSettings, clientContext); + createRetryingStreamingCallable(retrying1, innerSettings); return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } @@ -568,7 +572,8 @@ public Map extract( new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable> retryable = - Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext); + createRetryingUnaryCallable( + withBigtableTracer, settings.sampleRowKeysSettings().getRetrySettings()); return createUserFacingUnaryCallable( methodName, new SampleRowKeysCallable(retryable, requestContext)); @@ -607,7 +612,8 @@ public Map extract(MutateRowRequest mutateRowRequest) { new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext); + createRetryingUnaryCallable( + withBigtableTracer, settings.mutateRowSettings().getRetrySettings()); return createUserFacingUnaryCallable( methodName, new MutateRowCallable(retrying, requestContext)); @@ -810,7 +816,8 @@ public Map extract( new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext); + createRetryingUnaryCallable( + withBigtableTracer, settings.checkAndMutateRowSettings().getRetrySettings()); return createUserFacingUnaryCallable( methodName, new CheckAndMutateRowCallable(retrying, requestContext)); @@ -851,8 +858,8 @@ public Map extract(ReadModifyWriteRowRequest request) { new BigtableTracerUnaryCallable<>(withStatsHeaders); UnaryCallable retrying = - Callables.retrying( - withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext); + createRetryingUnaryCallable( + withBigtableTracer, settings.readModifyWriteRowSettings().getRetrySettings()); return createUserFacingUnaryCallable( methodName, new ReadModifyWriteRowCallable(retrying, requestContext)); @@ -1062,6 +1069,34 @@ public Map extract(PingAndWarmRequest request) { Collections.emptySet()); return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext()); } + + private UnaryCallable createRetryingUnaryCallable( + UnaryCallable innerCallable, RetrySettings retrySettings) { + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm<>( + new ApiResultRetryAlgorithm<>(), + new ExponentialRetryAlgorithm(retrySettings, clientContext.getClock())); + ScheduledRetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); + + return new RetryingCallable<>( + clientContext.getDefaultCallContext(), innerCallable, retryingExecutor); + } + + private + ServerStreamingCallable createRetryingStreamingCallable( + ServerStreamingCallable innerCallable, + ServerStreamingCallSettings settings) { + RetryAlgorithm retryAlgorithm = + new StreamingRetryAlgorithm<>( + new ApiResultRetryAlgorithm<>(), + new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock())); + ScheduledRetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); + + return new RetryingServerStreamingCallable( + innerCallable, retryingExecutor, settings.getResumptionStrategy()); + } // // diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index b049219a95..b6184bca44 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -31,6 +31,7 @@ import com.google.bigtable.v2.MutateRowsResponse.Entry; import com.google.cloud.bigtable.data.v2.models.MutateRowsException; import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation; +import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -235,7 +236,8 @@ private void handleAttemptError(Throwable rpcError) { FailedMutation failedMutation = FailedMutation.create(origIndex, entryError); allFailures.add(failedMutation); - if (!failedMutation.getError().isRetryable()) { + if (ApiResultRetryAlgorithm.extractRetryDelay(failedMutation.getError()) == null + && !failedMutation.getError().isRetryable()) { permanentFailures.add(failedMutation); } else { // Schedule the mutation entry for the next RPC by adding it to the request builder and diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java index c7f3d18b62..f809a30881 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,26 +19,33 @@ import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.protobuf.util.Durations; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; import org.threeten.bp.Duration; -/** For internal use, public for technical reasons. */ +/** + * For internal use, public for technical reasons. This retry algorithm checks the metadata of an + * exception for additional error details. If the metadata has a RetryInfo field, use the retry + * delay to set the wait time between attempts. + */ @InternalApi public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm { - // Duration to sleep on if the error is DEADLINE_EXCEEDED. - public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + + private static final Metadata.Key KEY_RETRY_INFO = + ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { - if (prevThrowable != null && prevThrowable instanceof DeadlineExceededException) { - return TimedAttemptSettings.newBuilder() - .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) - .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + Duration retryDelay = extractRetryDelay(prevThrowable); + if (retryDelay != null) { + return prevSettings + .toBuilder() + .setRandomizedRetryDelay(retryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) - .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); } return null; @@ -46,6 +53,26 @@ public TimedAttemptSettings createNextAttempt( @Override public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { - return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable(); + return extractRetryDelay(prevThrowable) != null + || ((prevThrowable instanceof ApiException) + && ((ApiException) prevThrowable).isRetryable()); + } + + public static Duration extractRetryDelay(Throwable throwable) { + if (throwable == null) { + return null; + } + Metadata trailers = Status.trailersFromThrowable(throwable); + if (trailers == null) { + return null; + } + RetryInfo retryInfo = trailers.get(KEY_RETRY_INFO); + if (retryInfo == null) { + return null; + } + if (!retryInfo.hasRetryDelay()) { + return null; + } + return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay())); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java new file mode 100644 index 0000000000..b335e7f842 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RetryInfoTest.java @@ -0,0 +1,374 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.rpc.UnavailableException; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.SampleRowKeysRequest; +import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Filters; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Queues; +import com.google.protobuf.Duration; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class RetryInfoTest { + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + + private static final Metadata.Key RETRY_INFO_KEY = + ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); + + private FakeBigtableService service; + private BigtableDataClient client; + + private AtomicInteger attemptCounter = new AtomicInteger(); + private Duration delay = Duration.newBuilder().setSeconds(1).setNanos(0).build(); + + @Before + public void setUp() throws IOException { + service = new FakeBigtableService(); + serverRule.getServiceRegistry().addService(service); + + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilder() + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .setCredentialsProvider(NoCredentialsProvider.create()); + + settings + .stubSettings() + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + // channel priming doesn't work with FixedTransportChannelProvider. Disable it for the test + .setRefreshingChannel(false) + .build(); + + this.client = BigtableDataClient.create(settings.build()); + } + + @Test + public void testReadRow() { + createRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readRow("table", "row"); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testReadRowNonRetryableErrorWithRetryInfo() { + createNonRetryableExceptionWithDelay(delay); + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readRow("table", "row"); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testReadRows() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readRows(Query.create("table")).iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testReadRowsNonRetraybleErrorWithRetryInfo() { + createNonRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readRows(Query.create("table")).iterator().hasNext(); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testMutateRows() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testMutateRowsNonRetryableErrorWithRetryInfo() { + createNonRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testMutateRow() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1)); + } + + @Test + public void testMutateRowNonRetryableErrorWithRetryInfo() { + createNonRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(1)); + } + + @Test + public void testSampleRowKeys() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + client.sampleRowKeys("table"); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testSampleRowKeysNonRetryableErrorWithRetryInfo() { + createNonRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + client.sampleRowKeys("table"); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testCheckAndMutateRow() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.checkAndMutateRow( + ConditionalRowMutation.create("table", "key") + .condition(Filters.FILTERS.value().regex("old-value")) + .then(Mutation.create().setCell("cf", "q", "v"))); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + @Test + public void testReadModifyWrite() { + createRetryableExceptionWithDelay(delay); + + Stopwatch stopwatch = Stopwatch.createStarted(); + client.readModifyWriteRow(ReadModifyWriteRow.create("table", "row").append("cf", "q", "v")); + stopwatch.stop(); + + assertThat(attemptCounter.get()).isEqualTo(2); + assertThat(stopwatch.elapsed()).isAtLeast(java.time.Duration.ofSeconds(delay.getSeconds())); + } + + private void createRetryableExceptionWithDelay(Duration delay) { + Metadata trailers = new Metadata(); + RetryInfo retryInfo = RetryInfo.newBuilder().setRetryDelay(delay).build(); + trailers.put(RETRY_INFO_KEY, retryInfo); + + ApiException exception = + new UnavailableException( + new StatusRuntimeException(Status.UNAVAILABLE, trailers), + GrpcStatusCode.of(Status.Code.UNAVAILABLE), + true); + + service.expectations.add(exception); + } + + private void createNonRetryableExceptionWithDelay(Duration delay) { + Metadata trailers = new Metadata(); + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(1).setNanos(0).build()) + .build(); + trailers.put(RETRY_INFO_KEY, retryInfo); + + ApiException exception = + new InternalException( + new StatusRuntimeException(Status.INTERNAL, trailers), + GrpcStatusCode.of(Status.Code.INTERNAL), + false); + + service.expectations.add(exception); + } + + private class FakeBigtableService extends BigtableGrpc.BigtableImplBase { + Queue expectations = Queues.newArrayDeque(); + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(ReadRowsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void mutateRows( + MutateRowsRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void sampleRowKeys( + SampleRowKeysRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void checkAndMutateRow( + CheckAndMutateRowRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(CheckAndMutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + + @Override + public void readModifyWriteRow( + ReadModifyWriteRowRequest request, + StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(ReadModifyWriteRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + } +}