Skip to content

Commit

Permalink
Add barebones Netty event-loop metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
vkostyukov committed Mar 18, 2023
1 parent 72de6ff commit 59624c3
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 0 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/CommonPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.concurrent.ScheduledExecutorService;

import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.common.metric.MeterIdPrefix;
import com.linecorp.armeria.common.metric.MoreMetrics;
import com.linecorp.armeria.common.metric.NettyMetrics;
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.server.ServerBuilder;
Expand All @@ -36,6 +39,13 @@ public final class CommonPools {
private static final EventLoopGroup WORKER_GROUP =
EventLoopGroups.newEventLoopGroup(Flags.numCommonWorkers(), "armeria-common-worker", true);

static {
// Bind NettyMetrics for the common worker group.
MoreMetrics
.nettyMetrics(new MeterIdPrefix("armeria.netty.common"), WORKER_GROUP)
.bindTo(Flags.meterRegistry());
}

/**
* Returns the default common blocking task {@link ScheduledExecutorService} which is used for
* potentially long-running tasks which may block I/O threads.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linecorp.armeria.common.metric;

import io.netty.channel.EventLoopGroup;

/**
* Provides useful {@link io.micrometer.core.instrument.binder.MeterBinder}s to monitor various Armeria components.
*/
public final class MoreMetrics {

/**
* Creates {@link NettyMetrics} for a given {{eventLoopGroup}}.
*/
public static NettyMetrics nettyMetrics(MeterIdPrefix idPrefix, EventLoopGroup eventLoopGroup) {
return new NettyMetrics(idPrefix, eventLoopGroup);
}

private MoreMetrics() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.metric;

import com.linecorp.armeria.common.util.Ticker;
import com.linecorp.armeria.internal.common.metric.MicrometerUtil;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.EventExecutor;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

/**
* A {{MeterBinder}} to observe Netty's runtime.
*
* The following stats are currently exported per registered {{MeterIdPrefix}}. Currently, we distinguish between
* server and client metrics and export cumulative metrics under `armeria.client.netty` and `armeria.server.netty`.
*
* - "event.loops.num.workers" (gauge) - the total number of Netty's event loops
* - "event.loops.pending.tasks" (gauge) - the total number of IO tasks waiting to be run on event loops
*/
public final class NettyMetrics implements MeterBinder {

private MeterIdPrefix idPrefix;
private EventLoopGroup eventLoopGroup;

public NettyMetrics(MeterIdPrefix idPrefix, EventLoopGroup eventLoopGroup) {
this.idPrefix = requireNonNull(idPrefix, "idPrefix");
this.eventLoopGroup = requireNonNull(eventLoopGroup, "eventLoopGroup");
}

@Override
public void bindTo(MeterRegistry registry) {
final Self metrics = MicrometerUtil.register(registry, idPrefix, Self.class, Self::new);
metrics.add(eventLoopGroup);
}

private static final long UPDATE_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(3);

static final class Self {
private Set<EventLoopGroup> registry = ConcurrentHashMap.newKeySet(2);
private final Ticker ticker;

private volatile double lastPendingTasks;
private volatile double lastNumWorkers;
private volatile long lastStatsUpdateTime;

Self(MeterRegistry parent, MeterIdPrefix idPrefix) {
this(parent, idPrefix, Ticker.systemTicker());
}

Self(MeterRegistry parent, MeterIdPrefix idPrefix, Ticker ticker) {
this.ticker = requireNonNull(ticker, "ticker");

final String numWorkers = idPrefix.name("event.loops.num.workers");
parent.gauge(numWorkers, idPrefix.tags(), this, metrics -> metrics.numWorkers());

final String pendingTasks = idPrefix.name("event.loops.pending.tasks");
parent.gauge(pendingTasks, idPrefix.tags(), this, metrics -> metrics.pendingTasks());
}

// This runs every 3 seconds at most.
private void recomputeStatsIfNeeded() {
final long currentTimeNanos = ticker.read();
if (currentTimeNanos - lastStatsUpdateTime < UPDATE_INTERVAL_NANOS) {
return;
}

int numWorkers = 0;
int pendingTasks = 0;
for (EventLoopGroup group: registry) {

// Purge event loop groups that were shutdown.
if (group.isShutdown()) {
registry.remove(group);
continue;
}

for (EventExecutor eventLoop: group) {
if (eventLoop instanceof SingleThreadEventLoop) {
pendingTasks += ((SingleThreadEventLoop) eventLoop).pendingTasks();
}
numWorkers += 1;
}
}

lastNumWorkers = numWorkers;
lastPendingTasks = pendingTasks;
lastStatsUpdateTime = currentTimeNanos;
}

void add(EventLoopGroup eventLoopGroup) {
registry.add(eventLoopGroup);
}

double numWorkers() {
recomputeStatsIfNeeded();
return lastNumWorkers;
}

double pendingTasks() {
recomputeStatsIfNeeded();
return lastPendingTasks;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.linecorp.armeria.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class NettyMetricsTest {

private class BlockMe extends CountDownLatch implements Runnable {

BlockMe() {
super(1);
}

@Override
public void run() {
try {
await();
} catch (Throwable ignored) {
}
}
}


@Test
public void test() {
MeterRegistry registry = new SimpleMeterRegistry();
NettyMetrics.Self metrics =
new NettyMetrics.Self(
registry,
new MeterIdPrefix("foo")
);

BlockMe task = new BlockMe();

EventLoopGroup workers = new DefaultEventLoopGroup(2);
// Block both executors
workers.submit(task);
workers.submit(task);

workers.submit(() -> {});

metrics.add(workers);

// Check that API works as expected
assertThat(metrics.pendingTasks()).isEqualTo(1.0);
assertThat(metrics.numWorkers()).isEqualTo(2.0);

// Check that metrics are exported
assertThat(MoreMeters.measureAll(registry))
.containsEntry("foo.event.loops.num.workers#value", 2.0)
.containsEntry("foo.event.loops.pending.tasks#value", 1.0);

// Release & shutdown threads
task.countDown();

await().untilAsserted(() ->
assertThat(
MoreMeters.measureAll(registry))
.containsEntry("foo.event.loops.num.workers#value", 2.0)
.containsEntry("foo.event.loops.pending.tasks#value", 0.0));

workers.shutdownGracefully();
}
}

0 comments on commit 59624c3

Please sign in to comment.