Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Change-Id: Icd88367491120c32045ff11bf7b89cdcc0ea8b04
  • Loading branch information
igorbernstein2 committed Oct 3, 2024
1 parent a08a67b commit ca794e2
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand Down Expand Up @@ -123,6 +124,8 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.opt.UnaryOverStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.opt.UnaryOverStreamingTracerModCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
Expand Down Expand Up @@ -154,6 +157,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -563,11 +567,13 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>

ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);

UnaryCallable<Query, RowT> traced =
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryOverStreamingTracerModCallable modCallable =
new UnaryOverStreamingTracerModCallable(firstRow);
TracedServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
modCallable, clientContext.getTracerFactory(), getSpanName("ReadRow"));
UnaryOverStreamingCallable<Query, RowT> unaryAdapter = new UnaryOverStreamingCallable<>(traced);
return unaryAdapter.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -794,8 +800,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
.setParamsExtractor(
Expand All @@ -818,17 +824,26 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(withBigtableTracer, settings.mutateRowSettings());
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(
withBigtableTracer,
convertUnaryToServerStreamingSettings(settings.mutateRowSettings()));

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
UnaryOverStreamingTracerModCallable<MutateRowRequest, MutateRowResponse> modCallable =
new UnaryOverStreamingTracerModCallable<>(retrying);
TracedServerStreamingCallable<MutateRowRequest, MutateRowResponse> traced =
new TracedServerStreamingCallable<>(
modCallable, clientContext.getTracerFactory(), getSpanName(methodName));
UnaryOverStreamingCallable<MutateRowRequest, MutateRowResponse> callable =
new UnaryOverStreamingCallable<>(traced);
MutateRowCallable mutateRowCallable = new MutateRowCallable(callable, requestContext);
return mutateRowCallable.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1512,6 +1527,18 @@ private SpanName getSpanName(String methodName) {
return SpanName.of(CLIENT_NAME, methodName);
}

private <ReqT, RespT>
ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
UnaryCallSettings<?, ?> unarySettings) {
return ServerStreamingCallSettings.<ReqT, RespT>newBuilder()
.setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
.setRetryableCodes(unarySettings.getRetryableCodes())
.setRetrySettings(unarySettings.getRetrySettings())
.setIdleTimeoutDuration(Duration.ZERO)
.setWaitTimeoutDuration(Duration.ZERO)
.build();
}

@Override
public void close() {
if (closeClientContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.BetaApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.BaseApiTracer;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -86,4 +87,12 @@ public void setLocations(String zone, String cluster) {
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}

public void overrideOperationType(ApiTracerFactory.OperationType operationType) {
// noop
}

public void operationFinishedEarly() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
class BuiltinMetricsTracer extends BigtableTracer {

private static final String NAME = "java-bigtable/" + Version.VERSION;
private final OperationType operationType;
private volatile OperationType operationType;
private final SpanName spanName;

// Operation level metrics
Expand Down Expand Up @@ -132,6 +132,21 @@ public void close() {}
};
}

@Override
public void overrideOperationType(OperationType operationType) {
this.operationType = operationType;
}

@Override
public void operationFinishedEarly() {
if (attemptTimer.isRunning()) {
attemptTimer.stop();
}
if (operationTimer.isRunning()) {
operationTimer.stop();
}
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand Down Expand Up @@ -276,7 +291,11 @@ private void recordOperationCompletion(@Nullable Throwable status) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();

// timer might've been stopped early because due to #operationFinishedEarly()
if (operationTimer.isRunning()) {
operationTimer.stop();
}

boolean isStreaming = operationType == OperationType.ServerStreaming;
String statusStr = Util.extractStatus(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -62,6 +63,24 @@ public void close() {
};
}

@Override
public void overrideOperationType(ApiTracerFactory.OperationType operationType) {
for (ApiTracer child : children) {
if (child instanceof BigtableTracer) {
((BigtableTracer) child).overrideOperationType(operationType);
}
}
}

@Override
public void operationFinishedEarly() {
for (ApiTracer child : children) {
if (child instanceof BigtableTracer) {
((BigtableTracer) child).operationFinishedEarly();
}
}
}

@Override
public void operationSucceeded() {
for (ApiTracer child : children) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

class MetricsTracer extends BigtableTracer {

private final OperationType operationType;
private volatile OperationType operationType;

private final Tagger tagger;
private final StatsRecorder stats;
Expand Down Expand Up @@ -84,6 +84,17 @@ public void close() {}
};
}

@Override
public void overrideOperationType(OperationType operationType) {
this.operationType = operationType;
}

@Override
public void operationFinishedEarly() {
attemptTimer.stop();
operationTimer.stop();
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand All @@ -103,7 +114,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();
if (operationTimer.isRunning()) {
operationTimer.stop();
}

long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StateCheckingResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.Query;

/**
* Enhancement for `readRowsCallable().first()` to gracefully limit the row count instead of
* cancelling the RPC
*/
@InternalApi
public class ReadRowsFirstCallable<RowT> extends UnaryCallable<Query, RowT> {
public class ReadRowsFirstCallable<RowT> extends ServerStreamingCallable<Query, RowT> {

private final ServerStreamingCallable<Query, RowT> inner;

Expand All @@ -39,41 +35,7 @@ public ReadRowsFirstCallable(ServerStreamingCallable<Query, RowT> inner) {
}

@Override
public ApiFuture<RowT> futureCall(Query query, ApiCallContext context) {
ReadRowsFirstResponseObserver<RowT> observer = new ReadRowsFirstResponseObserver<>();
this.inner.call(query.limit(1), observer, context);
return observer.getFuture();
}

private class ReadRowsFirstResponseObserver<RowT> extends StateCheckingResponseObserver<RowT> {
private StreamController innerController;
private RowT firstRow;
private SettableApiFuture<RowT> settableFuture = SettableApiFuture.create();

@Override
protected void onStartImpl(StreamController streamController) {
this.innerController = streamController;
}

@Override
protected void onResponseImpl(RowT response) {
if (firstRow == null) {
this.firstRow = response;
}
}

@Override
protected void onErrorImpl(Throwable throwable) {
settableFuture.setException(throwable);
}

@Override
protected void onCompleteImpl() {
settableFuture.set(firstRow);
}

protected ApiFuture<RowT> getFuture() {
return settableFuture;
}
public void call(Query request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
inner.call(request.limit(1), responseObserver, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ public void testMutateRows() {
}

@Test
public void testMutateRow() {
public void testMutateRow() throws InterruptedException {
client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v"));
client.close();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata).hasSize(fakeService.count.get());
Expand Down Expand Up @@ -682,6 +683,7 @@ public void readRows(
@Override
public void mutateRow(
MutateRowRequest request, StreamObserver<MutateRowResponse> responseObserver) {
System.out.println("server: mutateRow " + count.get());
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
maybePopulateCookie(trailers, "mutateRow");
Expand Down Expand Up @@ -801,6 +803,7 @@ private void maybePopulateCookie(Metadata trailers, String label) {
trailers.put(ROUTING_COOKIE_2, testCookie);
trailers.put(BAD_KEY, "bad-key");
}
System.out.println("server trailers (" + returnCookie + "): " + trailers);
}
}
}
Loading

0 comments on commit ca794e2

Please sign in to comment.