diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 802d814c965..2da7b5fec3c 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -32,6 +32,8 @@ from typing import Optional from typing import Set +from apache_beam.portability.api import metrics_pb2 + try: import cython except ImportError: @@ -669,6 +671,29 @@ def __init__(self): self._children: Optional[dict[str, '_BoundedTrieNode']] = {} self._truncated = False + def to_proto(self) -> metrics_pb2.BoundedTrieNode: + return metrics_pb2.BoundedTrieNode( + truncated=self._truncated, + children={ + name: child.to_proto() + for name, child in self._children.items() + } if self._children else None) + + @staticmethod + def from_proto(proto: metrics_pb2.BoundedTrieNode) -> '_BoundedTrieNode': + node = _BoundedTrieNode() + if proto.truncated: + node._truncated = True + node._children = None + else: + node._children = { + name: _BoundedTrieNode.from_proto(child) + for name, + child in proto.children.items() + } + node._size = min(1, sum(child._size for child in node._children.values())) + return node + def size(self): return self._size @@ -760,6 +785,19 @@ def __init__(self, *, root=None, singleton=None, bound=_DEFAULT_BOUND): self._root = root self._bound = bound + def to_proto(self) -> metrics_pb2.BoundedTrie: + return metrics_pb2.BoundedTrie( + bound=self._bound, + singleton=self._singlton if self._singleton else None, + root=self._root.to_proto() if self._root else None) + + @staticmethod + def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData': + return BoundedTrieData( + bound=proto.bound, + singleton=tuple(proto.singleton) if proto.singleton else None, + root=_BoundedTrieNode.from_proto(proto.root) if proto.root else None) + def as_trie(self): if self._root is not None: return self._root diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 5227a4c9872..397fcc578d5 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -50,11 +50,14 @@ common_urns.monitoring_info_specs.USER_DISTRIBUTION_INT64.spec.urn) USER_GAUGE_URN = common_urns.monitoring_info_specs.USER_LATEST_INT64.spec.urn USER_STRING_SET_URN = common_urns.monitoring_info_specs.USER_SET_STRING.spec.urn +USER_BOUNDED_TRIE_URN = ( + common_urns.monitoring_info_specs.USER_BOUNDED_TRIE.spec.urn) USER_METRIC_URNS = set([ USER_COUNTER_URN, USER_DISTRIBUTION_URN, USER_GAUGE_URN, - USER_STRING_SET_URN + USER_STRING_SET_URN, + USER_BOUNDED_TRIE_URN, ]) WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn @@ -72,11 +75,13 @@ LATEST_INT64_TYPE = common_urns.monitoring_info_types.LATEST_INT64_TYPE.urn PROGRESS_TYPE = common_urns.monitoring_info_types.PROGRESS_TYPE.urn STRING_SET_TYPE = common_urns.monitoring_info_types.SET_STRING_TYPE.urn +BOUNDED_TRIE_TYPE = common_urns.monitoring_info_types.BOUNDED_TRIE_TYPE.urn COUNTER_TYPES = set([SUM_INT64_TYPE]) DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE]) GAUGE_TYPES = set([LATEST_INT64_TYPE]) STRING_SET_TYPES = set([STRING_SET_TYPE]) +BOUNDED_TRIE_TYPES = set([BOUNDED_TRIE_TYPE]) # TODO(migryz) extract values from beam_fn_api.proto::MonitoringInfoLabels PCOLLECTION_LABEL = ( @@ -320,6 +325,23 @@ def user_set_string(namespace, name, metric, ptransform=None): USER_STRING_SET_URN, STRING_SET_TYPE, metric, labels) +def user_bounded_trie(namespace, name, metric, ptransform=None): + """Return the string set monitoring info for the URN, metric and labels. + + Args: + namespace: User-defined namespace of BoundedTrie. + name: Name of BoundedTrie. + metric: The BoundedTrieData representing the metrics. + ptransform: The ptransform id used as a label. + """ + labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) + return create_monitoring_info( + USER_BOUNDED_TRIE_URN, + BOUNDED_TRIE_TYPE, + metric.to_proto().SerializeToString(), + labels) + + def create_monitoring_info( urn, type_urn, payload, labels=None) -> metrics_pb2.MonitoringInfo: """Return the gauge monitoring info for the URN, type, metric and labels.