Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 returned all other metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 20, 2024
1 parent c191571 commit 8ea396a
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG
import java.util.concurrent.atomic.AtomicInteger
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
Expand Down Expand Up @@ -58,14 +60,30 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
}

override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
// noop
meterRegistry.counter(
REQUESTS_METRIC,
Tags.of(
CONNECTION_TYPE_TAG, "grpc",
STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
DISCOVERY_REQ_TYPE_TAG, "total"
)
)
.increment()
}

override fun onV3StreamDeltaRequest(
streamId: Long,
request: V3DeltaDiscoveryRequest
) {
// noop
meterRegistry.counter(
REQUESTS_METRIC,
Tags.of(
CONNECTION_TYPE_TAG, "grpc",
STREAM_TYPE_TAG, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
DISCOVERY_REQ_TYPE_TAG, "delta"
)
)
.increment()
}

override fun onStreamCloseWithError(streamId: Long, typeUrl: String, error: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
Expand Down Expand Up @@ -186,8 +185,7 @@ class SnapshotUpdater(
SNAPSHOT_GROUP_ERROR_METRIC,
Tags.of(
SERVICE_TAG, group.serviceName,
OPERATION_TAG, "create-snapshot",
METRIC_EMITTER_TAG, "snapshot-updater"
OPERATION_TAG, "create-snapshot"
)
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_CANCELLED_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_SECONDS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_ERRORS_METRIC
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -75,8 +74,7 @@ class RemoteServices(
SERVICES_STATE_ERRORS_METRIC,
Tags.of(
CLUSTER_TAG, cluster,
OPERATION_TAG, "get-state",
METRIC_EMITTER_TAG, "cross-dc-synchronization"
OPERATION_TAG, "get-state"
)
).increment()
logger.warn("Error synchronizing instances ${it.message}", it)
Expand All @@ -93,8 +91,7 @@ class RemoteServices(
SERVICES_STATE_ERRORS_METRIC,
Tags.of(
CLUSTER_TAG, cluster,
OPERATION_TAG, "get-instances",
METRIC_EMITTER_TAG, "cross-dc-synchronization"
OPERATION_TAG, "get-instances"
)
).increment()
logger.warn("Failed fetching instances from $cluster", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ import io.micrometer.core.instrument.noop.NoopTimer

val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER))
const val REACTOR_METRIC = "reactor.stats"
const val REACTOR_TOTAL_METRIC = "reactor.stats.total"
const val SERVICES_STATE_METRIC = "services.state"
const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors"
const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors.total"
const val SNAPSHOT_METRIC = "snapshot"
const val SNAPSHOT_UPDATE_DURATION_METRIC = "snapshot.update.duration.seconds"
const val SNAPSHOT_ERROR_METRIC = "snapshot.errors"
const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors"
const val WATCH_ERRORS_METRIC = "watch.errors.total"
const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors.total"
const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total"
const val CONNECTIONS_METRIC = "connections.stats"
const val REQUESTS_METRIC = "stream.requests"
const val WATCH_METRIC = "watch.stats"
const val REQUESTS_METRIC = "requests.stats"
const val WATCH_ERRORS_METRIC = "service.watch.errors.total"
const val WATCH_METRIC = "service.watch"
const val ENVOY_CONTROL_WARM_UP_METRIC = "envoy.control.warmup.seconds"
const val CROSS_DC_SYNC_METRIC = "cross.dc.synchronization"
const val CROSS_DC_SYNC_CANCELLED_METRIC = "$CROSS_DC_SYNC_METRIC.cancelled.total"
Expand All @@ -35,7 +36,6 @@ const val WATCH_TYPE_TAG = "watch-type"
const val DISCOVERY_REQ_TYPE_TAG = "discovery-request-type"
const val METRIC_TYPE_TAG = "metric-type"
const val METRIC_EMITTER_TAG = "metric-emitter"
const val SNAPSHOT_STATUS_TAG = "snapshot-status"
const val UPDATE_TRIGGER_TAG = "update-trigger"
const val SERVICE_TAG = "service"
const val OPERATION_TAG = "operation"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.servicemesh.envoycontrol.utils

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import org.reactivestreams.Subscription
import org.slf4j.LoggerFactory
import reactor.core.Disposable
Expand All @@ -11,6 +12,7 @@ import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.streams.asSequence

private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils")
private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") }
Expand Down Expand Up @@ -50,7 +52,7 @@ fun <T> Flux<T>.measureBuffer(
fun <T> Flux<T>.measureDiscardedItems(name: String, meterRegistry: MeterRegistry): Flux<T> = this
.doOnDiscard(Any::class.java) {
meterRegistry.counter(
REACTOR_METRIC,
REACTOR_TOTAL_METRIC,
METRIC_TYPE_TAG, "discarded-items",
METRIC_EMITTER_TAG, name
).increment()
Expand Down Expand Up @@ -110,7 +112,12 @@ private fun measureQueueSubscriptionBuffer(
name: String,
meterRegistry: MeterRegistry
) {
logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry")
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
subscription,
queueSubscriptionBufferExtractor
)
}

private fun measureScannableBuffer(
Expand All @@ -119,7 +126,49 @@ private fun measureScannableBuffer(
innerSources: Int,
meterRegistry: MeterRegistry
) {
logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry")
val buffered = scannable.scan(Scannable.Attr.BUFFERED)
if (buffered == null) {
logger.error(
"Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " +
"Use measureBuffer() only on supported reactor operators"
)
return
}

meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
scannable,
scannableBufferExtractor
)

/**
* Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual
* buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources.
*
* To access actual buffer size, we need to extract it from inners(). We don't know how many sources will
* be available, so it must be stated explicitly as innerSources parameter.
*/
for (i in 0 until innerSources) {
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"),
scannable,
innerBufferExtractor(i)
)
}
}

private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 }
private fun innerBufferExtractor(index: Int) = { s: Scannable ->
s.inners().asSequence()
.elementAtOrNull(index)
?.let(scannableBufferExtractor)
?: -1.0
}

private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> ->
s.size.toDouble()
}

sealed class ParallelizableScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIn
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.RequestPolicyMapper
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator
import pl.allegro.tech.servicemesh.envoycontrol.utils.DirectScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
Expand Down Expand Up @@ -477,8 +476,7 @@ class SnapshotUpdaterTest {
.tags(
Tags.of(
SERVICE_TAG, "example-service",
OPERATION_TAG, "create-snapshot",
METRIC_EMITTER_TAG, "snapshot-updater"
OPERATION_TAG, "create-snapshot"
)
)
.counter()?.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.testcontainers.shaded.org.awaitility.Awaitility
Expand All @@ -13,7 +12,6 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.function.BiFunction

@Disabled
class ReactorUtilsTest {

@Test
Expand Down Expand Up @@ -100,9 +98,9 @@ class ReactorUtilsTest {
// then
assertThat(received.await(2, TimeUnit.SECONDS)).isTrue()

val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_METRIC)
val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_TOTAL_METRIC)
.tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest-before")).counter()?.count()
val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_METRIC)
val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_TOTAL_METRIC)
.tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest")).counter()?.count()

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.infrastructure
import com.ecwid.consul.v1.ConsulClient
import com.fasterxml.jackson.databind.ObjectMapper
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.ConfigurationProperties
Expand Down Expand Up @@ -40,6 +41,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.RegexServi
import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceInstancesTransformer
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters
import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.utils.CACHE_GROUP_COUNT_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_ERRORS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_METRIC
import reactor.core.scheduler.Schedulers
import java.net.URI

Expand Down Expand Up @@ -172,7 +177,18 @@ class ControlPlaneConfig {
ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local"

fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics {
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry)
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also {
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added"), it.servicesAdded)
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "removed"), it.servicesRemoved)
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "instance-changed"), it.instanceChanges)
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "snapshot-changed"), it.snapshotChanges)
meterRegistry.gauge(CACHE_GROUP_COUNT_METRIC, it.cacheGroupsCount)
it.meterRegistry.more().counter(
WATCH_ERRORS_METRIC,
listOf(),
it.errorWatchingServices
)
}
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pl.allegro.tech.servicemesh.envoycontrol

import io.micrometer.core.instrument.Tags
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted
Expand All @@ -21,15 +20,14 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover
import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS
import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS
import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG
import java.util.function.Consumer
import java.util.function.Predicate

@Disabled
class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTest {
companion object {

Expand Down Expand Up @@ -89,7 +87,6 @@ class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTe
)
}

@Disabled
class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest {
companion object {

Expand Down Expand Up @@ -149,7 +146,6 @@ class AdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTes
)
}

@Disabled
class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbacksTest {
companion object {

Expand Down Expand Up @@ -209,7 +205,6 @@ class DeltaAdsMetricsDiscoveryServerCallbackTest : MetricsDiscoveryServerCallbac
)
}

@Disabled
interface MetricsDiscoveryServerCallbacksTest {
companion object {
private val logger by logger()
Expand Down Expand Up @@ -251,7 +246,8 @@ interface MetricsDiscoveryServerCallbacksTest {
).isNotNull
assertThat(
meterRegistry.get(metric)
.tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge().value()
.tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge()
.value()
.toInt()
).isEqualTo(value)
}
Expand Down

0 comments on commit 8ea396a

Please sign in to comment.