From d5ca3082bd481f735f54e0c9146664fb798ea6d3 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:52:56 +0800 Subject: [PATCH 1/3] [CELEBORN-1577][Phase1] Storage quota should support interrupt shuffle. --- .../shuffle/celeborn/SparkShuffleManager.java | 1 + .../spark/shuffle/celeborn/SparkUtils.java | 24 ++++++++++++++++++ .../shuffle/celeborn/SparkShuffleManager.java | 1 + .../spark/shuffle/celeborn/SparkUtils.java | 23 +++++++++++++++++ .../client/ApplicationHeartbeater.scala | 15 ++++++++--- .../celeborn/client/LifecycleManager.scala | 25 ++++++++++++++++--- .../client/WorkerStatusTrackerSuite.scala | 3 ++- common/src/main/proto/TransportMessages.proto | 1 + .../protocol/message/ControlMessages.scala | 13 +++++++--- .../service/deploy/master/Master.scala | 3 ++- 10 files changed, 97 insertions(+), 12 deletions(-) diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index 4f6e835e726..4f3ffe33008 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -96,6 +96,7 @@ private void initializeLifecycleManager(String appId) { synchronized (this) { if (lifecycleManager == null) { lifecycleManager = new LifecycleManager(appId, celebornConf); + lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle); if (celebornConf.clientFetchThrowsFetchFailure()) { MapOutputTrackerMaster mapOutputTracker = (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker(); diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index 4f38c98152b..3f2e4709750 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -22,14 +22,19 @@ import java.lang.reflect.Method; import java.util.concurrent.atomic.LongAdder; +import scala.Option; +import scala.Some; import scala.Tuple2; import org.apache.spark.BarrierTaskContext; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; +import org.apache.spark.SparkContext$; import org.apache.spark.TaskContext; +import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; +import org.apache.spark.scheduler.ShuffleMapStage; import org.apache.spark.sql.execution.UnsafeRowSerializer; import org.apache.spark.sql.execution.metric.SQLMetric; import org.apache.spark.storage.BlockManagerId; @@ -39,6 +44,7 @@ import org.apache.celeborn.client.ShuffleClient; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.util.Utils; +import org.apache.celeborn.reflect.DynFields; public class SparkUtils { private static final Logger logger = LoggerFactory.getLogger(SparkUtils.class); @@ -179,4 +185,22 @@ public static void addFailureListenerIfBarrierTask( shuffleClient.reportBarrierTaskFailure(appShuffleId, appShuffleIdentifier); }); } + + private static final DynFields.UnboundField shuffleIdToMapStage_FIELD = + DynFields.builder().hiddenImpl(DAGScheduler.class, "shuffleIdToMapStage").build(); + + public static void cancelShuffle(int shuffleId, 
String reason) { + if (SparkContext$.MODULE$.getActive().nonEmpty()) { + DAGScheduler scheduler = SparkContext$.MODULE$.getActive().get().dagScheduler(); + scala.collection.mutable.Map shuffleIdToMapStageValue = + (scala.collection.mutable.Map) + shuffleIdToMapStage_FIELD.bind(scheduler).get(); + Option shuffleMapStage = shuffleIdToMapStageValue.get(shuffleId); + if (shuffleMapStage.nonEmpty()) { + scheduler.cancelStage(shuffleMapStage.get().id(), new Some<>(reason)); + } + } else { + logger.error("Can not get active SparkContext, skip cancelShuffle."); + } + } } diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java index da785886ca7..e355d6769f7 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java @@ -138,6 +138,7 @@ private void initializeLifecycleManager() { synchronized (this) { if (lifecycleManager == null) { lifecycleManager = new LifecycleManager(appUniqueId, celebornConf); + lifecycleManager.registerCancelShuffleCallback(SparkUtils::cancelShuffle); if (celebornConf.clientFetchThrowsFetchFailure()) { MapOutputTrackerMaster mapOutputTracker = (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker(); diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index 47317474e8d..d8a237bc459 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -19,15 +19,20 @@ import java.util.concurrent.atomic.LongAdder; +import scala.Option; +import scala.Some; import scala.Tuple2; import org.apache.spark.BarrierTaskContext; import org.apache.spark.MapOutputTrackerMaster; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; +import org.apache.spark.SparkContext$; import org.apache.spark.TaskContext; +import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; +import org.apache.spark.scheduler.ShuffleMapStage; import org.apache.spark.shuffle.ShuffleHandle; import org.apache.spark.shuffle.ShuffleReadMetricsReporter; import org.apache.spark.shuffle.ShuffleReader; @@ -266,6 +271,9 @@ public static CelebornShuffleReader createColumnarShuffleReader( .orNoop() .build(); + private static final DynFields.UnboundField shuffleIdToMapStage_FIELD = + DynFields.builder().hiddenImpl(DAGScheduler.class, "shuffleIdToMapStage").build(); + public static void unregisterAllMapOutput( MapOutputTrackerMaster mapOutputTracker, int shuffleId) { if (!UnregisterAllMapAndMergeOutput_METHOD.isNoop()) { @@ -296,4 +304,19 @@ public static void addFailureListenerIfBarrierTask( shuffleClient.reportBarrierTaskFailure(appShuffleId, appShuffleIdentifier); }); } + + public static void cancelShuffle(int shuffleId, String reason) { + if (SparkContext$.MODULE$.getActive().nonEmpty()) { + DAGScheduler scheduler = SparkContext$.MODULE$.getActive().get().dagScheduler(); + scala.collection.mutable.Map shuffleIdToMapStageValue = + (scala.collection.mutable.Map) + shuffleIdToMapStage_FIELD.bind(scheduler).get(); + Option shuffleMapStage = shuffleIdToMapStageValue.get(shuffleId); + if 
(shuffleMapStage.nonEmpty()) { + scheduler.cancelStage(shuffleMapStage.get().id(), new Some<>(reason)); + } + } else { + LOG.error("Can not get active SparkContext, skip cancelShuffle."); + } + } } diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index d3af38183ef..ed077bd5816 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID} +import org.apache.celeborn.common.protocol.message.ControlMessages._ import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.util.{ThreadUtils, Utils} @@ -33,7 +33,8 @@ class ApplicationHeartbeater( conf: CelebornConf, masterClient: MasterClient, shuffleMetrics: () => (Long, Long), - workerStatusTracker: WorkerStatusTracker) extends Logging { + workerStatusTracker: WorkerStatusTracker, + cancelAllActiveStages: String => Unit) extends Logging { private var stopped = false @@ -68,6 +69,7 @@ class ApplicationHeartbeater( if (response.statusCode == StatusCode.SUCCESS) { logDebug("Successfully send app heartbeat.") workerStatusTracker.handleHeartbeatResponse(response) + checkQuotaExceeds(response.checkQuotaResponse) } } catch { case it: InterruptedException => @@ -97,7 +99,8 @@ class ApplicationHeartbeater( StatusCode.REQUEST_FAILED, List.empty.asJava, List.empty.asJava, - List.empty.asJava) + List.empty.asJava, + CheckQuotaResponse(isAvailable = true, "")) } } @@ -114,6 +117,12 @@ class ApplicationHeartbeater( } } + private def checkQuotaExceeds(response: CheckQuotaResponse): Unit = { + if (!response.isAvailable) { + cancelAllActiveStages(response.reason) + } + } + def stop(): Unit = { stopped.synchronized { if (!stopped) { diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 60721c160af..0e26fbcd133 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -22,9 +22,9 @@ import java.nio.ByteBuffer import java.security.SecureRandom import java.util import java.util.{function, List => JList} -import java.util.concurrent.{Callable, ConcurrentHashMap, LinkedBlockingQueue, ScheduledFuture, TimeUnit} +import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger -import java.util.function.Consumer +import java.util.function.{BiConsumer, Consumer} import scala.collection.JavaConverters._ import scala.collection.generic.CanBuildFrom @@ -54,7 +54,6 @@ import org.apache.celeborn.common.rpc.{ClientSaslContextBuilder, RpcSecurityCont import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNettyRpcCallContext} import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Utils} // Can Remove this if celeborn don't support scala211 in future -import org.apache.celeborn.common.util.FunctionConverter._ import 
org.apache.celeborn.common.util.ThreadUtils.awaitResult import org.apache.celeborn.common.util.Utils.UNKNOWN_APP_SHUFFLE_ID @@ -209,7 +208,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends conf, masterClient, () => commitManager.commitMetrics(), - workerStatusTracker) + workerStatusTracker, + reason => cancelAllActiveStages(reason)) private val changePartitionManager = new ChangePartitionManager(conf, this) private val releasePartitionManager = new ReleasePartitionManager(conf, this) @@ -1760,6 +1760,11 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends appShuffleDeterminateMap.put(appShuffleId, determinate) } + @volatile private var cancelShuffleCallback: Option[BiConsumer[Integer, String]] = None + def registerCancelShuffleCallback(callback: BiConsumer[Integer, String]): Unit = { + cancelShuffleCallback = Some(callback) + } + // Initialize at the end of LifecycleManager construction. initialize() @@ -1778,4 +1783,16 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends rnd.nextBytes(secretBytes) JavaUtils.bytesToString(ByteBuffer.wrap(secretBytes)) } + + def cancelAllActiveStages(reason: String): Unit = cancelShuffleCallback match { + case Some(c) => + shuffleAllocatedWorkers + .asScala + .keys + .filter(!commitManager.isStageEnd(_)) + .foreach(c.accept(_, reason)) + + case _ => + } + } diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala index 1f606dd3280..8187eac0bbe 100644 --- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala +++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala @@ -95,7 +95,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { StatusCode.SUCCESS, excludedWorkers, unknownWorkers, - shuttingWorkers) + shuttingWorkers, + null) } private def mockWorkers(workerHosts: Array[String]): util.ArrayList[WorkerInfo] = { diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 4dc4f148912..0a14b97b5f4 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -446,6 +446,7 @@ message PbHeartbeatFromApplicationResponse { repeated PbWorkerInfo excludedWorkers = 2; repeated PbWorkerInfo unknownWorkers = 3; repeated PbWorkerInfo shuttingWorkers = 4; + PbCheckQuotaResponse checkQuotaResponse = 5; } message PbCheckQuota { diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 5c345237c9e..68a4feb661f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -399,7 +399,8 @@ object ControlMessages extends Logging { statusCode: StatusCode, excludedWorkers: util.List[WorkerInfo], unknownWorkers: util.List[WorkerInfo], - shuttingWorkers: util.List[WorkerInfo]) extends Message + shuttingWorkers: util.List[WorkerInfo], + checkQuotaResponse: CheckQuotaResponse) extends Message case class CheckQuota(userIdentifier: UserIdentifier) extends Message @@ -796,7 +797,10 @@ object ControlMessages extends Logging { statusCode, excludedWorkers, unknownWorkers, - shuttingWorkers) => + shuttingWorkers, + checkQuotaResponse) => + val 
pbCheckQuotaResponse = PbCheckQuotaResponse.newBuilder().setAvailable( + checkQuotaResponse.isAvailable).setReason(checkQuotaResponse.reason) val payload = PbHeartbeatFromApplicationResponse.newBuilder() .setStatus(statusCode.getValue) .addAllExcludedWorkers( @@ -805,6 +809,7 @@ object ControlMessages extends Logging { unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) .addAllShuttingWorkers( shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) + .setCheckQuotaResponse(pbCheckQuotaResponse) .build().toByteArray new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE, payload) @@ -1180,6 +1185,7 @@ object ControlMessages extends Logging { case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE => val pbHeartbeatFromApplicationResponse = PbHeartbeatFromApplicationResponse.parseFrom(message.getPayload) + val pbCheckQuotaResponse = pbHeartbeatFromApplicationResponse.getCheckQuotaResponse HeartbeatFromApplicationResponse( Utils.toStatusCode(pbHeartbeatFromApplicationResponse.getStatus), pbHeartbeatFromApplicationResponse.getExcludedWorkersList.asScala @@ -1187,7 +1193,8 @@ object ControlMessages extends Logging { pbHeartbeatFromApplicationResponse.getUnknownWorkersList.asScala .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava, pbHeartbeatFromApplicationResponse.getShuttingWorkersList.asScala - .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava) + .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava, + CheckQuotaResponse(pbCheckQuotaResponse.getAvailable, pbCheckQuotaResponse.getReason)) case CHECK_QUOTA_VALUE => val pbCheckAvailable = PbCheckQuota.parseFrom(message.getPayload) diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index d0623a20b3e..521c32b416e 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -1097,7 +1097,8 @@ private[celeborn] class Master( (statusSystem.excludedWorkers.asScala ++ statusSystem.manuallyExcludedWorkers.asScala).asJava), needCheckedWorkerList, new util.ArrayList[WorkerInfo]( - (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava))) + (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava), + CheckQuotaResponse(isAvailable = true, ""))) } else { context.reply(OneWayMessageResponse) } From e70211f6fa47c6ce96c5572bf90a76a3f92c397a Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Thu, 17 Oct 2024 17:23:13 +0800 Subject: [PATCH 2/3] [CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle. 
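How this phase works, in brief: QuotaManager now periodically aggregates per-user and
cluster-wide ResourceConsumption from worker snapshots, records a QuotaStatus for any
user or application whose usage exceeds its StorageQuota, and Master returns the
resulting CheckQuotaResponse in the application heartbeat response so that
LifecycleManager can cancel the shuffles of the offending applications.

A minimal dynamic-config sketch for enabling the new checks; the keys are the ones
added in this change and the values mirror the test resource dynamicConfig-quota-2.yaml
(illustrative only, not recommendations):

- level: SYSTEM
  config:
    celeborn.quota.interruptShuffle.enabled: true
    celeborn.quota.cluster.diskBytesWritten: 130G
    celeborn.quota.tenant.diskBytesWritten: 1000G
    celeborn.quota.tenant.diskFileCount: 100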
--- .../apache/celeborn/common/CelebornConf.scala | 75 ++- .../common/quota/ResourceConsumption.scala | 21 + .../quota/{Quota.scala => StorageQuota.scala} | 6 +- .../clustermeta/AbstractMetaManager.java | 11 +- .../service/deploy/master/Master.scala | 76 +-- .../service/deploy/master/MasterSource.scala | 2 + .../deploy/master/quota/QuotaManager.scala | 293 +++++++--- .../deploy/master/quota/QuotaStatus.scala | 30 ++ .../test/resources/dynamicConfig-quota-2.yaml | 38 ++ .../test/resources/dynamicConfig-quota.yaml | 1 + .../master/quota/QuotaManagerSuite.scala | 505 +++++++++++++++++- .../common/service/config/DynamicConfig.java | 64 ++- .../service/deploy/worker/Worker.scala | 8 +- .../worker/storage/StorageManager.scala | 17 +- 14 files changed, 965 insertions(+), 182 deletions(-) rename common/src/main/scala/org/apache/celeborn/common/quota/{Quota.scala => StorageQuota.scala} (90%) create mode 100644 master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala create mode 100644 master/src/test/resources/dynamicConfig-quota-2.yaml diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 290ce60f92e..ca63442e074 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -661,6 +661,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def estimatedPartitionSizeForEstimationUpdateInterval: Long = get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL) def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL) + def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD) + def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD) def clusterName: String = get(CLUSTER_NAME) // ////////////////////////////////////////////////////// @@ -1061,6 +1063,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def registerShuffleFilterExcludedWorkerEnabled: Boolean = get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED) + def interruptShuffleEnabled: Boolean = get(QUOTA_INTERRUPT_SHUFFLE_ENABLED) + // ////////////////////////////////////////////////////// // Worker // // ////////////////////////////////////////////////////// @@ -2841,6 +2845,26 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("30s") + val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.master.userResourceConsumption.user.threshold") + .categories("master") + .doc("When user resource consumption exceeds quota, Master will " + + "interrupt some apps until user resource consumption is less " + + "than this value. Default value is Long.MaxValue which means disable check.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + + val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.master.userResourceConsumption.cluster.threshold") + .categories("master") + .doc("When cluster resource consumption exceeds quota, Master will " + + "interrupt some apps until cluster resource consumption is less " + + "than this value. 
Default value is Long.MaxValue which means disable check.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + val CLUSTER_NAME: ConfigEntry[String] = buildConf("celeborn.cluster.name") .categories("master", "worker") @@ -5185,7 +5209,7 @@ object CelebornConf extends Logging { .dynamic .doc("Quota dynamic configuration for written disk bytes.") .version("0.5.0") - .longConf + .bytesConf(ByteUnit.BYTE) .createWithDefault(Long.MaxValue) val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] = @@ -5203,7 +5227,7 @@ object CelebornConf extends Logging { .dynamic .doc("Quota dynamic configuration for written hdfs bytes.") .version("0.5.0") - .longConf + .bytesConf(ByteUnit.BYTE) .createWithDefault(Long.MaxValue) val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] = @@ -5765,4 +5789,51 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.diskBytesWritten") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written disk bytes.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + + val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.diskFileCount") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written disk file count.") + .version("0.6.0") + .longConf + .createWithDefault(Long.MaxValue) + + val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.hdfsBytesWritten") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written hdfs bytes.") + .version("0.6.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(Long.MaxValue) + + val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] = + buildConf("celeborn.quota.cluster.hdfsFileCount") + .categories("quota") + .dynamic + .doc("Quota dynamic configuration for cluster written hdfs file count.") + .version("0.6.0") + .longConf + .createWithDefault(Long.MaxValue) + + val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] = { + buildConf("celeborn.quota.interruptShuffle.enabled") + .categories("quota") + .dynamic + .doc("If enabled, the resource consumption used by the tenant exceeds " + + "celeborn.quota.tenant.xx, or the resource consumption of the entire cluster " + + "exceeds celeborn.quota.cluster.xx, some shuffles will be selected and interrupted.") + .version("0.6.0") + .booleanConf + .createWithDefault(false) + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala b/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala index 10d1114b985..d454bb894c2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala +++ b/common/src/main/scala/org/apache/celeborn/common/quota/ResourceConsumption.scala @@ -30,6 +30,12 @@ case class ResourceConsumption( hdfsFileCount: Long, var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) { + def withSubResourceConsumptions( + resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = { + subResourceConsumptions = resourceConsumptions + this + } + def add(other: ResourceConsumption): ResourceConsumption = { ResourceConsumption( diskBytesWritten + other.diskBytesWritten, @@ -38,6 +44,14 @@ case class ResourceConsumption( hdfsFileCount + other.hdfsFileCount) } + def subtract(other: ResourceConsumption): ResourceConsumption = { + 
ResourceConsumption( + diskBytesWritten - other.diskBytesWritten, + diskFileCount - other.diskFileCount, + hdfsBytesWritten - other.hdfsBytesWritten, + hdfsFileCount - other.hdfsFileCount) + } + def addSubResourceConsumptions(otherSubResourceConsumptions: Map[ String, ResourceConsumption]): Map[String, ResourceConsumption] = { @@ -77,4 +91,11 @@ case class ResourceConsumption( s" hdfsFileCount: $hdfsFileCount," + s" subResourceConsumptions: $subResourceConsumptionString)" } + + def simpleString: String = { + s"ResourceConsumption(diskBytesWritten: ${Utils.bytesToString(diskBytesWritten)}," + + s" diskFileCount: $diskFileCount," + + s" hdfsBytesWritten: ${Utils.bytesToString(hdfsBytesWritten)}," + + s" hdfsFileCount: $hdfsFileCount)" + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala similarity index 90% rename from common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala rename to common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala index 8a845225821..1a7a8e52abf 100644 --- a/common/src/main/scala/org/apache/celeborn/common/quota/Quota.scala +++ b/common/src/main/scala/org/apache/celeborn/common/quota/StorageQuota.scala @@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.util.Utils -case class Quota( +case class StorageQuota( diskBytesWritten: Long, diskFileCount: Long, hdfsBytesWritten: Long, @@ -34,3 +34,7 @@ case class Quota( s"]" } } + +object StorageQuota { + val DEFAULT_QUOTA = StorageQuota(Long.MaxValue, Long.MaxValue, Long.MaxValue, Long.MaxValue) +} diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 1631e90d34c..cc48cd0ab5c 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -22,10 +22,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -504,4 +501,10 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) { public void updateApplicationMeta(ApplicationMeta applicationMeta) { applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); } + + public List workerSnapshot() { + synchronized (workers) { + return new ArrayList<>(workers); + } + } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 521c32b416e..ee14d81aed5 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -184,7 +184,12 @@ private[celeborn] class Master( private val hasHDFSStorage = conf.hasHDFSStorage private val hasS3Storage = conf.hasS3Storage - private val quotaManager = new QuotaManager(conf, configService) + private val quotaManager = new QuotaManager( + statusSystem, + masterSource, + 
resourceConsumptionSource, + conf, + configService) private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval private val userResourceConsumptions = JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]() @@ -1098,7 +1103,7 @@ private[celeborn] class Master( needCheckedWorkerList, new util.ArrayList[WorkerInfo]( (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava), - CheckQuotaResponse(isAvailable = true, ""))) + quotaManager.checkApplicationQuotaStatus(appId))) } else { context.reply(OneWayMessageResponse) } @@ -1114,76 +1119,11 @@ private[celeborn] class Master( } } - private def handleResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = { - val userResourceConsumption = computeUserResourceConsumption(userIdentifier) - gaugeResourceConsumption(userIdentifier) - userResourceConsumption - } - - private def gaugeResourceConsumption( - userIdentifier: UserIdentifier, - applicationId: String = null): Unit = { - val resourceConsumptionLabel = - if (applicationId == null) userIdentifier.toMap - else userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> applicationId) - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.DISK_FILE_COUNT, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).diskFileCount - } - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.DISK_BYTES_WRITTEN, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).diskBytesWritten - } - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.HDFS_FILE_COUNT, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount - } - resourceConsumptionSource.addGauge( - ResourceConsumptionSource.HDFS_BYTES_WRITTEN, - resourceConsumptionLabel) { () => - computeResourceConsumption(userIdentifier, applicationId).hdfsBytesWritten - } - } - - private def computeResourceConsumption( - userIdentifier: UserIdentifier, - applicationId: String = null): ResourceConsumption = { - val newResourceConsumption = computeUserResourceConsumption(userIdentifier) - if (applicationId == null) { - val current = System.currentTimeMillis() - if (userResourceConsumptions.containsKey(userIdentifier)) { - val resourceConsumptionAndUpdateTime = userResourceConsumptions.get(userIdentifier) - if (current - resourceConsumptionAndUpdateTime._2 <= masterResourceConsumptionInterval) { - return resourceConsumptionAndUpdateTime._1 - } - } - userResourceConsumptions.put(userIdentifier, (newResourceConsumption, current)) - newResourceConsumption - } else { - newResourceConsumption.subResourceConsumptions.get(applicationId) - } - } - - // TODO: Support calculate topN app resource consumption. 
- private def computeUserResourceConsumption( - userIdentifier: UserIdentifier): ResourceConsumption = { - val resourceConsumption = statusSystem.workers.asScala.flatMap { - workerInfo => workerInfo.userResourceConsumption.asScala.get(userIdentifier) - }.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _) - resourceConsumption - } - private[master] def handleCheckQuota( userIdentifier: UserIdentifier, context: RpcCallContext): Unit = { - val userResourceConsumption = handleResourceConsumption(userIdentifier) if (conf.quotaEnabled) { - val (isAvailable, reason) = - quotaManager.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption) - context.reply(CheckQuotaResponse(isAvailable, reason)) + context.reply(quotaManager.checkUserQuotaStatus(userIdentifier)) } else { context.reply(CheckQuotaResponse(true, "")) } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala index b2e72524486..6cc97c136f2 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala @@ -60,4 +60,6 @@ object MasterSource { // Capacity val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes" val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes" + + val UPDATE_RESOURCE_CONSUMPTION_TIME = "UpdateResourceConsumptionTime" } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala index a5d446368a6..9a9b49a4a11 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala @@ -16,96 +16,259 @@ */ package org.apache.celeborn.service.deploy.master.quota +import java.util.{Map => JMap} +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.quota.{Quota, ResourceConsumption} -import org.apache.celeborn.common.util.Utils +import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource +import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource._ +import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse +import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota} +import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils} import org.apache.celeborn.server.common.service.config.ConfigService +import org.apache.celeborn.service.deploy.master.MasterSource +import org.apache.celeborn.service.deploy.master.MasterSource.UPDATE_RESOURCE_CONSUMPTION_TIME +import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager +import org.apache.celeborn.service.deploy.master.quota.QuotaStatus._ -class QuotaManager(celebornConf: CelebornConf, configService: ConfigService) extends Logging { - val DEFAULT_QUOTA = Quota( - celebornConf.get(CelebornConf.QUOTA_DISK_BYTES_WRITTEN), - celebornConf.get(CelebornConf.QUOTA_DISK_FILE_COUNT), - celebornConf.get(CelebornConf.QUOTA_HDFS_BYTES_WRITTEN), - celebornConf.get(CelebornConf.QUOTA_HDFS_FILE_COUNT)) - def getQuota(userIdentifier: UserIdentifier): Quota = { 
- if (configService != null) { - val config = - configService.getTenantUserConfigFromCache(userIdentifier.tenantId, userIdentifier.name) - config.getQuota - } else { - DEFAULT_QUOTA - } +class QuotaManager( + statusSystem: AbstractMetaManager, + masterSource: MasterSource, + resourceConsumptionSource: ResourceConsumptionSource, + celebornConf: CelebornConf, + configService: ConfigService) extends Logging { + + val userQuotaStatus: JMap[UserIdentifier, QuotaStatus] = JavaUtils.newConcurrentHashMap() + val appQuotaStatus: JMap[String, QuotaStatus] = JavaUtils.newConcurrentHashMap() + val userResourceConsumptionMap: JMap[UserIdentifier, ResourceConsumption] = + JavaUtils.newConcurrentHashMap() + private val quotaChecker = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-quota-checker") + quotaChecker.scheduleWithFixedDelay( + () => { + try { + updateResourceConsumption() + } catch { + case t: Throwable => logError("Update user resource consumption failed.", t) + } + }, + 0L, + celebornConf.masterResourceConsumptionInterval, + TimeUnit.MILLISECONDS) + + def handleAppLost(appId: String): Unit = { + appQuotaStatus.remove(appId) + } + + def checkUserQuotaStatus(userIdentifier: UserIdentifier): CheckQuotaResponse = { + val userStatus = userQuotaStatus.getOrDefault(userIdentifier, QuotaStatus()) + CheckQuotaResponse(!userStatus.exceed, userStatus.exceedReason) + } + + def checkApplicationQuotaStatus(applicationId: String): CheckQuotaResponse = { + val status = appQuotaStatus.getOrDefault(applicationId, QuotaStatus()) + CheckQuotaResponse(!status.exceed, status.exceedReason) + } + + def getUserStorageQuota(user: UserIdentifier): StorageQuota = { + Option(configService) + .map(_.getTenantUserConfigFromCache(user.tenantId, user.name).getTenantStorageQuota) + .getOrElse(StorageQuota.DEFAULT_QUOTA) + } + + def getClusterStorageQuota: StorageQuota = { + Option(configService) + .map(_.getSystemConfigFromCache.getClusterStorageQuota) + .getOrElse(StorageQuota.DEFAULT_QUOTA) } - def checkQuotaSpaceAvailable( - userIdentifier: UserIdentifier, - resourceResumption: ResourceConsumption): (Boolean, String) = { - val quota = getQuota(userIdentifier) + private def interruptShuffleEnabled: Boolean = { + Option(configService) + .map(_.getSystemConfigFromCache.interruptShuffleEnabled()) + .getOrElse(celebornConf.interruptShuffleEnabled) + } + + private def checkUserQuotaSpace( + user: UserIdentifier, + consumption: ResourceConsumption): QuotaStatus = { + val quota = getUserStorageQuota(user) + checkQuotaSpace(s"$USER_EXHAUSTED user: $user. 
", consumption, quota) + } + + private def checkClusterQuotaSpace( + consumption: ResourceConsumption, + quota: StorageQuota): QuotaStatus = { + checkQuotaSpace(CLUSTER_EXHAUSTED, consumption, quota) + } + private def checkQuotaSpace( + reason: String, + consumption: ResourceConsumption, + quota: StorageQuota): QuotaStatus = { val checkResults = Seq( - checkDiskBytesWritten(userIdentifier, resourceResumption.diskBytesWritten, quota), - checkDiskFileCount(userIdentifier, resourceResumption.diskFileCount, quota), - checkHdfsBytesWritten(userIdentifier, resourceResumption.hdfsBytesWritten, quota), - checkHdfsFileCount(userIdentifier, resourceResumption.hdfsFileCount, quota)) + checkQuota( + consumption.diskBytesWritten, + quota.diskBytesWritten, + "DISK_BYTES_WRITTEN", + Utils.bytesToString), + checkQuota( + consumption.diskFileCount, + quota.diskFileCount, + "DISK_FILE_COUNT", + _.toString), + checkQuota( + consumption.hdfsBytesWritten, + quota.hdfsBytesWritten, + "HDFS_BYTES_WRITTEN", + Utils.bytesToString), + checkQuota( + consumption.hdfsFileCount, + quota.hdfsFileCount, + "HDFS_FILE_COUNT", + _.toString)) val exceed = checkResults.foldLeft(false)(_ || _._1) - val reason = checkResults.foldLeft("")(_ + _._2) - (!exceed, reason) + val exceedReason = + if (exceed) { + s"$reason ${checkResults.foldLeft("")(_ + _._2)}" + } else { + "" + } + QuotaStatus(exceed, exceedReason) } - private def checkDiskBytesWritten( - userIdentifier: UserIdentifier, + private def checkQuota( value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.diskBytesWritten > 0 && value >= quota.diskBytesWritten) + quota: Long, + quotaType: String, + format: Long => String): (Boolean, String) = { + val exceed = quota > 0 && value >= quota var reason = "" if (exceed) { - reason = s"User $userIdentifier used diskBytesWritten (${Utils.bytesToString(value)}) " + - s"exceeds quota (${Utils.bytesToString(quota.diskBytesWritten)}). " + reason = s"$quotaType(${format(value)}) exceeds quota(${format(quota)}). " logWarning(reason) } (exceed, reason) } - private def checkDiskFileCount( - userIdentifier: UserIdentifier, - value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.diskFileCount > 0 && value >= quota.diskFileCount) - var reason = "" - if (exceed) { - reason = - s"User $userIdentifier used diskFileCount($value) exceeds quota(${quota.diskFileCount}). " - logWarning(reason) + private def checkConsumptionExceeded( + used: ResourceConsumption, + threshold: StorageQuota): Boolean = { + used.diskBytesWritten >= threshold.diskBytesWritten || + used.diskFileCount >= threshold.diskFileCount || + used.hdfsBytesWritten >= threshold.hdfsBytesWritten || + used.hdfsFileCount >= threshold.hdfsFileCount + } + + def updateResourceConsumption(): Unit = { + masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) { + val clusterQuota = getClusterStorageQuota + var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0) + val userResourceConsumption = statusSystem.workerSnapshot.asScala.flatMap { workerInfo => + workerInfo.userResourceConsumption.asScala + }.groupBy(_._1).map { case (userIdentifier, userConsumptionList) => + // Step 1: Compute user consumption and set quota status. 
+ val resourceConsumptionList = userConsumptionList.map(_._2) + val resourceConsumption = computeUserResourceConsumption(resourceConsumptionList) + userQuotaStatus.put( + userIdentifier, + checkUserQuotaSpace(userIdentifier, resourceConsumption)) + + // Step 2: Update user resource consumption metrics. + // For extract metrics + userResourceConsumptionMap.put(userIdentifier, resourceConsumption) + registerUserResourceConsumptionMetrics(userIdentifier) + + // Step 3: Expire user level exceeded app except already expired app + clusterResourceConsumption = clusterResourceConsumption.add(resourceConsumption) + val userQuota = getUserStorageQuota(userIdentifier) + if (interruptShuffleEnabled && checkConsumptionExceeded(resourceConsumption, userQuota)) { + val subResourceConsumptions = computeSubAppConsumption(resourceConsumptionList) + // Compute expired size + val (expired, notExpired) = subResourceConsumptions.partition { case (app, _) => + appQuotaStatus.containsKey(app) + } + val userConsumptions = expired.values.foldLeft(resourceConsumption)(_.subtract(_)) + expireApplication(userConsumptions, userQuota, notExpired.toSeq, USER_EXHAUSTED) + (Option(subResourceConsumptions), resourceConsumptionList) + } else { + (None, resourceConsumptionList) + } + } + + // Step 4: Expire cluster level exceeded app except already expired app + if (interruptShuffleEnabled && + checkClusterQuotaSpace(clusterResourceConsumption, clusterQuota).exceed) { + val appConsumptions = userResourceConsumption.map { + case (None, subConsumptionList) => computeSubAppConsumption(subConsumptionList) + case (Some(subConsumptions), _) => subConsumptions + }.flatMap(_.toSeq).toSeq + + // Compute nonExpired app total usage + val (expired, notExpired) = appConsumptions.partition { case (app, _) => + appQuotaStatus.containsKey(app) + } + clusterResourceConsumption = + expired.map(_._2).foldLeft(clusterResourceConsumption)(_.subtract(_)) + expireApplication(clusterResourceConsumption, clusterQuota, notExpired, CLUSTER_EXHAUSTED) + } } - (exceed, reason) } - private def checkHdfsBytesWritten( - userIdentifier: UserIdentifier, - value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.hdfsBytesWritten > 0 && value >= quota.hdfsBytesWritten) - var reason = "" - if (exceed) { - reason = s"User $userIdentifier used hdfsBytesWritten(${Utils.bytesToString(value)}) " + - s"exceeds quota(${Utils.bytesToString(quota.hdfsBytesWritten)}). 
" - logWarning(reason) + private def expireApplication( + used: ResourceConsumption, + threshold: StorageQuota, + appMap: Seq[(String, ResourceConsumption)], + expireReason: String): Unit = { + var nonExpired = used + if (checkConsumptionExceeded(used, threshold)) { + val sortedConsumption = + appMap.sortBy(_._2)(Ordering.by((r: ResourceConsumption) => + ( + r.diskBytesWritten, + r.diskFileCount, + r.hdfsBytesWritten, + r.hdfsFileCount)).reverse) + for ((appId, consumption) <- sortedConsumption + if checkConsumptionExceeded(nonExpired, threshold)) { + val reason = s"$expireReason Used: ${consumption.simpleString}, Threshold: $threshold" + appQuotaStatus.put(appId, QuotaStatus(exceed = true, reason)) + nonExpired = nonExpired.subtract(consumption) + } } - (exceed, reason) } - private def checkHdfsFileCount( - userIdentifier: UserIdentifier, - value: Long, - quota: Quota): (Boolean, String) = { - val exceed = (quota.hdfsFileCount > 0 && value >= quota.hdfsFileCount) - var reason = "" - if (exceed) { - reason = - s"User $userIdentifier used hdfsFileCount($value) exceeds quota(${quota.hdfsFileCount}). " - logWarning(reason) + private def computeUserResourceConsumption( + consumptions: Seq[ResourceConsumption]): ResourceConsumption = { + consumptions.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _) + } + + private def computeSubAppConsumption( + resourceConsumptionList: Seq[ResourceConsumption]): Map[String, ResourceConsumption] = { + resourceConsumptionList.foldRight(Map.empty[String, ResourceConsumption]) { + case (consumption, subConsumption) => + consumption.addSubResourceConsumptions(subConsumption) + } + } + + private def getResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = { + userResourceConsumptionMap.getOrDefault(userIdentifier, ResourceConsumption(0, 0, 0, 0)) + } + + private def registerUserResourceConsumptionMetrics(userIdentifier: UserIdentifier): Unit = { + resourceConsumptionSource.addGauge(DISK_FILE_COUNT, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).diskBytesWritten + } + resourceConsumptionSource.addGauge(DISK_BYTES_WRITTEN, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).diskBytesWritten + } + resourceConsumptionSource.addGauge(HDFS_FILE_COUNT, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).hdfsFileCount + } + resourceConsumptionSource.addGauge(HDFS_BYTES_WRITTEN, userIdentifier.toMap) { () => + getResourceConsumption(userIdentifier).hdfsBytesWritten } - (exceed, reason) } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala new file mode 100644 index 00000000000..561ae7fe793 --- /dev/null +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaStatus.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master.quota + +import QuotaStatus._ + +case class QuotaStatus(exceed: Boolean = false, exceedReason: String = NORMAL) + +object QuotaStatus { + val NORMAL: String = "" + val CLUSTER_EXHAUSTED: String = + "Interrupt application caused by the cluster storage usage reach threshold." + val USER_EXHAUSTED: String = + "Interrupt or reject application caused by the user storage usage reach threshold." +} diff --git a/master/src/test/resources/dynamicConfig-quota-2.yaml b/master/src/test/resources/dynamicConfig-quota-2.yaml new file mode 100644 index 00000000000..fd5e27aaa82 --- /dev/null +++ b/master/src/test/resources/dynamicConfig-quota-2.yaml @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +- level: SYSTEM + config: + celeborn.quota.tenant.diskBytesWritten: 1000G + celeborn.quota.tenant.diskFileCount: 100 + celeborn.quota.tenant.hdfsBytesWritten: 1G + celeborn.quota.cluster.diskBytesWritten: 130G + celeborn.quota.interruptShuffle.enabled: true + +- tenantId: tenant_01 + level: TENANT + config: + celeborn.quota.tenant.diskBytesWritten: 10G + celeborn.quota.tenant.diskFileCount: 1000 + celeborn.quota.tenant.hdfsBytesWritten: 10G + users: + - name: Jerry + config: + celeborn.quota.tenant.diskBytesWritten: 100G + celeborn.quota.tenant.diskFileCount: 10000 + celeborn.master.userResourceConsumption.user.threshold: 120G + + diff --git a/master/src/test/resources/dynamicConfig-quota.yaml b/master/src/test/resources/dynamicConfig-quota.yaml index 156a3f692b4..8a7d33d03f9 100644 --- a/master/src/test/resources/dynamicConfig-quota.yaml +++ b/master/src/test/resources/dynamicConfig-quota.yaml @@ -19,6 +19,7 @@ celeborn.quota.tenant.diskBytesWritten: 1G celeborn.quota.tenant.diskFileCount: 100 celeborn.quota.tenant.hdfsBytesWritten: 1G + celeborn.quota.interruptShuffle.enabled: true - tenantId: tenant_01 level: TENANT diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala index 5e5e93017ba..bd29815f338 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManagerSuite.scala @@ -19,16 +19,26 @@ package org.apache.celeborn.service.deploy.master.quota import java.io.File -import org.junit.Assert.assertEquals +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} +import scala.util.Random + +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.quota.{Quota, ResourceConsumption} +import org.apache.celeborn.common.meta.WorkerInfo +import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource +import org.apache.celeborn.common.protocol.TransportModuleConstants +import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse +import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota} +import org.apache.celeborn.common.rpc.RpcEnv import org.apache.celeborn.common.util.Utils -import org.apache.celeborn.server.common.service.config.DynamicConfigServiceFactory +import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory, FsConfigServiceImpl} +import org.apache.celeborn.service.deploy.master.MasterSource +import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager class QuotaManagerSuite extends AnyFunSuite with BeforeAndAfterAll @@ -36,35 +46,77 @@ class QuotaManagerSuite extends AnyFunSuite with Logging { protected var quotaManager: QuotaManager = _ + private var resourceConsumptionSource: ResourceConsumptionSource = _ + + val worker = new WorkerInfo( + "localhost", + 10001, + 10002, + 10003, + 10004) + + val conf = new CelebornConf() + + var configService: ConfigService = _ + // helper function final protected def getTestResourceFile(file: String): File = { new 
File(getClass.getClassLoader.getResource(file).getFile) } override def beforeAll(): Unit = { - val conf = new CelebornConf() conf.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS") conf.set( CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key, getTestResourceFile("dynamicConfig-quota.yaml").getPath) - quotaManager = new QuotaManager(conf, DynamicConfigServiceFactory.getConfigService(conf)) + val statusSystem = new SingleMasterMetaManager( + RpcEnv.create( + "test-rpc", + TransportModuleConstants.RPC_SERVICE_MODULE, + "localhost", + 9001, + conf, + None), + conf) + statusSystem.workers.add(worker) + resourceConsumptionSource = new ResourceConsumptionSource(conf, "Master") + configService = DynamicConfigServiceFactory.getConfigService(conf) + quotaManager = new QuotaManager( + statusSystem, + new MasterSource(conf), + resourceConsumptionSource, + conf, + configService) } test("test celeborn quota conf") { + configService.refreshCache() assertEquals( - quotaManager.getQuota(UserIdentifier("tenant_01", "Jerry")), - Quota(Utils.byteStringAsBytes("100G"), 10000, Utils.byteStringAsBytes("10G"), Long.MaxValue)) + quotaManager.getUserStorageQuota(UserIdentifier("tenant_01", "Jerry")), + StorageQuota( + Utils.byteStringAsBytes("100G"), + 10000, + Utils.byteStringAsBytes("10G"), + Long.MaxValue)) // Fallback to tenant level assertEquals( - quotaManager.getQuota(UserIdentifier("tenant_01", "name_not_exist")), - Quota(Utils.byteStringAsBytes("10G"), 1000, Utils.byteStringAsBytes("10G"), Long.MaxValue)) + quotaManager.getUserStorageQuota(UserIdentifier("tenant_01", "name_not_exist")), + StorageQuota( + Utils.byteStringAsBytes("10G"), + 1000, + Utils.byteStringAsBytes("10G"), + Long.MaxValue)) // Fallback to system level assertEquals( - quotaManager.getQuota(UserIdentifier("tenant_not_exist", "Tom")), - Quota(Utils.byteStringAsBytes("1G"), 100, Utils.byteStringAsBytes("1G"), Long.MaxValue)) + quotaManager.getUserStorageQuota(UserIdentifier("tenant_not_exist", "Tom")), + StorageQuota( + Utils.byteStringAsBytes("1G"), + 100, + Utils.byteStringAsBytes("1G"), + Long.MaxValue)) } - test("test check quota return result") { + test("test check user quota return result") { val user = UserIdentifier("tenant_01", "Jerry") val rc1 = ResourceConsumption(Utils.byteStringAsBytes("10G"), 20, Utils.byteStringAsBytes("1G"), 40) @@ -77,22 +129,431 @@ class QuotaManagerSuite extends AnyFunSuite Utils.byteStringAsBytes("30G"), 40) - val res1 = quotaManager.checkQuotaSpaceAvailable(user, rc1) - val res2 = quotaManager.checkQuotaSpaceAvailable(user, rc2) - val res3 = quotaManager.checkQuotaSpaceAvailable(user, rc3) + addUserConsumption(user, rc1) + quotaManager.updateResourceConsumption() + val res1 = checkUserQuota(user) - val exp1 = (true, "") - val exp2 = ( + addUserConsumption(user, rc2) + quotaManager.updateResourceConsumption() + val res2 = checkUserQuota(user) + + addUserConsumption(user, rc3) + quotaManager.updateResourceConsumption() + val res3 = checkUserQuota(user) + + val exp1 = CheckQuotaResponse(true, "") + val exp2 = CheckQuotaResponse( false, - s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ") - val exp3 = ( + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ") + val exp3 = CheckQuotaResponse( false, - s"User $user used diskBytesWritten (200.0 GiB) exceeds quota (100.0 GiB). " + - s"User $user used diskFileCount(20000) exceeds quota(10000). 
" + - s"User $user used hdfsBytesWritten(30.0 GiB) exceeds quota(10.0 GiB). ") + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " + + s"DISK_FILE_COUNT(20000) exceeds quota(10000). " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ") assert(res1 == exp1) assert(res2 == exp2) assert(res3 == exp3) + clearUserConsumption() + } + + test("test check application quota return result") { + val user = UserIdentifier("tenant_01", "Jerry") + var rc = + ResourceConsumption( + Utils.byteStringAsBytes("200G"), + 20000, + Utils.byteStringAsBytes("30G"), + 40) + rc.withSubResourceConsumptions( + Map( + "app1" -> ResourceConsumption( + Utils.byteStringAsBytes("150G"), + 15000, + Utils.byteStringAsBytes("25G"), + 20), + "app2" -> ResourceConsumption( + Utils.byteStringAsBytes("50G"), + 5000, + Utils.byteStringAsBytes("5G"), + 20)).asJava) + + addUserConsumption(user, rc) + conf.set("celeborn.quota.cluster.diskBytesWritten", "60gb") + configService.refreshCache() + quotaManager.updateResourceConsumption() + var res1 = checkUserQuota(user) + var res2 = checkApplicationQuota(user, "app1") + var res3 = checkApplicationQuota(user, "app2") + + val succeed = CheckQuotaResponse(true, "") + val failed = CheckQuotaResponse( + false, + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " + + s"DISK_FILE_COUNT(20000) exceeds quota(10000). " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ") + assert(res1 == failed) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: " + + "ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 15000, " + + "hdfsBytesWritten: 25.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == succeed) + + conf.set("celeborn.quota.cluster.diskBytesWritten", "50gb") + configService.refreshCache() + quotaManager.updateResourceConsumption() + res1 = checkUserQuota(user) + res2 = checkApplicationQuota(user, "app1") + res3 = checkApplicationQuota(user, "app2") + + assert(res1 == failed) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 15000, " + + "hdfsBytesWritten: 25.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == CheckQuotaResponse( + false, + "Interrupt application caused by the cluster storage usage reach threshold. 
" + + "Used: ResourceConsumption(" + + "diskBytesWritten: 50.0 GiB, " + + "diskFileCount: 5000, " + + "hdfsBytesWritten: 5.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=50.0 GiB, " + + "diskFileCount=9223372036854775807, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + clearUserConsumption() + + rc = + ResourceConsumption( + Utils.byteStringAsBytes("50G"), + 1000, + Utils.byteStringAsBytes("5G"), + 40) + rc.withSubResourceConsumptions( + Map( + "app1" -> ResourceConsumption( + Utils.byteStringAsBytes("40G"), + 500, + Utils.byteStringAsBytes("3G"), + 20), + "app2" -> ResourceConsumption( + Utils.byteStringAsBytes("10G"), + 500, + Utils.byteStringAsBytes("2G"), + 20)).asJava) + + addUserConsumption(user, rc) + conf.set("celeborn.quota.cluster.diskBytesWritten", "20gb") + configService.refreshCache() + quotaManager.updateResourceConsumption() + + res1 = checkUserQuota(user) + res2 = checkApplicationQuota(user, "app1") + res3 = checkApplicationQuota(user, "app2") + + assert(res1 == succeed) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt application caused by the cluster storage usage reach threshold. " + + "Used: " + + "ResourceConsumption(" + + "diskBytesWritten: 40.0 GiB, " + + "diskFileCount: 500, " + + "hdfsBytesWritten: 3.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[diskBytesWritten=20.0 GiB, " + + "diskFileCount=9223372036854775807, " + + "hdfsBytesWritten=8.0 EiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == CheckQuotaResponse(true, "")) + + clearUserConsumption() + } + + test("test handleResourceConsumption time - case1") { + // 1000 users 100wapplications, all exceeded + conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb") + conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb") + configService.refreshCache() + val MAX = 2L * 1024 * 1024 * 1024 + val MIN = 1L * 1024 * 1024 * 1024 + val random = new Random() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + val subResourceConsumption = (0 until 1000).map { + index => + val appId = s"$user$i app$index" + val consumption = ResourceConsumption( + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN)) + (appId, consumption) + }.toMap + val userConsumption = subResourceConsumption.values.foldRight( + ResourceConsumption(0, 0, 0, 0))(_ add _) + userConsumption.subResourceConsumptions = subResourceConsumption.asJava + addUserConsumption(user, userConsumption) + } + + val start = System.currentTimeMillis() + quotaManager.updateResourceConsumption() + val duration = System.currentTimeMillis() - start + print(s"duration=$duration") + + val res = resourceConsumptionSource.getMetrics() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsBytesWritten_Value{name="user$i",role="Master",tenantId="default"}""")) + assertFalse(quotaManager.checkUserQuotaStatus(user).isAvailable) + (0 until 1000).foreach { + index => + val appId = s"$user$i app$index" + 
assertFalse(quotaManager.checkApplicationQuotaStatus(appId).isAvailable) + } + } + clearUserConsumption() + } + + test("test handleResourceConsumption time - case2") { + // 1000 users 2000000 applications, all exceeded + conf.set("celeborn.quota.tenant.diskBytesWritten", "1mb") + conf.set("celeborn.quota.cluster.diskBytesWritten", "1mb") + configService.refreshCache() + val MAX = 2L * 1024 * 1024 * 1024 + val MIN = 1L * 1024 * 1024 * 1024 + val random = new Random() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + val subResourceConsumption = + if (i < 100) { + (0 until 1000).map { + index => + val appId = s"$user$i app$index" + val consumption = ResourceConsumption( + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN), + MIN + Math.abs(random.nextLong()) % (MAX - MIN)) + (appId, consumption) + }.toMap + } else { + (0 until 1000).map { + index => + val appId = s"$user$i app$index" + val consumption = ResourceConsumption(0, 0, 0, 0) + (appId, consumption) + }.toMap + } + val userConsumption = subResourceConsumption.values.foldRight( + ResourceConsumption(0, 0, 0, 0))(_ add _) + userConsumption.subResourceConsumptions = subResourceConsumption.asJava + addUserConsumption(user, userConsumption) + } + + val start = System.currentTimeMillis() + quotaManager.updateResourceConsumption() + val duration = System.currentTimeMillis() - start + print(s"duration=$duration") + + val res = resourceConsumptionSource.getMetrics() + for (i <- 0 until 1000) { + val user = UserIdentifier("default", s"user$i") + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_diskFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsFileCount_Value{name="user$i",role="Master",tenantId="default"}""")) + assert(res.contains( + s"""metrics_hdfsBytesWritten_Value{name="user$i",role="Master",tenantId="default"}""")) + if (i < 100) { + assertFalse(quotaManager.checkUserQuotaStatus(user).isAvailable) + } else { + assertTrue(quotaManager.checkUserQuotaStatus(user).isAvailable) + } + (0 until 1000).foreach { + index => + val appId = s"$user$i app$index" + if (i < 100) { + assertFalse(quotaManager.checkApplicationQuotaStatus(appId).isAvailable) + } else { + assertTrue(quotaManager.checkApplicationQuotaStatus(appId).isAvailable) + } + } + } + clearUserConsumption() + } + + test("test user level conf") { + val conf1 = new CelebornConf() + conf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND, "FS") + conf1.set( + CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key, + getTestResourceFile("dynamicConfig-quota-2.yaml").getPath) + val statusSystem = new SingleMasterMetaManager( + RpcEnv.create( + "test-rpc", + TransportModuleConstants.RPC_SERVICE_MODULE, + "localhost", + 9001, + conf1, + None), + conf1) + statusSystem.workers.add(worker) + val quotaManager1 = new QuotaManager( + statusSystem, + new MasterSource(conf1), + resourceConsumptionSource, + conf1, + new FsConfigServiceImpl(conf1)) + + val user = UserIdentifier("tenant_01", "Jerry") + val user1 = UserIdentifier("tenant_01", "John") + + val rc = + ResourceConsumption( + Utils.byteStringAsBytes("200G"), + 20000, + Utils.byteStringAsBytes("30G"), + 40) + rc.withSubResourceConsumptions( + Map( + "app1" -> ResourceConsumption( + Utils.byteStringAsBytes("150G"), + 15000, + Utils.byteStringAsBytes("25G"), + 20), + "app2" 
-> ResourceConsumption( + Utils.byteStringAsBytes("50G"), + 5000, + Utils.byteStringAsBytes("5G"), + 20)).asJava) + + val rc1 = + ResourceConsumption( + Utils.byteStringAsBytes("80G"), + 0, + 0, + 0) + + rc1.withSubResourceConsumptions( + Map( + "app3" -> ResourceConsumption( + Utils.byteStringAsBytes("80G"), + 0, + 0, + 0)).asJava) + + addUserConsumption(user, rc) + addUserConsumption(user1, rc1) + + quotaManager1.updateResourceConsumption() + val res1 = quotaManager1.checkUserQuotaStatus(user) + val res2 = quotaManager1.checkApplicationQuotaStatus("app1") + val res3 = quotaManager1.checkApplicationQuotaStatus("app2") + val res4 = quotaManager1.checkApplicationQuotaStatus("app3") + assert(res1 == CheckQuotaResponse( + false, + s"Interrupt or reject application caused by the user storage usage reach threshold. " + + s"user: `tenant_01`.`Jerry`. " + + s"DISK_BYTES_WRITTEN(200.0 GiB) exceeds quota(100.0 GiB). " + + s"DISK_FILE_COUNT(20000) exceeds quota(10000). " + + s"HDFS_BYTES_WRITTEN(30.0 GiB) exceeds quota(10.0 GiB). ")) + assert(res2 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: ResourceConsumption(" + + "diskBytesWritten: 150.0 GiB, " + + "diskFileCount: 15000, " + + "hdfsBytesWritten: 25.0 GiB, " + + "hdfsFileCount: 20), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=100.0 GiB, " + + "diskFileCount=10000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + assert(res3 == CheckQuotaResponse(true, "")) + assert(res4 == CheckQuotaResponse( + false, + "Interrupt or reject application caused by the user storage usage reach threshold. " + + "Used: " + + "ResourceConsumption(" + + "diskBytesWritten: 80.0 GiB, " + + "diskFileCount: 0, " + + "hdfsBytesWritten: 0.0 B, " + + "hdfsFileCount: 0), " + + "Threshold: " + + "Quota[" + + "diskBytesWritten=10.0 GiB, " + + "diskFileCount=1000, " + + "hdfsBytesWritten=10.0 GiB, " + + "hdfsFileCount=9223372036854775807]")) + + clearUserConsumption() + } + + def checkUserQuota(userIdentifier: UserIdentifier): CheckQuotaResponse = { + quotaManager.checkUserQuotaStatus(userIdentifier) + } + + def checkApplicationQuota( + userIdentifier: UserIdentifier, + applicationId: String): CheckQuotaResponse = { + quotaManager.checkApplicationQuotaStatus(applicationId) + } + + def addUserConsumption( + userIdentifier: UserIdentifier, + resourceConsumption: ResourceConsumption): Unit = { + worker.userResourceConsumption.put(userIdentifier, resourceConsumption) + } + + def clearUserConsumption(): Unit = { + val applicationSet = worker.userResourceConsumption.asScala.values.flatMap { consumption => + Option(consumption.subResourceConsumptions).map(_.asScala.keySet) + }.flatten.toSet + + applicationSet.foreach(quotaManager.handleAppLost) + worker.userResourceConsumption.clear() } } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java index c48bbe1fb56..88d31fc9f82 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -25,7 +25,7 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.internal.config.ConfigEntry; -import org.apache.celeborn.common.quota.Quota; +import org.apache.celeborn.common.quota.StorageQuota; import 
org.apache.celeborn.common.util.Utils; /** @@ -40,7 +40,9 @@ public abstract class DynamicConfig { private static final Logger LOG = LoggerFactory.getLogger(DynamicConfig.class); protected Map configs = new HashMap<>(); - protected volatile Quota quota = null; + protected volatile StorageQuota tenantStorageQuota = null; + + protected volatile StorageQuota clusterStorageQuota = null; public abstract DynamicConfig getParentLevelConfig(); @@ -92,19 +94,19 @@ public T formatValue( return null; } - public Quota getQuota() { - if (quota == null) { + public StorageQuota getTenantStorageQuota() { + if (tenantStorageQuota == null) { synchronized (DynamicConfig.class) { - if (quota == null) { - quota = currentQuota(); + if (tenantStorageQuota == null) { + tenantStorageQuota = currentTenantQuota(); } } } - return quota; + return tenantStorageQuota; } - protected Quota currentQuota() { - return new Quota( + protected StorageQuota currentTenantQuota() { + return new StorageQuota( getValue( CelebornConf.QUOTA_DISK_BYTES_WRITTEN().key(), CelebornConf.QUOTA_DISK_BYTES_WRITTEN(), @@ -127,6 +129,49 @@ protected Quota currentQuota() { ConfigType.STRING)); } + public StorageQuota getClusterStorageQuota() { + if (clusterStorageQuota == null) { + synchronized (DynamicConfig.class) { + if (clusterStorageQuota == null) { + clusterStorageQuota = currentClusterQuota(); + } + } + } + return clusterStorageQuota; + } + + protected StorageQuota currentClusterQuota() { + return new StorageQuota( + getValue( + CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN().key(), + CelebornConf.QUOTA_CLUSTER_DISK_BYTES_WRITTEN(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.QUOTA_CLUSTER_DISK_FILE_COUNT().key(), + CelebornConf.QUOTA_CLUSTER_DISK_FILE_COUNT(), + Long.TYPE, + ConfigType.STRING), + getValue( + CelebornConf.QUOTA_CLUSTER_HDFS_BYTES_WRITTEN().key(), + CelebornConf.QUOTA_CLUSTER_HDFS_BYTES_WRITTEN(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.QUOTA_CLUSTER_HDFS_FILE_COUNT().key(), + CelebornConf.QUOTA_CLUSTER_HDFS_FILE_COUNT(), + Long.TYPE, + ConfigType.STRING)); + } + + public boolean interruptShuffleEnabled() { + return getValue( + CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED().key(), + CelebornConf.QUOTA_INTERRUPT_SHUFFLE_ENABLED(), + Boolean.TYPE, + ConfigType.BOOLEAN); + } + public Map getConfigs() { return configs; } @@ -143,6 +188,7 @@ public enum ConfigType { BYTES, STRING, TIME_MS, + BOOLEAN } public static T convert(Class clazz, String value) { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index d0371e2bd4b..7f1d7cd0ecf 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -671,8 +671,14 @@ private[celeborn] class Worker( val resourceConsumptionSnapshot = storageManager.userResourceConsumptionSnapshot() val userResourceConsumptions = workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava) - resourceConsumptionSnapshot.foreach { case (userIdentifier, _) => + resourceConsumptionSnapshot.foreach { case (userIdentifier, userResourceConsumption) => gaugeResourceConsumption(userIdentifier) + val subResourceConsumptions = userResourceConsumption.subResourceConsumptions + if (CollectionUtils.isNotEmpty(subResourceConsumptions)) { + subResourceConsumptions.asScala.keys.foreach { + gaugeResourceConsumption(userIdentifier, _) 
+ } + } } handleTopResourceConsumption(userResourceConsumptions) userResourceConsumptions diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 00e0a28878a..cc02f52b409 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -904,17 +904,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } // userIdentifier -> List((userIdentifier, (applicationId, fileInfo)))) .groupBy(_._1) - .map { case (userIdentifier, userWithFileInfoList) => + .mapValues { userWithFileInfoList => // collect resource consumed by each user on this worker - val userFileInfos = userWithFileInfoList.map(_._2) - ( - userIdentifier, - resourceConsumption( - userFileInfos.map(_._2), - userFileInfos.groupBy(_._1).map { - case (applicationId, appWithFileInfoList) => - (applicationId, resourceConsumption(appWithFileInfoList.map(_._2))) - }.asJava)) + val subResourceConsumption = userWithFileInfoList.map(_._2).groupBy(_._1).map { + case (applicationId, appWithFileInfoList) => + (applicationId, resourceConsumption(appWithFileInfoList.map(_._2))) + } + subResourceConsumption.values.foldLeft(ResourceConsumption(0, 0, 0, 0))(_ add _) + .withSubResourceConsumptions(subResourceConsumption.asJava) } } } From ff37e633397b9f956b40db75df2695d4278719a9 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Thu, 17 Oct 2024 17:31:07 +0800 Subject: [PATCH 3/3] fix --- docs/configuration/master.md | 2 ++ docs/configuration/quota.md | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 6e98b60245d..b12331e49b6 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -77,7 +77,9 @@ license: | | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups | | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | | | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy | +| celeborn.master.userResourceConsumption.cluster.threshold | 9223372036854775807b | false | When cluster resource consumption exceeds quota, Master will interrupt some apps until cluster resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | | | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 | | +| celeborn.master.userResourceConsumption.user.threshold | 9223372036854775807b | false | When user resource consumption exceeds quota, Master will interrupt some apps until user resource consumption is less than this value. 
Default value is Long.MaxValue which means disable check. | 0.6.0 | | | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | | | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | | | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | | diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md index 4e79050e19e..8bfce68a22e 100644 --- a/docs/configuration/quota.md +++ b/docs/configuration/quota.md @@ -19,12 +19,17 @@ license: | | Key | Default | isDynamic | Description | Since | Deprecated | | --- | ------- | --------- | ----------- | ----- | ---------- | +| celeborn.quota.cluster.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for cluster written disk bytes. | 0.6.0 | | +| celeborn.quota.cluster.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for cluster written disk file count. | 0.6.0 | | +| celeborn.quota.cluster.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for cluster written hdfs bytes. | 0.6.0 | | +| celeborn.quota.cluster.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for cluster written hdfs file count. | 0.6.0 | | | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | | | celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | | | celeborn.quota.identity.user-specific.tenant | default | false | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | | | celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | | -| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. 
| 0.5.0 | |
+| celeborn.quota.interruptShuffle.enabled | false | true | If enabled, some shuffles will be selected and interrupted when the tenant's resource consumption exceeds celeborn.quota.tenant.xx or the resource consumption of the entire cluster exceeds celeborn.quota.cluster.xx. | 0.6.0 | |
+| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
 | celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
-| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
+| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
 | celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
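
For reference, the `QuotaManagerSuite` changes earlier in this patch assert a quota lookup order for `getUserStorageQuota`: a user-level entry wins, an unmatched user falls back to its tenant-level entry, and an unmatched tenant falls back to the system default. Below is a minimal, self-contained Scala sketch of that fallback order only; the `StorageQuota` case class, the lookup maps, and `resolve` are illustrative stand-ins rather than the actual `QuotaManager`/`ConfigService` API, and the byte values simply mirror the expectations the suite asserts against `dynamicConfig-quota.yaml`.

```scala
// Illustrative model of the user -> tenant -> system quota fallback checked by the suite.
// Not the real Celeborn classes; names and values are stand-ins for this sketch only.
final case class StorageQuota(
    diskBytesWritten: Long,
    diskFileCount: Long,
    hdfsBytesWritten: Long,
    hdfsFileCount: Long)

object QuotaFallbackSketch {
  // Hypothetical lookup tables standing in for the dynamic config store.
  private val userQuotas: Map[(String, String), StorageQuota] =
    Map(("tenant_01", "Jerry") -> StorageQuota(100L << 30, 10000L, 10L << 30, Long.MaxValue))
  private val tenantQuotas: Map[String, StorageQuota] =
    Map("tenant_01" -> StorageQuota(10L << 30, 1000L, 10L << 30, Long.MaxValue))
  private val systemQuota: StorageQuota =
    StorageQuota(1L << 30, 100L, 1L << 30, Long.MaxValue)

  // Effective quota: user-level entry first, then tenant-level, then the system default.
  def resolve(tenantId: String, userName: String): StorageQuota =
    userQuotas
      .get((tenantId, userName))
      .orElse(tenantQuotas.get(tenantId))
      .getOrElse(systemQuota)

  def main(args: Array[String]): Unit = {
    println(resolve("tenant_01", "Jerry"))          // user-level quota (100 GiB disk)
    println(resolve("tenant_01", "name_not_exist")) // falls back to tenant level (10 GiB disk)
    println(resolve("tenant_not_exist", "Tom"))     // falls back to system level (1 GiB disk)
  }
}
```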