diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 65ec12af0580..369b38158f22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -55,8 +55,11 @@ public static Lineage getSinks() {
*
* <p>Specifically, if there are reserved chars (colon, whitespace, dot), escape with backtick. If
* the segment is already wrapped, return the original.
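+ *
+ * <p>For example, {@code wrapSegment("my.table")} returns {@code "`my.table`"}.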
+ *
+ * <p>This helper method is for internal and testing usage only.
*/
- private static String wrapSegment(String value) {
+ @Internal
+ public static String wrapSegment(String value) {
if (value.startsWith("`") && value.endsWith("`")) {
return value;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
index 1e25d1e72abb..42022fca9658 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
@@ -29,9 +29,11 @@
import com.google.cloud.spanner.TimestampBound;
import java.io.Serializable;
import java.util.List;
+import java.util.Objects;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadAll;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -42,6 +44,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,6 +200,10 @@ private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
private transient LoadingCache<ReadOperation, ServiceCallMetric> metricsForReadOperation;
+ // Resolved at runtime for metrics reporting; SpannerConfig may not have projectId set.
+ private transient String projectId;
+ private transient @Nullable String reportedLineage;
+
public ReadFromPartitionFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
this.config = config;
@@ -221,6 +228,7 @@ public ServiceCallMetric load(ReadOperation op) {
return ReadAll.buildServiceCallMetricForReadOp(config, op);
}
});
+ projectId = SpannerIO.resolveSpannerProjectId(config);
}
@Teardown
@@ -251,6 +259,22 @@ public void processElement(ProcessContext c) throws Exception {
throw (e);
}
serviceCallMetric.call("ok");
+ // Report Lineage metrics
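+ // Only report when the table changes; avoids emitting lineage for every element.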
+ @Nullable String tableName = op.getReadOperation().tryGetTableName();
+ if (!Objects.equals(reportedLineage, tableName)) {
+ ImmutableList.Builder<String> segments =
+ ImmutableList.<String>builder()
+ .add(
+ projectId,
+ spannerAccessor.getInstanceConfigId(),
+ config.getInstanceId().get(),
+ config.getDatabaseId().get());
+ if (tableName != null) {
+ segments.add(tableName);
+ }
+ Lineage.getSources().add("spanner", segments.build());
+ reportedLineage = tableName;
+ }
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
index 2cc07b888983..3d818bd30df5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
@@ -25,14 +25,17 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
+import java.util.Objects;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +87,10 @@ private static class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> {
private final @Nullable PCollectionView<Transaction> txView;
private transient SpannerAccessor spannerAccessor;
+ // Resolved at runtime for metrics reporting; SpannerConfig may not have projectId set.
+ private transient String projectId;
+ private transient @Nullable String reportedLineage;
+
NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) {
this.config = config;
this.txView = transaction;
@@ -92,6 +99,7 @@ private static class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> {
@Setup
public void setup() throws Exception {
spannerAccessor = SpannerAccessor.getOrCreate(config);
+ projectId = SpannerIO.resolveSpannerProjectId(config);
}
@Teardown
@@ -113,10 +121,26 @@ public void processElement(ProcessContext c) throws Exception {
}
} catch (SpannerException e) {
serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString());
- LOG.error("Error while reading operation: " + op.toString(), e);
+ LOG.error("Error while reading operation: " + op, e);
throw (e);
}
serviceCallMetric.call("ok");
+ // Report Lineage metrics
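+ // Only report when the table changes; avoids emitting lineage for every element.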
+ @Nullable String tableName = op.tryGetTableName();
+ if (!Objects.equals(reportedLineage, tableName)) {
+ ImmutableList.Builder<String> segments =
+ ImmutableList.<String>builder()
+ .add(
+ projectId,
+ spannerAccessor.getInstanceConfigId(),
+ config.getInstanceId().get(),
+ config.getDatabaseId().get());
+ if (tableName != null) {
+ segments.add(tableName);
+ }
+ Lineage.getSources().add("spanner", segments.build());
+ reportedLineage = tableName;
+ }
}
private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
index 4cbfe12b231f..2b9f24f09541 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
@@ -24,6 +24,9 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Encapsulates a spanner read operation. */
@@ -116,4 +119,28 @@ public ReadOperation withIndex(String index) {
public ReadOperation withPartitionOptions(PartitionOptions partitionOptions) {
return toBuilder().setPartitionOptions(partitionOptions).build();
}
+
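+ // Matches simple queries such as "SELECT ... FROM table" or "SELECT ... FROM [table]",
+ // capturing the table name in the "table" named group.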
+ private static final Pattern queryPattern =
+ Pattern.compile(
+ "SELECT\\s+.+FROM\\s+\\[?(?<table>[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE);
+ /**
+ * Gets the table name associated with this operation.
+ *
+ * <p>Currently only supports an explicitly set table and limited cases of a set query. Returns
+ * null for unsupported cases.
+ */
+ @Nullable
+ String tryGetTableName() {
+ if (!Strings.isNullOrEmpty(getTable())) {
+ return getTable();
+ } else if (getQuery() != null) {
+ String query = getQuery().getSql();
+ Matcher matcher = queryPattern.matcher(query);
+ if (matcher.find()) {
+ return matcher.group("table");
+ }
+ }
+ return null;
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index 943efc9883b6..2a2b01cca9bd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -70,6 +70,7 @@ public class SpannerAccessor implements AutoCloseable {
private final BatchClient batchClient;
private final DatabaseAdminClient databaseAdminClient;
private final SpannerConfig spannerConfig;
+ private final String instanceConfigId;
private int refcount = 0;
private SpannerAccessor(
@@ -77,12 +78,14 @@ private SpannerAccessor(
DatabaseClient databaseClient,
DatabaseAdminClient databaseAdminClient,
BatchClient batchClient,
- SpannerConfig spannerConfig) {
+ SpannerConfig spannerConfig,
+ String instanceConfigId) {
this.spanner = spanner;
this.databaseClient = databaseClient;
this.databaseAdminClient = databaseAdminClient;
this.batchClient = batchClient;
this.spannerConfig = spannerConfig;
+ this.instanceConfigId = instanceConfigId;
}
public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) {
@@ -246,9 +249,24 @@ private static SpannerAccessor createAndConnect(SpannerConfig spannerConfig) {
BatchClient batchClient =
spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));
DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient();
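+ // Look up the instance config (e.g. "regional-us-central1") so it can be reported in lineage;
+ // fall back to "unknown" if the lookup fails.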
+ String instanceConfigId = "unknown";
+ try {
+ instanceConfigId =
+ spanner
+ .getInstanceAdminClient()
+ .getInstance(instanceId)
+ .getInstanceConfigId()
+ .getInstanceConfig();
+ } catch (Exception e) {
+ // Fetching the instanceConfigId is best-effort; a failure must not break the pipeline.
+ // Do not emit warning when serviceFactory is overridden (e.g. in tests).
+ if (spannerConfig.getServiceFactory() == null) {
+ LOG.warn("unable to get Spanner instanceConfigId for {}: {}", instanceId, e.getMessage());
+ }
+ }
return new SpannerAccessor(
- spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig);
+ spanner, databaseClient, databaseAdminClient, batchClient, spannerConfig, instanceConfigId);
}
public DatabaseClient getDatabaseClient() {
@@ -263,6 +281,10 @@ public DatabaseAdminClient getDatabaseAdminClient() {
return databaseAdminClient;
}
+ public String getInstanceConfigId() {
+ return instanceConfigId;
+ }
+
@Override
public void close() {
// Only close Spanner when present in map and refcount == 0
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 0437e4145904..435bbba9ae8e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -89,6 +89,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
@@ -2177,7 +2178,8 @@ static class WriteToSpannerFn extends DoFn<Iterable<Mutation>, Void> {
// SpannerAccessor can not be serialized so must be initialized at runtime in setup().
private transient SpannerAccessor spannerAccessor;
-
+ // Resolved at runtime for metrics reporting; SpannerConfig may not have projectId set.
+ private transient String projectId;
/* Number of times an aborted write to spanner could be retried */
private static final int ABORTED_RETRY_ATTEMPTS = 5;
/* Error string in Aborted exception during schema change */
@@ -2250,6 +2252,8 @@ public ServiceCallMetric load(String tableName) {
return buildWriteServiceCallMetric(spannerConfig, tableName);
}
});
+
+ projectId = resolveSpannerProjectId(spannerConfig);
}
@Teardown
@@ -2302,15 +2306,29 @@ public void processElement(ProcessContext c) throws Exception {
to retry silently. These must not be counted against retry backoff.
*/
private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws SpannerException {
+ // Get names of all tables in batch of mutations.
+ Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
for (int retry = 1; ; retry++) {
try {
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(batch, getTransactionOptions());
- reportServiceCallMetricsForBatch(batch, "ok");
+ reportServiceCallMetricsForBatch(tableNames, "ok");
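+ // Report a sink lineage entry for each table written in this batch.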
+ for (String tableName : tableNames) {
+ Lineage.getSinks()
+ .add(
+ "spanner",
+ ImmutableList.of(
+ projectId,
+ spannerAccessor.getInstanceConfigId(),
+ spannerConfig.getInstanceId().get(),
+ spannerConfig.getDatabaseId().get(),
+ tableName));
+ }
return;
} catch (AbortedException e) {
- reportServiceCallMetricsForBatch(batch, e.getErrorCode().getGrpcStatusCode().toString());
+ reportServiceCallMetricsForBatch(
+ tableNames, e.getErrorCode().getGrpcStatusCode().toString());
if (retry >= ABORTED_RETRY_ATTEMPTS) {
throw e;
}
@@ -2321,7 +2339,8 @@ private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws Sp
}
throw e;
} catch (SpannerException e) {
- reportServiceCallMetricsForBatch(batch, e.getErrorCode().getGrpcStatusCode().toString());
+ reportServiceCallMetricsForBatch(
+ tableNames, e.getErrorCode().getGrpcStatusCode().toString());
throw e;
}
}
@@ -2342,9 +2361,7 @@ private Options.TransactionOption[] getTransactionOptions() {
.toArray(Options.TransactionOption[]::new);
}
- private void reportServiceCallMetricsForBatch(List<Mutation> batch, String statusCode) {
- // Get names of all tables in batch of mutations.
- Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
+ private void reportServiceCallMetricsForBatch(Set<String> tableNames, String statusCode) {
for (String tableName : tableNames) {
writeMetricsByTableName.getUnchecked(tableName).call(statusCode);
}
@@ -2424,16 +2441,19 @@ private static HashMap<String, String> buildServiceCallMetricLabels(SpannerConfi
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
baseLabels.put(
- MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID,
- config.getProjectId() == null
- || config.getProjectId().get() == null
- || config.getProjectId().get().isEmpty()
- ? SpannerOptions.getDefaultProjectId()
- : config.getProjectId().get());
+ MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, resolveSpannerProjectId(config));
baseLabels.put(
MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, config.getInstanceId().get());
baseLabels.put(
MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, config.getDatabaseId().get());
return baseLabels;
}
+
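+ /** Returns the configured Spanner project ID, or the default project when it is unset or empty. */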
+ static String resolveSpannerProjectId(SpannerConfig config) {
+ return config.getProjectId() == null
+ || config.getProjectId().get() == null
+ || config.getProjectId().get().isEmpty()
+ ? SpannerOptions.getDefaultProjectId()
+ : config.getProjectId().get();
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
index 8c417bdc34ec..a4583952247e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java
@@ -25,6 +25,8 @@
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Instance;
+import com.google.cloud.spanner.InstanceAdminClient;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import java.io.Serializable;
@@ -51,6 +53,12 @@ class FakeServiceFactory implements ServiceFactory<Spanner, SpannerOptions>, Ser
@GuardedBy("lock")
private static final List<BatchClient> mockBatchClients = new ArrayList<>();
+ @GuardedBy("lock")
+ private static final List<InstanceAdminClient> mockAdminClients = new ArrayList<>();
+
+ @GuardedBy("lock")
+ private static final List<Instance> mockInstances = new ArrayList<>();
+
@GuardedBy("lock")
private static int count = 0;
@@ -62,11 +70,15 @@ public FakeServiceFactory() {
mockSpanners.add(mock(Spanner.class, withSettings().serializable()));
mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable()));
mockBatchClients.add(mock(BatchClient.class, withSettings().serializable()));
+ mockAdminClients.add(mock(InstanceAdminClient.class, withSettings().serializable()));
+ mockInstances.add(mock(Instance.class, withSettings().serializable()));
}
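+ // Wire the admin client mock so SpannerAccessor can resolve an instance config in tests.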
+ when(mockAdminClient().getInstance(Matchers.any(String.class))).thenReturn(mockInstance());
when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class)))
.thenReturn(mockDatabaseClient());
when(mockSpanner().getBatchClient(Matchers.any(DatabaseId.class)))
.thenReturn(mockBatchClient());
+ when(mockSpanner().getInstanceAdminClient()).thenReturn(mockAdminClient());
}
DatabaseClient mockDatabaseClient() {
@@ -87,6 +99,18 @@ Spanner mockSpanner() {
}
}
+ InstanceAdminClient mockAdminClient() {
+ synchronized (lock) {
+ return mockAdminClients.get(index);
+ }
+ }
+
+ Instance mockInstance() {
+ synchronized (lock) {
+ return mockInstances.get(index);
+ }
+ }
+
@Override
public Spanner create(SpannerOptions serviceOptions) {
return mockSpanner();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
index 3d4b7818c651..2a02b360b0b1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
@@ -19,6 +19,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -33,6 +34,7 @@
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.FakeBatchTransactionId;
import com.google.cloud.spanner.FakePartitionFactory;
+import com.google.cloud.spanner.InstanceConfigId;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Options.ReadAndQueryOption;
import com.google.cloud.spanner.Options.ReadQueryUpdateTransactionOption;
@@ -53,11 +55,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.PAssert;
@@ -65,6 +70,8 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Before;
import org.junit.Rule;
@@ -81,6 +88,7 @@ public class SpannerIOReadTest implements Serializable {
private static final TimestampBound TIMESTAMP_BOUND =
TimestampBound.ofReadTimestamp(Timestamp.ofTimeMicroseconds(12345));
public static final String PROJECT_ID = "1234";
+ public static final String INSTANCE_CONFIG_ID = "5678";
public static final String INSTANCE_ID = "123";
public static final String DATABASE_ID = "aaa";
public static final String TABLE_ID = "users";
@@ -111,6 +119,9 @@ public class SpannerIOReadTest implements Serializable {
Struct.newBuilder().set("id").to(Value.int64(5)).set("name").to("Evan").build(),
Struct.newBuilder().set("id").to(Value.int64(6)).set("name").to("Floyd").build());
+ private static final String DEFAULT_PROJECT =
+ Lineage.wrapSegment(SpannerOptions.getDefaultProjectId());
+
@Before
public void setUp() throws Exception {
serviceFactory = new FakeServiceFactory();
@@ -130,6 +141,8 @@ public void setUp() throws Exception {
.thenReturn(mockBatchTx);
when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class)))
.thenReturn(mockBatchTx);
+ when(serviceFactory.mockInstance().getInstanceConfigId())
+ .thenReturn(InstanceConfigId.of(PROJECT_ID, INSTANCE_CONFIG_ID));
// Setup the ProcessWideContainer for testing metrics are set.
MetricsContainerImpl container = new MetricsContainerImpl(null);
@@ -139,12 +152,20 @@ public void setUp() throws Exception {
@Test
public void runBatchQueryTestWithSpannerConfig() {
- runBatchQueryTest(
- SpannerIO.read()
- .withSpannerConfig(spannerConfig)
- .withQuery(QUERY_STATEMENT)
- .withQueryName(QUERY_NAME)
- .withTimestampBound(TIMESTAMP_BOUND));
+ PipelineResult result =
+ runBatchQueryTest(
+ SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withQuery(QUERY_STATEMENT)
+ .withQueryName(QUERY_NAME)
+ .withTimestampBound(TIMESTAMP_BOUND));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ PROJECT_ID, INSTANCE_CONFIG_ID, INSTANCE_ID, DATABASE_ID, "users"))));
}
@Test
@@ -166,31 +187,48 @@ public void runWithQueryAndWithTableAtTheSameTimeFails() {
@Test
public void runBatchQueryTestWithUnspecifiedProject() {
// Default spannerConfig has project ID specified - use an unspecified project.
- runBatchQueryTest(
- SpannerIO.read()
- .withSpannerConfig(
- SpannerConfig.create()
- .withInstanceId(INSTANCE_ID)
- .withDatabaseId(DATABASE_ID)
- .withServiceFactory(serviceFactory))
- .withQuery(QUERY_STATEMENT)
- .withQueryName(QUERY_NAME)
- .withTimestampBound(TIMESTAMP_BOUND));
+ PipelineResult result =
+ runBatchQueryTest(
+ SpannerIO.read()
+ .withSpannerConfig(
+ SpannerConfig.create()
+ .withInstanceId(INSTANCE_ID)
+ .withDatabaseId(DATABASE_ID)
+ .withServiceFactory(serviceFactory))
+ .withQuery(QUERY_STATEMENT)
+ .withQueryName(QUERY_NAME)
+ .withTimestampBound(TIMESTAMP_BOUND));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ DEFAULT_PROJECT, INSTANCE_CONFIG_ID, INSTANCE_ID, DATABASE_ID, "users"))));
}
@Test
public void runBatchQueryTestWithNullProject() {
- runBatchQueryTest(
- SpannerIO.read()
- .withSpannerConfig(
- SpannerConfig.create()
- .withProjectId((String) null)
- .withInstanceId(INSTANCE_ID)
- .withDatabaseId(DATABASE_ID)
- .withServiceFactory(serviceFactory))
- .withQuery(QUERY_STATEMENT)
- .withQueryName(QUERY_NAME)
- .withTimestampBound(TIMESTAMP_BOUND));
+ PipelineResult result =
+ runBatchQueryTest(
+ SpannerIO.read()
+ .withSpannerConfig(
+ SpannerConfig.create()
+ .withProjectId((String) null)
+ .withInstanceId(INSTANCE_ID)
+ .withDatabaseId(DATABASE_ID)
+ .withServiceFactory(serviceFactory))
+ .withQuery(QUERY_STATEMENT)
+ .withQueryName(QUERY_NAME)
+ .withTimestampBound(TIMESTAMP_BOUND));
+
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ DEFAULT_PROJECT, INSTANCE_CONFIG_ID, INSTANCE_ID, DATABASE_ID, "users"))));
}
@Test
@@ -218,7 +256,7 @@ public void runBatchQueryTestWithDataBoost() {
runBatchQueryTest(readTransform);
}
- private void runBatchQueryTest(SpannerIO.Read readTransform) {
+ private PipelineResult runBatchQueryTest(SpannerIO.Read readTransform) {
PCollection<Struct> results = pipeline.apply("read q", readTransform);
when(mockBatchTx.partitionQuery(
@@ -234,8 +272,9 @@ private void runBatchQueryTest(SpannerIO.Read readTransform) {
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6)));
PAssert.that(results).containsInAnyOrder(FAKE_ROWS);
- pipeline.run();
+ PipelineResult result = pipeline.run();
verifyQueryRequestMetricWasSet(readTransform.getSpannerConfig(), QUERY_NAME, "ok", 4);
+ return result;
}
@Test
@@ -269,42 +308,66 @@ public void runBatchQueryTestWithFailures() {
@Test
public void runNaiveQueryTestWithProjectId() {
- runNaiveQueryTest(
- SpannerIO.read()
- .withSpannerConfig(spannerConfig)
- .withQuery(QUERY_STATEMENT)
- .withQueryName(QUERY_NAME)
- .withTimestampBound(TIMESTAMP_BOUND));
+ PipelineResult result =
+ runNaiveQueryTest(
+ SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withQuery(QUERY_STATEMENT)
+ .withQueryName(QUERY_NAME)
+ .withTimestampBound(TIMESTAMP_BOUND));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ PROJECT_ID, INSTANCE_CONFIG_ID, INSTANCE_ID, DATABASE_ID, "users"))));
}
@Test
public void runNaiveQueryTestWithUnspecifiedProject() {
// Default spannerConfig has project ID specified - use an unspecified project.
- runNaiveQueryTest(
- SpannerIO.read()
- .withSpannerConfig(
- SpannerConfig.create()
- .withInstanceId(INSTANCE_ID)
- .withDatabaseId(DATABASE_ID)
- .withServiceFactory(serviceFactory))
- .withQuery(QUERY_STATEMENT)
- .withQueryName(QUERY_NAME)
- .withTimestampBound(TIMESTAMP_BOUND));
+ PipelineResult result =
+ runNaiveQueryTest(
+ SpannerIO.read()
+ .withSpannerConfig(
+ SpannerConfig.create()
+ .withInstanceId(INSTANCE_ID)
+ .withDatabaseId(DATABASE_ID)
+ .withServiceFactory(serviceFactory))
+ .withQuery(QUERY_STATEMENT)
+ .withQueryName(QUERY_NAME)
+ .withTimestampBound(TIMESTAMP_BOUND));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ DEFAULT_PROJECT, INSTANCE_CONFIG_ID, INSTANCE_ID, DATABASE_ID, "users"))));
}
@Test
public void runNaiveQueryTestWithNullProject() {
- runNaiveQueryTest(
- SpannerIO.read()
- .withSpannerConfig(
- SpannerConfig.create()
- .withProjectId((String) null)
- .withInstanceId(INSTANCE_ID)
- .withDatabaseId(DATABASE_ID)
- .withServiceFactory(serviceFactory))
- .withQuery(QUERY_STATEMENT)
- .withQueryName(QUERY_NAME)
- .withTimestampBound(TIMESTAMP_BOUND));
+ PipelineResult result =
+ runNaiveQueryTest(
+ SpannerIO.read()
+ .withSpannerConfig(
+ SpannerConfig.create()
+ .withProjectId((String) null)
+ .withInstanceId(INSTANCE_ID)
+ .withDatabaseId(DATABASE_ID)
+ .withServiceFactory(serviceFactory))
+ .withQuery(QUERY_STATEMENT)
+ .withQueryName(QUERY_NAME)
+ .withTimestampBound(TIMESTAMP_BOUND));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ DEFAULT_PROJECT, INSTANCE_CONFIG_ID, INSTANCE_ID, DATABASE_ID, "users"))));
}
@Test
@@ -320,7 +383,7 @@ public void runNaiveQueryTestWithPriority() {
assertEquals(RpcPriority.HIGH, readTransform.getSpannerConfig().getRpcPriority().get());
}
- private void runNaiveQueryTest(SpannerIO.Read readTransform) {
+ private PipelineResult runNaiveQueryTest(SpannerIO.Read readTransform) {
readTransform = readTransform.withBatching(false);
PCollection<Struct> results = pipeline.apply("read q", readTransform);
when(mockBatchTx.executeQuery(
@@ -328,8 +391,9 @@ private void runNaiveQueryTest(SpannerIO.Read readTransform) {
.thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
PAssert.that(results).containsInAnyOrder(FAKE_ROWS);
- pipeline.run();
+ PipelineResult result = pipeline.run();
verifyQueryRequestMetricWasSet(readTransform.getSpannerConfig(), QUERY_NAME, "ok", 1);
+ return result;
}
@Test
@@ -773,6 +837,21 @@ public void runReadFailsToRetrieveSchema() {
checkMessage("Cannot call getSchema when there is no schema", exception.getMessage());
}
+ @Test
+ public void testReadOperationTryGetTableName() {
+ ImmutableMap<String, ReadOperation> testCases =
+ ImmutableMap.<String, ReadOperation>builder()
+ .put("table", ReadOperation.create().withTable("table"))
+ .put("table_2", ReadOperation.create().withQuery("SELECT a, b FROM table_2"))
+ .put(
+ "table_3",
+ ReadOperation.create().withQuery(Statement.of("SELECT * FROM [table_3]")))
+ .build();
+ for (Map.Entry<String, ReadOperation> entry : testCases.entrySet()) {
+ assertEquals(entry.getKey(), entry.getValue().tryGetTableName());
+ }
+ }
+
private void checkMessage(String substring, @Nullable String message) {
if (message != null) {
assertThat(message, containsString(substring));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 0a895974e5ca..9ff2d4141392 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -40,6 +41,7 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.InstanceConfigId;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeyRange;
import com.google.cloud.spanner.KeySet;
@@ -64,12 +66,14 @@
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.BatchableMutationFilterFn;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.FailureMode;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.GatherSortCreateBatchesFn;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Write;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.WriteToSpannerFn;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.PAssert;
@@ -86,6 +90,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
@@ -112,12 +117,18 @@
public class SpannerIOWriteTest implements Serializable {
private static final long CELLS_PER_KEY = 7;
+ private static final String PROJECT_NAME = "test-project";
+ private static final String INSTANCE_CONFIG_NAME = "regional-us-central1";
+ private static final String INSTANCE_NAME = "test-instance";
+ private static final String DATABASE_NAME = "test-database";
private static final String TABLE_NAME = "test-table";
private static final SpannerConfig SPANNER_CONFIG =
SpannerConfig.create()
- .withDatabaseId("test-database")
- .withInstanceId("test-instance")
- .withProjectId("test-project");
+ .withDatabaseId(DATABASE_NAME)
+ .withInstanceId(INSTANCE_NAME)
+ .withProjectId(PROJECT_NAME);
+ private static final String DEFAULT_PROJECT =
+ Lineage.wrapSegment(SpannerOptions.getDefaultProjectId());
@Rule public transient TestPipeline pipeline = TestPipeline.create();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -141,6 +152,8 @@ public void setUp() throws Exception {
.mockDatabaseClient()
.writeAtLeastOnceWithOptions(mutationBatchesCaptor.capture(), optionsCaptor.capture()))
.thenReturn(null);
+ when(serviceFactory.mockInstance().getInstanceConfigId())
+ .thenReturn(InstanceConfigId.of(PROJECT_NAME, INSTANCE_CONFIG_NAME));
// Simplest schema: a table with int64 key
// Verify case-insensitivity of table names by using a different case for the table name.
@@ -314,9 +327,20 @@ public void singleMutationPipeline() throws Exception {
mutations.apply(
SpannerIO.write().withSpannerConfig(SPANNER_CONFIG).withServiceFactory(serviceFactory));
- pipeline.run();
+ PipelineResult result = pipeline.run();
verifyBatches(buildMutationBatch(buildUpsertMutation(2L)));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ PROJECT_NAME,
+ INSTANCE_CONFIG_NAME,
+ INSTANCE_NAME,
+ DATABASE_NAME,
+ TABLE_NAME))));
}
@Test
@@ -333,9 +357,20 @@ public void singlePgMutationPipeline() throws Exception {
.withSpannerConfig(SPANNER_CONFIG)
.withServiceFactory(serviceFactory)
.withDialectView(pgDialectView));
- pipeline.run();
+ PipelineResult result = pipeline.run();
verifyBatches(buildMutationBatch(buildUpsertMutation(2L)));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ PROJECT_NAME,
+ INSTANCE_CONFIG_NAME,
+ INSTANCE_NAME,
+ DATABASE_NAME,
+ TABLE_NAME))));
}
@Test
@@ -346,7 +381,7 @@ public void singleMutationPipelineNoProjectId() throws Exception {
SpannerConfig config =
SpannerConfig.create().withInstanceId("test-instance").withDatabaseId("test-database");
mutations.apply(SpannerIO.write().withSpannerConfig(config).withServiceFactory(serviceFactory));
- pipeline.run();
+ PipelineResult result = pipeline.run();
// don't use VerifyBatches as that uses the common SPANNER_CONFIG with project ID:
verify(serviceFactory.mockDatabaseClient(), times(1))
@@ -355,6 +390,17 @@ public void singleMutationPipelineNoProjectId() throws Exception {
any(ReadQueryUpdateTransactionOption.class));
verifyTableWriteRequestMetricWasSet(config, TABLE_NAME, "ok", 1);
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ DEFAULT_PROJECT,
+ INSTANCE_CONFIG_NAME,
+ INSTANCE_NAME,
+ DATABASE_NAME,
+ TABLE_NAME))));
}
@Test
@@ -368,7 +414,7 @@ public void singleMutationPipelineNullProjectId() throws Exception {
.withInstanceId("test-instance")
.withDatabaseId("test-database");
mutations.apply(SpannerIO.write().withSpannerConfig(config).withServiceFactory(serviceFactory));
- pipeline.run();
+ PipelineResult result = pipeline.run();
// don't use VerifyBatches as that uses the common SPANNER_CONFIG with project ID:
verify(serviceFactory.mockDatabaseClient(), times(1))
@@ -377,6 +423,17 @@ public void singleMutationPipelineNullProjectId() throws Exception {
any(ReadQueryUpdateTransactionOption.class));
verifyTableWriteRequestMetricWasSet(config, TABLE_NAME, "ok", 1);
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ DEFAULT_PROJECT,
+ INSTANCE_CONFIG_NAME,
+ INSTANCE_NAME,
+ DATABASE_NAME,
+ TABLE_NAME))));
}
@Test
@@ -391,11 +448,22 @@ public void singleMutationGroupPipeline() throws Exception {
.withSpannerConfig(SPANNER_CONFIG)
.withServiceFactory(serviceFactory)
.grouped());
- pipeline.run();
+ PipelineResult result = pipeline.run();
verifyBatches(
buildMutationBatch(
buildUpsertMutation(1L), buildUpsertMutation(2L), buildUpsertMutation(3L)));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ PROJECT_NAME,
+ INSTANCE_CONFIG_NAME,
+ INSTANCE_NAME,
+ DATABASE_NAME,
+ TABLE_NAME))));
}
@Test
@@ -416,11 +484,22 @@ public void singlePgMutationGroupPipeline() throws Exception {
.withServiceFactory(serviceFactory)
.withDialectView(pgDialectView)
.grouped());
- pipeline.run();
+ PipelineResult result = pipeline.run();
verifyBatches(
buildMutationBatch(
buildUpsertMutation(1L), buildUpsertMutation(2L), buildUpsertMutation(3L)));
+ assertThat(
+ Lineage.query(result.metrics(), Lineage.Type.SINK),
+ hasItem(
+ Lineage.getFqName(
+ "spanner",
+ ImmutableList.of(
+ PROJECT_NAME,
+ INSTANCE_CONFIG_NAME,
+ INSTANCE_NAME,
+ DATABASE_NAME,
+ TABLE_NAME))));
}
@Test