From 17283bb8294f22edfc4d00c49bf3d9a518a1551b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 6 Aug 2024 15:35:16 -0400 Subject: [PATCH] Add Lineage metrics to PubsubIO (#32037) * Add Lineage metrics to PubsubIO * fix format and add test * make getDatacatalogname fail safe --- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 28 +++++++++++++++++++ .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 9 ++++++ .../io/gcp/pubsub/PubsubUnboundedSink.java | 13 +++++++++ .../io/gcp/pubsub/PubsubUnboundedSource.java | 14 ++++++++++ .../sdk/io/gcp/pubsub/PubsubClientTest.java | 2 ++ .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 3 ++ 6 files changed, 69 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 79a9bb7f07d6..f66ee6e1d842 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -39,12 +39,15 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** An (abstract) helper class for talking to Pubsub via an underlying transport. 
*/ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public abstract class PubsubClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(PubsubClient.class); private static final Map> schemaTypeToConversionFnMap = ImmutableMap.of( @@ -257,6 +260,10 @@ public String getFullPath() { return String.format("/subscriptions/%s/%s", projectId, subscriptionName); } + public String getDataCatalogName() { + return String.format("pubsub:subscription:%s.%s", projectId, subscriptionName); + } + @Override public boolean equals(@Nullable Object o) { if (this == o) { @@ -293,6 +300,7 @@ public static SubscriptionPath subscriptionPathFromName( /** Path representing a Pubsub topic. */ public static class TopicPath implements Serializable { + // Format: "projects/<project>/topics/<topic>" private final String path; TopicPath(String path) { @@ -310,6 +318,26 @@ public String getName() { return splits.get(3); } + /** + * Returns the data catalog name. Format: "pubsub:topic:`project`.`topic`". This method is + * fail-safe: if the topic path is malformed, it returns an empty string. + */ + public String getDataCatalogName() { + List<String> splits = Splitter.on('/').splitToList(path); + if (splits.size() == 4) { + // well-formed path + return String.format("pubsub:topic:%s.%s", splits.get(1), splits.get(3)); + } else { + // Malformed path. It is either a test fixture or user error and will fail on publish. + // Instead of throwing an exception, we return an empty string here. + LOG.warn( + "Cannot get data catalog name for malformed topic path {}. 
Expected format: " + + "projects/<project>/topics/<topic>", + path); + return ""; + } + } + public String getFullPath() { List<String> splits = Splitter.on('/').splitToList(path); checkState(splits.size() == 4, "Malformed topic path %s", path); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 6233cf669080..0fd4e9207d81 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -512,6 +513,10 @@ public String asPath() { } } + public String dataCatalogName() { + return String.format("pubsub:topic:%s.%s", project, topic); + } + @Override public String toString() { return asPath(); @@ -1617,6 +1622,10 @@ public void finishBundle() throws IOException { for (Map.Entry entry : output.entrySet()) { publish(entry.getKey(), entry.getValue().messages); } + // Report lineage for all topics seen + for (PubsubTopic topic : output.keySet()) { + Lineage.getSinks().add(topic.dataCatalogName()); + } output = null; pubsubClient.close(); pubsubClient = null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index aa8e3a411486..defea87e835a 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.options.ValueProvider; @@ -69,6 +70,7 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -231,6 +233,9 @@ private static class WriterFn extends DoFn, Void> { /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */ private transient @Nullable PubsubClient pubsubClient; + /** Last TopicPath that reported Lineage. 
*/ + private transient @Nullable TopicPath reportedLineage; + private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); private final Counter elementCounter = SinkMetrics.elementsWritten(); private final Counter byteCounter = SinkMetrics.bytesWritten(); @@ -290,6 +295,14 @@ private void publishBatch(List messages, int bytes) throws IOEx batchCounter.inc(); elementCounter.inc(messages.size()); byteCounter.inc(bytes); + // Report Lineage only once for the same topic + if (!topicPath.equals(reportedLineage)) { + String name = topicPath.getDataCatalogName(); + if (!Strings.isNullOrEmpty(name)) { + Lineage.getSinks().add(topicPath.getDataCatalogName()); + } + reportedLineage = topicPath; + } } @StartBundle diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index b9a554d54ade..b131b521c067 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -1041,6 +1042,19 @@ public List split(int desiredNumSplits, PipelineOptions options) splitSource = new PubsubSource( outer, StaticValueProvider.of(outer.createRandomSubscription(options))); + TopicPath topic = outer.getTopic(); + if (topic != null) { + // is initial split on Read.fromTopic, report Lineage based on topic + 
Lineage.getSources().add(topic.getDataCatalogName()); + } + } else { + if (subscriptionPath.equals(outer.getSubscriptionProvider())) { + SubscriptionPath sub = subscriptionPath.get(); + if (sub != null) { + // is a split on Read.fromSubscription + Lineage.getSources().add(sub.getDataCatalogName()); + } + } } for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { // Since the source is immutable and Pubsub automatically shards we simply diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java index 895ed35bfb12..fb007d1171db 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java @@ -171,6 +171,7 @@ public void subscriptionPathFromNameWellFormed() { SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something"); assertEquals("projects/test/subscriptions/something", path.getPath()); assertEquals("/subscriptions/test/something", path.getFullPath()); + assertEquals("pubsub:subscription:test.something", path.getDataCatalogName()); } @Test @@ -178,6 +179,7 @@ public void topicPathFromNameWellFormed() { TopicPath path = PubsubClient.topicPathFromName("test", "something"); assertEquals("projects/test/topics/something", path.getPath()); assertEquals("/topics/test/something", path.getFullPath()); + assertEquals("pubsub:topic:test.something", path.getDataCatalogName()); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 3027db6aee9d..74a98f0b8b43 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -237,6 +237,9 @@ public void testValueProviderTopic() { assertThat(pubsubRead.getTopicProvider(), not(nullValue())); assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true)); assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get())); + assertThat( + pubsubRead.getTopicProvider().get().dataCatalogName(), + equalTo("pubsub:topic:project.topic")); } @Test