diff --git a/sdks/python/apache_beam/internal/metrics/cells.py b/sdks/python/apache_beam/internal/metrics/cells.py
index c7b546258a70..989dc7183045 100644
--- a/sdks/python/apache_beam/internal/metrics/cells.py
+++ b/sdks/python/apache_beam/internal/metrics/cells.py
@@ -28,7 +28,6 @@
 from typing import TYPE_CHECKING
 from typing import Optional

-from apache_beam.metrics.cells import MetricAggregator
 from apache_beam.metrics.cells import MetricCell
 from apache_beam.metrics.cells import MetricCellFactory
 from apache_beam.utils.histogram import Histogram
@@ -50,10 +49,10 @@ class HistogramCell(MetricCell):
   """
   def __init__(self, bucket_type):
     self._bucket_type = bucket_type
-    self.data = HistogramAggregator(bucket_type).identity_element()
+    self.data = HistogramData.identity_element(bucket_type)

   def reset(self):
-    self.data = HistogramAggregator(self._bucket_type).identity_element()
+    self.data = HistogramData.identity_element(self._bucket_type)

   def combine(self, other: 'HistogramCell') -> 'HistogramCell':
     result = HistogramCell(self._bucket_type)
@@ -148,22 +147,6 @@ def combine(self, other: Optional['HistogramData']) -> 'HistogramData':

     return HistogramData(self.histogram.combine(other.histogram))

-
-class HistogramAggregator(MetricAggregator):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Aggregator for Histogram metric data during pipeline execution.
-
-  Values aggregated should be ``HistogramData`` objects.
-  """
-  def __init__(self, bucket_type: 'BucketType') -> None:
-    self._bucket_type = bucket_type
-
-  def identity_element(self) -> HistogramData:
-    return HistogramData(Histogram(self._bucket_type))
-
-  def combine(self, x: HistogramData, y: HistogramData) -> HistogramData:
-    return x.combine(y)
-
-  def result(self, x: HistogramData) -> HistogramResult:
-    return HistogramResult(x.get_cumulative())
+  @staticmethod
+  def identity_element(bucket_type) -> 'HistogramData':
+    return HistogramData(Histogram(bucket_type))
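Note: with HistogramAggregator removed, the identity element now comes straight from the data class. A minimal sketch of the call-site change, using only names from the diff above (the helper function names here are illustrative, not part of the patch):

from apache_beam.internal.metrics.cells import HistogramData


def empty_histogram_data(bucket_type):
  # Before: HistogramAggregator(bucket_type).identity_element()
  # After: the data class owns its own identity element.
  return HistogramData.identity_element(bucket_type)


def merge(x, y):
  # combine() already lived on HistogramData; only identity construction moved.
  return x.combine(y)
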
diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd
index 98bb5eff0977..c583dabeb0c0 100644
--- a/sdks/python/apache_beam/metrics/cells.pxd
+++ b/sdks/python/apache_beam/metrics/cells.pxd
@@ -33,6 +33,7 @@ cdef class CounterCell(MetricCell):
   cpdef bint update(self, value) except -1


+# Not using AbstractMetricCell so that data can be typed.
 cdef class DistributionCell(MetricCell):
   cdef readonly DistributionData data

@@ -40,14 +41,18 @@ cdef class DistributionCell(MetricCell):
   cdef inline bint _update(self, value) except -1


-cdef class GaugeCell(MetricCell):
-  cdef readonly object data
+cdef class AbstractMetricCell(MetricCell):
+  cdef readonly object data_class
+  cdef public object data
+  cdef bint _update_locked(self, value) except -1


-cdef class StringSetCell(MetricCell):
-  cdef readonly object data
+cdef class GaugeCell(AbstractMetricCell):
+  pass

-  cdef inline bint _update(self, value) except -1
+
+cdef class StringSetCell(AbstractMetricCell):
+  pass


 cdef class DistributionData(object):
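Note: in the .pxd, GaugeCell and StringSetCell become thin subclasses of the new AbstractMetricCell, whose data slot is an untyped object; DistributionCell stays directly on MetricCell so its data field keeps the DistributionData C type on the hot _update path (hence the comment added above). A rough sketch of the Python-visible behaviour this declares, assuming the readonly attributes are accessible as usual:

from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.cells import StringSetCell

gauge = GaugeCell()
gauge.set(3)  # takes the cell lock, then delegates to _update_locked(3)
assert gauge.data_class.__name__ == 'GaugeData'

names = StringSetCell()
names.add('parquet')  # update() -> _update_locked() under the same lock
assert names.data_class.__name__ == 'StringSetData'
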
""" - def __init__(self, *args): - super().__init__(*args) - self.data = GaugeAggregator.identity_element() + def __init__(self, data_class): + super().__init__() + self.data_class = data_class + self.data = self.data_class.identity_element() def reset(self): - self.data = GaugeAggregator.identity_element() + self.data = self.data_class.identity_element() - def combine(self, other): - # type: (GaugeCell) -> GaugeCell - result = GaugeCell() + def combine(self, other: 'AbstractMetricCell') -> 'AbstractMetricCell': + result = type(self)() # type: ignore[call-arg] result.data = self.data.combine(other.data) return result def set(self, value): - self.update(value) + with self._lock: + self._update_locked(value) def update(self, value): - # type: (SupportsInt) -> None - value = int(value) with self._lock: - # Set the value directly without checking timestamp, because - # this value is naturally the latest value. - self.data.value = value - self.data.timestamp = time.time() + self._update_locked(value) + + def _update_locked(self, value): + raise NotImplementedError(type(self)) def get_cumulative(self): - # type: () -> GaugeData with self._lock: return self.data.get_cumulative() + def to_runner_api_monitoring_info_impl(self, name, transform_id): + raise NotImplementedError(type(self)) + + +class GaugeCell(AbstractMetricCell): + """For internal use only; no backwards-compatibility guarantees. + + Tracks the current value and delta for a gauge metric. + + Each cell tracks the state of a metric independently per context per bundle. + Therefore, each metric has a different cell in each bundle, that is later + aggregated. + + This class is thread safe. + """ + def __init__(self): + super().__init__(GaugeData) + + def _update_locked(self, value): + # Set the value directly without checking timestamp, because + # this value is naturally the latest value. + self.data.value = int(value) + self.data.timestamp = time.time() + def to_runner_api_monitoring_info_impl(self, name, transform_id): from apache_beam.metrics import monitoring_infos return monitoring_infos.int64_user_gauge( @@ -271,7 +283,7 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id): ptransform=transform_id) -class StringSetCell(MetricCell): +class StringSetCell(AbstractMetricCell): """For internal use only; no backwards-compatibility guarantees. Tracks the current value for a StringSet metric. @@ -282,50 +294,23 @@ class StringSetCell(MetricCell): This class is thread safe. """ - def __init__(self, *args): - super().__init__(*args) - self.data = StringSetAggregator.identity_element() + def __init__(self): + super().__init__(StringSetData) def add(self, value): self.update(value) - def update(self, value): - # type: (str) -> None - if cython.compiled: - # We will hold the GIL throughout the entire _update. 
@@ -271,7 +283,7 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
         ptransform=transform_id)


-class StringSetCell(MetricCell):
+class StringSetCell(AbstractMetricCell):
   """For internal use only; no backwards-compatibility guarantees.

   Tracks the current value for a StringSet metric.
@@ -282,50 +294,23 @@ class StringSetCell(MetricCell):

   This class is thread safe.
   """
-  def __init__(self, *args):
-    super().__init__(*args)
-    self.data = StringSetAggregator.identity_element()
+  def __init__(self):
+    super().__init__(StringSetData)

   def add(self, value):
     self.update(value)

-  def update(self, value):
-    # type: (str) -> None
-    if cython.compiled:
-      # We will hold the GIL throughout the entire _update.
-      self._update(value)
-    else:
-      with self._lock:
-        self._update(value)
-
-  def _update(self, value):
+  def _update_locked(self, value):
     self.data.add(value)

-  def get_cumulative(self):
-    # type: () -> StringSetData
-    with self._lock:
-      return self.data.get_cumulative()
-
-  def combine(self, other):
-    # type: (StringSetCell) -> StringSetCell
-    combined = StringSetAggregator().combine(self.data, other.data)
-    result = StringSetCell()
-    result.data = combined
-    return result
-
   def to_runner_api_monitoring_info_impl(self, name, transform_id):
     from apache_beam.metrics import monitoring_infos
-
     return monitoring_infos.user_set_string(
         name.namespace,
         name.name,
         self.get_cumulative(),
         ptransform=transform_id)

-  def reset(self):
-    # type: () -> None
-    self.data = StringSetAggregator.identity_element()
-

 class DistributionResult(object):
   """The result of a Distribution metric."""
@@ -449,6 +434,10 @@ def get_cumulative(self):
     # type: () -> GaugeData
     return GaugeData(self.value, timestamp=self.timestamp)

+  def get_result(self):
+    # type: () -> GaugeResult
+    return GaugeResult(self.get_cumulative())
+
   def combine(self, other):
     # type: (Optional[GaugeData]) -> GaugeData
     if other is None:
@@ -464,6 +453,11 @@ def singleton(value, timestamp=None):
     # type: (Optional[int], Optional[int]) -> GaugeData
     return GaugeData(value, timestamp=timestamp)

+  @staticmethod
+  def identity_element():
+    # type: () -> GaugeData
+    return GaugeData(0, timestamp=0)
+

 class DistributionData(object):
   """For internal use only; no backwards-compatibility guarantees.
@@ -510,6 +504,9 @@ def get_cumulative(self):
     # type: () -> DistributionData
     return DistributionData(self.sum, self.count, self.min, self.max)

+  def get_result(self) -> DistributionResult:
+    return DistributionResult(self.get_cumulative())
+
   def combine(self, other):
     # type: (Optional[DistributionData]) -> DistributionData
     if other is None:
@@ -526,6 +523,11 @@ def singleton(value):
     # type: (int) -> DistributionData
     return DistributionData(value, 1, value, value)

+  @staticmethod
+  def identity_element():
+    # type: () -> DistributionData
+    return DistributionData(0, 0, 2**63 - 1, -2**63)
+

 class StringSetData(object):
   """For internal use only; no backwards-compatibility guarantees.
@@ -568,6 +570,9 @@ def __repr__(self) -> str:
   def get_cumulative(self) -> "StringSetData":
     return StringSetData(set(self.string_set), self.string_size)

+  def get_result(self) -> set[str]:
+    return set(self.string_set)
+
   def add(self, *strings):
     """
     Add strings into this StringSetData and return the result StringSetData.
@@ -585,6 +590,11 @@ def combine(self, other: "StringSetData") -> "StringSetData":
     if other is None:
       return self

+    if not other.string_set:
+      return self
+    elif not self.string_set:
+      return other
+
     combined = set(self.string_set)
     string_size = self.add_until_capacity(
         combined, self.string_size, other.string_set)
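Note: each data class now carries what the deleted aggregators used to provide: an identity element for seeding/combining and a get_result() conversion to the user-facing type. The intended contract, sketched from the definitions above (it is not asserted anywhere in the patch itself):

from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import DistributionResult
from apache_beam.metrics.cells import GaugeData

# identity_element() must be a no-op under combine().
d = DistributionData.singleton(7)
merged = d.combine(DistributionData.identity_element())
assert (merged.sum, merged.count, merged.min, merged.max) == (7, 1, 7, 7)

g = GaugeData.singleton(5, timestamp=10)
assert g.combine(GaugeData.identity_element()).value == 5  # identity timestamp=0 loses

# get_result() wraps the cumulative data in the result type the runner reports.
assert isinstance(d.get_result(), DistributionResult)
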
@@ -614,113 +624,9 @@ add_until_capacity(
     return current_size

   @staticmethod
-  def singleton(value):
-    # type: (int) -> DistributionData
-    return DistributionData(value, 1, value, value)
-
-
-class MetricAggregator(object):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Base interface for aggregating metric data during pipeline execution."""
-  def identity_element(self):
-    # type: () -> Any
-
-    """Returns the identical element of an Aggregation.
-
-    For the identity element, it must hold that
-    Aggregator.combine(any_element, identity_element) == any_element.
-    """
-    raise NotImplementedError
+  def singleton(value: str) -> "StringSetData":
+    return StringSetData({value})

-  def combine(self, x, y):
-    # type: (Any, Any) -> Any
-    raise NotImplementedError
-
-  def result(self, x):
-    # type: (Any) -> Any
-    raise NotImplementedError
-
-
-class CounterAggregator(MetricAggregator):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Aggregator for Counter metric data during pipeline execution.
-
-  Values aggregated should be ``int`` objects.
-  """
   @staticmethod
-  def identity_element():
-    # type: () -> int
-    return 0
-
-  def combine(self, x, y):
-    # type: (SupportsInt, SupportsInt) -> int
-    return int(x) + int(y)
-
-  def result(self, x):
-    # type: (SupportsInt) -> int
-    return int(x)
-
-
-class DistributionAggregator(MetricAggregator):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Aggregator for Distribution metric data during pipeline execution.
-
-  Values aggregated should be ``DistributionData`` objects.
-  """
-  @staticmethod
-  def identity_element():
-    # type: () -> DistributionData
-    return DistributionData(0, 0, 2**63 - 1, -2**63)
-
-  def combine(self, x, y):
-    # type: (DistributionData, DistributionData) -> DistributionData
-    return x.combine(y)
-
-  def result(self, x):
-    # type: (DistributionData) -> DistributionResult
-    return DistributionResult(x.get_cumulative())
-
-
-class GaugeAggregator(MetricAggregator):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Aggregator for Gauge metric data during pipeline execution.
-
-  Values aggregated should be ``GaugeData`` objects.
-  """
-  @staticmethod
-  def identity_element():
-    # type: () -> GaugeData
-    return GaugeData(0, timestamp=0)
-
-  def combine(self, x, y):
-    # type: (GaugeData, GaugeData) -> GaugeData
-    result = x.combine(y)
-    return result
-
-  def result(self, x):
-    # type: (GaugeData) -> GaugeResult
-    return GaugeResult(x.get_cumulative())
-
-
-class StringSetAggregator(MetricAggregator):
-  @staticmethod
-  def identity_element():
-    # type: () -> StringSetData
+  def identity_element() -> "StringSetData":
     return StringSetData()
-
-  def combine(self, x, y):
-    # type: (StringSetData, StringSetData) -> StringSetData
-    if len(x.string_set) == 0:
-      return y
-    elif len(y.string_set) == 0:
-      return x
-    else:
-      return x.combine(y)
-
-  def result(self, x):
-    # type: (StringSetData) -> set
-    return set(x.string_set)
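Note: nothing from the deleted aggregators is lost; their behaviour now lives on the data classes. For StringSet in particular, the empty-set short-circuit moved into StringSetData.combine (earlier hunk) and result() became get_result(). A small equivalence sketch using only names from this file:

from apache_beam.metrics.cells import StringSetData

x = StringSetData.singleton('a')
y = StringSetData.identity_element()

# Old: StringSetAggregator().combine(x, y) returned x when either side was empty.
assert x.combine(y) is x
assert y.combine(x) is x

# Old: StringSetAggregator().result(x) == set(x.string_set)
assert x.get_result() == {'a'}
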
diff --git a/sdks/python/apache_beam/runners/direct/direct_metrics.py b/sdks/python/apache_beam/runners/direct/direct_metrics.py
index f715ce3bf521..d20849d769af 100644
--- a/sdks/python/apache_beam/runners/direct/direct_metrics.py
+++ b/sdks/python/apache_beam/runners/direct/direct_metrics.py
@@ -24,23 +24,84 @@
 import threading
 from collections import defaultdict
+from typing import Any
+from typing import SupportsInt

-from apache_beam.metrics.cells import CounterAggregator
-from apache_beam.metrics.cells import DistributionAggregator
-from apache_beam.metrics.cells import GaugeAggregator
-from apache_beam.metrics.cells import StringSetAggregator
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.metrics.cells import GaugeData
+from apache_beam.metrics.cells import StringSetData
 from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.metric import MetricResults


+class MetricAggregator(object):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Base interface for aggregating metric data during pipeline execution."""
+  def identity_element(self):
+    # type: () -> Any
+
+    """Returns the identical element of an Aggregation.
+
+    For the identity element, it must hold that
+    Aggregator.combine(any_element, identity_element) == any_element.
+    """
+    raise NotImplementedError
+
+  def combine(self, x, y):
+    # type: (Any, Any) -> Any
+    raise NotImplementedError
+
+  def result(self, x):
+    # type: (Any) -> Any
+    raise NotImplementedError
+
+
+class CounterAggregator(MetricAggregator):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Aggregator for Counter metric data during pipeline execution.
+
+  Values aggregated should be ``int`` objects.
+  """
+  @staticmethod
+  def identity_element():
+    # type: () -> int
+    return 0
+
+  def combine(self, x, y):
+    # type: (SupportsInt, SupportsInt) -> int
+    return int(x) + int(y)
+
+  def result(self, x):
+    # type: (SupportsInt) -> int
+    return int(x)
+
+
+class GenericAggregator(MetricAggregator):
+  def __init__(self, data_class):
+    self._data_class = data_class
+
+  def identity_element(self):
+    return self._data_class.identity_element()
+
+  def combine(self, x, y):
+    return x.combine(y)
+
+  def result(self, x):
+    return x.get_result()
+
+
 class DirectMetrics(MetricResults):
   def __init__(self):
     self._counters = defaultdict(lambda: DirectMetric(CounterAggregator()))
     self._distributions = defaultdict(
-        lambda: DirectMetric(DistributionAggregator()))
-    self._gauges = defaultdict(lambda: DirectMetric(GaugeAggregator()))
-    self._string_sets = defaultdict(lambda: DirectMetric(StringSetAggregator()))
+        lambda: DirectMetric(GenericAggregator(DistributionData)))
+    self._gauges = defaultdict(
+        lambda: DirectMetric(GenericAggregator(GaugeData)))
+    self._string_sets = defaultdict(
+        lambda: DirectMetric(GenericAggregator(StringSetData)))

   def _apply_operation(self, bundle, updates, op):
     for k, v in updates.counters.items():
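Note: the aggregator hierarchy now survives only inside the direct runner, and GenericAggregator simply defers to whichever data class it wraps. A sketch of the wiring DirectMetrics relies on after this change (DirectMetric's implementation is not shown in this hunk):

from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import GaugeData
from apache_beam.metrics.cells import StringSetData
from apache_beam.runners.direct.direct_metrics import GenericAggregator

agg = GenericAggregator(DistributionData)
acc = agg.identity_element()  # DistributionData(0, 0, 2**63 - 1, -2**63)
acc = agg.combine(acc, DistributionData.singleton(3))
acc = agg.combine(acc, DistributionData.singleton(9))
res = agg.result(acc)  # a DistributionResult, via DistributionData.get_result()
assert res.sum == 12 and res.count == 2

# The same wrapper covers gauges and string sets because every data class
# now exposes identity_element(), combine() and get_result().
assert GenericAggregator(GaugeData).identity_element().value == 0
assert GenericAggregator(StringSetData).identity_element().string_set == set()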