diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java index 19f7a5224c..78d507665e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -24,15 +24,16 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.UnaryCallable; -import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; import io.grpc.Status; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link @@ -73,9 +74,10 @@ public BigtableUnaryOperationCallable( public ApiFuture futureCall(ReqT req, ApiCallContext apiCallContext) { apiCallContext = defaultCallContext.merge(apiCallContext); - ApiTracer apiTracer = - tracerFactory.newTracer( - apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary); + BigtableTracer apiTracer = + (BigtableTracer) + tracerFactory.newTracer( + apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary); apiCallContext = apiCallContext.withTracer(apiTracer); @@ -85,18 +87,15 @@ public ApiFuture futureCall(ReqT req, ApiCallContext apiCallContext) { } class UnaryFuture extends AbstractApiFuture implements ResponseObserver { - private final ApiTracer tracer; + private final BigtableTracer tracer; private final boolean allowNoResponse; private StreamController controller; private final AtomicBoolean upstreamCancelled = new AtomicBoolean(); - private boolean responseReceived; - private @Nullable RespT response; - private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) { + private UnaryFuture(BigtableTracer tracer, boolean allowNoResponse) { this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null"); this.allowNoResponse = allowNoResponse; - this.responseReceived = false; } @Override @@ -130,23 +129,39 @@ private void cancelUpstream() { public void onResponse(RespT resp) { tracer.responseReceived(); - // happy path - buffer the only responsse - if (!responseReceived) { - responseReceived = true; - this.response = resp; + if (set(resp)) { + tracer.operationFinishEarly(); return; } - String msg = - String.format( - "Received multiple responses for a %s unary operation. Previous: %s, New: %s", - spanName, response, resp); - logger.log(Level.WARNING, msg); + // At this point we are guaranteed that the future has been resolved. However we need to check + // why. + // We know it's not because it was resolved with the current response. Moreover, since the + // future + // is resolved, our only means to flag the error is to log. + // So there are 3 possibilities: + // 1. user cancelled the future + // 2. this is an extra response and the previous one resolved the future + // 3. we got a response after the rpc failed (this should never happen and would be a bad bug) - InternalException error = - new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false); - if (setException(error)) { - tracer.operationFailed(error); + if (isCancelled()) { + return; + } + + try { + RespT prev = Futures.getDone(this); + String msg = + String.format( + "Received response after future is resolved for a %s unary operation. previous: %s, New response: %s", + spanName, prev, resp); + logger.log(Level.WARNING, msg); + } catch (ExecutionException e) { + // Should never happen + String msg = + String.format( + "Received response after future resolved as a failure for a %s unary operation. New response: %s", + spanName, resp); + logger.log(Level.WARNING, msg, e.getCause()); } cancelUpstream(); @@ -158,18 +173,24 @@ public void onError(Throwable throwable) { tracer.operationFailed(throwable); } else if (isCancelled()) { tracer.operationCancelled(); + } else { + // At this point the has been resolved, so we ignore the error + tracer.operationSucceeded(); } - // The future might've been resolved due to double response } @Override public void onComplete() { - if (allowNoResponse || responseReceived) { - if (set(response)) { - tracer.operationSucceeded(); - return; - } - } else { + if (allowNoResponse && set(null)) { + tracer.operationSucceeded(); + return; + + // Under normal circumstances the future wouldve been resolved in onResponse or via + // set(null) if it expected for + // the rpc to not have a response. So if aren't done, the only reason is that we didn't get + // a response + // but were expecting one + } else if (!isDone()) { String msg = spanName + " unary operation completed without a response message"; InternalException e = new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false); @@ -183,7 +204,10 @@ public void onComplete() { // check cancellation race if (isCancelled()) { tracer.operationCancelled(); + return; } + + tracer.operationSucceeded(); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 94c91fb72a..bcb084b491 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -563,7 +563,7 @@ public ServerStreamingCallable createReadRowsCallable( * */ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) { + if (!settings.getEnableSkipTrailers()) { ServerStreamingCallable readRowsCallable = createReadRowsBaseCallable( ServerStreamingCallSettings.newBuilder() @@ -1296,7 +1296,7 @@ private UnaryCallable createUnar UnaryCallSettings callSettings, Function requestTransformer, Function responseTranformer) { - if (EnhancedBigtableStubSettings.SKIP_TRAILERS) { + if (settings.getEnableSkipTrailers()) { return createUnaryCallableNew( methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); } else { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 863389166f..1425e7b362 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -109,7 +109,7 @@ public class EnhancedBigtableStubSettings extends StubSettings jwtAudienceMapping; private final boolean enableRoutingCookie; private final boolean enableRetryInfo; + private final boolean enableSkipTrailers; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -287,6 +288,7 @@ private EnhancedBigtableStubSettings(Builder builder) { jwtAudienceMapping = builder.jwtAudienceMapping; enableRoutingCookie = builder.enableRoutingCookie; enableRetryInfo = builder.enableRetryInfo; + enableSkipTrailers = builder.enableSkipTrailers; metricsProvider = builder.metricsProvider; metricsEndpoint = builder.metricsEndpoint; @@ -373,6 +375,10 @@ public boolean getEnableRetryInfo() { return enableRetryInfo; } + boolean getEnableSkipTrailers() { + return enableSkipTrailers; + } + /** * Gets the Google Cloud Monitoring endpoint for publishing client side metrics. If it's null, * client will publish metrics to the default monitoring endpoint. @@ -683,6 +689,7 @@ public static class Builder extends StubSettings.Builder jwtAudienceMapping; private boolean enableRoutingCookie; private boolean enableRetryInfo; + private boolean enableSkipTrailers; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -721,6 +728,7 @@ private Builder() { setCredentialsProvider(defaultCredentialsProviderBuilder().build()); this.enableRoutingCookie = true; this.enableRetryInfo = true; + this.enableSkipTrailers = SKIP_TRAILERS; metricsProvider = DefaultMetricsProvider.INSTANCE; // Defaults provider @@ -1085,6 +1093,11 @@ public boolean getEnableRetryInfo() { return enableRetryInfo; } + Builder setEnableSkipTrailers(boolean enabled) { + this.enableSkipTrailers = enabled; + return this; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -1212,6 +1225,7 @@ public String toString() { .add("jwtAudienceMapping", jwtAudienceMapping) .add("enableRoutingCookie", enableRoutingCookie) .add("enableRetryInfo", enableRetryInfo) + .add("enableSkipTrailers", enableSkipTrailers) .add("readRowsSettings", readRowsSettings) .add("readRowSettings", readRowSettings) .add("sampleRowKeysSettings", sampleRowKeysSettings) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index d0e307d510..dcea020485 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -52,6 +52,13 @@ public void afterResponse(long applicationLatency) { // noop } + /** + * Used by BigtableUnaryOperationCallable to signal that the user visible portion of the RPC is + * complete and that metrics should freeze the timers and then publish the frozen values when the + * internal portion of the operation completes. + */ + public void operationFinishEarly() {} + /** * Get the attempt number of the current call. Attempt number for the current call is passed in * and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 14a112b270..99502d0dbd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -51,6 +51,7 @@ class BuiltinMetricsTracer extends BigtableTracer { private final SpanName spanName; // Operation level metrics + private final AtomicBoolean operationFinishedEarly = new AtomicBoolean(); private final AtomicBoolean opFinished = new AtomicBoolean(); private final Stopwatch operationTimer = Stopwatch.createStarted(); private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); @@ -132,6 +133,13 @@ public void close() {} }; } + @Override + public void operationFinishEarly() { + operationFinishedEarly.set(true); + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -192,6 +200,11 @@ public void attemptPermanentFailure(Throwable throwable) { @Override public void onRequest(int requestCount) { requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd); + + if (operationFinishedEarly.get()) { + return; + } + if (flowControlIsDisabled) { // On request is only called when auto flow control is disabled. When auto flow control is // disabled, server latency is measured between onRequest and onResponse. @@ -205,6 +218,10 @@ public void onRequest(int requestCount) { @Override public void responseReceived() { + if (operationFinishedEarly.get()) { + return; + } + if (firstResponsePerOpTimer.isRunning()) { firstResponsePerOpTimer.stop(); } @@ -226,6 +243,9 @@ public void responseReceived() { @Override public void afterResponse(long applicationLatency) { if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) { + if (operationFinishedEarly.get()) { + return; + } // When auto flow control is enabled, request will never be called, so server latency is // measured between after the last response is processed and before the next response is // received. If flow control is disabled but requestLeft is greater than 0, @@ -272,10 +292,14 @@ public void disableFlowControl() { } private void recordOperationCompletion(@Nullable Throwable status) { + if (operationFinishedEarly.get()) { + status = null; // force an ok + } + if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS); boolean isStreaming = operationType == OperationType.ServerStreaming; String statusStr = Util.extractStatus(status); @@ -294,8 +318,6 @@ private void recordOperationCompletion(@Nullable Throwable status) { .put(STATUS_KEY, statusStr) .build(); - long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS); - // Only record when retry count is greater than 0 so the retry // graph will be less confusing if (attemptCount > 1) { @@ -316,6 +338,9 @@ private void recordOperationCompletion(@Nullable Throwable status) { } private void recordAttemptCompletion(@Nullable Throwable status) { + if (operationFinishedEarly.get()) { + status = null; // force an ok + } // If the attempt failed, the time spent in retry should be counted in application latency. // Stop the stopwatch and decrement requestLeft. synchronized (timerLock) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index d89aa90c6b..1b3d930546 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -62,6 +62,13 @@ public void close() { }; } + @Override + public void operationFinishEarly() { + for (BigtableTracer tracer : bigtableTracers) { + tracer.operationFinishEarly(); + } + } + @Override public void operationSucceeded() { for (ApiTracer child : children) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 0ffabe2606..a2c5bdac1f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -84,6 +84,12 @@ public void close() {} }; } + @Override + public void operationFinishEarly() { + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -103,7 +109,6 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java index b6f1a24b70..0b11ce3219 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java @@ -21,7 +21,6 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.rpc.InternalException; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; @@ -88,18 +87,11 @@ public void testMultipleResponses() throws Exception { call.getController().getObserver().onResponse("first"); call.getController().getObserver().onResponse("second"); - Throwable e = Assert.assertThrows(ExecutionException.class, f::get).getCause(); - assertThat(e).isInstanceOf(InternalException.class); - assertThat(e) - .hasMessageThat() - .contains( - "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); - ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(String.class); verify(callable.logger).log(Mockito.any(), msgCaptor.capture()); assertThat(msgCaptor.getValue()) .isEqualTo( - "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); + "Received response after future is resolved for a Fake.method unary operation. previous: first, New response: second"); assertThat(call.getController().isCancelled()).isTrue(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 5280abe1fd..fdc6b5717e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -961,6 +961,7 @@ public void enableRetryInfoFalseValueTest() throws IOException { "jwtAudienceMapping", "enableRoutingCookie", "enableRetryInfo", + "enableSkipTrailers", "readRowsSettings", "readRowSettings", "sampleRowKeysSettings", diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java new file mode 100644 index 0000000000..07ac7deee4 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.auto.value.AutoValue; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.PingAndWarmResponse; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.Row; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Filters; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.TableId; +import com.google.cloud.bigtable.data.v2.models.TargetId; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.StringValue; +import io.grpc.BindableService; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerServiceDefinition; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.exceptions.verification.WantedButNotInvoked; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class SkipTrailersTest { + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final TargetId TABLE_ID = TableId.of("fake-table"); + + private HackedBigtableService hackedService; + private Server server; + + @Mock private ApiTracerFactory tracerFactory; + @Mock private BigtableTracer tracer; + + private BigtableDataClient client; + + @Before + public void setUp() throws Exception { + hackedService = new HackedBigtableService(); + server = FakeServiceBuilder.create(hackedService).start(); + + when(tracerFactory.newTracer(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(tracer); + when(tracer.inScope()).thenReturn(Mockito.mock(ApiTracer.Scope.class)); + + BigtableDataSettings.Builder clientBuilder = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setCredentialsProvider(NoCredentialsProvider.create()); + clientBuilder.stubSettings().setEnableSkipTrailers(true).setTracerFactory(tracerFactory); + + client = BigtableDataClient.create(clientBuilder.build()); + } + + @After + public void tearDown() throws Exception { + client.close(); + server.shutdown(); + } + + @Test + public void testReadRow() throws InterruptedException, ExecutionException { + ReadRowsResponse fakeResponse = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + test(() -> client.readRowAsync(TABLE_ID, "fake-key"), fakeResponse); + } + + @Test + public void testMutateRow() throws ExecutionException, InterruptedException { + test( + () -> client.mutateRowAsync(RowMutation.create(TABLE_ID, "fake-key")), + MutateRowResponse.getDefaultInstance()); + } + + @Test + public void testCheckAndMutateRow() throws ExecutionException, InterruptedException { + ConditionalRowMutation req = + ConditionalRowMutation.create(TABLE_ID, "fake-key") + .condition(Filters.FILTERS.pass()) + .then(Mutation.create().deleteRow()); + test(() -> client.checkAndMutateRowAsync(req), CheckAndMutateRowResponse.getDefaultInstance()); + } + + @Test + public void testRMW() throws ExecutionException, InterruptedException { + ReadModifyWriteRow req = ReadModifyWriteRow.create(TABLE_ID, "fake-key").append("cf", "q", "A"); + test( + () -> client.readModifyWriteRowAsync(req), + ReadModifyWriteRowResponse.newBuilder().setRow(Row.getDefaultInstance()).build()); + } + + private void test(Supplier> invoker, T fakeResponse) + throws InterruptedException, ExecutionException { + ApiFuture future = invoker.get(); + + // Wait for the call to start on the server + @SuppressWarnings("unchecked") + ServerRpc rpc = (ServerRpc) hackedService.rpcs.poll(10, TimeUnit.SECONDS); + Preconditions.checkNotNull( + rpc, "Timed out waiting for the call to be received by the mock server"); + + // Send the only row + rpc.getResponseStream().onNext(fakeResponse); + + // Ensure that the future resolves and does not throw an error + try { + future.get(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + Assert.fail("timed out waiting for the trailer optimization future to resolve"); + } + + verify(tracer, times(1)).operationFinishEarly(); + verify(tracer, never()).operationSucceeded(); + + // clean up + rpc.getResponseStream().onCompleted(); + + // Ensure that the tracer is invoked after the internal operation is complete + // Since we dont have a way to know exactly when this happens, we poll + for (int i = 10; i > 0; i--) { + try { + verify(tracer, times(1)).operationSucceeded(); + break; + } catch (WantedButNotInvoked e) { + if (i > 1) { + Thread.sleep(100); + } else { + throw e; + } + } + } + } + + /** + * Hack the srvice definition to allow grpc server to simulate delayed trailers. This will augment + * the bigtable service definition to promote unary rpcs to server streaming + */ + class HackedBigtableService implements BindableService { + private final LinkedBlockingDeque> rpcs = new LinkedBlockingDeque<>(); + + @Override + public ServerServiceDefinition bindService() { + ServerServiceDefinition.Builder builder = + ServerServiceDefinition.builder(BigtableGrpc.SERVICE_NAME) + .addMethod( + BigtableGrpc.getPingAndWarmMethod(), + ServerCalls.asyncUnaryCall( + (ignored, observer) -> { + observer.onNext(PingAndWarmResponse.getDefaultInstance()); + observer.onCompleted(); + })) + .addMethod( + BigtableGrpc.getReadRowsMethod(), + ServerCalls.asyncServerStreamingCall( + (req, observer) -> rpcs.add(ServerRpc.create(req, observer)))); + ImmutableList> + unaryDescriptors = + ImmutableList.of( + BigtableGrpc.getMutateRowMethod(), + BigtableGrpc.getCheckAndMutateRowMethod(), + BigtableGrpc.getReadModifyWriteRowMethod()); + + for (MethodDescriptor desc : + unaryDescriptors) { + builder.addMethod( + desc.toBuilder().setType(MethodDescriptor.MethodType.SERVER_STREAMING).build(), + ServerCalls.asyncServerStreamingCall( + (req, observer) -> rpcs.add(ServerRpc.create(req, observer)))); + } + return builder.build(); + } + } + + @AutoValue + abstract static class ServerRpc { + abstract ReqT getRequest(); + + abstract StreamObserver getResponseStream(); + + static ServerRpc create(ReqT req, StreamObserver resp) { + // return new AutoValue__(req, resp); + return new AutoValue_SkipTrailersTest_ServerRpc<>(req, resp); + } + } +}