From ef68037c587a0416aebc97a3e8eff7acf1777c51 Mon Sep 17 00:00:00 2001
From: nielm
Date: Sat, 26 Feb 2022 22:30:33 +0100
Subject: [PATCH] Fix SpannerIO service call metrics and improve tests.

Add metrics to NaiveSpannerRead.
Fix metric resource identifiers.
Refactor and improve SpannerIOReadTest unit tests to add additional coverage.
Add unit tests for NaiveSpannerRead (non-partitioned read).
Fix metrics for SpannerIO.Write.
---
 .../core/metrics/GcpResourceIdentifiers.java  |  13 +-
 .../sdk/io/gcp/spanner/BatchSpannerRead.java  | 187 ++--
 .../sdk/io/gcp/spanner/NaiveSpannerRead.java  |  29 +-
 .../sdk/io/gcp/spanner/SpannerConfig.java     |  11 +-
 .../beam/sdk/io/gcp/spanner/SpannerIO.java    | 154 +++-
 .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 815 +++++++++---------
 .../SpannerIOWriteExceptionHandlingTest.java  |   6 +-
 .../io/gcp/spanner/SpannerIOWriteTest.java    | 709 ++++++++-------
 8 files changed, 1073 insertions(+), 851 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
index 336f08d287de..4ab5cacae919 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
@@ -52,12 +52,17 @@ public static String datastoreResource(String projectId, String namespace) {
         "//bigtable.googleapis.com/projects/%s/namespaces/%s", projectId, namespace);
   }
 
-  public static String spannerTable(String projectId, String databaseId, String tableId) {
+  public static String spannerTable(
+      String projectId, String instanceId, String databaseId, String tableId) {
     return String.format(
-        "//spanner.googleapis.com/projects/%s/topics/%s/tables/%s", projectId, databaseId, tableId);
+        "//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/tables/%s",
+        projectId, instanceId, databaseId, tableId);
   }
 
-  public static String spannerQuery(String projectId, String queryName) {
-    return String.format("//spanner.googleapis.com/projects/%s/queries/%s", projectId, queryName);
+  public static String spannerQuery(
+      String projectId, String instanceId, String databaseId, String queryName) {
+    return String.format(
+        "//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/queries/%s",
+        projectId, instanceId, databaseId, queryName);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
index 97c1c1a39a0c..9c5fb202ea2f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
@@ -20,19 +20,16 @@
 import com.google.auto.value.AutoValue;
 import com.google.cloud.spanner.BatchReadOnlyTransaction;
 import com.google.cloud.spanner.Options;
-import com.google.cloud.spanner.Options.RpcPriority;
 import com.google.cloud.spanner.Partition;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.SpannerException;
-import com.google.cloud.spanner.SpannerOptions;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
-import java.util.HashMap;
+import java.io.Serializable;
 import java.util.List;
-import
org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; -import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.runners.core.metrics.ServiceCallMetric; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadAll; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -40,6 +37,10 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,21 @@ public static BatchSpannerRead create( abstract TimestampBound getTimestampBound(); + /** + * Container class to combine a ReadOperation with a Partition so that Metrics are implemented + * properly. + */ + @AutoValue + protected abstract static class PartitionedReadOperation implements Serializable { + abstract ReadOperation getReadOperation(); + + abstract Partition getPartition(); + + static PartitionedReadOperation create(ReadOperation readOperation, Partition partition) { + return new AutoValue_BatchSpannerRead_PartitionedReadOperation(readOperation, partition); + } + } + @Override public PCollection expand(PCollection input) { PCollectionView txView = getTxView(); @@ -84,14 +100,14 @@ public PCollection expand(PCollection input) { .apply( "Generate Partitions", ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView)) - .apply("Shuffle partitions", Reshuffle.viaRandomKey()) + .apply("Shuffle partitions", Reshuffle.viaRandomKey()) .apply( "Read from Partitions", ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView)); } @VisibleForTesting - static class GeneratePartitionsFn extends DoFn { + static class GeneratePartitionsFn extends DoFn { private final SpannerConfig config; private final PCollectionView txView; @@ -102,6 +118,8 @@ public GeneratePartitionsFn( SpannerConfig config, PCollectionView txView) { this.config = config; this.txView = txView; + Preconditions.checkNotNull(config.getRpcPriority()); + Preconditions.checkNotNull(config.getRpcPriority().get()); } @Setup @@ -117,75 +135,62 @@ public void teardown() throws Exception { @ProcessElement public void processElement(ProcessContext c) throws Exception { Transaction tx = c.sideInput(txView); - BatchReadOnlyTransaction context = + BatchReadOnlyTransaction batchTx = spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId()); - for (Partition p : execute(c.element(), context)) { - c.output(p); - } - } - - private List execute(ReadOperation op, BatchReadOnlyTransaction tx) { - if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) { - return executeWithPriority(op, tx, config.getRpcPriority().get()); - } else { - return executeWithoutPriority(op, tx); - } - } - - private List executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) { - // Query was selected. 
- if (op.getQuery() != null) { - return tx.partitionQuery(op.getPartitionOptions(), op.getQuery()); - } - // Read with index was selected. - if (op.getIndex() != null) { - return tx.partitionReadUsingIndex( - op.getPartitionOptions(), - op.getTable(), - op.getIndex(), - op.getKeySet(), - op.getColumns()); - } - // Read from table was selected. - return tx.partitionRead( - op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns()); - } - - private List executeWithPriority( - ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) { - // Query was selected. - if (op.getQuery() != null) { - return tx.partitionQuery( - op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority)); + ReadOperation op = c.element(); + + // While this creates a ServiceCallMetric for every input element, in reality, the number + // of input elements will either be very few (normally 1!), or they will differ and + // need different metrics. + ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op); + + List partitions; + try { + if (op.getQuery() != null) { + // Query was selected. + partitions = + batchTx.partitionQuery( + op.getPartitionOptions(), + op.getQuery(), + Options.priority(config.getRpcPriority().get())); + } else if (op.getIndex() != null) { + // Read with index was selected. + partitions = + batchTx.partitionReadUsingIndex( + op.getPartitionOptions(), + op.getTable(), + op.getIndex(), + op.getKeySet(), + op.getColumns(), + Options.priority(config.getRpcPriority().get())); + } else { + // Read from table was selected. + partitions = + batchTx.partitionRead( + op.getPartitionOptions(), + op.getTable(), + op.getKeySet(), + op.getColumns(), + Options.priority(config.getRpcPriority().get())); + } + metric.call("ok"); + } catch (SpannerException e) { + metric.call(e.getErrorCode().getGrpcStatusCode().toString()); + throw e; } - // Read with index was selected. - if (op.getIndex() != null) { - return tx.partitionReadUsingIndex( - op.getPartitionOptions(), - op.getTable(), - op.getIndex(), - op.getKeySet(), - op.getColumns(), - Options.priority(rpcPriority)); + for (Partition p : partitions) { + c.output(PartitionedReadOperation.create(op, p)); } - // Read from table was selected. - return tx.partitionRead( - op.getPartitionOptions(), - op.getTable(), - op.getKeySet(), - op.getColumns(), - Options.priority(rpcPriority)); } } - private static class ReadFromPartitionFn extends DoFn { + private static class ReadFromPartitionFn extends DoFn { private final SpannerConfig config; private final PCollectionView txView; private transient SpannerAccessor spannerAccessor; - private transient String projectId; - private transient ServiceCallMetric serviceCallMetric; + private transient LoadingCache metricsForReadOperation; public ReadFromPartitionFn( SpannerConfig config, PCollectionView txView) { @@ -196,24 +201,28 @@ public ReadFromPartitionFn( @Setup public void setup() throws Exception { spannerAccessor = SpannerAccessor.getOrCreate(config); - projectId = - this.config.getProjectId() == null - || this.config.getProjectId().get() == null - || this.config.getProjectId().get().isEmpty() - ? SpannerOptions.getDefaultProjectId() - : this.config.getProjectId().get(); + + // Use a LoadingCache for metrics as there can be different read operations which result in + // different service call metrics labels. ServiceCallMetric items are created on-demand and + // added to the cache. 
+      metricsForReadOperation =
+          CacheBuilder.newBuilder()
+              .maximumSize(SpannerIO.METRICS_CACHE_SIZE)
+              .build(
+                  new CacheLoader<ReadOperation, ServiceCallMetric>() {
+                    @Override
+                    public ServiceCallMetric load(ReadOperation op) {
+                      return ReadAll.buildServiceCallMetricForReadOp(config, op);
+                    }
+                  });
     }
 
     @Teardown
     public void teardown() throws Exception {
       spannerAccessor.close();
-    }
-
-    @StartBundle
-    public void startBundle() throws Exception {
-      serviceCallMetric =
-          createServiceCallMetric(
-              projectId, this.config.getDatabaseId().get(), this.config.getInstanceId().get());
+      metricsForReadOperation.invalidateAll();
+      metricsForReadOperation.cleanUp();
     }
 
     @ProcessElement
@@ -223,8 +232,9 @@ public void processElement(ProcessContext c) throws Exception {
       BatchReadOnlyTransaction batchTx =
           spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
 
-      Partition p = c.element();
-      try (ResultSet resultSet = batchTx.execute(p)) {
+      PartitionedReadOperation op = c.element();
+      ServiceCallMetric serviceCallMetric = metricsForReadOperation.get(op.getReadOperation());
+      try (ResultSet resultSet = batchTx.execute(op.getPartition())) {
         while (resultSet.next()) {
           Struct s = resultSet.getCurrentRowAsStruct();
           c.output(s);
@@ -236,22 +246,5 @@ public void processElement(ProcessContext c) throws Exception {
         }
         serviceCallMetric.call("ok");
       }
-
-    private ServiceCallMetric createServiceCallMetric(
-        String projectId, String databaseId, String tableId) {
-      HashMap<String, String> baseLabels = new HashMap<>();
-      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
-      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
-      baseLabels.put(
-          MonitoringInfoConstants.Labels.RESOURCE,
-          GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId));
-      baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
-      baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
-      baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId);
-      ServiceCallMetric serviceCallMetric =
-          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
-      return serviceCallMetric;
-    }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
index e460e20df191..831489ecf2f8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
@@ -22,8 +22,10 @@ import com.google.cloud.spanner.Options;
 import com.google.cloud.spanner.Options.RpcPriority;
 import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -97,37 +99,26 @@ public void teardown() throws Exception {
     public void processElement(ProcessContext c) throws Exception {
       Transaction tx = c.sideInput(txView);
       ReadOperation op = c.element();
+      ServiceCallMetric serviceCallMetric =
+          SpannerIO.ReadAll.buildServiceCallMetricForReadOp(config, op);
       BatchReadOnlyTransaction context =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId()); try (ResultSet resultSet = execute(op, context)) { while (resultSet.next()) { c.output(resultSet.getCurrentRowAsStruct()); } + } catch (SpannerException e) { + serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); + throw (e); } + serviceCallMetric.call("ok"); } private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) { + RpcPriority rpcPriority = SpannerConfig.DEFAULT_RPC_PRIORITY; if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) { - return executeWithPriority(op, readOnlyTransaction, config.getRpcPriority().get()); - } else { - return executeWithoutPriority(op, readOnlyTransaction); + rpcPriority = config.getRpcPriority().get(); } - } - - private ResultSet executeWithoutPriority( - ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) { - if (op.getQuery() != null) { - return readOnlyTransaction.executeQuery(op.getQuery()); - } - if (op.getIndex() != null) { - return readOnlyTransaction.readUsingIndex( - op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns()); - } - return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns()); - } - - private ResultSet executeWithPriority( - ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) { if (op.getQuery() != null) { return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 608396c8cfec..2ee8bc6fe63a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -47,7 +48,7 @@ public abstract class SpannerConfig implements Serializable { // Total allowable backoff time. private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardMinutes(15); // A default priority for batch traffic. - private static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.MEDIUM; + static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.MEDIUM; public abstract @Nullable ValueProvider getProjectId(); @@ -160,6 +161,8 @@ public SpannerConfig withProjectId(String projectId) { /** Specifies the Cloud Spanner instance ID. */ public SpannerConfig withInstanceId(ValueProvider instanceId) { + Preconditions.checkNotNull(instanceId); + Preconditions.checkNotNull(instanceId.get()); return toBuilder().setInstanceId(instanceId).build(); } @@ -170,6 +173,8 @@ public SpannerConfig withInstanceId(String instanceId) { /** Specifies the Cloud Spanner database ID. 
   */
   public SpannerConfig withDatabaseId(ValueProvider<String> databaseId) {
+    Preconditions.checkNotNull(databaseId);
+    Preconditions.checkNotNull(databaseId.get());
     return toBuilder().setDatabaseId(databaseId).build();
   }
 
@@ -180,6 +185,8 @@ public SpannerConfig withDatabaseId(String databaseId) {
 
   /** Specifies the Cloud Spanner host. */
   public SpannerConfig withHost(ValueProvider<String> host) {
+    Preconditions.checkNotNull(host);
+    Preconditions.checkNotNull(host.get());
     return toBuilder().setHost(host).build();
   }
 
@@ -250,6 +257,8 @@ public SpannerConfig withRpcPriority(RpcPriority rpcPriority) {
 
   /** Specifies the RPC priority. */
   public SpannerConfig withRpcPriority(ValueProvider<RpcPriority> rpcPriority) {
+    Preconditions.checkNotNull(rpcPriority);
+    Preconditions.checkNotNull(rpcPriority.get());
     return toBuilder().setRpcPriority(rpcPriority).build();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 9523ca6bbb89..792cbdd1d491 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -58,7 +58,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.OptionalInt;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.ServiceCallMetric;
@@ -118,6 +120,9 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -377,6 +382,12 @@ public class SpannerIO {
   // Multiple of mutation size to use to gather and sort mutations
   private static final int DEFAULT_GROUPING_FACTOR = 1000;
 
+  // Size of caches for read/write ServiceCallMetric objects.
+  // This is a reasonable limit, as for reads, each worker will process very few different table
+  // read requests, and for writes, batching will ensure that write operations for the same
+  // table occur at the same time (within a bundle).
+  static final int METRICS_CACHE_SIZE = 100;
+
   /**
    * Creates an uninitialized instance of {@link Read}. Before use, the {@link Read} must be
    * configured with a {@link Read#withInstanceId} and {@link Read#withDatabaseId} that identify the
@@ -587,6 +598,41 @@ public PCollection<Struct> expand(PCollection<ReadOperation> input) {
           .apply("Reshuffle", Reshuffle.viaRandomKey())
           .apply("Read from Cloud Spanner", readTransform);
     }
+
+    /** Helper function to create ServiceCallMetrics. */
+    static ServiceCallMetric buildServiceCallMetricForReadOp(
+        SpannerConfig config, ReadOperation op) {
+
+      HashMap<String, String> baseLabels = buildServiceCallMetricLabels(config);
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
+
+      if (op.getQuery() != null) {
+        String queryName = op.getQueryName();
+        if (queryName == null || queryName.isEmpty()) {
+          // if queryName is not specified, use a hash of the SQL statement string.
+          queryName = String.format("UNNAMED_QUERY#%08x", op.getQuery().getSql().hashCode());
+        }
+
+        baseLabels.put(
+            MonitoringInfoConstants.Labels.RESOURCE,
+            GcpResourceIdentifiers.spannerQuery(
+                baseLabels.get(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID),
+                config.getInstanceId().get(),
+                config.getDatabaseId().get(),
+                queryName));
+        baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_QUERY_NAME, queryName);
+      } else {
+        baseLabels.put(
+            MonitoringInfoConstants.Labels.RESOURCE,
+            GcpResourceIdentifiers.spannerTable(
+                baseLabels.get(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID),
+                config.getInstanceId().get(),
+                config.getDatabaseId().get(),
+                op.getTable()));
+        baseLabels.put(MonitoringInfoConstants.Labels.TABLE_ID, op.getTable());
+      }
+      return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+    }
   }
 
   /** Implementation of {@link #read}. */
@@ -1962,8 +2008,7 @@ static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {
 
     // Fluent Backoff is not serializable so create at runtime in setup().
    private transient FluentBackoff bundleWriteBackoff;
-    private transient String projectId;
-    private transient ServiceCallMetric serviceCallMetric;
+    private transient LoadingCache<String, ServiceCallMetric> writeMetricsByTableName;
 
     WriteToSpannerFn(
         SpannerConfig spannerConfig, FailureMode failureMode, TupleTag<MutationGroup> failedTag) {
@@ -1980,12 +2025,19 @@ public void setup() {
           .withMaxCumulativeBackoff(spannerConfig.getMaxCumulativeBackoff().get())
           .withInitialBackoff(spannerConfig.getMaxCumulativeBackoff().get().dividedBy(60));
 
-      projectId =
-          this.spannerConfig.getProjectId() == null
-                  || this.spannerConfig.getProjectId().get() == null
-                  || this.spannerConfig.getProjectId().get().isEmpty()
-              ? SpannerOptions.getDefaultProjectId()
-              : this.spannerConfig.getProjectId().get();
+      // Use a LoadingCache for metrics as there can be different tables being written to which
+      // result in different service call metrics labels. ServiceCallMetric items are created
+      // on-demand and added to the cache.
+      writeMetricsByTableName =
+          CacheBuilder.newBuilder()
+              .maximumSize(METRICS_CACHE_SIZE)
+              .build(
+                  new CacheLoader<String, ServiceCallMetric>() {
+                    @Override
+                    public ServiceCallMetric load(String tableName) {
+                      return buildWriteServiceCallMetric(spannerConfig, tableName);
+                    }
+                  });
     }
 
     @Teardown
@@ -1993,35 +2045,25 @@ public void teardown() {
       spannerAccessor.close();
     }
 
-    @StartBundle
-    public void startBundle() {
-      serviceCallMetric =
-          createServiceCallMetric(
-              projectId,
-              this.spannerConfig.getDatabaseId().get(),
-              this.spannerConfig.getInstanceId().get(),
-              "Write");
-    }
-
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      Iterable<MutationGroup> mutations = c.element();
+      List<MutationGroup> mutations = ImmutableList.copyOf(c.element());
 
       // Batch upsert rows.
try { mutationGroupBatchesReceived.inc(); - mutationGroupsReceived.inc(Iterables.size(mutations)); + mutationGroupsReceived.inc(mutations.size()); Iterable batch = Iterables.concat(mutations); writeMutations(batch); mutationGroupBatchesWriteSuccess.inc(); - mutationGroupsWriteSuccess.inc(Iterables.size(mutations)); + mutationGroupsWriteSuccess.inc(mutations.size()); return; } catch (SpannerException e) { mutationGroupBatchesWriteFail.inc(); if (failureMode == FailureMode.REPORT_FAILURES) { // fall through and retry individual mutationGroups. } else if (failureMode == FailureMode.FAIL_FAST) { - mutationGroupsWriteFail.inc(Iterables.size(mutations)); + mutationGroupsWriteFail.inc(mutations.size()); throw e; } else { throw new IllegalArgumentException("Unknown failure mode " + failureMode); @@ -2046,8 +2088,7 @@ public void processElement(ProcessContext c) throws Exception { Spanner aborts all inflight transactions during a schema change. Client is expected to retry silently. These must not be counted against retry backoff. */ - private void spannerWriteWithRetryIfSchemaChange(Iterable batch) - throws SpannerException { + private void spannerWriteWithRetryIfSchemaChange(List batch) throws SpannerException { for (int retry = 1; ; retry++) { try { if (spannerConfig.getRpcPriority() != null @@ -2059,10 +2100,10 @@ private void spannerWriteWithRetryIfSchemaChange(Iterable batch) } else { spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch); } - serviceCallMetric.call("ok"); + reportServiceCallMetricsForBatch(batch, "ok"); return; } catch (AbortedException e) { - serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); + reportServiceCallMetricsForBatch(batch, e.getErrorCode().getGrpcStatusCode().toString()); if (retry >= ABORTED_RETRY_ATTEMPTS) { throw e; } @@ -2071,33 +2112,40 @@ private void spannerWriteWithRetryIfSchemaChange(Iterable batch) } throw e; } catch (SpannerException e) { - serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); + reportServiceCallMetricsForBatch(batch, e.getErrorCode().getGrpcStatusCode().toString()); throw e; } } } - private ServiceCallMetric createServiceCallMetric( - String projectId, String databaseId, String tableId, String method) { - HashMap baseLabels = new HashMap<>(); - baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); - baseLabels.put(MonitoringInfoConstants.Labels.METHOD, method); + private void reportServiceCallMetricsForBatch(List batch, String statusCode) { + // Get names of all tables in batch of mutations. 
+ Set tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet()); + for (String tableName : tableNames) { + writeMetricsByTableName.getUnchecked(tableName).call(statusCode); + } + } + + private static ServiceCallMetric buildWriteServiceCallMetric( + SpannerConfig config, String tableId) { + HashMap baseLabels = buildServiceCallMetricLabels(config); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Write"); baseLabels.put( MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId)); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId); - ServiceCallMetric serviceCallMetric = - new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); - return serviceCallMetric; + GcpResourceIdentifiers.spannerTable( + baseLabels.get(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID), + config.getInstanceId().get(), + config.getDatabaseId().get(), + tableId)); + baseLabels.put(MonitoringInfoConstants.Labels.TABLE_ID, tableId); + return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); } /** Write the Mutations to Spanner, handling DEADLINE_EXCEEDED with backoff/retries. */ - private void writeMutations(Iterable mutations) throws SpannerException, IOException { + private void writeMutations(Iterable mutationIterable) + throws SpannerException, IOException { BackOff backoff = bundleWriteBackoff.backoff(); - long mutationsSize = Iterables.size(mutations); + List mutations = ImmutableList.copyOf(mutationIterable); while (true) { Stopwatch timer = Stopwatch.createStarted(); @@ -2116,7 +2164,7 @@ private void writeMutations(Iterable mutations) throws SpannerExceptio LOG.error( "DEADLINE_EXCEEDED writing batch of {} mutations to Cloud Spanner. " + "Aborting after too many retries.", - mutationsSize); + mutations.size()); spannerWriteFail.inc(); throw exception; } @@ -2124,7 +2172,7 @@ private void writeMutations(Iterable mutations) throws SpannerExceptio "DEADLINE_EXCEEDED writing batch of {} mutations to Cloud Spanner, " + "retrying after backoff of {}ms\n" + "({})", - mutationsSize, + mutations.size(), sleepTimeMsecs, exception.getMessage()); spannerWriteRetries.inc(); @@ -2146,4 +2194,22 @@ private void writeMutations(Iterable mutations) throws SpannerExceptio } private SpannerIO() {} // Prevent construction. + + private static HashMap buildServiceCallMetricLabels(SpannerConfig config) { + HashMap baseLabels = new HashMap<>(); + baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, + config.getProjectId() == null + || config.getProjectId().get() == null + || config.getProjectId().get().isEmpty() + ? 
SpannerOptions.getDefaultProjectId() + : config.getProjectId().get()); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, config.getInstanceId().get()); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, config.getDatabaseId().get()); + return baseLabels; + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index d6af52232ba3..bbee8f975fd8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.spanner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; @@ -34,17 +35,17 @@ import com.google.cloud.spanner.Partition; import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ResultSets; -import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Value; import com.google.protobuf.ByteString; -import io.grpc.Status.Code; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; @@ -52,18 +53,16 @@ import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.NotNull; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; @@ -72,14 +71,23 @@ @RunWith(JUnit4.class) public class SpannerIOReadTest implements Serializable { + private static final TimestampBound TIMESTAMP_BOUND = + TimestampBound.ofReadTimestamp(Timestamp.ofTimeMicroseconds(12345)); + public static final String PROJECT_ID = "1234"; + public static final String INSTANCE_ID = "123"; + public static final String DATABASE_ID = "aaa"; + public static final String TABLE_ID = "users"; + public static final String QUERY_NAME = "My-query"; + public static final String QUERY_STATEMENT = "SELECT * FROM users"; + @Rule public final transient TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); - @Rule public final transient ExpectedException thrown = ExpectedException.none(); - private FakeServiceFactory serviceFactory; private 
BatchReadOnlyTransaction mockBatchTx; + private Partition fakePartition; + private SpannerConfig spannerConfig; private static final Type FAKE_TYPE = Type.struct( @@ -95,201 +103,299 @@ public class SpannerIOReadTest implements Serializable { Struct.newBuilder().set("id").to(Value.int64(6)).set("name").to("Floyd").build()); @Before - @SuppressWarnings("unchecked") public void setUp() throws Exception { serviceFactory = new FakeServiceFactory(); mockBatchTx = Mockito.mock(BatchReadOnlyTransaction.class); + fakePartition = FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one")); + spannerConfig = + SpannerConfig.create() + .withProjectId(PROJECT_ID) + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory); + + // Setup the common mocks. + when(mockBatchTx.getBatchTransactionId()) + .thenReturn(new FakeBatchTransactionId("runQueryTest")); + when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(TIMESTAMP_BOUND)) + .thenReturn(mockBatchTx); + when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) + .thenReturn(mockBatchTx); + // Setup the ProcessWideContainer for testing metrics are set. MetricsContainerImpl container = new MetricsContainerImpl(null); MetricsEnvironment.setProcessWideContainer(container); + MetricsEnvironment.setCurrentContainer(container); } @Test - public void runQuery() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); + public void runBatchQueryTestWithProjectId() { + runBatchQueryTest( + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); + } - PCollection one = - pipeline.apply( - "read q", - SpannerIO.read() - .withSpannerConfig(spannerConfig) - .withQuery("SELECT * FROM users") - .withTimestampBound(timestampBound)); + @Test + public void runBatchQueryTestWithUnspecifiedProject() { + // Default spannerConfig has project ID specified - use an unspecified project. 
+ runBatchQueryTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); + } - FakeBatchTransactionId id = new FakeBatchTransactionId("runQueryTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); + @Test + public void runBatchQueryTestWithNullProject() { + runBatchQueryTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withProjectId((String) null) + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); + } - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + @Test + public void runBatchQueryTestWithPriority() { + SpannerIO.Read readTransform = + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND) + .withHighPriority(); + runBatchQueryTest(readTransform); + assertEquals(RpcPriority.HIGH, readTransform.getSpannerConfig().getRpcPriority().get()); + } - Partition fakePartition = - FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one")); + private void runBatchQueryTest(SpannerIO.Read readTransform) { + PCollection results = pipeline.apply("read q", readTransform); when(mockBatchTx.partitionQuery( any(PartitionOptions.class), - eq(Statement.of("SELECT * FROM users")), + eq(Statement.of(QUERY_STATEMENT)), any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition, fakePartition)); + .thenReturn(Arrays.asList(fakePartition, fakePartition, fakePartition)); when(mockBatchTx.execute(any(Partition.class))) .thenReturn( ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 6))); - - PAssert.that(one).containsInAnyOrder(FAKE_ROWS); + ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), + ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); pipeline.run(); + verifyQueryRequestMetricWasSet(readTransform.getSpannerConfig(), QUERY_NAME, "ok", 4); } @Test - public void runQueryWithPriority() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); - Read read = - SpannerIO.read() - .withSpannerConfig(spannerConfig) - .withQuery("SELECT * FROM users") - .withTimestampBound(timestampBound) - .withHighPriority(); - - PCollection one = pipeline.apply("read q", read); - - FakeBatchTransactionId id = new FakeBatchTransactionId("runQueryTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + public void runBatchQueryTestWithFailures() { - Partition fakePartition = - FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one")); + PCollection results = + 
pipeline.apply( + "read q", + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); when(mockBatchTx.partitionQuery( any(PartitionOptions.class), - eq(Statement.of("SELECT * FROM users")), + eq(Statement.of(QUERY_STATEMENT)), any(ReadQueryUpdateTransactionOption.class))) .thenReturn(Arrays.asList(fakePartition, fakePartition)); when(mockBatchTx.execute(any(Partition.class))) - .thenReturn( - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 6))); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)) + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.PERMISSION_DENIED, "Simulated Failure")); - PAssert.that(one).containsInAnyOrder(FAKE_ROWS); - assertEquals(RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get()); - pipeline.run(); - } + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); - private SpannerConfig getSpannerConfig() { - return SpannerConfig.create() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withServiceFactory(serviceFactory); + assertThrows( + "PERMISSION_DENIED: Simulated Failure", PipelineExecutionException.class, pipeline::run); + verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "ok", 2); + verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "permission_denied", 1); } @Test - public void runReadTestWithProjectId() throws Exception { - runReadTest(getSpannerConfig()); + public void runNaiveQueryTestWithProjectId() { + runNaiveQueryTest( + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); } @Test - public void runReadTestWithDefaultProject() throws Exception { - runReadTest( - SpannerConfig.create() - .withInstanceId("123") - .withDatabaseId("aaa") - .withServiceFactory(serviceFactory)); + public void runNaiveQueryTestWithUnspecifiedProject() { + // Default spannerConfig has project ID specified - use an unspecified project. 
+ runNaiveQueryTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); } @Test - public void runReadTestWithNullProject() throws Exception { - runReadTest( - SpannerConfig.create() - .withProjectId((String) null) - .withInstanceId("123") - .withDatabaseId("aaa") - .withServiceFactory(serviceFactory)); + public void runNaiveQueryTestWithNullProject() { + runNaiveQueryTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withProjectId((String) null) + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND)); } - private void runReadTest(SpannerConfig spannerConfig) throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - PCollection one = - pipeline.apply( - "read q", - SpannerIO.read() - .withSpannerConfig(spannerConfig) - .withTable("users") - .withColumns("id", "name") - .withTimestampBound(timestampBound)); - - FakeBatchTransactionId id = new FakeBatchTransactionId("runReadTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + @Test + public void runNaiveQueryTestWithPriority() { + SpannerIO.Read readTransform = + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND) + .withHighPriority(); + runNaiveQueryTest(readTransform); + assertEquals(RpcPriority.HIGH, readTransform.getSpannerConfig().getRpcPriority().get()); + } - Partition fakePartition = - FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("one")); + private void runNaiveQueryTest(SpannerIO.Read readTransform) { + readTransform = readTransform.withBatching(false); + PCollection results = pipeline.apply("read q", readTransform); + when(mockBatchTx.executeQuery( + eq(Statement.of(QUERY_STATEMENT)), any(ReadQueryUpdateTransactionOption.class))) + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); - when(mockBatchTx.partitionRead( - any(PartitionOptions.class), - eq("users"), - eq(KeySet.all()), - eq(Arrays.asList("id", "name")), - any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition, fakePartition, fakePartition)); - when(mockBatchTx.execute(any(Partition.class))) - .thenReturn( - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); - - PAssert.that(one).containsInAnyOrder(FAKE_ROWS); + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); + pipeline.run(); + verifyQueryRequestMetricWasSet(readTransform.getSpannerConfig(), QUERY_NAME, "ok", 1); + } + @Test + public void runNaiveQueryTestWithAnonymousQuery() { + SpannerIO.Read readTransform = + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withTimestampBound(TIMESTAMP_BOUND) + .withHighPriority() + .withBatching(false); + PCollection results = 
pipeline.apply("read q", readTransform); + when(mockBatchTx.executeQuery( + eq(Statement.of(QUERY_STATEMENT)), any(ReadQueryUpdateTransactionOption.class))) + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); + + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); pipeline.run(); + String queryName = String.format("UNNAMED_QUERY#%08x", QUERY_STATEMENT.hashCode()); + verifyQueryRequestMetricWasSet(spannerConfig, queryName, "ok", 1); } @Test - public void runReadWithPriority() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); + public void runNaiveQueryTestWithFailures() { - SpannerConfig spannerConfig = getSpannerConfig(); + pipeline.apply( + "read q", + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery(QUERY_STATEMENT) + .withQueryName(QUERY_NAME) + .withTimestampBound(TIMESTAMP_BOUND) + .withBatching(false)); + when(mockBatchTx.executeQuery( + eq(Statement.of(QUERY_STATEMENT)), any(ReadQueryUpdateTransactionOption.class))) + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.PERMISSION_DENIED, "Simulated Failure")); + assertThrows( + "PERMISSION_DENIED: Simulated Failure", PipelineExecutionException.class, pipeline::run); + verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "permission_denied", 1); + } - Read read = + @Test + public void runBatchReadTestWithProjectId() { + runBatchReadTest( SpannerIO.read() .withSpannerConfig(spannerConfig) - .withTable("users") + .withTable(TABLE_ID) .withColumns("id", "name") - .withTimestampBound(timestampBound) - .withLowPriority(); - PCollection one = pipeline.apply("read q", read); + .withTimestampBound(TIMESTAMP_BOUND)); + } - FakeBatchTransactionId id = new FakeBatchTransactionId("runReadTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); + @Test + public void runBatchReadTestWithUnspecifiedProject() { + // Default spannerConfig has project ID specified - use an unspecified project. 
+ runBatchReadTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND)); + } - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + @Test + public void runBatchReadTestWithNullProject() { + runBatchReadTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withProjectId((String) null) + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND)); + } + + @Test + public void runBatchReadTestWithPriority() { + SpannerIO.Read readTransform = + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND) + .withHighPriority(); + runBatchReadTest(readTransform); + assertEquals(RpcPriority.HIGH, readTransform.getSpannerConfig().getRpcPriority().get()); + } - Partition fakePartition = - FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("one")); + private void runBatchReadTest(SpannerIO.Read readTransform) { + PCollection results = pipeline.apply("read q", readTransform); when(mockBatchTx.partitionRead( any(PartitionOptions.class), - eq("users"), + eq(TABLE_ID), eq(KeySet.all()), eq(Arrays.asList("id", "name")), any(ReadQueryUpdateTransactionOption.class))) @@ -300,265 +406,152 @@ public void runReadWithPriority() throws Exception { ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); - PAssert.that(one).containsInAnyOrder(FAKE_ROWS); - assertEquals(RpcPriority.LOW, read.getSpannerConfig().getRpcPriority().get()); + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); pipeline.run(); + verifyTableRequestMetricWasSet(readTransform.getSpannerConfig(), TABLE_ID, "ok", 4); } @Test - public void testQueryMetricsFail() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); - - pipeline.apply( - "read q", + public void runBatchReadTestWithFailures() { + SpannerIO.Read readTransform = SpannerIO.read() .withSpannerConfig(spannerConfig) - .withQuery("SELECT * FROM users") - .withQueryName("queryName") - .withTimestampBound(timestampBound)); - - FakeBatchTransactionId id = new FakeBatchTransactionId("runQueryTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND); - Partition fakePartition = - FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one")); + pipeline.apply("read q", readTransform); - when(mockBatchTx.partitionQuery( + when(mockBatchTx.partitionRead( any(PartitionOptions.class), - eq(Statement.of("SELECT * FROM users")), + eq(TABLE_ID), + eq(KeySet.all()), + eq(Arrays.asList("id", "name")), 
any(ReadQueryUpdateTransactionOption.class))) .thenReturn(Arrays.asList(fakePartition)); when(mockBatchTx.execute(any(Partition.class))) .thenThrow( SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1")); - try { - pipeline.run(); - } catch (PipelineExecutionException e) { - if (e.getCause() instanceof SpannerException - && ((SpannerException) e.getCause()).getErrorCode().getGrpcStatusCode() - == Code.DEADLINE_EXCEEDED) { - // expected - } else { - throw e; - } - } - verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 1); - verifyMetricWasSet("test", "aaa", "123", "ok", null, 0); - } + ErrorCode.PERMISSION_DENIED, "Simulated Failure")); - @Test - public void testQueryMetricsSucceed() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); + assertThrows( + "PERMISSION_DENIED: Simulated Failure", PipelineExecutionException.class, pipeline::run); - SpannerConfig spannerConfig = getSpannerConfig(); + verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 1); + verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "permission_denied", 1); + } - pipeline.apply( - "read q", + @Test + public void runNaiveReadTestWithProjectId() { + runNaiveReadTest( SpannerIO.read() .withSpannerConfig(spannerConfig) - .withQuery("SELECT * FROM users") - .withQueryName("queryName") - .withTimestampBound(timestampBound)); - - FakeBatchTransactionId id = new FakeBatchTransactionId("runQueryTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); - - Partition fakePartition = - FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one")); - - when(mockBatchTx.partitionQuery( - any(PartitionOptions.class), - eq(Statement.of("SELECT * FROM users")), - any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition, fakePartition)); - when(mockBatchTx.execute(any(Partition.class))) - .thenReturn( - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))) - .thenReturn( - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); - - pipeline.run(); - verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 0); - verifyMetricWasSet("test", "aaa", "123", "ok", null, 2); + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND)); } @Test - public void testReadMetricsFail() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); + public void runNaiveReadTestWithUnspecifiedProject() { + // Default spannerConfig has project ID specified - use an unspecified project. 
+ runNaiveReadTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND)); + } - SpannerConfig spannerConfig = getSpannerConfig(); + @Test + public void runNaiveReadTestWithNullProject() { + runNaiveReadTest( + SpannerIO.read() + .withSpannerConfig( + SpannerConfig.create() + .withProjectId((String) null) + .withInstanceId(INSTANCE_ID) + .withDatabaseId(DATABASE_ID) + .withServiceFactory(serviceFactory)) + .withTable(TABLE_ID) + .withColumns("id", "name") + .withTimestampBound(TIMESTAMP_BOUND)); + } - pipeline.apply( - "read q", + @Test + public void runNaiveReadTestWithPriority() { + SpannerIO.Read readTransform = SpannerIO.read() .withSpannerConfig(spannerConfig) - .withTable("users") + .withTable(TABLE_ID) .withColumns("id", "name") - .withTimestampBound(timestampBound)); - - FakeBatchTransactionId id = new FakeBatchTransactionId("runReadTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + .withTimestampBound(TIMESTAMP_BOUND) + .withHighPriority(); + runNaiveReadTest(readTransform); + assertEquals(RpcPriority.HIGH, readTransform.getSpannerConfig().getRpcPriority().get()); + } - Partition fakePartition = - FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("one")); + private void runNaiveReadTest(SpannerIO.Read readTransform) { + readTransform = readTransform.withBatching(false); - when(mockBatchTx.partitionRead( - any(PartitionOptions.class), - eq("users"), + PCollection results = pipeline.apply("read q", readTransform); + when(mockBatchTx.read( + eq(TABLE_ID), eq(KeySet.all()), eq(Arrays.asList("id", "name")), any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition)); - when(mockBatchTx.execute(any(Partition.class))) - .thenThrow( - SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1")); - try { - pipeline.run(); - } catch (PipelineExecutionException e) { - if (e.getCause() instanceof SpannerException - && ((SpannerException) e.getCause()).getErrorCode().getGrpcStatusCode() - == Code.DEADLINE_EXCEEDED) { - // expected - } else { - throw e; - } - } - verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 1); - verifyMetricWasSet("test", "aaa", "123", "ok", null, 0); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); + + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); + pipeline.run(); + verifyTableRequestMetricWasSet(readTransform.getSpannerConfig(), TABLE_ID, "ok", 1); } @Test - public void testReadMetricsSucceed() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); + public void runNaiveReadTestWithFailures() { pipeline.apply( "read q", SpannerIO.read() .withSpannerConfig(spannerConfig) - .withTable("users") + .withTable(TABLE_ID) .withColumns("id", "name") - .withTimestampBound(timestampBound)); - - FakeBatchTransactionId id = new FakeBatchTransactionId("runReadTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); + 
.withTimestampBound(TIMESTAMP_BOUND) + .withBatching(false)); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); - - Partition fakePartition = - FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("one")); - - when(mockBatchTx.partitionRead( - any(PartitionOptions.class), - eq("users"), + when(mockBatchTx.read( + eq(TABLE_ID), eq(KeySet.all()), eq(Arrays.asList("id", "name")), any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition, fakePartition, fakePartition)); - when(mockBatchTx.execute(any(Partition.class))) - .thenReturn( - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); - - pipeline.run(); - verifyMetricWasSet("test", "aaa", "123", "ok", null, 3); - } - - private void verifyMetricWasSet( - String projectId, - String databaseId, - String tableId, - String status, - @Nullable String queryName, - long count) { - // Verify the metric was reported. - HashMap labels = new HashMap<>(); - labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - labels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); - labels.put(MonitoringInfoConstants.Labels.METHOD, "Read"); - labels.put( - MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId)); - labels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId); - labels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId); - labels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId); - if (queryName != null) { - labels.put(MonitoringInfoConstants.Labels.SPANNER_QUERY_NAME, queryName); - } - labels.put(MonitoringInfoConstants.Labels.STATUS, status); + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.PERMISSION_DENIED, "Simulated Failure")); - MonitoringInfoMetricName name = - MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels); - MetricsContainerImpl container = - (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer(); - assertEquals(count, (long) container.getCounter(name).getCumulative()); + assertThrows( + "PERMISSION_DENIED: Simulated Failure", PipelineExecutionException.class, pipeline::run); + verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "permission_denied", 1); } @Test - public void runReadUsingIndex() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); - + public void runBatchReadUsingIndex() { PCollection one = pipeline.apply( "read q", SpannerIO.read() .withTimestamp(Timestamp.now()) .withSpannerConfig(spannerConfig) - .withTable("users") + .withTable(TABLE_ID) .withColumns("id", "name") .withIndex("theindex") - .withTimestampBound(timestampBound)); + .withTimestampBound(TIMESTAMP_BOUND)); - FakeBatchTransactionId id = new FakeBatchTransactionId("runReadUsingIndexTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(id); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - 
.thenReturn(mockBatchTx); - - Partition fakePartition = - FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("one")); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(id)).thenReturn(mockBatchTx); when(mockBatchTx.partitionReadUsingIndex( any(PartitionOptions.class), - eq("users"), + eq(TABLE_ID), eq("theindex"), eq(KeySet.all()), eq(Arrays.asList("id", "name")), @@ -572,98 +565,68 @@ public void runReadUsingIndex() throws Exception { ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); PAssert.that(one).containsInAnyOrder(FAKE_ROWS); - pipeline.run(); + verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 4); } @Test - public void readPipeline() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); - - PCollection one = + public void runNaiveReadUsingIndex() { + PCollection results = pipeline.apply( "read q", SpannerIO.read() + .withTimestamp(Timestamp.now()) .withSpannerConfig(spannerConfig) - .withQuery("SELECT * FROM users") - .withTimestampBound(timestampBound)); - FakeBatchTransactionId txId = new FakeBatchTransactionId("readPipelineTest"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(txId); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); + .withTable(TABLE_ID) + .withColumns("id", "name") + .withIndex("theindex") + .withTimestampBound(TIMESTAMP_BOUND) + .withBatching(false)); - Partition fakePartition = - FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one")); - when(mockBatchTx.partitionQuery( - any(PartitionOptions.class), - eq(Statement.of("SELECT * FROM users")), + when(mockBatchTx.readUsingIndex( + eq(TABLE_ID), + eq("theindex"), + eq(KeySet.all()), + eq(Arrays.asList("id", "name")), any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition, fakePartition)); - - when(mockBatchTx.execute(any(Partition.class))) - .thenReturn( - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)), - ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 6))); - - PAssert.that(one).containsInAnyOrder(FAKE_ROWS); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); pipeline.run(); + verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 1); } @Test - public void readAllPipeline() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp); - - SpannerConfig spannerConfig = getSpannerConfig(); - + public void readAllPipeline() { PCollectionView tx = pipeline.apply( "tx", SpannerIO.createTransaction() .withSpannerConfig(spannerConfig) - .withTimestampBound(timestampBound)); + .withTimestampBound(TIMESTAMP_BOUND)); PCollection reads = pipeline.apply( Create.of( - ReadOperation.create().withQuery("SELECT * FROM users"), - ReadOperation.create().withTable("users").withColumns("id", "name"))); + ReadOperation.create().withQuery(QUERY_STATEMENT).withQueryName(QUERY_NAME), + ReadOperation.create().withTable(TABLE_ID).withColumns("id", "name"))); - PCollection one = + PCollection results = reads.apply( "read all", SpannerIO.readAll().withSpannerConfig(spannerConfig).withTransaction(tx)); - 
BatchTransactionId txId = new FakeBatchTransactionId("tx"); - when(mockBatchTx.getBatchTransactionId()).thenReturn(txId); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound)) - .thenReturn(mockBatchTx); - - when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class))) - .thenReturn(mockBatchTx); - - Partition fakePartition = - FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("partition")); when(mockBatchTx.partitionQuery( any(PartitionOptions.class), - eq(Statement.of("SELECT * FROM users")), + eq(Statement.of(QUERY_STATEMENT)), any(ReadQueryUpdateTransactionOption.class))) .thenReturn(Arrays.asList(fakePartition, fakePartition)); when(mockBatchTx.partitionRead( any(PartitionOptions.class), - eq("users"), + eq(TABLE_ID), eq(KeySet.all()), eq(Arrays.asList("id", "name")), any(ReadQueryUpdateTransactionOption.class))) - .thenReturn(Arrays.asList(fakePartition)); + .thenReturn(Collections.singletonList(fakePartition)); when(mockBatchTx.execute(any(Partition.class))) .thenReturn( @@ -671,8 +634,70 @@ public void readAllPipeline() throws Exception { ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)), ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); - PAssert.that(one).containsInAnyOrder(FAKE_ROWS); - + PAssert.that(results).containsInAnyOrder(FAKE_ROWS); pipeline.run(); + verifyTableRequestMetricWasSet(spannerConfig, TABLE_ID, "ok", 2); + verifyQueryRequestMetricWasSet(spannerConfig, QUERY_NAME, "ok", 3); + } + + private void verifyTableRequestMetricWasSet( + SpannerConfig config, String table, String status, long count) { + + HashMap baseLabels = getBaseMetricsLabels(config); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read"); + baseLabels.put(MonitoringInfoConstants.Labels.TABLE_ID, table); + baseLabels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.spannerTable( + baseLabels.get(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID), + config.getInstanceId().get(), + config.getDatabaseId().get(), + table)); + baseLabels.put(MonitoringInfoConstants.Labels.STATUS, status); + + MonitoringInfoMetricName name = + MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); + MetricsContainerImpl container = + (MetricsContainerImpl) MetricsEnvironment.getCurrentContainer(); + assertEquals(count, (long) container.getCounter(name).getCumulative()); + } + + private void verifyQueryRequestMetricWasSet( + SpannerConfig config, String queryName, String status, long count) { + + HashMap baseLabels = getBaseMetricsLabels(config); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read"); + baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_QUERY_NAME, queryName); + baseLabels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.spannerQuery( + baseLabels.get(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID), + config.getInstanceId().get(), + config.getDatabaseId().get(), + queryName)); + baseLabels.put(MonitoringInfoConstants.Labels.STATUS, status); + + MonitoringInfoMetricName name = + MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); + MetricsContainerImpl container = + (MetricsContainerImpl) MetricsEnvironment.getCurrentContainer(); + assertEquals(count, (long) container.getCounter(name).getCumulative()); + } + + @NotNull + private HashMap getBaseMetricsLabels(SpannerConfig config) { + HashMap baseLabels = new HashMap<>(); + 
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, + config.getProjectId() == null || config.getProjectId().get() == null + ? SpannerOptions.getDefaultProjectId() + : config.getProjectId().get()); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, config.getInstanceId().get()); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, config.getDatabaseId().get()); + return baseLabels; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java index 8c564ff0708f..2572e8053780 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteExceptionHandlingTest.java @@ -135,7 +135,7 @@ public void setUp() throws Exception { @Test public void testExceptionHandlingForSimpleWrite() throws InterruptedException { - List mutationList = Arrays.asList(SpannerIOWriteTest.m((long) 1)); + List mutationList = Arrays.asList(SpannerIOWriteTest.buildUpsertMutation((long) 1)); // mock sleeper so that it does not actually sleep. SpannerIO.WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class); @@ -180,7 +180,9 @@ public void testExceptionHandlingForSimpleWrite() throws InterruptedException { @Test public void testExceptionHandlingForWriteGrouped() throws InterruptedException { List mutationList = - Arrays.asList(SpannerIOWriteTest.g(SpannerIOWriteTest.m((long) 1))); + Arrays.asList( + SpannerIOWriteTest.buildMutationGroup( + SpannerIOWriteTest.buildUpsertMutation((long) 1))); // mock sleeper so that it does not actually sleep. 
SpannerIO.WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index d0e4e2e13eea..ecc6ba31ee4e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -49,6 +49,7 @@ import com.google.cloud.spanner.ReadOnlyTransaction; import com.google.cloud.spanner.ResultSets; import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.Type; @@ -109,6 +110,12 @@ public class SpannerIOWriteTest implements Serializable { private static final long CELLS_PER_KEY = 7; + private static final String TABLE_NAME = "test-table"; + private static final SpannerConfig SPANNER_CONFIG = + SpannerConfig.create() + .withDatabaseId("test-database") + .withInstanceId("test-instance") + .withProjectId("test-project"); @Rule public transient TestPipeline pipeline = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -120,7 +127,6 @@ public class SpannerIOWriteTest implements Serializable { private FakeServiceFactory serviceFactory; @Before - @SuppressWarnings("unchecked") public void setUp() throws Exception { MockitoAnnotations.initMocks(this); serviceFactory = new FakeServiceFactory(); @@ -135,20 +141,23 @@ public void setUp() throws Exception { .thenReturn(null); // Simplest schema: a table with int64 key - preparePkMetadata(tx, Arrays.asList(pkMetadata("tEsT", "key", "ASC"))); - prepareColumnMetadata(tx, Arrays.asList(columnMetadata("tEsT", "key", "INT64", CELLS_PER_KEY))); + // Verify case-insensitivity of table names by using a different case for the table name. + preparePkMetadata(tx, Arrays.asList(pkMetadata("tEsT-TaBlE", "key", "ASC"))); + prepareColumnMetadata( + tx, Arrays.asList(columnMetadata("tEsT-TaBlE", "key", "INT64", CELLS_PER_KEY))); preparePgColumnMetadata( - tx, Arrays.asList(columnMetadata("tEsT", "key", "bigint", CELLS_PER_KEY))); + tx, Arrays.asList(columnMetadata("tEsT-TaBlE", "key", "bigint", CELLS_PER_KEY))); // Setup the ProcessWideContainer for testing metrics are set.
MetricsContainerImpl container = new MetricsContainerImpl(null); MetricsEnvironment.setProcessWideContainer(container); + MetricsEnvironment.setCurrentContainer(container); } private SpannerSchema getSchema() { return SpannerSchema.builder() - .addColumn("tEsT", "key", "INT64", CELLS_PER_KEY) - .addKeyPart("tEsT", "key", false) + .addColumn("tEsT-TaBlE", "key", "INT64", CELLS_PER_KEY) + .addKeyPart("tEsT-TaBlE", "key", false) .build(); } @@ -273,23 +282,19 @@ public void emptyDatabaseId() throws Exception { @Test public void singleMutationPipeline() throws Exception { - Mutation mutation = m(2L); + Mutation mutation = buildUpsertMutation(2L); PCollection mutations = pipeline.apply(Create.of(mutation)); mutations.apply( - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withServiceFactory(serviceFactory)); + SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory(serviceFactory)); pipeline.run(); - verifyBatches(batch(m(2L))); + verifyBatches(buildMutationBatch(buildUpsertMutation(2L))); } @Test public void singlePgMutationPipeline() throws Exception { - Mutation mutation = m(2L); + Mutation mutation = buildUpsertMutation(2L); PCollection mutations = pipeline.apply(Create.of(mutation)); PCollectionView pgDialectView = pipeline @@ -298,67 +303,81 @@ public void singlePgMutationPipeline() throws Exception { mutations.apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withDialectView(pgDialectView)); pipeline.run(); - verifyBatches(batch(m(2L))); + verifyBatches(buildMutationBatch(buildUpsertMutation(2L))); } @Test public void singleMutationPipelineNoProjectId() throws Exception { - Mutation mutation = m(2L); + Mutation mutation = buildUpsertMutation(2L); PCollection mutations = pipeline.apply(Create.of(mutation)); - mutations.apply( - SpannerIO.write() - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withServiceFactory(serviceFactory)); + SpannerConfig config = + SpannerConfig.create().withInstanceId("test-instance").withDatabaseId("test-database"); + mutations.apply(SpannerIO.write().withSpannerConfig(config).withServiceFactory(serviceFactory)); pipeline.run(); - verifyBatches(batch(m(2L))); + // don't use VerifyBatches as that uses the common SPANNER_CONFIG with project ID: + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnceWithOptions( + mutationsInNoOrder(buildMutationBatch(buildUpsertMutation(2L))), + any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(config, TABLE_NAME, "ok", 1); } @Test public void singleMutationPipelineNullProjectId() throws Exception { - Mutation mutation = m(2L); + Mutation mutation = buildUpsertMutation(2L); PCollection mutations = pipeline.apply(Create.of(mutation)); - mutations.apply( - SpannerIO.write() + SpannerConfig config = + SpannerConfig.create() .withProjectId((String) null) .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withServiceFactory(serviceFactory)); + .withDatabaseId("test-database"); + mutations.apply(SpannerIO.write().withSpannerConfig(config).withServiceFactory(serviceFactory)); pipeline.run(); - verifyBatches(batch(m(2L))); + // don't use VerifyBatches as that uses the common SPANNER_CONFIG with project ID: + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnceWithOptions( + 
mutationsInNoOrder(buildMutationBatch(buildUpsertMutation(2L))), + any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(config, TABLE_NAME, "ok", 1); } @Test public void singleMutationGroupPipeline() throws Exception { PCollection mutations = - pipeline.apply(Create.of(g(m(1L), m(2L), m(3L)))); + pipeline.apply( + Create.of( + buildMutationGroup( + buildUpsertMutation(1L), buildUpsertMutation(2L), buildUpsertMutation(3L)))); mutations.apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .grouped()); pipeline.run(); - verifyBatches(batch(m(1L), m(2L), m(3L))); + verifyBatches( + buildMutationBatch( + buildUpsertMutation(1L), buildUpsertMutation(2L), buildUpsertMutation(3L))); } @Test public void singlePgMutationGroupPipeline() throws Exception { PCollection mutations = - pipeline.apply(Create.of(g(m(1L), m(2L), m(3L)))); + pipeline.apply( + Create.of( + buildMutationGroup( + buildUpsertMutation(1L), buildUpsertMutation(2L), buildUpsertMutation(3L)))); PCollectionView pgDialectView = pipeline .apply("Create PG dialect", Create.of(Dialect.POSTGRESQL)) @@ -366,15 +385,31 @@ public void singlePgMutationGroupPipeline() throws Exception { mutations.apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withDialectView(pgDialectView) .grouped()); pipeline.run(); - verifyBatches(batch(m(1L), m(2L), m(3L))); + verifyBatches( + buildMutationBatch( + buildUpsertMutation(1L), buildUpsertMutation(2L), buildUpsertMutation(3L))); + } + + @Test + public void metricsForDifferentTables() throws Exception { + Mutation mutation = buildUpsertMutation(2L); + Mutation mutation2 = + Mutation.newInsertOrUpdateBuilder("other-table").set("key").to("3L").build(); + + PCollection mutations = pipeline.apply(Create.of(mutation, mutation2)); + + mutations.apply( + SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory(serviceFactory)); + pipeline.run(); + + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, "other-table", "ok", 1); } private void verifyBatches(Iterable... batches) { @@ -383,6 +418,7 @@ private void verifyBatches(Iterable... 
batches) { .writeAtLeastOnceWithOptions( mutationsInNoOrder(b), any(ReadQueryUpdateTransactionOption.class)); } + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", batches.length); } @Test @@ -400,12 +436,14 @@ public void noBatching() throws Exception { .writeAtLeastOnceWithOptions(mutationBatchesCaptor.capture(), optionsCaptor.capture())) .thenReturn(null); - PCollection mutations = pipeline.apply(Create.of(g(m(1L)), g(m(2L)))); + PCollection mutations = + pipeline.apply( + Create.of( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L)))); mutations.apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(fakeServiceFactory) .withBatchSizeBytes(1) .grouped()); @@ -413,58 +451,61 @@ public void noBatching() throws Exception { verify(fakeServiceFactory.mockDatabaseClient(), times(1)) .writeAtLeastOnceWithOptions( - mutationsInNoOrder(batch(m(1L))), any(ReadQueryUpdateTransactionOption.class)); + mutationsInNoOrder(buildMutationBatch(buildUpsertMutation(1L))), + any(ReadQueryUpdateTransactionOption.class)); verify(fakeServiceFactory.mockDatabaseClient(), times(1)) .writeAtLeastOnceWithOptions( - mutationsInNoOrder(batch(m(2L))), any(ReadQueryUpdateTransactionOption.class)); + mutationsInNoOrder(buildMutationBatch(buildUpsertMutation(2L))), + any(ReadQueryUpdateTransactionOption.class)); // If no batching then the DB schema is never read. verify(tx, never()).executeQuery(any()); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 2); } @Test public void streamingWrites() throws Exception { TestStream testStream = TestStream.create(SerializableCoder.of(Mutation.class)) - .addElements(m(1L), m(2L)) + .addElements(buildUpsertMutation(1L), buildUpsertMutation(2L)) .advanceProcessingTime(Duration.standardMinutes(1)) - .addElements(m(3L), m(4L)) + .addElements(buildUpsertMutation(3L), buildUpsertMutation(4L)) .advanceProcessingTime(Duration.standardMinutes(1)) - .addElements(m(5L), m(6L)) + .addElements(buildUpsertMutation(5L), buildUpsertMutation(6L)) .advanceWatermarkToInfinity(); pipeline .apply(testStream) .apply( - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withServiceFactory(serviceFactory)); + SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory(serviceFactory)); pipeline.run(); - verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L))); + verifyBatches( + buildMutationBatch(buildUpsertMutation(1L), buildUpsertMutation(2L)), + buildMutationBatch(buildUpsertMutation(3L), buildUpsertMutation(4L)), + buildMutationBatch(buildUpsertMutation(5L), buildUpsertMutation(6L))); } @Test public void streamingWritesWithPriority() throws Exception { TestStream testStream = TestStream.create(SerializableCoder.of(Mutation.class)) - .addElements(m(1L), m(2L)) + .addElements(buildUpsertMutation(1L), buildUpsertMutation(2L)) .advanceProcessingTime(Duration.standardMinutes(1)) - .addElements(m(3L), m(4L)) + .addElements(buildUpsertMutation(3L), buildUpsertMutation(4L)) .advanceProcessingTime(Duration.standardMinutes(1)) - .addElements(m(5L), m(6L)) + .addElements(buildUpsertMutation(5L), buildUpsertMutation(6L)) .advanceWatermarkToInfinity(); Write write = SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + 
.withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withHighPriority(); pipeline.apply(testStream).apply(write); pipeline.run(); assertEquals(RpcPriority.HIGH, write.getSpannerConfig().getRpcPriority().get()); - verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L))); + verifyBatches( + buildMutationBatch(buildUpsertMutation(1L), buildUpsertMutation(2L)), + buildMutationBatch(buildUpsertMutation(3L), buildUpsertMutation(4L)), + buildMutationBatch(buildUpsertMutation(5L), buildUpsertMutation(6L))); } @Test @@ -473,22 +514,29 @@ public void streamingWritesWithGrouping() throws Exception { // verify that grouping/sorting occurs when set. TestStream testStream = TestStream.create(SerializableCoder.of(Mutation.class)) - .addElements(m(1L), m(5L), m(2L), m(4L), m(3L), m(6L)) + .addElements( + buildUpsertMutation(1L), + buildUpsertMutation(5L), + buildUpsertMutation(2L), + buildUpsertMutation(4L), + buildUpsertMutation(3L), + buildUpsertMutation(6L)) .advanceWatermarkToInfinity(); pipeline .apply(testStream) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withGroupingFactor(40) .withMaxNumRows(2)); pipeline.run(); // Output should be batches of sorted mutations. - verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L))); + verifyBatches( + buildMutationBatch(buildUpsertMutation(1L), buildUpsertMutation(2L)), + buildMutationBatch(buildUpsertMutation(3L), buildUpsertMutation(4L)), + buildMutationBatch(buildUpsertMutation(5L), buildUpsertMutation(6L))); } @Test @@ -497,14 +545,18 @@ public void streamingWritesWithGroupingWithPriority() throws Exception { // verify that grouping/sorting occurs when set. TestStream testStream = TestStream.create(SerializableCoder.of(Mutation.class)) - .addElements(m(1L), m(5L), m(2L), m(4L), m(3L), m(6L)) + .addElements( + buildUpsertMutation(1L), + buildUpsertMutation(5L), + buildUpsertMutation(2L), + buildUpsertMutation(4L), + buildUpsertMutation(3L), + buildUpsertMutation(6L)) .advanceWatermarkToInfinity(); Write write = SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withGroupingFactor(40) .withMaxNumRows(2) @@ -514,7 +566,10 @@ public void streamingWritesWithGroupingWithPriority() throws Exception { assertEquals(RpcPriority.LOW, write.getSpannerConfig().getRpcPriority().get()); // Output should be batches of sorted mutations. - verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L))); + verifyBatches( + buildMutationBatch(buildUpsertMutation(1L), buildUpsertMutation(2L)), + buildMutationBatch(buildUpsertMutation(3L), buildUpsertMutation(4L)), + buildMutationBatch(buildUpsertMutation(5L), buildUpsertMutation(6L))); } @Test @@ -523,7 +578,13 @@ public void streamingWritesNoGrouping() throws Exception { // verify that grouping/sorting does not occur - batches should be created in received order. 
TestStream testStream = TestStream.create(SerializableCoder.of(Mutation.class)) - .addElements(m(1L), m(5L), m(2L), m(4L), m(3L), m(6L)) + .addElements( + buildUpsertMutation(1L), + buildUpsertMutation(5L), + buildUpsertMutation(2L), + buildUpsertMutation(4L), + buildUpsertMutation(3L), + buildUpsertMutation(6L)) .advanceWatermarkToInfinity(); // verify that grouping/sorting does not occur when notset. @@ -531,14 +592,15 @@ public void streamingWritesNoGrouping() throws Exception { .apply(testStream) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withMaxNumRows(2)); pipeline.run(); - verifyBatches(batch(m(1L), m(5L)), batch(m(2L), m(4L)), batch(m(3L), m(6L))); + verifyBatches( + buildMutationBatch(buildUpsertMutation(1L), buildUpsertMutation(5L)), + buildMutationBatch(buildUpsertMutation(2L), buildUpsertMutation(4L)), + buildMutationBatch(buildUpsertMutation(3L), buildUpsertMutation(6L))); } @Test @@ -546,7 +608,7 @@ public void reportFailures() throws Exception { MutationGroup[] mutationGroups = new MutationGroup[10]; for (int i = 0; i < mutationGroups.length; i++) { - mutationGroups[i] = g(m((long) i)); + mutationGroups[i] = buildMutationGroup(buildUpsertMutation((long) i)); } List mutationGroupList = Arrays.asList(mutationGroups); @@ -565,9 +627,7 @@ public void reportFailures() throws Exception { .apply(Create.of(mutationGroupList)) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withBatchSizeBytes(0) .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES) @@ -585,69 +645,14 @@ public void reportFailures() throws Exception { // (which as they are unbatched = each mutation group) then again for the individual retry. verify(serviceFactory.mockDatabaseClient(), times(20)) .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); - } - @Test - public void testSpannerWriteMetricIsSet() { - Mutation mutation = m(2L); - PCollection mutations = pipeline.apply(Create.of(mutation)); - - // respond with 2 error codes and a success. - when(serviceFactory - .mockDatabaseClient() - .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class))) - .thenThrow( - SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1")) - .thenThrow( - SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 2")) - .thenReturn(new CommitResponse(Timestamp.now())); - - mutations.apply( - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withFailureMode(FailureMode.FAIL_FAST) - .withServiceFactory(serviceFactory)); - pipeline.run(); - - verifyMetricWasSet( - "test-project", "test-database", "test-instance", "Write", "deadline_exceeded", 2); - verifyMetricWasSet("test-project", "test-database", "test-instance", "Write", "ok", 1); - } - - private void verifyMetricWasSet( - String projectId, - String databaseId, - String tableId, - String method, - String status, - long count) { - // Verify the metric was reported. 
- HashMap labels = new HashMap<>(); - labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - labels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); - labels.put(MonitoringInfoConstants.Labels.METHOD, method); - labels.put( - MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId)); - labels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId); - labels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId); - labels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId); - labels.put(MonitoringInfoConstants.Labels.STATUS, status); - - MonitoringInfoMetricName name = - MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels); - MetricsContainerImpl container = - (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer(); - assertEquals(count, (long) container.getCounter(name).getCumulative()); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 0); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "already_exists", 20); } @Test public void deadlineExceededRetries() throws InterruptedException { - List mutationList = Arrays.asList(m((long) 1)); + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); // mock sleeper so that it does not actually sleep. WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class); @@ -669,9 +674,7 @@ public void deadlineExceededRetries() throws InterruptedException { .apply(Create.of(mutationList)) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withBatchSizeBytes(0) .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)); @@ -690,11 +693,14 @@ public void deadlineExceededRetries() throws InterruptedException { // 3 write attempts for the single mutationGroup. verify(serviceFactory.mockDatabaseClient(), times(3)) .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "deadline_exceeded", 2); } @Test public void deadlineExceededFailsAfterRetries() throws InterruptedException { - List mutationList = Arrays.asList(m((long) 1)); + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); // mock sleeper so that it does not actually sleep. 
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class); @@ -712,9 +718,7 @@ public void deadlineExceededFailsAfterRetries() throws InterruptedException { .apply(Create.of(mutationList)) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withBatchSizeBytes(0) .withMaxCumulativeBackoff(Duration.standardHours(2)) @@ -746,11 +750,15 @@ public void deadlineExceededFailsAfterRetries() throws InterruptedException { // then 1 individual attempt + numSleeps/2 individual retries verify(serviceFactory.mockDatabaseClient(), times(numSleeps + 2)) .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 0); + verifyTableWriteRequestMetricWasSet( + SPANNER_CONFIG, TABLE_NAME, "deadline_exceeded", numSleeps + 2); } @Test public void retryOnSchemaChangeException() throws InterruptedException { - List mutationList = Arrays.asList(m((long) 1)); + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); String errString = "Transaction aborted. " @@ -772,9 +780,7 @@ public void retryOnSchemaChangeException() throws InterruptedException { .apply(Create.of(mutationList)) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withBatchSizeBytes(0) .withFailureMode(FailureMode.FAIL_FAST)); @@ -793,11 +799,14 @@ public void retryOnSchemaChangeException() throws InterruptedException { // 3 write attempts for the single mutationGroup. verify(serviceFactory.mockDatabaseClient(), times(3)) .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "aborted", 2); } @Test public void retryMaxOnSchemaChangeException() throws InterruptedException { - List mutationList = Arrays.asList(m((long) 1)); + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); String errString = "Transaction aborted. " @@ -822,9 +831,7 @@ public void retryMaxOnSchemaChangeException() throws InterruptedException { .apply(Create.of(mutationList)) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withBatchSizeBytes(0) .withFailureMode(FailureMode.FAIL_FAST)); @@ -836,20 +843,21 @@ public void retryMaxOnSchemaChangeException() throws InterruptedException { assertEquals(1, Iterables.size(m)); return null; }); - try { - pipeline.run().waitUntilFinish(); - } finally { - // 0 calls to sleeper - verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong()); - // 5 write attempts for the single mutationGroup. - verify(serviceFactory.mockDatabaseClient(), times(5)) - .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); - } + pipeline.run().waitUntilFinish(); + + // 0 calls to sleeper + verify(WriteToSpannerFn.sleeper, times(0)).sleep(anyLong()); + // 5 write attempts for the single mutationGroup. 
+ verify(serviceFactory.mockDatabaseClient(), times(5)) + .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 0); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "aborted", 5); } @Test public void retryOnAbortedAndDeadlineExceeded() throws InterruptedException { - List mutationList = Arrays.asList(m((long) 1)); + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); String errString = "Transaction aborted. " @@ -881,9 +889,7 @@ public void retryOnAbortedAndDeadlineExceeded() throws InterruptedException { .apply(Create.of(mutationList)) .apply( SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withServiceFactory(serviceFactory) .withBatchSizeBytes(0) .withFailureMode(FailureMode.FAIL_FAST)); @@ -895,22 +901,24 @@ public void retryOnAbortedAndDeadlineExceeded() throws InterruptedException { assertEquals(0, Iterables.size(m)); return null; }); - pipeline.run().waitUntilFinish(); + // 2 calls to sleeper verify(WriteToSpannerFn.sleeper, times(2)).sleep(anyLong()); // 8 write attempts for the single mutationGroup. verify(serviceFactory.mockDatabaseClient(), times(8)) .writeAtLeastOnceWithOptions(any(), any(ReadQueryUpdateTransactionOption.class)); + + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "ok", 1); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "aborted", 5); + verifyTableWriteRequestMetricWasSet(SPANNER_CONFIG, TABLE_NAME, "deadline_exceeded", 2); } @Test public void displayDataWrite() throws Exception { SpannerIO.Write write = SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withBatchSizeBytes(123) .withMaxNumMutations(456) .withMaxNumRows(789) @@ -927,11 +935,7 @@ public void displayDataWrite() throws Exception { assertThat(data, hasDisplayItem("groupingFactor", "100")); // check for default grouping value - write = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database"); + write = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG); data = DisplayData.from(write); assertThat(data.items(), hasSize(7)); @@ -942,9 +946,7 @@ public void displayDataWrite() throws Exception { public void displayDataWriteGrouped() throws Exception { SpannerIO.WriteGrouped writeGrouped = SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") + .withSpannerConfig(SPANNER_CONFIG) .withBatchSizeBytes(123) .withMaxNumMutations(456) .withMaxNumRows(789) @@ -962,12 +964,7 @@ public void displayDataWriteGrouped() throws Exception { assertThat(data, hasDisplayItem("groupingFactor", "100")); // check for default grouping value - writeGrouped = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .grouped(); + writeGrouped = SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).grouped(); data = DisplayData.from(writeGrouped); assertThat(data.items(), hasSize(7)); @@ -976,21 +973,25 @@ public void displayDataWriteGrouped() throws Exception { @Test public void testBatchableMutationFilterFn_cells() { - Mutation all = Mutation.delete("test", KeySet.all()); - Mutation prefix = Mutation.delete("test", KeySet.prefixRange(Key.of(1L))); + 
Mutation all = Mutation.delete(TABLE_NAME, KeySet.all()); + Mutation prefix = Mutation.delete(TABLE_NAME, KeySet.prefixRange(Key.of(1L))); Mutation range = Mutation.delete( - "test", KeySet.range(KeyRange.openOpen(Key.of(1L), Key.newBuilder().build()))); + TABLE_NAME, KeySet.range(KeyRange.openOpen(Key.of(1L), Key.newBuilder().build()))); MutationGroup[] mutationGroups = new MutationGroup[] { - g(m(1L)), - g(m(2L), m(3L)), - g(m(2L), m(3L), m(4L), m(5L)), // not batchable - too big. - g(del(1L)), - g(del(5L, 6L)), // not point delete. - g(all), - g(prefix), - g(range) + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L), buildUpsertMutation(3L)), + buildMutationGroup( + buildUpsertMutation(2L), + buildUpsertMutation(3L), + buildUpsertMutation(4L), + buildUpsertMutation(5L)), // not batchable - too big. + buildMutationGroup(buildDeleteMutation(1L)), + buildMutationGroup(buildDeleteMutation(5L, 6L)), // not point delete. + buildMutationGroup(all), + buildMutationGroup(prefix), + buildMutationGroup(range) }; BatchableMutationFilterFn testFn = @@ -1013,7 +1014,10 @@ public void testBatchableMutationFilterFn_cells() { // Verify captured batchable elements. assertThat( mutationGroupCaptor.getAllValues(), - containsInAnyOrder(g(m(1L)), g(m(2L), m(3L)), g(del(1L)))); + containsInAnyOrder( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L), buildUpsertMutation(3L)), + buildMutationGroup(buildDeleteMutation(1L)))); // Verify captured unbatchable mutations Iterable unbatchableMutations = @@ -1021,33 +1025,41 @@ public void testBatchableMutationFilterFn_cells() { assertThat( unbatchableMutations, containsInAnyOrder( - g(m(2L), m(3L), m(4L), m(5L)), // not batchable - too big. - g(del(5L, 6L)), // not point delete. - g(all), - g(prefix), - g(range))); + buildMutationGroup( + buildUpsertMutation(2L), + buildUpsertMutation(3L), + buildUpsertMutation(4L), + buildUpsertMutation(5L)), // not batchable - too big. + buildMutationGroup(buildDeleteMutation(5L, 6L)), // not point delete. + buildMutationGroup(all), + buildMutationGroup(prefix), + buildMutationGroup(range))); } @Test public void testBatchableMutationFilterFn_size() { - Mutation all = Mutation.delete("test", KeySet.all()); - Mutation prefix = Mutation.delete("test", KeySet.prefixRange(Key.of(1L))); + Mutation all = Mutation.delete(TABLE_NAME, KeySet.all()); + Mutation prefix = Mutation.delete(TABLE_NAME, KeySet.prefixRange(Key.of(1L))); Mutation range = Mutation.delete( - "test", KeySet.range(KeyRange.openOpen(Key.of(1L), Key.newBuilder().build()))); + TABLE_NAME, KeySet.range(KeyRange.openOpen(Key.of(1L), Key.newBuilder().build()))); MutationGroup[] mutationGroups = new MutationGroup[] { - g(m(1L)), - g(m(2L), m(3L)), - g(m(1L), m(3L), m(4L), m(5L)), // not batchable - too big. - g(del(1L)), - g(del(5L, 6L)), // not point delete. - g(all), - g(prefix), - g(range) + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L), buildUpsertMutation(3L)), + buildMutationGroup( + buildUpsertMutation(1L), + buildUpsertMutation(3L), + buildUpsertMutation(4L), + buildUpsertMutation(5L)), // not batchable - too big. + buildMutationGroup(buildDeleteMutation(1L)), + buildMutationGroup(buildDeleteMutation(5L, 6L)), // not point delete. 
+ buildMutationGroup(all), + buildMutationGroup(prefix), + buildMutationGroup(range) }; - long mutationSize = MutationSizeEstimator.sizeOf(m(1L)); + long mutationSize = MutationSizeEstimator.sizeOf(buildUpsertMutation(1L)); BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, mutationSize * 3, 1000, 1000); @@ -1068,7 +1080,10 @@ public void testBatchableMutationFilterFn_size() { // Verify captured batchable elements. assertThat( mutationGroupCaptor.getAllValues(), - containsInAnyOrder(g(m(1L)), g(m(2L), m(3L)), g(del(1L)))); + containsInAnyOrder( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L), buildUpsertMutation(3L)), + buildMutationGroup(buildDeleteMutation(1L)))); // Verify captured unbatchable mutations Iterable unbatchableMutations = @@ -1076,30 +1091,38 @@ public void testBatchableMutationFilterFn_size() { assertThat( unbatchableMutations, containsInAnyOrder( - g(m(1L), m(3L), m(4L), m(5L)), // not batchable - too big. - g(del(5L, 6L)), // not point delete. - g(all), - g(prefix), - g(range))); + buildMutationGroup( + buildUpsertMutation(1L), + buildUpsertMutation(3L), + buildUpsertMutation(4L), + buildUpsertMutation(5L)), // not batchable - too big. + buildMutationGroup(buildDeleteMutation(5L, 6L)), // not point delete. + buildMutationGroup(all), + buildMutationGroup(prefix), + buildMutationGroup(range))); } @Test public void testBatchableMutationFilterFn_rows() { - Mutation all = Mutation.delete("test", KeySet.all()); - Mutation prefix = Mutation.delete("test", KeySet.prefixRange(Key.of(1L))); + Mutation all = Mutation.delete(TABLE_NAME, KeySet.all()); + Mutation prefix = Mutation.delete(TABLE_NAME, KeySet.prefixRange(Key.of(1L))); Mutation range = Mutation.delete( - "test", KeySet.range(KeyRange.openOpen(Key.of(1L), Key.newBuilder().build()))); + TABLE_NAME, KeySet.range(KeyRange.openOpen(Key.of(1L), Key.newBuilder().build()))); MutationGroup[] mutationGroups = new MutationGroup[] { - g(m(1L)), - g(m(2L), m(3L)), - g(m(1L), m(3L), m(4L), m(5L)), // not batchable - too many rows. - g(del(1L)), - g(del(5L, 6L)), // not point delete. - g(all), - g(prefix), - g(range) + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L), buildUpsertMutation(3L)), + buildMutationGroup( + buildUpsertMutation(1L), + buildUpsertMutation(3L), + buildUpsertMutation(4L), + buildUpsertMutation(5L)), // not batchable - too many rows. + buildMutationGroup(buildDeleteMutation(1L)), + buildMutationGroup(buildDeleteMutation(5L, 6L)), // not point delete. + buildMutationGroup(all), + buildMutationGroup(prefix), + buildMutationGroup(range) }; BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 1000, 1000, 3); @@ -1121,7 +1144,10 @@ public void testBatchableMutationFilterFn_rows() { // Verify captured batchable elements. assertThat( mutationGroupCaptor.getAllValues(), - containsInAnyOrder(g(m(1L)), g(m(2L), m(3L)), g(del(1L)))); + containsInAnyOrder( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L), buildUpsertMutation(3L)), + buildMutationGroup(buildDeleteMutation(1L)))); // Verify captured unbatchable mutations Iterable unbatchableMutations = @@ -1129,17 +1155,26 @@ public void testBatchableMutationFilterFn_rows() { assertThat( unbatchableMutations, containsInAnyOrder( - g(m(1L), m(3L), m(4L), m(5L)), // not batchable - too many rows. - g(del(5L, 6L)), // not point delete. 
- g(all), - g(prefix), - g(range))); + buildMutationGroup( + buildUpsertMutation(1L), + buildUpsertMutation(3L), + buildUpsertMutation(4L), + buildUpsertMutation(5L)), // not batchable - too many rows. + buildMutationGroup(buildDeleteMutation(5L, 6L)), // not point delete. + buildMutationGroup(all), + buildMutationGroup(prefix), + buildMutationGroup(range))); } @Test public void testBatchableMutationFilterFn_batchingDisabled() { MutationGroup[] mutationGroups = - new MutationGroup[] {g(m(1L)), g(m(2L)), g(del(1L)), g(del(5L, 6L))}; + new MutationGroup[] { + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L)), + buildMutationGroup(buildDeleteMutation(1L)), + buildMutationGroup(buildDeleteMutation(5L, 6L)) + }; BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 0, 0, 0); @@ -1194,18 +1229,18 @@ public void testGatherSortAndBatchFn() throws Exception { // each mutation is considered 7 cells, // should be sorted and output as 2 lists of 5, then 1 list of 2 // with mutations sorted in order. - g(m(4L)), - g(m(1L)), - g(m(7L)), - g(m(12L)), - g(m(10L)), - g(m(11L)), - g(m(2L)), - g(del(8L)), - g(m(3L)), - g(m(6L)), - g(m(9L)), - g(m(5L)) + buildMutationGroup(buildUpsertMutation(4L)), + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(7L)), + buildMutationGroup(buildUpsertMutation(12L)), + buildMutationGroup(buildUpsertMutation(10L)), + buildMutationGroup(buildUpsertMutation(11L)), + buildMutationGroup(buildUpsertMutation(2L)), + buildMutationGroup(buildDeleteMutation(8L)), + buildMutationGroup(buildUpsertMutation(3L)), + buildMutationGroup(buildUpsertMutation(6L)), + buildMutationGroup(buildUpsertMutation(9L)), + buildMutationGroup(buildUpsertMutation(5L)) }; // Process all elements as one bundle. @@ -1223,9 +1258,21 @@ public void testGatherSortAndBatchFn() throws Exception { assertThat( mutationGroupListCaptor.getAllValues(), contains( - Arrays.asList(g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L))), - Arrays.asList(g(m(6L)), g(m(7L)), g(del(8L)), g(m(9L)), g(m(10L))), - Arrays.asList(g(m(11L)), g(m(12L))))); + Arrays.asList( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L)), + buildMutationGroup(buildUpsertMutation(3L)), + buildMutationGroup(buildUpsertMutation(4L)), + buildMutationGroup(buildUpsertMutation(5L))), + Arrays.asList( + buildMutationGroup(buildUpsertMutation(6L)), + buildMutationGroup(buildUpsertMutation(7L)), + buildMutationGroup(buildDeleteMutation(8L)), + buildMutationGroup(buildUpsertMutation(9L)), + buildMutationGroup(buildUpsertMutation(10L))), + Arrays.asList( + buildMutationGroup(buildUpsertMutation(11L)), + buildMutationGroup(buildUpsertMutation(12L))))); } @Test @@ -1260,18 +1307,18 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception { // each mutation is considered 7 cells, // should be sorted and output as 2 lists of 5, then 1 list of 2 // with mutations sorted in order. 
- g(m(4L)), - g(m(1L)), - g(m(7L)), - g(m(9L)), - g(m(10L)), - g(m(11L)), + buildMutationGroup(buildUpsertMutation(4L)), + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(7L)), + buildMutationGroup(buildUpsertMutation(9L)), + buildMutationGroup(buildUpsertMutation(10L)), + buildMutationGroup(buildUpsertMutation(11L)), // end group - g(m(2L)), - g(del(8L)), // end batch - g(m(3L)), - g(m(6L)), // end batch - g(m(5L)) + buildMutationGroup(buildUpsertMutation(2L)), + buildMutationGroup(buildDeleteMutation(8L)), // end batch + buildMutationGroup(buildUpsertMutation(3L)), + buildMutationGroup(buildUpsertMutation(6L)), // end batch + buildMutationGroup(buildUpsertMutation(5L)) // end bundle, so end group and end batch. }; @@ -1292,14 +1339,34 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception { assertEquals(6, mgListGroups.size()); // verify contents of 6 sorted groups. // first group should be 1,3,4,7,9,11 - assertThat(mgListGroups.get(0), contains(g(m(1L)), g(m(4L)))); - assertThat(mgListGroups.get(1), contains(g(m(7L)), g(m(9L)))); - assertThat(mgListGroups.get(2), contains(g(m(10L)), g(m(11L)))); + assertThat( + mgListGroups.get(0), + contains( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(4L)))); + assertThat( + mgListGroups.get(1), + contains( + buildMutationGroup(buildUpsertMutation(7L)), + buildMutationGroup(buildUpsertMutation(9L)))); + assertThat( + mgListGroups.get(2), + contains( + buildMutationGroup(buildUpsertMutation(10L)), + buildMutationGroup(buildUpsertMutation(11L)))); // second group at finishBundle should be 2,3,5,6,8 - assertThat(mgListGroups.get(3), contains(g(m(2L)), g(m(3L)))); - assertThat(mgListGroups.get(4), contains(g(m(5L)), g(m(6L)))); - assertThat(mgListGroups.get(5), contains(g(del(8L)))); + assertThat( + mgListGroups.get(3), + contains( + buildMutationGroup(buildUpsertMutation(2L)), + buildMutationGroup(buildUpsertMutation(3L)))); + assertThat( + mgListGroups.get(4), + contains( + buildMutationGroup(buildUpsertMutation(5L)), + buildMutationGroup(buildUpsertMutation(6L)))); + assertThat(mgListGroups.get(5), contains(buildMutationGroup(buildDeleteMutation(8L)))); } @Test @@ -1320,7 +1387,7 @@ public void testBatchFn_cells() throws Exception { @Test public void testBatchFn_size() throws Exception { - long mutationSize = MutationSizeEstimator.sizeOf(m(1L)); + long mutationSize = MutationSizeEstimator.sizeOf(buildUpsertMutation(1L)); // Setup class to bundle every 3 mutations by size) GatherSortCreateBatchesFn testFn = @@ -1363,13 +1430,18 @@ private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws Excep List mutationGroups = Arrays.asList( - g(m(1L)), - g(m(4L)), - g(m(5L), m(6L), m(7L), m(8L), m(9L)), - g(m(3L)), - g(m(10L)), - g(m(11L)), - g(m(2L))); + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(4L)), + buildMutationGroup( + buildUpsertMutation(5L), + buildUpsertMutation(6L), + buildUpsertMutation(7L), + buildUpsertMutation(8L), + buildUpsertMutation(9L)), + buildMutationGroup(buildUpsertMutation(3L)), + buildMutationGroup(buildUpsertMutation(10L)), + buildMutationGroup(buildUpsertMutation(11L)), + buildMutationGroup(buildUpsertMutation(2L))); // Process elements. for (MutationGroup m : mutationGroups) { @@ -1384,10 +1456,31 @@ private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws Excep assertEquals(4, batches.size()); // verify contents of 4 batches. 
- assertThat(batches.get(0), contains(g(m(1L)), g(m(2L)), g(m(3L)))); - assertThat(batches.get(1), contains(g(m(4L)))); // small batch : next mutation group is too big. - assertThat(batches.get(2), contains(g(m(5L), m(6L), m(7L), m(8L), m(9L)))); - assertThat(batches.get(3), contains(g(m(10L)), g(m(11L)))); + assertThat( + batches.get(0), + contains( + buildMutationGroup(buildUpsertMutation(1L)), + buildMutationGroup(buildUpsertMutation(2L)), + buildMutationGroup(buildUpsertMutation(3L)))); + assertThat( + batches.get(1), + contains( + buildMutationGroup( + buildUpsertMutation(4L)))); // small batch : next mutation group is too big. + assertThat( + batches.get(2), + contains( + buildMutationGroup( + buildUpsertMutation(5L), + buildUpsertMutation(6L), + buildUpsertMutation(7L), + buildUpsertMutation(8L), + buildUpsertMutation(9L)))); + assertThat( + batches.get(3), + contains( + buildMutationGroup(buildUpsertMutation(10L)), + buildMutationGroup(buildUpsertMutation(11L)))); } @Test @@ -1468,25 +1561,25 @@ public void testRefCountedSpannerAccessorDifferentDbsOnlyOnce() { verify(serviceFactory.mockSpanner(), times(2)).close(); } - static MutationGroup g(Mutation m, Mutation... other) { + static MutationGroup buildMutationGroup(Mutation m, Mutation... other) { return MutationGroup.create(m, other); } - static Mutation m(Long key) { - return Mutation.newInsertOrUpdateBuilder("test").set("key").to(key).build(); + static Mutation buildUpsertMutation(Long key) { + return Mutation.newInsertOrUpdateBuilder(TABLE_NAME).set("key").to(key).build(); } - private static Iterable batch(Mutation... m) { + private static Iterable buildMutationBatch(Mutation... m) { return Arrays.asList(m); } - private static Mutation del(Long... keys) { + private static Mutation buildDeleteMutation(Long... keys) { KeySet.Builder builder = KeySet.newBuilder(); for (Long key : keys) { builder.addKey(Key.of(key)); } - return Mutation.delete("test", builder.build()); + return Mutation.delete(TABLE_NAME, builder.build()); } private static Iterable mutationsInNoOrder(Iterable expected) { @@ -1509,4 +1602,42 @@ public String toString() { } }); } + + private void verifyTableWriteRequestMetricWasSet( + SpannerConfig config, String table, String status, long count) { + + HashMap baseLabels = getBaseMetricsLabels(config); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Write"); + baseLabels.put(MonitoringInfoConstants.Labels.TABLE_ID, table); + baseLabels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.spannerTable( + baseLabels.get(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID), + config.getInstanceId().get(), + config.getDatabaseId().get(), + table)); + baseLabels.put(MonitoringInfoConstants.Labels.STATUS, status); + + MonitoringInfoMetricName name = + MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); + MetricsContainerImpl container = + (MetricsContainerImpl) MetricsEnvironment.getCurrentContainer(); + assertEquals(count, (long) container.getCounter(name).getCumulative()); + } + + private HashMap getBaseMetricsLabels(SpannerConfig config) { + HashMap baseLabels = new HashMap<>(); + baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, + config.getProjectId() == null || config.getProjectId().get() == null + ? 
SpannerOptions.getDefaultProjectId() + : config.getProjectId().get()); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, config.getInstanceId().get()); + baseLabels.put( + MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, config.getDatabaseId().get()); + return baseLabels; + } }
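A note for readers tracing the new metric assertions: the verifyTableRequestMetricWasSet / verifyQueryRequestMetricWasSet helpers above count API_REQUEST_COUNT samples that the transforms are expected to record around each Spanner call. As orientation only (a hedged sketch, not the SpannerIO implementation), a per-table read metric could be built and reported roughly as follows; the class name, method, column list and the projectId/instanceId/databaseId/tableId parameters are placeholders invented for illustration:

import java.util.Arrays;
import java.util.HashMap;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;

/** Hypothetical sketch: report one API_REQUEST_COUNT sample per table-read call. */
class SpannerReadMetricSketch {
  static ResultSet readWithMetric(
      BatchReadOnlyTransaction tx,
      String projectId,
      String instanceId,
      String databaseId,
      String tableId) {
    // Labels mirror those built by getBaseMetricsLabels()/verifyTableRequestMetricWasSet().
    HashMap<String, String> labels = new HashMap<>();
    labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
    labels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
    labels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
    labels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
    labels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, instanceId);
    labels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
    labels.put(MonitoringInfoConstants.Labels.TABLE_ID, tableId);
    labels.put(
        MonitoringInfoConstants.Labels.RESOURCE,
        GcpResourceIdentifiers.spannerTable(projectId, instanceId, databaseId, tableId));
    ServiceCallMetric metric =
        new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
    try {
      ResultSet rs = tx.read(tableId, KeySet.all(), Arrays.asList("id", "name"));
      metric.call("ok"); // the "ok" status value the tests assert on success
      return rs;
    } catch (SpannerException e) {
      // e.g. "permission_denied" or "deadline_exceeded", matching the failure-test assertions
      metric.call(e.getErrorCode().getGrpcStatusCode().toString().toLowerCase());
      throw e;
    }
  }
}

Read this way, the partitioning call (partitionRead / partitionQuery) and each partition execution would each contribute one sample, which is consistent with readAllPipeline expecting an "ok" count of 2 for the table operation (one partitioning call plus one executed partition) and 3 for the named query (one partitioning call plus two executed partitions), and with the write-path retry tests expecting one "deadline_exceeded" sample per commit attempt.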
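On the verification side, setUp() registers one MetricsContainerImpl as both the process-wide container and the current (thread-local) container, so counters recorded by the code under test and counters queried through MetricsEnvironment.getCurrentContainer() in the helpers refer to the same instance. A condensed sketch of that round trip, with the label map elided and assumed to follow the same scheme as getBaseMetricsLabels():

// Test-side bookkeeping (mirrors setUp() and the verify helpers above).
MetricsContainerImpl container = new MetricsContainerImpl(null);
MetricsEnvironment.setProcessWideContainer(container);
MetricsEnvironment.setCurrentContainer(container);

// ... run the pipeline under test ...

// Populate with the same labels used when recording, including STATUS.
HashMap<String, String> labels = new HashMap<>();
MonitoringInfoMetricName name =
    MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
long observed = container.getCounter(name).getCumulative();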
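Query metrics, unlike table metrics, are keyed by a query name, so the SPANNER_QUERY_NAME label and the per-query resource are only meaningful when a stable name is supplied. The tests pass QUERY_NAME explicitly; in user code that would look roughly like the sketch below, where the SQL text, the name "users-by-id" and the instance/database IDs are placeholder values:

// Naming a query so its service-call counters are attributed to a stable identifier.
SpannerConfig config =
    SpannerConfig.create().withInstanceId("my-instance").withDatabaseId("my-database");

SpannerIO.Read namedRead =
    SpannerIO.read()
        .withSpannerConfig(config)
        .withQuery("SELECT id, name FROM users")
        .withQueryName("users-by-id");

// The same applies to ReadOperation elements fed into SpannerIO.readAll().
ReadOperation op =
    ReadOperation.create()
        .withQuery("SELECT id, name FROM users")
        .withQueryName("users-by-id");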
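Finally, the write tests migrate from configuring SpannerIO.write() field by field to sharing a single SpannerConfig constant. Both spellings appear in this patch (the former only in removed lines) and are expected to yield the same connection settings; the IDs below are placeholder values:

// Per-field style, as used by the lines this patch removes.
SpannerIO.Write perField =
    SpannerIO.write()
        .withProjectId("my-project")
        .withInstanceId("my-instance")
        .withDatabaseId("my-database");

// Shared-config style, as used by the updated tests.
SpannerIO.Write viaConfig =
    SpannerIO.write()
        .withSpannerConfig(
            SpannerConfig.create()
                .withProjectId("my-project")
                .withInstanceId("my-instance")
                .withDatabaseId("my-database"));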