Skip to content

Commit

Permalink
clb1734 bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Nov 27, 2024
1 parent e642197 commit 3bf75a2
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.celeborn.common.metrics.source

import java.util.{Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import com.codahale.metrics._
Expand Down Expand Up @@ -59,8 +61,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

val metricsCapacity: Int = conf.metricsCapacity

val innerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()

val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)

val metricsCleaner: ScheduledExecutorService =
Expand All @@ -79,11 +79,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

val applicationLabel = "applicationId"

protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
val atomicSortNum = new AtomicInteger(0)

val timerMetricsMap: ConcurrentHashMap[String, Int] =
JavaUtils.newConcurrentHashMap[String, Int]()

protected val namedGauges: ConcurrentHashMap[String, (Int, NamedGauge[_])] =
JavaUtils.newConcurrentHashMap[String, (Int, NamedGauge[_])]()

protected val namedTimers
: ConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])] =
JavaUtils.newConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])]()

protected val namedCounters: ConcurrentHashMap[String, (Int, NamedCounter)] =
JavaUtils.newConcurrentHashMap[String, (Int, NamedCounter)]()

protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
JavaUtils.newConcurrentHashMap[String, NamedMeter]()
protected val namedMeters: ConcurrentHashMap[String, (Int, NamedMeter)] =
JavaUtils.newConcurrentHashMap[String, (Int, NamedMeter)]()

/**
 * Renders `namedTimer` through `getTimerMetrics` and records the rendered text in
 * `timerMetricsMap`, unless that exact text is already present as a key.
 *
 * @param namedTimer the timer whose current metrics text should be remembered
 */
def addTimerMetricsMap(namedTimer: NamedTimer): Unit = {
  val rendered = getTimerMetrics(namedTimer)
  // The sort number is drawn on every call, even when the text is already stored.
  val sortNum = atomicSortNum.getAndIncrement()
  timerMetricsMap.putIfAbsent(rendered, sortNum)
}

def addGauge[T](
name: String,
Expand All @@ -93,7 +110,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels))
(atomicSortNum.getAndIncrement(), NamedGauge(name, gauge, labels ++ staticLabels)))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
Expand Down Expand Up @@ -124,7 +141,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
meter: Meter): Unit = {
namedMeters.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedMeter(name, meter, labels ++ staticLabels))
(atomicSortNum.getAndIncrement(), NamedMeter(name, meter, labels ++ staticLabels)))
}

def addMeter(
Expand All @@ -145,10 +162,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
addMeter(name, Map.empty[String, String], meter)
}

protected val namedTimers
: ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] =
JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()

/** Delegates to `addTimer(name, labels)` with an empty label map. */
def addTimer(name: String): Unit = {
  val noLabels = Map.empty[String, String]
  addTimer(name, noLabels)
}

def addTimer(name: String, labels: Map[String, String]): Unit = {
Expand All @@ -161,40 +174,43 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
metricRegistry.timer(metricNameWithLabel, timerSupplier),
labels ++ staticLabels)
val values = JavaUtils.newConcurrentHashMap[String, Long]()
(namedTimer, values)
(atomicSortNum.getAndIncrement(), namedTimer, values)
})
}

protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
JavaUtils.newConcurrentHashMap[String, NamedCounter]()

/** Delegates to `addCounter(name, labels)` with an empty label map. */
def addCounter(name: String): Unit = {
  val noLabels = Map.empty[String, String]
  addCounter(name, noLabels)
}

/**
 * Adds a counter under the key produced by `metricNameWithCustomizedLabels(name, labels)`
 * unless that key is already present in `namedCounters` (`putIfAbsent`).
 * The counter itself comes from `metricRegistry.counter` and carries `labels ++ staticLabels`.
 * In the tuple form below, the counter is stored together with a number drawn from
 * `atomicSortNum`.
 *
 * @param name   metric name
 * @param labels custom labels that become part of the lookup key
 */
def addCounter(name: String, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
namedCounters.putIfAbsent(
metricNameWithLabel,
NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels))
(
atomicSortNum.getAndIncrement(),
NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels)))
}

/**
 * Returns the values held in `namedCounters` as a list.
 * The `sortBy(_._1).map(_._2)` form orders the stored tuples by their first
 * component and then keeps only the second component.
 */
def counters(): List[NamedCounter] = {
namedCounters.values().asScala.toList
namedCounters.values().asScala.toList.sortBy(_._1).map(_._2)
}

/**
 * Returns the values held in `namedGauges` as a list.
 * The `sortBy(_._1).map(_._2)` form orders the stored tuples by their first
 * component and then keeps only the second component.
 */
def gauges(): List[NamedGauge[_]] = {
namedGauges.values().asScala.toList
namedGauges.values().asScala.toList.sortBy(_._1).map(_._2)
}

/**
 * Returns the values held in `namedMeters` as a list.
 * The `sortBy(_._1).map(_._2)` form orders the stored tuples by their first
 * component and then keeps only the second component.
 */
def meters(): List[NamedMeter] = {
namedMeters.values().asScala.toList
namedMeters.values().asScala.toList.sortBy(_._1).map(_._2)
}

/** Returns the histograms of this source; this implementation always yields an empty list. */
def histograms(): List[NamedHistogram] = Nil

/**
 * Returns the timers held in `namedTimers` as a list.
 * The `map(_._1)` form takes the first tuple component as the timer.
 * The `sortBy(_._1).map(_._2)` form orders the tuples by their first component
 * and takes the second component as the timer.
 */
def timers(): List[NamedTimer] = {
namedTimers.values().asScala.toList.map(_._1)
namedTimers.values().asScala.toList.sortBy(_._1).map(_._2)
}

/**
 * Returns the keys of `timerMetricsMap`, ordered by the number stored as each
 * key's value (ascending).
 */
def timerMetrics(): List[String] = {
  val entries = timerMetricsMap.asScala.toList
  val ordered = entries.sortBy { case (_, sortNum) => sortNum }
  ordered.map { case (metricText, _) => metricText }
}

def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
Expand Down Expand Up @@ -267,7 +283,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
val pair = namedTimers.get(metricNameWithLabel)
if (pair != null) {
pair._2.put(key, System.nanoTime())
pair._3.put(key, System.nanoTime())
} else {
logWarning(s"Metric $metricNameWithLabel not found!")
}
Expand All @@ -276,13 +292,13 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
protected def doStopTimer(metricsName: String, key: String, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
try {
val (namedTimer, map) = namedTimers.get(metricNameWithLabel)
val (_, namedTimer, map) = namedTimers.get(metricNameWithLabel)
val startTime = Option(map.remove(key))
startTime match {
case Some(t) =>
namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
recordTimer(namedTimer)
addTimerMetricsMap(namedTimer)
}
case None =>
}
Expand All @@ -301,7 +317,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
if (!namedTimers.containsKey(metricNameWithLabel)) {
addTimer(metricsName, labels)
}
val (namedTimer, _) = namedTimers.get(metricNameWithLabel)
val (_, namedTimer, _) = namedTimers.get(metricNameWithLabel)
namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
}

Expand All @@ -315,7 +331,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

def incCounter(metricsName: String, incV: Long, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
val counter = namedCounters.get(metricNameWithLabel)
val (_, counter) = namedCounters.get(metricNameWithLabel)
if (counter != null) {
counter.counter.inc(incV)
} else {
Expand All @@ -341,37 +357,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
/**
 * Schedules a recurring task on `metricsCleaner` with an initial delay of 10 minutes
 * and a fixed delay of 10 minutes between runs.
 * Each run takes one tuple component of every `namedTimers` value (`_2` or `_3`
 * in the lines below) and passes it to `clearOldValues`.
 * The task body is wrapped in `Utils.tryLogNonFatalError`.
 */
protected def startCleaner(): Unit = {
val cleanTask: Runnable = new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
namedTimers.values.asScala.toArray.map(_._2).foreach(clearOldValues)
namedTimers.values.asScala.toArray.map(_._3).foreach(clearOldValues)
}
}
metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES)
}

/**
 * Appends `str` to `innerMetrics` while holding the queue's monitor.
 * When the queue already holds `metricsCapacity` or more entries, the head
 * entry is removed first.
 *
 * @param str the metrics text to enqueue
 */
private def updateInnerMetrics(str: String): Unit = innerMetrics.synchronized {
  val atCapacity = innerMetrics.size() >= metricsCapacity
  if (atCapacity) {
    innerMetrics.remove()
  }
  innerMetrics.offer(str)
}

/**
 * Formats a counter as a single newline-terminated line: the normalized name
 * followed by "Count" and the label string, then the current count, then the
 * current time in milliseconds.
 * The `recordCounter` form hands the line to `updateInnerMetrics`.
 * The `getCounterMetrics` form returns it.
 */
def recordCounter(nc: NamedCounter): Unit = {
def getCounterMetrics(nc: NamedCounter): String = {
val timestamp = System.currentTimeMillis
val label = nc.labelString
updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n")
val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n"
str
}

/**
 * Formats a gauge as a single newline-terminated line: the normalized name
 * followed by "Value" and the label string, then the gauge's current value,
 * then the current time in milliseconds.
 * The `recordGauge` form hands the text to `updateInnerMetrics`.
 * The `getGaugeMetrics` form returns it.
 */
def recordGauge(ng: NamedGauge[_]): Unit = {
def getGaugeMetrics(ng: NamedGauge[_]): String = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = ng.labelString
sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

def recordMeter(nm: NamedMeter): Unit = {
def getMeterMetrics(nm: NamedMeter): String = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = nm.labelString
Expand All @@ -383,11 +390,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
s"${normalizeKey(nm.name)}FiveMinuteRate$label ${nm.meter.getFiveMinuteRate} $timestamp\n")
sb.append(
s"${normalizeKey(nm.name)}FifteenMinuteRate$label ${nm.meter.getFifteenMinuteRate} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

def recordHistogram(nh: NamedHistogram): Unit = {
def getHistogramMetrics(nh: NamedHistogram): String = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
val snapshot = nh.histogram.getSnapshot
Expand All @@ -409,11 +415,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
sb.append(s"${prefix}999thPercentile$label" +
s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

def recordTimer(nt: NamedTimer): Unit = {
def getTimerMetrics(nt: NamedTimer): String = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
val snapshot = nt.timer.getSnapshot
Expand All @@ -435,32 +440,53 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
sb.append(s"${prefix}999thPercentile$label" +
s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")

updateInnerMetrics(sb.toString())
sb.toString()
}

/**
 * Builds the metrics text returned by this source.
 *
 * The `ArrayBuffer`-based lines fill a snapshot through `fillInnerMetricsSnapshot`
 * in this order: `timerMetrics()`, `timers()`, `histograms()`, `meters()`,
 * `gauges()`, `counters()`. The remaining budget starts at `metricsCapacity`
 * and is threaded through each call. The snapshot strings are then concatenated
 * and returned.
 *
 * NOTE(review): the warning is logged when the remaining budget is `<= 0`, so it
 * also fires when the number of entries equals the capacity exactly and nothing
 * was dropped. Confirm whether `< 0`-style detection was intended.
 */
override def getMetrics(): String = {
innerMetrics.synchronized {
counters().foreach(c => recordCounter(c))
gauges().foreach(g => recordGauge(g))
meters().foreach(m => recordMeter(m))
histograms().foreach(h => {
recordHistogram(h)
var leftMetricsNum = metricsCapacity
val metricsSnapshot = ArrayBuffer[String]()
leftMetricsNum = fillInnerMetricsSnapshot(timerMetrics(), leftMetricsNum, metricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, metricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, metricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, metricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, metricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, metricsSnapshot)
val sb = new mutable.StringBuilder
metricsSnapshot.foreach(metric => sb.append(metric))
if (leftMetricsNum <= 0) {
logWarning("The number of metrics exceed the output metrics strings capacity!")
}
sb.toString()
}

/**
 * Appends the formatted text of at most `leftNum` entries of `metricList` to
 * `metricsSnapshot` and returns the budget that is left afterwards.
 *
 * Entries are dispatched on their runtime type to the matching `get*Metrics`
 * formatter. For histograms and timers the reservoir is reset right after
 * formatting. Any other value (the fall-through `case s`) is appended via `toString`.
 *
 * The budget counts list entries, not output lines.
 *
 * @param metricList      entries to format, in the order they should be emitted
 * @param leftNum         number of entries that may still be added; `<= 0` adds nothing
 * @param metricsSnapshot buffer that receives the formatted strings
 * @return 0 when `leftNum <= 0`, otherwise `leftNum` minus the number of entries taken
 */
private def fillInnerMetricsSnapshot(
metricList: List[AnyRef],
leftNum: Int,
metricsSnapshot: ArrayBuffer[String]): Int = {
if (leftNum <= 0) {
return 0
}
val addList = metricList.take(leftNum)
addList.foreach {
case c: NamedCounter =>
metricsSnapshot += getCounterMetrics(c)
case g: NamedGauge[_] =>
metricsSnapshot += getGaugeMetrics(g)
case m: NamedMeter =>
metricsSnapshot += getMeterMetrics(m)
case h: NamedHistogram =>
metricsSnapshot += getHistogramMetrics(h)
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
})
timers().foreach(t => {
recordTimer(t)
case t: NamedTimer =>
metricsSnapshot += getTimerMetrics(t)
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
})
val sb = new mutable.StringBuilder
while (!innerMetrics.isEmpty) {
sb.append(innerMetrics.poll())
}
innerMetrics.clear()
sb.toString()
case s =>
metricsSnapshot += s.toString
}
leftNum - addList.size
}

override def destroy(): Unit = {
Expand All @@ -469,7 +495,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
namedGauges.clear()
namedMeters.clear()
namedTimers.clear()
innerMetrics.clear()
timerMetricsMap.clear()
atomicSortNum.set(0)
metricRegistry.removeMatching(new MetricFilter {
override def matches(s: String, metric: Metric): Boolean = true
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER)

/**
 * Returns the current count of the counter stored under `metricsName` with an
 * empty label map.
 *
 * NOTE(review): the lookup result is dereferenced without a null check. If the
 * map returns null for an unknown key this throws NullPointerException. Confirm
 * that callers only pass names that were registered beforehand.
 */
def getCounterCount(metricsName: String): Long = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty)
namedCounters.get(metricNameWithLabel).counter.getCount
namedCounters.get(metricNameWithLabel)._2.counter.getCount
}

def connectionActive(client: TransportClient): Unit = {
Expand Down

0 comments on commit 3bf75a2

Please sign in to comment.