From 62fd89b79844a9dce9f67b585973ef5939269ec9 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 4 Nov 2024 19:21:51 -0500 Subject: [PATCH 1/5] feat: add an experimental feature to skip waiting for trailers for unary ops This is off by default and can be enabled using an environment variable. When enabled, BigtableUnaryOperationCallable will resolve the user visible future immediately when a response is available and will tell metrics to freeze all timers. Metrics will still wait for the trailers in the background for necessary metadata to publish the frozen timer values. Change-Id: I2101ff375de711693720af4fd2e9535aa5355f9d --- .../stub/BigtableUnaryOperationCallable.java | 84 ++++++++++++------- .../data/v2/stub/metrics/BigtableTracer.java | 2 + .../v2/stub/metrics/BuiltinMetricsTracer.java | 31 ++++++- .../data/v2/stub/metrics/CompositeTracer.java | 7 ++ .../data/v2/stub/metrics/MetricsTracer.java | 12 ++- .../BigtableUnaryOperationCallableTest.java | 10 +-- 6 files changed, 105 insertions(+), 41 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java index 19f7a5224c..78d507665e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -24,15 +24,16 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.UnaryCallable; -import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; import io.grpc.Status; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link @@ -73,9 +74,10 @@ public BigtableUnaryOperationCallable( public ApiFuture futureCall(ReqT req, ApiCallContext apiCallContext) { apiCallContext = defaultCallContext.merge(apiCallContext); - ApiTracer apiTracer = - tracerFactory.newTracer( - apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary); + BigtableTracer apiTracer = + (BigtableTracer) + tracerFactory.newTracer( + apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary); apiCallContext = apiCallContext.withTracer(apiTracer); @@ -85,18 +87,15 @@ public ApiFuture futureCall(ReqT req, ApiCallContext apiCallContext) { } class UnaryFuture extends AbstractApiFuture implements ResponseObserver { - private final ApiTracer tracer; + private final BigtableTracer tracer; private final boolean allowNoResponse; private StreamController controller; private final AtomicBoolean upstreamCancelled = new AtomicBoolean(); - private boolean responseReceived; - private @Nullable RespT response; - private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) { + private UnaryFuture(BigtableTracer tracer, boolean allowNoResponse) { this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null"); this.allowNoResponse = allowNoResponse; - this.responseReceived = false; } @Override @@ -130,23 +129,39 @@ private void cancelUpstream() { public void onResponse(RespT resp) { tracer.responseReceived(); - // happy path - buffer the only responsse - if (!responseReceived) { - responseReceived = true; - this.response = resp; + if (set(resp)) { + tracer.operationFinishEarly(); return; } - String msg = - String.format( - "Received multiple responses for a %s unary operation. Previous: %s, New: %s", - spanName, response, resp); - logger.log(Level.WARNING, msg); + // At this point we are guaranteed that the future has been resolved. However we need to check + // why. + // We know it's not because it was resolved with the current response. Moreover, since the + // future + // is resolved, our only means to flag the error is to log. + // So there are 3 possibilities: + // 1. user cancelled the future + // 2. this is an extra response and the previous one resolved the future + // 3. we got a response after the rpc failed (this should never happen and would be a bad bug) - InternalException error = - new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false); - if (setException(error)) { - tracer.operationFailed(error); + if (isCancelled()) { + return; + } + + try { + RespT prev = Futures.getDone(this); + String msg = + String.format( + "Received response after future is resolved for a %s unary operation. previous: %s, New response: %s", + spanName, prev, resp); + logger.log(Level.WARNING, msg); + } catch (ExecutionException e) { + // Should never happen + String msg = + String.format( + "Received response after future resolved as a failure for a %s unary operation. New response: %s", + spanName, resp); + logger.log(Level.WARNING, msg, e.getCause()); } cancelUpstream(); @@ -158,18 +173,24 @@ public void onError(Throwable throwable) { tracer.operationFailed(throwable); } else if (isCancelled()) { tracer.operationCancelled(); + } else { + // At this point the has been resolved, so we ignore the error + tracer.operationSucceeded(); } - // The future might've been resolved due to double response } @Override public void onComplete() { - if (allowNoResponse || responseReceived) { - if (set(response)) { - tracer.operationSucceeded(); - return; - } - } else { + if (allowNoResponse && set(null)) { + tracer.operationSucceeded(); + return; + + // Under normal circumstances the future wouldve been resolved in onResponse or via + // set(null) if it expected for + // the rpc to not have a response. So if aren't done, the only reason is that we didn't get + // a response + // but were expecting one + } else if (!isDone()) { String msg = spanName + " unary operation completed without a response message"; InternalException e = new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false); @@ -183,7 +204,10 @@ public void onComplete() { // check cancellation race if (isCancelled()) { tracer.operationCancelled(); + return; } + + tracer.operationSucceeded(); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index d0e307d510..0b14d3bbfd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -52,6 +52,8 @@ public void afterResponse(long applicationLatency) { // noop } + public void operationFinishEarly() {} + /** * Get the attempt number of the current call. Attempt number for the current call is passed in * and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 14a112b270..c1a7737043 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -51,6 +51,7 @@ class BuiltinMetricsTracer extends BigtableTracer { private final SpanName spanName; // Operation level metrics + private final AtomicBoolean operationFinishedEarly = new AtomicBoolean(); private final AtomicBoolean opFinished = new AtomicBoolean(); private final Stopwatch operationTimer = Stopwatch.createStarted(); private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); @@ -132,6 +133,13 @@ public void close() {} }; } + @Override + public void operationFinishEarly() { + operationFinishedEarly.set(true); + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -192,6 +200,11 @@ public void attemptPermanentFailure(Throwable throwable) { @Override public void onRequest(int requestCount) { requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd); + + if (operationFinishedEarly.get()) { + return; + } + if (flowControlIsDisabled) { // On request is only called when auto flow control is disabled. When auto flow control is // disabled, server latency is measured between onRequest and onResponse. @@ -205,6 +218,10 @@ public void onRequest(int requestCount) { @Override public void responseReceived() { + if (operationFinishedEarly.get()) { + return; + } + if (firstResponsePerOpTimer.isRunning()) { firstResponsePerOpTimer.stop(); } @@ -226,6 +243,9 @@ public void responseReceived() { @Override public void afterResponse(long applicationLatency) { if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) { + if (operationFinishedEarly.get()) { + return; + } // When auto flow control is enabled, request will never be called, so server latency is // measured between after the last response is processed and before the next response is // received. If flow control is disabled but requestLeft is greater than 0, @@ -272,10 +292,16 @@ public void disableFlowControl() { } private void recordOperationCompletion(@Nullable Throwable status) { + if (operationFinishedEarly.get()) { + status = null; // force an ok + } + if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + if (operationTimer.isRunning()) { + operationTimer.stop(); + } boolean isStreaming = operationType == OperationType.ServerStreaming; String statusStr = Util.extractStatus(status); @@ -316,6 +342,9 @@ private void recordOperationCompletion(@Nullable Throwable status) { } private void recordAttemptCompletion(@Nullable Throwable status) { + if (operationFinishedEarly.get()) { + status = null; // force an ok + } // If the attempt failed, the time spent in retry should be counted in application latency. // Stop the stopwatch and decrement requestLeft. synchronized (timerLock) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index d89aa90c6b..1b3d930546 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -62,6 +62,13 @@ public void close() { }; } + @Override + public void operationFinishEarly() { + for (BigtableTracer tracer : bigtableTracers) { + tracer.operationFinishEarly(); + } + } + @Override public void operationSucceeded() { for (ApiTracer child : children) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 0ffabe2606..e9137d1cf3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -84,6 +84,12 @@ public void close() {} }; } + @Override + public void operationFinishEarly() { + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -103,7 +109,11 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + + // Mightve stopped in operationFinishEarly() + if (operationTimer.isRunning()) { + operationTimer.stop(); + } long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java index b6f1a24b70..0b11ce3219 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java @@ -21,7 +21,6 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.rpc.InternalException; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; @@ -88,18 +87,11 @@ public void testMultipleResponses() throws Exception { call.getController().getObserver().onResponse("first"); call.getController().getObserver().onResponse("second"); - Throwable e = Assert.assertThrows(ExecutionException.class, f::get).getCause(); - assertThat(e).isInstanceOf(InternalException.class); - assertThat(e) - .hasMessageThat() - .contains( - "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); - ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(String.class); verify(callable.logger).log(Mockito.any(), msgCaptor.capture()); assertThat(msgCaptor.getValue()) .isEqualTo( - "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); + "Received response after future is resolved for a Fake.method unary operation. previous: first, New response: second"); assertThat(call.getController().isCancelled()).isTrue(); } From d9223bfb8ac8c1b154192e6b45067851d75f9c42 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Wed, 6 Nov 2024 18:30:48 -0500 Subject: [PATCH 2/5] more testing Change-Id: Ifc95aa89c080ee8395d43adce1172f11354c306e --- .../data/v2/stub/EnhancedBigtableStub.java | 4 +- .../v2/stub/EnhancedBigtableStubSettings.java | 16 +- .../data/v2/stub/SkipTrailersTest.java | 245 ++++++++++++++++++ 3 files changed, 262 insertions(+), 3 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 94c91fb72a..bcb084b491 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -563,7 +563,7 @@ public ServerStreamingCallable createReadRowsCallable( * */ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) { + if (!settings.getEnableSkipTrailers()) { ServerStreamingCallable readRowsCallable = createReadRowsBaseCallable( ServerStreamingCallSettings.newBuilder() @@ -1296,7 +1296,7 @@ private UnaryCallable createUnar UnaryCallSettings callSettings, Function requestTransformer, Function responseTranformer) { - if (EnhancedBigtableStubSettings.SKIP_TRAILERS) { + if (settings.getEnableSkipTrailers()) { return createUnaryCallableNew( methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); } else { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 863389166f..1425e7b362 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -109,7 +109,7 @@ public class EnhancedBigtableStubSettings extends StubSettings jwtAudienceMapping; private final boolean enableRoutingCookie; private final boolean enableRetryInfo; + private final boolean enableSkipTrailers; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -287,6 +288,7 @@ private EnhancedBigtableStubSettings(Builder builder) { jwtAudienceMapping = builder.jwtAudienceMapping; enableRoutingCookie = builder.enableRoutingCookie; enableRetryInfo = builder.enableRetryInfo; + enableSkipTrailers = builder.enableSkipTrailers; metricsProvider = builder.metricsProvider; metricsEndpoint = builder.metricsEndpoint; @@ -373,6 +375,10 @@ public boolean getEnableRetryInfo() { return enableRetryInfo; } + boolean getEnableSkipTrailers() { + return enableSkipTrailers; + } + /** * Gets the Google Cloud Monitoring endpoint for publishing client side metrics. If it's null, * client will publish metrics to the default monitoring endpoint. @@ -683,6 +689,7 @@ public static class Builder extends StubSettings.Builder jwtAudienceMapping; private boolean enableRoutingCookie; private boolean enableRetryInfo; + private boolean enableSkipTrailers; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -721,6 +728,7 @@ private Builder() { setCredentialsProvider(defaultCredentialsProviderBuilder().build()); this.enableRoutingCookie = true; this.enableRetryInfo = true; + this.enableSkipTrailers = SKIP_TRAILERS; metricsProvider = DefaultMetricsProvider.INSTANCE; // Defaults provider @@ -1085,6 +1093,11 @@ public boolean getEnableRetryInfo() { return enableRetryInfo; } + Builder setEnableSkipTrailers(boolean enabled) { + this.enableSkipTrailers = enabled; + return this; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -1212,6 +1225,7 @@ public String toString() { .add("jwtAudienceMapping", jwtAudienceMapping) .add("enableRoutingCookie", enableRoutingCookie) .add("enableRetryInfo", enableRetryInfo) + .add("enableSkipTrailers", enableSkipTrailers) .add("readRowsSettings", readRowsSettings) .add("readRowSettings", readRowSettings) .add("sampleRowKeysSettings", sampleRowKeysSettings) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java new file mode 100644 index 0000000000..053a3dc3cf --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java @@ -0,0 +1,245 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.auto.value.AutoValue; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.PingAndWarmResponse; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.Row; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Filters; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.TableId; +import com.google.cloud.bigtable.data.v2.models.TargetId; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.StringValue; +import io.grpc.BindableService; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerServiceDefinition; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.exceptions.verification.WantedButNotInvoked; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class SkipTrailersTest { + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final TargetId TABLE_ID = TableId.of("fake-table"); + + private HackedBigtableService hackedService; + private Server server; + + @Mock private ApiTracerFactory tracerFactory; + @Mock private BigtableTracer tracer; + + private BigtableDataClient client; + + @Before + public void setUp() throws Exception { + hackedService = new HackedBigtableService(); + server = FakeServiceBuilder.create(hackedService).start(); + + when(tracerFactory.newTracer(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(tracer); + when(tracer.inScope()).thenReturn(Mockito.mock(ApiTracer.Scope.class)); + + BigtableDataSettings.Builder clientBuilder = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setCredentialsProvider(NoCredentialsProvider.create()); + clientBuilder.stubSettings().setEnableSkipTrailers(true).setTracerFactory(tracerFactory); + + client = BigtableDataClient.create(clientBuilder.build()); + } + + @After + public void tearDown() throws Exception { + client.close(); + server.shutdown(); + } + + @Test + public void testReadRow() throws InterruptedException, ExecutionException { + ReadRowsResponse fakeResponse = + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + test(() -> client.readRowAsync(TABLE_ID, "fake-key"), fakeResponse); + } + + @Test + public void testMutateRow() throws ExecutionException, InterruptedException { + test( + () -> client.mutateRowAsync(RowMutation.create(TABLE_ID, "fake-key")), + MutateRowResponse.getDefaultInstance()); + } + + @Test + public void testCheckAndMutateRow() throws ExecutionException, InterruptedException { + ConditionalRowMutation req = + ConditionalRowMutation.create(TABLE_ID, "fake-key") + .condition(Filters.FILTERS.pass()) + .then(Mutation.create().deleteRow()); + test(() -> client.checkAndMutateRowAsync(req), CheckAndMutateRowResponse.getDefaultInstance()); + } + + @Test + public void testRMW() throws ExecutionException, InterruptedException { + ReadModifyWriteRow req = ReadModifyWriteRow.create(TABLE_ID, "fake-key").append("cf", "q", "A"); + test( + () -> client.readModifyWriteRowAsync(req), + ReadModifyWriteRowResponse.newBuilder().setRow(Row.getDefaultInstance()).build()); + } + + private void test(Supplier> invoker, T fakeResponse) + throws InterruptedException, ExecutionException { + ApiFuture future = invoker.get(); + + // Wait for the call to start on the server + @SuppressWarnings("unchecked") + ServerRpc rpc = (ServerRpc) hackedService.rpcs.poll(10, TimeUnit.SECONDS); + Preconditions.checkNotNull( + rpc, "Timed out waiting for the call to be received by the mock server"); + + // Send the only row + rpc.getResponseStream().onNext(fakeResponse); + + // Ensure that the future resolves and does not throw an error + try { + future.get(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + Assert.fail("timed out waiting for the trailer optimization future to resolve"); + } + + verify(tracer, times(1)).operationFinishEarly(); + verify(tracer, never()).operationSucceeded(); + + // clean up + rpc.getResponseStream().onCompleted(); + + // Ensure that the tracer is invoked after the internal operation is complete + // Since we dont have a way to know exactly when this happens, we poll + for (int i = 10; i > 0; i--) { + try { + verify(tracer, times(1)).operationSucceeded(); + break; + } catch (WantedButNotInvoked e) { + if (i > 1) { + Thread.sleep(100); + } else { + throw e; + } + } + } + } + + class HackedBigtableService implements BindableService { + private final LinkedBlockingDeque> rpcs = new LinkedBlockingDeque<>(); + + @Override + public ServerServiceDefinition bindService() { + ServerServiceDefinition.Builder builder = + ServerServiceDefinition.builder(BigtableGrpc.SERVICE_NAME) + .addMethod( + BigtableGrpc.getPingAndWarmMethod(), + ServerCalls.asyncUnaryCall( + (ignored, observer) -> { + observer.onNext(PingAndWarmResponse.getDefaultInstance()); + observer.onCompleted(); + })) + .addMethod( + BigtableGrpc.getReadRowsMethod(), + ServerCalls.asyncServerStreamingCall( + (req, observer) -> rpcs.add(ServerRpc.create(req, observer)))); + ImmutableList> + unaryDescriptors = + ImmutableList.of( + BigtableGrpc.getMutateRowMethod(), + BigtableGrpc.getCheckAndMutateRowMethod(), + BigtableGrpc.getReadModifyWriteRowMethod()); + + for (MethodDescriptor desc : + unaryDescriptors) { + builder.addMethod( + desc.toBuilder().setType(MethodDescriptor.MethodType.SERVER_STREAMING).build(), + ServerCalls.asyncServerStreamingCall( + (req, observer) -> rpcs.add(ServerRpc.create(req, observer)))); + } + return builder.build(); + } + } + + @AutoValue + abstract static class ServerRpc { + abstract ReqT getRequest(); + + abstract StreamObserver getResponseStream(); + + static ServerRpc create(ReqT req, StreamObserver resp) { + // return new AutoValue__(req, resp); + return new AutoValue_SkipTrailersTest_ServerRpc<>(req, resp); + } + } +} From 643ba49b8fb50e4f96fb034246b951784a1ea780 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Wed, 6 Nov 2024 18:35:02 -0500 Subject: [PATCH 3/5] cosmetics Change-Id: I679aeac3ec7475757ce769f4c64ede1130b35ebd --- .../cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java | 5 +++++ .../bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java | 6 +----- .../cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java | 5 ----- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 0b14d3bbfd..dcea020485 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -52,6 +52,11 @@ public void afterResponse(long applicationLatency) { // noop } + /** + * Used by BigtableUnaryOperationCallable to signal that the user visible portion of the RPC is + * complete and that metrics should freeze the timers and then publish the frozen values when the + * internal portion of the operation completes. + */ public void operationFinishEarly() {} /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index c1a7737043..99502d0dbd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -299,9 +299,7 @@ private void recordOperationCompletion(@Nullable Throwable status) { if (!opFinished.compareAndSet(false, true)) { return; } - if (operationTimer.isRunning()) { - operationTimer.stop(); - } + long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS); boolean isStreaming = operationType == OperationType.ServerStreaming; String statusStr = Util.extractStatus(status); @@ -320,8 +318,6 @@ private void recordOperationCompletion(@Nullable Throwable status) { .put(STATUS_KEY, statusStr) .build(); - long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS); - // Only record when retry count is greater than 0 so the retry // graph will be less confusing if (attemptCount > 1) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index e9137d1cf3..a2c5bdac1f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -110,11 +110,6 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { return; } - // Mightve stopped in operationFinishEarly() - if (operationTimer.isRunning()) { - operationTimer.stop(); - } - long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); MeasureMap measures = From 5b142336c9d799639254d01ee133eabfe5b421fa Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Wed, 6 Nov 2024 18:38:05 -0500 Subject: [PATCH 4/5] comment Change-Id: Ia535905f4fed6f30854c05ceb300af39877ca4a1 --- .../google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java index 053a3dc3cf..07ac7deee4 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SkipTrailersTest.java @@ -195,6 +195,10 @@ private void test(Supplier> invoker, T fakeResponse) } } + /** + * Hack the srvice definition to allow grpc server to simulate delayed trailers. This will augment + * the bigtable service definition to promote unary rpcs to server streaming + */ class HackedBigtableService implements BindableService { private final LinkedBlockingDeque> rpcs = new LinkedBlockingDeque<>(); From 2fe1c7c70a81bf7ef262a376e2e92905adf683a9 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Wed, 6 Nov 2024 22:35:26 -0500 Subject: [PATCH 5/5] fix test Change-Id: I77664e40c9fd2d52b609f5063386b158cbc1e81e --- .../bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 5280abe1fd..fdc6b5717e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -961,6 +961,7 @@ public void enableRetryInfoFalseValueTest() throws IOException { "jwtAudienceMapping", "enableRoutingCookie", "enableRetryInfo", + "enableSkipTrailers", "readRowsSettings", "readRowSettings", "sampleRowKeysSettings",