diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index e4429b858..769f992d7 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -416,7 +416,6 @@ class ControlPlane private constructor( ExecutorServiceMetrics( executor, executorServiceName, - "envoy-control", Tags.of("executor", executorServiceName) ) .bindTo(meterRegistry) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 65ab9512d..f02434208 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -11,6 +11,9 @@ import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer +import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.watchMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.watchTypeTag import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink @@ -37,7 +40,8 @@ internal class GroupChangeWatcher( return groupsChanged .measureBuffer("group-change-watcher", meterRegistry) .checkpoint("group-change-watcher-emitted") - .name("group_change_watcher") + .name(reactorMetricName) + .tag(watchTypeTag, "group") .tap(Micrometer.metrics(meterRegistry)) .doOnSubscribe { logger.info("Watching group changes") diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 91e524135..afa4131d0 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -7,7 +7,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Compa import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer +import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured +import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers @@ -17,11 +19,10 @@ class GlobalStateChanges( private val meterRegistry: MeterRegistry, private val properties: SyncProperties ) { - private val scheduler = Micrometer.timedScheduler( + private val scheduler = Schedulers.newBoundedElastic( Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator" - ), meterRegistry, "schedulers", Tags.of("name", "global-service-changes-combinator") - ) + ) fun combined(): Flux { val clusterStatesStreams: List> = clusterStateChanges.map { it.stream() } @@ -45,10 +46,11 @@ class GlobalStateChanges( .toMultiClusterState() } .logSuppressedError("combineLatest() suppressed exception") - .measureBuffer("global-service-changes-combine-latest", meterRegistry) + .measureBuffer("global-service-changes-combinator", meterRegistry) .checkpoint("global-service-changes-emitted") + .name(reactorMetricName) + .tag(metricEmitterTag, "global-service-changes-combinator") .tap(Micrometer.metrics(meterRegistry)) - .name("global-service-changes-emitted").metrics() } // todo diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt index 1dc508495..d53e2a9cb 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt @@ -9,6 +9,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState import pl.allegro.tech.servicemesh.envoycontrol.utils.clusterTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.crossDcSyncCancelledMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.crossDcSyncSecondsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.crossDcSyncTotalMetricName import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName import pl.allegro.tech.servicemesh.envoycontrol.utils.operationTag import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag @@ -35,7 +38,7 @@ class RemoteServices( val aclFlux: Flux = Flux.create({ sink -> scheduler.scheduleWithFixedDelay({ meterRegistry.timer( - "cross_dc_synchronization.seconds", + crossDcSyncSecondsMetricName, Tags.of(operationTag, "get-multi-cluster-state") ) .recordCallable { @@ -44,7 +47,7 @@ class RemoteServices( }, 0, interval, TimeUnit.SECONDS) }, FluxSink.OverflowStrategy.LATEST) return aclFlux.doOnCancel { - meterRegistry.counter("cross_dc_synchronization.cancelled").increment() + meterRegistry.counter(crossDcSyncCancelledMetricName).increment() logger.warn("Cancelling cross dc sync") } } @@ -104,7 +107,7 @@ class RemoteServices( state: ServicesState ): ClusterState { meterRegistry.counter( - "cross_dc_synchronization.total", Tags.of(clusterTag, cluster) + crossDcSyncTotalMetricName, Tags.of(clusterTag, cluster) ) .increment() val clusterState = ClusterState( diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt index d01453d4b..b3cfbd495 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt @@ -9,9 +9,17 @@ const val reactorMetricName = "reactor" const val errorsMetricName = "errors.total" const val connectionsMetricName = "connections" const val requestsMetricName = "requests.total" +const val watchMetricName = "watch" +const val envoyControlWarmUpMetricName = "envoy.control.warmup.seconds" +const val crossDcSyncMetricName = "cross.dc.synchronization" +const val crossDcSyncCancelledMetricName = "$crossDcSyncMetricName.cancelled.total" +const val crossDcSyncSecondsMetricName = "$crossDcSyncMetricName.seconds" +const val crossDcSyncTotalMetricName = "$crossDcSyncMetricName.total" const val connectionTypeTag = "connection-type" const val streamTypeTag = "stream-type" +const val checkpointTag = "checkpoint" +const val watchTypeTag = "watch-type" const val discoveryReqTypeTag = "discovery-request-type" const val metricTypeTag = "metric-type" const val metricEmitterTag = "metric-emitter" diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt index 033037043..83847c457 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt @@ -41,6 +41,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServi import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges +import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.statusTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.streamTypeTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.watchMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.watchTypeTag import reactor.core.scheduler.Schedulers import java.net.URI @@ -173,14 +179,29 @@ class ControlPlaneConfig { ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local" fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics { - val metricName = "watched-services" return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { - meterRegistry.gauge(metricName, Tags.of("status", "added"), it.servicesAdded) - meterRegistry.gauge(metricName, Tags.of("status", "removed"), it.servicesRemoved) - meterRegistry.gauge(metricName, Tags.of("status", "instance-changed"), it.instanceChanges) - meterRegistry.gauge(metricName, Tags.of("status", "snapshot-changed"), it.snapshotChanges) + meterRegistry.gauge(watchMetricName, Tags.of(statusTag, "added", watchTypeTag, "service"), it.servicesAdded) + meterRegistry.gauge( + watchMetricName, + Tags.of(statusTag, "removed", watchTypeTag, "service"), + it.servicesRemoved + ) + meterRegistry.gauge( + watchMetricName, + Tags.of(statusTag, "instance-changed", watchTypeTag, "service"), + it.instanceChanges + ) + meterRegistry.gauge( + watchMetricName, + Tags.of(statusTag, "snapshot-changed", watchTypeTag, "service"), + it.snapshotChanges + ) meterRegistry.gauge("cache.groups.count", it.cacheGroupsCount) - it.meterRegistry.more().counter("services.watch.errors.total", listOf(), it.errorWatchingServices) + it.meterRegistry.more().counter( + errorsMetricName, + Tags.of(metricEmitterTag, watchMetricName, watchTypeTag, "service"), + it.errorWatchingServices + ) } } diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt index 86b7b36da..fa64b9eff 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt @@ -13,7 +13,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.utils.envoyControlWarmUpMetricName import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems +import pl.allegro.tech.servicemesh.envoycontrol.utils.checkpointTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName +import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.time.Duration @@ -51,11 +56,14 @@ class ConsulServiceChanges( }, FluxSink.OverflowStrategy.LATEST ) - .measureDiscardedItems("consul-service-changes-emitted", metrics.meterRegistry) + .measureDiscardedItems("consul-service-changes", metrics.meterRegistry) .checkpoint("consul-service-changes-emitted") - .name("consul-service-changes-emitted").metrics() + .name(reactorMetricName) + .tag(metricEmitterTag, "consul-service-changes") + .tag(checkpointTag, "emitted") .checkpoint("consul-service-changes-emitted-distinct") - .name("consul-service-changes-emitted-distinct").metrics() + .tag(checkpointTag, "distinct") + .tap(Micrometer.metrics(metrics.meterRegistry)) .doOnCancel { logger.warn("Cancelling watching consul service changes") watcher.close() @@ -226,7 +234,7 @@ class ConsulServiceChanges( if (ready) { val stopTimer = System.currentTimeMillis() readinessStateHandler.ready() - metrics.meterRegistry.timer("envoy-control.warmup.seconds") + metrics.meterRegistry.timer(envoyControlWarmUpMetricName) .record( stopTimer - startTimer, TimeUnit.SECONDS diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index 81e5c573c..9ad0822d9 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -233,7 +233,9 @@ interface MetricsDiscoveryServerCallbacksTest { // given val meterRegistry = envoyControl().app.meterRegistry() consul().server.operations.registerService(service(), name = "echo") - + for (meter in meterRegistry.meters) { + print(meter.toString()) + } // expect untilAsserted { expectedGrpcConnectionsGaugeValues().forEach { (type, value) ->