diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java index 19f7a5224c..78d507665e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -24,15 +24,16 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.UnaryCallable; -import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; import io.grpc.Status; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link @@ -73,9 +74,10 @@ public BigtableUnaryOperationCallable( public ApiFuture futureCall(ReqT req, ApiCallContext apiCallContext) { apiCallContext = defaultCallContext.merge(apiCallContext); - ApiTracer apiTracer = - tracerFactory.newTracer( - apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary); + BigtableTracer apiTracer = + (BigtableTracer) + tracerFactory.newTracer( + apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary); apiCallContext = apiCallContext.withTracer(apiTracer); @@ -85,18 +87,15 @@ public ApiFuture futureCall(ReqT req, ApiCallContext apiCallContext) { } class UnaryFuture extends AbstractApiFuture implements ResponseObserver { - private final ApiTracer tracer; + private final BigtableTracer tracer; private final boolean allowNoResponse; private StreamController controller; private final AtomicBoolean upstreamCancelled = new AtomicBoolean(); - private boolean responseReceived; - private @Nullable RespT response; - private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) { + private UnaryFuture(BigtableTracer tracer, boolean allowNoResponse) { this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null"); this.allowNoResponse = allowNoResponse; - this.responseReceived = false; } @Override @@ -130,23 +129,39 @@ private void cancelUpstream() { public void onResponse(RespT resp) { tracer.responseReceived(); - // happy path - buffer the only responsse - if (!responseReceived) { - responseReceived = true; - this.response = resp; + if (set(resp)) { + tracer.operationFinishEarly(); return; } - String msg = - String.format( - "Received multiple responses for a %s unary operation. Previous: %s, New: %s", - spanName, response, resp); - logger.log(Level.WARNING, msg); + // At this point we are guaranteed that the future has been resolved. However we need to check + // why. + // We know it's not because it was resolved with the current response. Moreover, since the + // future + // is resolved, our only means to flag the error is to log. + // So there are 3 possibilities: + // 1. user cancelled the future + // 2. this is an extra response and the previous one resolved the future + // 3. we got a response after the rpc failed (this should never happen and would be a bad bug) - InternalException error = - new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false); - if (setException(error)) { - tracer.operationFailed(error); + if (isCancelled()) { + return; + } + + try { + RespT prev = Futures.getDone(this); + String msg = + String.format( + "Received response after future is resolved for a %s unary operation. previous: %s, New response: %s", + spanName, prev, resp); + logger.log(Level.WARNING, msg); + } catch (ExecutionException e) { + // Should never happen + String msg = + String.format( + "Received response after future resolved as a failure for a %s unary operation. New response: %s", + spanName, resp); + logger.log(Level.WARNING, msg, e.getCause()); } cancelUpstream(); @@ -158,18 +173,24 @@ public void onError(Throwable throwable) { tracer.operationFailed(throwable); } else if (isCancelled()) { tracer.operationCancelled(); + } else { + // At this point the has been resolved, so we ignore the error + tracer.operationSucceeded(); } - // The future might've been resolved due to double response } @Override public void onComplete() { - if (allowNoResponse || responseReceived) { - if (set(response)) { - tracer.operationSucceeded(); - return; - } - } else { + if (allowNoResponse && set(null)) { + tracer.operationSucceeded(); + return; + + // Under normal circumstances the future wouldve been resolved in onResponse or via + // set(null) if it expected for + // the rpc to not have a response. So if aren't done, the only reason is that we didn't get + // a response + // but were expecting one + } else if (!isDone()) { String msg = spanName + " unary operation completed without a response message"; InternalException e = new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false); @@ -183,7 +204,10 @@ public void onComplete() { // check cancellation race if (isCancelled()) { tracer.operationCancelled(); + return; } + + tracer.operationSucceeded(); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index d0e307d510..0b14d3bbfd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -52,6 +52,8 @@ public void afterResponse(long applicationLatency) { // noop } + public void operationFinishEarly() {} + /** * Get the attempt number of the current call. Attempt number for the current call is passed in * and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index 14a112b270..c1a7737043 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -51,6 +51,7 @@ class BuiltinMetricsTracer extends BigtableTracer { private final SpanName spanName; // Operation level metrics + private final AtomicBoolean operationFinishedEarly = new AtomicBoolean(); private final AtomicBoolean opFinished = new AtomicBoolean(); private final Stopwatch operationTimer = Stopwatch.createStarted(); private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); @@ -132,6 +133,13 @@ public void close() {} }; } + @Override + public void operationFinishEarly() { + operationFinishedEarly.set(true); + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -192,6 +200,11 @@ public void attemptPermanentFailure(Throwable throwable) { @Override public void onRequest(int requestCount) { requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd); + + if (operationFinishedEarly.get()) { + return; + } + if (flowControlIsDisabled) { // On request is only called when auto flow control is disabled. When auto flow control is // disabled, server latency is measured between onRequest and onResponse. @@ -205,6 +218,10 @@ public void onRequest(int requestCount) { @Override public void responseReceived() { + if (operationFinishedEarly.get()) { + return; + } + if (firstResponsePerOpTimer.isRunning()) { firstResponsePerOpTimer.stop(); } @@ -226,6 +243,9 @@ public void responseReceived() { @Override public void afterResponse(long applicationLatency) { if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) { + if (operationFinishedEarly.get()) { + return; + } // When auto flow control is enabled, request will never be called, so server latency is // measured between after the last response is processed and before the next response is // received. If flow control is disabled but requestLeft is greater than 0, @@ -272,10 +292,16 @@ public void disableFlowControl() { } private void recordOperationCompletion(@Nullable Throwable status) { + if (operationFinishedEarly.get()) { + status = null; // force an ok + } + if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + if (operationTimer.isRunning()) { + operationTimer.stop(); + } boolean isStreaming = operationType == OperationType.ServerStreaming; String statusStr = Util.extractStatus(status); @@ -316,6 +342,9 @@ private void recordOperationCompletion(@Nullable Throwable status) { } private void recordAttemptCompletion(@Nullable Throwable status) { + if (operationFinishedEarly.get()) { + status = null; // force an ok + } // If the attempt failed, the time spent in retry should be counted in application latency. // Stop the stopwatch and decrement requestLeft. synchronized (timerLock) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index d89aa90c6b..1b3d930546 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -62,6 +62,13 @@ public void close() { }; } + @Override + public void operationFinishEarly() { + for (BigtableTracer tracer : bigtableTracers) { + tracer.operationFinishEarly(); + } + } + @Override public void operationSucceeded() { for (ApiTracer child : children) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 0ffabe2606..e9137d1cf3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -84,6 +84,12 @@ public void close() {} }; } + @Override + public void operationFinishEarly() { + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -103,7 +109,11 @@ private void recordOperationCompletion(@Nullable Throwable throwable) { if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + + // Mightve stopped in operationFinishEarly() + if (operationTimer.isRunning()) { + operationTimer.stop(); + } long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java index b6f1a24b70..0b11ce3219 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java @@ -21,7 +21,6 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.rpc.InternalException; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; @@ -88,18 +87,11 @@ public void testMultipleResponses() throws Exception { call.getController().getObserver().onResponse("first"); call.getController().getObserver().onResponse("second"); - Throwable e = Assert.assertThrows(ExecutionException.class, f::get).getCause(); - assertThat(e).isInstanceOf(InternalException.class); - assertThat(e) - .hasMessageThat() - .contains( - "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); - ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(String.class); verify(callable.logger).log(Mockito.any(), msgCaptor.capture()); assertThat(msgCaptor.getValue()) .isEqualTo( - "Received multiple responses for a Fake.method unary operation. Previous: first, New: second"); + "Received response after future is resolved for a Fake.method unary operation. previous: first, New response: second"); assertThat(call.getController().isCancelled()).isTrue(); }