* NOTE: the caller is responsible for adding tracing & metrics.
*/
private ServerStreamingCallable createReadRowsBaseCallable(
- ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) {
-
+ ServerStreamingCallSettings readRowsSettings,
+ RowAdapter rowAdapter,
+ StreamResumptionStrategy resumptionStrategy) {
ServerStreamingCallable base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(ReadRowsRequest readRowsRequest) {
- String tableName = readRowsRequest.getTableName();
- String authorizedViewName = readRowsRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- readRowsRequest.getAppProfileId());
- }
- })
+ r ->
+ composeRequestParams(
+ r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
.build(),
readRowsSettings.getRetryableCodes());
@@ -628,7 +491,7 @@ public Map extract(ReadRowsRequest readRowsRequest) {
// ReadRowsRequest -> ReadRowsResponse callable).
ServerStreamingCallSettings innerSettings =
ServerStreamingCallSettings.newBuilder()
- .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
+ .setResumptionStrategy(resumptionStrategy)
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
@@ -686,15 +549,47 @@ private UnaryCallable> createBulkReadRowsCallable(
UnaryCallable> traced =
new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), span);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withRetrySettings(settings.readRowsSettings().getRetrySettings()));
}
/**
- * Helper function that should only be used by createSampleRowKeysCallable() and
- * createSampleRowKeysWithRequestCallable().
+ * Simple wrapper around {@link #createSampleRowKeysCallableWithRequest()} to provide backwards
+ * compatibility
+ *
+ * @deprecated
+ */
+ @Deprecated
+ private UnaryCallable> createSampleRowKeysCallable() {
+ UnaryCallable> baseCallable =
+ createSampleRowKeysCallableWithRequest();
+ return new UnaryCallable>() {
+ @Override
+ public ApiFuture> futureCall(String s, ApiCallContext apiCallContext) {
+ return baseCallable.futureCall(SampleRowKeysRequest.create(TableId.of(s)), apiCallContext);
+ }
+ };
+ }
+
+ /**
+ * Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
+ *
+ *
+ * - Convert a {@link SampleRowKeysRequest} to a {@link
+ * com.google.bigtable.v2.SampleRowKeysRequest}.
+ *
- Dispatch the request to the GAPIC's {@link BigtableStub#sampleRowKeysCallable()}.
+ *
- Spool responses into a list.
+ *
- Retry on failure.
+ *
- Convert the responses into {@link KeyOffset}s.
+ *
- Add tracing & metrics.
+ *
*/
- private UnaryCallable>
- createSampleRowKeysBaseCallable() {
+ private UnaryCallable>
+ createSampleRowKeysCallableWithRequest() {
+ String methodName = "SampleRowKeys";
+
ServerStreamingCallable
base =
GrpcRawCallableFactory.createServerStreamingCallable(
@@ -703,25 +598,9 @@ private UnaryCallable> createBulkReadRowsCallable(
newBuilder()
.setMethodDescriptor(BigtableGrpc.getSampleRowKeysMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- com.google.bigtable.v2.SampleRowKeysRequest sampleRowKeysRequest) {
- String tableName = sampleRowKeysRequest.getTableName();
- String authorizedViewName =
- sampleRowKeysRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(
- authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- sampleRowKeysRequest.getAppProfileId());
- }
- })
+ r ->
+ composeRequestParams(
+ r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
.build(),
settings.sampleRowKeysSettings().getRetryableCodes());
@@ -737,51 +616,13 @@ public Map extract(
UnaryCallable>
retryable = withRetries(withBigtableTracer, settings.sampleRowKeysSettings());
- return retryable;
- }
-
- /**
- * Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
- *
- *
- * - Convert a table id to a {@link com.google.bigtable.v2.SampleRowKeysRequest}.
- *
- Dispatch the request to the GAPIC's {@link BigtableStub#sampleRowKeysCallable()}.
- *
- Spool responses into a list.
- *
- Retry on failure.
- *
- Convert the responses into {@link KeyOffset}s.
- *
- Add tracing & metrics.
- *
- */
- private UnaryCallable> createSampleRowKeysCallable() {
- String methodName = "SampleRowKeys";
-
- UnaryCallable>
- baseCallable = createSampleRowKeysBaseCallable();
- return createUserFacingUnaryCallable(
- methodName, new SampleRowKeysCallable(baseCallable, requestContext));
- }
-
- /**
- * Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
- *
- *
- * - Convert a {@link SampleRowKeysRequest} to a {@link
- * com.google.bigtable.v2.SampleRowKeysRequest}.
- *
- Dispatch the request to the GAPIC's {@link BigtableStub#sampleRowKeysCallable()}.
- *
- Spool responses into a list.
- *
- Retry on failure.
- *
- Convert the responses into {@link KeyOffset}s.
- *
- Add tracing & metrics.
- *
- */
- private UnaryCallable>
- createSampleRowKeysCallableWithRequest() {
- String methodName = "SampleRowKeys";
-
- UnaryCallable>
- baseCallable = createSampleRowKeysBaseCallable();
return createUserFacingUnaryCallable(
- methodName, new SampleRowKeysCallableWithRequest(baseCallable, requestContext));
+ methodName,
+ new SampleRowKeysCallableWithRequest(retryable, requestContext)
+ .withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withRetrySettings(settings.sampleRowKeysSettings().getRetrySettings())));
}
/**
@@ -793,42 +634,14 @@ private UnaryCallable> createSampleRowKeysCallable() {
*
*/
private UnaryCallable createMutateRowCallable() {
- String methodName = "MutateRow";
- UnaryCallable base =
- GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(MutateRowRequest mutateRowRequest) {
- String tableName = mutateRowRequest.getTableName();
- String authorizedViewName = mutateRowRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- mutateRowRequest.getAppProfileId());
- }
- })
- .build(),
- settings.mutateRowSettings().getRetryableCodes());
-
- UnaryCallable withStatsHeaders =
- new StatsHeadersUnaryCallable<>(base);
-
- UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(withStatsHeaders);
-
- UnaryCallable retrying =
- withRetries(withBigtableTracer, settings.mutateRowSettings());
-
- return createUserFacingUnaryCallable(
- methodName, new MutateRowCallable(retrying, requestContext));
+ return createUnaryCallable(
+ BigtableGrpc.getMutateRowMethod(),
+ req ->
+ composeRequestParams(
+ req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
+ settings.mutateRowSettings(),
+ req -> req.toProto(requestContext),
+ resp -> null);
}
/**
@@ -855,22 +668,9 @@ private UnaryCallable createMutateRowsBas
GrpcCallSettings.newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowsMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(MutateRowsRequest mutateRowsRequest) {
- String tableName = mutateRowsRequest.getTableName();
- String authorizedViewName = mutateRowsRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- mutateRowsRequest.getAppProfileId());
- }
- })
+ r ->
+ composeRequestParams(
+ r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());
@@ -945,7 +745,10 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withRetrySettings(settings.bulkMutateRowsSettings().getRetrySettings()));
}
/**
@@ -1048,44 +851,14 @@ public Batcher newBulkReadRowsBatcher(
*
*/
private UnaryCallable createCheckAndMutateRowCallable() {
- String methodName = "CheckAndMutateRow";
- UnaryCallable base =
- GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getCheckAndMutateRowMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- CheckAndMutateRowRequest checkAndMutateRowRequest) {
- String tableName = checkAndMutateRowRequest.getTableName();
- String authorizedViewName =
- checkAndMutateRowRequest.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name",
- tableName,
- "app_profile_id",
- checkAndMutateRowRequest.getAppProfileId());
- }
- })
- .build(),
- settings.checkAndMutateRowSettings().getRetryableCodes());
-
- UnaryCallable withStatsHeaders =
- new StatsHeadersUnaryCallable<>(base);
-
- UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(withStatsHeaders);
-
- UnaryCallable retrying =
- withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());
-
- return createUserFacingUnaryCallable(
- methodName, new CheckAndMutateRowCallable(retrying, requestContext));
+ return createUnaryCallable(
+ BigtableGrpc.getCheckAndMutateRowMethod(),
+ req ->
+ composeRequestParams(
+ req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
+ settings.checkAndMutateRowSettings(),
+ req -> req.toProto(requestContext),
+ CheckAndMutateRowResponse::getPredicateMatched);
}
/**
@@ -1099,39 +872,16 @@ public Map extract(
*
*/
private UnaryCallable createReadModifyWriteRowCallable() {
- UnaryCallable base =
- GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getReadModifyWriteRowMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(ReadModifyWriteRowRequest request) {
- String tableName = request.getTableName();
- String authorizedViewName = request.getAuthorizedViewName();
- if (tableName.isEmpty()) {
- tableName =
- NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
- }
- return ImmutableMap.of(
- "table_name", tableName, "app_profile_id", request.getAppProfileId());
- }
- })
- .build(),
- settings.readModifyWriteRowSettings().getRetryableCodes());
-
- UnaryCallable withStatsHeaders =
- new StatsHeadersUnaryCallable<>(base);
-
- String methodName = "ReadModifyWriteRow";
- UnaryCallable withBigtableTracer =
- new BigtableTracerUnaryCallable<>(withStatsHeaders);
-
- UnaryCallable retrying =
- withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());
-
- return createUserFacingUnaryCallable(
- methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
+ DefaultRowAdapter rowAdapter = new DefaultRowAdapter();
+
+ return createUnaryCallable(
+ BigtableGrpc.getReadModifyWriteRowMethod(),
+ req ->
+ composeRequestParams(
+ req.getAppProfileId(), req.getTableName(), req.getAuthorizedViewName()),
+ settings.readModifyWriteRowSettings(),
+ req -> req.toProto(requestContext),
+ resp -> rowAdapter.createRowFromProto(resp.getRow()));
}
/**
@@ -1160,18 +910,7 @@ public Map extract(ReadModifyWriteRowRequest request) {
.setMethodDescriptor(
BigtableGrpc.getGenerateInitialChangeStreamPartitionsMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- GenerateInitialChangeStreamPartitionsRequest
- generateInitialChangeStreamPartitionsRequest) {
- return ImmutableMap.of(
- "table_name",
- generateInitialChangeStreamPartitionsRequest.getTableName(),
- "app_profile_id",
- generateInitialChangeStreamPartitionsRequest.getAppProfileId());
- }
- })
+ r -> composeRequestParams(r.getAppProfileId(), r.getTableName(), ""))
.build(),
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes());
@@ -1214,7 +953,11 @@ public Map extract(
ServerStreamingCallable traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withRetrySettings(
+ settings.generateInitialChangeStreamPartitionsSettings().getRetrySettings()));
}
/**
@@ -1240,15 +983,7 @@ public Map extract(
GrpcCallSettings.newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadChangeStreamMethod())
.setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(
- ReadChangeStreamRequest readChangeStreamRequest) {
- return ImmutableMap.of(
- "table_name", readChangeStreamRequest.getTableName(),
- "app_profile_id", readChangeStreamRequest.getAppProfileId());
- }
- })
+ r -> composeRequestParams(r.getAppProfileId(), r.getTableName(), ""))
.build(),
settings.readChangeStreamSettings().getRetryableCodes());
@@ -1294,7 +1029,10 @@ public Map extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withRetrySettings(settings.readChangeStreamSettings().getRetrySettings()));
}
/**
@@ -1335,9 +1073,8 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) {
ServerStreamingCallable withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);
- ServerStreamingCallSettings innerSettings =
+ ServerStreamingCallSettings watchdogSettings =
ServerStreamingCallSettings.newBuilder()
- // TODO resumption strategy and retry settings
.setIdleTimeout(settings.executeQuerySettings().getIdleTimeout())
.setWaitTimeout(settings.executeQuerySettings().getWaitTimeout())
.build();
@@ -1345,7 +1082,7 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) {
// Watchdog needs to stay above the metadata observer so that watchdog errors
// are passed through to the metadata future.
ServerStreamingCallable watched =
- Callables.watched(withStatsHeaders, innerSettings, clientContext);
+ Callables.watched(withStatsHeaders, watchdogSettings, clientContext);
ServerStreamingCallable withMetadataObserver =
new MetadataResolvingCallable(watched);
@@ -1356,13 +1093,36 @@ public Map extract(ExecuteQueryRequest executeQueryRequest) {
ServerStreamingCallable withBigtableTracer =
new BigtableTracerStreamingCallable<>(merging);
+ ServerStreamingCallSettings retrySettings =
+ ServerStreamingCallSettings.newBuilder()
+ // TODO add resumption strategy and pass through retry settings unchanged
+ // we pass through retry settings to use the deadlines now but don't
+ // support retries
+ .setRetrySettings(
+ settings
+ .executeQuerySettings()
+ .getRetrySettings()
+ .toBuilder()
+ // override maxAttempts as a safeguard against changes from user
+ .setMaxAttempts(1)
+ .build())
+ .build();
+
+ // Adding RetryingCallable to the callable chain so that client side metrics can be
+ // measured correctly and deadlines are set. Retries are currently disabled.
+ ServerStreamingCallable retries =
+ withRetries(withBigtableTracer, retrySettings);
+
SpanName span = getSpanName("ExecuteQuery");
ServerStreamingCallable traced =
- new TracedServerStreamingCallable<>(
- withBigtableTracer, clientContext.getTracerFactory(), span);
+ new TracedServerStreamingCallable<>(retries, clientContext.getTracerFactory(), span);
return new ExecuteQueryCallable(
- traced.withDefaultCallContext(clientContext.getDefaultCallContext()), requestContext);
+ traced.withDefaultCallContext(
+ clientContext
+ .getDefaultCallContext()
+ .withRetrySettings(settings.executeQuerySettings().getRetrySettings())),
+ requestContext);
}
/**
@@ -1378,23 +1138,114 @@ private UnaryCallable createUserFacin
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
- private UnaryCallable createPingAndWarmCallable() {
- UnaryCallable pingAndWarm =
+ private Map composeRequestParams(
+ String appProfileId, String tableName, String authorizedViewName) {
+ if (tableName.isEmpty() && !authorizedViewName.isEmpty()) {
+ tableName = NameUtil.extractTableNameFromAuthorizedViewName(authorizedViewName);
+ }
+ return ImmutableMap.of("table_name", tableName, "app_profile_id", appProfileId);
+ }
+
+ private UnaryCallable createUnaryCallable(
+ MethodDescriptor methodDescriptor,
+ RequestParamsExtractor headerParamsFn,
+ UnaryCallSettings callSettings,
+ Function requestTransformer,
+ Function responseTranformer) {
+ if (settings.getEnableSkipTrailers()) {
+ return createUnaryCallableNew(
+ methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
+ } else {
+ return createUnaryCallableOld(
+ methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer);
+ }
+ }
+
+ private UnaryCallable createUnaryCallableOld(
+ MethodDescriptor methodDescriptor,
+ RequestParamsExtractor headerParamsFn,
+ UnaryCallSettings callSettings,
+ Function requestTransformer,
+ Function responseTranformer) {
+
+ UnaryCallable base =
GrpcRawCallableFactory.createUnaryCallable(
- GrpcCallSettings.newBuilder()
- .setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
- .setParamsExtractor(
- new RequestParamsExtractor() {
- @Override
- public Map extract(PingAndWarmRequest request) {
- return ImmutableMap.of(
- "name", request.getName(),
- "app_profile_id", request.getAppProfileId());
- }
- })
+ GrpcCallSettings.newBuilder()
+ .setMethodDescriptor(methodDescriptor)
+ .setParamsExtractor(headerParamsFn)
.build(),
- Collections.emptySet());
- return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
+ callSettings.getRetryableCodes());
+
+ UnaryCallable withStatsHeaders = new StatsHeadersUnaryCallable<>(base);
+
+ UnaryCallable withBigtableTracer =
+ new BigtableTracerUnaryCallable<>(withStatsHeaders);
+
+ UnaryCallable retrying = withRetries(withBigtableTracer, callSettings);
+
+ UnaryCallable transformed =
+ new UnaryCallable() {
+ @Override
+ public ApiFuture futureCall(ReqT reqT, ApiCallContext apiCallContext) {
+ ApiFuture f =
+ retrying.futureCall(requestTransformer.apply(reqT), apiCallContext);
+ return ApiFutures.transform(
+ f, responseTranformer::apply, MoreExecutors.directExecutor());
+ }
+ };
+
+ UnaryCallable traced =
+ new TracedUnaryCallable<>(
+ transformed,
+ clientContext.getTracerFactory(),
+ getSpanName(methodDescriptor.getBareMethodName()));
+
+ return traced.withDefaultCallContext(
+ clientContext.getDefaultCallContext().withRetrySettings(callSettings.getRetrySettings()));
+ }
+
+ private UnaryCallable createUnaryCallableNew(
+ MethodDescriptor methodDescriptor,
+ RequestParamsExtractor headerParamsFn,
+ UnaryCallSettings callSettings,
+ Function requestTransformer,
+ Function responseTranformer) {
+
+ ServerStreamingCallable base =
+ GrpcRawCallableFactory.createServerStreamingCallable(
+ GrpcCallSettings.newBuilder()
+ .setMethodDescriptor(methodDescriptor)
+ .setParamsExtractor(headerParamsFn)
+ .build(),
+ callSettings.getRetryableCodes());
+
+ base = new StatsHeadersServerStreamingCallable<>(base);
+
+ base = new BigtableTracerStreamingCallable<>(base);
+
+ base = withRetries(base, convertUnaryToServerStreamingSettings(callSettings));
+
+ ServerStreamingCallable transformed =
+ new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer);
+
+ return new BigtableUnaryOperationCallable<>(
+ transformed,
+ clientContext.getDefaultCallContext().withRetrySettings(callSettings.getRetrySettings()),
+ clientContext.getTracerFactory(),
+ getSpanName(methodDescriptor.getBareMethodName()),
+ /* allowNoResponse= */ false);
+ }
+
+ private static
+ ServerStreamingCallSettings convertUnaryToServerStreamingSettings(
+ UnaryCallSettings, ?> unarySettings) {
+ return ServerStreamingCallSettings.newBuilder()
+ .setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
+ .setRetryableCodes(unarySettings.getRetryableCodes())
+ .setRetrySettings(unarySettings.getRetrySettings())
+ .setIdleTimeoutDuration(Duration.ZERO)
+ .setWaitTimeoutDuration(Duration.ZERO)
+ .build();
}
private UnaryCallable withRetries(
@@ -1444,6 +1295,8 @@ public UnaryCallable readRowCallable() {
return readRowCallable;
}
+ /** Deprecated, please use {@link #sampleRowKeysCallableWithRequest} */
+ @Deprecated
public UnaryCallable> sampleRowKeysCallable() {
return sampleRowKeysCallable;
}
@@ -1502,10 +1355,6 @@ public ExecuteQueryCallable executeQueryCallable() {
return executeQueryCallable;
}
- UnaryCallable pingAndWarmCallable() {
- return pingAndWarmCallable;
- }
-
//
private SpanName getSpanName(String methodName) {
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
index 46933c1690..5e9e2cfe08 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
@@ -62,9 +62,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.threeten.bp.Duration;
/**
@@ -104,7 +106,14 @@ public class EnhancedBigtableStubSettings extends StubSettings IDEMPOTENT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);
@@ -231,6 +240,7 @@ public class EnhancedBigtableStubSettings extends StubSettings jwtAudienceMapping;
private final boolean enableRoutingCookie;
private final boolean enableRetryInfo;
+ private final boolean enableSkipTrailers;
private final ServerStreamingCallSettings readRowsSettings;
private final UnaryCallSettings readRowSettings;
@@ -250,6 +260,7 @@ public class EnhancedBigtableStubSettings extends StubSettings jwtAudienceMapping;
private boolean enableRoutingCookie;
private boolean enableRetryInfo;
+ private boolean enableSkipTrailers;
private final ServerStreamingCallSettings.Builder readRowsSettings;
private final UnaryCallSettings.Builder readRowSettings;
@@ -684,6 +710,7 @@ public static class Builder extends StubSettings.Builder getJwtAudienceMapping() {
return jwtAudienceMapping;
@@ -1042,6 +1093,11 @@ public boolean getEnableRetryInfo() {
return enableRetryInfo;
}
+ Builder setEnableSkipTrailers(boolean enabled) {
+ this.enableSkipTrailers = enabled;
+ return this;
+ }
+
/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder readRowsSettings() {
return readRowsSettings;
@@ -1169,6 +1225,7 @@ public String toString() {
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("enableRetryInfo", enableRetryInfo)
+ .add("enableSkipTrailers", enableSkipTrailers)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
@@ -1184,6 +1241,7 @@ public String toString() {
.add("pingAndWarmSettings", pingAndWarmSettings)
.add("executeQuerySettings", executeQuerySettings)
.add("metricsProvider", metricsProvider)
+ .add("metricsEndpoint", metricsEndpoint)
.add("parent", super.toString())
.toString();
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallable.java
deleted file mode 100644
index 36f47c2d1f..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MutateRowCallable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.ApiFunction;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.MutateRowRequest;
-import com.google.bigtable.v2.MutateRowResponse;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.RowMutation;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/** Simple wrapper for MutateRow to wrap the request and response protobufs. */
-class MutateRowCallable extends UnaryCallable {
- private final UnaryCallable inner;
- private final RequestContext requestContext;
-
- MutateRowCallable(
- UnaryCallable inner, RequestContext requestContext) {
-
- this.inner = inner;
- this.requestContext = requestContext;
- }
-
- @Override
- public ApiFuture futureCall(RowMutation request, ApiCallContext context) {
- ApiFuture rawResponse =
- inner.futureCall(request.toProto(requestContext), context);
-
- return ApiFutures.transform(
- rawResponse,
- new ApiFunction() {
- @Override
- public Void apply(MutateRowResponse mutateRowResponse) {
- return null;
- }
- },
- MoreExecutors.directExecutor());
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallable.java
deleted file mode 100644
index 09e133678e..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ReadModifyWriteRowCallable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.ApiFunction;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.ReadModifyWriteRowRequest;
-import com.google.bigtable.v2.ReadModifyWriteRowResponse;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
-import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
-import com.google.cloud.bigtable.data.v2.models.Row;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/** Simple wrapper for ReadModifyWriteRow to wrap the request and response protobufs. */
-class ReadModifyWriteRowCallable extends UnaryCallable {
- private final UnaryCallable inner;
- private final RequestContext requestContext;
- private final DefaultRowAdapter rowAdapter;
-
- ReadModifyWriteRowCallable(
- UnaryCallable inner,
- RequestContext requestContext) {
- this.inner = inner;
- this.requestContext = requestContext;
- this.rowAdapter = new DefaultRowAdapter();
- }
-
- @Override
- public ApiFuture futureCall(ReadModifyWriteRow request, ApiCallContext context) {
- ApiFuture rawResponse =
- inner.futureCall(request.toProto(requestContext), context);
-
- return ApiFutures.transform(
- rawResponse,
- new ApiFunction() {
- @Override
- public Row apply(ReadModifyWriteRowResponse readModifyWriteRowResponse) {
- return convertResponse(readModifyWriteRowResponse);
- }
- },
- MoreExecutors.directExecutor());
- }
-
- private Row convertResponse(ReadModifyWriteRowResponse response) {
- return rowAdapter.createRowFromProto(response.getRow());
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java
index 7c65bdf95a..0133dd3c2b 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java
@@ -83,7 +83,7 @@ public final void onResponse(ResponseT response) {
@Override
public final void onError(Throwable throwable) {
if (!isClosed.compareAndSet(false, true)) {
- logException("Received error after the stream is closed");
+ logException("Received error after the stream is closed", throwable);
return;
}
@@ -113,6 +113,10 @@ private void logException(String message) {
LOGGER.log(Level.WARNING, message, new IllegalStateException(message));
}
+ private void logException(String message, Throwable cause) {
+ LOGGER.log(Level.WARNING, message, new IllegalStateException(message, cause));
+ }
+
protected abstract void onStartImpl(StreamController streamController);
protected abstract void onResponseImpl(ResponseT response);
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SampleRowKeysCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SampleRowKeysCallable.java
deleted file mode 100644
index 7658e41492..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SampleRowKeysCallable.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2018 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub;
-
-import com.google.api.core.ApiFunction;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.bigtable.v2.SampleRowKeysRequest;
-import com.google.bigtable.v2.SampleRowKeysResponse;
-import com.google.cloud.bigtable.data.v2.internal.NameUtil;
-import com.google.cloud.bigtable.data.v2.internal.RequestContext;
-import com.google.cloud.bigtable.data.v2.models.KeyOffset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.List;
-
-/** Simple wrapper for SampleRowKeys to wrap the request and response protobufs. */
-class SampleRowKeysCallable extends UnaryCallable> {
- private final RequestContext requestContext;
- private final UnaryCallable> inner;
-
- SampleRowKeysCallable(
- UnaryCallable> inner,
- RequestContext requestContext) {
-
- this.requestContext = requestContext;
- this.inner = inner;
- }
-
- @Override
- public ApiFuture> futureCall(String tableId, ApiCallContext context) {
- String tableName =
- NameUtil.formatTableName(
- requestContext.getProjectId(), requestContext.getInstanceId(), tableId);
-
- SampleRowKeysRequest request =
- SampleRowKeysRequest.newBuilder()
- .setTableName(tableName)
- .setAppProfileId(requestContext.getAppProfileId())
- .build();
-
- ApiFuture> rawResponse = inner.futureCall(request, context);
-
- return ApiFutures.transform(
- rawResponse,
- new ApiFunction, List>() {
- @Override
- public List apply(List rawResponse) {
- return convert(rawResponse);
- }
- },
- MoreExecutors.directExecutor());
- }
-
- private static List convert(List rawResponse) {
- ImmutableList.Builder results = ImmutableList.builder();
-
- for (SampleRowKeysResponse element : rawResponse) {
- results.add(KeyOffset.create(element.getRowKey(), element.getOffsetBytes()));
- }
-
- return results.build();
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java
new file mode 100644
index 0000000000..29b104965e
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StreamController;
+import java.util.function.Function;
+
+/** Callable to help crossing api boundary lines between models and protos */
+class TransformingServerStreamingCallable
+ extends ServerStreamingCallable {
+ private final ServerStreamingCallable inner;
+ private final Function requestTransformer;
+ private final Function responseTransformer;
+
+ public TransformingServerStreamingCallable(
+ ServerStreamingCallable inner,
+ Function requestTransformer,
+ Function responseTransformer) {
+ this.inner = inner;
+ this.requestTransformer = requestTransformer;
+ this.responseTransformer = responseTransformer;
+ }
+
+ @Override
+ public void call(
+ OuterReqT outerReqT,
+ ResponseObserver outerObserver,
+ ApiCallContext apiCallContext) {
+ InnerReqT innerReq = requestTransformer.apply(outerReqT);
+
+ inner.call(
+ innerReq,
+ new SafeResponseObserver(outerObserver) {
+ @Override
+ public void onStartImpl(StreamController streamController) {
+ outerObserver.onStart(streamController);
+ }
+
+ @Override
+ public void onResponseImpl(InnerRespT innerResp) {
+ outerObserver.onResponse(responseTransformer.apply(innerResp));
+ }
+
+ @Override
+ public void onErrorImpl(Throwable throwable) {
+ outerObserver.onError(throwable);
+ }
+
+ @Override
+ public void onCompleteImpl() {
+ outerObserver.onComplete();
+ }
+ },
+ apiCallContext);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java
index f6a2527302..ff5bcd81c1 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableCloudMonitoringExporter.java
@@ -23,6 +23,7 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OPERATION_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.REMAINING_DEADLINE_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME;
@@ -39,7 +40,6 @@
import com.google.cloud.monitoring.v3.MetricServiceClient;
import com.google.cloud.monitoring.v3.MetricServiceSettings;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -58,6 +58,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -79,11 +80,12 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
Logger.getLogger(BigtableCloudMonitoringExporter.class.getName());
// This system property can be used to override the monitoring endpoint
- // to a different environment. It's meant for internal testing only.
- private static final String MONITORING_ENDPOINT =
- MoreObjects.firstNonNull(
- System.getProperty("bigtable.test-monitoring-endpoint"),
- MetricServiceSettings.getDefaultEndpoint());
+ // to a different environment. It's meant for internal testing only and
+ // will be removed in future versions. Use settings in EnhancedBigtableStubSettings
+ // to override the endpoint.
+ @Deprecated @Nullable
+ private static final String MONITORING_ENDPOINT_OVERRIDE_SYS_PROP =
+ System.getProperty("bigtable.test-monitoring-endpoint");
private static final String APPLICATION_RESOURCE_PROJECT_ID = "project_id";
@@ -93,7 +95,6 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
private final MetricServiceClient client;
- private final String bigtableProjectId;
private final String taskId;
// The resource the client application is running on
@@ -115,7 +116,8 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
CLIENT_BLOCKING_LATENCIES_NAME,
APPLICATION_BLOCKING_LATENCIES_NAME,
RETRY_COUNT_NAME,
- CONNECTIVITY_ERROR_COUNT_NAME)
+ CONNECTIVITY_ERROR_COUNT_NAME,
+ REMAINING_DEADLINE_NAME)
.stream()
.map(m -> METER_NAME + m)
.collect(ImmutableList.toImmutableList());
@@ -126,14 +128,21 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
.collect(ImmutableList.toImmutableList());
public static BigtableCloudMonitoringExporter create(
- String projectId, @Nullable Credentials credentials) throws IOException {
+ @Nullable Credentials credentials, @Nullable String endpoint) throws IOException {
MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder();
CredentialsProvider credentialsProvider =
Optional.ofNullable(credentials)
.map(FixedCredentialsProvider::create)
.orElse(NoCredentialsProvider.create());
settingsBuilder.setCredentialsProvider(credentialsProvider);
- settingsBuilder.setEndpoint(MONITORING_ENDPOINT);
+ if (MONITORING_ENDPOINT_OVERRIDE_SYS_PROP != null) {
+ logger.warning(
+ "Setting the monitoring endpoint through system variable will be removed in future versions");
+ settingsBuilder.setEndpoint(MONITORING_ENDPOINT_OVERRIDE_SYS_PROP);
+ }
+ if (endpoint != null) {
+ settingsBuilder.setEndpoint(endpoint);
+ }
org.threeten.bp.Duration timeout = Duration.ofMinutes(1);
// TODO: createServiceTimeSeries needs special handling if the request failed. Leaving
@@ -154,7 +163,6 @@ public static BigtableCloudMonitoringExporter create(
}
return new BigtableCloudMonitoringExporter(
- projectId,
MetricServiceClient.create(settingsBuilder.build()),
applicationResource,
BigtableExporterUtils.getDefaultTaskValue());
@@ -162,14 +170,10 @@ public static BigtableCloudMonitoringExporter create(
@VisibleForTesting
BigtableCloudMonitoringExporter(
- String projectId,
- MetricServiceClient client,
- @Nullable MonitoredResource applicationResource,
- String taskId) {
+ MetricServiceClient client, @Nullable MonitoredResource applicationResource, String taskId) {
this.client = client;
this.taskId = taskId;
this.applicationResource = applicationResource;
- this.bigtableProjectId = projectId;
}
@Override
@@ -201,15 +205,8 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection metricData.getData().getPoints().stream())
- .allMatch(pd -> bigtableProjectId.equals(BigtableExporterUtils.getProjectId(pd)))) {
- logger.log(Level.WARNING, "Metric data has different a projectId. Skip exporting.");
- return CompletableResultCode.ofFailure();
- }
-
- List bigtableTimeSeries;
+ // List of timeseries by project id
+ Map> bigtableTimeSeries;
try {
bigtableTimeSeries =
BigtableExporterUtils.convertToBigtableTimeSeries(bigtableMetricData, taskId);
@@ -221,37 +218,39 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection> future = exportTimeSeries(projectName, bigtableTimeSeries);
-
CompletableResultCode bigtableExportCode = new CompletableResultCode();
- ApiFutures.addCallback(
- future,
- new ApiFutureCallback>() {
- @Override
- public void onFailure(Throwable throwable) {
- if (bigtableExportFailureLogged.compareAndSet(false, true)) {
- String msg = "createServiceTimeSeries request failed for bigtable metrics.";
- if (throwable instanceof PermissionDeniedException) {
- msg +=
- String.format(
- " Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/bigtable/docs/client-side-metrics-setup to set up permissions.",
- projectName.getProject());
- }
- logger.log(Level.WARNING, msg, throwable);
- }
- bigtableExportCode.fail();
- }
+ bigtableTimeSeries.forEach(
+ (projectId, ts) -> {
+ ProjectName projectName = ProjectName.of(projectId);
+ ApiFuture> future = exportTimeSeries(projectName, ts);
+ ApiFutures.addCallback(
+ future,
+ new ApiFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ if (bigtableExportFailureLogged.compareAndSet(false, true)) {
+ String msg = "createServiceTimeSeries request failed for bigtable metrics.";
+ if (throwable instanceof PermissionDeniedException) {
+ msg +=
+ String.format(
+ " Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/bigtable/docs/client-side-metrics-setup to set up permissions.",
+ projectName.getProject());
+ }
+ logger.log(Level.WARNING, msg, throwable);
+ }
+ bigtableExportCode.fail();
+ }
- @Override
- public void onSuccess(List emptyList) {
- // When an export succeeded reset the export failure flag to false so if there's a
- // transient failure it'll be logged.
- bigtableExportFailureLogged.set(false);
- bigtableExportCode.succeed();
- }
- },
- MoreExecutors.directExecutor());
+ @Override
+ public void onSuccess(List emptyList) {
+ // When an export succeeded reset the export failure flag to false so if there's a
+ // transient failure it'll be logged.
+ bigtableExportFailureLogged.set(false);
+ bigtableExportCode.succeed();
+ }
+ },
+ MoreExecutors.directExecutor());
+ });
return bigtableExportCode;
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java
index 5bf6688e17..821c2295e0 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableExporterUtils.java
@@ -63,6 +63,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -110,17 +111,24 @@ static String getProjectId(PointData pointData) {
return pointData.getAttributes().get(BIGTABLE_PROJECT_ID_KEY);
}
- static List convertToBigtableTimeSeries(List collection, String taskId) {
- List allTimeSeries = new ArrayList<>();
+ // Returns a list of timeseries by project id
+ static Map> convertToBigtableTimeSeries(
+ List collection, String taskId) {
+ Map> allTimeSeries = new HashMap<>();
for (MetricData metricData : collection) {
if (!metricData.getInstrumentationScopeInfo().getName().equals(METER_NAME)) {
// Filter out metric data for instruments that are not part of the bigtable builtin metrics
continue;
}
- metricData.getData().getPoints().stream()
- .map(pointData -> convertPointToBigtableTimeSeries(metricData, pointData, taskId))
- .forEach(allTimeSeries::add);
+
+ for (PointData pd : metricData.getData().getPoints()) {
+ String projectId = getProjectId(pd);
+ List current =
+ allTimeSeries.computeIfAbsent(projectId, ignored -> new ArrayList<>());
+ current.add(convertPointToBigtableTimeSeries(metricData, pd, taskId));
+ allTimeSeries.put(projectId, current);
+ }
}
return allTimeSeries;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java
index 3b2242385a..80fcdd0419 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableGrpcStreamTracer.java
@@ -15,11 +15,8 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
-import com.google.common.base.Stopwatch;
-import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
-import java.util.concurrent.TimeUnit;
/**
* Records the time a request is enqueued in a grpc channel queue. This a bridge between gRPC stream
@@ -28,21 +25,15 @@
*/
class BigtableGrpcStreamTracer extends ClientStreamTracer {
- private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final BigtableTracer tracer;
public BigtableGrpcStreamTracer(BigtableTracer tracer) {
this.tracer = tracer;
}
- @Override
- public void streamCreated(Attributes transportAttrs, Metadata headers) {
- stopwatch.start();
- }
-
@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
- tracer.grpcChannelQueuedLatencies(stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ tracer.grpcMessageSent();
}
static class Factory extends ClientStreamTracer.Factory {
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
index 3445514f7b..5874751512 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java
@@ -19,6 +19,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
+import java.time.Duration;
import javax.annotation.Nullable;
/**
@@ -52,6 +53,13 @@ public void afterResponse(long applicationLatency) {
// noop
}
+ /**
+ * Used by BigtableUnaryOperationCallable to signal that the user visible portion of the RPC is
+ * complete and that metrics should freeze the timers and then publish the frozen values when the
+ * internal portion of the operation completes.
+ */
+ public void operationFinishEarly() {}
+
/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
@@ -83,7 +91,23 @@ public void setLocations(String zone, String cluster) {
// noop
}
+ @Deprecated
+ /** @deprecated {@link #grpcMessageSent()} is called instead. */
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}
+
+ /** Called when the message is sent on a grpc channel. */
+ public void grpcMessageSent() {
+ // noop
+ }
+
+ /**
+ * Record the operation timeout from user settings for calculating remaining deadline. Currently,
+ * it's called in BuiltinMetricsTracer on attempt start from {@link BigtableTracerUnaryCallable}
+ * and {@link BigtableTracerStreamingCallable}.
+ */
+ public void setTotalTimeoutDuration(Duration totalTimeoutDuration) {
+ // noop
+ }
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
index 167cd0dc2e..13b832b8b1 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerStreamingCallable.java
@@ -59,9 +59,12 @@ public void call(
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
// tracer should always be an instance of bigtable tracer
if (context.getTracer() instanceof BigtableTracer) {
+ BigtableTracer tracer = (BigtableTracer) context.getTracer();
BigtableTracerResponseObserver innerObserver =
- new BigtableTracerResponseObserver<>(
- responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
+ new BigtableTracerResponseObserver<>(responseObserver, tracer, responseMetadata);
+ if (context.getRetrySettings() != null) {
+ tracer.setTotalTimeoutDuration(context.getRetrySettings().getTotalTimeoutDuration());
+ }
innerCallable.call(
request,
innerObserver,
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
index 7dfca8b753..37ba74bfdb 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerUnaryCallable.java
@@ -54,10 +54,14 @@ public BigtableTracerUnaryCallable(@Nonnull UnaryCallable i
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
// tracer should always be an instance of BigtableTracer
if (context.getTracer() instanceof BigtableTracer) {
+ BigtableTracer tracer = (BigtableTracer) context.getTracer();
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
BigtableTracerUnaryCallback callback =
new BigtableTracerUnaryCallback(
(BigtableTracer) context.getTracer(), responseMetadata);
+ if (context.getRetrySettings() != null) {
+ tracer.setTotalTimeoutDuration(context.getRetrySettings().getTotalTimeoutDuration());
+ }
ApiFuture