Skip to content

Commit

Permalink
Add Lineage metrics to PubsubIO (#32037)
Browse files Browse the repository at this point in the history
* Add Lineage metrics to PubsubIO

* fix format and add test

* make getDatacatalogname fail safe
  • Loading branch information
Abacn authored Aug 6, 2024
1 parent 5ab908b commit 17283bb
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** An (abstract) helper class for talking to Pubsub via an underlying transport. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class PubsubClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PubsubClient.class);
private static final Map<String, SerializableFunction<String, Schema>>
schemaTypeToConversionFnMap =
ImmutableMap.of(
Expand Down Expand Up @@ -257,6 +260,10 @@ public String getFullPath() {
return String.format("/subscriptions/%s/%s", projectId, subscriptionName);
}

/**
 * Returns the data catalog name of this subscription, formatted as
 * {@code pubsub:subscription:<projectId>.<subscriptionName>}.
 */
public String getDataCatalogName() {
  final String template = "pubsub:subscription:%s.%s";
  return String.format(template, projectId, subscriptionName);
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
Expand Down Expand Up @@ -293,6 +300,7 @@ public static SubscriptionPath subscriptionPathFromName(

/** Path representing a Pubsub topic. */
public static class TopicPath implements Serializable {
// Format: "projects/<project>/topics/<topic>"
private final String path;

TopicPath(String path) {
Expand All @@ -310,6 +318,26 @@ public String getName() {
return splits.get(3);
}

/**
 * Builds the data catalog name for this topic, in the form
 * {@code pubsub:topic:<project>.<topic>}.
 *
 * <p>This method is fail-safe: when the topic path does not split into exactly four
 * '/'-separated segments, a warning is logged and an empty string is returned instead of
 * throwing.
 */
public String getDataCatalogName() {
  List<String> segments = Splitter.on('/').splitToList(path);
  if (segments.size() != 4) {
    // Malformed path. Per the original author's note, this is either a test fixture or a user
    // error that will fail on publish, so report it and fall back to an empty name rather than
    // throwing here.
    LOG.warn(
        "Cannot get data catalog name for malformed topic path {}. Expected format: "
            + "projects/<project>/topics/<topic>",
        path);
    return "";
  }
  // Well-formed path: segment 1 is the project, segment 3 is the topic.
  String project = segments.get(1);
  String topic = segments.get(3);
  return String.format("pubsub:topic:%s.%s", project, topic);
}

public String getFullPath() {
List<String> splits = Splitter.on('/').splitToList(path);
checkState(splits.size() == 4, "Malformed topic path %s", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -512,6 +513,10 @@ public String asPath() {
}
}

/**
 * Returns the data catalog name of this topic, formatted as
 * {@code pubsub:topic:<project>.<topic>}.
 */
public String dataCatalogName() {
  final String template = "pubsub:topic:%s.%s";
  return String.format(template, project, topic);
}

@Override
public String toString() {
return asPath();
Expand Down Expand Up @@ -1617,6 +1622,10 @@ public void finishBundle() throws IOException {
for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
publish(entry.getKey(), entry.getValue().messages);
}
// Report lineage for all topics seen
for (PubsubTopic topic : output.keySet()) {
Lineage.getSinks().add(topic.dataCatalogName());
}
output = null;
pubsubClient.close();
pubsubClient = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand Down Expand Up @@ -231,6 +233,9 @@ private static class WriterFn extends DoFn<Iterable<OutgoingMessage>, Void> {
/** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */
private transient @Nullable PubsubClient pubsubClient;

/** Last TopicPath that reported Lineage. */
private transient @Nullable TopicPath reportedLineage;

private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
private final Counter elementCounter = SinkMetrics.elementsWritten();
private final Counter byteCounter = SinkMetrics.bytesWritten();
Expand Down Expand Up @@ -290,6 +295,14 @@ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOEx
batchCounter.inc();
elementCounter.inc(messages.size());
byteCounter.inc(bytes);
// Report Lineage only once while the topic stays the same
if (!topicPath.equals(reportedLineage)) {
String name = topicPath.getDataCatalogName();
if (!Strings.isNullOrEmpty(name)) {
Lineage.getSinks().add(topicPath.getDataCatalogName());
}
reportedLineage = topicPath;
}
}

@StartBundle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -1041,6 +1042,19 @@ public List<PubsubSource> split(int desiredNumSplits, PipelineOptions options)
splitSource =
new PubsubSource(
outer, StaticValueProvider.of(outer.createRandomSubscription(options)));
TopicPath topic = outer.getTopic();
if (topic != null) {
// is initial split on Read.fromTopic, report Lineage based on topic
Lineage.getSources().add(topic.getDataCatalogName());
}
} else {
if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
// is a split on Read.fromSubscription
Lineage.getSources().add(sub.getDataCatalogName());
}
}
}
for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
// Since the source is immutable and Pubsub automatically shards we simply
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,15 @@ public void subscriptionPathFromNameWellFormed() {
SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", "something");
assertEquals("projects/test/subscriptions/something", path.getPath());
assertEquals("/subscriptions/test/something", path.getFullPath());
assertEquals("pubsub:subscription:test.something", path.getDataCatalogName());
}

@Test
public void topicPathFromNameWellFormed() {
  // Build a topic path from project "test" and topic "something", then check each rendering.
  TopicPath topicPath = PubsubClient.topicPathFromName("test", "something");
  assertEquals("projects/test/topics/something", topicPath.getPath());
  assertEquals("/topics/test/something", topicPath.getFullPath());
  assertEquals("pubsub:topic:test.something", topicPath.getDataCatalogName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ public void testValueProviderTopic() {
assertThat(pubsubRead.getTopicProvider(), not(nullValue()));
assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true));
assertThat(pubsubRead.getTopicProvider().get().asPath(), equalTo(provider.get()));
assertThat(
pubsubRead.getTopicProvider().get().dataCatalogName(),
equalTo("pubsub:topic:project.topic"));
}

@Test
Expand Down

0 comments on commit 17283bb

Please sign in to comment.