diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 8f3d7bba22ad..8fd8457489ae 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -811,6 +811,9 @@ def contains(self, value): else: return False + def flattened(self): + return self.as_trie().flattened() + def to_proto(self) -> metrics_pb2.BoundedTrie: return metrics_pb2.BoundedTrie( bound=self._bound, diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 15f6b53c78e0..9cf42370f4b1 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -456,14 +456,18 @@ def add_raw(self, *rollup_segments: str) -> None: self.metric.add(rollup_segments) @staticmethod - def query(results: MetricResults, label: str) -> Set[str]: + def query(results: MetricResults, + label: str, + truncated_marker: str = '*') -> Set[str]: if not label in Lineage._METRICS: raise ValueError("Label {} does not exist for Lineage", label) response = results.query( MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name( - label))[MetricResults.STRINGSETS] + label))[MetricResults.BOUNDED_TRIES] result = set() for metric in response: - result.update(metric.committed) - result.update(metric.attempted) + for fqn in metric.committed.flattened(): + result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else '')) + for fqn in metric.attempted.flattened(): + result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else '')) return result