From 66ae24e70acc92aac8412cd81c61f836eac442b4 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Fri, 8 Nov 2024 17:08:47 +0100 Subject: [PATCH] opentelemetry-sdk: speed up exemplars a bit Use a sparse dict to allocate ExemplarsBucket on demand instead of preallocating all of them in FixedSizeExemplarReservoirABC. Make the following return around 2X more loops for both trace_based and always_off exemplars filter: .tox/benchmark-opentelemetry-sdk/bin/pytest opentelemetry-sdk/benchmarks/metrics/ -k 'test_histogram_record_1000[7]' --- .../_internal/exemplar/exemplar_reservoir.py | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py index c8fa7f1453..22d1ee9f75 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/exemplar/exemplar_reservoir.py @@ -13,8 +13,18 @@ # limitations under the License. from abc import ABC, abstractmethod +from collections import defaultdict from random import randrange -from typing import Any, Callable, Dict, List, Optional, Sequence, Union +from typing import ( + Any, + Callable, + Dict, + List, + Mapping, + Optional, + Sequence, + Union, +) from opentelemetry import trace from opentelemetry.context import Context @@ -155,9 +165,9 @@ class FixedSizeExemplarReservoirABC(ExemplarReservoir): def __init__(self, size: int, **kwargs) -> None: super().__init__(**kwargs) self._size: int = size - self._reservoir_storage: List[ExemplarBucket] = [ - ExemplarBucket() for _ in range(self._size) - ] + self._reservoir_storage: Mapping[int, ExemplarBucket] = defaultdict( + ExemplarBucket + ) def collect(self, point_attributes: Attributes) -> List[Exemplar]: """Returns accumulated Exemplars and also resets the reservoir for the next @@ -171,15 +181,16 @@ def collect(self, point_attributes: Attributes) -> List[Exemplar]: exemplars contain the attributes that were filtered out by the aggregator, but recorded alongside the original measurement. """ - exemplars = filter( - lambda e: e is not None, - map( - lambda bucket: bucket.collect(point_attributes), - self._reservoir_storage, - ), - ) + exemplars = [ + e + for e in ( + bucket.collect(point_attributes) + for _, bucket in sorted(self._reservoir_storage.items()) + ) + if e is not None + ] self._reset() - return [*exemplars] + return exemplars def offer( self,