Skip to content

Commit

Permalink
Merge pull request apache#33196 Add bounded Trie metric type to Python.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Dec 11, 2024
2 parents a272823 + 3dcd104 commit d9092a5
Show file tree
Hide file tree
Showing 5 changed files with 538 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ message MonitoringInfoSpecs {
}]
}];

// Represents a set of strings seen across bundles.
USER_BOUNDED_TRIE = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:bounded_trie:v1",
type: "beam:metrics:bounded_trie:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down Expand Up @@ -576,6 +587,12 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

// Represents a bounded trie of strings.
//
// Encoding: BoundedTrie proto
BOUNDED_TRIE_TYPE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:bounded_trie:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand All @@ -588,6 +605,30 @@ message MonitoringInfoTypeUrns {
}
}


// A single node in a BoundedTrie.
message BoundedTrieNode {
// Whether this node has been truncated.
// A truncated leaf represents possibly many children with the same prefix.
bool truncated = 1;

// Children of this node. Must be empty if truncated is true.
map<string, BoundedTrieNode> children = 2;
}

// The message type used for encoding metrics of type bounded trie.
message BoundedTrie {
// The maximum number of elements to store before truncation.
int32 bound = 1;

// A compact representation of all the elements in this trie.
BoundedTrieNode root = 2;

// A more efficient representation for metrics consisting of a single value.
repeated string singleton = 3;
}


// General monitored state information which contains structured information
// which does not fit into a typical metric format.
//
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/metrics/cells.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,14 @@ cdef class DistributionData(object):
cdef readonly libc.stdint.int64_t count
cdef readonly libc.stdint.int64_t min
cdef readonly libc.stdint.int64_t max


cdef class _BoundedTrieNode(object):
cdef readonly libc.stdint.int64_t _size
cdef readonly dict _children
cdef readonly bint _truncated

cdef class BoundedTrieData(object):
cdef readonly libc.stdint.int64_t _bound
cdef readonly object _singleton
cdef readonly _BoundedTrieNode _root
245 changes: 245 additions & 0 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# pytype: skip-file

import copy
import logging
import threading
import time
Expand All @@ -31,6 +32,8 @@
from typing import Optional
from typing import Set

from apache_beam.portability.api import metrics_pb2

try:
import cython
except ImportError:
Expand Down Expand Up @@ -312,6 +315,35 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
ptransform=transform_id)


class BoundedTrieCell(AbstractMetricCell):
"""For internal use only; no backwards-compatibility guarantees.
Tracks the current value for a BoundedTrie metric.
Each cell tracks the state of a metric independently per context per bundle.
Therefore, each metric has a different cell in each bundle, that is later
aggregated.
This class is thread safe.
"""
def __init__(self):
super().__init__(BoundedTrieData)

def add(self, value):
self.update(value)

def _update_locked(self, value):
self.data.add(value)

def to_runner_api_monitoring_info_impl(self, name, transform_id):
from apache_beam.metrics import monitoring_infos
return monitoring_infos.user_bounded_trie(
name.namespace,
name.name,
self.get_cumulative(),
ptransform=transform_id)


class DistributionResult(object):
"""The result of a Distribution metric."""
def __init__(self, data):
Expand Down Expand Up @@ -630,3 +662,216 @@ def singleton(value: str) -> "StringSetData":
@staticmethod
def identity_element() -> "StringSetData":
return StringSetData()


class _BoundedTrieNode(object):
def __init__(self):
# invariant: size = len(self.flattened()) = min(1, sum(size of children))
self._size = 1
self._children: Optional[dict[str, '_BoundedTrieNode']] = {}
self._truncated = False

def to_proto(self) -> metrics_pb2.BoundedTrieNode:
return metrics_pb2.BoundedTrieNode(
truncated=self._truncated,
children={
name: child.to_proto()
for name, child in self._children.items()
} if self._children else None)

@staticmethod
def from_proto(proto: metrics_pb2.BoundedTrieNode) -> '_BoundedTrieNode':
node = _BoundedTrieNode()
if proto.truncated:
node._truncated = True
node._children = None
else:
node._children = {
name: _BoundedTrieNode.from_proto(child)
for name,
child in proto.children.items()
}
node._size = min(1, sum(child._size for child in node._children.values()))
return node

def size(self):
return self._size

def add(self, segments) -> int:
if self._truncated or not segments:
return 0
head, *tail = segments
was_empty = not self._children
child = self._children.get(head, None) # type: ignore[union-attr]
if child is None:
child = self._children[head] = _BoundedTrieNode() # type: ignore[index]
delta = 0 if was_empty else 1
else:
delta = 0
if tail:
delta += child.add(tail)
self._size += delta
return delta

def add_all(self, segments_iter):
return sum(self.add(segments) for segments in segments_iter)

def trim(self) -> int:
if not self._children:
return 0
max_child = max(self._children.values(), key=lambda child: child._size)
if max_child._size == 1:
delta = 1 - self._size
self._truncated = True
self._children = None
else:
delta = max_child.trim()
self._size += delta
return delta

def merge(self, other: '_BoundedTrieNode') -> int:
if self._truncated:
delta = 0
elif other._truncated:
delta = 1 - self._size
self._truncated = True
self._children = None
elif not other._children:
delta = 0
elif not self._children:
self._children = other._children
delta = self._size - other._size
else:
delta = 0
other_child: '_BoundedTrieNode'
self_child: Optional['_BoundedTrieNode']
for prefix, other_child in other._children.items():
self_child = self._children.get(prefix, None)
if self_child is None:
self._children[prefix] = other_child
delta += other_child._size
else:
delta += self_child.merge(other_child)
self._size += delta
return delta

def flattened(self):
if self._truncated:
yield (True, )
elif not self._children:
yield (False, )
else:
for prefix, child in sorted(self._children.items()):
for flattened in child.flattened():
yield (prefix, ) + flattened

def __hash__(self):
return self._truncated or hash(sorted(self._children.items()))

def __eq__(self, other):
if isinstance(other, _BoundedTrieNode):
return (
self._truncated == other._truncated and
self._children == other._children)
else:
return False

def __repr__(self):
return repr(set(''.join(str(s) for s in t) for t in self.flattened()))


class BoundedTrieData(object):
_DEFAULT_BOUND = 100

def __init__(self, *, root=None, singleton=None, bound=_DEFAULT_BOUND):
assert singleton is None or root is None
self._singleton = singleton
self._root = root
self._bound = bound

def to_proto(self) -> metrics_pb2.BoundedTrie:
return metrics_pb2.BoundedTrie(
bound=self._bound,
singleton=self._singlton if self._singleton else None,
root=self._root.to_proto() if self._root else None)

@staticmethod
def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData':
return BoundedTrieData(
bound=proto.bound,
singleton=tuple(proto.singleton) if proto.singleton else None,
root=_BoundedTrieNode.from_proto(proto.root) if proto.root else None)

def as_trie(self):
if self._root is not None:
return self._root
else:
root = _BoundedTrieNode()
if self._singleton is not None:
root.add(self._singleton)
return root

def __eq__(self, other: object) -> bool:
if isinstance(other, BoundedTrieData):
return self.as_trie() == other.as_trie()
else:
return False

def __hash__(self) -> int:
return hash(self.as_trie())

def __repr__(self) -> str:
return 'BoundedTrieData({})'.format(self.as_trie())

def get_cumulative(self) -> "BoundedTrieData":
return copy.deepcopy(self)

def get_result(self) -> set[tuple]:
if self._root is None:
if self._singleton is None:
return set()
else:
return set([self._singleton + (False, )])
else:
return set(self._root.flattened())

def add(self, segments):
if self._root is None and self._singleton is None:
self._singleton = segments
elif self._singleton is not None and self._singleton == segments:
# Optimize for the common case of re-adding the same value.
return
else:
if self._root is None:
self._root = self.as_trie()
self._root.add(segments)
if self._root._size > self._bound:
self._root.trim()

def combine(self, other: "BoundedTrieData") -> "BoundedTrieData":
if self._root is None and self._singleton is None:
return other
elif other._root is None and other._singleton is None:
return self
else:
if self._root is None and other._root is not None:
self, other = other, self
combined = copy.deepcopy(self.as_trie())
if other._root is not None:
combined.merge(other._root)
else:
combined.add(other._singleton)
self._bound = min(self._bound, other._bound)
while combined._size > self._bound:
combined.trim()
return BoundedTrieData(root=combined)

@staticmethod
def singleton(value: str) -> "BoundedTrieData":
s = BoundedTrieData()
s.add(value)
return s

@staticmethod
def identity_element() -> "BoundedTrieData":
return BoundedTrieData()
Loading

0 comments on commit d9092a5

Please sign in to comment.