From a3a1d3140ed5b209f8a3199b53ce7a15f6a9b1c4 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 23 Sep 2024 17:07:11 -0400 Subject: [PATCH] rough sketch of optimizing away trailers Change-Id: Ibe6afdf52b246dcd71f0f689ee1353a52e11a2cb --- .../data/v2/stub/EnhancedBigtableStub.java | 76 ++++++++++++++++--- .../data/v2/stub/metrics/BigtableTracer.java | 5 ++ .../v2/stub/metrics/BuiltinMetricsTracer.java | 12 ++- .../stub/readrows/ReadRowsUserCallable.java | 36 ++++++++- 4 files changed, 118 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d0022a1a46..831d1284d9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -39,12 +39,16 @@ import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetryingExecutorWithContext; import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.SimpleStreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.RequestParamsExtractor; +import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.ApiTracerFactory; @@ -104,6 +108,7 @@ import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; @@ -794,8 +799,8 @@ private UnaryCallable> createSampleRowKeysCallable() { */ private UnaryCallable createMutateRowCallable() { String methodName = "MutateRow"; - UnaryCallable base = - GrpcRawCallableFactory.createUnaryCallable( + ServerStreamingCallable base = + GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() .setMethodDescriptor(BigtableGrpc.getMutateRowMethod()) .setParamsExtractor( @@ -818,18 +823,71 @@ public Map extract(MutateRowRequest mutateRowRequest) { .build(), settings.mutateRowSettings().getRetryableCodes()); - UnaryCallable withStatsHeaders = - new StatsHeadersUnaryCallable<>(base); + ServerStreamingCallable withStatsHeaders = + new StatsHeadersServerStreamingCallable<>(base); - UnaryCallable withBigtableTracer = - new BigtableTracerUnaryCallable<>(withStatsHeaders); + ServerStreamingCallable withBigtableTracer = + new BigtableTracerStreamingCallable<>(withStatsHeaders); + + ServerStreamingCallSettings callSettings = ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.mutateRowSettings().getRetryableCodes()) + .setResumptionStrategy(new SimpleStreamResumptionStrategy()) + .setRetrySettings(settings.mutateRowSettings().getRetrySettings()) + .build(); - UnaryCallable retrying = - withRetries(withBigtableTracer, settings.mutateRowSettings()); + ServerStreamingCallable retrying = + withRetries(withBigtableTracer, callSettings); + + ServerStreamingUnaryCallable callable = new ServerStreamingUnaryCallable(retrying); return createUserFacingUnaryCallable( - methodName, new MutateRowCallable(retrying, requestContext)); + methodName, new MutateRowCallable(callable.first(), requestContext)); + } + + static class ServerStreamingUnaryCallable extends ServerStreamingCallable { + private final ServerStreamingCallable inner; + + ServerStreamingUnaryCallable(ServerStreamingCallable inner) { + this.inner = inner; + } + + @Override + public void call(ReqT reqT, ResponseObserver responseObserver, ApiCallContext apiCallContext) { + BigtableTracer tracer = (BigtableTracer) apiCallContext.getTracer(); + inner.call(reqT, new MyResponseObserver<>(responseObserver, tracer), apiCallContext); + } } + static class MyResponseObserver implements ResponseObserver { + private final ResponseObserver inner; + private final BigtableTracer tracer; + + public MyResponseObserver(ResponseObserver inner, BigtableTracer tracer) { + this.inner = inner; + this.tracer = tracer; + } + + @Override + public void onStart(StreamController streamController) { + inner.onStart(streamController); + } + + @Override + public void onResponse(T t) { + inner.onResponse(t); + tracer.operationFinishedEarly(); + } + + @Override + public void onError(Throwable throwable) { + inner.onError(throwable); + } + + @Override + public void onComplete() { + inner.onComplete(); + } + } + /** * Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 3445514f7b..ef605942c1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -86,4 +86,9 @@ public void setLocations(String zone, String cluster) { public void grpcChannelQueuedLatencies(long queuedTimeMs) { // noop } + + + public void operationFinishedEarly() { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index abd214d760..71b2595d17 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -132,6 +132,12 @@ public void close() {} }; } + @Override + public void operationFinishedEarly() { + attemptTimer.stop(); + operationTimer.stop(); + } + @Override public void operationSucceeded() { recordOperationCompletion(null); @@ -276,7 +282,11 @@ private void recordOperationCompletion(@Nullable Throwable status) { if (!opFinished.compareAndSet(false, true)) { return; } - operationTimer.stop(); + + // timer might've been stopped early because due to #operationFinishedEarly() + if (operationTimer.isRunning()) { + operationTimer.stop(); + } boolean isStreaming = operationType == OperationType.ServerStreaming; String statusStr = Util.extractStatus(status); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java index 3f1db6d0d8..bb99cb9dd6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsUserCallable.java @@ -19,9 +19,11 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; import com.google.bigtable.v2.ReadRowsRequest; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; /** * Simple wrapper for ReadRows to wrap the request protobufs. @@ -42,7 +44,39 @@ public ReadRowsUserCallable( @Override public void call(Query request, ResponseObserver responseObserver, ApiCallContext context) { + BigtableTracer tracer = (BigtableTracer) context.getTracer(); ReadRowsRequest innerRequest = request.toProto(requestContext); - inner.call(innerRequest, responseObserver, context); + inner.call(innerRequest, new EarlyFinishObserver<>(responseObserver, tracer), context); + } + + static class EarlyFinishObserver implements ResponseObserver { + private final ResponseObserver inner; + private final BigtableTracer tracer; + + EarlyFinishObserver(ResponseObserver inner, BigtableTracer tracer) { + this.inner = inner; + this.tracer = tracer; + } + + @Override + public void onStart(StreamController streamController) { + inner.onStart(streamController); + } + + @Override + public void onResponse(RowT row) { + tracer.operationFinishedEarly(); + inner.onResponse(row); + } + + @Override + public void onError(Throwable throwable) { + inner.onError(throwable); + } + + @Override + public void onComplete() { + inner.onComplete(); + } } }