From 9f92a63db65c5f155f79c5cb4671d71ebf2277e3 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 15 Apr 2024 15:08:25 -0700 Subject: [PATCH 1/2] Add some metrics for CoGBK optimization. --- .../beam/sdk/transforms/join/CoGbkResult.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 8f7898fc4285..482018e198d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -32,6 +32,8 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.common.Reiterator; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -73,6 +75,10 @@ public class CoGbkResult { private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class); + private Counter keyCount = Metrics.counter(CoGbkResult.class, "cogbk-keys"); + + private Counter largeKeyCount = Metrics.counter(CoGbkResult.class, "cogbk-large-keys"); + /** * A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform. Currently, * this row must fit into memory. @@ -91,6 +97,7 @@ public CoGbkResult( int inMemoryElementCount, int minElementsPerTag) { this.schema = schema; + keyCount.inc(); List> valuesByTag = new ArrayList<>(); for (int unionTag = 0; unionTag < schema.size(); unionTag++) { valuesByTag.add(new ArrayList<>()); @@ -103,6 +110,7 @@ public CoGbkResult( while (taggedIter.hasNext()) { if (elementCount++ >= inMemoryElementCount) { // Let the tails be lazy. + largeKeyCount.inc(); break; } RawUnionValue value = taggedIter.next(); @@ -617,6 +625,12 @@ private static class TagIterable implements Iterable { Reiterator tail; boolean finished; + private Counter smallIterablesCount = + Metrics.counter(CoGbkResult.class, "cogbk-small-iterables"); + + private Counter largeIterablesCount = + Metrics.counter(CoGbkResult.class, "cogbk-large-iterables"); + public TagIterable( List head, int tag, int cacheSize, ObservingReiterator tip) { this.tag = tag; @@ -636,6 +650,10 @@ void offer(ObservingReiterator tail) { } void finish() { + Metrics.counter( + CoGbkResult.class, + this.tail == null ? "cogbk-small-iterables" : "cogbk-large-iterables") + .inc(); finished = true; } @@ -838,8 +856,11 @@ protected Object computeNext() { // We got to the end of the iterable, update the shared set of values with those sets that // were small enough to cache. if (!sharedSeenEnd[0]) { + Counter smallIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-small-iterables"); + Counter largeIterablesCount = Metrics.counter(CoGbkResult.class, "cogbk-large-iterables"); for (int i = 0; i < sharedValueMap.size(); i++) { List localValues = localValueMap.get(i); + (localValues == null ? largeIterablesCount : smallIterablesCount).inc(); sharedValueMap.set( i, localValues != null ? localValues : simpleFilteringIterable(taggedIterable, i)); } From 91fe0c3dcdf25c6952f9dcf227f1fcf5f6113abf Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 16 Apr 2024 10:33:36 -0700 Subject: [PATCH 2/2] cleanup --- .../org/apache/beam/sdk/transforms/join/CoGbkResult.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java index 482018e198d5..2e26d13da547 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java @@ -625,12 +625,6 @@ private static class TagIterable implements Iterable { Reiterator tail; boolean finished; - private Counter smallIterablesCount = - Metrics.counter(CoGbkResult.class, "cogbk-small-iterables"); - - private Counter largeIterablesCount = - Metrics.counter(CoGbkResult.class, "cogbk-large-iterables"); - public TagIterable( List head, int tag, int cacheSize, ObservingReiterator tip) { this.tag = tag;