Skip to content

Commit

Permalink
feat: add an experimental feature to skip waiting for trailers for un…
Browse files Browse the repository at this point in the history
…ary ops (#2404)

* feat: add an experimental feature to skip waiting for trailers for unary ops

This is off by default and can be enabled using an environment variable.
When enabled, BigtableUnaryOperationCallable will resolve the user visible future
immediately when a response is available and will tell metrics to freeze all timers.
Metrics will still wait for the trailers in the background for necessary metadata to
publish the frozen timer values.

Change-Id: I2101ff375de711693720af4fd2e9535aa5355f9d

* more testing

Change-Id: Ifc95aa89c080ee8395d43adce1172f11354c306e

* cosmetics

Change-Id: I679aeac3ec7475757ce769f4c64ede1130b35ebd

* comment

Change-Id: Ia535905f4fed6f30854c05ceb300af39877ca4a1

* fix test

Change-Id: I77664e40c9fd2d52b609f5063386b158cbc1e81e
  • Loading branch information
igorbernstein2 authored Nov 7, 2024
1 parent 75d4105 commit cf58f26
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import io.grpc.Status;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link
Expand Down Expand Up @@ -73,9 +74,10 @@ public BigtableUnaryOperationCallable(
public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
apiCallContext = defaultCallContext.merge(apiCallContext);

ApiTracer apiTracer =
tracerFactory.newTracer(
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);
BigtableTracer apiTracer =
(BigtableTracer)
tracerFactory.newTracer(
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);

apiCallContext = apiCallContext.withTracer(apiTracer);

Expand All @@ -85,18 +87,15 @@ public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
}

class UnaryFuture extends AbstractApiFuture<RespT> implements ResponseObserver<RespT> {
private final ApiTracer tracer;
private final BigtableTracer tracer;
private final boolean allowNoResponse;

private StreamController controller;
private final AtomicBoolean upstreamCancelled = new AtomicBoolean();
private boolean responseReceived;
private @Nullable RespT response;

private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) {
private UnaryFuture(BigtableTracer tracer, boolean allowNoResponse) {
this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null");
this.allowNoResponse = allowNoResponse;
this.responseReceived = false;
}

@Override
Expand Down Expand Up @@ -130,23 +129,39 @@ private void cancelUpstream() {
public void onResponse(RespT resp) {
tracer.responseReceived();

// happy path - buffer the only responsse
if (!responseReceived) {
responseReceived = true;
this.response = resp;
if (set(resp)) {
tracer.operationFinishEarly();
return;
}

String msg =
String.format(
"Received multiple responses for a %s unary operation. Previous: %s, New: %s",
spanName, response, resp);
logger.log(Level.WARNING, msg);
// At this point we are guaranteed that the future has been resolved. However we need to check
// why.
// We know it's not because it was resolved with the current response. Moreover, since the
// future
// is resolved, our only means to flag the error is to log.
// So there are 3 possibilities:
// 1. user cancelled the future
// 2. this is an extra response and the previous one resolved the future
// 3. we got a response after the rpc failed (this should never happen and would be a bad bug)

InternalException error =
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
if (setException(error)) {
tracer.operationFailed(error);
if (isCancelled()) {
return;
}

try {
RespT prev = Futures.getDone(this);
String msg =
String.format(
"Received response after future is resolved for a %s unary operation. previous: %s, New response: %s",
spanName, prev, resp);
logger.log(Level.WARNING, msg);
} catch (ExecutionException e) {
// Should never happen
String msg =
String.format(
"Received response after future resolved as a failure for a %s unary operation. New response: %s",
spanName, resp);
logger.log(Level.WARNING, msg, e.getCause());
}

cancelUpstream();
Expand All @@ -158,18 +173,24 @@ public void onError(Throwable throwable) {
tracer.operationFailed(throwable);
} else if (isCancelled()) {
tracer.operationCancelled();
} else {
// At this point the has been resolved, so we ignore the error
tracer.operationSucceeded();
}
// The future might've been resolved due to double response
}

@Override
public void onComplete() {
if (allowNoResponse || responseReceived) {
if (set(response)) {
tracer.operationSucceeded();
return;
}
} else {
if (allowNoResponse && set(null)) {
tracer.operationSucceeded();
return;

// Under normal circumstances the future wouldve been resolved in onResponse or via
// set(null) if it expected for
// the rpc to not have a response. So if aren't done, the only reason is that we didn't get
// a response
// but were expecting one
} else if (!isDone()) {
String msg = spanName + " unary operation completed without a response message";
InternalException e =
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
Expand All @@ -183,7 +204,10 @@ public void onComplete() {
// check cancellation race
if (isCancelled()) {
tracer.operationCancelled();
return;
}

tracer.operationSucceeded();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) {
if (!settings.getEnableSkipTrailers()) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
Expand Down Expand Up @@ -1347,7 +1347,7 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {
if (EnhancedBigtableStubSettings.SKIP_TRAILERS) {
if (settings.getEnableSkipTrailers()) {
return createUnaryCallableNew(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private static final boolean DIRECT_PATH_ENABLED =
Boolean.parseBoolean(System.getenv("CBT_ENABLE_DIRECTPATH"));

static final boolean SKIP_TRAILERS =
private static final boolean SKIP_TRAILERS =
Optional.ofNullable(System.getenv("CBT_SKIP_HEADERS"))
.map(Boolean::parseBoolean)
.orElse(DIRECT_PATH_ENABLED);
Expand Down Expand Up @@ -240,6 +240,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;
private final boolean enableRetryInfo;
private final boolean enableSkipTrailers;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -287,6 +288,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;
enableRetryInfo = builder.enableRetryInfo;
enableSkipTrailers = builder.enableSkipTrailers;
metricsProvider = builder.metricsProvider;
metricsEndpoint = builder.metricsEndpoint;

Expand Down Expand Up @@ -373,6 +375,10 @@ public boolean getEnableRetryInfo() {
return enableRetryInfo;
}

boolean getEnableSkipTrailers() {
return enableSkipTrailers;
}

/**
* Gets the Google Cloud Monitoring endpoint for publishing client side metrics. If it's null,
* client will publish metrics to the default monitoring endpoint.
Expand Down Expand Up @@ -683,6 +689,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;
private boolean enableRetryInfo;
private boolean enableSkipTrailers;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -721,6 +728,7 @@ private Builder() {
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;
this.enableRetryInfo = true;
this.enableSkipTrailers = SKIP_TRAILERS;
metricsProvider = DefaultMetricsProvider.INSTANCE;

// Defaults provider
Expand Down Expand Up @@ -1085,6 +1093,11 @@ public boolean getEnableRetryInfo() {
return enableRetryInfo;
}

Builder setEnableSkipTrailers(boolean enabled) {
this.enableSkipTrailers = enabled;
return this;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1212,6 +1225,7 @@ public String toString() {
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("enableRetryInfo", enableRetryInfo)
.add("enableSkipTrailers", enableSkipTrailers)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ public void afterResponse(long applicationLatency) {
// noop
}

/**
* Used by BigtableUnaryOperationCallable to signal that the user visible portion of the RPC is
* complete and that metrics should freeze the timers and then publish the frozen values when the
* internal portion of the operation completes.
*/
public void operationFinishEarly() {}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
private final SpanName spanName;

// Operation level metrics
private final AtomicBoolean operationFinishedEarly = new AtomicBoolean();
private final AtomicBoolean opFinished = new AtomicBoolean();
private final Stopwatch operationTimer = Stopwatch.createStarted();
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
Expand Down Expand Up @@ -142,6 +143,13 @@ public void close() {}
};
}

@Override
public void operationFinishEarly() {
operationFinishedEarly.set(true);
attemptTimer.stop();
operationTimer.stop();
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand Down Expand Up @@ -207,6 +215,11 @@ public void attemptPermanentFailure(Throwable throwable) {
@Override
public void onRequest(int requestCount) {
requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);

if (operationFinishedEarly.get()) {
return;
}

if (flowControlIsDisabled) {
// On request is only called when auto flow control is disabled. When auto flow control is
// disabled, server latency is measured between onRequest and onResponse.
Expand All @@ -220,6 +233,10 @@ public void onRequest(int requestCount) {

@Override
public void responseReceived() {
if (operationFinishedEarly.get()) {
return;
}

if (firstResponsePerOpTimer.isRunning()) {
firstResponsePerOpTimer.stop();
}
Expand All @@ -241,6 +258,9 @@ public void responseReceived() {
@Override
public void afterResponse(long applicationLatency) {
if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
if (operationFinishedEarly.get()) {
return;
}
// When auto flow control is enabled, request will never be called, so server latency is
// measured between after the last response is processed and before the next response is
// received. If flow control is disabled but requestLeft is greater than 0,
Expand Down Expand Up @@ -295,10 +315,14 @@ public void disableFlowControl() {
}

private void recordOperationCompletion(@Nullable Throwable status) {
if (operationFinishedEarly.get()) {
status = null; // force an ok
}

if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();
long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);

boolean isStreaming = operationType == OperationType.ServerStreaming;
String statusStr = Util.extractStatus(status);
Expand All @@ -317,8 +341,6 @@ private void recordOperationCompletion(@Nullable Throwable status) {
.put(STATUS_KEY, statusStr)
.build();

long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);

// Only record when retry count is greater than 0 so the retry
// graph will be less confusing
if (attemptCount > 1) {
Expand All @@ -339,6 +361,9 @@ private void recordOperationCompletion(@Nullable Throwable status) {
}

private void recordAttemptCompletion(@Nullable Throwable status) {
if (operationFinishedEarly.get()) {
status = null; // force an ok
}
// If the attempt failed, the time spent in retry should be counted in application latency.
// Stop the stopwatch and decrement requestLeft.
synchronized (timerLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public void close() {
};
}

@Override
public void operationFinishEarly() {
for (BigtableTracer tracer : bigtableTracers) {
tracer.operationFinishEarly();
}
}

@Override
public void operationSucceeded() {
for (ApiTracer child : children) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public void close() {}
};
}

@Override
public void operationFinishEarly() {
attemptTimer.stop();
operationTimer.stop();
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand All @@ -103,7 +109,6 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();

long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);

Expand Down
Loading

0 comments on commit cf58f26

Please sign in to comment.