Skip to content

Commit

Permalink
Add some metrics for CoGBK optimization.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Apr 15, 2024
1 parent c165f8a commit 9f92a63
Showing 1 changed file with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.common.Reiterator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
Expand Down Expand Up @@ -73,6 +75,10 @@ public class CoGbkResult {

private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class);

private Counter keyCount = Metrics.counter(CoGbkResult.class, "cogbk-keys");

private Counter largeKeyCount = Metrics.counter(CoGbkResult.class, "cogbk-large-keys");

/**
* A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform. Currently,
* this row must fit into memory.
Expand All @@ -91,6 +97,7 @@ public CoGbkResult(
int inMemoryElementCount,
int minElementsPerTag) {
this.schema = schema;
keyCount.inc();
List<List<Object>> valuesByTag = new ArrayList<>();
for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
valuesByTag.add(new ArrayList<>());
Expand All @@ -103,6 +110,7 @@ public CoGbkResult(
while (taggedIter.hasNext()) {
if (elementCount++ >= inMemoryElementCount) {
// Let the tails be lazy.
largeKeyCount.inc();
break;
}
RawUnionValue value = taggedIter.next();
Expand Down Expand Up @@ -617,6 +625,12 @@ private static class TagIterable<T> implements Iterable<T> {
Reiterator<RawUnionValue> tail;
boolean finished;

private Counter smallIterablesCount =
Metrics.counter(CoGbkResult.class, "cogbk-small-iterables");

private Counter largeIterablesCount =
Metrics.counter(CoGbkResult.class, "cogbk-large-iterables");

public TagIterable(
List<T> head, int tag, int cacheSize, ObservingReiterator<RawUnionValue> tip) {
this.tag = tag;
Expand All @@ -636,6 +650,10 @@ void offer(ObservingReiterator<RawUnionValue> tail) {
}

void finish() {
Metrics.counter(
CoGbkResult.class,
this.tail == null ? "cogbk-small-iterables" : "cogbk-large-iterables")
.inc();
finished = true;
}

Expand Down Expand Up @@ -838,8 +856,11 @@ protected Object computeNext() {
// We got to the end of the iterable, update the shared set of values with those sets that
// were small enough to cache.
if (!sharedSeenEnd[0]) {
Counter smallIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-small-iterables");
Counter largeIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-large-iterables");
for (int i = 0; i < sharedValueMap.size(); i++) {
List<Object> localValues = localValueMap.get(i);
(localValues == null ? largeIterablesCount : smallIterablesCount).inc();
sharedValueMap.set(
i, localValues != null ? localValues : simpleFilteringIterable(taggedIterable, i));
}
Expand Down

0 comments on commit 9f92a63

Please sign in to comment.