Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 Migrated metrics to prometheus (rem…
Browse files Browse the repository at this point in the history
…oved reactor micrometer)
  • Loading branch information
nastassia-dailidava committed Oct 7, 2024
1 parent 1b340fa commit 33cc5f5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.function.Consumer
Expand Down Expand Up @@ -41,7 +40,7 @@ internal class GroupChangeWatcher(
.checkpoint("group-change-watcher-emitted")
.name(REACTOR_METRIC)
.tag(WATCH_TYPE_TAG, "group")
.tap(Micrometer.metrics(meterRegistry))
.metrics()
.doOnSubscribe {
logger.info("Watching group changes")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,20 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import reactor.core.observability.micrometer.Micrometer
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
Expand Down Expand Up @@ -67,7 +66,7 @@ class SnapshotUpdater(
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
.tap(Micrometer.metrics(meterRegistry))
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -111,7 +110,7 @@ class SnapshotUpdater(
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "published")
.tag(UPDATE_TRIGGER_TAG, "groups")
.tap(Micrometer.metrics(meterRegistry))
.metrics()
.onErrorResume { e ->
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
Expand All @@ -136,7 +135,7 @@ class SnapshotUpdater(
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "published")
.tap(Micrometer.metrics(meterRegistry))
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package pl.allegro.tech.servicemesh.envoycontrol.synchronization

import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

Expand Down Expand Up @@ -50,7 +49,7 @@ class GlobalStateChanges(
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes-combinator")
.tap(Micrometer.metrics(meterRegistry))
.metrics()
}

private fun combinedExperimentalFlow(
Expand Down Expand Up @@ -84,6 +83,6 @@ class GlobalStateChanges(
.publishOn(scheduler, 1)
.checkpoint("global-service-changes-published")
.tag(CHECKPOINT_TAG, "published")
.tap(Micrometer.metrics(meterRegistry))
.metrics()
}
}

0 comments on commit 33cc5f5

Please sign in to comment.