diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index 4ec189e4637f..33bb5ae729f8 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -198,6 +198,17 @@ message MonitoringInfoSpecs {
       }]
     }];
 
+    // Represents a bounded trie of string sequences seen across bundles.
+    USER_BOUNDED_TRIE = 22 [(monitoring_info_spec) = {
+      urn: "beam:metric:user:bounded_trie:v1",
+      type: "beam:metrics:bounded_trie:v1",
+      required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
+      annotations: [{
+        key: "description",
+        value: "URN utilized to report user metric."
+      }]
+    }];
+
     // General monitored state information which contains structured information
     // which does not fit into a typical metric format. See MonitoringTableData
     // for more details.
@@ -576,6 +587,12 @@ message MonitoringInfoTypeUrns {
     SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
         "beam:metrics:set_string:v1"];
 
+    // Represents a bounded trie of strings.
+    //
+    // Encoding: BoundedTrie proto
+    BOUNDED_TRIE_TYPE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:metrics:bounded_trie:v1"];
+
     // General monitored state information which contains structured information
     // which does not fit into a typical metric format. See MonitoringTableData
     // for more details.
@@ -588,6 +605,30 @@ message MonitoringInfoTypeUrns {
   }
 }
 
+
+// A single node in a BoundedTrie.
+message BoundedTrieNode {
+  // Whether this node has been truncated.
+  // A truncated leaf represents possibly many children with the same prefix.
+  bool truncated = 1;
+
+  // Children of this node. Must be empty if truncated is true.
+  map<string, BoundedTrieNode> children = 2;
+}
+
+// The message type used for encoding metrics of type bounded trie.
+message BoundedTrie {
+  // The maximum number of elements to store before truncation.
+  int32 bound = 1;
+
+  // A compact representation of all the elements in this trie.
+  BoundedTrieNode root = 2;
+
+  // A more efficient representation for metrics consisting of a single value.
+  repeated string singleton = 3;
+}
+
+
 // General monitored state information which contains structured information
 // which does not fit into a typical metric format.
 //
diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd
index c583dabeb0c0..7590bd8b5966 100644
--- a/sdks/python/apache_beam/metrics/cells.pxd
+++ b/sdks/python/apache_beam/metrics/cells.pxd
@@ -60,3 +60,14 @@ cdef class DistributionData(object):
   cdef readonly libc.stdint.int64_t count
   cdef readonly libc.stdint.int64_t min
   cdef readonly libc.stdint.int64_t max
+
+
+cdef class _BoundedTrieNode(object):
+  cdef readonly libc.stdint.int64_t _size
+  cdef readonly dict _children
+  cdef readonly bint _truncated
+
+cdef class BoundedTrieData(object):
+  cdef readonly libc.stdint.int64_t _bound
+  cdef readonly object _singleton
+  cdef readonly _BoundedTrieNode _root
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 5802c6914eb2..9a62cae14691 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -23,6 +23,7 @@
 
 # pytype: skip-file
 
+import copy
 import logging
 import threading
 import time
@@ -31,6 +32,8 @@
 from typing import Optional
 from typing import Set
 
+from apache_beam.portability.api import metrics_pb2
+
 try:
   import cython
 except ImportError:
@@ -312,6 +315,35 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
       ptransform=transform_id)
 
 
+class BoundedTrieCell(AbstractMetricCell):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Tracks the current value for a BoundedTrie metric.
+
+  Each cell tracks the state of a metric independently per context per bundle.
+  Therefore, each metric has a different cell in each bundle, that is later
+  aggregated.
+
+  This class is thread safe.
+  """
+  def __init__(self):
+    super().__init__(BoundedTrieData)
+
+  def add(self, value):
+    self.update(value)
+
+  def _update_locked(self, value):
+    self.data.add(value)
+
+  def to_runner_api_monitoring_info_impl(self, name, transform_id):
+    from apache_beam.metrics import monitoring_infos
+    return monitoring_infos.user_bounded_trie(
+        name.namespace,
+        name.name,
+        self.get_cumulative(),
+        ptransform=transform_id)
+
+
 class DistributionResult(object):
   """The result of a Distribution metric."""
   def __init__(self, data):
@@ -630,3 +662,216 @@ def singleton(value: str) -> "StringSetData":
   @staticmethod
   def identity_element() -> "StringSetData":
     return StringSetData()
+
+
+class _BoundedTrieNode(object):
+  def __init__(self):
+    # invariant: size = len(self.flattened()) = max(1, sum(size of children))
+    self._size = 1
+    self._children: Optional[dict[str, '_BoundedTrieNode']] = {}
+    self._truncated = False
+
+  def to_proto(self) -> metrics_pb2.BoundedTrieNode:
+    return metrics_pb2.BoundedTrieNode(
+        truncated=self._truncated,
+        children={
+            name: child.to_proto()
+            for name, child in self._children.items()
+        } if self._children else None)
+
+  @staticmethod
+  def from_proto(proto: metrics_pb2.BoundedTrieNode) -> '_BoundedTrieNode':
+    node = _BoundedTrieNode()
+    if proto.truncated:
+      node._truncated = True
+      node._children = None
+    else:
+      node._children = {
+          name: _BoundedTrieNode.from_proto(child)
+          for name, child in proto.children.items()
+      }
+      # A leaf still counts as one element, hence max(1, ...).  Using min()
+      # here would cap every deserialized trie at size 1.
+      node._size = max(1, sum(child._size for child in node._children.values()))
+    return node
+
+  def size(self):
+    return self._size
+
+  def add(self, segments) -> int:
+    if self._truncated or not segments:
+      return 0
+    head, *tail = segments
+    was_empty = not self._children
+    child = self._children.get(head, None)  # type: ignore[union-attr]
+    if child is None:
+      child = self._children[head] = _BoundedTrieNode()  # type: ignore[index]
+      delta = 0 if was_empty else 1
+    else:
+      delta = 0
+    if tail:
+      delta += child.add(tail)
+    self._size += delta
+    return delta
+
+  def add_all(self, segments_iter):
+    return sum(self.add(segments) for segments in segments_iter)
+
+  def trim(self) -> int:
+    if not self._children:
+      return 0
+    max_child = max(self._children.values(), key=lambda child: child._size)
+    if max_child._size == 1:
+      delta = 1 - self._size
+      self._truncated = True
+      self._children = None
+    else:
+      delta = max_child.trim()
+    self._size += delta
+    return delta
+
+  def merge(self, other: '_BoundedTrieNode') -> int:
+    if self._truncated:
+      delta = 0
+    elif other._truncated:
+      delta = 1 - self._size
+      self._truncated = True
+      self._children = None
+    elif not other._children:
+      delta = 0
+    elif not self._children:
+      self._children = other._children
+      # Adopting other's children grows this node from self._size to
+      # other._size, so the delta is other minus self (not the reverse).
+      delta = other._size - self._size
+    else:
+      delta = 0
+      other_child: '_BoundedTrieNode'
+      self_child: Optional['_BoundedTrieNode']
+      for prefix, other_child in other._children.items():
+        self_child = self._children.get(prefix, None)
+        if self_child is None:
+          self._children[prefix] = other_child
+          delta += other_child._size
+        else:
+          delta += self_child.merge(other_child)
+    self._size += delta
+    return delta
+
+  def flattened(self):
+    if self._truncated:
+      yield (True, )
+    elif not self._children:
+      yield (False, )
+    else:
+      for prefix, child in sorted(self._children.items()):
+        for flattened in child.flattened():
+          yield (prefix, ) + flattened
+
+  def __hash__(self):
+    # sorted() returns a (unhashable) list; it must be wrapped in a tuple
+    # before hashing.
+    return self._truncated or hash(tuple(sorted(self._children.items())))
+
+  def __eq__(self, other):
+    if isinstance(other, _BoundedTrieNode):
+      return (
+          self._truncated == other._truncated and
+          self._children == other._children)
+    else:
+      return False
+
+  def __repr__(self):
+    return repr(set(''.join(str(s) for s in t) for t in self.flattened()))
+
+
+class BoundedTrieData(object):
+  _DEFAULT_BOUND = 100
+
+  def __init__(self, *, root=None,
+      singleton=None, bound=_DEFAULT_BOUND):
+    assert singleton is None or root is None
+    self._singleton = singleton
+    self._root = root
+    self._bound = bound
+
+  def to_proto(self) -> metrics_pb2.BoundedTrie:
+    return metrics_pb2.BoundedTrie(
+        bound=self._bound,
+        singleton=self._singleton if self._singleton else None,
+        root=self._root.to_proto() if self._root else None)
+
+  @staticmethod
+  def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData':
+    return BoundedTrieData(
+        bound=proto.bound,
+        singleton=tuple(proto.singleton) if proto.singleton else None,
+        root=_BoundedTrieNode.from_proto(proto.root) if proto.root else None)
+
+  def as_trie(self):
+    if self._root is not None:
+      return self._root
+    else:
+      root = _BoundedTrieNode()
+      if self._singleton is not None:
+        root.add(self._singleton)
+      return root
+
+  def __eq__(self, other: object) -> bool:
+    if isinstance(other, BoundedTrieData):
+      return self.as_trie() == other.as_trie()
+    else:
+      return False
+
+  def __hash__(self) -> int:
+    return hash(self.as_trie())
+
+  def __repr__(self) -> str:
+    return 'BoundedTrieData({})'.format(self.as_trie())
+
+  def get_cumulative(self) -> "BoundedTrieData":
+    return copy.deepcopy(self)
+
+  def get_result(self) -> set[tuple]:
+    if self._root is None:
+      if self._singleton is None:
+        return set()
+      else:
+        return set([self._singleton + (False, )])
+    else:
+      return set(self._root.flattened())
+
+  def add(self, segments):
+    if self._root is None and self._singleton is None:
+      self._singleton = segments
+    elif self._singleton is not None and self._singleton == segments:
+      # Optimize for the common case of re-adding the same value.
+      return
+    else:
+      if self._root is None:
+        self._root = self.as_trie()
+      self._root.add(segments)
+      if self._root._size > self._bound:
+        self._root.trim()
+
+  def combine(self, other: "BoundedTrieData") -> "BoundedTrieData":
+    if self._root is None and self._singleton is None:
+      return other
+    elif other._root is None and other._singleton is None:
+      return self
+    else:
+      if self._root is None and other._root is not None:
+        self, other = other, self
+      combined = copy.deepcopy(self.as_trie())
+      if other._root is not None:
+        combined.merge(other._root)
+      else:
+        combined.add(other._singleton)
+      # Use a local rather than mutating self._bound: after the swap above,
+      # "self" may be the caller's argument, and combine() should not have
+      # side effects on its inputs.  Also propagate the tighter bound to the
+      # result instead of letting it fall back to the default.
+      bound = min(self._bound, other._bound)
+      while combined._size > bound:
+        combined.trim()
+      return BoundedTrieData(root=combined, bound=bound)
+
+  @staticmethod
+  def singleton(value: str) -> "BoundedTrieData":
+    s = BoundedTrieData()
+    s.add(value)
+    return s
+
+  @staticmethod
+  def identity_element() -> "BoundedTrieData":
+    return BoundedTrieData()
diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py
index d1ee37b8ed82..68e3f12a73fd 100644
--- a/sdks/python/apache_beam/metrics/cells_test.py
+++ b/sdks/python/apache_beam/metrics/cells_test.py
@@ -17,9 +17,13 @@
 
 # pytype: skip-file
 
+import copy
+import itertools
+import random
 import threading
 import unittest
 
+from apache_beam.metrics.cells import BoundedTrieData
 from apache_beam.metrics.cells import CounterCell
 from apache_beam.metrics.cells import DistributionCell
 from apache_beam.metrics.cells import DistributionData
@@ -27,6 +31,7 @@
 from apache_beam.metrics.cells import GaugeData
 from apache_beam.metrics.cells import StringSetCell
 from apache_beam.metrics.cells import StringSetData
+from apache_beam.metrics.cells import _BoundedTrieNode
 from apache_beam.metrics.metricbase import MetricName
 
 
@@ -203,5 +208,218 @@ def test_add_size_tracked_correctly(self):
     self.assertEqual(s.data.string_size, 3)
 
 
+class TestBoundedTrieNode(unittest.TestCase):
+  @classmethod
+  def random_segments_fixed_depth(cls, n, depth, overlap, rand):
+    if depth == 0:
+      yield from ((), ) * n
+    else:
+      seen = []
+      to_string = lambda ix: chr(ord('a') + ix) if ix < 26 else f'z{ix}'
+      for suffix in cls.random_segments_fixed_depth(n, depth - 1, overlap,
+                                                    rand):
+        if not seen or rand.random() > overlap:
+          prefix = to_string(len(seen))
+          seen.append(prefix)
+        else:
+          prefix = rand.choice(seen)
+        yield (prefix, ) + suffix
+
+  @classmethod
+  def random_segments(cls, n, min_depth, max_depth, overlap, rand):
+    for depth, segments in zip(
+        itertools.cycle(range(min_depth, max_depth + 1)),
+        cls.random_segments_fixed_depth(n, max_depth, overlap, rand)):
+      yield segments[:depth]
+
+  def assert_covers(self, node, expected, max_truncated=0):
+    self.assert_covers_flattened(node.flattened(), expected, max_truncated)
+
+  def assert_covers_flattened(self, flattened, expected, max_truncated=0):
+    expected = set(expected)
+    # Split node into the exact and truncated segments.
+    partitioned = {True: set(), False: set()}
+    for segments in flattened:
+      partitioned[segments[-1]].add(segments[:-1])
+    exact, truncated = partitioned[False], partitioned[True]
+    # Check we cover both parts.
+    self.assertLessEqual(len(truncated), max_truncated, truncated)
+    self.assertTrue(exact.issubset(expected), exact - expected)
+    seen_truncated = set()
+    for segments in expected - exact:
+      found = 0
+      for ix in range(len(segments)):
+        if segments[:ix] in truncated:
+          seen_truncated.add(segments[:ix])
+          found += 1
+      if found != 1:
+        self.fail(
+            f"Expected exactly one prefix of {segments} "
+            f"to occur in {truncated}, found {found}")
+    self.assertEqual(seen_truncated, truncated, truncated - seen_truncated)
+
+  def run_covers_test(self, flattened, expected, max_truncated):
+    def parse(s):
+      return tuple(s.strip('*')) + (s.endswith('*'), )
+
+    self.assert_covers_flattened([parse(s) for s in flattened],
+                                 [tuple(s) for s in expected],
+                                 max_truncated)
+
+  def test_covers_exact(self):
+    self.run_covers_test(['ab', 'ac', 'cd'], ['ab', 'ac', 'cd'], 0)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['ab', 'ac', 'cd'], ['ac', 'cd'], 0)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['ab', 'ac'], ['ab', 'ac', 'cd'], 0)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['a*', 'cd'], ['ab', 'ac', 'cd'], 0)
+
+  def test_covers_truncated(self):
+    self.run_covers_test(['a*', 'cd'], ['ab', 'ac', 'cd'], 1)
+    self.run_covers_test(['a*', 'cd'], ['ab', 'ac', 'abcde', 'cd'], 1)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['ab', 'ac', 'cd'], ['ac', 'cd'], 1)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['ab', 'ac'], ['ab', 'ac', 'cd'], 1)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['a*', 'c*'], ['ab', 'ac', 'cd'], 1)
+    with self.assertRaises(AssertionError):
+      self.run_covers_test(['a*', 'c*'], ['ab', 'ac'], 1)
+
+  def run_test(self, to_add):
+    everything = list(set(to_add))
+    all_prefixes = set(
+        segments[:ix] for segments in everything for ix in range(len(segments)))
+    everything_deduped = set(everything) - all_prefixes
+
+    # Check basic addition.
+    node = _BoundedTrieNode()
+    total_size = node.size()
+    self.assertEqual(total_size, 1)
+    for segments in everything:
+      total_size += node.add(segments)
+    self.assertEqual(node.size(), len(everything_deduped), node)
+    self.assertEqual(node.size(), total_size, node)
+    self.assert_covers(node, everything_deduped)
+
+    # Check merging
+    node0 = _BoundedTrieNode()
+    node0.add_all(everything[0::2])
+    node1 = _BoundedTrieNode()
+    node1.add_all(everything[1::2])
+    pre_merge_size = node0.size()
+    merge_delta = node0.merge(node1)
+    self.assertEqual(node0.size(), pre_merge_size + merge_delta)
+    self.assertEqual(node0, node)
+
+    # Check trimming.
+    if node.size() > 1:
+      trim_delta = node.trim()
+      self.assertLess(trim_delta, 0, node)
+      self.assertEqual(node.size(), total_size + trim_delta)
+      self.assert_covers(node, everything_deduped, max_truncated=1)
+
+      if node.size() > 1:
+        trim2_delta = node.trim()
+        self.assertLess(trim2_delta, 0)
+        self.assertEqual(node.size(), total_size + trim_delta + trim2_delta)
+        self.assert_covers(node, everything_deduped, max_truncated=2)
+
+    # Adding after trimming should be a no-op.
+    node_copy = copy.deepcopy(node)
+    for segments in everything:
+      self.assertEqual(node.add(segments), 0)
+    self.assertEqual(node, node_copy)
+
+    # Merging after trimming should be a no-op.
+    self.assertEqual(node.merge(node0), 0)
+    self.assertEqual(node.merge(node1), 0)
+    self.assertEqual(node, node_copy)
+
+    if node._truncated:
+      expected_delta = 0
+    else:
+      expected_delta = 2
+
+    # Adding something new is not.
+    new_values = [('new1', ), ('new2', 'new2.1')]
+    self.assertEqual(node.add_all(new_values), expected_delta)
+    self.assert_covers(
+        node, list(everything_deduped) + new_values, max_truncated=2)
+
+    # Nor is merging something new.
+    new_values_node = _BoundedTrieNode()
+    new_values_node.add_all(new_values)
+    self.assertEqual(node_copy.merge(new_values_node), expected_delta)
+    self.assert_covers(
+        node_copy, list(everything_deduped) + new_values, max_truncated=2)
+
+  def run_fuzz(self, iterations=10, **params):
+    for _ in range(iterations):
+      seed = random.getrandbits(64)
+      segments = self.random_segments(**params, rand=random.Random(seed))
+      try:
+        self.run_test(segments)
+      except:
+        print("SEED", seed)
+        raise
+
+  def test_trivial(self):
+    self.run_test([('a', 'b'), ('a', 'c')])
+
+  def test_flat(self):
+    self.run_test([('a', 'a'), ('b', 'b'), ('c', 'c')])
+
+  def test_deep(self):
+    self.run_test([('a', ) * 10, ('b', ) * 12])
+
+  def test_small(self):
+    self.run_fuzz(n=5, min_depth=2, max_depth=3, overlap=0.5)
+
+  def test_medium(self):
+    self.run_fuzz(n=20, min_depth=2, max_depth=4, overlap=0.5)
+
+  def test_large_sparse(self):
+    self.run_fuzz(n=120, min_depth=2, max_depth=4, overlap=0.2)
+
+  def test_large_dense(self):
+    self.run_fuzz(n=120, min_depth=2, max_depth=4, overlap=0.8)
+
+  def test_bounded_trie_data_combine(self):
+    empty = BoundedTrieData()
+    # The merging here isn't complicated; we're just ensuring that
+    # BoundedTrieData invokes _BoundedTrieNode correctly.
+    singletonA = BoundedTrieData(singleton=('a', 'a'))
+    singletonB = BoundedTrieData(singleton=('b', 'b'))
+    lots_root = _BoundedTrieNode()
+    lots_root.add_all([('c', 'c'), ('d', 'd')])
+    lots = BoundedTrieData(root=lots_root)
+    self.assertEqual(empty.get_result(), set())
+    self.assertEqual(
+        empty.combine(singletonA).get_result(), set([('a', 'a', False)]))
+    self.assertEqual(
+        singletonA.combine(empty).get_result(), set([('a', 'a', False)]))
+    self.assertEqual(
+        singletonA.combine(singletonB).get_result(),
+        set([('a', 'a', False), ('b', 'b', False)]))
+    self.assertEqual(
+        singletonA.combine(lots).get_result(),
+        set([('a', 'a', False), ('c', 'c', False), ('d', 'd', False)]))
+    self.assertEqual(
+        lots.combine(singletonA).get_result(),
+        set([('a', 'a', False), ('c', 'c', False), ('d', 'd', False)]))
+
+  def test_bounded_trie_data_combine_trim(self):
+    left = _BoundedTrieNode()
+    left.add_all([('a', 'x'), ('b', 'd')])
+    right = _BoundedTrieNode()
+    right.add_all([('a', 'y'), ('c', 'd')])
+    self.assertEqual(
+        BoundedTrieData(root=left).combine(
+            BoundedTrieData(root=right, bound=3)).get_result(),
+        set([('a', True), ('b', 'd', False), ('c', 'd', False)]))
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 5227a4c9872b..397fcc578d53 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -50,11 +50,14 @@
     common_urns.monitoring_info_specs.USER_DISTRIBUTION_INT64.spec.urn)
 USER_GAUGE_URN = common_urns.monitoring_info_specs.USER_LATEST_INT64.spec.urn
 USER_STRING_SET_URN = common_urns.monitoring_info_specs.USER_SET_STRING.spec.urn
+USER_BOUNDED_TRIE_URN = (
+    common_urns.monitoring_info_specs.USER_BOUNDED_TRIE.spec.urn)
 USER_METRIC_URNS = set([
     USER_COUNTER_URN,
     USER_DISTRIBUTION_URN,
     USER_GAUGE_URN,
-    USER_STRING_SET_URN
+    USER_STRING_SET_URN,
+    USER_BOUNDED_TRIE_URN,
 ])
WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn @@ -72,11 +75,13 @@ LATEST_INT64_TYPE = common_urns.monitoring_info_types.LATEST_INT64_TYPE.urn PROGRESS_TYPE = common_urns.monitoring_info_types.PROGRESS_TYPE.urn STRING_SET_TYPE = common_urns.monitoring_info_types.SET_STRING_TYPE.urn +BOUNDED_TRIE_TYPE = common_urns.monitoring_info_types.BOUNDED_TRIE_TYPE.urn COUNTER_TYPES = set([SUM_INT64_TYPE]) DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE]) GAUGE_TYPES = set([LATEST_INT64_TYPE]) STRING_SET_TYPES = set([STRING_SET_TYPE]) +BOUNDED_TRIE_TYPES = set([BOUNDED_TRIE_TYPE]) # TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels PCOLLECTION_LABEL = ( @@ -320,6 +325,23 @@ def user_set_string(namespace, name, metric, ptransform=None): USER_STRING_SET_URN, STRING_SET_TYPE, metric, labels) +def user_bounded_trie(namespace, name, metric, ptransform=None): + """Return the string set monitoring info for the URN, metric and labels. + + Args: + namespace: User-defined namespace of BoundedTrie. + name: Name of BoundedTrie. + metric: The BoundedTrieData representing the metrics. + ptransform: The ptransform id used as a label. + """ + labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) + return create_monitoring_info( + USER_BOUNDED_TRIE_URN, + BOUNDED_TRIE_TYPE, + metric.to_proto().SerializeToString(), + labels) + + def create_monitoring_info( urn, type_urn, payload, labels=None) -> metrics_pb2.MonitoringInfo: """Return the gauge monitoring info for the URN, type, metric and labels.