diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java
new file mode 100644
index 000000000000..d3e8976d20c8
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.metrics.BoundedTrie;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Tracks the current value for a {@link BoundedTrie} metric.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is within a runner where
+ * a bounded trie is being reported for a specific step (rather than the one in the current
+ * context). In that case retrieving the underlying cell and reporting directly to it avoids a step
+ * of indirection.
+ */
+public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<BoundedTrieData> setValue =
+      new AtomicReference<>(BoundedTrieData.empty());
+  private final MetricName name;
+
+  /**
+   * Generally, runners should construct instances using the methods in {@link
+   * MetricsContainerImpl}, unless they need to define their own version of {@link
+   * MetricsContainer}. These constructors are *only* public so runners can instantiate.
+   */
+  public BoundedTrieCell(MetricName name) {
+    this.name = name;
+  }
+
+  /** Replaces the tracked value with the empty trie and then resets the dirty state. */
+  @Override
+  public void reset() {
+    setValue.set(BoundedTrieData.empty());
+    dirty.reset();
+  }
+
+  /**
+   * Atomically combines {@code data} into the tracked value and records the modification.
+   *
+   * @param data The trie to merge into this cell.
+   */
+  void update(BoundedTrieData data) {
+    // updateAndGet retries the compare-and-set until it wins; combine returns a new value and
+    // leaves the current one untouched, so re-applying it on contention is safe.
+    setValue.updateAndGet(current -> current.combine(data));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  /** Returns the value currently held by this cell. */
+  @Override
+  public BoundedTrieData getCumulative() {
+    return setValue.get();
+  }
+
+  @Override
+  public MetricName getName() {
+    return name;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object object) {
+    if (!(object instanceof BoundedTrieCell)) {
+      return false;
+    }
+    BoundedTrieCell that = (BoundedTrieCell) object;
+    return Objects.equals(dirty, that.dirty)
+        && Objects.equals(setValue.get(), that.setValue.get())
+        && Objects.equals(name, that.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dirty, setValue.get(), name);
+  }
+
+  /**
+   * Atomically adds the path made of {@code values} to the tracked trie and records the
+   * modification.
+   *
+   * @param values The segments of the path to add.
+   */
+  @Override
+  public void add(Iterable<String> values) {
+    setValue.updateAndGet(current -> current.add(values));
+    dirty.afterModification();
+  }
+
+  /** Varargs convenience for {@link #add(Iterable)}. */
+  @Override
+  public void add(String... values) {
+    add(Arrays.asList(values));
+  }
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
new file mode 100644
index 000000000000..e8c7375924cc
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
+import org.apache.beam.sdk.metrics.BoundedTrieResult;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * Represents data stored in a bounded trie. This data structure is used to efficiently store and
+ * aggregate a collection of string sequences, paths/FQN with a limited size.
+ *
+ * <p>The trie can be in one of two states:
+ *
+ * <ul>
+ *   <li>**Singleton:** Contains a single path.
+ *   <li>**Trie:** Contains a {@link BoundedTrieNode} representing the root of the trie.
+ * </ul>
+ */
+@AutoValue
+public abstract class BoundedTrieData implements Serializable {
+
+ private static final int DEFAULT_BOUND = 100; // Default maximum size of the trie
+
+  /**
+   * Returns an {@link Optional} containing the singleton path if this {@link BoundedTrieData}
+   * represents a single path.
+   */
+  public abstract Optional<List<String>> singleton();
+
+  /**
+   * Returns an {@link Optional} containing the root {@link BoundedTrieNode} if this {@link
+   * BoundedTrieData} represents a trie.
+   */
+  public abstract Optional<BoundedTrieNode> root();
+
+ /**
+ * Returns the maximum size of the trie. {@code add} and {@code combine} trim the resulting trie
+ * when its size exceeds this value.
+ */
+ public abstract int bound();
+
+  /**
+   * Creates a {@link BoundedTrieData} instance holding either a singleton path or a trie.
+   *
+   * @param singleton The singleton path, or {@code null} when {@code root} is given.
+   * @param root The root node of the trie, or {@code null} when {@code singleton} is given.
+   * @param bound The maximum size of the trie.
+   * @throws IllegalArgumentException If {@code singleton} and {@code root} are both specified or
+   *     both {@code null}, or if {@code bound} is less than 1.
+   */
+  public static BoundedTrieData create(
+      @Nullable List<String> singleton, @Nullable BoundedTrieNode root, int bound) {
+    // Exactly one of the two representations must be supplied.
+    Preconditions.checkArgument(
+        (singleton == null ^ root == null),
+        "Either and only one of singleton or root must be specified.");
+    Preconditions.checkArgument(bound >= 1, "Bound must be at least 1.");
+    return new AutoValue_BoundedTrieData(
+        Optional.ofNullable(singleton), Optional.ofNullable(root), bound);
+  }
+
+  /**
+   * Builds a trie-backed {@link BoundedTrieData} around {@code root}, using the default bound.
+   *
+   * @param root The root node of the trie.
+   */
+  public static BoundedTrieData create(@Nonnull BoundedTrieNode root) {
+    final int bound = DEFAULT_BOUND;
+    return create(null, root, bound);
+  }
+
+  /**
+   * Creates a {@link BoundedTrieData} instance from a singleton path with the default bound.
+   *
+   * @param singleton The singleton path.
+   */
+  public static BoundedTrieData create(@Nonnull List<String> singleton) {
+    return create(singleton, null, DEFAULT_BOUND);
+  }
+
+  /**
+   * Serializes this {@link BoundedTrieData} into its proto {@link BoundedTrie} form: the bound is
+   * always written, followed by the singleton path and/or the root when present.
+   */
+  public BoundedTrie toProto() {
+    BoundedTrie.Builder proto = BoundedTrie.newBuilder();
+    proto.setBound(bound());
+    if (singleton().isPresent()) {
+      proto.addAllSingleton(singleton().get());
+    }
+    if (root().isPresent()) {
+      proto.setRoot(root().get().toProto());
+    }
+    return proto.build();
+  }
+
+  /**
+   * Creates a {@link BoundedTrieData} instance from its proto {@link BoundedTrie}.
+   *
+   * <p>When the proto carries a root, the result is trie-backed and the singleton field is
+   * ignored; otherwise the result is a singleton holding the proto's singleton segments.
+   *
+   * @param proto The proto to convert.
+   * @throws IllegalArgumentException If the proto's bound is less than 1.
+   */
+  public static BoundedTrieData fromProto(BoundedTrie proto) {
+    // getSingletonList() never returns null (an unset repeated field is an empty list), so it must
+    // not be handed to create() together with a root: create() requires exactly one of the two.
+    if (proto.hasRoot()) {
+      return create(null, BoundedTrieNode.fromProto(proto.getRoot()), proto.getBound());
+    }
+    return create(proto.getSingletonList(), null, proto.getBound());
+  }
+
+  /**
+   * Returns this {@link BoundedTrieData} as a {@link BoundedTrieNode}.
+   *
+   * <p>If a root is present it is returned as is (not copied); otherwise a fresh node is built
+   * that contains the singleton path, if any.
+   */
+  public BoundedTrieNode asTrie() {
+    Optional<BoundedTrieNode> existingRoot = root();
+    if (existingRoot.isPresent()) {
+      return existingRoot.get();
+    }
+    BoundedTrieNode trie = new BoundedTrieNode();
+    if (singleton().isPresent()) {
+      trie.add(singleton().get());
+    }
+    return trie;
+  }
+
+  /**
+   * Returns a new {@link BoundedTrieData} with the same bound as this one. A trie-backed instance
+   * gets a deep copy of its root; a singleton instance reuses the same path list.
+   */
+  public BoundedTrieData getCumulative() {
+    if (root().isPresent()) {
+      BoundedTrieNode rootCopy = new BoundedTrieNode(root().get());
+      return create(null, rootCopy, bound());
+    }
+    return create(singleton().get(), null, bound());
+  }
+
+  /**
+   * Extracts the data from this {@link BoundedTrieData} as a {@link BoundedTrieResult}.
+   *
+   * <p>Each path in the result ends with the string form of a boolean that says whether the path
+   * was truncated. A singleton path is never truncated, so it is suffixed with {@code "false"}.
+   */
+  public BoundedTrieResult getBoundedTrieResult() {
+    if (root().isPresent()) {
+      return BoundedTrieResult.create(new HashSet<>(root().get().flattened()));
+    } else if (singleton().isPresent()) {
+      List<String> list = new ArrayList<>(singleton().get());
+      list.add(String.valueOf(false));
+      return BoundedTrieResult.create(ImmutableSet.of(list));
+    } else {
+      return BoundedTrieResult.empty();
+    }
+  }
+
+  /**
+   * Adds a new path to this {@link BoundedTrieData}.
+   *
+   * <p>This instance is not modified. Adding to the empty trie yields a singleton with the default
+   * bound; re-adding the path of a singleton returns {@code this}; any other addition yields a
+   * trie-backed instance that is trimmed once if it grew beyond {@link #bound()}.
+   *
+   * @param segments The path to add.
+   * @return A {@link BoundedTrieData} instance containing the added path.
+   */
+  public BoundedTrieData add(Iterable<String> segments) {
+    List<String> segmentsParts = ImmutableList.copyOf(segments);
+    if (root().isPresent() && singleton().isPresent()) {
+      // Only the empty trie reports both a root and a singleton. Its own bound() is 1, so seed the
+      // new singleton with DEFAULT_BOUND instead; inheriting 1 would make the very next distinct
+      // path exceed the bound and collapse the whole trie into a single truncated root.
+      return create(segmentsParts, null, DEFAULT_BOUND);
+    } else if (singleton().isPresent() && singleton().get().equals(segmentsParts)) {
+      return this; // Optimize for re-adding the same value.
+    } else {
+      // Work on a deep copy so this instance stays unchanged.
+      BoundedTrieNode newRoot = new BoundedTrieNode(asTrie());
+      newRoot.add(segmentsParts);
+      if (newRoot.getSize() > bound()) {
+        newRoot.trim();
+      }
+      return create(null, newRoot, bound());
+    }
+  }
+
+  /**
+   * Merges {@code other} into a copy of this {@link BoundedTrieData}.
+   *
+   * <p>If either side reports both a root and a singleton, the opposite side is returned
+   * unchanged. Otherwise the two tries are merged under the smaller of the two bounds and trimmed
+   * until the merged size fits that bound.
+   *
+   * @param other The other {@link BoundedTrieData} to combine with.
+   * @return A {@link BoundedTrieData} instance representing the combined data.
+   */
+  public BoundedTrieData combine(BoundedTrieData other) {
+    boolean thisHasBoth = root().isPresent() && singleton().isPresent();
+    if (thisHasBoth) {
+      return other;
+    }
+    boolean otherHasBoth = other.root().isPresent() && other.singleton().isPresent();
+    if (otherHasBoth) {
+      return this;
+    }
+
+    BoundedTrieNode merged = new BoundedTrieNode(asTrie());
+    merged.merge(other.asTrie());
+    int limit = Math.min(this.bound(), other.bound());
+    while (merged.getSize() > limit) {
+      merged.trim();
+    }
+    return create(null, merged, limit);
+  }
+
+  /**
+   * Returns the number of paths stored in this trie.
+   *
+   * @return 0 for the empty trie, 1 for a singleton, otherwise the size of the root node.
+   */
+  public int size() {
+    boolean hasSingleton = singleton().isPresent();
+    boolean hasRoot = root().isPresent();
+    if (hasSingleton && hasRoot) {
+      // Only the empty trie reports both; without this check it would be counted as a singleton.
+      return 0;
+    }
+    if (hasSingleton) {
+      return 1;
+    }
+    return hasRoot ? root().get().getSize() : 0;
+  }
+
+  /**
+   * Checks if the trie contains the given path.
+   *
+   * <p>A singleton is compared to {@code value} by list equality; a trie delegates to its root
+   * node.
+   *
+   * @param value The path to check.
+   * @return True if the trie contains the path, false otherwise.
+   */
+  public boolean contains(List<String> value) {
+    if (singleton().isPresent()) {
+      return value.equals(singleton().get());
+    } else if (root().isPresent()) {
+      return root().get().contains(value);
+    } else {
+      return false;
+    }
+  }
+
+  /** Returns the shared {@link EmptyBoundedTrieData} instance that represents an empty trie. */
+  public static BoundedTrieData empty() {
+    BoundedTrieData shared = EmptyBoundedTrieData.INSTANCE;
+    return shared;
+  }
+
+  /**
+   * Two instances are equal when they are of the same class and their {@link #asTrie()} views are
+   * equal; the bound does not take part in the comparison.
+   */
+  @Override
+  public final boolean equals(@Nullable Object other) {
+    if (this == other) {
+      return true;
+    }
+    boolean sameClass = other != null && getClass() == other.getClass();
+    return sameClass && this.asTrie().equals(((BoundedTrieData) other).asTrie());
+  }
+
+  /** Hashes the {@link #asTrie()} view, matching what {@code equals} compares. */
+  @Override
+  public final int hashCode() {
+    BoundedTrieNode trie = asTrie();
+    return trie.hashCode();
+  }
+
+  /** Renders the {@link #asTrie()} view wrapped in {@code BoundedTrieData(...)}. */
+  @Override
+  public final String toString() {
+    StringBuilder text = new StringBuilder("BoundedTrieData(");
+    text.append(asTrie());
+    text.append(")");
+    return text.toString();
+  }
+
+  // ---------------------------- EmptyBoundedTrieData Implementation ---------------------------
+  /**
+   * An immutable implementation of {@link BoundedTrieData} representing an empty trie. This class
+   * provides a singleton instance for efficiency.
+   *
+   * <p>Unlike instances built through {@code create}, this one reports both a singleton and a root
+   * as present; the enclosing class relies on that combination to recognise the empty trie.
+   */
+  public static class EmptyBoundedTrieData extends BoundedTrieData {
+
+    /** Bound reported by the empty trie. */
+    private static final int EMPTY_BOUND = 1;
+
+    /** Size given to the childless root node handed out by {@link #root()}. */
+    private static final int EMPTY_ROOT_SIZE = 1;
+
+    private static final EmptyBoundedTrieData INSTANCE = new EmptyBoundedTrieData();
+
+    private EmptyBoundedTrieData() {}
+
+    /**
+     * Returns an {@link Optional} containing an empty list of strings, representing the singleton
+     * path in an empty trie.
+     */
+    @Override
+    public Optional<List<String>> singleton() {
+      return Optional.of(ImmutableList.of());
+    }
+
+    /**
+     * Returns an {@link Optional} containing an empty {@link BoundedTrieNode} representing the root
+     * of an empty trie. A new node is created on every call.
+     */
+    @Override
+    public Optional<BoundedTrieNode> root() {
+      return Optional.of(new BoundedTrieNode(ImmutableMap.of(), false, EMPTY_ROOT_SIZE));
+    }
+
+    /** Returns the bound of the empty trie. */
+    @Override
+    public int bound() {
+      return EMPTY_BOUND;
+    }
+
+    /**
+     * Returns an empty {@link BoundedTrieResult}. This represents the result of extracting data
+     * from an empty trie.
+     */
+    @Override
+    public BoundedTrieResult getBoundedTrieResult() {
+      return BoundedTrieResult.empty();
+    }
+  }
+
+  // ------------------------------ BoundedTrieNode Implementation ------------------------------
+  /**
+   * A mutable node of the bounded trie. A node without children represents the end of a path; a
+   * truncated node stands for every path that shares the prefix leading to it.
+   */
+  protected static class BoundedTrieNode implements Serializable {
+    /**
+     * A map from strings to child nodes. Each key represents a segment of a path/FQN, and the
+     * corresponding value represents the subtree rooted at that segment.
+     */
+    @Nonnull private Map<String, BoundedTrieNode> children;
+
+    /**
+     * A flag indicating whether this node has been truncated. A truncated node represents an
+     * aggregation/roll-up of multiple paths that share a common prefix.
+     */
+    private boolean truncated;
+
+    /**
+     * The size of the subtree rooted at this node. This represents the number of distinct paths
+     * that pass through this node.
+     */
+    private int size;
+
+    /** Constructs an empty {@code BoundedTrieNode} with size 1 and not truncated. */
+    BoundedTrieNode() {
+      this(new HashMap<>(), false, 1);
+    }
+
+    /**
+     * Constructs a {@code BoundedTrieNode} with the given children, truncation status, and size.
+     * The map is stored as is, without copying.
+     *
+     * @param children The children of this node.
+     * @param truncated Whether this node is truncated.
+     * @param size The size of the subtree rooted at this node.
+     */
+    BoundedTrieNode(
+        @Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
+      this.children = children;
+      this.size = size;
+      this.truncated = truncated;
+    }
+
+    /**
+     * Constructs a deep copy of the given {@code BoundedTrieNode}.
+     *
+     * @param other The node to copy.
+     */
+    public BoundedTrieNode(BoundedTrieNode other) {
+      this.truncated = other.truncated;
+      this.size = other.size;
+      this.children = new HashMap<>();
+      // Copy every descendant so the new subtree shares no nodes with the source.
+      other.children.forEach((key, value) -> children.put(key, new BoundedTrieNode(value)));
+    }
+
+    /**
+     * Adds a path represented by the given list of segments to this trie. Adding to a truncated
+     * node, or adding an empty path, changes nothing.
+     *
+     * @param segments The segments of the path to add.
+     * @return The change in the size of the subtree rooted at this node.
+     */
+    public int add(List<String> segments) {
+      if (truncated || segments.isEmpty()) {
+        return 0;
+      }
+      String head = segments.get(0);
+      List<String> tail = segments.subList(1, segments.size());
+      boolean wasEmpty = children.isEmpty();
+      BoundedTrieNode currChild = children.get(head);
+      int delta = 0;
+      if (currChild == null) {
+        currChild = new BoundedTrieNode();
+        children.put(head, currChild);
+        // The first child of a leaf extends the path the leaf already counted, so only
+        // additional children increase the size.
+        delta = wasEmpty ? 0 : 1;
+      }
+      if (!tail.isEmpty()) {
+        delta += currChild.add(tail);
+      }
+      size += delta;
+      return delta;
+    }
+
+    /**
+     * Adds multiple paths to this trie.
+     *
+     * @param segmentsIter The paths to add.
+     * @return The total change in the size of the subtree rooted at this node.
+     */
+    public int addAll(List<List<String>> segmentsIter) {
+      return segmentsIter.stream().mapToInt(this::add).sum();
+    }
+
+    /**
+     * Trims this trie by truncating the largest subtree.
+     *
+     * @return The change in the size of the subtree rooted at this node (zero or negative).
+     */
+    public int trim() {
+      if (children.isEmpty()) {
+        return 0;
+      }
+      BoundedTrieNode maxChild =
+          Collections.max(children.values(), Comparator.comparingInt(BoundedTrieNode::getSize));
+      int delta;
+      if (maxChild.size == 1) {
+        // Every child is a single path already, so collapse this node itself.
+        delta = 1 - size;
+        truncated = true;
+        children = new HashMap<>();
+      } else {
+        delta = maxChild.trim();
+      }
+      size += delta;
+      return delta;
+    }
+
+    /**
+     * Merges the given {@code BoundedTrieNode} into this node. Subtrees taken over from {@code
+     * other} are deep-copied, so {@code other} is never modified by later changes to this node.
+     *
+     * @param other The node to merge.
+     * @return The change in the size of the subtree rooted at this node.
+     */
+    public int merge(BoundedTrieNode other) {
+      if (truncated) {
+        return 0;
+      }
+      if (other.truncated) {
+        truncated = true;
+        children = new HashMap<>();
+        int delta = 1 - size;
+        size += delta;
+        return delta;
+      }
+      if (other.children.isEmpty()) {
+        return 0;
+      }
+      if (children.isEmpty()) {
+        // Build a fresh map rather than writing into the current one, which may be immutable.
+        Map<String, BoundedTrieNode> adopted = new HashMap<>();
+        other.children.forEach((key, child) -> adopted.put(key, new BoundedTrieNode(child)));
+        children = adopted;
+        int delta = other.size - size;
+        size += delta;
+        return delta;
+      }
+      int delta = 0;
+      for (Map.Entry<String, BoundedTrieNode> entry : other.children.entrySet()) {
+        String prefix = entry.getKey();
+        BoundedTrieNode otherChild = entry.getValue();
+        BoundedTrieNode thisChild = children.get(prefix);
+        if (thisChild == null) {
+          children.put(prefix, new BoundedTrieNode(otherChild));
+          delta += otherChild.size;
+        } else {
+          delta += thisChild.merge(otherChild);
+        }
+      }
+      size += delta;
+      return delta;
+    }
+
+    /**
+     * Returns a flattened representation of this trie.
+     *
+     * <p>The flattened representation is a list of lists of strings, where each inner list
+     * represents a path in the trie and the last element in the list is a boolean in string
+     * representation denoting whether this path was truncated. i.e. &lt;["a", "b", "false"],
+     * ["c", "true"]&gt;. Children are visited in sorted key order.
+     *
+     * @return The flattened representation of this trie.
+     */
+    public List<List<String>> flattened() {
+      List<List<String>> result = new ArrayList<>();
+      if (truncated) {
+        result.add(Collections.singletonList(String.valueOf(true)));
+      } else if (children.isEmpty()) {
+        result.add(Collections.singletonList(String.valueOf(false)));
+      } else {
+        List<String> prefixes = new ArrayList<>(children.keySet());
+        Collections.sort(prefixes);
+        for (String prefix : prefixes) {
+          BoundedTrieNode child = children.get(prefix);
+          if (child != null) {
+            for (List<String> flattened : child.flattened()) {
+              List<String> newList = new ArrayList<>();
+              newList.add(prefix);
+              newList.addAll(flattened);
+              result.add(newList);
+            }
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Converts this {@code BoundedTrieNode} to proto. The size is not written.
+     *
+     * @return The {@link org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode}
+     *     representation of this node.
+     */
+    public org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode toProto() {
+      org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode.Builder builder =
+          org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode.newBuilder();
+      builder.setTruncated(truncated);
+      children.forEach((key, value) -> builder.putChildren(key, value.toProto()));
+      return builder.build();
+    }
+
+    /**
+     * Constructs a {@code BoundedTrieNode} from proto. Children of a truncated proto node are
+     * ignored; otherwise the size is recomputed as the sum of the children's sizes, with a
+     * minimum of 1.
+     *
+     * @param proto The {@link org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode}
+     *     representation of the node.
+     * @return The corresponding {@code BoundedTrieNode}.
+     */
+    public static BoundedTrieNode fromProto(
+        org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrieNode proto) {
+      // Starts out as an empty, non-truncated node of size 1 backed by a fresh HashMap.
+      BoundedTrieNode node = new BoundedTrieNode();
+      if (proto.getTruncated()) {
+        node.truncated = true;
+      } else {
+        proto
+            .getChildrenMap()
+            .forEach((key, value) -> node.children.put(key, BoundedTrieNode.fromProto(value)));
+        node.size =
+            Math.max(1, node.children.values().stream().mapToInt(BoundedTrieNode::getSize).sum());
+      }
+      return node;
+    }
+
+    /**
+     * Checks if the trie contains the given path represented by the list of segments. A truncated
+     * node contains every path, and the empty path is contained in every node.
+     *
+     * @param segments The segments of the path to check.
+     * @return True if the trie contains the path, false otherwise.
+     */
+    public boolean contains(List<String> segments) {
+      if (truncated || segments.isEmpty()) {
+        return true;
+      }
+      BoundedTrieNode child = children.get(segments.get(0));
+      return child != null && child.contains(segments.subList(1, segments.size()));
+    }
+
+    /**
+     * Returns the size of the subtree rooted at this node.
+     *
+     * @return The size of the subtree.
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * Returns whether this node is truncated.
+     *
+     * @return Whether this node is truncated.
+     */
+    public boolean isTruncated() {
+      return truncated;
+    }
+
+    /**
+     * Hashes exactly the state that {@link #equals} compares (children and truncation flag), so
+     * equal nodes always share a hash code.
+     */
+    @Override
+    public int hashCode() {
+      int result = 17; // standard prime numbers
+      // recursively traverse to calculate hashcode of each node
+      result = 31 * result + children.hashCode();
+      result = 31 * result + (truncated ? 1 : 0);
+      return result;
+    }
+
+    /** Two nodes are equal when their truncation flags match and their children are equal. */
+    @Override
+    public boolean equals(@Nullable Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+      BoundedTrieNode that = (BoundedTrieNode) other;
+      return truncated == that.truncated && children.equals(that.children);
+    }
+
+    /** Renders each flattened path, with its segments concatenated, in single quotes. */
+    @Override
+    public String toString() {
+      return "{"
+          + flattened().stream()
+              .map(list -> "'" + String.join("", list) + "'")
+              .collect(Collectors.joining(", "))
+          + "}";
+    }
+  }
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index 2bb935111d38..e6758700b755 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -54,6 +54,8 @@ public static final class Urns {
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE);
public static final String USER_SET_STRING =
extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
+ public static final String USER_BOUNDED_TRIE =
+ extractUrn(MonitoringInfoSpecs.Enum.USER_BOUNDED_TRIE);
public static final String SAMPLED_BYTE_SIZE =
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
@@ -165,6 +167,7 @@ public static final class TypeUrns {
public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1";
public static final String PROGRESS_TYPE = "beam:metrics:progress:v1";
public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1";
+ public static final String BOUNDED_TRIE_TYPE = "beam:metrics:bounded_trie:v1";
static {
// Validate that compile time constants match the values stored in the protos.
@@ -191,6 +194,8 @@ public static final class TypeUrns {
BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE)));
checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE)));
checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE)));
+ checkArgument(
+ BOUNDED_TRIE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOUNDED_TRIE_TYPE)));
}
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java
new file mode 100644
index 000000000000..683976303663
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.*;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.metrics.BoundedTrieData.BoundedTrieNode;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link BoundedTrieNode}. */
+@RunWith(JUnit4.class)
+public class BoundedTrieNodeTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BoundedTrieNodeTest.class);
+
+  /**
+   * Generates {@code n} random segments with a fixed depth.
+   *
+   * @param n The number of segments to generate.
+   * @param depth The depth of each segment.
+   * @param overlap The probability that a segment will share a prefix with a previous segment.
+   * @param rand A random number generator.
+   * @return A list of segments.
+   */
+  private static List<List<String>> generateSegmentsFixedDepth(
+      int n, int depth, double overlap, Random rand) {
+    if (depth == 0) {
+      return Collections.nCopies(n, Collections.emptyList());
+    } else {
+      List<List<String>> result = new ArrayList<>();
+      // Prefixes handed out so far at this level; a new one is the next letter after 'a'.
+      List<String> seen = new ArrayList<>();
+      for (List<String> suffix : generateSegmentsFixedDepth(n, depth - 1, overlap, rand)) {
+        String prefix;
+        if (seen.isEmpty() || rand.nextDouble() > overlap) {
+          prefix = String.valueOf((char) ('a' + seen.size()));
+          seen.add(prefix);
+        } else {
+          prefix = seen.get(rand.nextInt(seen.size()));
+        }
+        List<String> newSegments = new ArrayList<>();
+        newSegments.add(prefix);
+        newSegments.addAll(suffix);
+        result.add(newSegments);
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Generates {@code n} random segments with a depth between {@code minDepth} and {@code maxDepth}.
+   * Depths are assigned round-robin, starting at {@code minDepth}.
+   *
+   * @param n The number of segments to generate.
+   * @param minDepth The minimum depth of each segment.
+   * @param maxDepth The maximum depth of each segment.
+   * @param overlap The probability that a segment will share a prefix with a previous segment.
+   * @param rand A random number generator.
+   * @return A list of segments.
+   */
+  private static List<List<String>> randomSegments(
+      int n, int minDepth, int maxDepth, double overlap, Random rand) {
+    List<List<String>> result = new ArrayList<>();
+    int depthChoices = maxDepth - minDepth + 1;
+    int index = 0;
+    for (List<String> segments : generateSegmentsFixedDepth(n, maxDepth, overlap, rand)) {
+      // Cut each full-depth path down to the depth assigned to its position.
+      int depth = minDepth + (index % depthChoices);
+      result.add(segments.subList(0, depth));
+      index++;
+    }
+    return result;
+  }
+
+  /**
+   * Asserts that the given {@link BoundedTrieNode} covers the expected segments.
+   *
+   * @param node The {@link BoundedTrieNode} to check.
+   * @param expected The expected segments.
+   * @param maxTruncated The maximum number of truncated segments allowed.
+   */
+  private void assertCovers(
+      BoundedTrieNode node, Set<List<String>> expected, int maxTruncated) {
+    assertCoversFlattened(node.flattened(), expected, maxTruncated);
+  }
+
+  /**
+   * Verifies that the flattened list of segments covers the expected segments: every exact entry
+   * is expected, and every expected path that is not listed exactly is covered by exactly one
+   * truncated prefix.
+   *
+   * @param flattened The flattened list of segments; the last element of each entry is the
+   *     truncation flag in string form.
+   * @param expected The expected segments.
+   * @param maxTruncated The maximum number of truncated segments allowed.
+   */
+  private void assertCoversFlattened(
+      List<List<String>> flattened, Set<List<String>> expected, int maxTruncated) {
+    Set<List<String>> exactSegments = new HashSet<>();
+    Set<List<String>> truncatedSegments = new HashSet<>();
+    for (List<String> entry : flattened) {
+      // Split each entry into its path and the trailing truncation flag.
+      List<String> segments = new ArrayList<>(entry.subList(0, entry.size() - 1));
+      String last = entry.get(entry.size() - 1);
+      if (Boolean.parseBoolean(last)) {
+        truncatedSegments.add(segments);
+      } else {
+        exactSegments.add(segments);
+      }
+    }
+
+    assertTrue(
+        "Exact segments set should not be larger than expected set",
+        exactSegments.size() <= expected.size());
+    assertTrue(
+        "Expected set should contain all exact segments", expected.containsAll(exactSegments));
+
+    assertTrue(
+        "Number of truncated segments should not exceed maxTruncated",
+        truncatedSegments.size() <= maxTruncated);
+
+    Set<List<String>> seenTruncated = new HashSet<>();
+    for (List<String> segments : expected) {
+      if (!exactSegments.contains(segments)) {
+        int found = 0;
+        for (int i = 0; i < segments.size(); i++) {
+          if (truncatedSegments.contains(segments.subList(0, i))) {
+            seenTruncated.add(segments.subList(0, i));
+            found++;
+          }
+        }
+        assertEquals(
+            String.format(
+                "Expected exactly one prefix of %s to occur in %s, found %s",
+                segments, truncatedSegments, found),
+            1,
+            found);
+      }
+    }
+
+    assertEquals(
+        "Seen truncated segments should match the truncated segments set",
+        seenTruncated,
+        truncatedSegments);
+  }
+
+  /**
+   * Runs a test case for the {@link #assertCoversFlattened} method.
+   *
+   * <p>Each input string is split into one-character segments; a {@code *} in a flattened entry is
+   * removed from the path, and an entry ending in {@code *} is treated as truncated.
+   *
+   * @param flattened The flattened list of segments.
+   * @param expected The expected segments.
+   * @param maxTruncated The maximum number of truncated segments allowed.
+   */
+  private void runCoversTest(List<String> flattened, List<String> expected, int maxTruncated) {
+    List<List<String>> parsedFlattened =
+        flattened.stream()
+            .map(
+                s -> {
+                  List<String> result =
+                      new ArrayList<>(Arrays.asList(s.replace("*", "").split("")));
+                  result.add(s.endsWith("*") ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+                  return result;
+                })
+            .collect(Collectors.toList());
+    Set<List<String>> parsedExpected =
+        expected.stream().map(s -> Arrays.asList(s.split(""))).collect(Collectors.toSet());
+    assertCoversFlattened(parsedFlattened, parsedExpected, maxTruncated);
+  }
+
+  /**
+   * Runs a test case for the {@link BoundedTrieNode} class: adds the given paths, then checks
+   * merging, trimming, and that further adds and merges behave as expected after trimming.
+   *
+   * @param toAdd The segments to add to the {@link BoundedTrieNode}.
+   */
+  private void runTest(List<List<String>> toAdd) {
+    Set<List<String>> everything = new HashSet<>(toAdd);
+    Set<List<String>> allPrefixes = new HashSet<>();
+    for (List<String> segments : everything) {
+      for (int i = 0; i < segments.size(); i++) {
+        allPrefixes.add(segments.subList(0, i));
+      }
+    }
+    // A path that is a strict prefix of another path is not counted separately by the trie.
+    Set<List<String>> everythingDeduped = new HashSet<>(everything);
+    everythingDeduped.removeAll(allPrefixes);
+
+    // Test basic addition.
+    BoundedTrieNode node = new BoundedTrieNode();
+    int initialSize = node.getSize();
+    assertEquals(1, initialSize);
+
+    int totalSize = initialSize;
+    for (List<String> segments : everything) {
+      totalSize += node.add(segments);
+    }
+
+    assertEquals(everythingDeduped.size(), node.getSize());
+    assertEquals(totalSize, node.getSize());
+    assertCovers(node, everythingDeduped, 0);
+
+    // Test merging.
+    BoundedTrieNode node0 = new BoundedTrieNode();
+    BoundedTrieNode node1 = new BoundedTrieNode();
+    int i = 0;
+    for (List<String> segments : everything) {
+      if (i % 2 == 0) {
+        node0.add(segments);
+      } else {
+        node1.add(segments);
+      }
+      i++;
+    }
+    int preMergeSize = node0.getSize();
+    int mergeDelta = node0.merge(node1);
+    assertEquals(preMergeSize + mergeDelta, node0.getSize());
+    assertEquals(node0, node);
+
+    // Test trimming.
+    int trimDelta = 0;
+    if (node.getSize() > 1) {
+      trimDelta = node.trim();
+      assertTrue(trimDelta < 0);
+      assertEquals(totalSize + trimDelta, node.getSize());
+      assertCovers(node, everythingDeduped, 1);
+    }
+
+    if (node.getSize() > 1) {
+      int trim2Delta = node.trim();
+      assertTrue(trim2Delta < 0);
+      assertEquals(totalSize + trimDelta + trim2Delta, node.getSize());
+      assertCovers(node, everythingDeduped, 2);
+    }
+
+    // Verify adding after trimming is a no-op.
+    BoundedTrieNode nodeCopy = new BoundedTrieNode(node);
+    for (List<String> segments : everything) {
+      assertEquals(0, node.add(segments));
+    }
+    assertEquals(node, nodeCopy);
+
+    // Verify merging after trimming is a no-op.
+    assertEquals(0, node.merge(node0));
+    assertEquals(0, node.merge(node1));
+    assertEquals(node, nodeCopy);
+
+    // Test adding new values.
+    int expectedDelta = node.isTruncated() ? 0 : 2;
+    List<List<String>> newValues =
+        Arrays.asList(Collections.singletonList("new1"), Arrays.asList("new2", "new2.1"));
+    assertEquals(expectedDelta, node.addAll(newValues));
+
+    Set<List<String>> expectedWithNewValues = new HashSet<>(everythingDeduped);
+    expectedWithNewValues.addAll(newValues);
+    assertCovers(node, expectedWithNewValues, 2);
+
+    // Test merging new values.
+    BoundedTrieNode newValuesNode = new BoundedTrieNode();
+    newValuesNode.addAll(newValues);
+    assertEquals(expectedDelta, nodeCopy.merge(newValuesNode));
+    assertCovers(nodeCopy, expectedWithNewValues, 2);
+  }
+
+  /**
+   * Fuzzy segment generator for testing {@link BoundedTrieNode} class.
+   *
+   * <p>Each iteration draws a fresh random seed, generates segments from it and passes them to
+   * {@link #runTest}. If an assertion fails, the seed is logged before the error is rethrown.
+   *
+   * @param iterations The number of iterations to run.
+   * @param n The number of segments to generate for each iteration.
+   * @param minDepth The minimum depth of each segment.
+   * @param maxDepth The maximum depth of each segment.
+   * @param overlap The probability that a segment will share a prefix with a previous segment.
+   */
+  private void runFuzz(int iterations, int n, int minDepth, int maxDepth, double overlap) {
+    for (int i = 0; i < iterations; i++) {
+      // Draw the seed separately so it can be reported when the iteration fails.
+      long seed = new Random().nextLong();
+      Random rand = new Random(seed);
+      List<List<String>> segments = randomSegments(n, minDepth, maxDepth, overlap, rand);
+      try {
+        runTest(segments);
+      } catch (AssertionError e) {
+        LOG.info("SEED: {}", seed);
+        throw e;
+      }
+    }
+  }
+
+  /** Two two-segment paths that share their first segment. */
+  @Test
+  public void testTrivial() {
+    List<String> first = Arrays.asList("a", "b");
+    List<String> second = Arrays.asList("a", "c");
+    runTest(Arrays.asList(first, second));
+  }
+
+  /** Three two-segment paths with no segment in common. */
+  @Test
+  public void testFlat() {
+    List<String> pathA = Arrays.asList("a", "a");
+    List<String> pathB = Arrays.asList("b", "b");
+    List<String> pathC = Arrays.asList("c", "c");
+    runTest(Arrays.asList(pathA, pathB, pathC));
+  }
+
+  /** Two long single-branch paths: ten "a" segments and twelve "b" segments. */
+  @Test
+  public void testDeep() {
+    List<String> tenAs = Collections.nCopies(10, "a");
+    List<String> twelveBs = Collections.nCopies(12, "b");
+    runTest(Arrays.asList(tenAs, twelveBs));
+  }
+
+  /** Fuzz run: 10 iterations of 5 segments, depth 2 to 3, overlap 0.5. */
+  @Test
+  public void testSmall() {
+    final int iterations = 10;
+    final int segmentsPerIteration = 5;
+    final int minDepth = 2;
+    final int maxDepth = 3;
+    final double overlap = 0.5;
+    runFuzz(iterations, segmentsPerIteration, minDepth, maxDepth, overlap);
+  }
+
+  /** Fuzz run: 10 iterations of 20 segments, depth 2 to 4, overlap 0.5. */
+  @Test
+  public void testMedium() {
+    final int iterations = 10;
+    final int segmentsPerIteration = 20;
+    final int minDepth = 2;
+    final int maxDepth = 4;
+    final double overlap = 0.5;
+    runFuzz(iterations, segmentsPerIteration, minDepth, maxDepth, overlap);
+  }
+
+  /** Fuzz run: 10 iterations of 120 segments, depth 2 to 4, overlap 0.2. */
+  @Test
+  public void testLargeSparse() {
+    final int iterations = 10;
+    final int segmentsPerIteration = 120;
+    final int minDepth = 2;
+    final int maxDepth = 4;
+    final double overlap = 0.2;
+    runFuzz(iterations, segmentsPerIteration, minDepth, maxDepth, overlap);
+  }
+
+  /** Fuzz run: 10 iterations of 120 segments, depth 2 to 4, overlap 0.8. */
+  @Test
+  public void testLargeDense() {
+    final int iterations = 10;
+    final int segmentsPerIteration = 120;
+    final int minDepth = 2;
+    final int maxDepth = 4;
+    final double overlap = 0.8;
+    runFuzz(iterations, segmentsPerIteration, minDepth, maxDepth, overlap);
+  }
+
+  /**
+   * Checks the covers helper with no truncation allowed: an identical flattened/expected pair
+   * passes, while an extra entry, a missing entry, or a truncated entry each raise an {@link
+   * AssertionError}.
+   */
+  @Test
+  public void testCoversExact() {
+    List<String> allThree = Arrays.asList("ab", "ac", "cd");
+    List<String> firstTwo = Arrays.asList("ab", "ac");
+    List<String> lastTwo = Arrays.asList("ac", "cd");
+    List<String> truncatedA = Arrays.asList("a*", "cd");
+
+    runCoversTest(Arrays.asList("ab", "ac", "cd"), allThree, 0);
+
+    // Flattened contains an entry that is not expected.
+    assertThrows(AssertionError.class, () -> runCoversTest(allThree, lastTwo, 0));
+    // Flattened is missing an expected entry.
+    assertThrows(AssertionError.class, () -> runCoversTest(firstTwo, allThree, 0));
+    // Flattened contains a truncated entry although none is allowed.
+    assertThrows(AssertionError.class, () -> runCoversTest(truncatedA, allThree, 0));
+  }
+
+  /**
+   * Checks the covers helper with a maximum of one truncated entry: a single truncated prefix is
+   * accepted, while extra, missing, or too many truncated entries each raise an {@link
+   * AssertionError}.
+   */
+  @Test
+  public void testCoversTruncated() {
+    List<String> allThree = Arrays.asList("ab", "ac", "cd");
+    List<String> firstTwo = Arrays.asList("ab", "ac");
+    List<String> oneTruncated = Arrays.asList("a*", "cd");
+    List<String> twoTruncated = Arrays.asList("a*", "c*");
+
+    runCoversTest(oneTruncated, allThree, 1);
+    runCoversTest(Arrays.asList("a*", "cd"), Arrays.asList("ab", "ac", "abcde", "cd"), 1);
+
+    assertThrows(
+        AssertionError.class, () -> runCoversTest(allThree, Arrays.asList("ac", "cd"), 1));
+    assertThrows(AssertionError.class, () -> runCoversTest(firstTwo, allThree, 1));
+    assertThrows(AssertionError.class, () -> runCoversTest(twoTruncated, allThree, 1));
+    assertThrows(AssertionError.class, () -> runCoversTest(twoTruncated, firstTwo, 1));
+  }
+
+  /**
+   * Combines empty, singleton and node-backed {@link BoundedTrieData} instances in both orders and
+   * checks the resulting flattened paths.
+   */
+  @Test
+  public void testBoundedTrieDataCombine() {
+    // Trailing flag appended to every expected path below.
+    String notTruncated = String.valueOf(false);
+    List<String> pathA = Arrays.asList("a", "a", notTruncated);
+    List<String> pathB = Arrays.asList("b", "b", notTruncated);
+    List<String> pathC = Arrays.asList("c", "c", notTruncated);
+    List<String> pathD = Arrays.asList("d", "d", notTruncated);
+
+    BoundedTrieData empty = BoundedTrieData.empty();
+    BoundedTrieData singletonA = BoundedTrieData.create(ImmutableList.of("a", "a"));
+    BoundedTrieData singletonB = BoundedTrieData.create(ImmutableList.of("b", "b"));
+    BoundedTrieNode lotsRoot = new BoundedTrieNode();
+    lotsRoot.addAll(Arrays.asList(Arrays.asList("c", "c"), Arrays.asList("d", "d")));
+    BoundedTrieData lots = BoundedTrieData.create(lotsRoot);
+
+    assertEquals(Collections.emptySet(), empty.getBoundedTrieResult().getResults());
+
+    // Combining with empty, in either order.
+    assertEquals(
+        ImmutableSet.of(pathA), empty.combine(singletonA).getBoundedTrieResult().getResults());
+    assertEquals(
+        ImmutableSet.of(pathA), singletonA.combine(empty).getBoundedTrieResult().getResults());
+
+    // Two singletons.
+    assertEquals(
+        ImmutableSet.of(pathA, pathB),
+        singletonA.combine(singletonB).getBoundedTrieResult().getResults());
+
+    // Singleton with a node-backed instance, in either order.
+    assertEquals(
+        ImmutableSet.of(pathA, pathC, pathD),
+        singletonA.combine(lots).getBoundedTrieResult().getResults());
+    assertEquals(
+        ImmutableSet.of(pathA, pathC, pathD),
+        lots.combine(singletonA).getBoundedTrieResult().getResults());
+  }
+
+  /**
+   * Combines two node-backed {@link BoundedTrieData} instances, the second created with a third
+   * argument of 3, and checks that the two paths under "a" are reported as one truncated "a" entry.
+   */
+  @Test
+  public void testBoundedTrieDataCombineTrim() {
+    BoundedTrieNode left = new BoundedTrieNode();
+    left.addAll(Arrays.asList(Arrays.asList("a", "x"), Arrays.asList("b", "d")));
+    BoundedTrieNode right = new BoundedTrieNode();
+    right.addAll(Arrays.asList(Arrays.asList("a", "y"), Arrays.asList("c", "d")));
+
+    BoundedTrieData leftData = BoundedTrieData.create(left);
+    BoundedTrieData rightData = BoundedTrieData.create(null, right, 3);
+    BoundedTrieData combined = leftData.combine(rightData);
+
+    Set<List<String>> expected =
+        ImmutableSet.of(
+            Arrays.asList("a", String.valueOf(true)),
+            Arrays.asList("b", "d", String.valueOf(false)),
+            Arrays.asList("c", "d", String.valueOf(false)));
+    assertEquals(expected, combined.getBoundedTrieResult().getResults());
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java
new file mode 100644
index 000000000000..68c73eee3230
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrie.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.Arrays;
+
+/**
+ * A metric that represents a bounded trie data structure. This interface extends the {@link Metric}
+ * interface and provides methods for adding string sequences (paths) to the trie.
+ *
+ * <p>The trie is bounded in size (max=100), meaning it has a maximum capacity for storing paths.
+ * When the trie reaches its capacity, it truncates paths. This is useful for tracking and
+ * aggregating a large number of distinct paths while limiting memory usage. It is not necessary but
+ * recommended that parts of paths provided as strings are hierarchical in nature so the truncation
+ * reduces granularity rather than complete data loss.
+ */
+public interface BoundedTrie extends Metric {
+
+  /**
+   * Adds a path to the trie. The path is represented as an iterable of string segments.
+   *
+   * @param values The segments of the path to add.
+   */
+  void add(Iterable<String> values);
+
+  /**
+   * Adds a path to the trie. The path is represented as a variable number of string arguments.
+   *
+   * <p>This default implementation wraps the arguments in a list and delegates to {@link
+   * #add(Iterable)}.
+   *
+   * @param values The segments of the path to add.
+   */
+  default void add(String... values) {
+    add(Arrays.asList(values));
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieResult.java
new file mode 100644
index 000000000000..e947ec42b451
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * An immutable class representing the result of {@link BoundedTrie} metric. The result is a set of
+ * lists, where each list represents a path in the bounded trie. The last element in each path is a
+ * boolean in string representation denoting whether this path was truncated. i.e. {@code <["a",
+ * "b", "false"], ["c", "true"]>}
+ */
+@AutoValue
+public abstract class BoundedTrieResult {
+  /**
+   * Returns an immutable set of lists, where each list represents a path in the bounded trie. The
+   * last element in each path is a boolean in string representation denoting whether this path was
+   * truncated. i.e. {@code <["a", "b", "false"], ["c", "true"]>}
+   *
+   * @return The set of paths.
+   */
+  public abstract Set<List<String>> getResults();
+
+  /**
+   * Creates a {@link BoundedTrieResult} from the given set of paths.
+   *
+   * @param paths The set of paths to include in the result; it is copied into an immutable set.
+   * @return A new {@link BoundedTrieResult} instance.
+   */
+  public static BoundedTrieResult create(Set<List<String>> paths) {
+    return new AutoValue_BoundedTrieResult(ImmutableSet.copyOf(paths));
+  }
+
+  /**
+   * Returns an empty {@link BoundedTrieResult} instance.
+   *
+   * @return An empty {@link BoundedTrieResult}.
+   */
+  public static BoundedTrieResult empty() {
+    return EmptyBoundedTrieResult.INSTANCE;
+  }
+
+  /**
+   * An immutable class representing an empty {@link BoundedTrieResult}. This class provides a
+   * singleton instance for efficiency.
+   */
+  public static class EmptyBoundedTrieResult extends BoundedTrieResult {
+
+    private static final EmptyBoundedTrieResult INSTANCE = new EmptyBoundedTrieResult();
+
+    private EmptyBoundedTrieResult() {}
+
+    /** Returns an empty immutable set of paths. */
+    @Override
+    public Set<List<String>> getResults() {
+      return ImmutableSet.of();
+    }
+
+    /**
+     * Compares by value: equal to any {@link BoundedTrieResult} whose result set is empty.
+     *
+     * <p>The AutoValue-generated subclass compares against any {@link BoundedTrieResult} through
+     * {@link #getResults()}, so this class must do the same to keep {@code equals} symmetric. The
+     * annotation is fully qualified so no additional import is needed.
+     */
+    @Override
+    public boolean equals(@org.checkerframework.checker.nullness.qual.Nullable Object other) {
+      return other instanceof BoundedTrieResult
+          && ((BoundedTrieResult) other).getResults().isEmpty();
+    }
+
+    /** Returns the hash code of an empty result built via {@link #create}, matching equals. */
+    @Override
+    public int hashCode() {
+      return BoundedTrieResult.create(ImmutableSet.of()).hashCode();
+    }
+  }
+}