diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd index 7590bd8b5966..ebadeec97984 100644 --- a/sdks/python/apache_beam/metrics/cells.pxd +++ b/sdks/python/apache_beam/metrics/cells.pxd @@ -55,6 +55,10 @@ cdef class StringSetCell(AbstractMetricCell): pass +cdef class BoundedTrieCell(AbstractMetricCell): + pass + + cdef class DistributionData(object): cdef readonly libc.stdint.int64_t sum cdef readonly libc.stdint.int64_t count diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index b765c830f69c..92f5c7cbe2f6 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -691,12 +691,18 @@ def from_proto(proto: metrics_pb2.BoundedTrieNode) -> '_BoundedTrieNode': for name, child in proto.children.items() } - node._size = min(1, sum(child._size for child in node._children.values())) + node._size = max(1, sum(child._size for child in node._children.values())) return node def size(self): return self._size + def contains(self, segments): + if self._truncated or not segments: + return True + head, *tail = segments + return head in self._children and self._children[head].contains(tail) + def add(self, segments) -> int: if self._truncated or not segments: return 0 @@ -784,15 +790,31 @@ class BoundedTrieData(object): _DEFAULT_BOUND = 100 def __init__(self, *, root=None, singleton=None, bound=_DEFAULT_BOUND): - assert singleton is None or root is None self._singleton = singleton self._root = root self._bound = bound + assert singleton is None or root is None + + def size(self): + if self._singleton is not None: + return 1 + elif self._root is not None: + return self._root.size() + else: + return 0 + + def contains(self, value): + if self._singleton is not None: + return tuple(value) == self._singleton + elif self._root is not None: + return self._root.contains(value) + else: + return False def to_proto(self) -> metrics_pb2.BoundedTrie: return metrics_pb2.BoundedTrie( bound=self._bound, - singleton=self._singlton if self._singleton else None, + singleton=self._singleton if self._singleton else None, root=self._root.to_proto() if self._root else None) @staticmethod @@ -844,6 +866,7 @@ def add(self, segments): else: if self._root is None: self._root = self.as_trie() + self._singleton = None self._root.add(segments) if self._root._size > self._bound: self._root.trim() diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index fa70d3a4d9c0..c28c8340a505 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -43,6 +43,7 @@ from typing import cast from apache_beam.metrics import monitoring_infos +from apache_beam.metrics.cells import BoundedTrieCell from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import DistributionCell from apache_beam.metrics.cells import GaugeCell @@ -52,6 +53,7 @@ from apache_beam.runners.worker.statesampler import get_current_tracker if TYPE_CHECKING: + from apache_beam.metrics.cells import BoundedTrieData from apache_beam.metrics.cells import GaugeData from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import MetricCell @@ -265,6 +267,9 @@ def get_string_set(self, metric_name): StringSetCell, self.get_metric_cell(_TypedMetricName(StringSetCell, metric_name))) + def get_bounded_trie(self, metric_name): + return self.get_metric_cell(_TypedMetricName(BoundedTrieCell, metric_name)) + def get_metric_cell(self, typed_metric_name): # type: (_TypedMetricName) -> MetricCell cell = self.metrics.get(typed_metric_name, None) @@ -304,7 +309,14 @@ def get_cumulative(self): v in self.metrics.items() if k.cell_type == StringSetCell } - return MetricUpdates(counters, distributions, gauges, string_sets) + bounded_tries = { + MetricKey(self.step_name, k.metric_name): v.get_cumulative() + for k, + v in self.metrics.items() if k.cell_type == BoundedTrieCell + } + + return MetricUpdates( + counters, distributions, gauges, string_sets, bounded_tries) def to_runner_api(self): return [ @@ -358,6 +370,7 @@ def __init__( distributions=None, # type: Optional[Dict[MetricKey, DistributionData]] gauges=None, # type: Optional[Dict[MetricKey, GaugeData]] string_sets=None, # type: Optional[Dict[MetricKey, StringSetData]] + bounded_tries=None, # type: Optional[Dict[MetricKey, BoundedTrieData]] ): # type: (...) -> None @@ -368,8 +381,10 @@ def __init__( distributions: Dictionary of MetricKey:MetricUpdate objects. gauges: Dictionary of MetricKey:MetricUpdate objects. string_sets: Dictionary of MetricKey:MetricUpdate objects. + bounded_tries: Dictionary of MetricKey:MetricUpdate objects. """ self.counters = counters or {} self.distributions = distributions or {} self.gauges = gauges or {} self.string_sets = string_sets or {} + self.bounded_tries = bounded_tries or {} diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 3e665dd805ea..33af25e20ca4 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -42,6 +42,7 @@ from apache_beam.metrics import cells from apache_beam.metrics.execution import MetricResult from apache_beam.metrics.execution import MetricUpdater +from apache_beam.metrics.metricbase import BoundedTrie from apache_beam.metrics.metricbase import Counter from apache_beam.metrics.metricbase import Distribution from apache_beam.metrics.metricbase import Gauge @@ -135,6 +136,22 @@ def string_set( namespace = Metrics.get_namespace(namespace) return Metrics.DelegatingStringSet(MetricName(namespace, name)) + @staticmethod + def bounded_trie( + namespace: Union[Type, str], + name: str) -> 'Metrics.DelegatingBoundedTrie': + """Obtains or creates a Bounded Trie metric. + + Args: + namespace: A class or string that gives the namespace to a metric + name: A string that gives a unique name to a metric + + Returns: + A BoundedTrie object. + """ + namespace = Metrics.get_namespace(namespace) + return Metrics.DelegatingBoundedTrie(MetricName(namespace, name)) + class DelegatingCounter(Counter): """Metrics Counter that Delegates functionality to MetricsEnvironment.""" def __init__( @@ -164,12 +181,19 @@ def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign] + class DelegatingBoundedTrie(BoundedTrie): + """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" + def __init__(self, metric_name: MetricName) -> None: + super().__init__(metric_name) + self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] + class MetricResults(object): COUNTERS = "counters" DISTRIBUTIONS = "distributions" GAUGES = "gauges" STRINGSETS = "string_sets" + BOUNDED_TRIES = "bounded_tries" @staticmethod def _matches_name(filter: 'MetricsFilter', metric_key: 'MetricKey') -> bool: diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py index 7819dbb093a5..9b35bb24f895 100644 --- a/sdks/python/apache_beam/metrics/metricbase.py +++ b/sdks/python/apache_beam/metrics/metricbase.py @@ -43,6 +43,7 @@ 'Distribution', 'Gauge', 'StringSet', + 'BoundedTrie', 'Histogram', 'MetricName' ] @@ -152,6 +153,14 @@ def add(self, value): raise NotImplementedError +class BoundedTrie(Metric): + """BoundedTrie Metric interface. + + Reports set of unique string values during pipeline execution..""" + def add(self, value): + raise NotImplementedError + + class Histogram(Metric): """Histogram Metric interface. diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 397fcc578d53..cb4e60e218f6 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -27,6 +27,7 @@ from apache_beam.coders import coder_impl from apache_beam.coders import coders +from apache_beam.metrics.cells import BoundedTrieData from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import DistributionResult from apache_beam.metrics.cells import GaugeData @@ -168,6 +169,14 @@ def extract_string_set_value(monitoring_info_proto): return set(coder.decode(monitoring_info_proto.payload)) +def extract_bounded_trie_value(monitoring_info_proto): + if not is_bounded_trie(monitoring_info_proto): + raise ValueError('Unsupported type %s' % monitoring_info_proto.type) + + return BoundedTrieData.from_proto( + metrics_pb2.BoundedTrie.FromString(monitoring_info_proto.payload)) + + def create_labels(ptransform=None, namespace=None, name=None, pcollection=None): """Create the label dictionary based on the provided values. @@ -382,6 +391,11 @@ def is_string_set(monitoring_info_proto): return monitoring_info_proto.type in STRING_SET_TYPES +def is_bounded_trie(monitoring_info_proto): + """Returns true if the monitoring info is a BoundedTrie metric.""" + return monitoring_info_proto.type in BOUNDED_TRIE_TYPES + + def is_user_monitoring_info(monitoring_info_proto): """Returns true if the monitoring info is a user metric.""" return monitoring_info_proto.urn in USER_METRIC_URNS @@ -389,7 +403,7 @@ def is_user_monitoring_info(monitoring_info_proto): def extract_metric_result_map_value( monitoring_info_proto -) -> Union[None, int, DistributionResult, GaugeResult, set]: +) -> Union[None, int, DistributionResult, GaugeResult, set, BoundedTrieData]: """Returns the relevant GaugeResult, DistributionResult or int value for counter metric, set for StringSet metric. @@ -407,6 +421,8 @@ def extract_metric_result_map_value( return GaugeResult(GaugeData(value, timestamp)) if is_string_set(monitoring_info_proto): return extract_string_set_value(monitoring_info_proto) + if is_bounded_trie(monitoring_info_proto): + return extract_bounded_trie_value(monitoring_info_proto) return None diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py b/sdks/python/apache_beam/runners/direct/direct_metrics.py index d20849d769af..5beb19d4610a 100644 --- a/sdks/python/apache_beam/runners/direct/direct_metrics.py +++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py @@ -27,6 +27,7 @@ from typing import Any from typing import SupportsInt +from apache_beam.metrics.cells import BoundedTrieData from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import GaugeData from apache_beam.metrics.cells import StringSetData @@ -102,6 +103,8 @@ def __init__(self): lambda: DirectMetric(GenericAggregator(GaugeData))) self._string_sets = defaultdict( lambda: DirectMetric(GenericAggregator(StringSetData))) + self._bounded_tries = defaultdict( + lambda: DirectMetric(GenericAggregator(BoundedTrieData))) def _apply_operation(self, bundle, updates, op): for k, v in updates.counters.items(): @@ -116,6 +119,9 @@ def _apply_operation(self, bundle, updates, op): for k, v in updates.string_sets.items(): op(self._string_sets[k], bundle, v) + for k, v in updates.bounded_tries.items(): + op(self._bounded_tries[k], bundle, v) + def commit_logical(self, bundle, updates): op = lambda obj, bundle, update: obj.commit_logical(bundle, update) self._apply_operation(bundle, updates, op) @@ -157,12 +163,20 @@ def query(self, filter=None): v.extract_latest_attempted()) for k, v in self._string_sets.items() if self.matches(filter, k) ] + bounded_tries = [ + MetricResult( + MetricKey(k.step, k.metric), + v.extract_committed(), + v.extract_latest_attempted()) for k, + v in self._bounded_tries.items() if self.matches(filter, k) + ] return { self.COUNTERS: counters, self.DISTRIBUTIONS: distributions, self.GAUGES: gauges, - self.STRINGSETS: string_sets + self.STRINGSETS: string_sets, + self.BOUNDED_TRIES: bounded_tries, } diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py index d8f1ea097b88..1af5f1bc7bea 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py @@ -78,6 +78,8 @@ def process(self, element): distro.update(element) str_set = Metrics.string_set(self.__class__, 'element_str_set') str_set.add(str(element % 4)) + Metrics.bounded_trie(self.__class__, 'element_bounded_trie').add( + ("a", "b", str(element % 4))) return [element] p = Pipeline(DirectRunner()) @@ -124,6 +126,14 @@ def process(self, element): hc.assert_that(len(str_set_result.committed), hc.equal_to(4)) hc.assert_that(len(str_set_result.attempted), hc.equal_to(4)) + bounded_trie_results = metrics['bounded_tries'][0] + hc.assert_that( + bounded_trie_results.key, + hc.equal_to( + MetricKey('Do', MetricName(namespace, 'element_bounded_trie')))) + hc.assert_that(bounded_trie_results.committed.size(), hc.equal_to(4)) + hc.assert_that(bounded_trie_results.attempted.size(), hc.equal_to(4)) + def test_create_runner(self): self.assertTrue(isinstance(create_runner('DirectRunner'), DirectRunner)) self.assertTrue( diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 4dc2446fdd9d..30f1a4c06025 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -303,7 +303,7 @@ def test_flattened_side_input(self): super().test_flattened_side_input(with_transcoding=False) def test_metrics(self): - super().test_metrics(check_gauge=False) + super().test_metrics(check_gauge=False, check_bounded_trie=False) def test_sdf_with_watermark_tracking(self): raise unittest.SkipTest("BEAM-2939") diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 1ed21942d28f..95bcb7567918 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -1536,16 +1536,18 @@ def __init__(self, step_monitoring_infos, user_metrics_only=True): self._distributions = {} self._gauges = {} self._string_sets = {} + self._bounded_tries = {} self._user_metrics_only = user_metrics_only self._monitoring_infos = step_monitoring_infos for smi in step_monitoring_infos.values(): - counters, distributions, gauges, string_sets = \ - portable_metrics.from_monitoring_infos(smi, user_metrics_only) + counters, distributions, gauges, string_sets, bounded_tries = ( + portable_metrics.from_monitoring_infos(smi, user_metrics_only)) self._counters.update(counters) self._distributions.update(distributions) self._gauges.update(gauges) self._string_sets.update(string_sets) + self._bounded_tries.update(bounded_tries) def query(self, filter=None): counters = [ @@ -1564,12 +1566,17 @@ def query(self, filter=None): MetricResult(k, v, v) for k, v in self._string_sets.items() if self.matches(filter, k) ] + bounded_tries = [ + MetricResult(k, v, v) for k, + v in self._bounded_tries.items() if self.matches(filter, k) + ] return { self.COUNTERS: counters, self.DISTRIBUTIONS: distributions, self.GAUGES: gauges, - self.STRINGSETS: string_sets + self.STRINGSETS: string_sets, + self.BOUNDED_TRIES: bounded_tries, } def monitoring_infos(self) -> List[metrics_pb2.MonitoringInfo]: diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index 1309e7c74abc..3f036ab27f6e 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1209,13 +1209,14 @@ def expand(self, pcolls): pcoll_b = p | 'b' >> beam.Create(['b']) assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a'])) - def test_metrics(self, check_gauge=True): + def test_metrics(self, check_gauge=True, check_bounded_trie=False): p = self.create_pipeline() counter = beam.metrics.Metrics.counter('ns', 'counter') distribution = beam.metrics.Metrics.distribution('ns', 'distribution') gauge = beam.metrics.Metrics.gauge('ns', 'gauge') string_set = beam.metrics.Metrics.string_set('ns', 'string_set') + bounded_trie = beam.metrics.Metrics.bounded_trie('ns', 'bounded_trie') elements = ['a', 'zzz'] pcoll = p | beam.Create(elements) @@ -1225,6 +1226,7 @@ def test_metrics(self, check_gauge=True): pcoll | 'dist' >> beam.FlatMap(lambda x: distribution.update(len(x))) pcoll | 'gauge' >> beam.FlatMap(lambda x: gauge.set(3)) pcoll | 'string_set' >> beam.FlatMap(lambda x: string_set.add(x)) + pcoll | 'bounded_trie' >> beam.FlatMap(lambda x: bounded_trie.add(tuple(x))) res = p.run() res.wait_until_finish() @@ -1248,6 +1250,14 @@ def test_metrics(self, check_gauge=True): .with_name('string_set'))['string_sets'] self.assertEqual(str_set.committed, set(elements)) + if check_bounded_trie: + bounded_trie, = res.metrics().query(beam.metrics.MetricsFilter() + .with_name('bounded_trie'))['bounded_tries'] + self.assertEqual(bounded_trie.committed.size(), 2) + for element in elements: + self.assertTrue( + bounded_trie.committed.contains(tuple(element)), element) + def test_callbacks_with_exception(self): elements_list = ['1', '2'] diff --git a/sdks/python/apache_beam/runners/portability/portable_metrics.py b/sdks/python/apache_beam/runners/portability/portable_metrics.py index 5bc3e0539181..e92d33910415 100644 --- a/sdks/python/apache_beam/runners/portability/portable_metrics.py +++ b/sdks/python/apache_beam/runners/portability/portable_metrics.py @@ -42,6 +42,7 @@ def from_monitoring_infos(monitoring_info_list, user_metrics_only=False): distributions = {} gauges = {} string_sets = {} + bounded_tries = {} for mi in monitoring_info_list: if (user_metrics_only and not monitoring_infos.is_user_monitoring_info(mi)): @@ -62,8 +63,10 @@ def from_monitoring_infos(monitoring_info_list, user_metrics_only=False): gauges[key] = metric_result elif monitoring_infos.is_string_set(mi): string_sets[key] = metric_result + elif monitoring_infos.is_bounded_trie(mi): + bounded_tries[key] = metric_result - return counters, distributions, gauges, string_sets + return counters, distributions, gauges, string_sets, bounded_tries def _create_metric_key(monitoring_info): diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index ba48bbec6d3a..fe9dcfa62b29 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -437,7 +437,7 @@ def _combine(committed, attempted, filter): ] def query(self, filter=None): - counters, distributions, gauges, stringsets = [ + counters, distributions, gauges, stringsets, bounded_tries = [ self._combine(x, y, filter) for x, y in zip(self.committed, self.attempted) ] @@ -446,7 +446,8 @@ def query(self, filter=None): self.COUNTERS: counters, self.DISTRIBUTIONS: distributions, self.GAUGES: gauges, - self.STRINGSETS: stringsets + self.STRINGSETS: stringsets, + self.BOUNDED_TRIES: bounded_tries, } diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index bc72d551f966..337ac9919487 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -231,6 +231,9 @@ def test_pack_combiners(self): "Requires Prism to support coder:" + " 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636") + def test_metrics(self): + super().test_metrics(check_bounded_trie=False) + # Inherits all other tests.