Skip to content

Commit

Permalink
[SPARK-47240][CORE][PART1] Migrate logInfo with variables to structur…
Browse files Browse the repository at this point in the history
…ed logging framework

The PR aims to migrate `logInfo` in Core module with variables to structured logging framework.

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46362 from zeotuan/coreInfo.

Lead-authored-by: Tuan Pham <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
  • Loading branch information
zeotuan and gengliangwang committed May 8, 2024
1 parent 553e1b8 commit a15adeb
Show file tree
Hide file tree
Showing 19 changed files with 205 additions and 102 deletions.
48 changes: 46 additions & 2 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ trait LogKey {
}

/**
* Various keys used for mapped diagnostic contexts(MDC) in logging.
* All structured logging keys should be defined here for standardization.
* Various keys used for mapped diagnostic contexts(MDC) in logging. All structured logging keys
* should be defined here for standardization.
*/
object LogKeys {
case object ACCUMULATOR_ID extends LogKey
case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object ACTUAL_NUM_FILES extends LogKey
case object ACTUAL_PARTITION_COLUMN extends LogKey
case object ADDED_JARS extends LogKey
case object AGGREGATE_FUNCTIONS extends LogKey
case object ALPHA extends LogKey
case object ANALYSIS_ERROR extends LogKey
Expand All @@ -42,7 +44,10 @@ object LogKeys {
case object APP_NAME extends LogKey
case object APP_STATE extends LogKey
case object ARGS extends LogKey
case object AUTH_ENABLED extends LogKey
case object BACKUP_FILE extends LogKey
case object BARRIER_EPOCH extends LogKey
case object BARRIER_ID extends LogKey
case object BATCH_ID extends LogKey
case object BATCH_NAME extends LogKey
case object BATCH_TIMESTAMP extends LogKey
Expand All @@ -55,13 +60,15 @@ object LogKeys {
case object BOOT extends LogKey
case object BROADCAST extends LogKey
case object BROADCAST_ID extends LogKey
case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
case object BUCKET extends LogKey
case object BYTECODE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
case object CALL_SITE_LONG_FORM extends LogKey
case object CALL_SITE_SHORT_FORM extends LogKey
case object CATALOG_NAME extends LogKey
case object CATEGORICAL_FEATURES extends LogKey
case object CHECKPOINT_FILE extends LogKey
Expand Down Expand Up @@ -142,11 +149,13 @@ object LogKeys {
case object DEPRECATED_KEY extends LogKey
case object DESCRIPTION extends LogKey
case object DESIRED_NUM_PARTITIONS extends LogKey
case object DESTINATION_PATH extends LogKey
case object DFS_FILE extends LogKey
case object DIFF_DELTA extends LogKey
case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey
case object DRIVER_ID extends LogKey
case object DRIVER_LIBRARY_PATH_KEY extends LogKey
case object DRIVER_STATE extends LogKey
case object DROPPED_PARTITIONS extends LogKey
case object DURATION extends LogKey
case object EARLIEST_LOADED_VERSION extends LogKey
Expand All @@ -161,6 +170,7 @@ object LogKeys {
case object ERROR extends LogKey
case object ESTIMATOR_PARAMETER_MAP extends LogKey
case object EVALUATED_FILTERS extends LogKey
case object EVENT extends LogKey
case object EVENT_LOG_DESTINATION extends LogKey
case object EVENT_LOOP extends LogKey
case object EVENT_NAME extends LogKey
Expand All @@ -184,8 +194,10 @@ object LogKeys {
case object EXECUTOR_TIMEOUT extends LogKey
case object EXEC_AMOUNT extends LogKey
case object EXISTING_FILE extends LogKey
case object EXISTING_JARS extends LogKey
case object EXISTING_PATH extends LogKey
case object EXIT_CODE extends LogKey
case object EXPECTED_ANSWER extends LogKey
case object EXPECTED_NUM_FILES extends LogKey
case object EXPECTED_PARTITION_COLUMN extends LogKey
case object EXPIRY_TIMESTAMP extends LogKey
Expand All @@ -211,6 +223,7 @@ object LogKeys {
case object FILTER extends LogKey
case object FILTERS extends LogKey
case object FINAL_CONTEXT extends LogKey
case object FINAL_OUTPUT_PATH extends LogKey
case object FINAL_PATH extends LogKey
case object FINISH_TRIGGER_DURATION extends LogKey
case object FROM_OFFSET extends LogKey
Expand Down Expand Up @@ -252,6 +265,7 @@ object LogKeys {
case object IS_NETWORK_REQUEST_DONE extends LogKey
case object JAR_ENTRY extends LogKey
case object JAR_MESSAGE extends LogKey
case object JAVA_VERSION extends LogKey
case object JOB_ID extends LogKey
case object JOIN_CONDITION extends LogKey
case object JOIN_CONDITION_SUB_EXPR extends LogKey
Expand Down Expand Up @@ -280,6 +294,7 @@ object LogKeys {
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOWER_BOUND extends LogKey
Expand Down Expand Up @@ -326,13 +341,16 @@ object LogKeys {
case object NEW_FEATURE_COLUMN_NAME extends LogKey
case object NEW_LABEL_COLUMN_NAME extends LogKey
case object NEW_PATH extends LogKey
case object NEW_RDD_ID extends LogKey
case object NEW_STATE extends LogKey
case object NEW_VALUE extends LogKey
case object NODES extends LogKey
case object NODE_LOCATION extends LogKey
case object NON_BUILT_IN_CONNECTORS extends LogKey
case object NORM extends LogKey
case object NUM_ADDED_MASTERS extends LogKey
case object NUM_ADDED_PARTITIONS extends LogKey
case object NUM_ADDED_WORKERS extends LogKey
case object NUM_BIN extends LogKey
case object NUM_BYTES extends LogKey
case object NUM_BYTES_CURRENT extends LogKey
Expand Down Expand Up @@ -373,6 +391,7 @@ object LogKeys {
case object NUM_PRUNED extends LogKey
case object NUM_REPLICAS extends LogKey
case object NUM_REQUESTS extends LogKey
case object NUM_REQUEST_SYNC_TASK extends LogKey
case object NUM_RESOURCE_SLOTS extends LogKey
case object NUM_RETRIES extends LogKey
case object NUM_RIGHT_PARTITION_VALUES extends LogKey
Expand All @@ -397,7 +416,12 @@ object LogKeys {
case object OPTIONS extends LogKey
case object OP_ID extends LogKey
case object OP_TYPE extends LogKey
case object OS_ARCH extends LogKey
case object OS_NAME extends LogKey
case object OS_VERSION extends LogKey
case object OUTPUT extends LogKey
case object OUTPUT_LINE extends LogKey
case object OUTPUT_LINE_NUMBER extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PAGE_SIZE extends LogKey
case object PARSE_MODE extends LogKey
Expand Down Expand Up @@ -438,6 +462,9 @@ object LogKeys {
case object PUSHED_FILTERS extends LogKey
case object PVC_METADATA_NAME extends LogKey
case object PYTHON_EXEC extends LogKey
case object PYTHON_VERSION extends LogKey
case object PYTHON_WORKER_MODULE extends LogKey
case object PYTHON_WORKER_RESPONSE extends LogKey
case object QUERY_CACHE_VALUE extends LogKey
case object QUERY_HINT extends LogKey
case object QUERY_ID extends LogKey
Expand All @@ -447,6 +474,8 @@ object LogKeys {
case object QUERY_PLAN_LENGTH_MAX extends LogKey
case object QUERY_RUN_ID extends LogKey
case object RANGE extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
case object RDD_ID extends LogKey
case object READ_LIMIT extends LogKey
Expand All @@ -466,6 +495,7 @@ object LogKeys {
case object REMOTE_ADDRESS extends LogKey
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
case object RESOURCE_PROFILE_ID extends LogKey
Expand All @@ -489,13 +519,18 @@ object LogKeys {
case object SCHEDULER_POOL_NAME extends LogKey
case object SCHEMA extends LogKey
case object SCHEMA2 extends LogKey
case object SERIALIZE_OUTPUT_LENGTH extends LogKey
case object SERVER_NAME extends LogKey
case object SERVICE_NAME extends LogKey
case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
case object SESSION_HOLD_INFO extends LogKey
case object SESSION_ID extends LogKey
case object SESSION_KEY extends LogKey
case object SHARD_ID extends LogKey
case object SHELL_COMMAND extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
case object SHUFFLE_DB_BACKEND_NAME extends LogKey
case object SHUFFLE_ID extends LogKey
case object SHUFFLE_MERGE_ID extends LogKey
case object SHUFFLE_SERVICE_NAME extends LogKey
Expand All @@ -506,8 +541,10 @@ object LogKeys {
case object SMALLEST_CLUSTER_INDEX extends LogKey
case object SNAPSHOT_VERSION extends LogKey
case object SOCKET_ADDRESS extends LogKey
case object SOURCE_PATH extends LogKey
case object SPARK_DATA_STREAM extends LogKey
case object SPARK_PLAN_ID extends LogKey
case object SPARK_VERSION extends LogKey
case object SPILL_TIMES extends LogKey
case object SQL_TEXT extends LogKey
case object SRC_PATH extends LogKey
Expand All @@ -520,6 +557,7 @@ object LogKeys {
case object STATE_STORE_VERSION extends LogKey
case object STATUS extends LogKey
case object STDERR extends LogKey
case object STOP_SITE_SHORT_FORM extends LogKey
case object STORAGE_LEVEL extends LogKey
case object STORAGE_LEVEL_DESERIALIZED extends LogKey
case object STORAGE_LEVEL_REPLICATION extends LogKey
Expand All @@ -535,11 +573,14 @@ object LogKeys {
case object STREAMING_WRITE extends LogKey
case object STREAM_ID extends LogKey
case object STREAM_NAME extends LogKey
case object STREAM_SOURCE extends LogKey
case object SUBMISSION_ID extends LogKey
case object SUBSAMPLING_RATE extends LogKey
case object SUB_QUERY extends LogKey
case object TABLE_NAME extends LogKey
case object TABLE_TYPES extends LogKey
case object TARGET_NUM_EXECUTOR extends LogKey
case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
case object TARGET_PATH extends LogKey
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
Expand All @@ -548,6 +589,7 @@ object LogKeys {
case object TASK_SET_NAME extends LogKey
case object TASK_STATE extends LogKey
case object TEMP_FILE extends LogKey
case object TEMP_OUTPUT_PATH extends LogKey
case object TEMP_PATH extends LogKey
case object TEST_SIZE extends LogKey
case object THREAD extends LogKey
Expand All @@ -557,6 +599,7 @@ object LogKeys {
case object TIME extends LogKey
case object TIMEOUT extends LogKey
case object TIMER extends LogKey
case object TIMESTAMP extends LogKey
case object TIME_UNITS extends LogKey
case object TIP extends LogKey
case object TOKEN_KIND extends LogKey
Expand Down Expand Up @@ -598,6 +641,7 @@ object LogKeys {
case object WAIT_SEND_TIME extends LogKey
case object WAIT_TIME extends LogKey
case object WATERMARK_CONSTRAINT extends LogKey
case object WEB_URL extends LogKey
case object WEIGHT extends LogKey
case object WEIGHTED_NUM extends LogKey
case object WORKER extends LogKey
Expand Down
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import java.util.function.Consumer

import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted}
import org.apache.spark.util.ThreadUtils
Expand Down Expand Up @@ -161,7 +162,8 @@ private[spark] class BarrierCoordinator(
s"${request.numTasks} from Task $taskId, previously it was $numTasks.")

// Check whether the epoch from the barrier tasks matches current barrierEpoch.
logInfo(s"Current barrier epoch for $barrierId is $barrierEpoch.")
logInfo(log"Current barrier epoch for ${MDC(BARRIER_ID, barrierId)}" +
log" is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
if (epoch != barrierEpoch) {
requester.sendFailure(new SparkException(s"The request to sync of $barrierId with " +
s"barrier epoch $barrierEpoch has already finished. Maybe task $taskId is not " +
Expand All @@ -176,14 +178,17 @@ private[spark] class BarrierCoordinator(
// Add the requester to array of RPCCallContexts pending for reply.
requesters += requester
messages(request.partitionId) = request.message
logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received update from Task " +
s"$taskId, current progress: ${requesters.size}/$numTasks.")
logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
log" from ${MDC(BARRIER_ID, barrierId)} received update from Task" +
log" ${MDC(TASK_ID, taskId)}, current progress:" +
log" ${MDC(REQUESTER_SIZE, requesters.size)}/${MDC(NUM_REQUEST_SYNC_TASK, numTasks)}.")
if (requesters.size == numTasks) {
requesters.foreach(_.reply(messages.clone()))
// Finished current barrier() call successfully, clean up ContextBarrierState and
// increase the barrier epoch.
logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " +
s"tasks, finished successfully.")
logInfo(log"Barrier sync epoch ${MDC(BARRIER_EPOCH, barrierEpoch)}" +
log" from ${MDC(BARRIER_ID, barrierId)} received all updates from" +
log" tasks, finished successfully.")
barrierEpoch += 1
requesters.clear()
requestMethods.clear()
Expand Down
35 changes: 19 additions & 16 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import scala.util.{Failure, Success => ScalaSuccess, Try}

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceInformation
Expand Down Expand Up @@ -56,19 +57,27 @@ class BarrierTaskContext private[spark] (
// with the driver side epoch.
private var barrierEpoch = 0

private def logProgressInfo(msg: MessageWithContext, startTime: Option[Long]): Unit = {
val waitMsg = startTime.fold(log"")(st => log", waited " +
log"for ${MDC(TOTAL_TIME, System.currentTimeMillis() - st)} ms,")
logInfo(log"Task ${MDC(TASK_ATTEMPT_ID, taskAttemptId())}" +
log" from Stage ${MDC(STAGE_ID, stageId())}" +
log"(Attempt ${MDC(STAGE_ATTEMPT, stageAttemptNumber())}) " +
msg + waitMsg +
log" current barrier epoch is ${MDC(BARRIER_EPOCH, barrierEpoch)}.")
}

private def runBarrier(message: String, requestMethod: RequestMethod.Value): Array[String] = {
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
s"has entered the global sync, current barrier epoch is $barrierEpoch.")
logProgressInfo(log"has entered the global sync", None)
logTrace("Current callSite: " + Utils.getCallSite())

val startTime = System.currentTimeMillis()
val timerTask = new TimerTask {
override def run(): Unit = {
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
s"${stageAttemptNumber()}) waiting " +
s"under the global sync since $startTime, has been waiting for " +
s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
s"current barrier epoch is $barrierEpoch.")
logProgressInfo(
log"waiting under the global sync since ${MDC(TIME, startTime)}",
Some(startTime)
)
}
}
// Log the update of global sync every 1 minute.
Expand Down Expand Up @@ -104,17 +113,11 @@ class BarrierTaskContext private[spark] (
val messages = abortableRpcFuture.future.value.get.get

barrierEpoch += 1
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt ${stageAttemptNumber()}) " +
s"finished global sync successfully, waited for " +
s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
s"current barrier epoch is $barrierEpoch.")
logProgressInfo(log"finished global sync successfully", Some(startTime))
messages
} catch {
case e: SparkException =>
logInfo(s"Task ${taskAttemptId()} from Stage ${stageId()}(Attempt " +
s"${stageAttemptNumber()}) failed to perform global sync, waited for " +
s"${MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)} seconds, " +
s"current barrier epoch is $barrierEpoch.")
logProgressInfo(log"failed to perform global sync", Some(startTime))
throw e
} finally {
timerTask.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,12 @@ private[spark] class ExecutorAllocationManager(
val delta = targetNum.delta
totalDelta += delta
if (delta > 0) {
val executorsString = "executor" + { if (delta > 1) "s" else "" }
logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " +
s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " +
s"for resource profile id: ${rpId})")
val executorsString = log" new executor" + { if (delta > 1) log"s" else log"" }
logInfo(log"Requesting ${MDC(TARGET_NUM_EXECUTOR_DELTA, delta)}" +
executorsString + log" because tasks are backlogged " +
log"(new desired total will be" +
log" ${MDC(TARGET_NUM_EXECUTOR, numExecutorsTargetPerResourceProfileId(rpId))} " +
log"for resource profile id: ${MDC(RESOURCE_PROFILE_ID, rpId)})")
numExecutorsToAddPerResourceProfileId(rpId) =
if (delta == numExecutorsToAddPerResourceProfileId(rpId)) {
numExecutorsToAddPerResourceProfileId(rpId) * 2
Expand Down Expand Up @@ -604,7 +606,8 @@ private[spark] class ExecutorAllocationManager(
} else {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
}
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
logInfo(log"Executors ${MDC(EXECUTOR_IDS, executorsRemoved.mkString(","))}" +
log"removed due to idle timeout.")
executorsRemoved.toSeq
} else {
logWarning(log"Unable to reach the cluster manager to kill executor/s " +
Expand Down
Loading

0 comments on commit a15adeb

Please sign in to comment.