diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java index 3b2242385a..80fcdd0419 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java @@ -15,11 +15,8 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; -import com.google.common.base.Stopwatch; -import io.grpc.Attributes; import io.grpc.ClientStreamTracer; import io.grpc.Metadata; -import java.util.concurrent.TimeUnit; /** * Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream @@ -28,21 +25,15 @@ */ class BigtableGrpcStreamTracer extends ClientStreamTracer { - private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final BigtableTracer tracer; public BigtableGrpcStreamTracer(BigtableTracer tracer) { this.tracer = tracer; } - @Override - public void streamCreated(Attributes transportAttrs, Metadata headers) { - stopwatch.start(); - } - @Override public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { - tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + tracer.grpcMessageSent(); } static class Factory extends ClientStreamTracer.Factory { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 3445514f7b..d0e307d510 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -83,7 +83,14 @@ public void setLocations(String zone, String cluster) { // noop } + @Deprecated + /** @deprecated {@link #grpcMessageSent()} is called instead. */ public void grpcChannelQueuedLatencies(long queuedTimeMs) { // noop } + + /** Called when the message is sent on a grpc channel. */ + public void grpcMessageSent() { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index abd214d760..7a3f54913e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -84,6 +84,7 @@ class BuiltinMetricsTracer extends BigtableTracer { private final Attributes baseAttributes; private Long serverLatencies = null; + private final AtomicLong grpcMessageSentDelay = new AtomicLong(0); // OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start, // end]. To work around this, we measure all the latencies in nanoseconds and convert them @@ -263,8 +264,8 @@ public void batchRequestThrottled(long throttledTimeNanos) { } @Override - public void grpcChannelQueuedLatencies(long queuedTimeNanos) { - totalClientBlockingTime.addAndGet(queuedTimeNanos); + public void grpcMessageSent() { + grpcMessageSentDelay.set(attemptTimer.elapsed(TimeUnit.NANOSECONDS)); } @Override @@ -351,6 +352,7 @@ private void recordAttemptCompletion(@Nullable Throwable status) { .put(STATUS_KEY, statusStr) .build(); + totalClientBlockingTime.addAndGet(grpcMessageSentDelay.get()); clientBlockingLatenciesHistogram.record(convertToMs(totalClientBlockingTime.get()), attributes); attemptLatenciesHistogram.record( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 774c6d9f22..d89aa90c6b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -225,4 +225,11 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) { tracer.grpcChannelQueuedLatencies(queuedTimeMs); } } + + @Override + public void grpcMessageSent() { + for (BigtableTracer tracer : bigtableTracers) { + tracer.grpcMessageSent(); + } + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index d37a2562bf..ba300f502d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -70,15 +70,11 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall; import io.grpc.ForwardingServerCall; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; -import io.grpc.MethodDescriptor; +import io.grpc.ProxiedSocketAddress; +import io.grpc.ProxyDetector; import io.grpc.Server; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; @@ -95,6 +91,8 @@ import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.io.IOException; +import java.net.SocketAddress; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; @@ -104,6 +102,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -130,7 +129,7 @@ public class BuiltinMetricsTracerTest { private static final long SLEEP_VARIABILITY = 15; private static final String CLIENT_NAME = "java-bigtable/" + Version.VERSION; - private static final long CHANNEL_BLOCKING_LATENCY = 75; + private static final long CHANNEL_BLOCKING_LATENCY = 200; @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); @@ -196,28 +195,6 @@ public void sendHeaders(Metadata headers) { } }; - ClientInterceptor clientInterceptor = - new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions, - Channel channel) { - return new ForwardingClientCall.SimpleForwardingClientCall( - channel.newCall(methodDescriptor, callOptions)) { - @Override - public void sendMessage(ReqT message) { - try { - Thread.sleep(CHANNEL_BLOCKING_LATENCY); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - super.sendMessage(message); - } - }; - } - }; - server = FakeServiceBuilder.create(fakeService).intercept(trailersInterceptor).start(); BigtableDataSettings settings = @@ -225,6 +202,7 @@ public void sendMessage(ReqT message) { .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) .setAppProfileId(APP_PROFILE_ID) + .setRefreshingChannel(false) .build(); EnhancedBigtableStubSettings.Builder stubSettingsBuilder = settings.getStubSettings().toBuilder(); @@ -264,7 +242,7 @@ public void sendMessage(ReqT message) { if (oldConfigurator != null) { builder = oldConfigurator.apply(builder); } - return builder.intercept(clientInterceptor); + return builder.proxyDetector(new DelayProxyDetector()); }); stubSettingsBuilder.setTransportChannelProvider(channelProvider.build()); @@ -692,9 +670,8 @@ public void testQueuedOnChannelUnaryLatencies() { .put(CLIENT_NAME_KEY, CLIENT_NAME) .build(); - long expected = CHANNEL_BLOCKING_LATENCY * 2 / 3; long actual = getAggregatedValue(clientLatency, attributes); - assertThat(actual).isAtLeast(expected); + assertThat(actual).isAtLeast(CHANNEL_BLOCKING_LATENCY); } @Test @@ -838,4 +815,18 @@ public AtomicInteger getResponseCounter() { return responseCounter; } } + + class DelayProxyDetector implements ProxyDetector { + + @Nullable + @Override + public ProxiedSocketAddress proxyFor(SocketAddress socketAddress) throws IOException { + try { + Thread.sleep(CHANNEL_BLOCKING_LATENCY); + } catch (InterruptedException e) { + + } + return null; + } + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java index 11dd0b5095..cb0916ad28 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -258,4 +258,11 @@ public void testRequestBlockedOnChannel() { verify(child3, times(1)).grpcChannelQueuedLatencies(5L); verify(child4, times(1)).grpcChannelQueuedLatencies(5L); } + + @Test + public void testGrpcMessageSent() { + compositeTracer.grpcMessageSent(); + verify(child3, times(1)).grpcMessageSent(); + verify(child4, times(1)).grpcMessageSent(); + } }