Migrate to structured logging
zeotuan committed May 23, 2024
1 parent 3631a04 commit 85a560f
Showing 9 changed files with 129 additions and 82 deletions.
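For orientation, the sketch below illustrates the migration pattern every diff in this commit follows. The APIs (`Logging`, `MDC`, the `log""` interpolator, `LogKeys`) are the real ones exercised by the diffs; the wrapping class is hypothetical, added only to make the snippet self-contained.

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._

private class FetchFailureReporter extends Logging { // hypothetical class for illustration
  def report(exec: String): Unit = {
    // Before: plain string interpolation; the executor id is flat text.
    logInfo(s"Excluding executor $exec due to fetch failure")
    // After: log"" builds a MessageWithContext; the MDC key/value pair
    // survives into structured (JSON) output, so log pipelines can filter
    // on the executor id as a field instead of parsing the message.
    logInfo(log"Excluding executor ${MDC(EXECUTOR_ID, exec)} due to fetch failure")
  }
}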
14 changes: 14 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -152,6 +152,7 @@ object LogKeys {
case object DATA_FILE_NUM extends LogKey
case object DATA_SOURCE extends LogKey
case object DATA_SOURCES extends LogKey
+case object DECOMMISSION_INFO extends LogKey
case object DEFAULT_COMPACTION_INTERVAL extends LogKey
case object DEFAULT_COMPACT_INTERVAL extends LogKey
case object DEFAULT_ISOLATION_LEVEL extends LogKey
@@ -260,6 +261,7 @@ object LogKeys {
case object HASH_JOIN_KEYS extends LogKey
case object HASH_MAP_SIZE extends LogKey
case object HAS_R_PACKAGE extends LogKey
+case object HEADING extends LogKey
case object HEARTBEAT extends LogKey
case object HEARTBEAT_INTERVAL extends LogKey
case object HISTORY_DIR extends LogKey
@@ -331,6 +333,7 @@ object LogKeys {
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
+case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOWER_BOUND extends LogKey
@@ -374,6 +377,7 @@ object LogKeys {
case object MINI_BATCH_FRACTION extends LogKey
case object MIN_COMPACTION_BATCH_ID extends LogKey
case object MIN_FREQUENT_PATTERN_COUNT extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
+case object MIN_REGISTER_RATIO extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
@@ -391,6 +395,7 @@ object LogKeys {
case object NEW_VALUE extends LogKey
case object NEXT_RENEWAL_TIME extends LogKey
case object NODES extends LogKey
+case object NODE_IDS extends LogKey
case object NODE_LOCATION extends LogKey
case object NON_BUILT_IN_CONNECTORS extends LogKey
case object NORM extends LogKey
@@ -421,6 +426,7 @@ object LogKeys {
case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
case object NUM_EVENTS extends LogKey
case object NUM_EXAMPLES extends LogKey
+case object NUM_EXECUTORS extends LogKey
case object NUM_EXECUTOR_CORES extends LogKey
case object NUM_EXECUTOR_CORES_REMAINING extends LogKey
case object NUM_EXECUTOR_CORES_TOTAL extends LogKey
@@ -431,6 +437,7 @@ object LogKeys {
case object NUM_FILES_FAILED_TO_DELETE extends LogKey
case object NUM_FILES_REUSED extends LogKey
case object NUM_FREQUENT_ITEMS extends LogKey
+case object NUM_GRACEFULLY_DECOMMISSIONED extends LogKey
case object NUM_INDEX_FILES extends LogKey
case object NUM_ITERATIONS extends LogKey
case object NUM_LEADING_SINGULAR_VALUES extends LogKey
@@ -469,6 +476,8 @@ object LogKeys {
case object NUM_SUB_DIRS extends LogKey
case object NUM_TASKS extends LogKey
case object NUM_TASK_CPUS extends LogKey
+case object NUM_UNEXPECTEDLY_EXIT extends LogKey
+case object NUM_UNFINISHED_DECOMMISSION extends LogKey
case object NUM_VERSIONS_RETAIN extends LogKey
case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey
case object OBJECT_ID extends LogKey
@@ -506,6 +515,7 @@ object LogKeys {
case object PATHS extends LogKey
case object PEER extends LogKey
case object PERCENT extends LogKey
+case object PERCENTILE_HEADER extends LogKey
case object PIPELINE_STAGE_UID extends LogKey
case object PLUGIN_NAME extends LogKey
case object POD_COUNT extends LogKey
@@ -541,6 +551,7 @@ object LogKeys {
case object PYTHON_WORKER_MODULE extends LogKey
case object PYTHON_WORKER_RESPONSE extends LogKey
+case object QUANTILES extends LogKey
case object QUERY_CACHE_VALUE extends LogKey
case object QUERY_HINT extends LogKey
case object QUERY_ID extends LogKey
case object QUERY_PLAN extends LogKey
@@ -660,12 +671,14 @@ object LogKeys {
case object STAGE_NAME extends LogKey
case object STAGE_NAME2 extends LogKey
case object STAGE_PARENTS extends LogKey
+case object STAGE_STATUS extends LogKey
case object START_INDEX extends LogKey
case object STATEMENT_ID extends LogKey
case object STATE_STORE_ID extends LogKey
case object STATE_STORE_PROVIDER extends LogKey
case object STATE_STORE_VERSION extends LogKey
case object STATUS extends LogKey
+case object STAT_COUNTER extends LogKey
case object STDERR extends LogKey
case object STOP_SITE_SHORT_FORM extends LogKey
case object STORAGE_LEVEL extends LogKey
@@ -700,6 +713,7 @@ object LogKeys {
case object TASK_NAME extends LogKey
case object TASK_REQUIREMENTS extends LogKey
case object TASK_RESOURCES extends LogKey
+case object TASK_SET_MANAGER_NAME extends LogKey
case object TASK_SET_NAME extends LogKey
case object TASK_STATE extends LogKey
case object TEMP_FILE extends LogKey
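A minimal sketch of how a key declared above is consumed at a call site elsewhere in this commit (the surrounding method is trimmed; only the API shapes shown in the diffs are asserted):

// Each structured field is declared exactly once, as a case object kept in
// alphabetical order within LogKeys.
case object NUM_EXECUTORS extends LogKey

// At a call site, the key names the field and the second argument supplies
// the value; fragments joined with + merge their MDC pairs into one record.
logInfo(log"Excluding node ${MDC(HOST, node)} because it has" +
  log" ${MDC(NUM_EXECUTORS, excludedExecsOnNode.size)} executors excluded")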
40 changes: 21 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -111,8 +111,8 @@ private[scheduler] class HealthTracker (
val execsToInclude = executorIdToExcludedStatus.filter(_._2.expiryTime < now).keys
if (execsToInclude.nonEmpty) {
// Include any executors that have been excluded longer than the excludeOnFailure timeout.
logInfo(s"Removing executors $execsToInclude from exclude list because the " +
s"the executors have reached the timed out")
logInfo(log"Removing executors ${MDC(EXECUTOR_IDS, execsToInclude)} from exclude" +
log" list because the executors have reached the timed out")
execsToInclude.foreach { exec =>
val status = executorIdToExcludedStatus.remove(exec).get
val failedExecsOnNode = nodeToExcludedExecs(status.node)
@@ -128,8 +128,8 @@ private[scheduler] class HealthTracker (
val nodesToInclude = nodeIdToExcludedExpiryTime.filter(_._2 < now).keys
if (nodesToInclude.nonEmpty) {
// Include any nodes that have been excluded longer than the excludeOnFailure timeout.
logInfo(s"Removing nodes $nodesToInclude from exclude list because the " +
s"nodes have reached has timed out")
logInfo(log"Removing nodes ${MDC(NODE_IDS, nodesToInclude)} from exclude" +
log" list because the nodes have reached has timed out")
nodesToInclude.foreach { node =>
nodeIdToExcludedExpiryTime.remove(node)
// post both to keep backwards compatibility
@@ -158,23 +158,23 @@ private[scheduler] class HealthTracker (

private def killExecutor(exec: String, msg: String): Unit = {
val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
s"${msg} (actually decommissioning)"
log"${MDC(MESSAGE, msg)} (actually decommissioning)"
} else {
msg
log"$MDC(MESSAGE, msg)"
}
allocationClient match {
case Some(a) =>
logInfo(fullMsg)
if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
-a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg),
+a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg.message),
adjustTargetNumExecutors = false)
} else {
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
}
case None =>
logInfo(s"Not attempting to kill excluded executor id $exec " +
s"since allocation client is not defined.")
logInfo(log"Not attempting to kill excluded executor id ${MDC(EXECUTOR_ID, exec)} " +
log"since allocation client is not defined.")
}
}

@@ -196,14 +196,14 @@ private[scheduler] class HealthTracker (
allocationClient match {
case Some(a) =>
if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
logInfo(s"Decommissioning all executors on excluded host $node " +
s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
logInfo(log"Decommissioning all executors on excluded host ${MDC(HOST, node)} " +
log"since ${MDC(KEY, config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key)} is set.")
if (!a.decommissionExecutorsOnHost(node)) {
logError(log"Decommissioning executors on ${MDC(HOST, node)} failed.")
}
} else {
logInfo(s"Killing all executors on excluded host $node " +
s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
logInfo(log"Killing all executors on excluded host ${MDC(HOST, node)} " +
log"since ${MDC(KEY, config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key)} is set.")
if (!a.killExecutorsOnHost(node)) {
logError(log"Killing executors on node ${MDC(HOST, node)} failed.")
}
@@ -231,7 +231,8 @@ private[scheduler] class HealthTracker (

if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (!nodeIdToExcludedExpiryTime.contains(host)) {
logInfo(s"excluding node $host due to fetch failure of external shuffle service")
logInfo(log"excluding node ${MDC(HOST, host)} due to fetch failure of" +
log" external shuffle service")

nodeIdToExcludedExpiryTime.put(host, expiryTimeForNewExcludes)
// post both to keep backwards compatibility
@@ -242,7 +243,7 @@ private[scheduler] class HealthTracker (
updateNextExpiryTime()
}
} else if (!executorIdToExcludedStatus.contains(exec)) {
logInfo(s"Excluding executor $exec due to fetch failure")
logInfo(log"Excluding executor ${MDC(EXECUTOR_ID, exec)} due to fetch failure")

executorIdToExcludedStatus.put(exec, ExcludedExecutor(host, expiryTimeForNewExcludes))
// We hardcoded number of failure tasks to 1 for fetch failure, because there's no
@@ -280,8 +281,8 @@ private[scheduler] class HealthTracker (
// some of the logic around expiry times a little more confusing. But it also wouldn't be a
// problem to re-exclude, with a later expiry time.
if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToExcludedStatus.contains(exec)) {
logInfo(s"Excluding executor id: $exec because it has $newTotal" +
s" task failures in successful task sets")
logInfo(log"Excluding executor id: ${MDC(EXECUTOR_ID, exec)} because" +
log" it has ${MDC(NUM_FAILURES, newTotal)} task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToExcludedStatus.put(exec, ExcludedExecutor(node, expiryTimeForNewExcludes))
// post both to keep backwards compatibility
@@ -299,8 +300,9 @@ private[scheduler] class HealthTracker (
// time.
if (excludedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
!nodeIdToExcludedExpiryTime.contains(node)) {
logInfo(s"Excluding node $node because it has ${excludedExecsOnNode.size} " +
s"executors excluded: ${excludedExecsOnNode}")
logInfo(log"Excluding node ${MDC(HOST, node)} because it has" +
log" ${MDC(NUM_EXECUTORS, excludedExecsOnNode.size)} " +
log"executors excluded: ${MDC(EXECUTOR_IDS, excludedExecsOnNode)}")
nodeIdToExcludedExpiryTime.put(node, expiryTimeForNewExcludes)
// post both to keep backwards compatibility
listenerBus.post(SparkListenerNodeBlacklisted(now, node, excludedExecsOnNode.size))
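Two idioms from the HealthTracker changes above are worth calling out; this is a trimmed restatement of what the diff already does, not new behavior:

// 1. log"" returns a MessageWithContext rather than a String, and values
//    concatenate with +, so each fragment keeps its own MDC pairs.
val fullMsg = log"${MDC(MESSAGE, msg)} (actually decommissioning)"

// 2. APIs that still take a plain String, such as ExecutorDecommissionInfo,
//    receive the rendered text via .message; the structured context is
//    dropped at that boundary.
a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg.message),
  adjustTargetNumExecutors = false)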
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -20,7 +20,8 @@ package org.apache.spark.scheduler
import scala.collection.mutable

import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.util.{RpcUtils, ThreadUtils}

@@ -124,7 +125,7 @@ private[spark] class OutputCommitCoordinator(
stageStates.get(stage) match {
case Some(state) =>
require(state.authorizedCommitters.length == maxPartitionId + 1)
logInfo(s"Reusing state from previous attempt of stage $stage.")
logInfo(log"Reusing state from previous attempt of stage ${MDC(STAGE, stage)}.")

case _ =>
stageStates(stage) = new StageState(maxPartitionId + 1)
@@ -151,8 +152,10 @@ private[spark] class OutputCommitCoordinator(
case Success =>
// The task output has been committed successfully
case _: TaskCommitDenied =>
logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " +
s"partition: $partition, attempt: $attemptNumber")
logInfo(log"Task was denied committing, stage:" +
log" ${MDC(STAGE, stage)}.${MDC(STAGE_ATTEMPT, stageAttempt)}, " +
log"partition: ${MDC(PARTITION_ID, partition)}," +
log" attempt: ${MDC(STAGE_ATTEMPT_NUMBER, attemptNumber)}")
case _ =>
// Mark the attempt as failed to exclude from future commit protocol
val taskId = TaskIdentifier(stageAttempt, attemptNumber)
@@ -182,8 +185,10 @@ private[spark] class OutputCommitCoordinator(
attemptNumber: Int): Boolean = synchronized {
stageStates.get(stage) match {
case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
s"task attempt $attemptNumber already marked as failed.")
logInfo(log"Commit denied for" +
log" stage=${MDC(STAGE, stage)}.${MDC(STAGE_ATTEMPT, stageAttempt)}," +
log" partition=${MDC(PARTITION_ID, partition)}: " +
log"task attempt ${MDC(STAGE_ATTEMPT_NUMBER, attemptNumber)} already marked as failed.")
false
case Some(state) =>
val existing = state.authorizedCommitters(partition)
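The import change at the top of this file is the only wiring a class needs to use the interpolator. A self-contained sketch; the class and method names here are hypothetical, the imports and key names come from the diffs above:

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._ // brings STAGE, PARTITION_ID, ... into scope

private class CommitAuditor extends Logging { // hypothetical illustration
  def denied(stage: Int, partition: Int): Unit =
    logInfo(log"Commit denied for stage=${MDC(STAGE, stage)}," +
      log" partition=${MDC(PARTITION_ID, partition)}")
}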
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -80,20 +80,23 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
fileData = schedulerAllocFile.map { f =>
val filePath = new Path(f)
val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath)
logInfo(s"Creating Fair Scheduler pools from $f")
logInfo(log"Creating Fair Scheduler pools from ${MDC(PATH, f)}")
Some((fis, f))
}.getOrElse {
val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
if (is != null) {
logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
logInfo(log"Creating Fair Scheduler pools from default file:" +
log" ${MDC(PATH, DEFAULT_SCHEDULER_FILE)}")
Some((is, DEFAULT_SCHEDULER_FILE))
} else {
val schedulingMode = SchedulingMode.withName(sc.conf.get(SCHEDULER_MODE))
rootPool.addSchedulable(new Pool(
DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logInfo("Fair scheduler configuration not found, created default pool: " +
"%s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logInfo(log"Fair scheduler configuration not found, created default pool: " +
log"${MDC(POOL_NAME, DEFAULT_POOL_NAME)}," +
log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, schedulingMode)}," +
log" minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}," +
log" weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}")
None
}
}
@@ -122,8 +125,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logInfo(log"Created default pool: ${MDC(POOL_NAME, DEFAULT_POOL_NAME)}," +
log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}," +
log" minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}," +
log" weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}")
}
}

@@ -142,8 +147,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext

rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))

logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
logInfo(log"Created pool: ${MDC(POOL_NAME, poolName)}," +
log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, schedulingMode)}," +
log" minShare: ${MDC(MIN_SHARE, minShare)}," +
log" weight: ${MDC(WEIGHT, weight)}")
}
}

@@ -220,6 +227,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}")
}
parentPool.addSchedulable(manager)
logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
logInfo(log"Added task set ${MDC(TASK_SET_MANAGER_NAME, manager.name)} tasks" +
log" to pool ${MDC(POOL_NAME, poolName)}")
}
}
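The SchedulableBuilder changes also retire the positional %s/%d format strings. The contrast below restates the pool-creation case from the diff above, with commentary on why the named keys matter:

// Before: positional format string; once rendered, the pool name, mode,
// share and weight are anonymous substrings of one flat message.
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
  poolName, schedulingMode, minShare, weight))

// After: each value is bound to a declared key, so structured output
// exposes the four settings as separately queryable fields.
logInfo(log"Created pool: ${MDC(POOL_NAME, poolName)}," +
  log" schedulingMode: ${MDC(XML_SCHEDULING_MODE, schedulingMode)}," +
  log" minShare: ${MDC(MIN_SHARE, minShare)}, weight: ${MDC(WEIGHT, weight)}")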
core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
+import org.apache.spark.internal.LogKeys._
import org.apache.spark.util.{Distribution, Utils}


@@ -46,7 +47,7 @@ class StatsReportListener extends SparkListener with Logging {

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
implicit val sc = stageCompleted
this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
this.logInfo(log"Finished stage: " + getStatusDetail(stageCompleted.stageInfo))
showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics.toSeq)

// Shuffle write
@@ -73,15 +74,18 @@
taskInfoMetrics.clear()
}

-private def getStatusDetail(info: StageInfo): String = {
+private def getStatusDetail(info: StageInfo): MessageWithContext = {
val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
val timeTaken = info.submissionTime.map(
x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
).getOrElse("-")

s"Stage(${info.stageId}, ${info.attemptNumber()}); Name: '${info.name}'; " +
s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
s"Took: $timeTaken msec"
log"Stage(${MDC(STAGE_ID, info.stageId)}," +
log" ${MDC(STAGE_ATTEMPT_NUMBER, info.attemptNumber())});" +
log" Name: '${MDC(STAGE_NAME, info.name)}'; " +
log"Status: ${MDC(STAGE_STATUS, info.getStatusString)} " +
log"${MDC(REASON, failureReason)}; numTasks: ${MDC(NUM_TASKS, info.numTasks)}; " +
log"Took: ${MDC(TOTAL_TIME, timeTaken)} msec"
}

}
@@ -111,9 +115,9 @@ private[spark] object StatsReportListener extends Logging {
def showDistribution(heading: String, d: Distribution, formatNumber: Double => String): Unit = {
val stats = d.statCounter
val quantiles = d.getQuantiles(probabilities).map(formatNumber)
-logInfo(heading + stats)
-logInfo(percentilesHeader)
-logInfo("\t" + quantiles.mkString("\t"))
+logInfo(log"${MDC(HEADING, heading)}${MDC(STAT_COUNTER, stats)}")
+logInfo(log"${MDC(PERCENTILE_HEADER, percentilesHeader)}")
+logInfo(log"\t${MDC(QUANTILES, quantiles.mkString("\t"))}")
}

def showDistribution(
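The getStatusDetail change above shows the pattern for helper methods: return MessageWithContext instead of String, so call sites can prepend further log"" fragments without losing context. A compressed sketch; the helper name is hypothetical, the keys are the real ones from this commit:

// Hypothetical helper mirroring getStatusDetail: the stage id and status
// stay attached as structured context all the way to the appender.
private def statusDetail(stageId: Int, status: String): MessageWithContext =
  log"Stage(${MDC(STAGE_ID, stageId)}); Status: ${MDC(STAGE_STATUS, status)}"

// The caller composes it with + exactly as onStageCompleted does above.
logInfo(log"Finished stage: " + statusDetail(info.stageId, info.getStatusString))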
[Diffs for the remaining four changed files are not shown.]
