Skip to content

Commit

Permalink
feat: add internal "deadline remaining" client side metric #2341 (#2370)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed
- [ ] All new data plane features have a completed end to end testing plan

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
djyau authored Nov 6, 2024
1 parent 6685aa3 commit 75d4105
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
Expand Down Expand Up @@ -545,7 +546,12 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readRowsSettings().getRetrySettings().getTotalTimeout()));
}

/**
Expand Down Expand Up @@ -579,7 +585,12 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
UnaryCallable<Query, RowT> traced =
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readRowSettings().getRetrySettings().getTotalTimeout()));
} else {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
Expand All @@ -599,7 +610,11 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>

return new BigtableUnaryOperationCallable<>(
readRowCallable,
clientContext.getDefaultCallContext(),
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readRowSettings().getRetrySettings().getTotalTimeout()),
clientContext.getTracerFactory(),
getSpanName("ReadRow"),
/*allowNoResponses=*/ true);
Expand Down Expand Up @@ -715,7 +730,12 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.bulkReadRowsSettings().getRetrySettings().getTotalTimeout()));
}

/**
Expand Down Expand Up @@ -780,7 +800,14 @@ public ApiFuture<List<KeyOffset>> futureCall(String s, ApiCallContext apiCallCon
retryable = withRetries(withBigtableTracer, settings.sampleRowKeysSettings());

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallableWithRequest(retryable, requestContext));
methodName,
new SampleRowKeysCallableWithRequest(retryable, requestContext)
.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.sampleRowKeysSettings().getRetrySettings().getTotalTimeout())));
}

/**
Expand Down Expand Up @@ -903,7 +930,12 @@ private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createMutateRowsBas
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.bulkMutateRowsSettings().getRetrySettings().getTotalTimeout()));
}

/**
Expand Down Expand Up @@ -1108,7 +1140,15 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings
.generateInitialChangeStreamPartitionsSettings()
.getRetrySettings()
.getTotalTimeout()));
}

/**
Expand Down Expand Up @@ -1180,7 +1220,12 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readChangeStreamSettings().getRetrySettings().getTotalTimeout()));
}

/**
Expand Down Expand Up @@ -1266,7 +1311,13 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {
new TracedServerStreamingCallable<>(retries, clientContext.getTracerFactory(), span);

return new ExecuteQueryCallable(
traced.withDefaultCallContext(clientContext.getDefaultCallContext()), requestContext);
traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.executeQuerySettings().getRetrySettings().getTotalTimeout())),
requestContext);
}

/**
Expand Down Expand Up @@ -1344,7 +1395,12 @@ public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) {
clientContext.getTracerFactory(),
getSpanName(methodDescriptor.getBareMethodName()));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
callSettings.getRetrySettings().getTotalTimeout()));
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableNew(
Expand Down Expand Up @@ -1373,7 +1429,11 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar

return new BigtableUnaryOperationCallable<>(
transformed,
clientContext.getDefaultCallContext(),
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
callSettings.getRetrySettings().getTotalTimeout()),
clientContext.getTracerFactory(),
getSpanName(methodDescriptor.getBareMethodName()),
/* allowNoResponse= */ false);
Expand Down Expand Up @@ -1407,7 +1467,12 @@ public Map<String, String> extract(PingAndWarmRequest request) {
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.pingAndWarmSettings().getRetrySettings().getTotalTimeout()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OPERATION_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.REMAINING_DEADLINE_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME;

Expand Down Expand Up @@ -115,7 +116,8 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
CLIENT_BLOCKING_LATENCIES_NAME,
APPLICATION_BLOCKING_LATENCIES_NAME,
RETRY_COUNT_NAME,
CONNECTIVITY_ERROR_COUNT_NAME)
CONNECTIVITY_ERROR_COUNT_NAME,
REMAINING_DEADLINE_NAME)
.stream()
.map(m -> METER_NAME + m)
.collect(ImmutableList.toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
Expand All @@ -30,6 +32,10 @@ public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;

/**
 * Call-context option key that carries an operation's total timeout as a {@link Duration}.
 *
 * <p>Callables are created with a default call context that stores the total timeout from the
 * RPC's retry settings under this key. The tracer-aware callables read the option back and, when
 * it is present, pass it to {@link #setOperationTimeout(Duration)}.
 */
@InternalApi("for internal use only")
public static final ApiCallContext.Key<Duration> OPERATION_TIMEOUT_KEY =
ApiCallContext.Key.create("OPERATION_TIMEOUT");

@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
Expand Down Expand Up @@ -93,4 +99,12 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
public void grpcMessageSent() {
// noop
}

/**
 * Receives the operation-level timeout taken from the user's settings so that a tracer can work
 * out how much of the deadline remains.
 *
 * <p>The base implementation deliberately ignores the value. Tracers that report the remaining
 * deadline (the original note names BuiltinMetricsTracer) are expected to override this hook.
 *
 * @param operationTimeout total time budget for the operation; callers typically read it from
 *     the call-context option {@link #OPERATION_TIMEOUT_KEY}
 */
public void setOperationTimeout(Duration operationTimeout) {
  // Intentionally empty: the base tracer has nothing to record.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
Expand All @@ -26,6 +27,7 @@
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
* This callable will
Expand Down Expand Up @@ -62,6 +64,11 @@ public void call(
BigtableTracerResponseObserver<ResponseT> innerObserver =
new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
GrpcCallContext callContext = (GrpcCallContext) context;
Duration deadline = callContext.getOption(BigtableTracer.OPERATION_TIMEOUT_KEY);
if (deadline != null) {
((BigtableTracer) context.getTracer()).setOperationTimeout(deadline);
}
innerCallable.call(
request,
innerObserver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
* This callable will:
Expand Down Expand Up @@ -58,6 +60,11 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context)
BigtableTracerUnaryCallback<ResponseT> callback =
new BigtableTracerUnaryCallback<ResponseT>(
(BigtableTracer) context.getTracer(), responseMetadata);
GrpcCallContext callContext = (GrpcCallContext) context;
Duration deadline = callContext.getOption(BigtableTracer.OPERATION_TIMEOUT_KEY);
if (deadline != null) {
((BigtableTracer) context.getTracer()).setOperationTimeout(deadline);
}
ApiFuture<ResponseT> future =
innerCallable.futureCall(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class BuiltinMetricsConstants {
static final String SERVER_LATENCIES_NAME = "server_latencies";
static final String FIRST_RESPONSE_LATENCIES_NAME = "first_response_latencies";
static final String APPLICATION_BLOCKING_LATENCIES_NAME = "application_latencies";
static final String REMAINING_DEADLINE_NAME = "remaining_deadline";
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";

Expand Down Expand Up @@ -214,6 +215,16 @@ public static Map<InstrumentSelector, View> getAllViews() {
ImmutableSet.<AttributeKey>builder()
.add(BIGTABLE_PROJECT_ID_KEY, INSTANCE_ID_KEY, APP_PROFILE_KEY, CLIENT_NAME_KEY)
.build());
defineView(
views,
REMAINING_DEADLINE_NAME,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms",
ImmutableSet.<AttributeKey>builder()
.addAll(COMMON_ATTRIBUTES)
.add(STREAMING_KEY, STATUS_KEY)
.build());

return views.build();
}
Expand Down
Loading

0 comments on commit 75d4105

Please sign in to comment.