* NOTE: the caller is responsible for adding tracing & metrics.
*/
private ServerStreamingCallable createReadRowsBaseCallable(
- ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) {
-
+ ServerStreamingCallSettings readRowsSettings,
+ RowAdapter rowAdapter,
+ StreamResumptionStrategy resumptionStrategy) {
ServerStreamingCallable base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(ReadRowsRequest readRowsRequest) {
- String tableName = readRowsRequest.getTableName();
- String authorizedViewName = readRowsRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- readRowsRequest.getAppProfileId());
- }
- })
+ r ->
+ composeRequestParams(
+ r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
.build(),
readRowsSettings.getRetryableCodes());
@@ -636,7 +672,7 @@ public Map extract(ReadRowsRequest readRowsRequest) {
// ReadRowsRequest -> ReadRowsResponse callable).
ServerStreamingCallSettings innerSettings =
ServerStreamingCallSettings.newBuilder()
- .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
+ .setResumptionStrategy(resumptionStrategy)
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
@@ -694,15 +730,49 @@ private UnaryCallable> createBulkReadRowsCallable(
UnaryCallable> traced =
new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), span);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings.bulkReadRowsSettings().getRetrySettings().getTotalTimeout()));
}
/**
- * Helper function that should only be used by createSampleRowKeysCallable() and
- * createSampleRowKeysWithRequestCallable().
+ * Simple wrapper around {@link #createSampleRowKeysCallableWithRequest()} to provide backwards
+ * compatibility
+ *
+ * @deprecated
+ */
+ @Deprecated
+ private UnaryCallable> createSampleRowKeysCallable() {
+ UnaryCallable> baseCallable =
+ createSampleRowKeysCallableWithRequest();
+ return new UnaryCallable>() {
+ @Override
+ public ApiFuture> futureCall(String s, ApiCallContext apiCallContext) {
+ return baseCallable.futureCall(SampleRowKeysRequest.create(TableId.of(s)), apiCallContext);
+ }
+ };
+ }
+
+ /**
+ * Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
+ *
+ *
+ * - Convert a {@link SampleRowKeysRequest} to a {@link
+ * com.google.bigtable.v2.SampleRowKeysRequest}.
+ *
- Dispatch the request to the GAPIC's {@link BigtableStub#sampleRowKeysCallable()}.
+ *
- Spool responses into a list.
+ *
- Retry on failure.
+ *
- Convert the responses into {@link KeyOffset}s.
+ *
- Add tracing & metrics.
+ *
*/
- private UnaryCallable>
- createSampleRowKeysBaseCallable() {
+ private UnaryCallable>
+ createSampleRowKeysCallableWithRequest() {
+ String methodName = "SampleRowKeys";
+
ServerStreamingCallable
base =
GrpcRawCallableFactory.createServerStreamingCallable(
@@ -711,25 +781,9 @@ private UnaryCallable> createBulkReadRowsCallable(
newBuilder()
.setMethodDescriptor(BigtableGrpc.getSampleRowKeysMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- com.google.bigtable.v2.SampleRowKeysRequest sampleRowKeysRequest) {
- String tableName = sampleRowKeysRequest.getTableName();
- String authorizedViewName =
- sampleRowKeysRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(
- authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- sampleRowKeysRequest.getAppProfileId());
- }
- })
+ r ->
+ composeRequestParams(
+ r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
.build(),
settings.sampleRowKeysSettings().getRetryableCodes());
@@ -745,51 +799,15 @@ public Map extract(
UnaryCallable>
retryable = withRetries(withBigtableTracer, settings.sampleRowKeysSettings());
- return retryable;
- }
-
- /**
- * Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
- *
- *
- * - Convert a table id to a {@link com.google.bigtable.v2.SampleRowKeysRequest}.
- *
- Dispatch the request to the GAPIC's {@link BigtableStub#sampleRowKeysCallable()}.
- *
- Spool responses into a list.
- *
- Retry on failure.
- *
- Convert the responses into {@link KeyOffset}s.
- *
- Add tracing & metrics.
- *
- */
- private UnaryCallable> createSampleRowKeysCallable() {
- String methodName = "SampleRowKeys";
-
- UnaryCallable>
- baseCallable = createSampleRowKeysBaseCallable();
return createUserFacingUnaryCallable(
- methodName, new SampleRowKeysCallable(baseCallable, requestContext));
- }
-
- /**
- * Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
- *
- *
- * - Convert a {@link SampleRowKeysRequest} to a {@link
- * com.google.bigtable.v2.SampleRowKeysRequest}.
- *
- Dispatch the request to the GAPIC's {@link BigtableStub#sampleRowKeysCallable()}.
- *
- Spool responses into a list.
- *
- Retry on failure.
- *
- Convert the responses into {@link KeyOffset}s.
- *
- Add tracing & metrics.
- *
- */
- private UnaryCallable>
- createSampleRowKeysCallableWithRequest() {
- String methodName = "SampleRowKeys";
-
- UnaryCallable>
- baseCallable = createSampleRowKeysBaseCallable();
- return createUserFacingUnaryCallable(
- methodName, new SampleRowKeysCallableWithRequest(baseCallable, requestContext));
+ methodName,
+ new SampleRowKeysCallableWithRequest(retryable, requestContext)
+ .withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings.sampleRowKeysSettings().getRetrySettings().getTotalTimeout())));
}
/**
@@ -801,42 +819,14 @@ private UnaryCallable> createSampleRowKeysCallable() {
*
*/
private UnaryCallable createMutateRowCallable() {
- String methodName = "MutateRow";
- UnaryCallable base =
- GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(MutateRowRequest mutateRowRequest) {
- String tableName = mutateRowRequest.getTableName();
- String authorizedViewName = mutateRowRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- mutateRowRequest.getAppProfileId());
- }
- })
- .build(),
- settings.mutateRowSettings().getRetryableCodes());
-
- UnaryCallable withStatsHeaders =
- new StatsHeadersUnaryCallable<>(base);
-
- UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(withStatsHeaders);
-
- UnaryCallable retrying =
- withRetries(withBigtableTracer, settings.mutateRowSettings());
-
- return createUserFacingUnaryCallable(
- methodName, new MutateRowCallable(retrying, requestContext));
+ return createUnaryCallable(
+ BigtableGrpc.getMutateRowMethod(),
+ req ->
+ composeRequestParams(
+ req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
+ settings.mutateRowSettings(),
+ req -> req.toProto(requestContext),
+ resp -> null);
}
/**
@@ -863,22 +853,9 @@ private UnaryCallable createMutateRowsBas
GrpcCallSettings.newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowsMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(MutateRowsRequest mutateRowsRequest) {
- String tableName = mutateRowsRequest.getTableName();
- String authorizedViewName = mutateRowsRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- mutateRowsRequest.getAppProfileId());
- }
- })
+ r ->
+ composeRequestParams(
+ r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());
@@ -953,7 +930,12 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings.bulkMutateRowsSettings().getRetrySettings().getTotalTimeout()));
}
/**
@@ -1056,44 +1038,14 @@ public Batcher newBulkReadRowsBatcher(
*
*/
private UnaryCallable createCheckAndMutateRowCallable() {
- String methodName = "CheckAndMutateRow";
- UnaryCallable base =
- GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getCheckAndMutateRowMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- CheckAndMutateRowRequest checkAndMutateRowRequest) {
- String tableName = checkAndMutateRowRequest.getTableName();
- String authorizedViewName =
- checkAndMutateRowRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- checkAndMutateRowRequest.getAppProfileId());
- }
- })
- .build(),
- settings.checkAndMutateRowSettings().getRetryableCodes());
-
- UnaryCallable withStatsHeaders =
- new StatsHeadersUnaryCallable<>(base);
-
- UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(withStatsHeaders);
-
- UnaryCallable retrying =
- withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());
-
- return createUserFacingUnaryCallable(
- methodName, new CheckAndMutateRowCallable(retrying, requestContext));
+ return createUnaryCallable(
+ BigtableGrpc.getCheckAndMutateRowMethod(),
+ req ->
+ composeRequestParams(
+ req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
+ settings.checkAndMutateRowSettings(),
+ req -> req.toProto(requestContext),
+ CheckAndMutateRowResponse::getPredicateMatched);
}
/**
@@ -1107,39 +1059,16 @@ public Map extract(
*
*/
private UnaryCallable createReadModifyWriteRowCallable() {
- UnaryCallable base =
- GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getReadModifyWriteRowMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(ReadModifyWriteRowRequest request) {
- String tableName = request.getTableName();
- String authorizedViewName = request.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name", tableName, "app_profile_id", request.getAppProfileId());
- }
- })
- .build(),
- settings.readModifyWriteRowSettings().getRetryableCodes());
-
- UnaryCallable withStatsHeaders =
- new StatsHeadersUnaryCallable<>(base);
-
- String methodName = "ReadModifyWriteRow";
- UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(withStatsHeaders);
-
- UnaryCallable retrying =
- withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());
-
- return createUserFacingUnaryCallable(
- methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
+ DefaultRowAdapter rowAdapter = new DefaultRowAdapter();
+
+ return createUnaryCallable(
+ BigtableGrpc.getReadModifyWriteRowMethod(),
+ req ->
+ composeRequestParams(
+ req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
+ settings.readModifyWriteRowSettings(),
+ req -> req.toProto(requestContext),
+ resp -> rowAdapter.createRowFromProto(resp.getRow()));
}
/**
@@ -1168,18 +1097,7 @@ public Map extract(ReadModifyWriteRowRequest request) {
.setMethodDescriptor(
BigtableGrpc.getGenerateInitialChangeStreamPartitionsMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- GenerateInitialChangeStreamPartitionsRequest
- generateInitialChangeStreamPartitionsRequest) {
- return ImmutableMap.of(
- "table_name",
- generateInitialChangeStreamPartitionsRequest.getTableName(),
- "app_profile_id",
- generateInitialChangeStreamPartitionsRequest.getAppProfileId());
- }
- })
+ r -> composeRequestParams(r.getAppProfileId(), r.getTableName(), ""))
.build(),
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes());
@@ -1222,7 +1140,15 @@ public Map extract(
ServerStreamingCallable traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings
+ .generateInitialChangeStreamPartitionsSettings()
+ .getRetrySettings()
+ .getTotalTimeout()));
}
/**
@@ -1248,15 +1174,7 @@ public Map extract(
GrpcCallSettings.newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadChangeStreamMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- ReadChangeStreamRequest readChangeStreamRequest) {
- return ImmutableMap.of(
- "table_name", readChangeStreamRequest.getTableName(),
- "app_profile_id", readChangeStreamRequest.getAppProfileId());
- }
- })
+ r -> composeRequestParams(r.getAppProfileId(), r.getTableName(), ""))
.build(),
settings.readChangeStreamSettings().getRetryableCodes());
@@ -1302,7 +1220,12 @@ public Map extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings.readChangeStreamSettings().getRetrySettings().getTotalTimeout()));
}
/**
@@ -1388,7 +1311,13 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) {
new TracedServerStreamingCallable<>(retries, clientContext.getTracerFactory(), span);
return new ExecuteQueryCallable(
- traced.withDefaultCallContext(clientContext.getDefaultCallContext()), requestContext);
+ traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings.executeQuerySettings().getRetrySettings().getTotalTimeout())),
+ requestContext);
}
/**
@@ -1404,6 +1333,124 @@ private UnaryCallable createUserFacin
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
+ private Map composeRequestParams(
+ String appProfileId, String tableName, String authorizedViewName) {
+ if (tableName.isEmpty() && !authorizedViewName.isEmpty()) {
+ tableName = NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
+ }
+ return ImmutableMap.of("table_name", tableName, "app_profile_id", appProfileId);
+ }
+
+ private UnaryCallable createUnaryCallable(
+ MethodDescriptor methodDescriptor,
+ RequestParamsExtractor headerParamsFn,
+ UnaryCallSettings callSettings,
+ Function requestTransformer,
+ Function responseTranformer) {
+ if (settings.getEnableSkipTrailers()) {
+ return createUnaryCallableNew(
+ methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
+ } else {
+ return createUnaryCallableOld(
+ methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
+ }
+ }
+
+ private UnaryCallable createUnaryCallableOld(
+ MethodDescriptor methodDescriptor,
+ RequestParamsExtractor headerParamsFn,
+ UnaryCallSettings callSettings,
+ Function requestTransformer,
+ Function responseTranformer) {
+
+ UnaryCallable base =
+ GrpcRawCallableFactory.createUnaryCallable(
+ GrpcCallSettings.newBuilder()
+ .setMethodDescriptor(methodDescriptor)
+ .setParamsExtractor(headerParamsFn)
+ .build(),
+ callSettings.getRetryableCodes());
+
+ UnaryCallable withStatsHeaders = new StatsHeadersUnaryCallable<>(base);
+
+ UnaryCallable withBigtableTracer =
+ new BigtableTracerUnaryCallable<>(withStatsHeaders);
+
+ UnaryCallable retrying = withRetries(withBigtableTracer, callSettings);
+
+ UnaryCallable transformed =
+ new UnaryCallable() {
+ @Override
+ public ApiFuture futureCall(ReqT reqT, ApiCallContext apiCallContext) {
+ ApiFuture f =
+ retrying.futureCall(requestTransformer.apply(reqT), apiCallContext);
+ return ApiFutures.transform(
+ f, responseTranformer::apply, MoreExecutors.directExecutor());
+ }
+ };
+
+ UnaryCallable traced =
+ new TracedUnaryCallable<>(
+ transformed,
+ clientContext.getTracerFactory(),
+ getSpanName(methodDescriptor.getBareMethodName()));
+
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ callSettings.getRetrySettings().getTotalTimeout()));
+ }
+
+ private UnaryCallable createUnaryCallableNew(
+ MethodDescriptor methodDescriptor,
+ RequestParamsExtractor headerParamsFn,
+ UnaryCallSettings callSettings,
+ Function requestTransformer,
+ Function responseTranformer) {
+
+ ServerStreamingCallable base =
+ GrpcRawCallableFactory.createServerStreamingCallable(
+ GrpcCallSettings.newBuilder()
+ .setMethodDescriptor(methodDescriptor)
+ .setParamsExtractor(headerParamsFn)
+ .build(),
+ callSettings.getRetryableCodes());
+
+ base = new StatsHeadersServerStreamingCallable<>(base);
+
+ base = new BigtableTracerStreamingCallable<>(base);
+
+ base = withRetries(base, convertUnaryToServerStreamingSettings(callSettings));
+
+ ServerStreamingCallable transformed =
+ new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer);
+
+ return new BigtableUnaryOperationCallable<>(
+ transformed,
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ callSettings.getRetrySettings().getTotalTimeout()),
+ clientContext.getTracerFactory(),
+ getSpanName(methodDescriptor.getBareMethodName()),
+ /* allowNoResponse= */ false);
+ }
+
+ private static
+ ServerStreamingCallSettings convertUnaryToServerStreamingSettings(
+ UnaryCallSettings, ?> unarySettings) {
+ return ServerStreamingCallSettings.newBuilder()
+ .setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
+ .setRetryableCodes(unarySettings.getRetryableCodes())
+ .setRetrySettings(unarySettings.getRetrySettings())
+ .setIdleTimeoutDuration(Duration.ZERO)
+ .setWaitTimeoutDuration(Duration.ZERO)
+ .build();
+ }
+
private UnaryCallable createPingAndWarmCallable() {
UnaryCallable pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
@@ -1420,7 +1467,12 @@ public Map extract(PingAndWarmRequest request) {
})
.build(),
Collections.emptySet());
- return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return pingAndWarm.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withOption(
+ BigtableTracer.OPERATION_TIMEOUT_KEY,
+ settings.pingAndWarmSettings().getRetrySettings().getTotalTimeout()));
}
private UnaryCallable withRetries(
@@ -1470,6 +1522,8 @@ public UnaryCallable readRowCallable() {
return readRowCallable;
}
+ /** Deprecated, please use {@link #sampleRowKeysCallableWithRequest} */
+ @Deprecated
public UnaryCallable> sampleRowKeysCallable() {
return sampleRowKeysCallable;
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
index 2a3d0ddba4..1425e7b362 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
@@ -62,6 +62,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
@@ -105,7 +106,14 @@ public class EnhancedBigtableStubSettings extends StubSettings IDEMPOTENT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);
@@ -232,6 +240,7 @@ public class EnhancedBigtableStubSettings extends StubSettings jwtAudienceMapping;
private final boolean enableRoutingCookie;
private final boolean enableRetryInfo;
+ private final boolean enableSkipTrailers;
private final ServerStreamingCallSettings readRowsSettings;
private final UnaryCallSettings readRowSettings;
@@ -279,6 +288,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;
enableRetryInfo = builder.enableRetryInfo;
+ enableSkipTrailers = builder.enableSkipTrailers;
metricsProvider = builder.metricsProvider;
metricsEndpoint = builder.metricsEndpoint;
@@ -365,6 +375,10 @@ public boolean getEnableRetryInfo() {
return enableRetryInfo;
}
+ boolean getEnableSkipTrailers() {
+ return enableSkipTrailers;
+ }
+
/**
* Gets the Google Cloud Monitoring endpoint for publishing client side metrics. If it's null,
* client will publish metrics to the default monitoring endpoint.
@@ -376,10 +390,9 @@ public String getMetricsEndpoint() {
/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
- Boolean isDirectpathEnabled = Boolean.parseBoolean(System.getenv(CBT_ENABLE_DIRECTPATH));
InstantiatingGrpcChannelProvider.Builder grpcTransportProviderBuilder =
BigtableStubSettings.defaultGrpcTransportProviderBuilder();
- if (isDirectpathEnabled) {
+ if (DIRECT_PATH_ENABLED) {
// Attempts direct access to CBT service over gRPC to improve throughput,
// whether the attempt is allowed is totally controlled by service owner.
grpcTransportProviderBuilder
@@ -676,6 +689,7 @@ public static class Builder extends StubSettings.Builder jwtAudienceMapping;
private boolean enableRoutingCookie;
private boolean enableRetryInfo;
+ private boolean enableSkipTrailers;
private final ServerStreamingCallSettings.Builder readRowsSettings;
private final UnaryCallSettings.Builder readRowSettings;
@@ -714,6 +728,7 @@ private Builder() {
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;
this.enableRetryInfo = true;
+ this.enableSkipTrailers = SKIP_TRAILERS;
metricsProvider = DefaultMetricsProvider.INSTANCE;
// Defaults provider
@@ -830,7 +845,11 @@ private Builder() {
.setWaitTimeout(Duration.ofMinutes(5));
featureFlags =
- FeatureFlags.newBuilder().setReverseScans(true).setLastScannedRowResponses(true);
+ FeatureFlags.newBuilder()
+ .setReverseScans(true)
+ .setLastScannedRowResponses(true)
+ .setDirectAccessRequested(DIRECT_PATH_ENABLED)
+ .setTrafficDirectorEnabled(DIRECT_PATH_ENABLED);
}
private Builder(EnhancedBigtableStubSettings settings) {
@@ -1074,6 +1093,11 @@ public boolean getEnableRetryInfo() {
return enableRetryInfo;
}
+ Builder setEnableSkipTrailers(boolean enabled) {
+ this.enableSkipTrailers = enabled;
+ return this;
+ }
+
/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder readRowsSettings() {
return readRowsSettings;
@@ -1201,6 +1225,7 @@ public String toString() {
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("enableRetryInfo", enableRetryInfo)
+ .add("enableSkipTrailers", enableSkipTrailers)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallable.java
deleted file mode 100644
index 36f47c2d1f..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.ApiFunction;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.MutateRowRequest;
-import com.google.bigtable.v2.MutateRowResponse;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.RowMutation;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/** Simple wrapper for MutateRow to wrap the request and response protobufs. */
-class MutateRowCallable extends UnaryCallable {
- private final UnaryCallable inner;
- private final RequestContext requestContext;
-
- MutateRowCallable(
- UnaryCallable inner, RequestContext requestContext) {
-
- this.inner = inner;
- this.requestContext = requestContext;
- }
-
- @Override
- public ApiFuture futureCall(RowMutation request, ApiCallContext context) {
- ApiFuture rawResponse =
- inner.futureCall(request.toProto(requestContext), context);
-
- return ApiFutures.transform(
- rawResponse,
- new ApiFunction() {
- @Override
- public Void apply(MutateRowResponse mutateRowResponse) {
- return null;
- }
- },
- MoreExecutors.directExecutor());
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallable.java
deleted file mode 100644
index 09e133678e..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.ApiFunction;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.ReadModifyWriteRowRequest;
-import com.google.bigtable.v2.ReadModifyWriteRowResponse;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
-import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
-import com.google.cloud.bigtable.data.v2.models.Row;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/** Simple wrapper for ReadModifyWriteRow to wrap the request and response protobufs. */
-class ReadModifyWriteRowCallable extends UnaryCallable {
- private final UnaryCallable inner;
- private final RequestContext requestContext;
- private final DefaultRowAdapter rowAdapter;
-
- ReadModifyWriteRowCallable(
- UnaryCallable inner,
- RequestContext requestContext) {
- this.inner = inner;
- this.requestContext = requestContext;
- this.rowAdapter = new DefaultRowAdapter();
- }
-
- @Override
- public ApiFuture futureCall(ReadModifyWriteRow request, ApiCallContext context) {
- ApiFuture rawResponse =
- inner.futureCall(request.toProto(requestContext), context);
-
- return ApiFutures.transform(
- rawResponse,
- new ApiFunction() {
- @Override
- public Row apply(ReadModifyWriteRowResponse readModifyWriteRowResponse) {
- return convertResponse(readModifyWriteRowResponse);
- }
- },
- MoreExecutors.directExecutor());
- }
-
- private Row convertResponse(ReadModifyWriteRowResponse response) {
- return rowAdapter.createRowFromProto(response.getRow());
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java
index 7c65bdf95a..0133dd3c2b 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java
@@ -83,7 +83,7 @@ public final void onResponse(ResponseT response) {
@Override
public final void onError(Throwable throwable) {
if (!isClosed.compareAndSet(false, true)) {
- logException("Received error after the stream is closed");
+ logException("Received error after the stream is closed", throwable);
return;
}
@@ -113,6 +113,10 @@ private void logException(String message) {
LOGGER.log(Level.WARNING, message, new IllegalStateException(message));
}
+ private void logException(String message, Throwable cause) {
+ LOGGER.log(Level.WARNING, message, new IllegalStateException(message, cause));
+ }
+
protected abstract void onStartImpl(StreamController streamController);
protected abstract void onResponseImpl(ResponseT response);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SampleRowKeysCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SampleRowKeysCallable.java
deleted file mode 100644
index 7658e41492..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SampleRowKeysCallable.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.ApiFunction;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.SampleRowKeysRequest;
-import com.google.bigtable.v2.SampleRowKeysResponse;
-import com.google.cloud.bigtable.data.v2.internal.NameUtil;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.KeyOffset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.List;
-
-/** Simple wrapper for SampleRowKeys to wrap the request and response protobufs. */
-class SampleRowKeysCallable extends UnaryCallable> {
- private final RequestContext requestContext;
- private final UnaryCallable> inner;
-
- SampleRowKeysCallable(
- UnaryCallable> inner,
- RequestContext requestContext) {
-
- this.requestContext = requestContext;
- this.inner = inner;
- }
-
- @Override
- public ApiFuture> futureCall(String tableId, ApiCallContext context) {
- String tableName =
- NameUtil.formatTableName(
- requestContext.getProjectId(), requestContext.getInstanceId(), tableId);
-
- SampleRowKeysRequest request =
- SampleRowKeysRequest.newBuilder()
- .setTableName(tableName)
- .setAppProfileId(requestContext.getAppProfileId())
- .build();
-
- ApiFuture> rawResponse = inner.futureCall(request, context);
-
- return ApiFutures.transform(
- rawResponse,
- new ApiFunction, List>() {
- @Override
- public List apply(List rawResponse) {
- return convert(rawResponse);
- }
- },
- MoreExecutors.directExecutor());
- }
-
- private static List convert(List rawResponse) {
- ImmutableList.Builder results = ImmutableList.builder();
-
- for (SampleRowKeysResponse element : rawResponse) {
- results.add(KeyOffset.create(element.getRowKey(), element.getOffsetBytes()));
- }
-
- return results.build();
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java
new file mode 100644
index 0000000000..29b104965e
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StreamController;
+import java.util.function.Function;
+
+/** Callable to help crossing api boundary lines between models and protos */
+class TransformingServerStreamingCallable
+ extends ServerStreamingCallable {
+ private final ServerStreamingCallable inner;
+ private final Function requestTransformer;
+ private final Function responseTransformer;
+
+ public TransformingServerStreamingCallable(
+ ServerStreamingCallable inner,
+ Function requestTransformer,
+ Function responseTransformer) {
+ this.inner = inner;
+ this.requestTransformer = requestTransformer;
+ this.responseTransformer = responseTransformer;
+ }
+
+ @Override
+ public void call(
+ OuterReqT outerReqT,
+ ResponseObserver outerObserver,
+ ApiCallContext apiCallContext) {
+ InnerReqT innerReq = requestTransformer.apply(outerReqT);
+
+ inner.call(
+ innerReq,
+ new SafeResponseObserver(outerObserver) {
+ @Override
+ public void onStartImpl(StreamController streamController) {
+ outerObserver.onStart(streamController);
+ }
+
+ @Override
+ public void onResponseImpl(InnerRespT innerResp) {
+ outerObserver.onResponse(responseTransformer.apply(innerResp));
+ }
+
+ @Override
+ public void onErrorImpl(Throwable throwable) {
+ outerObserver.onError(throwable);
+ }
+
+ @Override
+ public void onCompleteImpl() {
+ outerObserver.onComplete();
+ }
+ },
+ apiCallContext);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java
index fd54313e8d..8aa53fa198 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java
@@ -23,6 +23,7 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OPERATION_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.REMAINING_DEADLINE_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME;
@@ -115,7 +116,8 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
CLIENT_BLOCKING_LATENCIES_NAME,
APPLICATION_BLOCKING_LATENCIES_NAME,
RETRY_COUNT_NAME,
- CONNECTIVITY_ERROR_COUNT_NAME)
+ CONNECTIVITY_ERROR_COUNT_NAME,
+ REMAINING_DEADLINE_NAME)
.stream()
.map(m -> METER_NAME + m)
.collect(ImmutableList.toImmutableList());
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
index d0e307d510..fb6a84a88d 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -16,10 +16,12 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;
import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
/**
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
@@ -30,6 +32,10 @@ public class BigtableTracer extends BaseApiTracer {
private volatile int attempt = 0;
+ @InternalApi("for internal use only")
+ public static final ApiCallContext.Key OPERATION_TIMEOUT_KEY =
+ ApiCallContext.Key.create("OPERATION_TIMEOUT");
+
@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
@@ -52,6 +58,13 @@ public void afterResponse(long applicationLatency) {
// noop
}
+ /**
+ * Used by BigtableUnaryOperationCallable to signal that the user visible portion of the RPC is
+ * complete and that metrics should freeze the timers and then publish the frozen values when the
+ * internal portion of the operation completes.
+ */
+ public void operationFinishEarly() {}
+
/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -93,4 +106,12 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
public void grpcMessageSent() {
// noop
}
+
+ /**
+ * Record the operation timeout from user settings for calculating remaining deadline. This will
+ * be called in BuiltinMetricsTracer.
+ */
+ public void setOperationTimeout(Duration operationTimeout) {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
index 167cd0dc2e..b977a0a2c7 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
@@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;
import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
@@ -26,6 +27,7 @@
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.threeten.bp.Duration;
/**
* This callable will
@@ -62,6 +64,11 @@ public void call(
BigtableTracerResponseObserver innerObserver =
new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
+ GrpcCallContext callContext = (GrpcCallContext) context;
+ Duration deadline = callContext.getOption(BigtableTracer.OPERATION_TIMEOUT_KEY);
+ if (deadline != null) {
+ ((BigtableTracer) context.getTracer()).setOperationTimeout(deadline);
+ }
innerCallable.call(
request,
innerObserver,
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
index 7dfca8b753..1f000c4639 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
@@ -19,12 +19,14 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nonnull;
+import org.threeten.bp.Duration;
/**
* This callable will:
@@ -58,6 +60,11 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context)
BigtableTracerUnaryCallback callback =
new BigtableTracerUnaryCallback(
(BigtableTracer) context.getTracer(), responseMetadata);
+ GrpcCallContext callContext = (GrpcCallContext) context;
+ Duration deadline = callContext.getOption(BigtableTracer.OPERATION_TIMEOUT_KEY);
+ if (deadline != null) {
+ ((BigtableTracer) context.getTracer()).setOperationTimeout(deadline);
+ }
ApiFuture future =
innerCallable.futureCall(
request,
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java
index d85300828b..62ac0f1153 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java
@@ -58,6 +58,7 @@ public class BuiltinMetricsConstants {
static final String SERVER_LATENCIES_NAME = "server_latencies";
static final String FIRST_RESPONSE_LATENCIES_NAME = "first_response_latencies";
static final String APPLICATION_BLOCKING_LATENCIES_NAME = "application_latencies";
+ static final String REMAINING_DEADLINE_NAME = "remaining_deadline";
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";
@@ -214,6 +215,16 @@ public static Map getAllViews() {
ImmutableSet.builder()
.add(BIGTABLE_PROJECT_ID_KEY, INSTANCE_ID_KEY, APP_PROFILE_KEY, CLIENT_NAME_KEY)
.build());
+ defineView(
+ views,
+ REMAINING_DEADLINE_NAME,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ InstrumentType.HISTOGRAM,
+ "ms",
+ ImmutableSet.builder()
+ .addAll(COMMON_ATTRIBUTES)
+ .add(STREAMING_KEY, STATUS_KEY)
+ .build());
return views.build();
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
index 8012edfaba..20e7eb3716 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java
@@ -37,6 +37,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
@@ -46,11 +48,14 @@
*/
class BuiltinMetricsTracer extends BigtableTracer {
+ private static final Logger logger = Logger.getLogger(BuiltinMetricsTracer.class.getName());
+
private static final String NAME = "java-bigtable/" + Version.VERSION;
private final OperationType operationType;
private final SpanName spanName;
// Operation level metrics
+ private final AtomicBoolean operationFinishedEarly = new AtomicBoolean();
private final AtomicBoolean opFinished = new AtomicBoolean();
private final Stopwatch operationTimer = Stopwatch.createStarted();
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
@@ -67,7 +72,6 @@ class BuiltinMetricsTracer extends BigtableTracer {
// Stopwatch is not thread safe so this is a workaround to check if the stopwatch changes is
// flushed to memory.
private final Stopwatch serverLatencyTimer = Stopwatch.createUnstarted();
- private boolean serverLatencyTimerIsRunning = false;
private final Object timerLock = new Object();
private boolean flowControlIsDisabled = false;
@@ -86,6 +90,9 @@ class BuiltinMetricsTracer extends BigtableTracer {
private Long serverLatencies = null;
private final AtomicLong grpcMessageSentDelay = new AtomicLong(0);
+ private Duration operationTimeout = Duration.ofMillis(0);
+ private long remainingOperationTimeout = 0;
+
// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
// to milliseconds and use DoubleHistogram. This should minimize the chance of a data
@@ -96,6 +103,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
private final DoubleHistogram firstResponseLatenciesHistogram;
private final DoubleHistogram clientBlockingLatenciesHistogram;
private final DoubleHistogram applicationBlockingLatenciesHistogram;
+ private final DoubleHistogram remainingDeadlineHistogram;
private final LongCounter connectivityErrorCounter;
private final LongCounter retryCounter;
@@ -109,6 +117,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
DoubleHistogram firstResponseLatenciesHistogram,
DoubleHistogram clientBlockingLatenciesHistogram,
DoubleHistogram applicationBlockingLatenciesHistogram,
+ DoubleHistogram deadlineHistogram,
LongCounter connectivityErrorCounter,
LongCounter retryCounter) {
this.operationType = operationType;
@@ -121,6 +130,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
this.firstResponseLatenciesHistogram = firstResponseLatenciesHistogram;
this.clientBlockingLatenciesHistogram = clientBlockingLatenciesHistogram;
this.applicationBlockingLatenciesHistogram = applicationBlockingLatenciesHistogram;
+ this.remainingDeadlineHistogram = deadlineHistogram;
this.connectivityErrorCounter = connectivityErrorCounter;
this.retryCounter = retryCounter;
}
@@ -133,6 +143,13 @@ public void close() {}
};
}
+ @Override
+ public void operationFinishEarly() {
+ operationFinishedEarly.set(true);
+ attemptTimer.stop();
+ operationTimer.stop();
+ }
+
@Override
public void operationSucceeded() {
recordOperationCompletion(null);
@@ -163,12 +180,16 @@ public void attemptStarted(Object request, int attemptNumber) {
}
if (!flowControlIsDisabled) {
synchronized (timerLock) {
- if (!serverLatencyTimerIsRunning) {
+ if (!serverLatencyTimer.isRunning()) {
serverLatencyTimer.start();
- serverLatencyTimerIsRunning = true;
}
}
}
+ // OperationTimeout is only set after the first attempt.
+ if (attemptCount > 1) {
+ remainingOperationTimeout =
+ operationTimeout.toMillis() - operationTimer.elapsed(TimeUnit.MILLISECONDS);
+ }
}
@Override
@@ -194,13 +215,17 @@ public void attemptPermanentFailure(Throwable throwable) {
@Override
public void onRequest(int requestCount) {
requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);
+
+ if (operationFinishedEarly.get()) {
+ return;
+ }
+
if (flowControlIsDisabled) {
// On request is only called when auto flow control is disabled. When auto flow control is
// disabled, server latency is measured between onRequest and onResponse.
synchronized (timerLock) {
- if (!serverLatencyTimerIsRunning) {
+ if (!serverLatencyTimer.isRunning()) {
serverLatencyTimer.start();
- serverLatencyTimerIsRunning = true;
}
}
}
@@ -208,6 +233,10 @@ public void onRequest(int requestCount) {
@Override
public void responseReceived() {
+ if (operationFinishedEarly.get()) {
+ return;
+ }
+
if (firstResponsePerOpTimer.isRunning()) {
firstResponsePerOpTimer.stop();
}
@@ -219,10 +248,9 @@ public void responseReceived() {
// latency is measured between afterResponse and responseReceived.
// In all the cases, we want to stop the serverLatencyTimer here.
synchronized (timerLock) {
- if (serverLatencyTimerIsRunning) {
+ if (serverLatencyTimer.isRunning()) {
totalServerLatencyNano.addAndGet(serverLatencyTimer.elapsed(TimeUnit.NANOSECONDS));
serverLatencyTimer.reset();
- serverLatencyTimerIsRunning = false;
}
}
}
@@ -230,14 +258,16 @@ public void responseReceived() {
@Override
public void afterResponse(long applicationLatency) {
if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
+ if (operationFinishedEarly.get()) {
+ return;
+ }
// When auto flow control is enabled, request will never be called, so server latency is
// measured between after the last response is processed and before the next response is
// received. If flow control is disabled but requestLeft is greater than 0,
// also start the timer to count the time between afterResponse and responseReceived.
synchronized (timerLock) {
- if (!serverLatencyTimerIsRunning) {
+ if (!serverLatencyTimer.isRunning()) {
serverLatencyTimer.start();
- serverLatencyTimerIsRunning = true;
}
}
}
@@ -271,16 +301,28 @@ public void grpcMessageSent() {
grpcMessageSentDelay.set(attemptTimer.elapsed(TimeUnit.NANOSECONDS));
}
+ /*
+ This is called by BigtableTracerCallables that sets operation timeout from user settings.
+ */
+ @Override
+ public void setOperationTimeout(Duration operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
@Override
public void disableFlowControl() {
flowControlIsDisabled = true;
}
private void recordOperationCompletion(@Nullable Throwable status) {
+ if (operationFinishedEarly.get()) {
+ status = null; // force an ok
+ }
+
if (!opFinished.compareAndSet(false, true)) {
return;
}
- operationTimer.stop();
+ long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);
boolean isStreaming = operationType == OperationType.ServerStreaming;
String statusStr = Util.extractStatus(status);
@@ -299,8 +341,6 @@ private void recordOperationCompletion(@Nullable Throwable status) {
.put(STATUS_KEY, statusStr)
.build();
- long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);
-
// Only record when retry count is greater than 0 so the retry
// graph will be less confusing
if (attemptCount > 1) {
@@ -321,14 +361,16 @@ private void recordOperationCompletion(@Nullable Throwable status) {
}
private void recordAttemptCompletion(@Nullable Throwable status) {
+ if (operationFinishedEarly.get()) {
+ status = null; // force an ok
+ }
// If the attempt failed, the time spent in retry should be counted in application latency.
// Stop the stopwatch and decrement requestLeft.
synchronized (timerLock) {
- if (serverLatencyTimerIsRunning) {
+ if (serverLatencyTimer.isRunning()) {
requestLeft.decrementAndGet();
totalServerLatencyNano.addAndGet(serverLatencyTimer.elapsed(TimeUnit.NANOSECONDS));
serverLatencyTimer.reset();
- serverLatencyTimerIsRunning = false;
}
}
@@ -361,6 +403,17 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
attemptLatenciesHistogram.record(
convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)), attributes);
+ if (attemptCount <= 1) {
+ remainingDeadlineHistogram.record(operationTimeout.toMillis(), attributes);
+ } else if (remainingOperationTimeout >= 0) {
+ remainingDeadlineHistogram.record(remainingOperationTimeout, attributes);
+ } else if (operationTimeout.toMillis() != 0) {
+ // If the operationTimeout is set but remaining deadline is < 0, log a warning. This should
+ // never happen.
+ logger.log(
+ Level.WARNING, "The remaining deadline was less than 0: " + remainingOperationTimeout);
+ }
+
if (serverLatencies != null) {
serverLatenciesHistogram.record(serverLatencies, attributes);
connectivityErrorCounter.add(0, attributes);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
index f0ac656978..18d3a3ace9 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java
@@ -22,6 +22,7 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.FIRST_RESPONSE_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OPERATION_LATENCIES_NAME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.REMAINING_DEADLINE_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME;
@@ -55,6 +56,7 @@ public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {
private final DoubleHistogram firstResponseLatenciesHistogram;
private final DoubleHistogram clientBlockingLatenciesHistogram;
private final DoubleHistogram applicationBlockingLatenciesHistogram;
+ private final DoubleHistogram remainingDeadlineHistogram;
private final LongCounter connectivityErrorCounter;
private final LongCounter retryCounter;
@@ -108,6 +110,13 @@ public static BuiltinMetricsTracerFactory create(
"The latency of the client application consuming available response data.")
.setUnit(MILLISECOND)
.build();
+ remainingDeadlineHistogram =
+ meter
+ .histogramBuilder(REMAINING_DEADLINE_NAME)
+ .setDescription(
+ "The remaining deadline when the request is sent to grpc. This will either be the operation timeout, or the remaining deadline from operation timeout after retries and back offs.")
+ .setUnit(MILLISECOND)
+ .build();
connectivityErrorCounter =
meter
.counterBuilder(CONNECTIVITY_ERROR_COUNT_NAME)
@@ -135,6 +144,7 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
firstResponseLatenciesHistogram,
clientBlockingLatenciesHistogram,
applicationBlockingLatenciesHistogram,
+ remainingDeadlineHistogram,
connectivityErrorCounter,
retryCounter);
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
index d89aa90c6b..2cee944aa4 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java
@@ -62,6 +62,13 @@ public void close() {
};
}
+ @Override
+ public void operationFinishEarly() {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.operationFinishEarly();
+ }
+ }
+
@Override
public void operationSucceeded() {
for (ApiTracer child : children) {
@@ -232,4 +239,11 @@ public void grpcMessageSent() {
tracer.grpcMessageSent();
}
}
+
+ @Override
+ public void setOperationTimeout(Duration operationTimeout) {
+ for (BigtableTracer tracer : bigtableTracers) {
+ tracer.setOperationTimeout(operationTimeout);
+ }
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
index 0ffabe2606..a2c5bdac1f 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
@@ -84,6 +84,12 @@ public void close() {}
};
}
+ @Override
+ public void operationFinishEarly() {
+ attemptTimer.stop();
+ operationTimer.stop();
+ }
+
@Override
public void operationSucceeded() {
recordOperationCompletion(null);
@@ -103,7 +109,6 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
- operationTimer.stop();
long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);
diff --git a/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.admin.v2/reflect-config.json b/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.admin.v2/reflect-config.json
index 5b9d183faa..e725f7653b 100644
--- a/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.admin.v2/reflect-config.json
+++ b/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.admin.v2/reflect-config.json
@@ -395,6 +395,24 @@
"allDeclaredClasses": true,
"allPublicClasses": true
},
+ {
+ "name": "com.google.api.SelectiveGapicGeneration",
+ "queryAllDeclaredConstructors": true,
+ "queryAllPublicConstructors": true,
+ "queryAllDeclaredMethods": true,
+ "allPublicMethods": true,
+ "allDeclaredClasses": true,
+ "allPublicClasses": true
+ },
+ {
+ "name": "com.google.api.SelectiveGapicGeneration$Builder",
+ "queryAllDeclaredConstructors": true,
+ "queryAllPublicConstructors": true,
+ "queryAllDeclaredMethods": true,
+ "allPublicMethods": true,
+ "allDeclaredClasses": true,
+ "allPublicClasses": true
+ },
{
"name": "com.google.bigtable.admin.v2.AppProfile",
"queryAllDeclaredConstructors": true,
diff --git a/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.data.v2/reflect-config.json b/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.data.v2/reflect-config.json
index 7114460ddb..4b89db83f8 100644
--- a/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.data.v2/reflect-config.json
+++ b/google-cloud-bigtable/src/main/resources/META-INF/native-image/com.google.cloud.bigtable.data.v2/reflect-config.json
@@ -431,6 +431,24 @@
"allDeclaredClasses": true,
"allPublicClasses": true
},
+ {
+ "name": "com.google.api.SelectiveGapicGeneration",
+ "queryAllDeclaredConstructors": true,
+ "queryAllPublicConstructors": true,
+ "queryAllDeclaredMethods": true,
+ "allPublicMethods": true,
+ "allDeclaredClasses": true,
+ "allPublicClasses": true
+ },
+ {
+ "name": "com.google.api.SelectiveGapicGeneration$Builder",
+ "queryAllDeclaredConstructors": true,
+ "queryAllPublicConstructors": true,
+ "queryAllDeclaredMethods": true,
+ "allPublicMethods": true,
+ "allDeclaredClasses": true,
+ "allPublicClasses": true
+ },
{
"name": "com.google.bigtable.v2.ArrayValue",
"queryAllDeclaredConstructors": true,
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java
new file mode 100644
index 0000000000..0b11ce3219
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
+import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
+import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall;
+import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable;
+import com.google.common.collect.ImmutableList;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class BigtableUnaryOperationCallableTest {
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock private ApiTracerFactory tracerFactory;
+ @Mock private BigtableTracer tracer;
+
+ @Before
+ public void setUp() throws Exception {
+ Mockito.when(tracerFactory.newTracer(Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(tracer);
+ }
+
+ @Test
+ public void testFutureResolve() throws Exception {
+ BigtableUnaryOperationCallable callable =
+ new BigtableUnaryOperationCallable<>(
+ new FakeStreamingApi.ServerStreamingStashCallable<>(ImmutableList.of("value")),
+ GrpcCallContext.createDefault(),
+ tracerFactory,
+ SpanName.of("Fake", "method"),
+ false);
+
+ ApiFuture f = callable.futureCall("fake");
+ assertThat(f.get()).isEqualTo("value");
+ }
+
+ @Test
+ public void testMultipleResponses() throws Exception {
+ MockServerStreamingCallable inner = new MockServerStreamingCallable<>();
+
+ BigtableUnaryOperationCallable callable =
+ new BigtableUnaryOperationCallable<>(
+ inner,
+ GrpcCallContext.createDefault(),
+ tracerFactory,
+ SpanName.of("Fake", "method"),
+ false);
+ callable.logger = Mockito.mock(Logger.class);
+
+ ApiFuture f = callable.futureCall("fake");
+ MockServerStreamingCall call = inner.popLastCall();
+ call.getController().getObserver().onResponse("first");
+ call.getController().getObserver().onResponse("second");
+
+ ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(String.class);
+ verify(callable.logger).log(Mockito.any(), msgCaptor.capture());
+ assertThat(msgCaptor.getValue())
+ .isEqualTo(
+ "Received response after future is resolved for a Fake.method unary operation. previous: first, New response: second");
+
+ assertThat(call.getController().isCancelled()).isTrue();
+ }
+
+ @Test
+ public void testCancel() {
+ MockServerStreamingCallable inner = new MockServerStreamingCallable<>();
+ BigtableUnaryOperationCallable callable =
+ new BigtableUnaryOperationCallable<>(
+ inner,
+ GrpcCallContext.createDefault(),
+ tracerFactory,
+ SpanName.of("Fake", "method"),
+ false);
+ ApiFuture f = callable.futureCall("req");
+ f.cancel(true);
+
+ MockServerStreamingCall call = inner.popLastCall();
+ assertThat(call.getController().isCancelled()).isTrue();
+ }
+
+ @Test
+ public void testMissingResponse() {
+ MockServerStreamingCallable inner = new MockServerStreamingCallable<>();
+ BigtableUnaryOperationCallable callable =
+ new BigtableUnaryOperationCallable<>(
+ inner,
+ GrpcCallContext.createDefault(),
+ tracerFactory,
+ SpanName.of("Fake", "method"),
+ false);
+ ApiFuture f = callable.futureCall("req");
+ MockServerStreamingCall call = inner.popLastCall();
+ call.getController().getObserver().onComplete();
+
+ Throwable cause = Assert.assertThrows(ExecutionException.class, f::get).getCause();
+ assertThat(cause)
+ .hasMessageThat()
+ .isEqualTo("Fake.method unary operation completed without a response message");
+ }
+
+ @Test
+ public void testTracing() throws Exception {
+ MockServerStreamingCallable inner = new MockServerStreamingCallable<>();
+ BigtableUnaryOperationCallable callable =
+ new BigtableUnaryOperationCallable<>(
+ inner,
+ GrpcCallContext.createDefault(),
+ tracerFactory,
+ SpanName.of("Fake", "method"),
+ false);
+ ApiFuture f = callable.futureCall("req");
+ MockServerStreamingCall call = inner.popLastCall();
+ call.getController().getObserver().onResponse("value");
+ call.getController().getObserver().onComplete();
+
+ f.get();
+ verify(tracer).responseReceived();
+ verify(tracer).operationSucceeded();
+
+ // afterResponse is the responsibility of BigtableTracerStreamingCallable
+ verify(tracer, never()).afterResponse(Mockito.anyLong());
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CheckAndMutateRowCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CheckAndMutateRowCallableTest.java
deleted file mode 100644
index 5441f1d1f8..0000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CheckAndMutateRowCallableTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.SettableApiFuture;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.NotFoundException;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.CheckAndMutateRowRequest;
-import com.google.bigtable.v2.CheckAndMutateRowResponse;
-import com.google.bigtable.v2.Mutation.DeleteFromRow;
-import com.google.cloud.bigtable.data.v2.internal.NameUtil;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
-import com.google.cloud.bigtable.data.v2.models.Mutation;
-import com.google.protobuf.ByteString;
-import io.grpc.Status.Code;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class CheckAndMutateRowCallableTest {
-
- private final RequestContext requestContext =
- RequestContext.create("my-project", "my-instance", "my-app-profile");
- private FakeCallable inner;
- private CheckAndMutateRowCallable callable;
-
- @Before
- public void setUp() {
- inner = new FakeCallable();
- callable = new CheckAndMutateRowCallable(inner, requestContext);
- }
-
- @Test
- public void requestIsCorrect() {
- callable.futureCall(
- ConditionalRowMutation.create("my-table", "row-key").then(Mutation.create().deleteRow()));
-
- assertThat(inner.request)
- .isEqualTo(
- CheckAndMutateRowRequest.newBuilder()
- .setTableName(
- NameUtil.formatTableName(
- requestContext.getProjectId(), requestContext.getInstanceId(), "my-table"))
- .setRowKey(ByteString.copyFromUtf8("row-key"))
- .setAppProfileId(requestContext.getAppProfileId())
- .addTrueMutations(
- com.google.bigtable.v2.Mutation.newBuilder()
- .setDeleteFromRow(DeleteFromRow.getDefaultInstance()))
- .build());
- }
-
- @Test
- public void responseCorrectlyTransformed() throws Exception {
- ApiFuture result =
- callable.futureCall(
- ConditionalRowMutation.create("my-table", "row-key")
- .then(Mutation.create().deleteRow()));
-
- inner.response.set(CheckAndMutateRowResponse.newBuilder().setPredicateMatched(true).build());
-
- assertThat(result.get(1, TimeUnit.SECONDS)).isEqualTo(true);
- }
-
- @Test
- public void errorIsPropagated() throws Exception {
- ApiFuture result =
- callable.futureCall(
- ConditionalRowMutation.create("my-table", "row-key")
- .then(Mutation.create().deleteRow()));
-
- Throwable expectedError =
- new NotFoundException("fake error", null, GrpcStatusCode.of(Code.NOT_FOUND), false);
- inner.response.setException(expectedError);
-
- Throwable actualError = null;
- try {
- result.get(1, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- actualError = e.getCause();
- }
-
- assertThat(actualError).isEqualTo(expectedError);
- }
-
- static class FakeCallable
- extends UnaryCallable {
- CheckAndMutateRowRequest request;
- ApiCallContext callContext;
- SettableApiFuture response = SettableApiFuture.create();
-
- @Override
- public ApiFuture futureCall(
- CheckAndMutateRowRequest request, ApiCallContext context) {
- this.request = request;
- this.callContext = context;
-
- return response;
- }
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
index 5280abe1fd..fdc6b5717e 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
@@ -961,6 +961,7 @@ public void enableRetryInfoFalseValueTest() throws IOException {
"jwtAudienceMapping",
"enableRoutingCookie",
"enableRetryInfo",
+ "enableSkipTrailers",
"readRowsSettings",
"readRowSettings",
"sampleRowKeysSettings",
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
index 50d086b711..495250fe13 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
@@ -37,6 +37,7 @@
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.rpc.ServerStream;
@@ -44,15 +45,21 @@
import com.google.api.gax.rpc.WatchdogTimeoutException;
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ExecuteQueryRequest;
import com.google.bigtable.v2.ExecuteQueryResponse;
import com.google.bigtable.v2.FeatureFlags;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowSet;
@@ -62,7 +69,19 @@
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.internal.SqlRow;
-import com.google.cloud.bigtable.data.v2.models.*;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
+import com.google.cloud.bigtable.data.v2.models.Filters;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
+import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.bigtable.data.v2.models.TableId;
import com.google.cloud.bigtable.data.v2.models.sql.ResultSetMetadata;
import com.google.cloud.bigtable.data.v2.models.sql.Statement;
import com.google.cloud.bigtable.data.v2.stub.sql.ExecuteQueryCallable;
@@ -75,6 +94,7 @@
import com.google.protobuf.StringValue;
import com.google.rpc.Code;
import com.google.rpc.Status;
+import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
@@ -105,6 +125,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -118,8 +139,9 @@ public class EnhancedBigtableStubTest {
private static final String PROJECT_ID = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
+ private static final String TABLE_ID = "fake-table";
private static final String TABLE_NAME =
- NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table");
+ NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
private static final String APP_PROFILE_ID = "app-profile-id";
private static final String WAIT_TIME_TABLE_ID = "test-wait-timeout";
private static final String WAIT_TIME_QUERY = "test-wait-timeout";
@@ -269,6 +291,101 @@ public void testFeatureFlags() throws InterruptedException, IOException, Executi
assertThat(featureFlags.getLastScannedRowResponses()).isTrue();
}
+ @Test
+ public void testCheckAndMutateRequestResponseConversion()
+ throws ExecutionException, InterruptedException {
+ ConditionalRowMutation req =
+ ConditionalRowMutation.create(TableId.of("my-table"), "my-key")
+ .condition(Filters.FILTERS.pass())
+ .then(Mutation.create().deleteRow());
+
+ ApiFuture f = enhancedBigtableStub.checkAndMutateRowCallable().futureCall(req, null);
+ f.get();
+
+ CheckAndMutateRowRequest protoReq =
+ fakeDataService.checkAndMutateRowRequests.poll(1, TimeUnit.SECONDS);
+ assertThat(protoReq)
+ .isEqualTo(req.toProto(RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID)));
+ assertThat(f.get()).isEqualTo(true);
+ }
+
+ @Test
+ public void testRMWRequestResponseConversion() throws ExecutionException, InterruptedException {
+ ReadModifyWriteRow req =
+ ReadModifyWriteRow.create(TableId.of("my-table"), "my-key").append("f", "q", "v");
+
+ ApiFuture f = enhancedBigtableStub.readModifyWriteRowCallable().futureCall(req, null);
+ f.get();
+
+ ReadModifyWriteRowRequest protoReq = fakeDataService.rmwRequests.poll(1, TimeUnit.SECONDS);
+ assertThat(protoReq)
+ .isEqualTo(req.toProto(RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID)));
+ assertThat(f.get().getKey()).isEqualTo(ByteString.copyFromUtf8("my-key"));
+ }
+
+ @Test
+ public void testMutateRowRequestResponseConversion()
+ throws ExecutionException, InterruptedException {
+ RowMutation req = RowMutation.create(TableId.of("my-table"), "my-key").deleteRow();
+ CallOptions.Key testKey = CallOptions.Key.create("test-key");
+
+ GrpcCallContext ctx =
+ GrpcCallContext.createDefault()
+ .withCallOptions(CallOptions.DEFAULT.withOption(testKey, "callopt-value"));
+ ApiFuture f = enhancedBigtableStub.mutateRowCallable().futureCall(req, ctx);
+ f.get();
+
+ MutateRowRequest protoReq = fakeDataService.mutateRowRequests.poll(1, TimeUnit.SECONDS);
+ assertThat(protoReq)
+ .isEqualTo(req.toProto(RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID)));
+ assertThat(f.get()).isEqualTo(null);
+ }
+
+ @Test
+ public void testMutateRowRequestParams() throws ExecutionException, InterruptedException {
+ RowMutation req = RowMutation.create(TableId.of(TABLE_ID), "my-key").deleteRow();
+
+ ApiFuture f = enhancedBigtableStub.mutateRowCallable().futureCall(req, null);
+ f.get();
+
+ Metadata reqMetadata = metadataInterceptor.headers.poll(1, TimeUnit.SECONDS);
+
+ // RequestParamsExtractor
+ String reqParams =
+ reqMetadata.get(Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER));
+ assertThat(reqParams).contains("table_name=" + TABLE_NAME.replace("/", "%2F"));
+ assertThat(reqParams).contains(String.format("app_profile_id=%s", APP_PROFILE_ID));
+
+ // StatsHeadersUnaryCallable
+ assertThat(reqMetadata.keys()).contains("bigtable-client-attempt-epoch-usec");
+
+ assertThat(f.get()).isEqualTo(null);
+ }
+
+ @Test
+ public void testMutateRowErrorPropagation() {
+ AtomicInteger invocationCount = new AtomicInteger();
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ StreamObserver observer = invocationOnMock.getArgument(1);
+ if (invocationCount.getAndIncrement() == 0) {
+ observer.onError(io.grpc.Status.UNAVAILABLE.asRuntimeException());
+ } else {
+ observer.onError(io.grpc.Status.FAILED_PRECONDITION.asRuntimeException());
+ }
+ return null;
+ })
+ .when(fakeDataService)
+ .mutateRow(Mockito.any(), Mockito.any(StreamObserver.class));
+
+ RowMutation req = RowMutation.create(TableId.of(TABLE_ID), "my-key").deleteRow();
+ ApiFuture f = enhancedBigtableStub.mutateRowCallable().futureCall(req, null);
+
+ ExecutionException e = assertThrows(ExecutionException.class, f::get);
+ assertThat(e.getCause()).isInstanceOf(FailedPreconditionException.class);
+ assertThat(invocationCount.get()).isEqualTo(2);
+ }
+
@Test
public void testCreateReadRowsCallable() throws InterruptedException {
ServerStreamingCallable streamingCallable =
@@ -751,6 +868,10 @@ private static class FakeDataService extends BigtableGrpc.BigtableImplBase {
Queues.newLinkedBlockingDeque();
final BlockingQueue pingRequests = Queues.newLinkedBlockingDeque();
final BlockingQueue executeQueryRequests = Queues.newLinkedBlockingDeque();
+ final BlockingQueue mutateRowRequests = Queues.newLinkedBlockingDeque();
+ final BlockingQueue checkAndMutateRowRequests =
+ Queues.newLinkedBlockingDeque();
+ final BlockingQueue rmwRequests = Queues.newLinkedBlockingDeque();
@SuppressWarnings("unchecked")
ReadRowsRequest popLastRequest() throws InterruptedException {
@@ -761,6 +882,37 @@ ExecuteQueryRequest popLastExecuteQueryRequest() throws InterruptedException {
return executeQueryRequests.poll(1, TimeUnit.SECONDS);
}
+ @Override
+ public void mutateRow(
+ MutateRowRequest request, StreamObserver responseObserver) {
+ mutateRowRequests.add(request);
+
+ responseObserver.onNext(MutateRowResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void checkAndMutateRow(
+ CheckAndMutateRowRequest request,
+ StreamObserver responseObserver) {
+ checkAndMutateRowRequests.add(request);
+ responseObserver.onNext(
+ CheckAndMutateRowResponse.newBuilder().setPredicateMatched(true).build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void readModifyWriteRow(
+ ReadModifyWriteRowRequest request,
+ StreamObserver responseObserver) {
+ rmwRequests.add(request);
+ responseObserver.onNext(
+ ReadModifyWriteRowResponse.newBuilder()
+ .setRow(com.google.bigtable.v2.Row.newBuilder().setKey(request.getRowKey()))
+ .build());
+ responseObserver.onCompleted();
+ }
+
@Override
public void mutateRows(
MutateRowsRequest request, StreamObserver responseObserver) {
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallableTest.java
deleted file mode 100644
index 4792b66890..0000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallableTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.SettableApiFuture;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.MutateRowRequest;
-import com.google.bigtable.v2.MutateRowResponse;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.RowMutation;
-import com.google.common.primitives.Longs;
-import com.google.common.truth.Truth;
-import com.google.protobuf.ByteString;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-@RunWith(JUnit4.class)
-public class MutateRowCallableTest {
-
- private static final RequestContext REQUEST_CONTEXT =
- RequestContext.create("fake-project", "fake-instance", "fake-profile");
- private UnaryCallable innerCallable;
- private ArgumentCaptor innerMutation;
- private SettableApiFuture innerResult;
-
- @SuppressWarnings("unchecked")
- @Before
- public void setUp() {
- innerCallable = Mockito.mock(UnaryCallable.class);
- innerMutation = ArgumentCaptor.forClass(MutateRowRequest.class);
- innerResult = SettableApiFuture.create();
- Mockito.when(innerCallable.futureCall(innerMutation.capture(), Mockito.any()))
- .thenReturn(innerResult);
- }
-
- @Test
- public void testRequestConversion() {
- MutateRowCallable callable = new MutateRowCallable(innerCallable, REQUEST_CONTEXT);
- RowMutation outerRequest =
- RowMutation.create("fake-table", "fake-key")
- .setCell("fake-family", "fake-qualifier", 1_000, "fake-value")
- .addToCell("family-2", "qualifier", 1_000, 1234)
- .mergeToCell(
- "family-2", "qualifier2", 1_000, ByteString.copyFrom(Longs.toByteArray(1234L)));
-
- innerResult.set(MutateRowResponse.getDefaultInstance());
- callable.call(outerRequest);
-
- Truth.assertThat(innerMutation.getValue()).isEqualTo(outerRequest.toProto(REQUEST_CONTEXT));
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallableTest.java
deleted file mode 100644
index 4a8f857d05..0000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallableTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.SettableApiFuture;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.NotFoundException;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.Cell;
-import com.google.bigtable.v2.Column;
-import com.google.bigtable.v2.Family;
-import com.google.bigtable.v2.ReadModifyWriteRowRequest;
-import com.google.bigtable.v2.ReadModifyWriteRowResponse;
-import com.google.bigtable.v2.ReadModifyWriteRule;
-import com.google.cloud.bigtable.data.v2.internal.NameUtil;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
-import com.google.cloud.bigtable.data.v2.models.Row;
-import com.google.cloud.bigtable.data.v2.models.RowCell;
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import io.grpc.Status.Code;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class ReadModifyWriteRowCallableTest {
- private final RequestContext requestContext =
- RequestContext.create("fake-project", "fake-instance", "fake-profile");
- private FakeCallable inner;
- private ReadModifyWriteRowCallable callable;
-
- @Before
- public void setUp() {
- inner = new FakeCallable();
- callable = new ReadModifyWriteRowCallable(inner, requestContext);
- }
-
- @Test
- public void requestIsCorrect() {
- callable.futureCall(
- ReadModifyWriteRow.create("my-table", "my-key").append("my-family", "", "suffix"));
-
- assertThat(inner.request)
- .isEqualTo(
- ReadModifyWriteRowRequest.newBuilder()
- .setTableName(
- NameUtil.formatTableName(
- requestContext.getProjectId(), requestContext.getInstanceId(), "my-table"))
- .setAppProfileId(requestContext.getAppProfileId())
- .setRowKey(ByteString.copyFromUtf8("my-key"))
- .addRules(
- ReadModifyWriteRule.newBuilder()
- .setFamilyName("my-family")
- .setColumnQualifier(ByteString.EMPTY)
- .setAppendValue(ByteString.copyFromUtf8("suffix")))
- .build());
- }
-
- @Test
- public void responseCorrectlyTransformed() throws Exception {
- ApiFuture result =
- callable.futureCall(
- ReadModifyWriteRow.create("my-table", "my-key").append("my-family", "col", "suffix"));
-
- inner.response.set(
- ReadModifyWriteRowResponse.newBuilder()
- .setRow(
- com.google.bigtable.v2.Row.newBuilder()
- .setKey(ByteString.copyFromUtf8("my-key"))
- .addFamilies(
- Family.newBuilder()
- .setName("my-family")
- .addColumns(
- Column.newBuilder()
- .setQualifier(ByteString.copyFromUtf8("col"))
- .addCells(
- Cell.newBuilder()
- .setTimestampMicros(1_000)
- .setValue(ByteString.copyFromUtf8("suffix"))))))
- .build());
-
- assertThat(result.get(1, TimeUnit.SECONDS))
- .isEqualTo(
- Row.create(
- ByteString.copyFromUtf8("my-key"),
- ImmutableList.of(
- RowCell.create(
- "my-family",
- ByteString.copyFromUtf8("col"),
- 1_000,
- ImmutableList.of(),
- ByteString.copyFromUtf8("suffix")))));
- }
-
- @Test
- public void responseSortsFamilies() throws Exception {
- ByteString col = ByteString.copyFromUtf8("col1");
- ByteString value1 = ByteString.copyFromUtf8("value1");
- ByteString value2 = ByteString.copyFromUtf8("value2");
-
- ApiFuture result =
- callable.futureCall(
- ReadModifyWriteRow.create("my-table", "my-key").append("my-family", "col", "suffix"));
-
- inner.response.set(
- ReadModifyWriteRowResponse.newBuilder()
- .setRow(
- com.google.bigtable.v2.Row.newBuilder()
- .setKey(ByteString.copyFromUtf8("my-key"))
- // family2 is out of order
- .addFamilies(
- Family.newBuilder()
- .setName("family2")
- .addColumns(
- Column.newBuilder()
- .setQualifier(col)
- .addCells(
- Cell.newBuilder()
- .setTimestampMicros(1_000)
- .setValue(value2))))
- .addFamilies(
- Family.newBuilder()
- .setName("family1")
- .addColumns(
- Column.newBuilder()
- .setQualifier(col)
- .addCells(
- Cell.newBuilder()
- .setTimestampMicros(1_000)
- .setValue(value1)))
- .build()))
- .build());
-
- assertThat(result.get(1, TimeUnit.SECONDS))
- .isEqualTo(
- Row.create(
- ByteString.copyFromUtf8("my-key"),
- ImmutableList.of(
- RowCell.create("family1", col, 1_000, ImmutableList.