Skip to content

Commit

Permalink
Merge branch '1.13.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Jul 24, 2024
2 parents 710075a + b3ab39e commit 3df85e2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.micrometer.core.instrument.internal.DefaultMeter;
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import io.micrometer.statsd.internal.*;
import io.netty.channel.Channel;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.AttributeKey;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -94,6 +95,9 @@ public class StatsdMeterRegistry extends MeterRegistry {

Disposable.Swap statsdConnection = Disposables.swap();

@Nullable
private Channel flushableChannel;

private Disposable.Swap meterPoller = Disposables.swap();

@Nullable
Expand Down Expand Up @@ -295,6 +299,7 @@ private void connectAndSubscribe(UdpClient udpClient) {
private void retryReplaceClient(Mono<? extends Connection> connectMono) {
connectMono.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofMinutes(1)))
.subscribe(connection -> {
this.flushableChannel = connection.channel();
this.statsdConnection.replace(connection);

// now that we're connected, start polling gauges and other pollable
Expand All @@ -309,6 +314,9 @@ private void startPolling() {

public void stop() {
if (started.compareAndSet(true, false)) {
if (this.flushableChannel != null) {
this.flushableChannel.flush();
}
if (statsdConnection.get() != null) {
statsdConnection.get().dispose();
}
Expand All @@ -321,6 +329,7 @@ public void stop() {
@Override
public void close() {
poll();
this.sink.complete();
stop();
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ void cleanUp() {
}
}

@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void receiveAllBufferedMetricsAfterCloseSuccessfully(StatsdProtocol protocol) throws InterruptedException {
skipUdsTestOnWindows(protocol);
serverLatch = new CountDownLatch(3);
server = startServer(protocol, 0);

final int port = getPort(protocol);
meterRegistry = new StatsdMeterRegistry(getBufferedConfig(protocol, port), Clock.SYSTEM);
startRegistryAndWaitForClient();
Thread.sleep(1000);
Counter counter = Counter.builder("my.counter").register(meterRegistry);
counter.increment();
counter.increment();
counter.increment();
meterRegistry.close();
assertThat(serverLatch.await(5, TimeUnit.SECONDS)).isTrue();
}

@ParameterizedTest
@EnumSource(StatsdProtocol.class)
void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException {
Expand Down Expand Up @@ -336,11 +355,14 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) {
return UdpServer.create()
.bindAddress(() -> protocol == StatsdProtocol.UDP
? InetSocketAddress.createUnresolved("localhost", port) : newDomainSocketAddress())
.handle((in, out) -> in.receive().asString().flatMap(packet -> {
serverLatch.countDown();
serverMetricReadCount.getAndIncrement();
return Flux.never();
}))
.handle((in, out) -> in.receive()
.asString()
.flatMap(packet -> Flux.just(packet.split("\n")))
.flatMap(packetLine -> {
serverLatch.countDown();
serverMetricReadCount.getAndIncrement();
return Flux.never();
}))
.doOnBound((server) -> bound = true)
.doOnUnbound((server) -> bound = false)
.wiretap("udpserver", LogLevel.INFO)
Expand All @@ -351,14 +373,17 @@ else if (protocol == StatsdProtocol.TCP) {
return TcpServer.create()
.host("localhost")
.port(port)
.handle((in, out) -> in.receive().asString().flatMap(packet -> {
IntStream.range(0, packet.split("my.counter").length - 1).forEach(i -> {
serverLatch.countDown();
serverMetricReadCount.getAndIncrement();
});
in.withConnection(channel::set);
return Flux.never();
}))
.handle((in, out) -> in.receive()
.asString()
.flatMap(packet -> Flux.just(packet.split("\n")))
.flatMap(packetLine -> {
IntStream.range(0, packetLine.split("my.counter").length - 1).forEach(i -> {
serverLatch.countDown();
serverMetricReadCount.getAndIncrement();
});
in.withConnection(channel::set);
return Flux.never();
}))
.doOnBound((server) -> bound = true)
.doOnUnbound((server) -> {
bound = false;
Expand Down Expand Up @@ -388,6 +413,14 @@ private static DomainSocketAddress newDomainSocketAddress() {
}

private StatsdConfig getUnbufferedConfig(StatsdProtocol protocol, int port) {
return getConfig(protocol, port, false);
}

private StatsdConfig getBufferedConfig(StatsdProtocol protocol, int port) {
return getConfig(protocol, port, true);
}

private StatsdConfig getConfig(StatsdProtocol protocol, int port, boolean buffered) {
return new StatsdConfig() {
@Override
public String get(String key) {
Expand All @@ -411,7 +444,7 @@ public StatsdProtocol protocol() {

@Override
public boolean buffered() {
return false;
return buffered;
}
};
}
Expand Down

0 comments on commit 3df85e2

Please sign in to comment.