Skip to content

Commit

Permalink
Add Lineage metrics for BigtableIO (#32068)
Browse files Browse the repository at this point in the history
* Add Lineage metrics for BigtableIO

* add tests

* simplify metrics query logics; exclude test actually already failing

* Address comments, fix typo
  • Loading branch information
Abacn authored Aug 6, 2024
1 parent 741facf commit 5ab908b
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
*/
package org.apache.beam.sdk.metrics;

import java.util.HashSet;
import java.util.Set;

/**
* Standard collection of metrics used to record source and sinks information for lineage tracking.
*/
public class Lineage {

public static final String LINEAGE_NAMESPACE = "lineage";
public static final String SOURCE_METRIC_NAME = "sources";
public static final String SINK_METRIC_NAME = "sinks";

private static final StringSet SOURCES = Metrics.stringSet(LINEAGE_NAMESPACE, SOURCE_METRIC_NAME);
private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, SINK_METRIC_NAME);
private static final StringSet SOURCES =
Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString());

/** {@link StringSet} representing sources and optionally side inputs. */
public static StringSet getSources() {
Expand All @@ -38,4 +38,35 @@ public static StringSet getSources() {
public static StringSet getSinks() {
return SINKS;
}

/** Query {@link StringSet} metrics from {@link MetricResults}. */
public static Set<String> query(MetricResults results, Type type) {
MetricsFilter filter =
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString()))
.build();
Set<String> result = new HashSet<>();
for (MetricResult<StringSetResult> metrics : results.queryMetrics(filter).getStringSets()) {
result.addAll(metrics.getCommitted().getStringSet());
result.addAll(metrics.getAttempted().getStringSet());
}
return result;
}

/** Lineage metrics resource types. */
public enum Type {
SOURCE("source"),
SINK("sink");

private final String name;

Type(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}
}
}
4 changes: 4 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ task integrationTest(type: Test, dependsOn: processTestResources) {

useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
filter {
// https://github.com/apache/beam/issues/32071
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableReadIT.testE2EBigtableSegmentRead'
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ private static class BigtableWriterFn
private transient Set<KV<BigtableWriteException, BoundedWindow>> badRecords = null;
// Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
private transient long pendingThrottlingMsecs;
private transient boolean reportedLineage;

// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;
Expand Down Expand Up @@ -1480,6 +1481,10 @@ public void finishBundle(FinishBundleContext c) throws Exception {
throttlingMsecs.inc(excessTime);
}
}
if (!reportedLineage) {
bigtableWriter.reportLineage();
reportedLineage = true;
}
bigtableWriter = null;
}

Expand Down Expand Up @@ -1612,6 +1617,7 @@ public String toString() {
private final BigtableConfig config;
private final BigtableReadOptions readOptions;
private @Nullable Long estimatedSizeBytes;
private transient boolean reportedLineage;

private final BigtableServiceFactory.ConfigId configId;

Expand Down Expand Up @@ -1989,6 +1995,13 @@ public List<ByteKeyRange> getRanges() {
public ValueProvider<String> getTableId() {
return readOptions.getTableId();
}

void reportLineageOnce(BigtableService.Reader reader) {
if (!reportedLineage) {
reader.reportLineage();
reportedLineage = true;
}
}
}

private static class BigtableReader extends BoundedReader<Row> {
Expand Down Expand Up @@ -2019,6 +2032,7 @@ true, makeByteKey(reader.getCurrentRow().getKey())))
|| rangeTracker.markDone();
if (hasRecord) {
++recordsReturned;
source.reportLineageOnce(reader);
}
return hasRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>
* @throws IOException if there is an error closing the writer
*/
void close() throws IOException;

/** Report Lineage metrics to runner. */
default void reportLineage() {}
}

/** The interface of a class that reads from Cloud Bigtable. */
Expand All @@ -77,6 +80,9 @@ interface Reader {
Row getCurrentRow() throws NoSuchElementException;

void close();

/** Report Lineage metrics to runner. */
default void reportLineage() {}
}

/** Returns a {@link Reader} that will read from the specified source. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -212,6 +213,11 @@ public void close() {
exhausted = true;
}
}

@Override
public void reportLineage() {
Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
}
}

@VisibleForTesting
Expand All @@ -225,6 +231,9 @@ static class BigtableSegmentReaderImpl implements Reader {
private final int refillSegmentWaterMark;
private final long maxSegmentByteSize;
private ServiceCallMetric serviceCallMetric;
private final String projectId;
private final String instanceId;
private final String tableId;

private static class UpstreamResults {
private final List<Row> rows;
Expand Down Expand Up @@ -308,11 +317,19 @@ static BigtableSegmentReaderImpl create(
// Asynchronously refill buffer when there is 10% of the elements are left
this.refillSegmentWaterMark =
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
}

@Override
public void close() {}

@Override
public void reportLineage() {
Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
}

@Override
public boolean start() throws IOException {
future = fetchNextSegment();
Expand Down Expand Up @@ -578,6 +595,11 @@ public void writeSingleRecord(KV<ByteString, Iterable<Mutation>> record) throws
}
}

@Override
public void reportLineage() {
Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, instanceId, tableId));
}

private ServiceCallMetric createServiceCallMetric() {
// Populate metrics
HashMap<String, String> baseLabels = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
Expand All @@ -61,9 +62,6 @@
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -351,18 +349,8 @@ private void checkTypedReadQueryObjectWithValidate(
}

private void checkLineageSourceMetric(PipelineResult pipelineResult, String tableName) {
MetricQueryResults lineageMetrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(
Lineage.LINEAGE_NAMESPACE, Lineage.SOURCE_METRIC_NAME))
.build());
assertThat(
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
contains("bigquery:" + tableName.replace(':', '.')));
Set<String> result = Lineage.query(pipelineResult.metrics(), Lineage.Type.SOURCE);
assertThat(result, contains("bigquery:" + tableName.replace(':', '.')));
}

@Before
Expand Down Expand Up @@ -600,10 +588,7 @@ public void processElement(ProcessContext c) throws Exception {
new MyData("b", 2L, bd1, bd2),
new MyData("c", 3L, bd1, bd2)));
PipelineResult result = p.run();
// Skip when direct runner splits outside of a counters context.
if (useTemplateCompatibility) {
checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable");
}
checkLineageSourceMetric(result, "non-executing-project:somedataset.sometable");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
Expand Down Expand Up @@ -285,16 +282,8 @@ public void evaluate() throws Throwable {
.withJobService(fakeJobService);

private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) {
MetricQueryResults lineageMetrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME))
.build());
assertThat(
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK),
hasItem("bigquery:" + tableName.replace(':', '.')));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
Expand All @@ -28,7 +31,9 @@
import java.util.Date;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -110,7 +115,8 @@ public void testE2EBigtableRead() {
p.apply(BigtableIO.read().withBigtableOptions(bigtableOptionsBuilder).withTableId(tableId))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
p.run();
PipelineResult r = p.run();
checkLineageSourceMetric(r, tableId);
}

@Test
Expand Down Expand Up @@ -138,6 +144,17 @@ public void testE2EBigtableSegmentRead() {
.withMaxBufferElementCount(10))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
p.run();
PipelineResult r = p.run();
checkLineageSourceMetric(r, tableId);
}

private void checkLineageSourceMetric(PipelineResult r, String tableId) {
// TODO(https://github.com/apache/beam/issues/32071) test malformed,
// when pipeline.run() is non-blocking, the metrics are not available by the time of query
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SOURCE),
hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigtable;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;

import com.google.api.gax.rpc.ServerStream;
Expand All @@ -39,8 +40,10 @@
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
Expand Down Expand Up @@ -142,7 +145,7 @@ public void processElement(ProcessContext c) {
.withProjectId(project)
.withInstanceId(options.getInstanceId())
.withTableId(tableId));
p.run();
PipelineResult r = p.run();

// Test number of column families and column family name equality
Table table = getTable(tableId);
Expand All @@ -154,6 +157,7 @@ public void processElement(ProcessContext c) {
// Test table data equality
List<KV<ByteString, ByteString>> tableData = getTableData(tableId);
assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray()));
checkLineageSinkMetric(r, tableId);
}

@Test
Expand Down Expand Up @@ -340,7 +344,7 @@ public void failureTest(int numRows, DoFn<Long, KV<ByteString, Iterable<Mutation
errorHandler.close();
PAssert.thatSingleton(Objects.requireNonNull(errorHandler.getOutput())).isEqualTo(2L);

p.run();
PipelineResult r = p.run();

// Test number of column families and column family name equality
Table table = getTable(tableId);
Expand All @@ -352,6 +356,7 @@ public void failureTest(int numRows, DoFn<Long, KV<ByteString, Iterable<Mutation
// Test table data equality
List<KV<ByteString, ByteString>> tableData = getTableData(tableId);
assertEquals(998, tableData.size());
checkLineageSinkMetric(r, tableId);
}

@After
Expand Down Expand Up @@ -412,4 +417,13 @@ private void deleteTable(String tableId) {
tableAdminClient.deleteTable(tableId);
}
}

private void checkLineageSinkMetric(PipelineResult r, String tableId) {
// Only check lineage metrics on direct runner until Dataflow runner v2 supported report back
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SINK),
hasItem(String.format("bigtable:%s.%s.%s", project, options.getInstanceId(), tableId)));
}
}
}

0 comments on commit 5ab908b

Please sign in to comment.