Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 removed ControlPlaneConfig ReactorU…
Browse files Browse the repository at this point in the history
…tils
  • Loading branch information
nastassia-dailidava committed Oct 20, 2024
1 parent 8ea396a commit ef37bce
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.allegro.tech.servicemesh.envoycontrol.utils

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import org.reactivestreams.Subscription
import org.slf4j.LoggerFactory
import reactor.core.Disposable
Expand All @@ -12,7 +11,6 @@ import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.streams.asSequence

private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils")
private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") }
Expand Down Expand Up @@ -52,7 +50,7 @@ fun <T> Flux<T>.measureBuffer(
fun <T> Flux<T>.measureDiscardedItems(name: String, meterRegistry: MeterRegistry): Flux<T> = this
.doOnDiscard(Any::class.java) {
meterRegistry.counter(
REACTOR_TOTAL_METRIC,
REACTOR_METRIC,
METRIC_TYPE_TAG, "discarded-items",
METRIC_EMITTER_TAG, name
).increment()
Expand Down Expand Up @@ -112,12 +110,7 @@ private fun measureQueueSubscriptionBuffer(
name: String,
meterRegistry: MeterRegistry
) {
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
subscription,
queueSubscriptionBufferExtractor
)
logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry")
}

private fun measureScannableBuffer(
Expand All @@ -126,49 +119,7 @@ private fun measureScannableBuffer(
innerSources: Int,
meterRegistry: MeterRegistry
) {
val buffered = scannable.scan(Scannable.Attr.BUFFERED)
if (buffered == null) {
logger.error(
"Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " +
"Use measureBuffer() only on supported reactor operators"
)
return
}

meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name),
scannable,
scannableBufferExtractor
)

/**
* Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual
* buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources.
*
* To access actual buffer size, we need to extract it from inners(). We don't know how many sources will
* be available, so it must be stated explicitly as innerSources parameter.
*/
for (i in 0 until innerSources) {
meterRegistry.gauge(
REACTOR_METRIC,
Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"),
scannable,
innerBufferExtractor(i)
)
}
}

private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 }
private fun innerBufferExtractor(index: Int) = { s: Scannable ->
s.inners().asSequence()
.elementAtOrNull(index)
?.let(scannableBufferExtractor)
?: -1.0
}

private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> ->
s.size.toDouble()
logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry")
}

sealed class ParallelizableScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.testcontainers.shaded.org.awaitility.Awaitility
Expand All @@ -12,6 +13,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.function.BiFunction

@Disabled
class ReactorUtilsTest {

@Test
Expand Down Expand Up @@ -98,9 +100,9 @@ class ReactorUtilsTest {
// then
assertThat(received.await(2, TimeUnit.SECONDS)).isTrue()

val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_TOTAL_METRIC)
val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_METRIC)
.tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest-before")).counter()?.count()
val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_TOTAL_METRIC)
val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_METRIC)
.tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest")).counter()?.count()

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,7 @@ class ControlPlaneConfig {
ConsulClient(properties.host, properties.port).agentSelf.value?.config?.datacenter ?: "local"

fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics {
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also {
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added"), it.servicesAdded)
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "removed"), it.servicesRemoved)
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "instance-changed"), it.instanceChanges)
meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "snapshot-changed"), it.snapshotChanges)
meterRegistry.gauge(CACHE_GROUP_COUNT_METRIC, it.cacheGroupsCount)
it.meterRegistry.more().counter(
WATCH_ERRORS_METRIC,
listOf(),
it.errorWatchingServices
)
}
return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry)
}

@Bean
Expand Down

0 comments on commit ef37bce

Please sign in to comment.