diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala index c89c50aa91d..7fd22c53f1b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala @@ -34,7 +34,6 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SQL_EXECUTION_ID_KEY} import org.apache.kyuubi.engine.spark.operation.ExecuteStatement import org.apache.kyuubi.operation.Operation -import org.apache.kyuubi.operation.log.OperationLog /** * A [[SparkListener]] based on spark's DeveloperApi [[StatsReportListener]], used to appending @@ -78,15 +77,6 @@ class SQLOperationListener( properties != null && properties.getProperty(KYUUBI_STATEMENT_ID_KEY) == operationId } - private def withOperationLog(f: => Unit): Unit = { - try { - operation.getOperationLog.foreach(OperationLog.setCurrentOperationLog) - f - } finally { - OperationLog.removeCurrentOperationLog() - } - } - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { if (sameGroupId(jobStart.properties)) { val jobId = jobStart.jobId @@ -105,7 +95,7 @@ class SQLOperationListener( activeJobs.put( jobId, new SparkJobInfo(stageSize, stageIds)) - withOperationLog { + operation.withOperationLog { info(s"Query [$operationId]: Job $jobId started with $stageSize stages," + s" ${activeJobs.size()} active jobs running") } @@ -119,7 +109,7 @@ class SQLOperationListener( case JobSucceeded => "succeeded" case _ => "failed" // TODO: Handle JobFailed(exception: Exception) } - withOperationLog { + operation.withOperationLog { info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running") } } @@ -135,7 +125,7 @@ class SQLOperationListener( activeStages.put( stageAttempt, new SparkStageInfo(stageId, stageInfo.numTasks)) - withOperationLog { + operation.withOperationLog { info(s"Query [$operationId]: Stage $stageId.$attemptNumber started " + s"with ${stageInfo.numTasks} tasks, ${activeStages.size()} active stages running") } @@ -166,7 +156,7 @@ class SQLOperationListener( operationRunTime.getAndAdd(taskMetrics.executorRunTime) operationCpuTime.getAndAdd(taskMetrics.executorCpuTime) } - withOperationLog(super.onStageCompleted(stageCompleted)) + operation.withOperationLog(super.onStageCompleted(stageCompleted)) } } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 3681e98f784..fdcc0c3406d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -74,6 +74,15 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin override def getOperationLog: Option[OperationLog] = None + override def withOperationLog(f: => Unit): Unit = { + try { + getOperationLog.foreach(OperationLog.setCurrentOperationLog) + f + } finally { + OperationLog.removeCurrentOperationLog() + } + } + OperationAuditLogger.audit(this, OperationState.INITIALIZED) @volatile protected var state: OperationState = INITIALIZED @volatile protected var startTime: Long = _ diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala index e216385180f..9a6e453d544 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala @@ -37,6 +37,7 @@ trait Operation { def getHandle: OperationHandle def getStatus: OperationStatus def getOperationLog: Option[OperationLog] + def withOperationLog(f: => Unit): Unit def getBackgroundHandle: Future[_] def shouldRunAsync: Boolean diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 3fd5ddbeaa8..be0d3bdf6e6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -499,9 +499,14 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val sessionHandle = formatSessionHandle(batchId) sessionManager.getBatchSession(sessionHandle).map { batchSession => - fe.getSessionUser(batchSession.user) + val userName = fe.getSessionUser(batchSession.user) + val ipAddress = fe.getIpAddress sessionManager.closeSession(batchSession.handle) val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage + batchSession.batchJobSubmissionOp.withOperationLog { + warn(s"Received kill batch request from $userName/$ipAddress") + warn(s"Kill batch response: killed: $killed, msg: $msg.") + } new CloseBatchResponse(killed, msg) }.getOrElse { sessionManager.getBatchMetadata(batchId).map { metadata =>