diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java index d3e8976d20c8..65c6f6a765d1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java @@ -19,10 +19,8 @@ import java.util.Arrays; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -33,36 +31,29 @@ * In that case retrieving the underlying cell and reporting directly to it avoids a step of * indirection. */ +// TODO: Write multi-threaded test in MetricContainerImp for this Cell class too. public class BoundedTrieCell implements BoundedTrie, MetricCell { private final DirtyState dirty = new DirtyState(); - private final AtomicReference setValue = - new AtomicReference<>(BoundedTrieData.empty()); + private final BoundedTrieData value; private final MetricName name; - /** - * Generally, runners should construct instances using the methods in {@link - * MetricsContainerImpl}, unless they need to define their own version of {@link - * MetricsContainer}. These constructors are *only* public so runners can instantiate. - */ public BoundedTrieCell(MetricName name) { this.name = name; + this.value = new BoundedTrieData(); + } + + public void update(BoundedTrieCell other) { + this.value.combine(other.value); + dirty.afterModification(); } @Override public void reset() { - setValue.set(BoundedTrieData.empty()); + value.clear(); dirty.reset(); } - void update(BoundedTrieData data) { - BoundedTrieData original; - do { - original = setValue.get(); - } while (!setValue.compareAndSet(original, original.combine(data))); - dirty.afterModification(); - } - @Override public DirtyState getDirty() { return dirty; @@ -70,7 +61,7 @@ public DirtyState getDirty() { @Override public BoundedTrieData getCumulative() { - return setValue.get(); + return value.getCumulative(); } @Override @@ -83,7 +74,7 @@ public boolean equals(@Nullable Object object) { if (object instanceof BoundedTrieCell) { BoundedTrieCell boundedTrieCell = (BoundedTrieCell) object; return Objects.equals(dirty, boundedTrieCell.dirty) - && Objects.equals(setValue.get(), boundedTrieCell.setValue.get()) + && Objects.equals(value, boundedTrieCell.value) && Objects.equals(name, boundedTrieCell.name); } return false; @@ -91,15 +82,12 @@ public boolean equals(@Nullable Object object) { @Override public int hashCode() { - return Objects.hash(dirty, setValue.get(), name); + return Objects.hash(dirty, value, name); } @Override public void add(Iterable values) { - BoundedTrieData original; - do { - original = setValue.get(); - } while (!setValue.compareAndSet(original, original.add(values))); + this.value.add(values); dirty.afterModification(); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index e8c7375924cc..6e2b722675a1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -17,101 +17,99 @@ */ package org.apache.beam.runners.core.metrics; -import com.google.auto.value.AutoValue; -import com.google.common.base.Preconditions; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; -import org.apache.beam.sdk.metrics.BoundedTrieResult; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; /** - * Represents data stored in a bounded trie. This data structure is used to efficiently store and + * Experimental and subject to incompatible changes, or even removal, in a future releases. + * + *

Represents data stored in a bounded trie. This data structure is used to efficiently store and * aggregate a collection of string sequences, paths/FQN with a limited size. * - *

The trie can be in one of two states: + *

This class is thread-safe but the underlying BoundedTrieNode contained on it isn't. This is + * intentional for performance concerns. Hence, this class does not expose the contained node and + * should not be modified to do so in future when used with multiple threads. This class choose to + * achieve thread-safety through locks rather than just creating and returning immutable instances + * to its caller because the combining of a large and wide trie require per-node copy which has + * exponential cost and more expensive than synchronization. * - *

    - *
  • **Singleton:** Contains a single path. - *
  • **Trie:** Contains a {@link BoundedTrieNode} representing the root of the trie. - *
+ *

Note: {@link #equals(Object)}, {@link #hashCode()} of this class are not synchronized and if + * their usage needs synchronization then the client should do it. */ -@AutoValue -public abstract class BoundedTrieData implements Serializable { +@Internal +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class BoundedTrieData implements Serializable { private static final int DEFAULT_BOUND = 100; // Default maximum size of the trie - /** - * Returns an {@link Optional} containing the singleton path if this {@link BoundedTrieData} - * represents a single path. - */ - public abstract Optional> singleton(); + /** Returns the singleton path if this {@link BoundedTrieData} represents a single path. */ + private List singleton; /** * Returns an {@link Optional} containing the root {@link BoundedTrieNode} if this {@link * BoundedTrieData} represents a trie. */ - public abstract Optional root(); + private BoundedTrieNode root; /** Returns the maximum size of the trie. */ - public abstract int bound(); + private int bound; - /** - * Creates a {@link BoundedTrieData} instance. - * - * @param singleton The singleton path (optional). - * @param root The root node of the trie (optional). - * @param bound The maximum size of the trie. - * @throws IllegalArgumentException If both or neither of {@code singleton} and {@code root} are - * specified or both are null, or if {@code bound} is less than 1. - */ - public static BoundedTrieData create( - @Nullable List singleton, @Nullable BoundedTrieNode root, int bound) { - Preconditions.checkArgument( - (singleton == null ^ root == null), - "Either and only one of singleton or root must be specified."); - Preconditions.checkArgument(bound >= 1, "Bound must be at least 1."); - return new AutoValue_BoundedTrieData( - Optional.ofNullable(singleton), Optional.ofNullable(root), bound); + public BoundedTrieData() { + this(null, null, DEFAULT_BOUND); } - /** - * Creates a {@link BoundedTrieData} instance from a {@link BoundedTrieNode} with the default - * bound. - * - * @param root The root node of the trie. - */ - public static BoundedTrieData create(@Nonnull BoundedTrieNode root) { - return create(null, root, DEFAULT_BOUND); + public BoundedTrieData(List singleton) { + this(singleton, null, DEFAULT_BOUND); + } + + public BoundedTrieData(BoundedTrieNode root) { + this(null, root, DEFAULT_BOUND); } /** - * Creates a {@link BoundedTrieData} instance from a singleton path with the default bound. + * Constructs a new BoundedTrieData object. + * + *

A BoundedTrieData object represents the data stored in a bounded trie. It can either be a + * singleton list of strings, or a trie with a given root and bound. * - * @param singleton The singleton path. + * @param singleton the singleton list of strings, or null if the data is a trie + * @param root the root of the trie, or null if the data is a singleton list + * @param bound the maximum number of elements allowed in the trie + * @throws IllegalArgumentException if both {@code singleton} and {@code root} are non-null */ - public static BoundedTrieData create(@Nonnull List singleton) { - return create(singleton, null, DEFAULT_BOUND); + public BoundedTrieData(List singleton, BoundedTrieNode root, int bound) { + assert singleton == null || root == null; + this.singleton = singleton; + this.root = root; + this.bound = bound; } /** Converts this {@link BoundedTrieData} to its proto {@link BoundedTrie}. */ - public BoundedTrie toProto() { + public synchronized BoundedTrie toProto() { BoundedTrie.Builder builder = BoundedTrie.newBuilder(); - builder.setBound(bound()); - singleton().ifPresent(builder::addAllSingleton); - root().ifPresent(r -> builder.setRoot(r.toProto())); + builder.setBound(this.bound); + if (this.singleton != null) { + builder.addAllSingleton(this.singleton); + } + if (this.root != null) { + builder.setRoot(this.root.toProto()); + } return builder.build(); } @@ -119,82 +117,104 @@ public BoundedTrie toProto() { public static BoundedTrieData fromProto(BoundedTrie proto) { List singleton = proto.getSingletonList(); BoundedTrieNode root = proto.hasRoot() ? BoundedTrieNode.fromProto(proto.getRoot()) : null; - return create(singleton, root, proto.getBound()); + return new BoundedTrieData(singleton, root, proto.getBound()); } /** Returns this {@link BoundedTrieData} as a {@link BoundedTrieNode}. */ - public BoundedTrieNode asTrie() { - return root() - .orElseGet( - () -> { - BoundedTrieNode newRoot = new BoundedTrieNode(); - singleton().ifPresent(newRoot::add); - return newRoot; - }); + @Nonnull + private synchronized BoundedTrieNode asTrie() { + if (this.root != null) { + return this.root; + } else { + BoundedTrieNode trieNode = new BoundedTrieNode(); + if (this.singleton != null) { + trieNode.add(this.singleton); + } + return trieNode; + } } /** Returns a new {@link BoundedTrieData} instance that is a deep copy of this instance. */ - public BoundedTrieData getCumulative() { - return root().isPresent() - ? create(null, new BoundedTrieNode(root().get()), bound()) - : create(singleton().get(), null, bound()); + public synchronized BoundedTrieData getCumulative() { + List singleton = this.singleton == null ? null : new ArrayList<>(this.singleton); + // deep copy + BoundedTrieNode root = this.root == null ? null : this.root.deepCopy(); + return new BoundedTrieData(singleton, root, this.bound); } - /** Extracts the data from this {@link BoundedTrieData} as a {@link BoundedTrieResult}. */ - public BoundedTrieResult getBoundedTrieResult() { - if (root().isPresent()) { - return BoundedTrieResult.create(new HashSet<>(root().get().flattened())); - } else if (singleton().isPresent()) { - List list = new ArrayList<>(singleton().get()); - list.add(String.valueOf(false)); - return BoundedTrieResult.create(ImmutableSet.of(list)); + /** + * Returns an immutable set of lists, where each list represents a path in the bounded trie. The + * last element in each path is a boolean in string representation denoting whether this path was + * truncated. i.e. <["a", "b", "false"], ["c", "true"]> + * + * @return The set of paths. + */ + public synchronized Set> getResult() { + if (this.root == null) { + if (this.singleton == null) { + return ImmutableSet.of(); + } else { + List list = new ArrayList<>(this.singleton); + list.add(String.valueOf(false)); + return ImmutableSet.of(list); + } } else { - return BoundedTrieResult.empty(); + return ImmutableSet.copyOf(this.root.flattened()); } } /** - * Adds a new path to this {@link BoundedTrieData}. + * Adds a new path to this {@link BoundedTrieData} and hence the {@link BoundedTrieData} is + * modified. * * @param segments The path to add. - * @return A new {@link BoundedTrieData} instance with the added path. */ - public BoundedTrieData add(Iterable segments) { + public synchronized void add(Iterable segments) { List segmentsParts = ImmutableList.copyOf(segments); - if (root().isPresent() && singleton().isPresent()) { - return create(segmentsParts, null, bound()); - } else if (singleton().isPresent() && singleton().get().equals(segmentsParts)) { - return this; // Optimize for re-adding the same value. - } else { - BoundedTrieNode newRoot = new BoundedTrieNode(asTrie()); - newRoot.add(segmentsParts); - if (newRoot.getSize() > bound()) { - newRoot.trim(); + if (this.root == null) { + if (this.singleton == null || !this.singleton.equals(segmentsParts)) { + this.root = this.asTrie(); + this.singleton = null; + } + } + + if (this.root != null) { + this.root.add(segmentsParts); + if (this.root.getSize() > this.bound) { + this.root.trim(); } - return create(null, newRoot, bound()); } } /** - * Combines this {@link BoundedTrieData} with another {@link BoundedTrieData}. + * Combines this {@link BoundedTrieData} with another {@link BoundedTrieData} by doing a deep + * copy. * * @param other The other {@link BoundedTrieData} to combine with. - * @return A new {@link BoundedTrieData} instance representing the combined data. */ - public BoundedTrieData combine(BoundedTrieData other) { - if (root().isPresent() && singleton().isPresent()) { - return other; - } else if (other.root().isPresent() && other.singleton().isPresent()) { - return this; - } else { - BoundedTrieNode combined = new BoundedTrieNode(asTrie()); - combined.merge(other.asTrie()); - int bound = Math.min(this.bound(), other.bound()); - while (combined.getSize() > bound) { - combined.trim(); + public synchronized void combine(@Nonnull BoundedTrieData other) { + if (other.root == null && other.singleton == null) { + return; + } + // other can be modified in some different thread, and we need to atomically access + // its fields to combine correctly. Furthermore, simply doing this under synchronized(other) + // is not safe as it might lead to deadlock. Assume the current thread got lock on + // 'this' and is executing combine with `other` and waiting to get a lock on `other` + // while some other thread is performing `other.combiner(this)` and waiting to get a + // lock on `this` object. + BoundedTrieData otherDeepCopy = other.getCumulative(); + if (this.root != null || this.singleton != null) { + otherDeepCopy.root = otherDeepCopy.asTrie(); + otherDeepCopy.singleton = null; + otherDeepCopy.root.merge(this.asTrie()); + otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound); + while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) { + otherDeepCopy.root.trim(); } - return create(null, combined, bound); } + this.root = otherDeepCopy.root; + this.singleton = otherDeepCopy.singleton; + this.bound = otherDeepCopy.bound; } /** @@ -202,43 +222,44 @@ public BoundedTrieData combine(BoundedTrieData other) { * * @return The size of the trie. */ - public int size() { - if (singleton().isPresent()) { + public synchronized int size() { + if (this.singleton != null) { return 1; - } else if (root().isPresent()) { - return root().get().getSize(); + } else if (this.root != null) { + return root.getSize(); } else { return 0; } } + public synchronized void clear() { + this.root = null; + this.singleton = null; + this.bound = DEFAULT_BOUND; + } + /** * Checks if the trie contains the given path. * * @param value The path to check. * @return True if the trie contains the path, false otherwise. */ - public boolean contains(List value) { - if (singleton().isPresent()) { - return value.equals(singleton().get()); - } else if (root().isPresent()) { - return root().get().contains(value); + public synchronized boolean contains(@Nonnull List value) { + if (this.singleton != null) { + return value.equals(this.singleton); + } else if (this.root != null) { + return this.root.contains(value); } else { return false; } } - /** Returns an empty {@link BoundedTrieData} instance. */ - public static BoundedTrieData empty() { - return EmptyBoundedTrieData.INSTANCE; - } - @Override public final boolean equals(@Nullable Object other) { if (this == other) { return true; } - if (other == null || getClass() != other.getClass()) { + if (other == null || this.getClass() != other.getClass()) { return false; } BoundedTrieData that = (BoundedTrieData) other; @@ -247,67 +268,31 @@ public final boolean equals(@Nullable Object other) { @Override public final int hashCode() { - return asTrie().hashCode(); + return this.asTrie().hashCode(); } @Override public final String toString() { - return "BoundedTrieData(" + asTrie() + ")"; + return "BoundedTrieData(" + this.asTrie() + ")"; } - // ---------------------------- EmptyBoundedTrieData Implementation --------------------------- + // ------------------------------ BoundedTrieNode Implementation ------------------------------ + /** - * An immutable implementation of {@link BoundedTrieData} representing an empty trie. This class - * provides a singleton instance for efficiency. + * BoundedTrieNode implementation. This class is not thread-safe and relies on the {@link + * BoundedTrieData} which uses this class to ensure thread-safety by acquiring a lock on the root + * of the tree itself. This avoids acquiring and release N nodes in a path. This class is not + * intended to be used directly outside of {@link BoundedTrieData} with multiple threads. */ - public static class EmptyBoundedTrieData extends BoundedTrieData { - - private static final EmptyBoundedTrieData INSTANCE = new EmptyBoundedTrieData(); - private static final int DEFAULT_BOUND = 1; // Define the default bound here - - private EmptyBoundedTrieData() {} - - /** - * Returns an {@link Optional} containing an empty list of strings, representing the singleton - * path in an empty trie. - */ - @Override - public Optional> singleton() { - return Optional.of(ImmutableList.of()); - } - - /** - * Returns an {@link Optional} containing an empty {@link BoundedTrieNode} representing the root - * of an empty trie. - */ - @Override - public Optional root() { - return Optional.of(new BoundedTrieNode(ImmutableMap.of(), false, DEFAULT_BOUND)); - } - - /** Returns the default bound for the empty trie. */ - @Override - public int bound() { - return DEFAULT_BOUND; - } - - /** - * Returns an empty {@link BoundedTrieResult}. This represents the result of extracting data - * from an empty trie. - */ - @Override - public BoundedTrieResult getBoundedTrieResult() { - return BoundedTrieResult.empty(); - } - } + static class BoundedTrieNode implements Serializable { - // ------------------------------ BoundedTrieNode Implementation ------------------------------ - protected static class BoundedTrieNode implements Serializable { + public static final String TRUNCATED_TRUE = String.valueOf(true); + public static final String TRUNCATED_FALSE = String.valueOf(false); /** * A map from strings to child nodes. Each key represents a segment of a path/FQN, and the * corresponding value represents the subtree rooted at that segment. */ - @Nonnull private Map children; + private Map children; /** * A flag indicating whether this node has been truncated. A truncated node represents an @@ -333,23 +318,25 @@ protected static class BoundedTrieNode implements Serializable { * @param truncated Whether this node is truncated. * @param size The size of the subtree rooted at this node. */ - BoundedTrieNode(@Nonnull Map children, boolean truncated, int size) { + BoundedTrieNode(Map children, boolean truncated, int size) { this.children = children; this.size = size; this.truncated = truncated; } /** - * Constructs a deep copy of the given `BoundedTrieNode`. + * Constructs a deep copy of this `BoundedTrieNode`. * - * @param other The node to copy. + * @return A deep copy of this node. */ - public BoundedTrieNode(BoundedTrieNode other) { - this.truncated = other.truncated; - this.size = other.size; - this.children = new HashMap<>(); + BoundedTrieNode deepCopy() { + BoundedTrieNode copyNode = new BoundedTrieNode(); + copyNode.truncated = this.truncated; + copyNode.size = this.size; + copyNode.children = new HashMap<>(); // deep copy - other.children.forEach((key, value) -> children.put(key, new BoundedTrieNode(value))); + this.children.forEach((key, value) -> copyNode.children.put(key, value.deepCopy())); + return copyNode; } /** @@ -358,7 +345,7 @@ public BoundedTrieNode(BoundedTrieNode other) { * @param segments The segments of the path to add. * @return The change in the size of the subtree rooted at this node. */ - public int add(List segments) { + int add(List segments) { if (truncated || segments.isEmpty()) { return 0; } @@ -385,7 +372,7 @@ public int add(List segments) { * @param segmentsIter An iterator over the paths to add. * @return The total change in the size of the subtree rooted at this node. */ - public int addAll(List> segmentsIter) { + int addAll(List> segmentsIter) { return segmentsIter.stream().mapToInt(this::add).sum(); } @@ -394,7 +381,7 @@ public int addAll(List> segmentsIter) { * * @return The change in the size of the subtree rooted at this node. */ - public int trim() { + int trim() { if (children.isEmpty()) { return 0; } @@ -413,12 +400,13 @@ public int trim() { } /** - * Merges the given `BoundedTrieNode` into this node. + * Merges the given `BoundedTrieNode` into this node and as a result this node is changed. * * @param other The node to merge. * @return The change in the size of the subtree rooted at this node. */ - public int merge(BoundedTrieNode other) { + // TODO: merge has bug where size is incorrect when a mege is done on empty node + int merge(BoundedTrieNode other) { if (truncated) { return 0; } @@ -433,8 +421,9 @@ public int merge(BoundedTrieNode other) { return 0; } if (children.isEmpty()) { - children = new HashMap<>(other.children); - int delta = other.size - size; + children = new HashMap<>(); + children.putAll(other.children); + int delta = this.size - other.size; size += delta; return delta; } @@ -464,12 +453,12 @@ public int merge(BoundedTrieNode other) { * * @return The flattened representation of this trie. */ - public List> flattened() { + List> flattened() { List> result = new ArrayList<>(); if (truncated) { - result.add(Collections.singletonList(String.valueOf(true))); + result.add(Collections.singletonList(TRUNCATED_TRUE)); } else if (children.isEmpty()) { - result.add(Collections.singletonList(String.valueOf(false))); + result.add(Collections.singletonList(TRUNCATED_FALSE)); } else { List prefixes = new ArrayList<>(children.keySet()); Collections.sort(prefixes); @@ -494,7 +483,7 @@ public List> flattened() { * @return The {@link org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode} * representation of this node. */ - public org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode toProto() { + org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode toProto() { org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode.Builder builder = org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode.newBuilder(); builder.setTruncated(truncated); @@ -509,7 +498,7 @@ public org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode toProto() { * representation of the node. * @return The corresponding `BoundedTrieNode`. */ - public static BoundedTrieNode fromProto( + static BoundedTrieNode fromProto( org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode proto) { BoundedTrieNode node = new BoundedTrieNode(); if (proto.getTruncated()) { @@ -532,7 +521,7 @@ public static BoundedTrieNode fromProto( * @param segments The segments of the path to check. * @return True if the trie contains the path, false otherwise. */ - public boolean contains(List segments) { + boolean contains(List segments) { if (truncated || segments.isEmpty()) { return true; } @@ -546,7 +535,7 @@ public boolean contains(List segments) { * * @return The size of the subtree. */ - public int getSize() { + int getSize() { return size; } @@ -555,7 +544,7 @@ public int getSize() { * * @return Whether this node is truncated. */ - public boolean isTruncated() { + boolean isTruncated() { return truncated; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieCellTest.java new file mode 100644 index 000000000000..cbdc81e26521 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieCellTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.junit.Test; + +/** Tests for {@link BoundedTrieCell}. */ +public class BoundedTrieCellTest { + private final BoundedTrieCell cell = new BoundedTrieCell(MetricName.named("lineage", "sources")); + + @Test + public void testDeltaAndCumulative() { + cell.add("a"); + cell.add("b", "c"); + cell.add("b", "d"); + cell.add("a", "a"); + BoundedTrieData cumulative = cell.getCumulative(); + assertEquals( + ImmutableSet.of( + Arrays.asList("a", "a", String.valueOf(false)), + Arrays.asList("b", "d", String.valueOf(false)), + Arrays.asList("b", "c", String.valueOf(false))), + cumulative.getResult()); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + + // adding new value changes the cell + cell.add("b", "a"); + BoundedTrieData newCumulative = cell.getCumulative(); + assertEquals( + newCumulative.getResult(), + ImmutableSet.of( + Arrays.asList("a", "a", String.valueOf(false)), + Arrays.asList("b", "d", String.valueOf(false)), + Arrays.asList("b", "c", String.valueOf(false)), + Arrays.asList("b", "a", String.valueOf(false)))); + + // but not previously obtained cumulative value + assertEquals( + cumulative.getResult(), + ImmutableSet.of( + Arrays.asList("a", "a", String.valueOf(false)), + Arrays.asList("b", "d", String.valueOf(false)), + Arrays.asList("b", "c", String.valueOf(false)))); + + assertThat( + "Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), equalTo(true)); + } + // TODO:Add more tests +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java index 683976303663..0aa28327bc56 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java @@ -21,8 +21,18 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.beam.runners.core.metrics.BoundedTrieData.BoundedTrieNode; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -186,13 +196,7 @@ private void runCoversTest(List flattened, List expected, int ma assertCoversFlattened(parsedFlattened, parsedExpected, maxTruncated); } - /** - * Runs a test case for the {@link BoundedTrieNode} class. - * - * @param toAdd The segments to add to the {@link BoundedTrieNode}. - */ - private void runTest(List> toAdd) { - Set> everything = new HashSet<>(toAdd); + private Set> everythingDeduped(Set> everything) { Set> allPrefixes = new HashSet<>(); for (List segments : everything) { for (int i = 0; i < segments.size(); i++) { @@ -201,6 +205,17 @@ private void runTest(List> toAdd) { } Set> everythingDeduped = new HashSet<>(everything); everythingDeduped.removeAll(allPrefixes); + return everythingDeduped; + } + + /** + * Runs a test case for the {@link BoundedTrieNode} class. + * + * @param toAdd The segments to add to the {@link BoundedTrieNode}. + */ + private void runTest(List> toAdd) { + Set> everything = new HashSet<>(toAdd); + Set> everythingDeduped = everythingDeduped(everything); // Test basic addition. BoundedTrieNode node = new BoundedTrieNode(); @@ -250,7 +265,7 @@ private void runTest(List> toAdd) { } // Verify adding after trimming is a no-op. - BoundedTrieNode nodeCopy = new BoundedTrieNode(node); + BoundedTrieNode nodeCopy = node.deepCopy(); for (List segments : everything) { assertEquals(0, node.add(segments)); } @@ -274,8 +289,19 @@ private void runTest(List> toAdd) { // Test merging new values. BoundedTrieNode newValuesNode = new BoundedTrieNode(); newValuesNode.addAll(newValues); + assertCovers(newValuesNode, new HashSet<>(newValues), 0); assertEquals(expectedDelta, nodeCopy.merge(newValuesNode)); assertCovers(nodeCopy, expectedWithNewValues, 2); + // adding after merge should not change previous node on which this was merged + List additionalValue = Arrays.asList("new3", "new3.1"); + expectedDelta = newValuesNode.isTruncated() ? 0 : 1; + assertEquals(expectedDelta, newValuesNode.add(additionalValue)); + // previous node on which the merge was done should have remained same + assertCovers(nodeCopy, expectedWithNewValues, 2); + // the newValuesNode should have changed + Set> updatedNewValues = new HashSet<>(newValues); + updatedNewValues.add(additionalValue); + assertCovers(newValuesNode, updatedNewValues, 0); } /** @@ -373,37 +399,42 @@ public void testCoversTruncated() { @Test public void testBoundedTrieDataCombine() { - BoundedTrieData empty = BoundedTrieData.empty(); - BoundedTrieData singletonA = BoundedTrieData.create(ImmutableList.of("a", "a")); - BoundedTrieData singletonB = BoundedTrieData.create(ImmutableList.of("b", "b")); + BoundedTrieData empty = new BoundedTrieData(); + BoundedTrieData singletonA = new BoundedTrieData(ImmutableList.of("a", "a")); + BoundedTrieData singletonB = new BoundedTrieData(ImmutableList.of("b", "b")); BoundedTrieNode lotsRoot = new BoundedTrieNode(); lotsRoot.addAll(Arrays.asList(Arrays.asList("c", "c"), Arrays.asList("d", "d"))); - BoundedTrieData lots = BoundedTrieData.create(lotsRoot); + BoundedTrieData lots = new BoundedTrieData(lotsRoot); - assertEquals(Collections.emptySet(), empty.getBoundedTrieResult().getResults()); + assertEquals(Collections.emptySet(), empty.getResult()); + empty.combine(singletonA); assertEquals( - ImmutableSet.of(Arrays.asList("a", "a", String.valueOf(false))), - empty.combine(singletonA).getBoundedTrieResult().getResults()); + ImmutableSet.of(Arrays.asList("a", "a", String.valueOf(false))), empty.getResult()); + singletonA.combine(empty); assertEquals( - ImmutableSet.of(Arrays.asList("a", "a", String.valueOf(false))), - singletonA.combine(empty).getBoundedTrieResult().getResults()); + ImmutableSet.of(Arrays.asList("a", "a", String.valueOf(false))), singletonA.getResult()); + singletonA.combine(singletonB); assertEquals( ImmutableSet.of( Arrays.asList("a", "a", String.valueOf(false)), Arrays.asList("b", "b", String.valueOf(false))), - singletonA.combine(singletonB).getBoundedTrieResult().getResults()); + singletonA.getResult()); + singletonA.combine(lots); assertEquals( ImmutableSet.of( Arrays.asList("a", "a", String.valueOf(false)), + Arrays.asList("b", "b", String.valueOf(false)), Arrays.asList("c", "c", String.valueOf(false)), Arrays.asList("d", "d", String.valueOf(false))), - singletonA.combine(lots).getBoundedTrieResult().getResults()); + singletonA.getResult()); + lots.combine(singletonA); assertEquals( ImmutableSet.of( Arrays.asList("a", "a", String.valueOf(false)), + Arrays.asList("b", "b", String.valueOf(false)), Arrays.asList("c", "c", String.valueOf(false)), Arrays.asList("d", "d", String.valueOf(false))), - lots.combine(singletonA).getBoundedTrieResult().getResults()); + lots.getResult()); } @Test @@ -413,14 +444,98 @@ public void testBoundedTrieDataCombineTrim() { BoundedTrieNode right = new BoundedTrieNode(); right.addAll(Arrays.asList(Arrays.asList("a", "y"), Arrays.asList("c", "d"))); - BoundedTrieData combined = - BoundedTrieData.create(left).combine(BoundedTrieData.create(null, right, 3)); + BoundedTrieData mainTree = new BoundedTrieData(null, left, 10); + mainTree.combine(new BoundedTrieData(null, right, 3)); assertEquals( ImmutableSet.of( Arrays.asList("a", String.valueOf(true)), Arrays.asList("b", "d", String.valueOf(false)), Arrays.asList("c", "d", String.valueOf(false))), - combined.getBoundedTrieResult().getResults()); + mainTree.getResult()); + } + + @Test + public void testAddMultiThreaded() throws InterruptedException { + final int numThreads = 10; + final BoundedTrieData mainTrie = new BoundedTrieData(); + final CountDownLatch latch = new CountDownLatch(numThreads); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + Random rand = new Random(new Random().nextLong()); + List> segments = randomSegments(numThreads, 3, 9, 0.5, rand); + + for (int curThread = 0; curThread < numThreads; curThread++) { + int finalCurThread = curThread; + executor.execute( + () -> { + try { + mainTrie.add(segments.get(finalCurThread)); + // } + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + HashSet> dedupedSegments = new HashSet<>(segments); + assertEquals(everythingDeduped(dedupedSegments).size(), mainTrie.size()); + // Assert that all added paths are present in the mainTrie + for (List seg : dedupedSegments) { + assertTrue(mainTrie.contains(seg)); + } + } + + @Test + public void testCombineMultiThreaded() throws InterruptedException { + final int numThreads = 10; + BoundedTrieData mainTrie = new BoundedTrieData(); + final CountDownLatch latch = new CountDownLatch(numThreads); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + Random rand = new Random(new Random().nextLong()); + + // initialize mainTrie + List initialSegment = randomSegments(1, 3, 9, 0.5, rand).get(0); + mainTrie.add(initialSegment); + + // prepare all segments in advance outside of multiple threads + List> segments = randomSegments(numThreads, 3, 9, 0.5, rand); + segments.add(initialSegment); + List anotherSegment = randomSegments(1, 3, 9, 0.5, rand).get(0); + segments.add(anotherSegment); + + for (int curThread = 0; curThread < numThreads; curThread++) { + int finalCurThread = curThread; + executor.execute( + () -> { + try { + BoundedTrieData other = new BoundedTrieData(); + // only reads of segments; no write should be done here + other.add(segments.get(finalCurThread)); + // for one node we add more than one segment to trigger root over + // singleton and test combine with root. + if (finalCurThread == 7) { // just a randomly selected prime number + other.add(anotherSegment); + } + mainTrie.combine(other); + // } + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + HashSet> dedupedSegments = new HashSet<>(segments); + assertEquals(everythingDeduped(dedupedSegments).size(), mainTrie.size()); + // Assert that all added paths are present in the mainTrie + for (List seg : dedupedSegments) { + assertTrue(mainTrie.contains(seg)); + } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java index f9cd098edaa6..1b5170c9075d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java @@ -19,6 +19,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; @@ -28,6 +29,9 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.util.HistogramData; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; /** * An implementation of {@link MetricsContainer} that reads the current execution state (tracked in @@ -79,6 +83,11 @@ public StringSet getStringSet(MetricName metricName) { return getCurrentContainer().getStringSet(metricName); } + @Override + public BoundedTrie getBoundedTrie(MetricName metricName) { + return getCurrentContainer().getBoundedTrie(metricName); + } + @Override public Histogram getPerWorkerHistogram( MetricName metricName, HistogramData.BucketType bucketType) { diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java index 64455d704c9b..b7868683d761 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.metrics.GaugeData; import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.core.metrics.StringSetData; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; @@ -35,6 +36,9 @@ import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; /** Jet specific implementation of {@link MetricsContainer}. */ public class JetMetricsContainer implements MetricsContainer { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java index 68c73eee3230..f3db90839b3d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java @@ -18,10 +18,13 @@ package org.apache.beam.sdk.metrics; import java.util.Arrays; +import org.apache.beam.sdk.annotations.Internal; /** - * A metric that represents a bounded trie data structure. This interface extends the {@link Metric} - * interface and provides methods for adding string sequences (paths) to the trie. + * Internal: For internal use only and not for public consumption. This API is subject to + * incompatible changes, or even removal, in a future release. A metric that represents a bounded + * trie data structure. This interface extends the {@link Metric} interface and provides methods for + * adding string sequences (paths) to the trie. * *

The trie is bounded in size (max=100), meaning it has a maximum capacity for storing paths. * When the trie reaches its capacity, it truncates paths. This is useful for tracking and @@ -29,6 +32,7 @@ * recommended that parts of paths provided as strings are hierarchical in nature so the truncation * reduces granularity rather than complete data loss. */ +@Internal public interface BoundedTrie extends Metric { /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieResult.java deleted file mode 100644 index e947ec42b451..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieResult.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import com.google.auto.value.AutoValue; -import java.util.List; -import java.util.Set; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; - -/** - * An immutable class representing the result of {@link BoundedTrie} metric. The result is a set of - * lists, where each list represents a path in the bounded trie. The last element in the in each - * path is a boolean in string representation denoting whether this path was truncated. i.e. <["a", - * "b", "false"], ["c", "true"]> - */ -@AutoValue -public abstract class BoundedTrieResult { - /** - * Returns an immutable set of lists, where each list represents a path in the bounded trie. The - * last element in the in each path is a boolean in string representation denoting whether this - * path was truncated. i.e. <["a", "b", "false"], ["c", "true"]> - * - * @return The set of paths. - */ - public abstract Set> getResults(); - - /** - * Creates a {@link BoundedTrieResult} from the given set of paths. - * - * @param paths The set of paths to include in the result. - * @return A new {@link BoundedTrieResult} instance. - */ - public static BoundedTrieResult create(Set> paths) { - return new AutoValue_BoundedTrieResult(ImmutableSet.copyOf(paths)); - } - - /** - * Returns an empty {@link BoundedTrieResult} instance. - * - * @return An empty {@link BoundedTrieResult}. - */ - public static BoundedTrieResult empty() { - return EmptyBoundedTrieResult.INSTANCE; - } - - /** - * An immutable class representing an empty {@link BoundedTrieResult}. This class provides a - * singleton instance for efficiency. - */ - public static class EmptyBoundedTrieResult extends BoundedTrieResult { - - private static final EmptyBoundedTrieResult INSTANCE = new EmptyBoundedTrieResult(); - - private EmptyBoundedTrieResult() {} - - /** Returns an empty immutable set of paths. */ - @Override - public Set> getResults() { - return ImmutableSet.of(); - } - } -} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java index bcd243ba746d..11b9bc49448c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java @@ -36,6 +36,7 @@ import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; @@ -225,6 +226,14 @@ public StringSet getStringSet(MetricName metricName) { return tracker.metricsContainerRegistry.getUnboundContainer().getStringSet(metricName); } + @Override + public BoundedTrie getBoundedTrie(MetricName metricName) { + if (tracker.currentState != null) { + return tracker.currentState.metricsContainer.getBoundedTrie(metricName); + } + return tracker.metricsContainerRegistry.getUnboundContainer().getBoundedTrie(metricName); + } + @Override public Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) { if (tracker.currentState != null) {