Skip to content

Commit

Permalink
oops, messed up splitting commits, this should've been part of this p…
Browse files Browse the repository at this point in the history
…r not the next

Change-Id: I16a35e19c50b7b7b855f4299cf41f0607b3e90bd
  • Loading branch information
igorbernstein2 committed Nov 5, 2024
1 parent 14b97b4 commit 35272b6
Showing 1 changed file with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
import com.google.api.gax.retrying.StreamResumptionStrategy;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
Expand Down Expand Up @@ -562,16 +563,16 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter);

if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter);

ReadRowsUserCallable<RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);
ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);
Expand All @@ -580,6 +581,15 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
} else {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter,
new SimpleStreamResumptionStrategy<>());
ServerStreamingCallable<Query, RowT> readRowCallable =
new TransformingServerStreamingCallable<>(
readRowsCallable,
Expand All @@ -595,6 +605,11 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
}
}

private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {
return createReadRowsBaseCallable(
readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy<RowT>(rowAdapter));
}
/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
Expand All @@ -611,8 +626,9 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <p>NOTE: the caller is responsible for adding tracing & metrics.
*/
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallSettings<ReqT, Row> readRowsSettings,
RowAdapter<RowT> rowAdapter,
StreamResumptionStrategy<ReadRowsRequest, RowT> resumptionStrategy) {
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
Expand Down Expand Up @@ -653,7 +669,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
// ReadRowsRequest -> ReadRowsResponse callable).
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setResumptionStrategy(resumptionStrategy)
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
Expand Down Expand Up @@ -1344,7 +1360,7 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar
return createUnaryCallableNew(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
} else {
return createUnaryCallableNew(
return createUnaryCallableOld(
methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
}
}
Expand Down

0 comments on commit 35272b6

Please sign in to comment.