Support rolling spark.kubernetes.file.upload.path
pan3793 committed Dec 30, 2024
1 parent e8cbff3 commit 38953dc
Showing 6 changed files with 117 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/deployment/engine_on_kubernetes.md
@@ -48,6 +48,12 @@ The minimum required configurations are:
* spark.kubernetes.file.upload.path (path on S3 or HDFS)
* spark.kubernetes.authenticate.driver.serviceAccountName ([viz ServiceAccount](#serviceaccount))

Vanilla Spark supports neither rolling nor expiration of `spark.kubernetes.file.upload.path`. If you use a
file system that does not support TTL, e.g. HDFS, an additional cleanup mechanism is needed to prevent the files in this
directory from growing indefinitely. Since Kyuubi v1.11.0, you can configure `spark.kubernetes.file.upload.path` with the
placeholders `{{YEAR}}`, `{{MONTH}}` and `{{DAY}}`, and enable `kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled`
to let the Kyuubi server automatically create the directory with permission 777 before submitting the Spark application.
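
For example, a minimal sketch of `kyuubi-defaults.conf` that rolls the upload path daily (the HDFS URI here is illustrative):

```properties
# Expands to e.g. hdfs:///spark-upload-20241230 on 2024-12-30
spark.kubernetes.file.upload.path=hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}
# Let the Kyuubi server pre-create the resolved directory with permission 777
kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled=true
```

Stale directories such as `spark-upload-20241229` can then be removed by a scheduled cleanup job once they pass your retention period.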

### Docker Image

Spark ships a `./bin/docker-image-tool.sh` script to build and publish the Docker images for running Spark applications on Kubernetes.
KyuubiConf.scala
@@ -1334,6 +1334,15 @@ object KyuubiConf {
      .createWithDefault(
        "http://{{SPARK_DRIVER_SVC}}.{{KUBERNETES_NAMESPACE}}.svc:{{SPARK_UI_PORT}}")

  val KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH: ConfigEntry[Boolean] =
    buildConf("kyuubi.kubernetes.spark.autoCreateFileUploadPath.enabled")
      .doc("If enabled, Kyuubi server will try to create the " +
        "`spark.kubernetes.file.upload.path` with permission 777 before submitting " +
        "the Spark application.")
      .version("1.11.0")
      .booleanConf
      .createWithDefault(false)

  object KubernetesCleanupDriverPodStrategy extends Enumeration {
    type KubernetesCleanupDriverPodStrategy = Value
    val NONE, ALL, COMPLETED = Value
SparkBatchProcessBuilder.scala
@@ -56,7 +56,8 @@ class SparkBatchProcessBuilder(
    (batchKyuubiConf.getAll ++
      sparkAppNameConf() ++
      engineLogPathConf() ++
-     appendPodNameConf(batchConf)).map { case (k, v) =>
+     appendPodNameConf(batchConf) ++
+     prepareK8sFileUploadPath()).map { case (k, v) =>
      buffer ++= confKeyValue(convertConfigKey(k), v)
    }

SparkProcessBuilder.scala
@@ -19,12 +19,17 @@ package org.apache.kyuubi.engine.spark

import java.io.{File, FileFilter, IOException}
import java.nio.file.Paths
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi._
@@ -37,7 +42,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, Validator}
+import org.apache.kyuubi.util.{JavaUtils, KubernetesUtils, KyuubiHadoopUtils, Validator}
import org.apache.kyuubi.util.command.CommandLineUtils._

class SparkProcessBuilder(
@@ -141,7 +146,11 @@ class SparkProcessBuilder(
      allConf = allConf ++ zkAuthKeytabFileConf(allConf)
    }
    // pass spark engine log path to spark conf
-   (allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach {
+   (allConf ++
+     engineLogPathConf ++
+     extraYarnConf(allConf) ++
+     appendPodNameConf(allConf) ++
+     prepareK8sFileUploadPath()).foreach {
      case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
    }

@@ -266,6 +275,40 @@
    map.result().toMap
  }

  def prepareK8sFileUploadPath(): Map[String, String] = {
    kubernetesFileUploadPath() match {
      case Some(uploadPathPattern) if isK8sClusterMode =>
        // Resolve the {{YEAR}}/{{MONTH}}/{{DAY}} placeholders against the current date
        val today = LocalDate.now()
        val uploadPath = uploadPathPattern
          .replace("{{YEAR}}", today.format(YEAR_FMT))
          .replace("{{MONTH}}", today.format(MONTH_FMT))
          .replace("{{DAY}}", today.format(DAY_FMT))

        if (conf.get(KUBERNETES_SPARK_AUTO_CREATE_FILE_UPLOAD_PATH)) {
          // Best effort: pre-create the resolved directory with permission 777,
          // warning instead of failing the submission if creation is not possible
          val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf, loadDefaults = false)
          val path = new Path(uploadPath)
          var fs: FileSystem = null
          try {
            fs = path.getFileSystem(hadoopConf)
            if (!fs.exists(path)) {
              info(s"Try creating $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath")
              fs.mkdirs(path, KUBERNETES_UPLOAD_PATH_PERMISSION)
            }
          } catch {
            case ioe: IOException =>
              warn(s"Failed to create $KUBERNETES_FILE_UPLOAD_PATH: $uploadPath", ioe)
          } finally {
            if (fs != null) {
              Utils.tryLogNonFatalError(fs.close())
            }
          }
        }
        Map(KUBERNETES_FILE_UPLOAD_PATH -> uploadPath)
      case _ =>
        Map.empty
    }
  }

  def extraYarnConf(conf: Map[String, String]): Map[String, String] = {
    val map = mutable.Map.newBuilder[String, String]
    if (clusterManager().exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))) {
@@ -294,6 +337,14 @@
    }
  }

  def isK8sClusterMode: Boolean = {
    clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
      case Some(m) if m.startsWith("k8s") =>
        deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
      case _ => false
    }
  }

  def kubernetesContext(): Option[String] = {
    conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
  }

@@ -302,6 +353,11 @@
    conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
  }

  def kubernetesFileUploadPath(): Option[String] = {
    conf.getOption(KUBERNETES_FILE_UPLOAD_PATH)
      .orElse(defaultsConf.get(KUBERNETES_FILE_UPLOAD_PATH))
  }

  override def validateConf(): Unit = Validator.validateConf(conf)

  // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
@@ -331,6 +387,13 @@ object SparkProcessBuilder {
  final val YARN_MAX_APP_ATTEMPTS_KEY = "spark.yarn.maxAppAttempts"
  final val INTERNAL_RESOURCE = "spark-internal"

  final val KUBERNETES_FILE_UPLOAD_PATH = "spark.kubernetes.file.upload.path"
  final val KUBERNETES_UPLOAD_PATH_PERMISSION =
    new FsPermission(Integer.parseInt("777", 8).toShort)

  final val YEAR_FMT = DateTimeFormatter.ofPattern("yyyy")
  final val MONTH_FMT = DateTimeFormatter.ofPattern("MM")
  final val DAY_FMT = DateTimeFormatter.ofPattern("dd")

  /**
   * The path configs from Spark project that might upload local files:
   *  - SparkSubmit
SparkBatchProcessBuilderSuite.scala
@@ -17,6 +17,8 @@

package org.apache.kyuubi.engine.spark

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID

import org.apache.kyuubi.KyuubiFunSuite
@@ -36,4 +38,24 @@ class SparkBatchProcessBuilderSuite extends KyuubiFunSuite {
      None)
    assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
  }

test("spark.kubernetes.file.upload.path supports placeholder") {
val conf1 = KyuubiConf(false)
conf1.set("spark.master", "k8s://test:12345")
conf1.set("spark.submit.deployMode", "cluster")
conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}")
val builder1 = new SparkBatchProcessBuilder(
"",
conf1,
UUID.randomUUID().toString,
"test",
Some("test"),
"test",
Map("kyuubi.key" -> "value"),
Seq.empty,
None)
val commands1 = builder1.toString.split(' ')
val toady = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$toady"))
}
}
SparkProcessBuilderSuite.scala
@@ -19,7 +19,8 @@ package org.apache.kyuubi.engine.spark

import java.io.File
import java.nio.file.{Files, Path, Paths, StandardOpenOption}
-import java.time.Duration
+import java.time.{Duration, LocalDate}
+import java.time.format.DateTimeFormatter
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}

@@ -468,6 +469,17 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
      None)
    assert(builder.commands.toSeq.contains("spark.kyuubi.key=value"))
  }

test("spark.kubernetes.file.upload.path supports placeholder") {
val conf1 = KyuubiConf(false)
conf1.set("spark.master", "k8s://test:12345")
conf1.set("spark.submit.deployMode", "cluster")
conf1.set("spark.kubernetes.file.upload.path", "hdfs:///spark-upload-{{YEAR}}{{MONTH}}{{DAY}}")
val builder1 = new SparkProcessBuilder("", true, conf1)
val commands1 = builder1.toString.split(' ')
val toady = DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
assert(commands1.contains(s"spark.kubernetes.file.upload.path=hdfs:///spark-upload-$toady"))
}
}

class FakeSparkProcessBuilder(config: KyuubiConf)
