Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 Migrated metrics to prometheus | removed some metrics [2]
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 18, 2024
1 parent fa27450 commit c1d47b1
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_ERROR_METRIC
Expand Down Expand Up @@ -64,11 +62,6 @@ class SnapshotUpdater(
)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name(SNAPSHOT_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -126,17 +119,11 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(CHECKPOINT_TAG, "sampled")
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
Expand Down Expand Up @@ -47,10 +44,6 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combinator", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.tag(CHECKPOINT_TAG, "combined")
.metrics()
}

private fun combinedExperimentalFlow(
Expand All @@ -77,13 +70,7 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry)
.publishOn(scheduler, 1)
.checkpoint("global-service-changes-published")
.tag(CHECKPOINT_TAG, "published")
.metrics()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import reactor.core.publisher.Flux

class RemoteClusterStateChanges(
Expand All @@ -17,8 +14,4 @@ class RemoteClusterStateChanges(
.getChanges(properties.sync.pollingInterval)
.startWith(MultiClusterState.empty())
.distinctUntilChanged()
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "remote-cluster-changes")
.tag(CHECKPOINT_TAG, "cross-dc")
.metrics()
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.allegro.tech.servicemesh.envoycontrol.utils

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import org.reactivestreams.Subscription
import org.slf4j.LoggerFactory
import reactor.core.Disposable
Expand All @@ -12,7 +11,6 @@ import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.streams.asSequence

private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils")
private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") }
Expand Down Expand Up @@ -112,12 +110,7 @@ private fun measureQueueSubscriptionBuffer(
name: String,
meterRegistry: MeterRegistry
) {
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
subscription,
queueSubscriptionBufferExtractor
)
logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry")
}

private fun measureScannableBuffer(
Expand All @@ -126,49 +119,7 @@ private fun measureScannableBuffer(
innerSources: Int,
meterRegistry: MeterRegistry
) {
val buffered = scannable.scan(Scannable.Attr.BUFFERED)
if (buffered == null) {
logger.error(
"Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " +
"Use measureBuffer() only on supported reactor operators"
)
return
}

meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
scannable,
scannableBufferExtractor
)

/**
* Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual
* buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources.
*
* To access actual buffer size, we need to extract it from inners(). We don't know how many sources will
* be available, so it must be stated explicitly as innerSources parameter.
*/
for (i in 0 until innerSources) {
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"),
scannable,
innerBufferExtractor(i)
)
}
}

private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 }
private fun innerBufferExtractor(index: Int) = { s: Scannable ->
s.inners().asSequence()
.elementAtOrNull(index)
?.let(scannableBufferExtractor)
?: -1.0
}

private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> ->
s.size.toDouble()
logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry")
}

sealed class ParallelizableScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.testcontainers.shaded.org.awaitility.Awaitility
Expand All @@ -12,6 +13,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.function.BiFunction

@Disabled
class ReactorUtilsTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
Expand Down Expand Up @@ -57,12 +54,7 @@ class ConsulServiceChanges(
)
.measureDiscardedItems("consul-service-changes", metrics.meterRegistry)
.checkpoint("consul-service-changes-emitted")
.name(SERVICES_STATE_METRIC)
.tag(METRIC_EMITTER_TAG, "consul-service-changes")
.tag(CHECKPOINT_TAG, "emitted")
.checkpoint("consul-service-changes-emitted-distinct")
.tag(CHECKPOINT_TAG, "distinct")
.metrics()
.doOnCancel {
logger.warn("Cancelling watching consul service changes")
watcher.close()
Expand Down

0 comments on commit c1d47b1

Please sign in to comment.