diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 8f7898fc4285..2e26d13da547 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -32,6 +32,8 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.common.Reiterator; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -73,6 +75,10 @@ public class CoGbkResult { private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class); + private Counter keyCount = Metrics.counter(CoGbkResult.class, "cogbk-keys"); + + private Counter largeKeyCount = Metrics.counter(CoGbkResult.class, "cogbk-large-keys"); + /** * A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform. Currently, * this row must fit into memory. @@ -91,6 +97,7 @@ public CoGbkResult( int inMemoryElementCount, int minElementsPerTag) { this.schema = schema; + keyCount.inc(); List> valuesByTag = new ArrayList<>(); for (int unionTag = 0; unionTag < schema.size(); unionTag++) { valuesByTag.add(new ArrayList<>()); @@ -103,6 +110,7 @@ public CoGbkResult( while (taggedIter.hasNext()) { if (elementCount++ >= inMemoryElementCount) { // Let the tails be lazy. + largeKeyCount.inc(); break; } RawUnionValue value = taggedIter.next(); @@ -636,6 +644,10 @@ void offer(ObservingReiterator tail) { } void finish() { + Metrics.counter( + CoGbkResult.class, + this.tail == null ? "cogbk-small-iterables" : "cogbk-large-iterables") + .inc(); finished = true; } @@ -838,8 +850,11 @@ protected Object computeNext() { // We got to the end of the iterable, update the shared set of values with those sets that // were small enough to cache. if (!sharedSeenEnd[0]) { + Counter smallIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-small-iterables"); + Counter largeIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-large-iterables"); for (int i = 0; i < sharedValueMap.size(); i++) { List localValues = localValueMap.get(i); + (localValues == null ? largeIterablesCount : smallIterablesCount).inc(); sharedValueMap.set( i, localValues != null ? localValues : simpleFilteringIterable(taggedIterable, i)); }