From a15adeb3a215ad2ef7222e18112d23cdffa8569a Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Tue, 7 May 2024 17:35:35 -0700 Subject: [PATCH] [SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework The PR aims to migrate `logInfo` in Core module with variables to structured logging framework. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46362 from zeotuan/coreInfo. Lead-authored-by: Tuan Pham Co-authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../org/apache/spark/internal/LogKey.scala | 48 ++++++++++++++++- .../org/apache/spark/BarrierCoordinator.scala | 17 +++--- .../org/apache/spark/BarrierTaskContext.scala | 35 ++++++------ .../spark/ExecutorAllocationManager.scala | 13 +++-- .../org/apache/spark/MapOutputTracker.scala | 48 +++++++++++------ .../scala/org/apache/spark/SparkContext.scala | 54 ++++++++++++------- .../spark/api/python/PythonHadoopUtil.scala | 2 +- .../apache/spark/api/python/PythonRDD.scala | 6 ++- .../spark/api/python/PythonRunner.scala | 2 +- .../apache/spark/api/python/PythonUtils.scala | 7 +-- .../api/python/StreamingPythonRunner.scala | 12 +++-- .../org/apache/spark/deploy/Client.scala | 18 ++++--- .../spark/deploy/ExternalShuffleService.scala | 10 ++-- .../spark/rdd/ReliableCheckpointRDD.scala | 9 ++-- .../spark/rdd/ReliableRDDCheckpointData.scala | 6 ++- .../org/apache/spark/ui/JettyUtils.scala | 7 ++- .../scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../org/apache/spark/util/ListenerBus.scala | 7 +-- .../org/apache/spark/util/SignalUtils.scala | 2 +- 19 files changed, 205 insertions(+), 102 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index c127f9c3d1f90..14e822c6349f3 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -26,13 +26,15 @@ trait LogKey { } /** - * Various keys used for mapped diagnostic contexts(MDC) in logging. - * All structured logging keys should be defined here for standardization. + * Various keys used for mapped diagnostic contexts(MDC) in logging. All structured logging keys + * should be defined here for standardization. 
*/ object LogKeys { case object ACCUMULATOR_ID extends LogKey + case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey + case object ADDED_JARS extends LogKey case object AGGREGATE_FUNCTIONS extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey @@ -42,7 +44,10 @@ object LogKeys { case object APP_NAME extends LogKey case object APP_STATE extends LogKey case object ARGS extends LogKey + case object AUTH_ENABLED extends LogKey case object BACKUP_FILE extends LogKey + case object BARRIER_EPOCH extends LogKey + case object BARRIER_ID extends LogKey case object BATCH_ID extends LogKey case object BATCH_NAME extends LogKey case object BATCH_TIMESTAMP extends LogKey @@ -55,6 +60,7 @@ object LogKeys { case object BOOT extends LogKey case object BROADCAST extends LogKey case object BROADCAST_ID extends LogKey + case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object BUCKET extends LogKey case object BYTECODE_SIZE extends LogKey case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey @@ -62,6 +68,7 @@ object LogKeys { case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey + case object CALL_SITE_SHORT_FORM extends LogKey case object CATALOG_NAME extends LogKey case object CATEGORICAL_FEATURES extends LogKey case object CHECKPOINT_FILE extends LogKey @@ -142,11 +149,13 @@ object LogKeys { case object DEPRECATED_KEY extends LogKey case object DESCRIPTION extends LogKey case object DESIRED_NUM_PARTITIONS extends LogKey + case object DESTINATION_PATH extends LogKey case object DFS_FILE extends LogKey case object DIFF_DELTA extends LogKey case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey case object DRIVER_ID extends LogKey case object DRIVER_LIBRARY_PATH_KEY extends LogKey + case object DRIVER_STATE extends LogKey case object DROPPED_PARTITIONS extends LogKey case object DURATION extends LogKey case object EARLIEST_LOADED_VERSION extends LogKey @@ -161,6 +170,7 @@ object LogKeys { case object ERROR extends LogKey case object ESTIMATOR_PARAMETER_MAP extends LogKey case object EVALUATED_FILTERS extends LogKey + case object EVENT extends LogKey case object EVENT_LOG_DESTINATION extends LogKey case object EVENT_LOOP extends LogKey case object EVENT_NAME extends LogKey @@ -184,8 +194,10 @@ object LogKeys { case object EXECUTOR_TIMEOUT extends LogKey case object EXEC_AMOUNT extends LogKey case object EXISTING_FILE extends LogKey + case object EXISTING_JARS extends LogKey case object EXISTING_PATH extends LogKey case object EXIT_CODE extends LogKey + case object EXPECTED_ANSWER extends LogKey case object EXPECTED_NUM_FILES extends LogKey case object EXPECTED_PARTITION_COLUMN extends LogKey case object EXPIRY_TIMESTAMP extends LogKey @@ -211,6 +223,7 @@ object LogKeys { case object FILTER extends LogKey case object FILTERS extends LogKey case object FINAL_CONTEXT extends LogKey + case object FINAL_OUTPUT_PATH extends LogKey case object FINAL_PATH extends LogKey case object FINISH_TRIGGER_DURATION extends LogKey case object FROM_OFFSET extends LogKey @@ -252,6 +265,7 @@ object LogKeys { case object IS_NETWORK_REQUEST_DONE extends LogKey case object JAR_ENTRY extends LogKey case object JAR_MESSAGE extends LogKey + case object JAVA_VERSION extends LogKey case object JOB_ID extends LogKey case object JOIN_CONDITION extends LogKey case object 
JOIN_CONDITION_SUB_EXPR extends LogKey @@ -280,6 +294,7 @@ object LogKeys { case object LOGICAL_PLAN_COLUMNS extends LogKey case object LOGICAL_PLAN_LEAVES extends LogKey case object LOG_ID extends LogKey + case object LOG_KEY_FILE extends LogKey case object LOG_OFFSET extends LogKey case object LOG_TYPE extends LogKey case object LOWER_BOUND extends LogKey @@ -326,13 +341,16 @@ object LogKeys { case object NEW_FEATURE_COLUMN_NAME extends LogKey case object NEW_LABEL_COLUMN_NAME extends LogKey case object NEW_PATH extends LogKey + case object NEW_RDD_ID extends LogKey case object NEW_STATE extends LogKey case object NEW_VALUE extends LogKey case object NODES extends LogKey case object NODE_LOCATION extends LogKey case object NON_BUILT_IN_CONNECTORS extends LogKey case object NORM extends LogKey + case object NUM_ADDED_MASTERS extends LogKey case object NUM_ADDED_PARTITIONS extends LogKey + case object NUM_ADDED_WORKERS extends LogKey case object NUM_BIN extends LogKey case object NUM_BYTES extends LogKey case object NUM_BYTES_CURRENT extends LogKey @@ -373,6 +391,7 @@ object LogKeys { case object NUM_PRUNED extends LogKey case object NUM_REPLICAS extends LogKey case object NUM_REQUESTS extends LogKey + case object NUM_REQUEST_SYNC_TASK extends LogKey case object NUM_RESOURCE_SLOTS extends LogKey case object NUM_RETRIES extends LogKey case object NUM_RIGHT_PARTITION_VALUES extends LogKey @@ -397,7 +416,12 @@ object LogKeys { case object OPTIONS extends LogKey case object OP_ID extends LogKey case object OP_TYPE extends LogKey + case object OS_ARCH extends LogKey + case object OS_NAME extends LogKey + case object OS_VERSION extends LogKey case object OUTPUT extends LogKey + case object OUTPUT_LINE extends LogKey + case object OUTPUT_LINE_NUMBER extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey case object PAGE_SIZE extends LogKey case object PARSE_MODE extends LogKey @@ -438,6 +462,9 @@ object LogKeys { case object PUSHED_FILTERS extends LogKey case object PVC_METADATA_NAME extends LogKey case object PYTHON_EXEC extends LogKey + case object PYTHON_VERSION extends LogKey + case object PYTHON_WORKER_MODULE extends LogKey + case object PYTHON_WORKER_RESPONSE extends LogKey case object QUERY_CACHE_VALUE extends LogKey case object QUERY_HINT extends LogKey case object QUERY_ID extends LogKey @@ -447,6 +474,8 @@ object LogKeys { case object QUERY_PLAN_LENGTH_MAX extends LogKey case object QUERY_RUN_ID extends LogKey case object RANGE extends LogKey + case object RDD_CHECKPOINT_DIR extends LogKey + case object RDD_DEBUG_STRING extends LogKey case object RDD_DESCRIPTION extends LogKey case object RDD_ID extends LogKey case object READ_LIMIT extends LogKey @@ -466,6 +495,7 @@ object LogKeys { case object REMOTE_ADDRESS extends LogKey case object REMOVE_FROM_MASTER extends LogKey case object REPORT_DETAILS extends LogKey + case object REQUESTER_SIZE extends LogKey case object RESOURCE extends LogKey case object RESOURCE_NAME extends LogKey case object RESOURCE_PROFILE_ID extends LogKey @@ -489,13 +519,18 @@ object LogKeys { case object SCHEDULER_POOL_NAME extends LogKey case object SCHEMA extends LogKey case object SCHEMA2 extends LogKey + case object SERIALIZE_OUTPUT_LENGTH extends LogKey + case object SERVER_NAME extends LogKey case object SERVICE_NAME extends LogKey + case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey case object SESSION_HOLD_INFO extends LogKey case object SESSION_ID extends LogKey case object SESSION_KEY extends LogKey case object SHARD_ID extends LogKey case 
object SHELL_COMMAND extends LogKey case object SHUFFLE_BLOCK_INFO extends LogKey + case object SHUFFLE_DB_BACKEND_KEY extends LogKey + case object SHUFFLE_DB_BACKEND_NAME extends LogKey case object SHUFFLE_ID extends LogKey case object SHUFFLE_MERGE_ID extends LogKey case object SHUFFLE_SERVICE_NAME extends LogKey @@ -506,8 +541,10 @@ object LogKeys { case object SMALLEST_CLUSTER_INDEX extends LogKey case object SNAPSHOT_VERSION extends LogKey case object SOCKET_ADDRESS extends LogKey + case object SOURCE_PATH extends LogKey case object SPARK_DATA_STREAM extends LogKey case object SPARK_PLAN_ID extends LogKey + case object SPARK_VERSION extends LogKey case object SPILL_TIMES extends LogKey case object SQL_TEXT extends LogKey case object SRC_PATH extends LogKey @@ -520,6 +557,7 @@ object LogKeys { case object STATE_STORE_VERSION extends LogKey case object STATUS extends LogKey case object STDERR extends LogKey + case object STOP_SITE_SHORT_FORM extends LogKey case object STORAGE_LEVEL extends LogKey case object STORAGE_LEVEL_DESERIALIZED extends LogKey case object STORAGE_LEVEL_REPLICATION extends LogKey @@ -535,11 +573,14 @@ object LogKeys { case object STREAMING_WRITE extends LogKey case object STREAM_ID extends LogKey case object STREAM_NAME extends LogKey + case object STREAM_SOURCE extends LogKey case object SUBMISSION_ID extends LogKey case object SUBSAMPLING_RATE extends LogKey case object SUB_QUERY extends LogKey case object TABLE_NAME extends LogKey case object TABLE_TYPES extends LogKey + case object TARGET_NUM_EXECUTOR extends LogKey + case object TARGET_NUM_EXECUTOR_DELTA extends LogKey case object TARGET_PATH extends LogKey case object TASK_ATTEMPT_ID extends LogKey case object TASK_ID extends LogKey @@ -548,6 +589,7 @@ object LogKeys { case object TASK_SET_NAME extends LogKey case object TASK_STATE extends LogKey case object TEMP_FILE extends LogKey + case object TEMP_OUTPUT_PATH extends LogKey case object TEMP_PATH extends LogKey case object TEST_SIZE extends LogKey case object THREAD extends LogKey @@ -557,6 +599,7 @@ object LogKeys { case object TIME extends LogKey case object TIMEOUT extends LogKey case object TIMER extends LogKey + case object TIMESTAMP extends LogKey case object TIME_UNITS extends LogKey case object TIP extends LogKey case object TOKEN_KIND extends LogKey @@ -598,6 +641,7 @@ object LogKeys { case object WAIT_SEND_TIME extends LogKey case object WAIT_TIME extends LogKey case object WATERMARK_CONSTRAINT extends LogKey + case object WEB_URL extends LogKey case object WEIGHT extends LogKey case object WEIGHTED_NUM extends LogKey case object WORKER extends LogKey diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala index 942242107e22f..adce6c3f5ffdb 100644 --- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala @@ -23,7 +23,8 @@ import java.util.function.Consumer import scala.collection.mutable.{ArrayBuffer, HashSet} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} import org.apache.spark.util.ThreadUtils @@ -161,7 +162,8 @@ private[spark] class BarrierCoordinator( s"${request.numTasks} from Task $taskId, previously it was $numTasks.") // 
Check whether the epoch from the barrier tasks matches current barrierEpoch. - logInfo(s"Current barrier epoch for $barrierId is $barrierEpoch.") + logInfo(log"Current barrier epoch for ${MDC(BARRIER_ID, barrierId)}" + + log" is ${MDC(BARRIER_EPOCH, barrierEpoch)}.") if (epoch != barrierEpoch) { requester.sendFailure(new SparkException(s"The request to sync of $barrierId with " + s"barrier epoch $barrierEpoch has already finished. Maybe task $taskId is not " + @@ -176,14 +178,17 @@ private[spark] class BarrierCoordinator( // Add the requester to array of RPCCallContexts pending for reply. requesters += requester messages(request.partitionId) = request.message - logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received update from Task " + - s"$taskId, current progress: ${requesters.size}/$numTasks.") + logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" + + log" from ${MDC(BARRIER_ID, barrierId)} received update from Task" + + log" ${MDC(TASK_ID, taskId)}, current progress:" + + log" ${MDC(REQUESTER_SIZE, requesters.size)}/${MDC(NUM_REQUEST_SYNC_TASK, numTasks)}.") if (requesters.size == numTasks) { requesters.foreach(_.reply(messages.clone())) // Finished current barrier() call successfully, clean up ContextBarrierState and // increase the barrier epoch. - logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " + - s"tasks, finished successfully.") + logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" + + log" from ${MDC(BARRIER_ID, barrierId)} received all updates from" + + log" tasks, finished successfully.") barrierEpoch += 1 requesters.clear() requestMethods.clear() diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index e083ece918b63..c8d6000cd6282 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -26,7 +26,8 @@ import scala.util.{Failure, Success => ScalaSuccess, Try} import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKeys._ import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceInformation @@ -56,19 +57,27 @@ class BarrierTaskContext private[spark] ( // with the driver side epoch. 
private var barrierEpoch = 0 + private def logProgressInfo(msg: MessageWithContext, startTime: Option[Long]): Unit = { + val waitMsg = startTime.fold(log"")(st => log", waited " + + log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,") + logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" + + log" from Stage ${MDC(STAGE_ID, stageId())}" + + log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " + + msg + waitMsg + + log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.") + } + private def runBarrier(message: String, requestMethod: RequestMethod.Value): Array[String] = { - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " + - s"has entered the global sync, current barrier epoch is $barrierEpoch.") + logProgressInfo(log"has entered the global sync", None) logTrace("Current callSite: " + Utils.getCallSite()) val startTime = System.currentTimeMillis() val timerTask = new TimerTask { override def run(): Unit = { - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " + - s"${stageAttemptNumber()}) waiting " + - s"under the global sync since $startTime, has been waiting for " + - s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + - s"current barrier epoch is $barrierEpoch.") + logProgressInfo( + log"waiting under the global sync since ${MDC(TIME, startTime)}", + Some(startTime) + ) } } // Log the update of global sync every 1 minute. @@ -104,17 +113,11 @@ class BarrierTaskContext private[spark] ( val messages = abortableRpcFuture.future.value.get.get barrierEpoch += 1 - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " + - s"finished global sync successfully, waited for " + - s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + - s"current barrier epoch is $barrierEpoch.") + logProgressInfo(log"finished global sync successfully", Some(startTime)) messages } catch { case e: SparkException => - logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " + - s"${stageAttemptNumber()}) failed to perform global sync, waited for " + - s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " + - s"current barrier epoch is $barrierEpoch.") + logProgressInfo(log"failed to perform global sync", Some(startTime)) throw e } finally { timerTask.cancel() diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index d156d74b23168..3bfa1ae0d4dc1 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -446,10 +446,12 @@ private[spark] class ExecutorAllocationManager( val delta = targetNum.delta totalDelta += delta if (delta > 0) { - val executorsString = "executor" + { if (delta > 1) "s" else "" } - logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " + - s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " + - s"for resource profile id: ${rpId})") + val executorsString = log" new executor" + { if (delta > 1) log"s" else log"" } + logInfo(log"Requesting ${MDC(TARGET_NUM_EXECUTOR_DELTA, delta)}" + + executorsString + log" because tasks are backlogged " + + log"(new desired total will be" + + log" ${MDC(TARGET_NUM_EXECUTOR, numExecutorsTargetPerResourceProfileId(rpId))} " + + log"for resource profile id: ${MDC(RESOURCE_PROFILE_ID, rpId)})") 
numExecutorsToAddPerResourceProfileId(rpId) = if (delta == numExecutorsToAddPerResourceProfileId(rpId)) { numExecutorsToAddPerResourceProfileId(rpId) * 2 @@ -604,7 +606,8 @@ private[spark] class ExecutorAllocationManager( } else { executorMonitor.executorsKilled(executorsRemoved.toSeq) } - logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") + logInfo(log"Executors ${MDC(EXECUTOR_IDS, executorsRemoved.mkString(","))}" + + log"removed due to idle timeout.") executorsRemoved.toSeq } else { logWarning(log"Unable to reach the cluster manager to kill executor/s " + diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 6f3a354a2879d..fdc2b0a4c20f0 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -34,7 +34,7 @@ import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOut import org.roaringbitmap.RoaringBitmap import org.apache.spark.broadcast.{Broadcast, BroadcastManager} -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec @@ -188,7 +188,8 @@ private class ShuffleStatus( val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_)) mapStatusOpt match { case Some(mapStatus) => - logInfo(s"Updating map output for ${mapId} to ${bmAddress}") + logInfo(log"Updating map output for ${MDC(MAP_ID, mapId)}" + + log" to ${MDC(BLOCK_MANAGER_ID, bmAddress)}") mapStatus.updateLocation(bmAddress) invalidateSerializedMapOutputStatusCache() case None => @@ -200,7 +201,8 @@ private class ShuffleStatus( _numAvailableMapOutputs += 1 invalidateSerializedMapOutputStatusCache() mapStatusesDeleted(index) = null - logInfo(s"Recover ${mapStatus.mapId} ${mapStatus.location}") + logInfo(log"Recover ${MDC(MAP_ID, mapStatus.mapId)}" + + log" ${MDC(BLOCK_MANAGER_ID, mapStatus.location)}") } else { logWarning(log"Asked to update map output ${MDC(MAP_ID, mapId)} " + log"for untracked map status.") @@ -490,20 +492,24 @@ private[spark] class MapOutputTrackerMasterEndpoint( logDebug("init") // force eager creation of logger + private def logInfoMsg(msg: MessageWithContext, shuffleId: Int, context: RpcCallContext): Unit = { + val hostPort = context.senderAddress.hostPort + logInfo(log"Asked to send " + + msg + + log" locations for shuffle ${MDC(SHUFFLE_ID, shuffleId)} to ${MDC(HOST_PORT, hostPort)}") + } + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GetMapOutputStatuses(shuffleId: Int) => - val hostPort = context.senderAddress.hostPort - logInfo(s"Asked to send map output locations for shuffle $shuffleId to $hostPort") + logInfoMsg(log"map output", shuffleId, context) tracker.post(GetMapOutputMessage(shuffleId, context)) case GetMapAndMergeResultStatuses(shuffleId: Int) => - val hostPort = context.senderAddress.hostPort - logInfo(s"Asked to send map/merge result locations for shuffle $shuffleId to $hostPort") + logInfoMsg(log"map/merge result", shuffleId, context) tracker.post(GetMapAndMergeOutputMessage(shuffleId, context)) case GetShufflePushMergerLocations(shuffleId: Int) => - logInfo(s"Asked to send shuffle push merger locations for shuffle" + - s" $shuffleId to ${context.senderAddress.hostPort}") + logInfoMsg(log"shuffle push merger", shuffleId, context) 
tracker.post(GetShufflePushMergersMessage(shuffleId, context)) case StopMapOutputTracker => @@ -1422,13 +1428,15 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr val mergeOutputStatuses = mergeStatuses.get(shuffleId).orNull if (mapOutputStatuses == null || mergeOutputStatuses == null) { - logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", fetching them") + logInfo(log"Don't have map/merge outputs for" + + log" shuffle ${MDC(SHUFFLE_ID, shuffleId)}, fetching them") val startTimeNs = System.nanoTime() fetchingLock.withLock(shuffleId) { var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull var fetchedMergeStatuses = mergeStatuses.get(shuffleId).orNull if (fetchedMapStatuses == null || fetchedMergeStatuses == null) { - logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) + logInfo(log"Doing the fetch; tracker endpoint = " + + log"${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}") val fetchedBytes = askTracker[(Array[Byte], Array[Byte])](GetMapAndMergeResultStatuses(shuffleId)) try { @@ -1456,12 +1464,14 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } else { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { - logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") + logInfo(log"Don't have map outputs for shuffle ${MDC(SHUFFLE_ID, shuffleId)}," + + log" fetching them") val startTimeNs = System.nanoTime() fetchingLock.withLock(shuffleId) { var fetchedStatuses = mapStatuses.get(shuffleId).orNull if (fetchedStatuses == null) { - logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) + logInfo(log"Doing the fetch; tracker endpoint =" + + log" ${MDC(RPC_ENDPOINT_REF, trackerEndpoint)}") val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId)) try { fetchedStatuses = @@ -1500,7 +1510,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr def updateEpoch(newEpoch: Long): Unit = { epochLock.synchronized { if (newEpoch > epoch) { - logInfo("Updating epoch to " + newEpoch + " and clearing cache") + logInfo(log"Updating epoch to ${MDC(EPOCH, newEpoch)} and clearing cache") epoch = newEpoch mapStatuses.clear() mergeStatuses.clear() @@ -1561,7 +1571,9 @@ private[spark] object MapOutputTracker extends Logging { oos.close() } val outArr = out.toByteArray - logInfo("Broadcast outputstatuses size = " + outArr.length + ", actual size = " + arrSize) + logInfo(log"Broadcast outputstatuses size = " + + log"${MDC(BROADCAST_OUTPUT_STATUS_SIZE, outArr.length)}," + + log" actual size = ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, arrSize)}") (outArr, bcast) } else { (chunkedByteBuf.toArray, null) @@ -1594,8 +1606,10 @@ private[spark] object MapOutputTracker extends Logging { try { // deserialize the Broadcast, pull .value array out of it, and then deserialize that val bcast = deserializeObject(in).asInstanceOf[Broadcast[Array[Array[Byte]]]] - logInfo("Broadcast outputstatuses size = " + bytes.length + - ", actual size = " + bcast.value.foldLeft(0L)(_ + _.length)) + val actualSize = bcast.value.foldLeft(0L)(_ + _.length) + logInfo(log"Broadcast outputstatuses size =" + + log" ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, bytes.length)}" + + log", actual size = ${MDC(BROADCAST_OUTPUT_STATUS_SIZE, actualSize)}") val bcastIn = new ChunkedByteBuffer(bcast.value.map(ByteBuffer.wrap)).toInputStream() // Important - ignore the DIRECT tag ! 
Start from offset 1 bcastIn.skip(1) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 385f9cbc6cf4f..0dbac45fd7f99 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -199,10 +199,11 @@ class SparkContext(config: SparkConf) extends Logging { this(master, appName, sparkHome, jars, Map()) // log out Spark Version in Spark driver log - logInfo(s"Running Spark version $SPARK_VERSION") - logInfo(s"OS info ${System.getProperty("os.name")}, ${System.getProperty("os.version")}, " + - s"${System.getProperty("os.arch")}") - logInfo(s"Java version ${System.getProperty("java.version")}") + logInfo(log"Running Spark version ${MDC(LogKeys.SPARK_VERSION, SPARK_VERSION)}") + logInfo(log"OS info ${MDC(LogKeys.OS_NAME, System.getProperty("os.name"))}," + + log" ${MDC(LogKeys.OS_VERSION, System.getProperty("os.version"))}, " + + log"${MDC(LogKeys.OS_ARCH, System.getProperty("os.arch"))}") + logInfo(log"Java version ${MDC(LogKeys.JAVA_VERSION, System.getProperty("java.version"))}") /* ------------------------------------------------------------------------------------- * | Private variables. These variables keep the internal state of the context, and are | @@ -439,7 +440,7 @@ class SparkContext(config: SparkConf) extends Logging { logResourceInfo(SPARK_DRIVER_PREFIX, _resources) // log out spark.app.name in the Spark driver logs - logInfo(s"Submitted application: $appName") + logInfo(log"Submitted application: ${MDC(LogKeys.APP_NAME, appName)}") // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) { @@ -448,7 +449,7 @@ class SparkContext(config: SparkConf) extends Logging { } if (_conf.getBoolean("spark.logConf", false)) { - logInfo("Spark configuration:\n" + _conf.toDebugString) + logInfo(log"Spark configuration:\n${MDC(LogKeys.CONFIG, _conf.toDebugString)}") } // Set Spark driver host and port system properties. This explicitly sets the configuration @@ -1704,7 +1705,8 @@ class SparkContext(config: SparkConf) extends Logging { "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal, serializedOnly) val callSite = getCallSite() - logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) + logInfo(log"Created broadcast ${MDC(LogKeys.BROADCAST_ID, bc.id)}" + + log" from ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}") cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc } @@ -1833,7 +1835,8 @@ class SparkContext(config: SparkConf) extends Logging { addedFiles .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala) .putIfAbsent(key, timestamp).isEmpty) { - logInfo(s"Added file $path at $key with timestamp $timestamp") + logInfo(log"Added file ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)} with" + + log" timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}") // Fetch the file locally so that closures which are run on the driver can still use the // SparkFiles API to access files. 
Utils.fetchFile(uri.toString, root, conf, hadoopConfiguration, timestamp, useCache = false) @@ -1845,7 +1848,8 @@ class SparkContext(config: SparkConf) extends Logging { .putIfAbsent( Utils.getUriBuilder(new URI(key)).fragment(uri.getFragment).build().toString, timestamp).isEmpty) { - logInfo(s"Added archive $path at $key with timestamp $timestamp") + logInfo(log"Added archive ${MDC(LogKeys.PATH, path)} at ${MDC(LogKeys.KEY, key)}" + + log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}") // If the scheme is file, use URI to simply copy instead of downloading. val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key) val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build() @@ -1855,7 +1859,9 @@ class SparkContext(config: SparkConf) extends Logging { root, if (uri.getFragment != null) uri.getFragment else source.getName) logInfo( - s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") + log"Unpacking an archive ${MDC(LogKeys.PATH, path)}" + + log" from ${MDC(LogKeys.SOURCE_PATH, source.getAbsolutePath)}" + + log" to ${MDC(LogKeys.DESTINATION_PATH, dest.getAbsolutePath)}") Utils.deleteRecursively(dest) Utils.unpack(source, dest) postEnvironmentUpdate() @@ -2216,8 +2222,14 @@ class SparkContext(config: SparkConf) extends Logging { .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala) .putIfAbsent(_, timestamp).isEmpty) if (added.nonEmpty) { - val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI" - logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp") + val jarMessage = if (scheme != "ivy") { + log"Added JAR" + } else { + log"Added dependency jars of Ivy URI" + } + logInfo(jarMessage + log" ${MDC(LogKeys.PATH, path)}" + + log" at ${MDC(LogKeys.ADDED_JARS, added.mkString(","))}" + + log" with timestamp ${MDC(LogKeys.TIMESTAMP, timestamp)}") postEnvironmentUpdate() } if (existed.nonEmpty) { @@ -2274,7 +2286,8 @@ class SparkContext(config: SparkConf) extends Logging { */ def stop(exitCode: Int): Unit = { stopSite = Some(getCallSite()) - logInfo(s"SparkContext is stopping with exitCode $exitCode from ${stopSite.get.shortForm}.") + logInfo(log"SparkContext is stopping with exitCode ${MDC(LogKeys.EXIT_CODE, exitCode)}" + + log" from ${MDC(LogKeys.STOP_SITE_SHORT_FORM, stopSite.get.shortForm)}.") if (LiveListenerBus.withinListenerThread.value) { throw new SparkException(s"Cannot stop SparkContext within listener bus thread.") } @@ -2438,9 +2451,10 @@ class SparkContext(config: SparkConf) extends Logging { } val callSite = getCallSite() val cleanedFunc = clean(func) - logInfo("Starting job: " + callSite.shortForm) + logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}") if (conf.getBoolean("spark.logLineage", false)) { - logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) + logInfo(log"RDD's recursive dependencies:\n" + + log"${MDC(LogKeys.RDD_DEBUG_STRING, rdd.toDebugString)}") } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) @@ -2559,13 +2573,14 @@ class SparkContext(config: SparkConf) extends Logging { timeout: Long): PartialResult[R] = { assertNotStopped() val callSite = getCallSite() - logInfo("Starting job: " + callSite.shortForm) - val start = System.nanoTime + logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}") + val start = System.currentTimeMillis() val cleanedFunc = clean(func) val 
result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout, localProperties.get) logInfo( - "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s") + log"Job finished: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}," + + log" took ${MDC(LogKeys.TOTAL_TIME, System.currentTimeMillis() - start)}ms") result } @@ -2793,7 +2808,8 @@ class SparkContext(config: SparkConf) extends Logging { val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf) listeners.foreach { listener => listenerBus.addToSharedQueue(listener) - logInfo(s"Registered listener ${listener.getClass().getName()}") + logInfo(log"Registered listener" + + log"${MDC(LogKeys.CLASS_NAME, listener.getClass().getName())}") } } } catch { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index 46dab16b8276e..5e2b5553f3dca 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -45,7 +45,7 @@ private[python] object Converter extends Logging { converterClass.map { cc => Try { val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance() - logInfo(s"Loaded converter: $cc") + logInfo(log"Loaded converter: ${MDC(CLASS_NAME, cc)}") c } match { case Success(c) => c diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 5aa080b5fb291..d643983ef5dfe 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -37,7 +37,8 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.python.PythonFunction.PythonAccumulator import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{HOST, PORT} import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD @@ -733,7 +734,8 @@ private[spark] class PythonAccumulatorV2( private def openSocket(): Socket = synchronized { if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) - logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort") + logInfo(log"Connected to AccumulatorServer at host: ${MDC(HOST, serverHost)}" + + log" port: ${MDC(PORT, serverPort)}") // send the secret just for the initial authentication when opening a new connection socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 16f87146c708c..b2571ffddc577 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -32,7 +32,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.api.python.PythonFunction.PythonAccumulator import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.LogKeys.TASK_NAME import 
org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python} import org.apache.spark.internal.config.Python._ import org.apache.spark.rdd.InputFileBlockHolder diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 26c790a124470..fc6403fc7de77 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -28,7 +28,8 @@ import scala.sys.process.Process import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{PATH, PYTHON_VERSION} import org.apache.spark.util.ArrayImplicits.SparkArrayOps import org.apache.spark.util.Utils @@ -122,11 +123,11 @@ private[spark] object PythonUtils extends Logging { PythonUtils.sparkPythonPath, sys.env.getOrElse("PYTHONPATH", "")) val environment = Map("PYTHONPATH" -> pythonPath) - logInfo(s"Python path $pythonPath") + logInfo(log"Python path ${MDC(PATH, pythonPath)}") val processPythonVer = Process(pythonVersionCMD, None, environment.toSeq: _*) val output = runCommand(processPythonVer) - logInfo(s"Python version: ${output.getOrElse("Unable to determine")}") + logInfo(log"Python version: ${MDC(PYTHON_VERSION, output.getOrElse("Unable to determine"))}") val pythonCode = """ diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala index b238e2b87c653..0ff2b79ab6623 100644 --- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala @@ -22,7 +22,8 @@ import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, Data import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkEnv, SparkPythonException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{PYTHON_EXEC, PYTHON_WORKER_MODULE, PYTHON_WORKER_RESPONSE, SESSION_ID} import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT @@ -58,7 +59,8 @@ private[spark] class StreamingPythonRunner( * to be used with the functions. */ def init(): (DataOutputStream, DataInputStream) = { - logInfo(s"Initializing Python runner (session: $sessionId, pythonExec: $pythonExec)") + logInfo(log"Initializing Python runner (session: ${MDC(SESSION_ID, sessionId)}," + + log" pythonExec: ${MDC(PYTHON_EXEC, pythonExec)})") val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") @@ -95,7 +97,8 @@ private[spark] class StreamingPythonRunner( val errMessage = PythonWorkerUtils.readUTF(dataIn) throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage) } - logInfo(s"Runner initialization succeeded (returned $resFromPython).") + logInfo(log"Runner initialization succeeded (returned" + + log" ${MDC(PYTHON_WORKER_RESPONSE, resFromPython)}).") (dataOut, dataIn) } @@ -116,7 +119,8 @@ private[spark] class StreamingPythonRunner( * Stops the Python worker. 
*/ def stop(): Unit = { - logInfo(s"Stopping streaming runner for sessionId: $sessionId, module: $workerModule.") + logInfo(log"Stopping streaming runner for sessionId: ${MDC(SESSION_ID, sessionId)}," + + log" module: ${MDC(PYTHON_WORKER_MODULE, workerModule)}.") try { pythonWorkerFactory.foreach { factory => diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 50ce6f82800dd..226a6dcd36a16 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -164,11 +164,12 @@ private class ClientEndpoint( // logs again when waitAppCompletion is set to true if (!driverStatusReported) { driverStatusReported = true - logInfo(s"State of $submittedDriverID is ${state.get}") + logInfo(log"State of ${MDC(DRIVER_ID, submittedDriverID)}" + + log" is ${MDC(DRIVER_STATE, state.get)}") // Worker node, if present (workerId, workerHostPort, state) match { case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") + logInfo(log"Driver running on ${MDC(HOST, hostPort)} (${MDC(WORKER_ID, id)})") case _ => } } @@ -181,17 +182,18 @@ private class ClientEndpoint( state.get match { case DriverState.FINISHED | DriverState.FAILED | DriverState.ERROR | DriverState.KILLED => - logInfo(s"State of driver $submittedDriverID is ${state.get}, " + - s"exiting spark-submit JVM.") + logInfo(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" + + log" is ${MDC(DRIVER_STATE, state.get)}, exiting spark-submit JVM.") System.exit(0) case _ => if (!waitAppCompletion) { - logInfo(s"spark-submit not configured to wait for completion, " + - s"exiting spark-submit JVM.") + logInfo("spark-submit not configured to wait for completion, " + + " exiting spark-submit JVM.") System.exit(0) } else { - logDebug(s"State of driver $submittedDriverID is ${state.get}, " + - s"continue monitoring driver status.") + logDebug(log"State of driver ${MDC(DRIVER_ID, submittedDriverID)}" + + log" is ${MDC(DRIVER_STATE, state.get)}, " + + log"continue monitoring driver status.") } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index a56fbd5a644ae..3ce5e2d62b6a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -23,7 +23,8 @@ import java.util.concurrent.CountDownLatch import scala.jdk.CollectionConverters._ import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKeys.{AUTH_ENABLED, PORT, SHUFFLE_DB_BACKEND_KEY, SHUFFLE_DB_BACKEND_NAME} import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.network.TransportContext import org.apache.spark.network.crypto.AuthServerBootstrap @@ -86,8 +87,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) { val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND) val dbBackend = DBBackend.byName(shuffleDBName) - logInfo(s"Use ${dbBackend.name()} as the implementation of " + - s"${config.SHUFFLE_SERVICE_DB_BACKEND.key}") + logInfo(log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, dbBackend.name())} as the 
implementation of " + + log"${MDC(SHUFFLE_DB_BACKEND_KEY, config.SHUFFLE_SERVICE_DB_BACKEND.key)}") new ExternalBlockHandler(conf, findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB))) } else { @@ -106,7 +107,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana def start(): Unit = { require(server == null, "Shuffle server already started") val authEnabled = securityManager.isAuthenticationEnabled() - logInfo(s"Starting shuffle service on port $port (auth enabled = $authEnabled)") + logInfo(log"Starting shuffle service on port ${MDC(PORT, port)}" + + log" (auth enabled = ${MDC(AUTH_ENABLED, authEnabled)})") val bootstraps: Seq[TransportServerBootstrap] = if (authEnabled) { Seq(new AuthServerBootstrap(transportConf, securityManager)) diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 4bd2de62d75c2..cc777659f541e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -173,7 +173,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { val checkpointDurationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs) - logInfo(s"Checkpointing took $checkpointDurationMs ms.") + logInfo(log"Checkpointing took ${MDC(TOTAL_TIME, checkpointDurationMs)} ms.") val newRDD = new ReliableCheckpointRDD[T]( sc, checkpointDirPath.toString, originalRDD.partitioner) @@ -220,7 +220,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { } (catchBlock = { val deleted = fs.delete(tempOutputPath, false) if (!deleted) { - logInfo(s"Failed to delete tempOutputPath $tempOutputPath.") + logInfo(log"Failed to delete tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}.") } }, finallyBlock = { serializeStream.close() @@ -228,12 +228,13 @@ private[spark] object ReliableCheckpointRDD extends Logging { if (!fs.rename(tempOutputPath, finalOutputPath)) { if (!fs.exists(finalOutputPath)) { - logInfo(s"Deleting tempOutputPath $tempOutputPath") + logInfo(log"Deleting tempOutputPath ${MDC(TEMP_OUTPUT_PATH, tempOutputPath)}") fs.delete(tempOutputPath, false) throw SparkCoreErrors.checkpointFailedToSaveError(ctx.attemptNumber(), finalOutputPath) } else { // Some other copy of this task must've finished before us and renamed it - logInfo(s"Final output path $finalOutputPath already exists; not overwriting it") + logInfo(log"Final output path" + + log" ${MDC(FINAL_OUTPUT_PATH, finalOutputPath)} already exists; not overwriting it") if (!fs.delete(tempOutputPath, false)) { logWarning(log"Error deleting ${MDC(PATH, tempOutputPath)}") } diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala index 0d1bc1425161e..b468a38fcf229 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala @@ -23,7 +23,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKeys.{NEW_RDD_ID, RDD_CHECKPOINT_DIR, RDD_ID} import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS /** @@ -66,7 +67,8 @@ private[spark] class 
ReliableRDDCheckpointData[T: ClassTag](@transient private v } } - logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}") + logInfo(log"Done checkpointing RDD ${MDC(RDD_ID, rdd.id)}" + + log" to ${MDC(RDD_CHECKPOINT_DIR, cpDir)}, new parent is RDD ${MDC(NEW_RDD_ID, newRDD.id)}") newRDD } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 3d0e477037eab..f503be908c072 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -250,7 +250,8 @@ private[spark] object JettyUtils extends Logging { poolSize: Int = 200): ServerInfo = { val stopTimeout = conf.get(UI_JETTY_STOP_TIMEOUT) - logInfo(s"Start Jetty $hostName:$port for $serverName") + logInfo(log"Start Jetty ${MDC(HOST, hostName)}:${MDC(PORT, port)}" + + log" for ${MDC(SERVER_NAME, serverName)}") // Start the server first, with no connectors. val pool = new QueuedThreadPool(poolSize) if (serverName.nonEmpty) { @@ -558,7 +559,9 @@ private[spark] case class ServerInfo( */ private def addFilters(handler: ServletContextHandler, securityMgr: SecurityManager): Unit = { conf.get(UI_FILTERS).foreach { filter => - logInfo(s"Adding filter to ${handler.getContextPath()}: $filter") + logInfo(log"Adding filter to" + + log" ${MDC(SERVLET_CONTEXT_HANDLER_PATH, handler.getContextPath())}:" + + log" ${MDC(UI_FILTER, filter)}") val oldParams = conf.getOption(s"spark.$filter.params").toSeq .flatMap(Utils.stringToSeq) .flatMap { param => diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ac76f74725e89..b8d422c9d9fbb 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.CLASS_NAME +import org.apache.spark.internal.LogKeys.{CLASS_NAME, WEB_URL} import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ @@ -164,7 +164,7 @@ private[spark] class SparkUI private ( /** Stop the server behind this web interface. Only valid after bind(). 
*/
   override def stop(): Unit = {
     super.stop()
-    logInfo(s"Stopped Spark web UI at $webUrl")
+    logInfo(log"Stopped Spark web UI at ${MDC(WEB_URL, webUrl)}")
   }
 
   override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = {
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 51499870a6a30..4f01cd6ac2136 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -27,7 +27,7 @@ import com.codahale.metrics.Timer
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.{config, Logging, MDC}
-import org.apache.spark.internal.LogKeys.LISTENER
+import org.apache.spark.internal.LogKeys.{EVENT, LISTENER, TOTAL_TIME}
 import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
 
@@ -132,8 +132,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       if (maybeTimerContext != null) {
         val elapsed = maybeTimerContext.stop()
         if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
-          logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
-            s"${elapsed / 1000000000d}s.")
+          logInfo(log"Process of event ${MDC(EVENT, redactEvent(event))} by " +
+            log"listener ${MDC(LISTENER, listenerName)} took " +
+            log"${MDC(TOTAL_TIME, elapsed / 1000000d)}ms.")
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
index 098f8b698fb7a..b41166a50efd2 100644
--- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala
@@ -80,7 +80,7 @@ private[spark] object SignalUtils extends Logging {
       action: => Boolean): Unit = synchronized {
     try {
       val handler = handlers.getOrElseUpdate(signal, {
-        logInfo(s"Registering signal handler for $signal")
+        logInfo(log"Registering signal handler for ${MDC(SIGNAL, signal)}")
         new ActionHandler(new Signal(signal))
       })
       handler.register(action)
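
For readers new to the framework, the sketch below distills the pattern that every hunk above applies: replace Scala string interpolation with the log"..." interpolator and wrap each variable in MDC(...) with a key defined in org.apache.spark.internal.LogKeys. It is a minimal illustration and not part of the patch; the AccumulatorClient class and connect method are invented for the example, while Logging, MDC, and the HOST/PORT keys are the same ones the diff itself imports in PythonRDD.scala.

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKeys.{HOST, PORT}

    // Hypothetical class used only to illustrate the migration pattern.
    class AccumulatorClient(serverHost: String, serverPort: Int) extends Logging {

      def connect(): Unit = {
        // Before: plain interpolation; the variable values are baked into the text.
        // logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort")

        // After: each variable is tagged with an MDC key so structured backends
        // (e.g. JSON log output) can index host and port as separate fields.
        // Fragments concatenate with `+`; note the explicit leading space on the
        // second fragment, as in the hunks above.
        logInfo(log"Connected to AccumulatorServer at host: ${MDC(HOST, serverHost)}" +
          log" port: ${MDC(PORT, serverPort)}")
      }
    }

Assuming the internal logging API shown in the diff, the log interpolator yields a MessageWithContext, which is why helpers such as logProgressInfo in BarrierTaskContext.scala and logInfoMsg in MapOutputTracker.scala can accept pre-built message fragments as parameters and append further fragments before calling logInfo.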