diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e6ae18b8ef..4d02b9560c 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -14,6 +14,8 @@ Runtime Behavior Changes
   Added README in integration tests noting that this must exist for integration
   tests to run. ``PHAB_ID=D1152235``
 
 * finagle-netty4: `EventLoopGroupTracker` (previously named `EventLoopGroupExecutionDelayTracker`)
   now collects stats cpu_time_ms and active_sockets per netty worker thread.
+* finagle-netty4: `EventLoopGroupTracker` now collects the distribution of cpu utilization by each netty thread
+  and all_sockets instead of active_sockets. ``PHAB_ID=D1177719``
 
 New Features
diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala
index 26c0ee9a4a..55416540f2 100644
--- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala
+++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala
@@ -1,5 +1,8 @@
 package com.twitter.finagle.netty4.threading
 
+import com.twitter.finagle.stats.HistogramFormat
+import com.twitter.finagle.stats.MetricBuilder
+import com.twitter.finagle.stats.MetricBuilder.HistogramType
 import com.twitter.finagle.stats.Stat
 import com.twitter.finagle.stats.StatsReceiver
 import com.twitter.logging.Logger
@@ -51,11 +54,35 @@ private[threading] class EventLoopGroupTrackingRunnable(
   private[this] val threadMXBean = ManagementFactory.getThreadMXBean
   private[this] val scopedStatsReceiver = statsReceiver.scope(threadName)
 
-  private[this] val activeSocketsStat = scopedStatsReceiver.stat("active_sockets")
+  private[this] val pendingTasksStat = scopedStatsReceiver.stat(
+    MetricBuilder(
+      metricType = HistogramType,
+      name = Seq("pending_tasks"),
+      percentiles = Array[Double](0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
+      histogramFormat = HistogramFormat.FullSummary
+    )
+  )
+  private[this] val allSocketsStat = scopedStatsReceiver.stat(
+    MetricBuilder(
+      metricType = HistogramType,
+      name = Seq("all_sockets"),
+      percentiles = Array[Double](0.50),
+      histogramFormat = HistogramFormat.FullSummary
+    )
+  )
   private[this] val cpuTimeCounter = scopedStatsReceiver.counter("cpu_time_ms")
+  private[this] val cpuUtilStat = scopedStatsReceiver.stat(
+    MetricBuilder(
+      metricType = HistogramType,
+      name = Seq("cpu_util"),
+      percentiles = Array[Double](0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
+      histogramFormat = HistogramFormat.FullSummary
+    )
+  )
 
   // Accessed only from within the same netty thread
-  private[this] var prevCPUTimeMs = 0L
+  private[this] var prevCPUTimeNs = 0L
+  private[this] var prevWallTimeNs = 0L
 
   setWatchTask()
   executor.scheduleWithFixedDelay(
@@ -71,7 +98,8 @@ private[threading] class EventLoopGroupTrackingRunnable(
       watchTask.get.cancel(false)
     }
 
-    val executionDelay = Time.now - scheduledExecutionTime
+    val now = Time.now
+    val executionDelay = now - scheduledExecutionTime
     if (threadDumpEnabled && executionDelay.inMillis > threadDumpThreshold.inMillis) {
       dumpLogger.warning(
         s"THREAD: $threadName EXECUTION DELAY is greater than ${threadDumpThreshold.inMillis}ms, was ${executionDelay.inMillis}ms"
@@ -79,23 +107,28 @@ private[threading] class EventLoopGroupTrackingRunnable(
     }
 
     delayStat.add(executionDelay.inMillis)
-    scheduledExecutionTime = Time.now.plus(taskTrackingPeriod)
+    scheduledExecutionTime = now.plus(taskTrackingPeriod)
     setWatchTask()
 
-    var numActiveSockets = 0
     // This will be nio event loop or epoll event loop.
-    executor.asInstanceOf[SingleThreadEventLoop].registeredChannelsIterator().forEachRemaining {
-      channel =>
-        if (channel.isActive) {
-          numActiveSockets += 1
-        }
-    }
-    activeSocketsStat.add(numActiveSockets)
+    val loop = executor.asInstanceOf[SingleThreadEventLoop]
+    allSocketsStat.add(loop.registeredChannels())
+    pendingTasksStat.add(loop.pendingTasks())
 
     // `getThreadCPUTime` returns the time in nanoseconds.
-    val currentCPUTimeMs = threadMXBean.getThreadCpuTime(threadId) / 1000000
-    cpuTimeCounter.incr(currentCPUTimeMs - prevCPUTimeMs)
-    prevCPUTimeMs = currentCPUTimeMs
+    val currCPUTimeNs = threadMXBean.getThreadCpuTime(threadId)
+    val cpuTime = currCPUTimeNs - prevCPUTimeNs
+
+    val currWallTimeNs = System.nanoTime()
+    val wallTimeNs = currWallTimeNs - prevWallTimeNs
+    cpuTimeCounter.incr(TimeUnit.NANOSECONDS.toMillis(cpuTime))
+    if (prevWallTimeNs != 0 && wallTimeNs != 0) {
+      cpuUtilStat.add(
+        10000 * cpuTime / wallTimeNs
+      )
+    }
+    prevCPUTimeNs = currCPUTimeNs
+    prevWallTimeNs = currWallTimeNs
   }
 
   private[this] def setWatchTask(): Unit = {
diff --git a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala
index e555d7d809..b28ec0fdd6 100644
--- a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala
+++ b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala
@@ -62,7 +62,7 @@ class EventLoopGroupTrackerTest
         .get(Seq("finagle_thread_delay_tracking_test-1", "cpu_time_ms")).isDefined)
     assert(
       statsReceiver.stats
-        .get(Seq("finagle_thread_delay_tracking_test-1", "active_sockets")).isDefined)
+        .get(Seq("finagle_thread_delay_tracking_test-1", "all_sockets")).isDefined)
 
     // we should have no threads with the name no_threads_expected
     Thread.getAllStackTraces.keySet().asScala.foreach { thread: Thread =>
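For anyone who wants to sanity-check the new cpu_util math outside of finagle, below is a minimal, self-contained sketch of the same sampling scheme used in the patched `EventLoopGroupTrackingRunnable`: the delta in per-thread CPU time (via `ThreadMXBean.getThreadCpuTime`, which reports nanoseconds) divided by the delta in wall-clock time, scaled by 10000 so the value is in basis points (10000 ≈ one fully busy core). The `CpuUtilSampler` class and the demo `main` are illustrative names only; they are not part of this patch or of finagle.

```scala
import java.lang.management.ManagementFactory

// Illustrative only: a stand-alone sampler mirroring the patched
// EventLoopGroupTrackingRunnable arithmetic, without finagle's StatsReceiver.
final class CpuUtilSampler(threadId: Long) {
  private[this] val threadMXBean = ManagementFactory.getThreadMXBean

  // As in the patch, these are only touched by the sampling thread.
  private[this] var prevCpuTimeNs = 0L
  private[this] var prevWallTimeNs = 0L

  /**
   * Utilization of `threadId` since the previous call, in basis points
   * (10000 == one fully busy core), or None for the very first sample,
   * which only seeds the previous counters.
   */
  def sample(): Option[Long] = {
    // getThreadCpuTime returns nanoseconds (or -1 if CPU time measurement
    // is unsupported/disabled on this JVM).
    val currCpuTimeNs = threadMXBean.getThreadCpuTime(threadId)
    val currWallTimeNs = System.nanoTime()

    val cpuTimeNs = currCpuTimeNs - prevCpuTimeNs
    val wallTimeNs = currWallTimeNs - prevWallTimeNs

    val util =
      if (prevWallTimeNs != 0 && wallTimeNs != 0) Some(10000 * cpuTimeNs / wallTimeNs)
      else None

    prevCpuTimeNs = currCpuTimeNs
    prevWallTimeNs = currWallTimeNs
    util
  }
}

object CpuUtilSamplerExample {
  def main(args: Array[String]): Unit = {
    val sampler = new CpuUtilSampler(Thread.currentThread().getId)
    sampler.sample() // first call only seeds the counters

    // Burn some CPU so the second sample is visibly non-zero.
    var acc = 0L
    var i = 0
    while (i < 50000000) { acc += i; i += 1 }

    println(s"cpu_util in basis points: ${sampler.sample()} (checksum $acc)")
  }
}
```

In the patch itself each sampled value is fed into the `cpu_util` histogram (FullSummary format with the listed percentiles), so the exported stat is a distribution of per-period utilization rather than a single gauge.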