diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 3809fc704f..4d26c3329d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -42,6 +42,7 @@ import com.google.api.gax.retrying.RetryingExecutorWithContext; import com.google.api.gax.retrying.ScheduledRetryingExecutor; import com.google.api.gax.retrying.SimpleStreamResumptionStrategy; +import com.google.api.gax.retrying.StreamResumptionStrategy; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; @@ -562,16 +563,16 @@ public ServerStreamingCallable createReadRowsCallable( * */ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - ServerStreamingCallable readRowsCallable = - createReadRowsBaseCallable( - ServerStreamingCallSettings.newBuilder() - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) - .setRetrySettings(settings.readRowSettings().getRetrySettings()) - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) - .build(), - rowAdapter); - if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable( + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter); + ReadRowsUserCallable readRowCallable = new ReadRowsUserCallable<>(readRowsCallable, requestContext); ReadRowsFirstCallable firstRow = new ReadRowsFirstCallable<>(readRowCallable); @@ -580,6 +581,15 @@ public UnaryCallable createReadRowCallable(RowAdapter firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } else { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable( + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter, + new SimpleStreamResumptionStrategy<>()); ServerStreamingCallable readRowCallable = new TransformingServerStreamingCallable<>( readRowsCallable, @@ -595,6 +605,11 @@ public UnaryCallable createReadRowCallable(RowAdapter } } + private ServerStreamingCallable createReadRowsBaseCallable( + ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { + return createReadRowsBaseCallable( + readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy(rowAdapter)); + } /** * Creates a callable chain to handle ReadRows RPCs. The chain will: * @@ -611,8 +626,9 @@ public UnaryCallable createReadRowCallable(RowAdapter *

NOTE: the caller is responsible for adding tracing & metrics. */ private ServerStreamingCallable createReadRowsBaseCallable( - ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { - + ServerStreamingCallSettings readRowsSettings, + RowAdapter rowAdapter, + StreamResumptionStrategy resumptionStrategy) { ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() @@ -653,7 +669,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { // ReadRowsRequest -> ReadRowsResponse callable). ServerStreamingCallSettings innerSettings = ServerStreamingCallSettings.newBuilder() - .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) + .setResumptionStrategy(resumptionStrategy) .setRetryableCodes(readRowsSettings.getRetryableCodes()) .setRetrySettings(readRowsSettings.getRetrySettings()) .setIdleTimeout(readRowsSettings.getIdleTimeout()) @@ -1344,7 +1360,7 @@ private UnaryCallable createUnar return createUnaryCallableNew( methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); } else { - return createUnaryCallableNew( + return createUnaryCallableOld( methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); } }