chore: Rename some columnar shuffle configs for code consistency #418

Merged: 1 commit, May 13, 2024
common/src/main/scala/org/apache/comet/CometConf.scala (3 additions, 3 deletions)

@@ -186,7 +186,7 @@ object CometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.async.thread.num")
       .doc("Number of threads used for Comet async columnar shuffle per shuffle task. " +
         "By default, this config is 3. Note that more threads means more memory requirement to " +
@@ -195,7 +195,7 @@ object CometConf {
       .intConf
       .createWithDefault(3)
 
-  val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
     conf("spark.comet.columnar.shuffle.async.max.thread.num")
       .doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " +
         "By default, this config is 100. This is the upper bound of total number of shuffle " +
@@ -207,7 +207,7 @@ object CometConf {
       .createWithDefault(100)
   }
 
-  val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
+  val COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.spill.threshold")
      .doc(
        "Number of rows to be spilled used for Comet columnar shuffle. " +
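Only the Scala constant names are renamed here; the user-facing keys (spark.comet.columnar.shuffle.async.thread.num, spark.comet.columnar.shuffle.async.max.thread.num, spark.comet.columnar.shuffle.spill.threshold) stay the same. As a minimal sketch, assuming a local SparkSession and purely illustrative values, these keys can be set like any other Spark conf:

import org.apache.spark.sql.SparkSession

// Illustrative only: the keys come from CometConf above; the values are arbitrary examples.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("comet-columnar-shuffle-config-sketch")
  .config("spark.comet.columnar.shuffle.async.thread.num", "3")       // async writer threads per shuffle task
  .config("spark.comet.columnar.shuffle.async.max.thread.num", "100") // upper bound per executor
  .config("spark.comet.columnar.shuffle.spill.threshold", "1000000")  // row count threshold for spilling
  .getOrCreate()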
@@ -146,7 +146,7 @@ public CometShuffleExternalSorter(
     this.numPartitions = numPartitions;
     this.schema = schema;
     this.numElementsForSpillThreshold =
-        (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
+        (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
     this.writeMetrics = writeMetrics;
 
     this.peakMemoryUsedBytes = getMemoryUsage();
@@ -158,7 +158,7 @@ public CometShuffleExternalSorter(
     this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
 
     if (isAsync) {
-      this.threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
+      this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
       assert (this.threadNum > 0);
       this.threadPool = ShuffleThreadPool.getThreadPool();
     } else {
@@ -141,7 +141,7 @@ final class CometBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V>
     this.partitionChecksums = createPartitionChecksums(numPartitions, conf);
 
     this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
-    this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
+    this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
 
     if (isAsync) {
       logger.info("Async shuffle writer enabled");
@@ -154,7 +154,7 @@ public final class CometDiskBlockWriter {
     this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
 
     this.numElementsForSpillThreshold =
-        (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
+        (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
 
     this.preferDictionaryRatio =
         (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get();
@@ -37,7 +37,8 @@ public static synchronized ExecutorService getThreadPool() {
       ThreadFactory factory =
           new ThreadFactoryBuilder().setNameFormat("async-shuffle-writer-%d").build();
 
-      int threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
+      int threadNum =
+          (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
       INSTANCE =
           new ThreadPoolExecutor(
               0, threadNum, 1L, TimeUnit.SECONDS, new ThreadPoolQueue(threadNum), factory);
@@ -248,7 +248,7 @@ object CometShuffleManager extends Logging {
     // Bypass merge sort if we have partition * cores fewer than
     // `spark.comet.columnar.shuffle.async.max.thread.num`
     val executorCores = conf.get(config.EXECUTOR_CORES)
-    val maxThreads = CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
+    val maxThreads = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
     val threadCond = dep.partitioner.numPartitions * executorCores <= maxThreads
 
     // Comet columnar shuffle buffers rows in memory. If too many cores are used with
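The renamed constant drives the bypass-merge-sort check above: bypass is considered only while numPartitions * executorCores does not exceed the async max-thread count. A small standalone sketch of that arithmetic, using the default of 100 and hypothetical core and partition counts (not values taken from this PR):

// Hypothetical numbers; the real values come from EXECUTOR_CORES, the shuffle
// dependency's partitioner, and COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.
val maxThreads = 100      // default of spark.comet.columnar.shuffle.async.max.thread.num
val executorCores = 4     // illustrative spark.executor.cores
val numPartitions = 20    // illustrative shuffle partition count

val threadCond = numPartitions * executorCores <= maxThreads
println(s"thread condition for bypass merge sort: $threadCond") // true, since 20 * 4 = 80 <= 100
// With 4 cores, the condition stops holding once numPartitions exceeds 25.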
@@ -52,7 +52,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     super.test(testName, testTags: _*) {
       withSQLConf(
         CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
+        CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
         CometConf.COMET_EXEC_ENABLED.key -> "false",
         CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
@@ -634,7 +634,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       CometConf.COMET_BATCH_SIZE.key -> "10",
       CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1",
-      CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
+      CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
      val table1 = (0 until 1000)
        .map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong))
        .toDF("a", "b", "c", "d")
@@ -1081,7 +1081,7 @@ class CometShuffleEncryptionSuite extends CometTestBase {
 class CometShuffleManagerSuite extends CometTestBase {
 
   test("should not bypass merge sort if executor cores are too high") {
-    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
+    withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
       val conf = new SparkConf()
       conf.set("spark.executor.cores", "1")
 