Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14121] Fix SpannerIO service call metrics and improve tests. #17335

Merged
merged 1 commit into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ public static String datastoreResource(String projectId, String namespace) {
"//bigtable.googleapis.com/projects/%s/namespaces/%s", projectId, namespace);
}

public static String spannerTable(String projectId, String databaseId, String tableId) {
public static String spannerTable(
String projectId, String instanceId, String databaseId, String tableId) {
return String.format(
"//spanner.googleapis.com/projects/%s/topics/%s/tables/%s", projectId, databaseId, tableId);
"//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/tables/%s",
projectId, instanceId, databaseId, tableId);
}

public static String spannerQuery(String projectId, String queryName) {
return String.format("//spanner.googleapis.com/projects/%s/queries/%s", projectId, queryName);
public static String spannerQuery(
String projectId, String instanceId, String databaseId, String queryName) {
return String.format(
"//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/queries/%s",
projectId, instanceId, databaseId, queryName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import java.util.HashMap;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadAll;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,21 @@ public static BatchSpannerRead create(

abstract TimestampBound getTimestampBound();

/**
* Container class to combine a ReadOperation with a Partition so that Metrics are implemented
* properly.
*/
@AutoValue
protected abstract static class PartitionedReadOperation implements Serializable {
abstract ReadOperation getReadOperation();

abstract Partition getPartition();

static PartitionedReadOperation create(ReadOperation readOperation, Partition partition) {
return new AutoValue_BatchSpannerRead_PartitionedReadOperation(readOperation, partition);
}
}

@Override
public PCollection<Struct> expand(PCollection<ReadOperation> input) {
PCollectionView<Transaction> txView = getTxView();
Expand All @@ -84,14 +100,14 @@ public PCollection<Struct> expand(PCollection<ReadOperation> input) {
.apply(
"Generate Partitions",
ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView))
.apply("Shuffle partitions", Reshuffle.<Partition>viaRandomKey())
.apply("Shuffle partitions", Reshuffle.viaRandomKey())
.apply(
"Read from Partitions",
ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView));
}

@VisibleForTesting
static class GeneratePartitionsFn extends DoFn<ReadOperation, Partition> {
static class GeneratePartitionsFn extends DoFn<ReadOperation, PartitionedReadOperation> {

private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;
Expand All @@ -102,6 +118,8 @@ public GeneratePartitionsFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
this.config = config;
this.txView = txView;
Preconditions.checkNotNull(config.getRpcPriority());
Preconditions.checkNotNull(config.getRpcPriority().get());
}

@Setup
Expand All @@ -117,75 +135,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
BatchReadOnlyTransaction context =
BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
for (Partition p : execute(c.element(), context)) {
c.output(p);
}
}

private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, tx, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, tx);
}
}

private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns());
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
}

private List<Partition> executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(
op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
ReadOperation op = c.element();

// While this creates a ServiceCallMetric for every input element, in reality, the number
// of input elements will either be very few (normally 1!), or they will differ and
// need different metrics.
ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);

List<Partition> partitions;
try {
nielm marked this conversation as resolved.
Show resolved Hide resolved
if (op.getQuery() != null) {
// Query was selected.
partitions =
batchTx.partitionQuery(
op.getPartitionOptions(),
op.getQuery(),
Options.priority(config.getRpcPriority().get()));
} else if (op.getIndex() != null) {
// Read with index was selected.
partitions =
batchTx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority().get()));
nielm marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Read from table was selected.
partitions =
batchTx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority().get()));
nielm marked this conversation as resolved.
Show resolved Hide resolved
}
metric.call("ok");
} catch (SpannerException e) {
metric.call(e.getErrorCode().getGrpcStatusCode().toString());
throw e;
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(rpcPriority));
for (Partition p : partitions) {
c.output(PartitionedReadOperation.create(op, p));
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(rpcPriority));
}
}

private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
private static class ReadFromPartitionFn extends DoFn<PartitionedReadOperation, Struct> {

private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;

private transient SpannerAccessor spannerAccessor;
private transient String projectId;
private transient ServiceCallMetric serviceCallMetric;
private transient LoadingCache<ReadOperation, ServiceCallMetric> metricsForReadOperation;

public ReadFromPartitionFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
Expand All @@ -196,24 +201,28 @@ public ReadFromPartitionFn(
@Setup
public void setup() throws Exception {
spannerAccessor = SpannerAccessor.getOrCreate(config);
projectId =
this.config.getProjectId() == null
|| this.config.getProjectId().get() == null
|| this.config.getProjectId().get().isEmpty()
? SpannerOptions.getDefaultProjectId()
: this.config.getProjectId().get();

// Use a LoadingCache for metrics as there can be different read operations which result in
// different service call metrics labels. ServiceCallMetric items are created on-demand and
// added to the cache.
metricsForReadOperation =
CacheBuilder.newBuilder()
.maximumSize(SpannerIO.METRICS_CACHE_SIZE)
// worker.
.build(
new CacheLoader<ReadOperation, ServiceCallMetric>() {
@Override
public ServiceCallMetric load(ReadOperation op) {
return ReadAll.buildServiceCallMetricForReadOp(config, op);
}
});
}

@Teardown
public void teardown() throws Exception {
spannerAccessor.close();
}

@StartBundle
public void startBundle() throws Exception {
serviceCallMetric =
createServiceCallMetric(
projectId, this.config.getDatabaseId().get(), this.config.getInstanceId().get());
metricsForReadOperation.invalidateAll();
metricsForReadOperation.cleanUp();
}

@ProcessElement
Expand All @@ -223,8 +232,9 @@ public void processElement(ProcessContext c) throws Exception {
BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());

Partition p = c.element();
try (ResultSet resultSet = batchTx.execute(p)) {
PartitionedReadOperation op = c.element();
ServiceCallMetric serviceCallMetric = metricsForReadOperation.get(op.getReadOperation());
try (ResultSet resultSet = batchTx.execute(op.getPartition())) {
while (resultSet.next()) {
Struct s = resultSet.getCurrentRowAsStruct();
c.output(s);
Expand All @@ -236,22 +246,5 @@ public void processElement(ProcessContext c) throws Exception {
}
serviceCallMetric.call("ok");
}

private ServiceCallMetric createServiceCallMetric(
String projectId, String databaseId, String tableId) {
HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
baseLabels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId));
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId);
ServiceCallMetric serviceCallMetric =
new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
return serviceCallMetric;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -97,37 +99,26 @@ public void teardown() throws Exception {
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
ReadOperation op = c.element();
ServiceCallMetric serviceCallMetric =
SpannerIO.ReadAll.buildServiceCallMetricForReadOp(config, op);
BatchReadOnlyTransaction context =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
try (ResultSet resultSet = execute(op, context)) {
while (resultSet.next()) {
c.output(resultSet.getCurrentRowAsStruct());
}
} catch (SpannerException e) {
serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString());
throw (e);
}
serviceCallMetric.call("ok");
}

private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
RpcPriority rpcPriority = SpannerConfig.DEFAULT_RPC_PRIORITY;
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, readOnlyTransaction, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, readOnlyTransaction);
rpcPriority = config.getRpcPriority().get();
}
}

private ResultSet executeWithoutPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery());
}
if (op.getIndex() != null) {
return readOnlyTransaction.readUsingIndex(
op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns());
}
return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns());
}

private ResultSet executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority));
}
Expand Down
Loading