Skip to content

Commit

Permalink
chore: remodel unary callables as server streaming callables with an …
Browse files Browse the repository at this point in the history
…adapter at the end (#2403)

* chore: remodel unary callables as server streaming callables with an adapter at the end

Change-Id: I8708dff0e192d7647ef2cb361fc0992e1ddd2b24

* test + fixes

Change-Id: Id4c56656a829f5f4c7ab1170f5f980cf3cc3760c

* chore: generate libraries at Mon Nov  4 22:30:01 UTC 2024

* oops

Change-Id: I1bd8c318b3272925cd6b81601d7b1d7c772a853f

* more tests

Change-Id: I1c45f2058cadc1acb9c6abd87222be9eb233778c

* avoid multiple cancels

Change-Id: I4e05efaac6ae60f5827c6d666c3c6f6cebebaa54

* chore: generate libraries at Tue Nov  5 00:23:44 UTC 2024

* fix fallback

Change-Id: I654e70f0b34f5d4c3071ba3c2fed64ea183a865e

* chore: generate libraries at Tue Nov  5 00:42:37 UTC 2024

* proper fallback

Change-Id: Ic0106f3c6983edbb032aeba6e107e4324952397d

* Use transforming callable

Change-Id: I8d8474050e40cd819d3be2a5b251448f6eb8c94f

* fix npe

Change-Id: Ib589ca063369e26ef214eb89099e459981dafe83

* clean up logic

Change-Id: I4504c47143000d97554a96469d5f3fd368d08ef1

* oops, messed up splitting commits, this should've been part of this pr not the next

Change-Id: I16a35e19c50b7b7b855f4299cf41f0607b3e90bd

* typo

Change-Id: I8202e935975e1a55606265c502fe7573b8a4acb0

* disable watchdog for the new ReadRow callable chain

Change-Id: I4522719a65f24d27fb9dccde031c3b1cc04042c2

---------

Co-authored-by: cloud-java-bot <[email protected]>
  • Loading branch information
igorbernstein2 and cloud-java-bot authored Nov 5, 2024
1 parent bcf60c2 commit 6685aa3
Show file tree
Hide file tree
Showing 6 changed files with 620 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.AbstractApiFuture;
import com.google.api.core.ApiFuture;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.common.base.Preconditions;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link
* UnaryCallable}. It is intended to be the outermost callable of a chain.
*
* <p>Responsibilities:
*
* <ul>
* <li>Operation level metrics
* <li>Configuring the default call context
* <li>Converting the result to a future
*/
class BigtableUnaryOperationCallable<ReqT, RespT> extends UnaryCallable<ReqT, RespT> {
private static final Logger LOGGER =
Logger.getLogger(BigtableUnaryOperationCallable.class.getName());
Logger logger = LOGGER;

private final ServerStreamingCallable<ReqT, RespT> inner;
private final ApiCallContext defaultCallContext;
private final ApiTracerFactory tracerFactory;
private final SpanName spanName;
private final boolean allowNoResponse;

public BigtableUnaryOperationCallable(
ServerStreamingCallable<ReqT, RespT> inner,
ApiCallContext defaultCallContext,
ApiTracerFactory tracerFactory,
SpanName spanName,
boolean allowNoResponse) {
this.inner = inner;
this.defaultCallContext = defaultCallContext;
this.tracerFactory = tracerFactory;
this.spanName = spanName;
this.allowNoResponse = allowNoResponse;
}

@Override
public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
apiCallContext = defaultCallContext.merge(apiCallContext);

ApiTracer apiTracer =
tracerFactory.newTracer(
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);

apiCallContext = apiCallContext.withTracer(apiTracer);

UnaryFuture f = new UnaryFuture(apiTracer, allowNoResponse);
inner.call(req, f, apiCallContext);
return f;
}

class UnaryFuture extends AbstractApiFuture<RespT> implements ResponseObserver<RespT> {
private final ApiTracer tracer;
private final boolean allowNoResponse;

private StreamController controller;
private final AtomicBoolean upstreamCancelled = new AtomicBoolean();
private boolean responseReceived;
private @Nullable RespT response;

private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) {
this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null");
this.allowNoResponse = allowNoResponse;
this.responseReceived = false;
}

@Override
public void onStart(StreamController controller) {
this.controller = controller;
controller.disableAutoInboundFlowControl();
// Request 2 to detect protocol bugs
controller.request(2);
}

/**
* Immediately cancel the future state and try to cancel the underlying operation. Will return
* false if the future is already resolved.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (super.cancel(mayInterruptIfRunning)) {
cancelUpstream();
return true;
}
return false;
}

private void cancelUpstream() {
if (upstreamCancelled.compareAndSet(false, true)) {
controller.cancel();
}
}

@Override
public void onResponse(RespT resp) {
tracer.responseReceived();

// happy path - buffer the only responsse
if (!responseReceived) {
responseReceived = true;
this.response = resp;
return;
}

String msg =
String.format(
"Received multiple responses for a %s unary operation. Previous: %s, New: %s",
spanName, response, resp);
logger.log(Level.WARNING, msg);

InternalException error =
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
if (setException(error)) {
tracer.operationFailed(error);
}

cancelUpstream();
}

@Override
public void onError(Throwable throwable) {
if (this.setException(throwable)) {
tracer.operationFailed(throwable);
} else if (isCancelled()) {
tracer.operationCancelled();
}
// The future might've been resolved due to double response
}

@Override
public void onComplete() {
if (allowNoResponse || responseReceived) {
if (set(response)) {
tracer.operationSucceeded();
return;
}
} else {
String msg = spanName + " unary operation completed without a response message";
InternalException e =
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);

if (setException(e)) {
tracer.operationFailed(e);
return;
}
}

// check cancellation race
if (isCancelled()) {
tracer.operationCancelled();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
import com.google.api.gax.retrying.StreamResumptionStrategy;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
Expand Down Expand Up @@ -136,6 +138,7 @@
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -155,6 +158,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -559,27 +563,54 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter);

ReadRowsUserCallable<RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);

UnaryCallable<Query, RowT> traced =
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter);

ReadRowsUserCallable<RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);
ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);
UnaryCallable<Query, RowT> traced =
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
} else {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeoutDuration(Duration.ZERO)
.setWaitTimeoutDuration(Duration.ZERO)
.build(),
rowAdapter,
new SimpleStreamResumptionStrategy<>());
ServerStreamingCallable<Query, RowT> readRowCallable =
new TransformingServerStreamingCallable<>(
readRowsCallable,
(query) -> query.limit(1).toProto(requestContext),
Functions.identity());

return new BigtableUnaryOperationCallable<>(
readRowCallable,
clientContext.getDefaultCallContext(),
clientContext.getTracerFactory(),
getSpanName("ReadRow"),
/*allowNoResponses=*/ true);
}
}

private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {
return createReadRowsBaseCallable(
readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy<RowT>(rowAdapter));
}
/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
Expand All @@ -596,8 +627,9 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <p>NOTE: the caller is responsible for adding tracing & metrics.
*/
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallSettings<ReqT, Row> readRowsSettings,
RowAdapter<RowT> rowAdapter,
StreamResumptionStrategy<ReadRowsRequest, RowT> resumptionStrategy) {
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
Expand Down Expand Up @@ -625,7 +657,7 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
// ReadRowsRequest -> ReadRowsResponse callable).
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setResumptionStrategy(resumptionStrategy)
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
Expand Down Expand Up @@ -1264,6 +1296,21 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {
if (EnhancedBigtableStubSettings.SKIP_TRAILERS) {
return createUnaryCallableNew(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
} else {
return createUnaryCallableOld(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
}
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableOld(
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
RequestParamsExtractor<BaseReqT> headerParamsFn,
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {

UnaryCallable<BaseReqT, BaseRespT> base =
GrpcRawCallableFactory.createUnaryCallable(
Expand Down Expand Up @@ -1300,6 +1347,50 @@ public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) {
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableNew(
MethodDescriptor<BaseReqT, BaseRespT> methodDescriptor,
RequestParamsExtractor<BaseReqT> headerParamsFn,
UnaryCallSettings<ReqT, RespT> callSettings,
Function<ReqT, BaseReqT> requestTransformer,
Function<BaseRespT, RespT> responseTranformer) {

ServerStreamingCallable<BaseReqT, BaseRespT> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<BaseReqT, BaseRespT>newBuilder()
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(headerParamsFn)
.build(),
callSettings.getRetryableCodes());

base = new StatsHeadersServerStreamingCallable<>(base);

base = new BigtableTracerStreamingCallable<>(base);

base = withRetries(base, convertUnaryToServerStreamingSettings(callSettings));

ServerStreamingCallable<ReqT, RespT> transformed =
new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer);

return new BigtableUnaryOperationCallable<>(
transformed,
clientContext.getDefaultCallContext(),
clientContext.getTracerFactory(),
getSpanName(methodDescriptor.getBareMethodName()),
/* allowNoResponse= */ false);
}

private static <ReqT, RespT>
ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
UnaryCallSettings<?, ?> unarySettings) {
return ServerStreamingCallSettings.<ReqT, RespT>newBuilder()
.setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
.setRetryableCodes(unarySettings.getRetryableCodes())
.setRetrySettings(unarySettings.getRetrySettings())
.setIdleTimeoutDuration(Duration.ZERO)
.setWaitTimeoutDuration(Duration.ZERO)
.build();
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
Expand Down
Loading

0 comments on commit 6685aa3

Please sign in to comment.