Skip to content

Commit

Permalink
[KYUUBI apache#5237] ConfigMaps deletion on Kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
Madhukar525722 committed Oct 5, 2024
1 parent 8e2b1b3 commit c729e2d
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _

private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
private var cleanupConfigMapExecutor: ThreadPoolExecutor = _

private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
checkKubernetesInfo(kubernetesInfo)
Expand Down Expand Up @@ -165,6 +166,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
TimeUnit.MILLISECONDS)
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-canceled-app-pod-thread")
cleanupConfigMapExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-config-map-thread")
}

override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
Expand Down Expand Up @@ -278,6 +281,48 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)
extends ResourceEventHandler[Pod] {

private def runConfigMapDeletion(pod: Pod, kubernetesInfo: KubernetesInfo): Unit = {
cleanupConfigMapExecutor.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
try {
val volumes = pod.getSpec.getVolumes.asScala
val configMapVolume = volumes.find(_.getConfigMap != null)
.map(_.getConfigMap.getName)
.find(_.contains("spark-exec"))

configMapVolume match {
case Some(configMapName) =>
val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
val statusDetailsList =
kubernetesClient.configMaps().withName(configMapName).delete()

val deletionSuccessful =
statusDetailsList != null && statusDetailsList.asScala.nonEmpty
if (deletionSuccessful) {
info(
s"[$kubernetesInfo] ConfigMap $configMapName associated with " +
s"pod ${pod.getMetadata.getName} deleted successfully.")
} else {
warn(
s"[$kubernetesInfo] Failed to delete ConfigMap " +
s"$configMapName associated with " +
s"pod ${pod.getMetadata.getName}.")
}
case None =>
warn(
s"[$kubernetesInfo] No ConfigMap volume found " +
s"for pod ${pod.getMetadata.getName}.")
}
} catch {
case NonFatal(e) => error(
s"[$kubernetesInfo] Failed to delete ConfigMap associated with " +
s"pod ${pod.getMetadata.getName}",
e)
}
}
})
}

override def onAdd(pod: Pod): Unit = {
if (isSparkEnginePod(pod)) {
updateApplicationState(kubernetesInfo, pod)
Expand Down Expand Up @@ -320,6 +365,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
appStateSource,
appStateContainer)
}

runConfigMapDeletion(pod, kubernetesInfo)
}
}

Expand Down

0 comments on commit c729e2d

Please sign in to comment.