Skip to content

Commit

Permalink
Add barebones Netty event-loop metrics (#4750)
Browse files Browse the repository at this point in the history
Motivation:

We want to export some very basic metrics about the underlying Netty runtime. Let's start with the number of event-loops and the number of IO tasks waiting to be executed. 

Modifications:

- Added `EventLoopMetrics`, which implements `MeterBinder`.
- The approach to count pending-io-tasks is highly inspired by [this gauge from Finagle](https://github.com/twitter/finagle/blob/develop/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/WorkerEventLoop.scala#L32-L47). It's also pretty similar how we do it internally at Databricks.
- Registered `CommonPools` event-loop-groups for metrics collection.

Result:

- Closes #4675
  • Loading branch information
vkostyukov authored Mar 28, 2023
1 parent 1673327 commit 2dccda3
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.ScheduledExecutorService;

import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.common.metric.MoreMeterBinders;
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.server.ServerBuilder;
Expand All @@ -36,6 +37,13 @@ public final class CommonPools {
private static final EventLoopGroup WORKER_GROUP =
EventLoopGroups.newEventLoopGroup(Flags.numCommonWorkers(), "armeria-common-worker", true);

static {
// Bind EventLoopMetrics for the common worker group.
MoreMeterBinders
.eventLoopMetrics(WORKER_GROUP, "common")
.bindTo(Flags.meterRegistry());
}

/**
* Returns the default common blocking task {@link ScheduledExecutorService} which is used for
* potentially long-running tasks which may block I/O threads.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.metric;

import static java.util.Objects.requireNonNull;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.Iterators;

import com.linecorp.armeria.internal.common.metric.MicrometerUtil;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.EventExecutor;

/**
* A {@link MeterBinder} to observe Netty {@link EventLoopGroup}s. The following stats are currently
* exported per registered {@link MeterIdPrefix}.
*
* <ul>
* <li>"event.loop.num.workers" (gauge) - the total number of Netty's event loops</li>
* <li>"event.loop.pending.tasks" (gauge)
* - the total number of IO tasks waiting to be run on event loops</li>
* </ul>
**/
final class EventLoopMetrics implements MeterBinder {

private final EventLoopGroup eventLoopGroup;
private final MeterIdPrefix idPrefix;

/**
* Creates an instance of {@link EventLoopMetrics}.
*/
EventLoopMetrics(EventLoopGroup eventLoopGroup, String name) {
this.eventLoopGroup = requireNonNull(eventLoopGroup, "eventLoopGroup");
idPrefix = new MeterIdPrefix("armeria.netty").append(requireNonNull(name, "name"));
}

@Override
public void bindTo(MeterRegistry registry) {
final Self metrics = MicrometerUtil.register(registry, idPrefix, Self.class, Self::new);
metrics.add(eventLoopGroup);
}

/**
* An actual implementation of {@link EventLoopMetrics}.
*/
static final class Self {
private final Set<EventLoopGroup> registry = ConcurrentHashMap.newKeySet(2);

Self(MeterRegistry parent, MeterIdPrefix idPrefix) {

final String numWorkers = idPrefix.name("event.loop.workers");
parent.gauge(numWorkers, idPrefix.tags(), this, Self::numWorkers);

final String pendingTasks = idPrefix.name("event.loop.pending.tasks");
parent.gauge(pendingTasks, idPrefix.tags(), this, Self::pendingTasks);
}

void add(EventLoopGroup eventLoopGroup) {
registry.add(eventLoopGroup);
}

double numWorkers() {
int result = 0;
for (EventLoopGroup group : registry) {
// Purge event loop groups that were shutdown.
if (group.isShutdown()) {
registry.remove(group);
continue;
}
result += Iterators.size(group.iterator());
}
return result;
}

double pendingTasks() {
int result = 0;
for (EventLoopGroup group : registry) {
// Purge event loop groups that were shutdown.
if (group.isShutdown()) {
registry.remove(group);
continue;
}
for (EventExecutor eventLoop : group) {
if (eventLoop instanceof SingleThreadEventLoop) {
result += ((SingleThreadEventLoop) eventLoop).pendingTasks();
}
}
}
return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.metric;

import com.linecorp.armeria.common.annotation.UnstableApi;

import io.micrometer.core.instrument.binder.MeterBinder;
import io.netty.channel.EventLoopGroup;

/**
* Provides useful {@link MeterBinder}s to monitor various Armeria components.
*/
public final class MoreMeterBinders {

/**
* Returns a new {@link MeterBinder} to observe Netty's {@link EventLoopGroup}s. The following stats are
* currently exported per registered {@link MeterIdPrefix}.
*
* <ul>
* <li>"event.loop.num.workers" (gauge) - the total number of Netty's event loops</li>
* <li>"event.loop.pending.tasks" (gauge)
* - the total number of IO tasks waiting to be run on event loops</li>
* </ul>
*/
@UnstableApi
public static MeterBinder eventLoopMetrics(EventLoopGroup eventLoopGroup, String name) {
return new EventLoopMetrics(eventLoopGroup, name);
}

private MoreMeterBinders() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.metric;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;

class EventLoopMetricsTest {

private class BlockMe extends CountDownLatch implements Runnable {

AtomicInteger run = new AtomicInteger();

BlockMe() {
super(1);
}

@Override
public void run() {
run.incrementAndGet();
try {
await();
} catch (Throwable ignored) {
}
}
}

@Test
void test() {
final MeterRegistry registry = new SimpleMeterRegistry();
final EventLoopMetrics.Self metrics =
new EventLoopMetrics.Self(
registry,
new MeterIdPrefix("foo")
);

final BlockMe task = new BlockMe();

final EventLoopGroup workers = new DefaultEventLoopGroup(2);
// Block both executors
workers.submit(task);
workers.submit(task);

await().untilAtomic(task.run, Matchers.equalTo(2));

workers.submit(() -> {});

metrics.add(workers);

// Check that API works as expected
assertThat(metrics.pendingTasks()).isEqualTo(1.0);
assertThat(metrics.numWorkers()).isEqualTo(2.0);

// Check that metrics are exported
assertThat(MoreMeters.measureAll(registry))
.containsEntry("foo.event.loop.workers#value", 2.0)
.containsEntry("foo.event.loop.pending.tasks#value", 1.0);

// Release & shutdown threads
task.countDown();

await().untilAsserted(() ->
assertThat(
MoreMeters.measureAll(registry))
.containsEntry("foo.event.loop.workers#value", 2.0)
.containsEntry("foo.event.loop.pending.tasks#value", 0.0));

workers.shutdownGracefully();
}
}

0 comments on commit 2dccda3

Please sign in to comment.