From 84a662f337a74de79c21594c196dfe02aad82d83 Mon Sep 17 00:00:00 2001 From: BJMg Date: Wed, 24 Jul 2024 11:00:57 +0200 Subject: [PATCH] Flush channel on close so buffered metrics are not lost (#5195) We thought disposing the connection was enough to flush any buffered metrics, but this was not flushing the channel. This was missed by our IT tests (StatsdMeterRegistryPublishTest) because we did not use buffered config. This makes sure to flush the channel before disposing the connection so any buffered statsd lines are sent on close. Resolves gh-2141 --- .../statsd/StatsdMeterRegistry.java | 9 +++ .../StatsdMeterRegistryPublishTest.java | 61 ++++++++++++++----- 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java index 793f3d1440..cea5907cca 100644 --- a/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java +++ b/implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java @@ -26,6 +26,7 @@ import io.micrometer.core.lang.Nullable; import io.micrometer.core.util.internal.logging.WarnThenDebugLogger; import io.micrometer.statsd.internal.*; +import io.netty.channel.Channel; import io.netty.channel.unix.DomainSocketAddress; import io.netty.util.AttributeKey; import org.reactivestreams.Publisher; @@ -94,6 +95,9 @@ public class StatsdMeterRegistry extends MeterRegistry { Disposable.Swap statsdConnection = Disposables.swap(); + @Nullable + private Channel flushableChannel; + private Disposable.Swap meterPoller = Disposables.swap(); @Nullable @@ -295,6 +299,7 @@ private void connectAndSubscribe(UdpClient udpClient) { private void retryReplaceClient(Mono connectMono) { connectMono.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofMinutes(1))) .subscribe(connection -> { + this.flushableChannel = connection.channel(); this.statsdConnection.replace(connection); // now that we're connected, start polling gauges and other pollable @@ -309,6 +314,9 @@ private void startPolling() { public void stop() { if (started.compareAndSet(true, false)) { + if (this.flushableChannel != null) { + this.flushableChannel.flush(); + } if (statsdConnection.get() != null) { statsdConnection.get().dispose(); } @@ -321,6 +329,7 @@ public void stop() { @Override public void close() { poll(); + this.sink.complete(); stop(); super.close(); } diff --git a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java index 054d802430..2414fec1b9 100644 --- a/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java +++ b/implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java @@ -79,6 +79,25 @@ void cleanUp() { } } + @ParameterizedTest + @EnumSource(StatsdProtocol.class) + void receiveAllBufferedMetricsAfterCloseSuccessfully(StatsdProtocol protocol) throws InterruptedException { + skipUdsTestOnWindows(protocol); + serverLatch = new CountDownLatch(3); + server = startServer(protocol, 0); + + final int port = getPort(protocol); + meterRegistry = new StatsdMeterRegistry(getBufferedConfig(protocol, port), Clock.SYSTEM); + startRegistryAndWaitForClient(); + Thread.sleep(1000); + Counter counter = Counter.builder("my.counter").register(meterRegistry); + counter.increment(); + counter.increment(); + counter.increment(); + meterRegistry.close(); + assertThat(serverLatch.await(5, TimeUnit.SECONDS)).isTrue(); + } + @ParameterizedTest @EnumSource(StatsdProtocol.class) void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException { @@ -336,11 +355,14 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) { return UdpServer.create() .bindAddress(() -> protocol == StatsdProtocol.UDP ? InetSocketAddress.createUnresolved("localhost", port) : newDomainSocketAddress()) - .handle((in, out) -> in.receive().asString().flatMap(packet -> { - serverLatch.countDown(); - serverMetricReadCount.getAndIncrement(); - return Flux.never(); - })) + .handle((in, out) -> in.receive() + .asString() + .flatMap(packet -> Flux.just(packet.split("\n"))) + .flatMap(packetLine -> { + serverLatch.countDown(); + serverMetricReadCount.getAndIncrement(); + return Flux.never(); + })) .doOnBound((server) -> bound = true) .doOnUnbound((server) -> bound = false) .wiretap("udpserver", LogLevel.INFO) @@ -351,14 +373,17 @@ else if (protocol == StatsdProtocol.TCP) { return TcpServer.create() .host("localhost") .port(port) - .handle((in, out) -> in.receive().asString().flatMap(packet -> { - IntStream.range(0, packet.split("my.counter").length - 1).forEach(i -> { - serverLatch.countDown(); - serverMetricReadCount.getAndIncrement(); - }); - in.withConnection(channel::set); - return Flux.never(); - })) + .handle((in, out) -> in.receive() + .asString() + .flatMap(packet -> Flux.just(packet.split("\n"))) + .flatMap(packetLine -> { + IntStream.range(0, packetLine.split("my.counter").length - 1).forEach(i -> { + serverLatch.countDown(); + serverMetricReadCount.getAndIncrement(); + }); + in.withConnection(channel::set); + return Flux.never(); + })) .doOnBound((server) -> bound = true) .doOnUnbound((server) -> { bound = false; @@ -388,6 +413,14 @@ private static DomainSocketAddress newDomainSocketAddress() { } private StatsdConfig getUnbufferedConfig(StatsdProtocol protocol, int port) { + return getConfig(protocol, port, false); + } + + private StatsdConfig getBufferedConfig(StatsdProtocol protocol, int port) { + return getConfig(protocol, port, true); + } + + private StatsdConfig getConfig(StatsdProtocol protocol, int port, boolean buffered) { return new StatsdConfig() { @Override public String get(String key) { @@ -411,7 +444,7 @@ public StatsdProtocol protocol() { @Override public boolean buffered() { - return false; + return buffered; } }; }