diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
index 558f6e44a6384..6acc6a03dbf33 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.executor.profiler
 
-import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
+import java.io.{BufferedInputStream, File, FileInputStream, InputStream, IOException}
 import java.net.URI
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
@@ -34,12 +34,13 @@ import org.apache.spark.util.ThreadUtils
  */
 private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {
 
-  private var running = false
-  private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
+  @volatile private var running = false
+  @volatile private var stageId: Int = _
   private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
   private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
   private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
   private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
+  private val codeProfilingStageIsolated = conf.get(EXECUTOR_PROFILING_STAGE_ISOLATED)
 
   private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
   private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
@@ -55,7 +56,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
 
   val profiler: Option[AsyncProfiler] = {
     Option(
-      if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
+      if (AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
     )
   }
 
@@ -76,6 +77,18 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
     }
   }
 
+  /** Restarts profiling into a fresh local jfr when a new stage begins. */
+  def reStart(id: Int): Unit = {
+    if (!running && id != this.stageId) {
+      val tempFile = new File(s"$profilerLocalDir/profile.jfr")
+      if (tempFile.exists()) {
+        tempFile.delete()
+      }
+      this.stageId = id
+      start()
+    }
+  }
+
   /** Stops the profiling and saves output to dfs location. */
   def stop(): Unit = {
     if (running) {
@@ -99,8 +112,11 @@
     val appName = conf.get("spark.app.name").replace(" ", "-")
     val profilerOutputDirname = profilerDfsDir.get
 
-    val profileOutputFile =
+    val profileOutputFile = if (codeProfilingStageIsolated) {
+      s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId-$stageId.jfr"
+    } else {
       s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
+    }
     val fs = FileSystem.get(new URI(profileOutputFile), config);
     val filenamePath = new Path(profileOutputFile)
     outputStream = fs.create(filenamePath)
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala
index e144092cdecd2..a9586eeebe3b8 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala
@@ -17,11 +17,11 @@
 package org.apache.spark.executor.profiler
 
 import java.util.{Map => JMap}
 
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext, TaskFailedReason}
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.internal.Logging
 
@@ -42,6 +42,7 @@
   private var pluginCtx: PluginContext = _
   private var profiler: ExecutorJVMProfiler = _
   private var codeProfilingEnabled: Boolean = _
+  private var codeProfilingStageIsolated: Boolean = _
   private var codeProfilingFraction: Double = _
   private val rand: Random = new Random(System.currentTimeMillis())
 
@@ -49,17 +50,42 @@
     pluginCtx = ctx
     sparkConf = ctx.conf()
     codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED)
+    codeProfilingStageIsolated = sparkConf.get(EXECUTOR_PROFILING_STAGE_ISOLATED)
     if (codeProfilingEnabled) {
       codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION)
       if (rand.nextInt(100) * 0.01 < codeProfilingFraction) {
         logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling")
         profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID())
-        profiler.start()
+        if (!codeProfilingStageIsolated) {
+          profiler.start()
+        }
       }
     }
     Map.empty[String, String].asJava
   }
 
+  override def onTaskStart(): Unit = {
+    // Guard against executors that were not sampled for profiling.
+    if (codeProfilingStageIsolated && profiler != null) {
+      val task = TaskContext.get()
+      if (task != null) {
+        profiler.reStart(task.stageId())
+      }
+    }
+  }
+
+  override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
+    if (codeProfilingStageIsolated) {
+      onTaskSucceeded()
+    }
+  }
+
+  override def onTaskSucceeded(): Unit = {
+    if (codeProfilingStageIsolated && profiler != null) {
+      profiler.stop()
+    }
+  }
+
   override def shutdown(): Unit = {
     logInfo("Executor JVM profiler shutting down")
     if (profiler != null) {
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala
index f9adec2d4be90..1d6733a3166b9 100644
--- a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala
@@ -29,6 +29,13 @@
       .booleanConf
       .createWithDefault(false)
 
+  private[profiler] val EXECUTOR_PROFILING_STAGE_ISOLATED =
+    ConfigBuilder("spark.executor.profiling.stage.isolated")
+      .doc("Isolate executor code profiling by stage, writing a separate jfr file per stage.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[profiler] val EXECUTOR_PROFILING_DFS_DIR =
     ConfigBuilder("spark.executor.profiling.dfsDir")
      .doc("HDFS compatible file-system path to where the profiler will write output jfr files.")
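
Usage sketch: a minimal way to exercise the new flag, assuming the profiler connector jar is on the executor classpath; the app name, dfsDir value, and object name below are placeholders, not part of this patch.

    import org.apache.spark.sql.SparkSession

    object StageIsolatedProfilingExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("profiler-demo")
          .config("spark.plugins", "org.apache.spark.executor.profiler.ExecutorProfilerPlugin")
          .config("spark.executor.profiling.enabled", "true")
          // New in this patch: write one profile-<app>-exec-<id>-<stageId>.jfr
          // per stage instead of a single jfr per executor.
          .config("spark.executor.profiling.stage.isolated", "true")
          .config("spark.executor.profiling.dfsDir", "hdfs:///spark-profiles")
          .getOrCreate()

        // Any multi-stage job works; each stage's samples land in its own jfr file.
        spark.range(1000000L).selectExpr("sum(id)").collect()
        spark.stop()
      }
    }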