Skip to content

Commit

Permalink
fix: fix client blocking latency
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Sep 25, 2024
1 parent 0330d77 commit 1efb3ab
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;

/**
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
Expand All @@ -28,21 +25,15 @@
*/
class BigtableGrpcStreamTracer extends ClientStreamTracer {

private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final BigtableTracer tracer;

public BigtableGrpcStreamTracer(BigtableTracer tracer) {
this.tracer = tracer;
}

@Override
public void streamCreated(Attributes transportAttrs, Metadata headers) {
stopwatch.start();
}

@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.NANOSECONDS));
tracer.grpcMessageSent();
}

static class Factory extends ClientStreamTracer.Factory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,13 @@ public void setLocations(String zone, String cluster) {
// noop
}

/** @deprecated {@link #grpcMessageSent()} is called instead. */
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}

/** Called when the message is sent on a grpc channel. */
public void grpcMessageSent() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ public void batchRequestThrottled(long throttledTimeNanos) {
}

@Override
public void grpcChannelQueuedLatencies(long queuedTimeNanos) {
totalClientBlockingTime.addAndGet(queuedTimeNanos);
public void grpcMessageSent() {
totalClientBlockingTime.addAndGet(attemptTimer.elapsed(TimeUnit.NANOSECONDS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,11 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
}
}

@Override
public void grpcMessageSent() {
for (BigtableTracer tracer : bigtableTracers) {
tracer.grpcMessageSent();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,8 @@ public void testQueuedOnChannelUnaryLatencies() {
.put(CLIENT_NAME_KEY, CLIENT_NAME)
.build();

long expected = CHANNEL_BLOCKING_LATENCY * 2 / 3;
long actual = getAggregatedValue(clientLatency, attributes);
assertThat(actual).isAtLeast(expected);
assertThat(actual).isAtLeast(CHANNEL_BLOCKING_LATENCY);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,11 @@ public void testRequestBlockedOnChannel() {
verify(child3, times(1)).grpcChannelQueuedLatencies(5L);
verify(child4, times(1)).grpcChannelQueuedLatencies(5L);
}

@Test
public void testGrpcMessageSent() {
compositeTracer.grpcMessageSent();
verify(child3, times(1)).grpcMessageSent();
verify(child4, times(1)).grpcMessageSent();
}
}

0 comments on commit 1efb3ab

Please sign in to comment.