From 736d27cb71a2b1c9a95a44298b403ba72f5db523 Mon Sep 17 00:00:00 2001 From: "nastassia.dailidava" Date: Sat, 19 Oct 2024 16:58:28 +0200 Subject: [PATCH] allegro-internal/flex-roadmap#819 rolled back reactor metrics --- .../envoycontrol/snapshot/SnapshotUpdater.kt | 37 +++++-------------- .../synchronization/GlobalStateChanges.kt | 16 ++------ .../RemoteClusterStateChanges.kt | 6 +-- .../consul/services/ConsulServiceChanges.kt | 12 ++---- 4 files changed, 17 insertions(+), 54 deletions(-) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 2ca421461..bc32cdd46 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -14,11 +14,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer @@ -60,13 +57,9 @@ class SnapshotUpdater( // step 2: only watches groups. if groups change we use the last services state and update those groups groups().subscribeOn(globalSnapshotScheduler) ) - .measureBuffer("snapshot-updater", meterRegistry, innerSources = 2) + .measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "snapshot-updater") - .tag(SNAPSHOT_STATUS_TAG, "merged") - .tag(UPDATE_TRIGGER_TAG, "global") - .metrics() + .name("snapshot-updater-merged").metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -101,16 +94,12 @@ class SnapshotUpdater( // see GroupChangeWatcher return onGroupAdded .publishOn(globalSnapshotScheduler) - .measureBuffer("snapshot-updater", meterRegistry) + .measureBuffer("snapshot-updater-groups-published", meterRegistry) .checkpoint("snapshot-updater-groups-published") + .name("snapshot-updater-groups-published").metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "snapshot-updater") - .tag(SNAPSHOT_STATUS_TAG, "published") - .tag(UPDATE_TRIGGER_TAG, "groups") - .metrics() .onErrorResume { e -> meterRegistry.counter( ERRORS_TOTAL_METRIC, @@ -124,18 +113,13 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name(REACTOR_METRIC) - .tag(UPDATE_TRIGGER_TAG, "services") - .tag(STATUS_TAG, "sampled") - .onBackpressureLatestMeasured("snapshot-updater", meterRegistry) + .name("snapshot-updater-services-sampled").metrics() + .onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) - .measureBuffer("snapshot-updater", meterRegistry) + .measureBuffer("snapshot-updater-services-published", meterRegistry) .checkpoint("snapshot-updater-services-published") - .name(REACTOR_METRIC) - .tag(UPDATE_TRIGGER_TAG, "services") - .tag(STATUS_TAG, "published") - .metrics() + .name("snapshot-updater-services-published").metrics() .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? = null @@ -162,10 +146,7 @@ class SnapshotUpdater( } .filter { it != emptyUpdateResult } .onErrorResume { e -> - meterRegistry.counter( - ERRORS_TOTAL_METRIC, - Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services") - ).increment() + meterRegistry.counter("snapshot-updater.services.updates.errors").increment() logger.error("Unable to process service changes", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 912b6f57f..30f0d1a70 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -4,9 +4,6 @@ import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured @@ -45,11 +42,9 @@ class GlobalStateChanges( .toMultiClusterState() } .logSuppressedError("combineLatest() suppressed exception") - .measureBuffer("global-service-changes-combinator", meterRegistry) + .measureBuffer("global-service-changes-combine-latest", meterRegistry) .checkpoint("global-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "global-service-changes-combinator") - .metrics() + .name("global-service-changes-emitted").metrics() } private fun combinedExperimentalFlow( @@ -76,13 +71,10 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combine-latest", meterRegistry) .checkpoint("global-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "global-service-changes") - .tag(CHECKPOINT_TAG, "emitted") + .name("global-service-changes-emitted").metrics() .onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry) .publishOn(scheduler, 1) .checkpoint("global-service-changes-published") - .tag(CHECKPOINT_TAG, "published") - .metrics() + .name("global-service-changes-published").metrics() } } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt index ee85877b8..6ef67a998 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt @@ -3,8 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import reactor.core.publisher.Flux class RemoteClusterStateChanges( @@ -16,7 +14,5 @@ class RemoteClusterStateChanges( .getChanges(properties.sync.pollingInterval) .startWith(MultiClusterState.empty()) .distinctUntilChanged() - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation") - .metrics() + .name("cross-dc-changes-distinct").metrics() } diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt index 39b01cc79..886c4d8ac 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt @@ -15,9 +15,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems -import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.time.Duration @@ -55,14 +52,11 @@ class ConsulServiceChanges( }, FluxSink.OverflowStrategy.LATEST ) - .measureDiscardedItems("consul-service-changes", metrics.meterRegistry) + .measureDiscardedItems("consul-service-changes-emitted", metrics.meterRegistry) .checkpoint("consul-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "consul-service-changes") - .tag(CHECKPOINT_TAG, "emitted") + .name("consul-service-changes-emitted").metrics() .checkpoint("consul-service-changes-emitted-distinct") - .tag(CHECKPOINT_TAG, "distinct") - .metrics() + .name("consul-service-changes-emitted-distinct").metrics() .doOnCancel { logger.warn("Cancelling watching consul service changes") watcher.close()