Skip to content

Commit

Permalink
Report Lineage metrics for SpannerIO (apache#32561)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored and reeba212 committed Dec 4, 2024
1 parent 6db2567 commit 9e522e8
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ public static Lineage getSinks() {
*
* <p>Specifically, If there are reserved chars (colon, whitespace, dot), escape with backtick. If
* the segment is already wrapped, return the original.
*
* <p>This helper method is for internal and testing usage only.
*/
private static String wrapSegment(String value) {
@Internal
public static String wrapSegment(String value) {
if (value.startsWith("`") && value.endsWith("`")) {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import com.google.cloud.spanner.TimestampBound;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadAll;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand All @@ -42,6 +44,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -197,6 +200,10 @@ private static class ReadFromPartitionFn extends DoFn<PartitionedReadOperation,
private transient SpannerAccessor spannerAccessor;
private transient LoadingCache<ReadOperation, ServiceCallMetric> metricsForReadOperation;

// resolved at runtime for metrics report purpose. SpannerConfig may not have projectId set.
private transient String projectId;
private transient @Nullable String reportedLineage;

public ReadFromPartitionFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
this.config = config;
Expand All @@ -221,6 +228,7 @@ public ServiceCallMetric load(ReadOperation op) {
return ReadAll.buildServiceCallMetricForReadOp(config, op);
}
});
projectId = SpannerIO.resolveSpannerProjectId(config);
}

@Teardown
Expand Down Expand Up @@ -251,6 +259,22 @@ public void processElement(ProcessContext c) throws Exception {
throw (e);
}
serviceCallMetric.call("ok");
// Report Lineage metrics
@Nullable String tableName = op.getReadOperation().tryGetTableName();
if (!Objects.equals(reportedLineage, tableName)) {
ImmutableList.Builder<String> segments =
ImmutableList.<String>builder()
.add(
projectId,
spannerAccessor.getInstanceConfigId(),
config.getInstanceId().get(),
config.getDatabaseId().get());
if (tableName != null) {
segments.add(tableName);
}
Lineage.getSources().add("spanner", segments.build());
reportedLineage = tableName;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import java.util.Objects;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -84,6 +87,10 @@ private static class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> {
private final @Nullable PCollectionView<Transaction> txView;
private transient SpannerAccessor spannerAccessor;

// resolved at runtime for metrics report purpose. SpannerConfig may not have projectId set.
private transient String projectId;
private transient @Nullable String reportedLineage;

NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) {
this.config = config;
this.txView = transaction;
Expand All @@ -92,6 +99,7 @@ private static class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> {
@Setup
public void setup() throws Exception {
spannerAccessor = SpannerAccessor.getOrCreate(config);
projectId = SpannerIO.resolveSpannerProjectId(config);
}

@Teardown
Expand All @@ -113,10 +121,26 @@ public void processElement(ProcessContext c) throws Exception {
}
} catch (SpannerException e) {
serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString());
LOG.error("Error while reading operation: " + op.toString(), e);
LOG.error("Error while reading operation: " + op, e);
throw (e);
}
serviceCallMetric.call("ok");
// Report Lineage metrics
@Nullable String tableName = op.tryGetTableName();
if (!Objects.equals(reportedLineage, tableName)) {
ImmutableList.Builder<String> segments =
ImmutableList.<String>builder()
.add(
projectId,
spannerAccessor.getInstanceConfigId(),
config.getInstanceId().get(),
config.getDatabaseId().get());
if (tableName != null) {
segments.add(tableName);
}
Lineage.getSources().add("spanner", segments.build());
reportedLineage = tableName;
}
}

private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Encapsulates a spanner read operation. */
Expand Down Expand Up @@ -116,4 +119,28 @@ public ReadOperation withIndex(String index) {
public ReadOperation withPartitionOptions(PartitionOptions partitionOptions) {
return toBuilder().setPartitionOptions(partitionOptions).build();
}

private static final Pattern queryPattern =
Pattern.compile(
"SELECT\\s+.+FROM\\s+\\[?(?<table>[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE);
/**
* Get table name associated with this operation.
*
* <p>Currently only supports explicitly set table, and limited cases of set query. Return null
* for unsupported cases.
*/
@Nullable
String tryGetTableName() {
if (!Strings.isNullOrEmpty(getTable())) {
return getTable();
} else if (getQuery() != null) {
String query = getQuery().getSql();
System.err.println(query);
Matcher matcher = queryPattern.matcher(query);
if (matcher.find()) {
return matcher.group("table");
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,22 @@ public class SpannerAccessor implements AutoCloseable {
private final BatchClient batchClient;
private final DatabaseAdminClient databaseAdminClient;
private final SpannerConfig spannerConfig;
private final String instanceConfigId;
private int refcount = 0;

private SpannerAccessor(
Spanner spanner,
DatabaseClient databaseClient,
DatabaseAdminClient databaseAdminClient,
BatchClient batchClient,
SpannerConfig spannerConfig) {
SpannerConfig spannerConfig,
String instanceConfigId) {
this.spanner = spanner;
this.databaseClient = databaseClient;
this.databaseAdminClient = databaseAdminClient;
this.batchClient = batchClient;
this.spannerConfig = spannerConfig;
this.instanceConfigId = instanceConfigId;
}

public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) {
Expand Down Expand Up @@ -246,9 +249,24 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
BatchClient batchClient =
spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));
DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient();
String instanceConfigId = "unknown";
try {
instanceConfigId =
spanner
.getInstanceAdminClient()
.getInstance(instanceId)
.getInstanceConfigId()
.getInstanceConfig();
} catch (Exception e) {
// fetch instanceConfigId is fail-free.
// Do not emit warning when serviceFactory is overridden (e.g. in tests).
if (spannerConfig.getServiceFactory() == null) {
LOG.warn("unable to get Spanner instanceConfigId for {}: {}", instanceId, e.getMessage());
}
}

return new SpannerAccessor(
spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig);
spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig, instanceConfigId);
}

public DatabaseClient getDatabaseClient() {
Expand All @@ -263,6 +281,10 @@ public DatabaseAdminClient getDatabaseAdminClient() {
return databaseAdminClient;
}

public String getInstanceConfigId() {
return instanceConfigId;
}

@Override
public void close() {
// Only close Spanner when present in map and refcount == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
Expand Down Expand Up @@ -2177,7 +2178,8 @@ static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {

// SpannerAccessor can not be serialized so must be initialized at runtime in setup().
private transient SpannerAccessor spannerAccessor;

// resolved at runtime for metrics report purpose. SpannerConfig may not have projectId set.
private transient String projectId;
/* Number of times an aborted write to spanner could be retried */
private static final int ABORTED_RETRY_ATTEMPTS = 5;
/* Error string in Aborted exception during schema change */
Expand Down Expand Up @@ -2250,6 +2252,8 @@ public ServiceCallMetric load(String tableName) {
return buildWriteServiceCallMetric(spannerConfig, tableName);
}
});

projectId = resolveSpannerProjectId(spannerConfig);
}

@Teardown
Expand Down Expand Up @@ -2302,15 +2306,29 @@ public void processElement(ProcessContext c) throws Exception {
to retry silently. These must not be counted against retry backoff.
*/
private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws SpannerException {
Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
for (int retry = 1; ; retry++) {
try {
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(batch, getTransactionOptions());
reportServiceCallMetricsForBatch(batch, "ok");
// Get names of all tables in batch of mutations.
reportServiceCallMetricsForBatch(tableNames, "ok");
for (String tableName : tableNames) {
Lineage.getSinks()
.add(
"spanner",
ImmutableList.of(
projectId,
spannerAccessor.getInstanceConfigId(),
spannerConfig.getInstanceId().get(),
spannerConfig.getDatabaseId().get(),
tableName));
}
return;
} catch (AbortedException e) {
reportServiceCallMetricsForBatch(batch, e.getErrorCode().getGrpcStatusCode().toString());
reportServiceCallMetricsForBatch(
tableNames, e.getErrorCode().getGrpcStatusCode().toString());
if (retry >= ABORTED_RETRY_ATTEMPTS) {
throw e;
}
Expand All @@ -2321,7 +2339,8 @@ private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws Sp
}
throw e;
} catch (SpannerException e) {
reportServiceCallMetricsForBatch(batch, e.getErrorCode().getGrpcStatusCode().toString());
reportServiceCallMetricsForBatch(
tableNames, e.getErrorCode().getGrpcStatusCode().toString());
throw e;
}
}
Expand All @@ -2342,9 +2361,7 @@ private Options.TransactionOption[] getTransactionOptions() {
.toArray(Options.TransactionOption[]::new);
}

private void reportServiceCallMetricsForBatch(List<Mutation> batch, String statusCode) {
// Get names of all tables in batch of mutations.
Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
private void reportServiceCallMetricsForBatch(Set<String> tableNames, String statusCode) {
for (String tableName : tableNames) {
writeMetricsByTableName.getUnchecked(tableName).call(statusCode);
}
Expand Down Expand Up @@ -2424,16 +2441,19 @@ private static HashMap<String, String> buildServiceCallMetricLabels(SpannerConfi
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
baseLabels.put(
MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID,
config.getProjectId() == null
|| config.getProjectId().get() == null
|| config.getProjectId().get().isEmpty()
? SpannerOptions.getDefaultProjectId()
: config.getProjectId().get());
MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, resolveSpannerProjectId(config));
baseLabels.put(
MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, config.getInstanceId().get());
baseLabels.put(
MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, config.getDatabaseId().get());
return baseLabels;
}

static String resolveSpannerProjectId(SpannerConfig config) {
return config.getProjectId() == null
|| config.getProjectId().get() == null
|| config.getProjectId().get().isEmpty()
? SpannerOptions.getDefaultProjectId()
: config.getProjectId().get();
}
}
Loading

0 comments on commit 9e522e8

Please sign in to comment.