diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index 95317b37ff1..2665e49e06a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.metrics; +import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -44,8 +45,8 @@ * intentional for performance concerns. Hence, this class does not expose the contained node and * should not be modified to do so in future when used with multiple threads. This class choose to * achieve thread-safety through locks rather than just creating and returning immutable instances - * to its caller because the combining of a large and wide trie require per-node copy which has - * exponential cost and more expensive than synchronization. + * to its caller because the combining of a large and wide trie require per-node copy which is + * more expensive than synchronization. * *

Note: {@link #equals(Object)}, {@link #hashCode()} of this class are not synchronized and if * their usage needs synchronization then the client should do it. @@ -197,28 +198,36 @@ public synchronized void add(Iterable segments) { * @param other The other {@link BoundedTrieData} to combine with. * @return The combined {@link BoundedTrieData}. */ - public synchronized BoundedTrieData combine(@Nonnull BoundedTrieData other) { - if (other.root == null && other.singleton == null) { - return this; - } + public BoundedTrieData combine(@Nonnull BoundedTrieData other) { + BoundedTrieData otherDeepCopy; // other can be modified in some different thread, and we need to atomically access - // its fields to combine correctly. Furthermore, simply doing this under synchronized(other) - // is not safe as it might lead to deadlock. Assume the current thread got lock on - // 'this' and is executing combine with `other` and waiting to get a lock on `other` - // while some other thread is performing `other.combiner(this)` and waiting to get a - // lock on `this` object. - BoundedTrieData otherDeepCopy = other.getCumulative(); - if (this.root == null && this.singleton == null) { - return otherDeepCopy; + // its fields to combine correctly. Furthermore, doing this whole method under + // synchronized(other) is not safe as it might lead to deadlock. Assume the current + // thread got lock on 'this' and is executing combine with `other` and waiting to get a + // lock on `other` while some other thread is performing `other.combine(this)` and + // waiting to get a lock on `this` object. + // Here it is safe to take the lock on other because we do not yet hold the lock on this, so + // the deadlock described above cannot occur. 
+ synchronized (other) { + if (other.root == null && other.singleton == null) { + return this; + } + otherDeepCopy = other.getCumulative(); } - otherDeepCopy.root = otherDeepCopy.asTrie(); - otherDeepCopy.singleton = null; - otherDeepCopy.root.merge(this.asTrie()); - otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound); - while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) { - otherDeepCopy.root.trim(); + + synchronized (this) { + if (this.root == null && this.singleton == null) { + return otherDeepCopy; + } + otherDeepCopy.root = otherDeepCopy.asTrie(); + otherDeepCopy.singleton = null; + otherDeepCopy.root.merge(this.asTrie()); + otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound); + while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) { + otherDeepCopy.root.trim(); + } + return otherDeepCopy; } - return otherDeepCopy; } /** @@ -287,6 +296,7 @@ public final String toString() { * of the tree itself. This avoids acquiring and release N nodes in a path. This class is not * intended to be used directly outside of {@link BoundedTrieData} with multiple threads. */ + @VisibleForTesting static class BoundedTrieNode implements Serializable { public static final String TRUNCATED_TRUE = String.valueOf(true); @@ -375,6 +385,7 @@ int add(List segments) { * @param segmentsIter An iterator over the paths to add. * @return The total change in the size of the subtree rooted at this node. 
*/ + @VisibleForTesting int addAll(List> segmentsIter) { return segmentsIter.stream().mapToInt(this::add).sum(); } diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java index 4f6f4156887..66a3b6bfac7 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java @@ -166,7 +166,7 @@ private static Iterable> extractBoundedTrieMetri return monitoringInfoList.stream() .filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType())) .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null) - .map(PortableMetrics::convertBoundedTrieMonitoringInfoToStringSet) + .map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie) .collect(Collectors.toList()); } @@ -183,7 +183,7 @@ private static MetricResult convertStringSetMonitoringInfoToStr return MetricResult.create(key, false, result); } - private static MetricResult convertBoundedTrieMonitoringInfoToStringSet( + private static MetricResult convertBoundedTrieMonitoringInfoToBoundedTrie( MetricsApi.MonitoringInfo monitoringInfo) { Map labelsMap = monitoringInfo.getLabelsMap(); MetricKey key =