Skip to content

Commit

Permalink
rough sketch of optimizing away trailers
Browse files Browse the repository at this point in the history
Change-Id: Ibe6afdf52b246dcd71f0f689ee1353a52e11a2cb
  • Loading branch information
igorbernstein2 committed Sep 23, 2024
1 parent 377437f commit a3a1d31
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracerFactory;
Expand Down Expand Up @@ -104,6 +108,7 @@
import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
Expand Down Expand Up @@ -794,8 +799,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
.setParamsExtractor(
Expand All @@ -818,18 +823,71 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(withStatsHeaders);

ServerStreamingCallSettings callSettings = ServerStreamingCallSettings.newBuilder()
.setRetryableCodes(settings.mutateRowSettings().getRetryableCodes())
.setResumptionStrategy(new SimpleStreamResumptionStrategy())
.setRetrySettings(settings.mutateRowSettings().getRetrySettings())
.build();

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(withBigtableTracer, settings.mutateRowSettings());
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(withBigtableTracer, callSettings);

ServerStreamingUnaryCallable callable = new ServerStreamingUnaryCallable(retrying);

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
methodName, new MutateRowCallable(callable.first(), requestContext));
}

static class ServerStreamingUnaryCallable<ReqT, RespT> extends ServerStreamingCallable<ReqT, RespT> {
private final ServerStreamingCallable<ReqT, RespT> inner;

ServerStreamingUnaryCallable(ServerStreamingCallable<ReqT, RespT> inner) {
this.inner = inner;
}

@Override
public void call(ReqT reqT, ResponseObserver<RespT> responseObserver, ApiCallContext apiCallContext) {
BigtableTracer tracer = (BigtableTracer) apiCallContext.getTracer();
inner.call(reqT, new MyResponseObserver<>(responseObserver, tracer), apiCallContext);
}
}
static class MyResponseObserver<T> implements ResponseObserver<T> {
private final ResponseObserver<T> inner;
private final BigtableTracer tracer;

public MyResponseObserver(ResponseObserver<T> inner, BigtableTracer tracer) {
this.inner = inner;
this.tracer = tracer;
}

@Override
public void onStart(StreamController streamController) {
inner.onStart(streamController);
}

@Override
public void onResponse(T t) {
inner.onResponse(t);
tracer.operationFinishedEarly();
}

@Override
public void onError(Throwable throwable) {
inner.onError(throwable);
}

@Override
public void onComplete() {
inner.onComplete();
}
}


/**
* Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public void setLocations(String zone, String cluster) {
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}


public void operationFinishedEarly() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public void close() {}
};
}

@Override
public void operationFinishedEarly() {
attemptTimer.stop();
operationTimer.stop();
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand Down Expand Up @@ -276,7 +282,11 @@ private void recordOperationCompletion(@Nullable Throwable status) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();

// timer might've been stopped early because due to #operationFinishedEarly()
if (operationTimer.isRunning()) {
operationTimer.stop();
}

boolean isStreaming = operationType == OperationType.ServerStreaming;
String statusStr = Util.extractStatus(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;

/**
* Simple wrapper for ReadRows to wrap the request protobufs.
Expand All @@ -42,7 +44,39 @@ public ReadRowsUserCallable(

@Override
public void call(Query request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
BigtableTracer tracer = (BigtableTracer) context.getTracer();
ReadRowsRequest innerRequest = request.toProto(requestContext);
inner.call(innerRequest, responseObserver, context);
inner.call(innerRequest, new EarlyFinishObserver<>(responseObserver, tracer), context);
}

static class EarlyFinishObserver<RowT> implements ResponseObserver<RowT> {
private final ResponseObserver<RowT> inner;
private final BigtableTracer tracer;

EarlyFinishObserver(ResponseObserver<RowT> inner, BigtableTracer tracer) {
this.inner = inner;
this.tracer = tracer;
}

@Override
public void onStart(StreamController streamController) {
inner.onStart(streamController);
}

@Override
public void onResponse(RowT row) {
tracer.operationFinishedEarly();
inner.onResponse(row);
}

@Override
public void onError(Throwable throwable) {
inner.onError(throwable);
}

@Override
public void onComplete() {
inner.onComplete();
}
}
}

0 comments on commit a3a1d31

Please sign in to comment.