diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index cccc757ed..720845943 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -4,21 +4,24 @@ import io.envoyproxy.controlplane.cache.SnapshotCache import io.envoyproxy.controlplane.cache.v3.Snapshot import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tags +import io.micrometer.core.instrument.Timer import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_ERROR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_GROUP_ERROR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_UPDATE_DURATION_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer @@ -62,7 +65,9 @@ class SnapshotUpdater( ) .measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name("snapshot-updater-merged").metrics() + .name(SNAPSHOT_METRIC) + .tag(CHECKPOINT_TAG, "merged") + .metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -99,19 +104,15 @@ class SnapshotUpdater( .publishOn(globalSnapshotScheduler) .measureBuffer("snapshot-updater-groups-published", meterRegistry) .checkpoint("snapshot-updater-groups-published") - .name("snapshot-updater-groups-published").metrics() + .name(SNAPSHOT_METRIC) + .tag(CHECKPOINT_TAG, "published") + .metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } - .name(SNAPSHOT_METRIC) - .tag(METRIC_EMITTER_TAG, "snapshot-updater") - .tag(SNAPSHOT_STATUS_TAG, "published") - .tag(UPDATE_TRIGGER_TAG, "groups") - .metrics() .onErrorResume { e -> meterRegistry.counter( - SNAPSHOT_ERROR_METRIC, - Tags.of(UPDATE_TRIGGER_TAG, "groups", METRIC_EMITTER_TAG, "snapshot-updater") + SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "groups") ) .increment() logger.error("Unable to process new group", e) @@ -121,13 +122,17 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name("snapshot-updater-services-sampled").metrics() + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "sampled") + .metrics() .onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) .measureBuffer("snapshot-updater-services-published", meterRegistry) .checkpoint("snapshot-updater-services-published") - .name("snapshot-updater-services-published").metrics() + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "published") + .metrics() .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? = null @@ -155,8 +160,7 @@ class SnapshotUpdater( .filter { it != emptyUpdateResult } .onErrorResume { e -> meterRegistry.counter( - SNAPSHOT_ERROR_METRIC, - Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services") + SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "services") ).increment() logger.error("Unable to process service changes", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) @@ -190,10 +194,13 @@ class SnapshotUpdater( } } + private val updateSnapshotForGroupsTimer = meterRegistry.timer(SNAPSHOT_UPDATE_DURATION_METRIC) + private fun updateSnapshotForGroups( groups: Collection, result: UpdateResult ): Mono { + val sample = Timer.start() versions.retainGroups(cache.groups()) val results = Flux.fromIterable(groups) .doOnNextScheduledOn(groupSnapshotScheduler) { group -> @@ -211,6 +218,7 @@ class SnapshotUpdater( } } return results.then(Mono.fromCallable { + sample.stop(updateSnapshotForGroupsTimer) result }) }