Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify metrics in preparation for adding another metric type. #33195

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions sdks/python/apache_beam/internal/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from typing import TYPE_CHECKING
from typing import Optional

from apache_beam.metrics.cells import MetricAggregator
from apache_beam.metrics.cells import MetricCell
from apache_beam.metrics.cells import MetricCellFactory
from apache_beam.utils.histogram import Histogram
Expand All @@ -50,10 +49,10 @@ class HistogramCell(MetricCell):
"""
def __init__(self, bucket_type):
self._bucket_type = bucket_type
self.data = HistogramAggregator(bucket_type).identity_element()
self.data = HistogramData.identity_element(bucket_type)

def reset(self):
self.data = HistogramAggregator(self._bucket_type).identity_element()
self.data = HistogramData.identity_element(self._bucket_type)

def combine(self, other: 'HistogramCell') -> 'HistogramCell':
result = HistogramCell(self._bucket_type)
Expand Down Expand Up @@ -148,22 +147,6 @@ def combine(self, other: Optional['HistogramData']) -> 'HistogramData':

return HistogramData(self.histogram.combine(other.histogram))


class HistogramAggregator(MetricAggregator):
"""For internal use only; no backwards-compatibility guarantees.

Aggregator for Histogram metric data during pipeline execution.

Values aggregated should be ``HistogramData`` objects.
"""
def __init__(self, bucket_type: 'BucketType') -> None:
self._bucket_type = bucket_type

def identity_element(self) -> HistogramData:
return HistogramData(Histogram(self._bucket_type))

def combine(self, x: HistogramData, y: HistogramData) -> HistogramData:
return x.combine(y)

def result(self, x: HistogramData) -> HistogramResult:
return HistogramResult(x.get_cumulative())
@staticmethod
def identity_element(bucket_type) -> 'HistogramData':
return HistogramData(Histogram(bucket_type))
15 changes: 10 additions & 5 deletions sdks/python/apache_beam/metrics/cells.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,26 @@ cdef class CounterCell(MetricCell):
cpdef bint update(self, value) except -1


# Not using AbstractMetricCell so that data can be typed.
cdef class DistributionCell(MetricCell):
cdef readonly DistributionData data

@cython.locals(ivalue=libc.stdint.int64_t)
cdef inline bint _update(self, value) except -1


cdef class GaugeCell(MetricCell):
cdef readonly object data
cdef class AbstractMetricCell(MetricCell):
cdef readonly object data_class
cdef public object data
cdef bint _update_locked(self, value) except -1


cdef class StringSetCell(MetricCell):
cdef readonly object data
cdef class GaugeCell(AbstractMetricCell):
pass

cdef inline bint _update(self, value) except -1

cdef class StringSetCell(AbstractMetricCell):
pass


cdef class DistributionData(object):
Expand Down
Loading
Loading