Skip to content

Commit

Permalink
Add bounded Trie metric type.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Nov 26, 2024
1 parent d37d141 commit 60acaee
Show file tree
Hide file tree
Showing 3 changed files with 436 additions and 0 deletions.
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/metrics/cells.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,14 @@ cdef class DistributionData(object):
cdef readonly libc.stdint.int64_t count
cdef readonly libc.stdint.int64_t min
cdef readonly libc.stdint.int64_t max


cdef class _BoundedTrieNode(object):
  # Cython attribute declarations for the _BoundedTrieNode class implemented
  # in cells.py.

  # Number of entries represented by this subtree; a fresh node starts at 1.
  cdef readonly libc.stdint.int64_t _size
  # Maps a path segment to the child node; set to None once truncated.
  cdef readonly dict _children
  # True once this subtree has been collapsed by trimming or merging.
  cdef readonly bint _truncated

cdef class BoundedTrieData(object):
  # Cython attribute declarations for the BoundedTrieData class implemented
  # in cells.py.

  # Size above which the trie is trimmed.
  cdef readonly libc.stdint.int64_t _bound
  # The only value added so far (a sequence of segments), or None.
  cdef readonly object _singleton
  # Root of the trie, or None while no trie has been materialized.
  cdef readonly _BoundedTrieNode _root
207 changes: 207 additions & 0 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# pytype: skip-file

import copy
import logging
import threading
import time
Expand Down Expand Up @@ -312,6 +313,35 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
ptransform=transform_id)


class BoundedTrieCell(AbstractMetricCell):
  """For internal use only; no backwards-compatibility guarantees.

  Holds the current value of a BoundedTrie metric.

  A separate cell exists for every metric in each context and bundle; the
  per-bundle cells are aggregated afterwards.

  The class is documented as thread safe; the locking itself is presumably
  implemented by the AbstractMetricCell base class (not visible here).
  """
  def __init__(self):
    # The base class is handed the data class used to hold this cell's state.
    super().__init__(BoundedTrieData)

  def add(self, value):
    """Records ``value`` in this cell by delegating to ``self.update``."""
    self.update(value)

  def _update_locked(self, value):
    """Adds ``value`` to the underlying data object (``self.data``)."""
    trie_data = self.data
    trie_data.add(value)

  def to_runner_api_monitoring_info_impl(self, name, transform_id):
    """Builds the monitoring info for this cell's cumulative value.

    Args:
      name: metric name object exposing ``namespace`` and ``name``.
      transform_id: passed through as the ``ptransform`` keyword argument.

    Returns:
      Whatever ``monitoring_infos.user_bounded_trie`` returns.
    """
    # Imported locally, as in the other cells of this module.
    from apache_beam.metrics import monitoring_infos
    build_info = monitoring_infos.user_bounded_trie
    namespace = name.namespace
    metric_name = name.name
    cumulative = self.get_cumulative()
    return build_info(
        namespace, metric_name, cumulative, ptransform=transform_id)


class DistributionResult(object):
"""The result of a Distribution metric."""
def __init__(self, data):
Expand Down Expand Up @@ -630,3 +660,180 @@ def singleton(value: str) -> "StringSetData":
@staticmethod
def identity_element() -> "StringSetData":
  """Returns a new, default-constructed StringSetData."""
  empty = StringSetData()
  return empty


class _BoundedTrieNode(object):
def __init__(self):
# invariant: size = len(self.flattened()) = min(1, sum(size of children))
self._size = 1
self._children: Optional[dict[str, '_BoundedTrieNode']] = {}
self._truncated = False

def size(self):
return self._size

def add(self, segments) -> int:
if self._truncated or not segments:
return 0
head, *tail = segments
was_empty = not self._children
child = self._children.get(head, None) # type: ignore[union-attr]
if child is None:
child = self._children[head] = _BoundedTrieNode() # type: ignore[index]
delta = 0 if was_empty else 1
else:
delta = 0
if tail:
delta += child.add(tail)
self._size += delta
return delta

def add_all(self, segments_iter):
return sum(self.add(segments) for segments in segments_iter)

def trim(self) -> int:
if not self._children:
return 0
max_child = max(self._children.values(), key=lambda child: child._size)
if max_child._size == 1:
delta = 1 - self._size
self._truncated = True
self._children = None
else:
delta = max_child.trim()
self._size += delta
return delta

def merge(self, other: '_BoundedTrieNode') -> int:
if self._truncated:
delta = 0
elif other._truncated:
delta = 1 - self._size
self._truncated = True
self._children = None
elif not other._children:
delta = 0
elif not self._children:
self._children = other._children
delta = self._size - other._size
else:
delta = 0
other_child : '_BoundedTrieNode'
self_child: Optional['_BoundedTrieNode']
for prefix, other_child in other._children.items():
self_child = self._children.get(prefix, None)
if self_child is None:
self._children[prefix] = other_child
delta += other_child._size
else:
delta += self_child.merge(other_child)
self._size += delta
return delta

def flattened(self):
if self._truncated:
yield (True, )
elif not self._children:
yield (False, )
else:
for prefix, child in sorted(self._children.items()):
for flattened in child.flattened():
yield (prefix, ) + flattened

def __hash__(self):
return self._truncated or hash(sorted(self._children.items()))

def __eq__(self, other):
if isinstance(other, _BoundedTrieNode):
return (
self._truncated == other._truncated and
self._children == other._children)
else:
return False

def __repr__(self):
return repr(set(''.join(str(s) for s in t) for t in self.flattened()))


class BoundedTrieData(object):
  """The data of a bounded trie metric: a size-limited set of paths.

  The value is held in one of three forms: empty (neither ``_root`` nor
  ``_singleton`` set), a single path kept in ``_singleton``, or a trie of
  ``_BoundedTrieNode`` objects in ``_root``. When the trie grows beyond
  ``_bound`` it is trimmed, which replaces some subtrees by truncated
  prefixes.
  """
  _DEFAULT_BOUND = 100

  def __init__(self, *, root=None, singleton=None, bound=_DEFAULT_BOUND):
    """Initializes the data.

    Args:
      root: an existing trie root, or None.
      singleton: a single path (sequence of segments), or None. At most one
        of ``root`` and ``singleton`` may be given.
      bound: the trie size above which trimming takes place.
    """
    assert singleton is None or root is None
    self._singleton = singleton
    self._root = root
    self._bound = bound

  def as_trie(self):
    """Returns the root node, or a newly built trie if none exists yet."""
    if self._root is not None:
      return self._root
    else:
      root = _BoundedTrieNode()
      if self._singleton is not None:
        root.add(self._singleton)
      return root

  def __eq__(self, other: object) -> bool:
    if isinstance(other, BoundedTrieData):
      return self.as_trie() == other.as_trie()
    else:
      return False

  def __hash__(self) -> int:
    return hash(self.as_trie())

  def __repr__(self) -> str:
    return 'BoundedTrieData({})'.format(self.as_trie())

  def get_cumulative(self) -> "BoundedTrieData":
    """Returns an independent deep copy of this data."""
    return copy.deepcopy(self)

  def get_result(self) -> set[tuple]:
    """Returns the stored paths as a set of tuples.

    Each tuple holds the path segments followed by a bool that is True when
    the path was truncated at that point and False for a complete path.
    """
    if self._root is None:
      if self._singleton is None:
        return set()
      else:
        # tuple() lets paths that were added as lists (or any other
        # sequence) produce the same tuple shape as the trie does.
        return {tuple(self._singleton) + (False, )}
    else:
      return set(self._root.flattened())

  def add(self, segments):
    """Adds the path ``segments``, trimming the trie if it exceeds the bound.

    Args:
      segments: a sequence of path segments.
    """
    if self._root is None and self._singleton is None:
      self._singleton = segments
    elif self._singleton is not None and self._singleton == segments:
      # Optimize for the common case of re-adding the same value.
      return
    else:
      if self._root is None:
        self._root = self.as_trie()
      self._root.add(segments)
      # One add grows the trie by at most one entry, so one trim suffices.
      if self._root._size > self._bound:
        self._root.trim()

  def combine(self, other: "BoundedTrieData") -> "BoundedTrieData":
    """Returns the union of this data and ``other``.

    If either side is empty the other object itself is returned. Otherwise
    a new object is built from copies, so neither input is modified; it
    carries the smaller of the two bounds and is trimmed to respect it.
    """
    if self._root is None and self._singleton is None:
      return other
    elif other._root is None and other._singleton is None:
      return self
    else:
      if self._root is None and other._root is not None:
        self, other = other, self
      combined = copy.deepcopy(self.as_trie())
      if other._root is not None:
        # merge() adopts nodes by reference; merging a copy keeps the
        # trimming below from truncating nodes that other still owns.
        combined.merge(copy.deepcopy(other._root))
      else:
        combined.add(other._singleton)
      bound = min(self._bound, other._bound)
      while combined._size > bound:
        # trim() returns 0 once nothing is left to collapse (only possible
        # for a bound below 1); stop instead of looping forever.
        if not combined.trim():
          break
      return BoundedTrieData(root=combined, bound=bound)

  @staticmethod
  def singleton(value: str) -> "BoundedTrieData":
    """Returns a new instance containing only ``value``."""
    s = BoundedTrieData()
    s.add(value)
    return s

  @staticmethod
  def identity_element() -> "BoundedTrieData":
    """Returns a new, empty instance."""
    return BoundedTrieData()
Loading

0 comments on commit 60acaee

Please sign in to comment.