From 2c4924347732473df4dedbd3a93db9c8326fb477 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 14 Aug 2024 12:50:12 -0400 Subject: [PATCH] Add Lineage metrics to FileSystems (#32090) * Add Lineage metrics to FileSystems * Handle reserved characters (colon, dot, white space) in Lineage FQN * address comments - align up --- .../org/apache/beam/sdk/io/FileBasedSink.java | 1 + .../apache/beam/sdk/io/FileBasedSource.java | 1 + .../org/apache/beam/sdk/io/FileSystem.java | 8 ++ .../org/apache/beam/sdk/io/FileSystems.java | 11 +++ .../ReadAllViaFileBasedSourceTransform.java | 5 +- .../org/apache/beam/sdk/metrics/Lineage.java | 97 +++++++++++++++++-- .../apache/beam/sdk/metrics/LineageTest.java | 59 +++++++++++ .../extensions/gcp/storage/GcsFileSystem.java | 11 +++ .../beam/sdk/io/aws/s3/S3FileSystem.java | 6 ++ .../beam/sdk/io/aws2/s3/S3FileSystem.java | 6 ++ .../blobstore/AzureBlobStoreFileSystem.java | 13 +++ .../org/apache/beam/sdk/io/text/TextIOIT.java | 5 + .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 5 +- .../io/gcp/bigquery/BigQuerySourceBase.java | 3 +- .../bigquery/BigQueryStorageSourceBase.java | 7 +- .../sdk/io/gcp/bigquery/CreateTables.java | 3 +- .../StorageApiWriteUnshardedRecords.java | 3 +- .../StorageApiWritesShardedRecords.java | 3 +- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 3 +- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 3 +- .../io/gcp/bigtable/BigtableServiceImpl.java | 6 +- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 15 +-- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 7 +- .../io/gcp/pubsub/PubsubUnboundedSink.java | 7 +- .../io/gcp/pubsub/PubsubUnboundedSource.java | 4 +- .../sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +- .../sdk/io/gcp/pubsub/PubsubClientTest.java | 5 +- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 4 +- 29 files changed, 265 insertions(+), 45 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 34b4246a7083..b7523ee12b56 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -687,6 +687,7 @@ protected final List, ResourceId>> finalizeDestinati distinctFilenames.get(finalFilename)); distinctFilenames.put(finalFilename, result); outputFilenames.add(KV.of(result, finalFilename)); + FileSystems.reportSinkLineage(finalFilename); } return outputFilenames; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index d68338eceaf4..7ddfde441aed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -299,6 +299,7 @@ public final List> split( splitResults.size()); return splitResults; } else { + FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId()); if (isSplittable()) { @SuppressWarnings("unchecked") List> splits = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 501cd72daddd..11314a318b25 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.metrics.Lineage; /** * File system interface in Beam. @@ -155,4 +156,11 @@ protected abstract void rename( * @see RFC 2396 */ protected abstract String getScheme(); + + /** + * Report {@link Lineage} metrics for resource id. + * + *

Unless override by FileSystem implementations, default to no-op. + */ + protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {} } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index c58ce9ad8308..a4ca9b80dce3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.KV; @@ -395,6 +396,16 @@ public ResourceId apply(@Nonnull Metadata input) { .delete(resourceIdsToDelete); } + /** Report source {@link Lineage} metrics for resource id. */ + public static void reportSourceLineage(ResourceId resourceId) { + getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSources()); + } + + /** Report sink {@link Lineage} metrics for resource id. */ + public static void reportSinkLineage(ResourceId resourceId) { + getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, Lineage.getSinks()); + } + private static class FilterResult { public List resultSources = new ArrayList(); public List resultDestinations = new ArrayList(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java index 74680ab7086d..bbac337f2d0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -123,8 +124,9 @@ protected abstract T makeOutput( public void process(ProcessContext c) throws IOException { FileIO.ReadableFile file = c.element().getKey(); OffsetRange range = c.element().getValue(); + ResourceId resourceId = file.getMetadata().resourceId(); FileBasedSource source = - CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString())) + CompressedSource.from(createSource.apply(resourceId.toString())) .withCompression(file.getCompression()); try (BoundedSource.BoundedReader reader = source @@ -138,6 +140,7 @@ public void process(ProcessContext c) throws IOException { throw e; } } + FileSystems.reportSourceLineage(resourceId); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 6166a562bf2d..302ae4f2fef0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -19,26 +19,109 @@ import java.util.HashSet; import java.util.Set; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { + public static final String LINEAGE_NAMESPACE = "lineage"; - private static final StringSet SOURCES = - Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); - private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString()); + private static final Lineage SOURCES = new Lineage(Type.SOURCE); + private static final Lineage SINKS = new Lineage(Type.SINK); + private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]"); + + private final StringSet metric; - /** {@link StringSet} representing sources and optionally side inputs. */ - public static StringSet getSources() { + private Lineage(Type type) { + this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); + } + + /** {@link Lineage} representing sources and optionally side inputs. */ + public static Lineage getSources() { return SOURCES; } - /** {@link StringSet} representing sinks. */ - public static StringSet getSinks() { + /** {@link Lineage} representing sinks. */ + public static Lineage getSinks() { return SINKS; } + /** + * Wrap segment to valid segment name. + * + *

Specifically, If there are reserved chars (colon, whitespace, dot), escape with backtick. If + * the segment is already wrapped, return the original. + */ + private static String wrapSegment(String value) { + if (value.startsWith("`") && value.endsWith("`")) { + return value; + } + if (RESERVED_CHARS.matcher(value).find()) { + return String.format("`%s`", value); + } + return value; + } + + /** + * Assemble fully qualified name (FQN). Format: + * + *

    + *
  • {@code system:segment1.segment2} + *
  • {@code system:routine:segment1.segment2} + *
  • {@code system:`segment1.with.dots:clons`.segment2} + *
+ * + *

This helper method is for internal and testing usage only. + */ + @Internal + public static String getFqName( + String system, @Nullable String routine, Iterable segments) { + StringBuilder builder = new StringBuilder(system); + if (!Strings.isNullOrEmpty(routine)) { + builder.append(":").append(routine); + } + int idx = 0; + for (String segment : segments) { + if (idx == 0) { + builder.append(":"); + } else { + builder.append("."); + } + builder.append(wrapSegment(segment)); + ++idx; + } + return builder.toString(); + } + + /** + * Assemble the FQN of given system, and segments. + * + *

This helper method is for internal and testing usage only. + */ + @Internal + public static String getFqName(String system, Iterable segments) { + return getFqName(system, null, segments); + } + + /** + * Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}. + */ + public void add(String system, @Nullable String routine, Iterable segments) { + metric.add(getFqName(system, routine, segments)); + } + + /** + * Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}. + */ + public void add(String system, Iterable segments) { + add(system, null, segments); + } + /** Query {@link StringSet} metrics from {@link MetricResults}. */ public static Set query(MetricResults results, Type type) { MetricsFilter filter = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java new file mode 100644 index 000000000000..432eb396fe20 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link Lineage}. */ +@RunWith(JUnit4.class) +public class LineageTest { + @Test + public void testGetFqName() { + Map testCases = + ImmutableMap.builder() + .put("apache-beam", "apache-beam") + .put("`apache-beam`", "`apache-beam`") + .put("apache.beam", "`apache.beam`") + .put("apache:beam", "`apache:beam`") + .put("apache beam", "`apache beam`") + .put("`apache beam`", "`apache beam`") + .put("apache\tbeam", "`apache\tbeam`") + .put("apache\nbeam", "`apache\nbeam`") + .build(); + testCases.forEach( + (key, value) -> + assertEquals("apache:" + value, Lineage.getFqName("apache", ImmutableList.of(key)))); + testCases.forEach( + (key, value) -> + assertEquals( + "apache:beam:" + value, + Lineage.getFqName("apache", "beam", ImmutableList.of(key)))); + testCases.forEach( + (key, value) -> + assertEquals( + "apache:beam:" + value + "." + value, + Lineage.getFqName("apache", "beam", ImmutableList.of(key, key)))); + } +} diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index b49c434f81c6..6332051c0ddc 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; @@ -214,6 +215,16 @@ protected String getScheme() { return "gs"; } + @Override + protected void reportLineage(GcsResourceId resourceId, Lineage lineage) { + GcsPath path = resourceId.getGcsPath(); + if (!path.getBucket().isEmpty()) { + lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject())); + } else { + LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject()); + } + } + private List matchGlobs(List globs) { // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503. return FluentIterable.from(globs) diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index fa442374e3cb..7ed56efa44bd 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -624,6 +625,11 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir return S3ResourceId.fromUri(singleResourceSpec); } + @Override + protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { + lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); + } + /** * Invokes tasks in a thread pool, then unwraps the resulting {@link Future Futures}. * diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java index 5f08600758f6..384c8c627ee7 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -655,6 +656,11 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir return S3ResourceId.fromUri(singleResourceSpec); } + @Override + protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { + lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); + } + /** * Invokes tasks in a thread pool, then unwraps the resulting {@link Future Futures}. * diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java index 3b6e79b4ef7c..5137eaf9bb2d 100644 --- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java +++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -449,4 +450,16 @@ protected AzfsResourceId matchNewResource(String singleResourceSpec, boolean isD } return AzfsResourceId.fromUri(singleResourceSpec); } + + @Override + protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) { + if (!Strings.isNullOrEmpty(resourceId.getBlob())) { + lineage.add( + "abs", + ImmutableList.of( + resourceId.getAccount(), resourceId.getContainer(), resourceId.getBlob())); + } else { + lineage.add("abs", ImmutableList.of(resourceId.getAccount(), resourceId.getContainer())); + } + } } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index b852020c9bb3..859c03ed7750 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment; import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix; import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import java.time.Instant; @@ -36,6 +37,7 @@ import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn; import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions; import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testutils.NamedTestResult; @@ -152,6 +154,9 @@ public void writeThenReadAll() { PipelineResult result = pipeline.run(); PipelineResult.State pipelineState = result.waitUntilFinish(); + assertEquals( + Lineage.query(result.metrics(), Lineage.Type.SOURCE), + Lineage.query(result.metrics(), Lineage.Type.SINK)); collectAndPublishMetrics(result); // Fail the test if pipeline failed. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 61bed66a3365..129c8314fc80 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -413,7 +414,7 @@ public static String toTableSpec(TableReference ref) { return sb.toString(); } - public static String dataCatalogName(TableReference ref, BigQueryOptions options) { + public static List dataCatalogSegments(TableReference ref, BigQueryOptions options) { String tableIdBase; int ix = ref.getTableId().indexOf('$'); if (ix == -1) { @@ -429,7 +430,7 @@ public static String dataCatalogName(TableReference ref, BigQueryOptions options } else { projectId = options.getProject(); } - return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase); + return ImmutableList.of(projectId, ref.getDatasetId(), tableIdBase); } static List getOrCreateMapListValue(Map> map, K key) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 998c82ab8d83..a8985775cbe7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -121,7 +121,8 @@ protected ExtractResult extractFiles(PipelineOptions options) throws Exception { BigQueryHelpers.toTableSpec(tableToExtract))); } // emit this table ID as a lineage source - Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, bqOptions)); + Lineage.getSources() + .add("bigquery", BigQueryHelpers.dataCatalogSegments(tableToExtract, bqOptions)); TableSchema schema = table.getSchema(); JobService jobService = bqServices.getJobService(bqOptions); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index 3852d18ec12d..51a5a8f391a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -109,12 +108,12 @@ public List> split( @Nullable Table targetTable = getTargetTable(bqOptions); ReadSession.Builder readSessionBuilder = ReadSession.newBuilder(); - StringSet lineageSources = Lineage.getSources(); + Lineage lineage = Lineage.getSources(); if (targetTable != null) { TableReference tableReference = targetTable.getTableReference(); readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference)); // register the table as lineage source - lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions)); + lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions)); } else { // If the table does not exist targetTable will be null. // Construct the table id if we can generate it. For error recording/logging. @@ -123,7 +122,7 @@ public List> split( readSessionBuilder.setTable(tableReferenceId); // register the table as lineage source TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId); - lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions)); + lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions)); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index a55bcc3fe025..1bbd4e756084 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -121,7 +121,8 @@ public void processElement(ProcessContext context) { BigQueryOptions bqOptions = context.getPipelineOptions().as(BigQueryOptions.class); Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( tableDestination1.getTableReference(), bqOptions)); return CreateTableHelpers.possiblyCreateTable( bqOptions, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 369bb2d78634..13a22cbfe90f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -1132,7 +1132,8 @@ public void process( pipelineOptions.as(BigQueryOptions.class))); Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( state.getTableDestination().getTableReference(), pipelineOptions.as(BigQueryOptions.class))); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index f3f512110b50..1ee001d9890f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -477,7 +477,8 @@ public void process( Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( tableDestination.getTableReference(), bigQueryOptions)); Coder destinationCoder = dynamicDestinations.getDestinationCoder(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index 1a6a6a4db70d..061e66024e29 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -210,7 +210,8 @@ public void processElement( if (!entry.getValue().isEmpty()) { Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( entry.getKey().getTableReference(), c.getPipelineOptions().as(BigQueryOptions.class))); pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index ace0bc5a74cd..e374d459af44 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -263,7 +263,8 @@ public void processElement( } else { Lineage.getSinks() .add( - BigQueryHelpers.dataCatalogName( + "bigquery", + BigQueryHelpers.dataCatalogSegments( tableReference, c.getPipelineOptions().as(BigQueryOptions.class))); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 6fdf67722bac..1af9ae4f932d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -216,7 +216,7 @@ public void close() { @Override public void reportLineage() { - Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId)); } } @@ -327,7 +327,7 @@ public void close() {} @Override public void reportLineage() { - Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId)); } @Override @@ -597,7 +597,7 @@ public void writeSingleRecord(KV> record) throws @Override public void reportLineage() { - Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId)); + Lineage.getSinks().add("bigtable", ImmutableList.of(projectId, instanceId, tableId)); } private ServiceCallMetric createServiceCallMetric() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index f66ee6e1d842..2964a29dbb6b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -37,6 +37,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -260,8 +261,8 @@ public String getFullPath() { return String.format("/subscriptions/%s/%s", projectId, subscriptionName); } - public String getDataCatalogName() { - return String.format("pubsub:subscription:%s.%s", projectId, subscriptionName); + public List getDataCatalogSegments() { + return ImmutableList.of(projectId, subscriptionName); } @Override @@ -319,14 +320,14 @@ public String getName() { } /** - * Returns the data catalog name. Format "pubsub:topic:`project`.`topic`" This method is - * fail-safe. If topic path is malformed, it returns an empty string. + * Returns the data catalog segments. This method is fail-safe. If topic path is malformed, it + * returns an empty string. */ - public String getDataCatalogName() { + public List getDataCatalogSegments() { List splits = Splitter.on('/').splitToList(path); if (splits.size() == 4) { // well-formed path - return String.format("pubsub:topic:%s.%s", splits.get(1), splits.get(3)); + return ImmutableList.of(splits.get(1), splits.get(3)); } else { // Mal-formed path. It is either a test fixture or user error and will fail on publish. // We do not throw exception instead return empty string here. @@ -334,7 +335,7 @@ public String getDataCatalogName() { "Cannot get data catalog name for malformed topic path {}. Expected format: " + "projects//topics/", path); - return ""; + return ImmutableList.of(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 0fd4e9207d81..8b582c1054f8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -85,6 +85,7 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -513,8 +514,8 @@ public String asPath() { } } - public String dataCatalogName() { - return String.format("pubsub:topic:%s.%s", project, topic); + public List dataCatalogSegments() { + return ImmutableList.of(project, topic); } @Override @@ -1624,7 +1625,7 @@ public void finishBundle() throws IOException { } // Report lineage for all topics seen for (PubsubTopic topic : output.keySet()) { - Lineage.getSinks().add(topic.dataCatalogName()); + Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments()); } output = null; pubsubClient.close(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index defea87e835a..38d77aa3aac3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -297,9 +296,9 @@ private void publishBatch(List messages, int bytes) throws IOEx byteCounter.inc(bytes); // Report Lineage multiple once for same topic if (!topicPath.equals(reportedLineage)) { - String name = topicPath.getDataCatalogName(); - if (!Strings.isNullOrEmpty(name)) { - Lineage.getSinks().add(topicPath.getDataCatalogName()); + List segments = topicPath.getDataCatalogSegments(); + if (segments.size() != 0) { + Lineage.getSinks().add("pubsub", "topic", segments); } reportedLineage = topicPath; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index b131b521c067..95fa5c223419 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1045,14 +1045,14 @@ public List split(int desiredNumSplits, PipelineOptions options) TopicPath topic = outer.getTopic(); if (topic != null) { // is initial split on Read.fromTopic, report Lineage based on topic - Lineage.getSources().add(topic.getDataCatalogName()); + Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments()); } } else { if (subscriptionPath.equals(outer.getSubscriptionProvider())) { SubscriptionPath sub = subscriptionPath.get(); if (sub != null) { // is a split on Read.fromSubscription - Lineage.getSources().add(sub.getDataCatalogName()); + Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments()); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java index 4ce9ad10b2c0..4faff5ad6bd2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -154,7 +155,9 @@ private void checkLineageSourceMetric(PipelineResult r, String tableId) { if (options.getRunner().getName().contains("DirectRunner")) { assertThat( Lineage.query(r.metrics(), Lineage.Type.SOURCE), - hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + hasItem( + Lineage.getFqName( + "bigtable", ImmutableList.of(project, options.getInstanceId(), tableId)))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index 46bb3df836e5..44f3e5a19923 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -423,7 +423,9 @@ private void checkLineageSinkMetric(PipelineResult r, String tableId) { if (options.getRunner().getName().contains("DirectRunner")) { assertThat( Lineage.query(r.metrics(), Lineage.Type.SINK), - hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId))); + hasItem( + Lineage.getFqName( + "bigtable", ImmutableList.of(project, options.getInstanceId(), tableId)))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java index fb007d1171db..9d7bc65f5954 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Instant; import org.junit.Rule; @@ -171,7 +172,7 @@ public void subscriptionPathFromNameWellFormed() { SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); assertEquals("projects/test/subscriptions/something", path.getPath()); assertEquals("/subscriptions/test/something", path.getFullPath()); - assertEquals("pubsub:subscription:test.something", path.getDataCatalogName()); + assertEquals(ImmutableList.of("test", "something"), path.getDataCatalogSegments()); } @Test @@ -179,7 +180,7 @@ public void topicPathFromNameWellFormed() { TopicPath path = PubsubClient.topicPathFromName("test", "something"); assertEquals("projects/test/topics/something", path.getPath()); assertEquals("/topics/test/something", path.getFullPath()); - assertEquals("pubsub:topic:test.something", path.getDataCatalogName()); + assertEquals(ImmutableList.of("test", "something"), path.getDataCatalogSegments()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 74a98f0b8b43..d4effbae40a4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -238,8 +238,8 @@ public void testValueProviderTopic() { assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true)); assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get())); assertThat( - pubsubRead.getTopicProvider().get().dataCatalogName(), - equalTo("pubsub:topic:project.topic")); + pubsubRead.getTopicProvider().get().dataCatalogSegments(), + equalTo(ImmutableList.of("project", "topic"))); } @Test