diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d021aa55ad..e6ae18b8ef 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,8 @@ Runtime Behavior Changes * finagle-mysql: (Testing behaviour change only) Updated mysql version expected by integration tests to 8.0.21. Added README in integration tests noting that this must exist for integration tests to run. ``PHAB_ID=D1152235`` +* finagle-netty4: `EventLoopGroupTracker` (previously named `EventLoopGroupExecutionDelayTracker`) now collects + stats cpu_time_ms and active_sockets per netty worker thread. New Features @@ -19,6 +21,16 @@ New Features * finagle-mysql: Added support for LONG_BLOB data type. ``PHAB_ID=D1152247`` + +Breaking API Changes +~~~~~~~~~~~~~~~~~~~~ + +* finagle-netty4: `c.t.f.netty4.threading.EventLoopGroupExecutionDelayTracker` has been renamed to + `EventLoopGroupTracker`, `c.t.f.netty4.threading.TrackWorkerPoolExecutionDelay` has been renamed to + `TrackWorkerPoolExcutionDelay`, `c.t.f.netty4.param.TrackWorkerPoolExecutionDelay` has been renamed + to `TrackWorkerPool`. These changes reflect the tracker's new functionality of collecting metrics + and data other than the execution delay (see Runtime Behaviour Changes). ``PHAB_ID=D1176906`` + 24.5.0 ------ diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala index 4f754b807b..3766369eff 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala @@ -4,7 +4,7 @@ import com.twitter.finagle.ListeningServer import com.twitter.finagle.Stack import com.twitter.finagle.netty4.channel.Netty4FramedServerChannelInitializer import com.twitter.finagle.netty4.channel.Netty4RawServerChannelInitializer -import com.twitter.finagle.netty4.threading.EventLoopGroupExecutionDelayTracker +import com.twitter.finagle.netty4.threading.EventLoopGroupTracker import com.twitter.finagle.param.Stats import com.twitter.finagle.param.Timer import com.twitter.finagle.server.Listener @@ -221,13 +221,13 @@ private[finagle] class ListeningServerBuilder( def boundAddress: SocketAddress = ch.localAddress() - private[this] val workerPoolExecutionDelayTrackingSettings = - params[param.TrackWorkerPoolExecutionDelay] - if (workerPoolExecutionDelayTrackingSettings.enableTracking) { - EventLoopGroupExecutionDelayTracker.track( + private[this] val workerPoolTrackingSettings = + params[param.TrackWorkerPool] + if (workerPoolTrackingSettings.enableTracking) { + EventLoopGroupTracker.track( params[param.WorkerPool].eventLoopGroup, - workerPoolExecutionDelayTrackingSettings.trackingTaskPeriod, - workerPoolExecutionDelayTrackingSettings.threadDumpThreshold, + workerPoolTrackingSettings.trackingTaskPeriod, + workerPoolTrackingSettings.threadDumpThreshold, params[Stats].statsReceiver, s"finagle/netty-4/delayTracking/${boundAddress}", Logger.get("com.twitter.finagle.netty4.Netty4Listener.threadDelay") diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala index 50c9b3bb67..0e295c986a 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala @@ -4,31 +4,31 @@ import com.twitter.finagle.Stack import com.twitter.util.Duration /** - * Control for tracking execution delay in the worker threads for a listener. This is intended - * to be enabled for perf tracking, and may impact performance as it adds tracking runnables to - * the event executors. Stats will be written to the stats receiver for the listener under - * workerpool/deviation_ms. When thread dumping is enabled, all logging is done at the warning - * level. + * Control for tracking execution delay, cpu time, and active sockets in the worker threads for a + * listener. This is intended to be enabled for perf tracking, and may impact performance as it adds + * tracking runnables to the event executors. Stats will be written to the stats receiver for the + * listener under workerpool/deviation_ms. When thread dumping is enabled, all logging is done at + * the warning level. * - * @param enableTracking If true enable thread pause tracking. + * @param enableTracking If true enable thread tracking. * @param trackingTaskPeriod The fixed time scheduling window for the execution delay runnable. * @param threadDumpThreshold If > 0ms, enable stack dumping of threads when they have been delayed for * more than the threshold. Thresholds of < 10ms will not work as * expected as the underlying executors do not use high resolution timers. */ -case class TrackWorkerPoolExecutionDelay( +case class TrackWorkerPool( enableTracking: Boolean, trackingTaskPeriod: Duration, threadDumpThreshold: Duration) { - def mk(): (TrackWorkerPoolExecutionDelay, Stack.Param[TrackWorkerPoolExecutionDelay]) = - (this, TrackWorkerPoolExecutionDelay.trackWorkerPoolExecutionDelayParam) + def mk(): (TrackWorkerPool, Stack.Param[TrackWorkerPool]) = + (this, TrackWorkerPool.trackWorkerPoolParam) } -object TrackWorkerPoolExecutionDelay { - implicit val trackWorkerPoolExecutionDelayParam: Stack.Param[TrackWorkerPoolExecutionDelay] = - Stack.Param[TrackWorkerPoolExecutionDelay]( - TrackWorkerPoolExecutionDelay( +object TrackWorkerPool { + implicit val trackWorkerPoolParam: Stack.Param[TrackWorkerPool] = + Stack.Param[TrackWorkerPool]( + TrackWorkerPool( false, Duration.fromMilliseconds(0), Duration.fromMilliseconds(0) diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTracker.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTracker.scala similarity index 87% rename from finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTracker.scala rename to finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTracker.scala index 62b713c4ed..a2398cf574 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTracker.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTracker.scala @@ -8,7 +8,7 @@ import io.netty.channel.EventLoopGroup import java.util.concurrent.ScheduledThreadPoolExecutor import scala.reflect.internal.util.WeakHashSet -object EventLoopGroupExecutionDelayTracker { +object EventLoopGroupTracker { private[threading] val trackedEventLoopGroups = new WeakHashSet[EventLoopGroup]() @@ -19,8 +19,8 @@ object EventLoopGroupExecutionDelayTracker { * instrumentation. * * @param nettyEventLoopGroup The netty EventLoopGroup for which thread delays should be captured - * @param injectionPeriod The fixed delay for the runnables added to the EventLoopGroup threads to - * capture thread execution delays. + * @param trackingTaskPeriod The fixed delay for the runnables added to the EventLoopGroup threads to + * capture thread tracking information. * @param dumpThreshold If > 0ms log seen delay for threads and the stack trace for threads at * the when the threads exceed the dumpThreshold delay. * @param statsReceiver The stats receiver under which execution delay stats should be reported. @@ -29,7 +29,7 @@ object EventLoopGroupExecutionDelayTracker { */ def track( nettyEventLoopGroup: EventLoopGroup, - injectionPeriod: Duration, + trackingTaskPeriod: Duration, dumpThreshold: Duration, statsReceiver: StatsReceiver, dumpThreadPoolName: String, @@ -50,10 +50,11 @@ object EventLoopGroupExecutionDelayTracker { val stat = statsReceiver.stat("workerpool", "deviation_ms") while (workerIter.hasNext) { val loop = workerIter.next() - new EventLoopGroupExecutionDelayTrackingRunnable( + new EventLoopGroupTrackingRunnable( loop, - injectionPeriod, + trackingTaskPeriod, stat, + statsReceiver, dumpThreshold, dumpThresholdExceededThreadPool, logger diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackingRunnable.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala similarity index 56% rename from finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackingRunnable.scala rename to finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala index e87ca6edad..26c0ee9a4a 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackingRunnable.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala @@ -1,15 +1,23 @@ package com.twitter.finagle.netty4.threading import com.twitter.finagle.stats.Stat +import com.twitter.finagle.stats.StatsReceiver import com.twitter.logging.Logger -import com.twitter.util.{Duration, Time} +import com.twitter.util.Duration +import com.twitter.util.Time +import io.netty.channel.SingleThreadEventLoop import io.netty.util.concurrent.EventExecutor -import java.util.concurrent.{Callable, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} +import java.lang.management.ManagementFactory +import java.util.concurrent.Callable +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.TimeUnit -private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( - eventExecutor: EventExecutor, - injectionPeriod: Duration, +private[threading] class EventLoopGroupTrackingRunnable( + executor: EventExecutor, + taskTrackingPeriod: Duration, delayStat: Stat, + statsReceiver: StatsReceiver, threadDumpThreshold: Duration, dumpWatchThreadPool: Option[ScheduledThreadPoolExecutor], dumpLogger: Logger) @@ -22,10 +30,10 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( // the one thread in the executor. This is currently how netty is implemented // but this class will stop working if netty changes their implementation private[this] val executorThread: Thread = { - if (eventExecutor.inEventLoop()) { + if (executor.inEventLoop()) { Thread.currentThread() } else { - eventExecutor + executor .submit(new Callable[Thread] { override def call(): Thread = { Thread.currentThread() @@ -34,15 +42,26 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( } } + private[this] val threadId = executorThread.getId private[this] val threadName: String = executorThread.getName + private[this] var scheduledExecutionTime: Time = Time.now private[this] var watchTask: Option[ScheduledFuture[_]] = None + private[this] val threadMXBean = ManagementFactory.getThreadMXBean + + private[this] val scopedStatsReceiver = statsReceiver.scope(threadName) + private[this] val activeSocketsStat = scopedStatsReceiver.stat("active_sockets") + private[this] val cpuTimeCounter = scopedStatsReceiver.counter("cpu_time_ms") + + // Accessed only from within the same netty thread + private[this] var prevCPUTimeMs = 0L + setWatchTask() - eventExecutor.scheduleWithFixedDelay( + executor.scheduleWithFixedDelay( this, 0, - injectionPeriod.inMillis, + taskTrackingPeriod.inMillis, java.util.concurrent.TimeUnit.MILLISECONDS ) @@ -57,12 +76,26 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( dumpLogger.warning( s"THREAD: $threadName EXECUTION DELAY is greater than ${threadDumpThreshold.inMillis}ms, was ${executionDelay.inMillis}ms" ) - } delayStat.add(executionDelay.inMillis) - scheduledExecutionTime = Time.now.plus(injectionPeriod) + scheduledExecutionTime = Time.now.plus(taskTrackingPeriod) setWatchTask() + + var numActiveSockets = 0 + // This will be nio event loop or epoll event loop. + executor.asInstanceOf[SingleThreadEventLoop].registeredChannelsIterator().forEachRemaining { + channel => + if (channel.isActive) { + numActiveSockets += 1 + } + } + activeSocketsStat.add(numActiveSockets) + + // `getThreadCPUTime` returns the time in nanoseconds. + val currentCPUTimeMs = threadMXBean.getThreadCpuTime(threadId) / 1000000 + cpuTimeCounter.incr(currentCPUTimeMs - prevCPUTimeMs) + prevCPUTimeMs = currentCPUTimeMs } private[this] def setWatchTask(): Unit = { @@ -71,7 +104,7 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( dumpWatchThreadPool.get.schedule( new Runnable { override def run(): Unit = { - var builder = new StringBuilder() + val builder = new StringBuilder() builder .append( s"THREAD: $threadName EXECUTION DELAY exceeded configured dump threshold. Thread stack trace:\n" @@ -80,7 +113,7 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( dumpLogger.warning(builder.toString()) } }, - (injectionPeriod + threadDumpThreshold).inMillis, + (taskTrackingPeriod + threadDumpThreshold).inMillis, TimeUnit.MILLISECONDS ) ) diff --git a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackerTest.scala b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala similarity index 79% rename from finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackerTest.scala rename to finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala index 634b85d87f..e555d7d809 100644 --- a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackerTest.scala +++ b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala @@ -18,14 +18,14 @@ import org.scalatestplus.mockito.MockitoSugar import scala.collection.JavaConverters._ import org.scalatest.funsuite.AnyFunSuite -class EventLoopGroupExecutionDelayTrackerTest +class EventLoopGroupTrackerTest extends AnyFunSuite with Eventually with IntegrationPatience with MockitoSugar { test( - "EventLoopGroupExecutionDelayTracker with thread dump disabled records stats but no threads created and no logging" + "EventLoopGroupTracker with thread dump disabled records stats but no threads created and no logging" ) { val statsReceiver = new InMemoryStatsReceiver @@ -37,7 +37,7 @@ class EventLoopGroupExecutionDelayTrackerTest val eventLoopGroup = new NioEventLoopGroup(1, executor) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.Zero, @@ -55,8 +55,14 @@ class EventLoopGroupExecutionDelayTrackerTest // Force ourselves to wait Thread.sleep(300) - // we should have deviation stats + // we should have deviation, cpu time, and active sockets stats assert(statsReceiver.stats.get(Seq("workerpool", "deviation_ms")).isDefined) + assert( + statsReceiver.counters + .get(Seq("finagle_thread_delay_tracking_test-1", "cpu_time_ms")).isDefined) + assert( + statsReceiver.stats + .get(Seq("finagle_thread_delay_tracking_test-1", "active_sockets")).isDefined) // we should have no threads with the name no_threads_expected Thread.getAllStackTraces.keySet().asScala.foreach { thread: Thread => @@ -68,7 +74,7 @@ class EventLoopGroupExecutionDelayTrackerTest } test( - "EventLoopGroupExecutionDelayTracker with thread dump enabled records stats creates watch threads and logs dumps" + "EventLoopGroupTracker with thread dump enabled records stats creates watch threads and logs dumps" ) { val statsReceiver = new InMemoryStatsReceiver @@ -80,7 +86,7 @@ class EventLoopGroupExecutionDelayTrackerTest val eventLoopGroup = new NioEventLoopGroup(1, executor) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.fromMilliseconds(10), @@ -115,10 +121,10 @@ class EventLoopGroupExecutionDelayTrackerTest } test( - "validate EventLoopGroupExecutionDelayTracker track guards against multiple submissions of the same EventLoopGroup" + "validate EventLoopGroupTracker track guards against multiple submissions of the same EventLoopGroup" ) { // clear our tracking set first as other tests added to the set - EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.clear() + EventLoopGroupTracker.trackedEventLoopGroups.clear() val statsReceiver = new InMemoryStatsReceiver @@ -131,7 +137,7 @@ class EventLoopGroupExecutionDelayTrackerTest val eventLoopGroup = new NioEventLoopGroup(1, executor) val eventLoopGroup2 = new NioEventLoopGroup(1, executor) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.Zero, @@ -139,9 +145,9 @@ class EventLoopGroupExecutionDelayTrackerTest "execution_delay_test_pool", mockLogger ) - assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 1) + assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 1) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup2, Duration.fromMilliseconds(50), Duration.Zero, @@ -149,9 +155,9 @@ class EventLoopGroupExecutionDelayTrackerTest "execution_delay_test_pool", mockLogger ) - assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 2) + assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 2) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.Zero, @@ -159,7 +165,7 @@ class EventLoopGroupExecutionDelayTrackerTest "execution_delay_test_pool", mockLogger ) - assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 2) + assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 2) } }