Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1748] Deprecate identity provider configs tied with quota #2952

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void init(Context context) throws IOException {
applicationAttemptId,
IOBufferSize);
UserIdentifier userIdentifier =
new UserIdentifier(
celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName());
new UserIdentifier(celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName());

final float spiller = jobConf.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float) 0.8);
int pushSize = (int) ((IOBufferSize << 20) * spiller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void init(Context<K, V> context) {
lmPort,
celebornConf,
new UserIdentifier(
celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName()));
celebornConf.userSpecificTenant(), celebornConf.userSpecificUserName()));
this.merger =
new MergeManagerImpl<>(
reduceId,
Expand Down
85 changes: 52 additions & 33 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.util.Try
import scala.util.matching.Regex

import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
import org.apache.celeborn.common.identity.{DefaultIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.identity.{DefaultIdentityProvider, HadoopBasedIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
import org.apache.celeborn.common.network.util.ByteUnit
Expand Down Expand Up @@ -887,11 +887,15 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Quota //
// //////////////////////////////////////////////////////
def quotaEnabled: Boolean = get(QUOTA_ENABLED)
def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
def quotaUserSpecificTenant: String = get(QUOTA_USER_SPECIFIC_TENANT)
def quotaUserSpecificUserName: String = get(QUOTA_USER_SPECIFIC_USERNAME)
def quotaInterruptShuffleEnabled: Boolean = get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)

// //////////////////////////////////////////////////////
// Identity //
// //////////////////////////////////////////////////////
// Value of the IDENTITY_PROVIDER entry: the configured IdentityProvider class name.
def identityProviderClass: String = get(IDENTITY_PROVIDER)
// Value of the USER_SPECIFIC_TENANT entry.
def userSpecificTenant: String = get(USER_SPECIFIC_TENANT)
// Value of the USER_SPECIFIC_USERNAME entry.
def userSpecificUserName: String = get(USER_SPECIFIC_USERNAME)

// //////////////////////////////////////////////////////
// Client //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -1589,7 +1593,19 @@ object CelebornConf extends Logging {
DeprecatedConfig(
"celeborn.worker.congestionControl.high.watermark",
"0.6.0",
"Please use celeborn.worker.congestionControl.diskBuffer.high.watermark"))
"Please use celeborn.worker.congestionControl.diskBuffer.high.watermark"),
DeprecatedConfig(
"celeborn.quota.identity.provider",
"0.6.0",
"Please use celeborn.identity.provider"),
DeprecatedConfig(
"celeborn.quota.identity.user-specific.tenant",
"0.6.0",
"Please use celeborn.identity.user-specific.tenant"),
DeprecatedConfig(
"celeborn.quota.identity.user-specific.userName",
"0.6.0",
"Please use celeborn.identity.user-specific.userName"))

Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}
Expand Down Expand Up @@ -5329,6 +5345,37 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

/**
 * Config entry holding the class name of the [[IdentityProvider]] implementation.
 *
 * Key: `celeborn.identity.provider`, category `client`, since 0.6.0.
 * `celeborn.quota.identity.provider` is registered as an alternative key.
 * NOTE(review): presumably the alternative is consulted when the primary key is unset;
 * confirm against the config builder's `withAlternative`.
 * Default: the class name of [[DefaultIdentityProvider]].
 */
val IDENTITY_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.identity.provider")
.withAlternative("celeborn.quota.identity.provider")
.categories("client")
.doc(s"IdentityProvider class name. Default class is " +
s"`${classOf[DefaultIdentityProvider].getName}`. " +
s"Optional values: " +
s"${classOf[HadoopBasedIdentityProvider].getName} user name will be obtained by UserGroupInformation.getUserName; " +
s"${classOf[DefaultIdentityProvider].getName} user name and tenant id are default values or user-specific values.")
.version("0.6.0")
.stringConf
.createWithDefault(classOf[DefaultIdentityProvider].getName)

/**
 * Config entry for the tenant id; its doc text ties it to [[DefaultIdentityProvider]].
 *
 * Key: `celeborn.identity.user-specific.tenant`, category `client`, since 0.6.0.
 * `celeborn.quota.identity.user-specific.tenant` is registered as an alternative key.
 * Default: `IdentityProvider.DEFAULT_TENANT_ID`.
 */
val USER_SPECIFIC_TENANT: ConfigEntry[String] =
buildConf("celeborn.identity.user-specific.tenant")
.withAlternative("celeborn.quota.identity.user-specific.tenant")
.categories("client")
.doc(s"Tenant id if ${IDENTITY_PROVIDER.key} is ${classOf[DefaultIdentityProvider].getName}.")
.version("0.6.0")
.stringConf
.createWithDefault(IdentityProvider.DEFAULT_TENANT_ID)

/**
 * Config entry for the user name; its doc text ties it to [[DefaultIdentityProvider]].
 *
 * Key: `celeborn.identity.user-specific.userName`, category `client`, since 0.6.0.
 * `celeborn.quota.identity.user-specific.userName` is registered as an alternative key.
 * Default: `IdentityProvider.DEFAULT_USERNAME`.
 */
val USER_SPECIFIC_USERNAME: ConfigEntry[String] =
buildConf("celeborn.identity.user-specific.userName")
.withAlternative("celeborn.quota.identity.user-specific.userName")
.categories("client")
.doc(s"User name if ${IDENTITY_PROVIDER.key} is ${classOf[DefaultIdentityProvider].getName}.")
.version("0.6.0")
.stringConf
.createWithDefault(IdentityProvider.DEFAULT_USERNAME)

val QUOTA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.quota.enabled")
.categories("quota", "master", "client")
Expand All @@ -5342,18 +5389,6 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

val QUOTA_IDENTITY_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.quota.identity.provider")
.categories("quota", "client")
.doc(s"IdentityProvider class name. Default class is " +
s"`${classOf[DefaultIdentityProvider].getName}`. " +
s"Optional values: " +
s"org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; " +
s"org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values.")
.version("0.2.0")
.stringConf
.createWithDefault(classOf[DefaultIdentityProvider].getName)

val CONTAINER_INFO_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.container.info.provider")
.categories("master", "worker")
Expand All @@ -5363,22 +5398,6 @@ object CelebornConf extends Logging {
.stringConf
.createWithDefault("org.apache.celeborn.server.common.container.DefaultContainerInfoProvider")

val QUOTA_USER_SPECIFIC_TENANT: ConfigEntry[String] =
buildConf("celeborn.quota.identity.user-specific.tenant")
.categories("quota", "client")
.doc(s"Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider.")
.version("0.3.0")
.stringConf
.createWithDefault(IdentityProvider.DEFAULT_TENANT_ID)

val QUOTA_USER_SPECIFIC_USERNAME: ConfigEntry[String] =
buildConf("celeborn.quota.identity.user-specific.userName")
.categories("quota", "client")
.doc(s"User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider.")
.version("0.3.0")
.stringConf
.createWithDefault(IdentityProvider.DEFAULT_USERNAME)

val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.tenant.diskBytesWritten")
.categories("quota")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DefaultIdentityProvider extends IdentityProvider {
override def provide(): UserIdentifier = {
val conf = new CelebornConf()
UserIdentifier(
conf.quotaUserSpecificTenant,
conf.quotaUserSpecificUserName)
conf.userSpecificTenant,
conf.userSpecificUserName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ object IdentityProvider extends Logging {
val DEFAULT_USERNAME = "default"

def instantiate(conf: CelebornConf): IdentityProvider = {
Utils.instantiate[IdentityProvider](conf.quotaIdentityProviderClass)
Utils.instantiate[IdentityProvider](conf.identityProviderClass)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class UtilsSuite extends CelebornFunSuite {

test("test instantiate") {
val celebornConf = new CelebornConf()
assert(Utils.instantiate[DefaultIdentityProvider](celebornConf.quotaIdentityProviderClass)
assert(Utils.instantiate[DefaultIdentityProvider](celebornConf.identityProviderClass)
.isInstanceOf[DefaultIdentityProvider])
}
}
6 changes: 3 additions & 3 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always use spark built-in shuffle implementation. This configuration is deprecated, consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
| celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure |
| celeborn.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.6.0 | celeborn.quota.identity.provider |
| celeborn.identity.user-specific.tenant | default | false | Tenant id if celeborn.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.6.0 | celeborn.quota.identity.user-specific.tenant |
| celeborn.identity.user-specific.userName | default | false | User name if celeborn.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.6.0 | celeborn.quota.identity.user-specific.userName |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided by celeborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take static master endpoints as input. Allowed pattern: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver. | 0.2.0 | |
| celeborn.master.endpoints.resolver | org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that provided resolver class should be present in the classpath. | 0.6.0 | |
| celeborn.quota.enabled | true | false | When set to true on the Master side, the master checks the quota via QuotaManager. When set to true on the Client side, LifecycleManager requests the Master to check whether the current user has enough quota before registering a shuffle. Falls back to the default shuffle service of Spark when the Master finds that the current user does not have enough quota. | 0.2.0 | |
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | |
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
Expand Down
3 changes: 0 additions & 3 deletions docs/configuration/quota.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ license: |
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.quota.enabled | true | false | When set to true on the Master side, the master checks the quota via QuotaManager. When set to true on the Client side, LifecycleManager requests the Master to check whether the current user has enough quota before registering a shuffle. Falls back to the default shuffle service of Spark when the Master finds that the current user does not have enough quota. | 0.2.0 | |
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | |
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
Expand Down
Loading