From 5430f632f46afb4c2130d717b2c75296218994e7 Mon Sep 17 00:00:00 2001 From: Xuedong Luan Date: Mon, 13 May 2024 22:25:03 +0800 Subject: [PATCH] chore: Rename some columnar shuffle configs for code consistency (#418) --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 +++--- .../spark/shuffle/sort/CometShuffleExternalSorter.java | 4 ++-- .../shuffle/CometBypassMergeSortShuffleWriter.java | 2 +- .../sql/comet/execution/shuffle/CometDiskBlockWriter.java | 2 +- .../sql/comet/execution/shuffle/ShuffleThreadPool.java | 3 ++- .../sql/comet/execution/shuffle/CometShuffleManager.scala | 2 +- .../org/apache/comet/exec/CometColumnarShuffleSuite.scala | 6 +++--- 7 files changed, 13 insertions(+), 12 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index e9349c315..e3584300e 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -186,7 +186,7 @@ object CometConf { .booleanConf .createWithDefault(false) - val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] = + val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] = conf("spark.comet.columnar.shuffle.async.thread.num") .doc("Number of threads used for Comet async columnar shuffle per shuffle task. " + "By default, this config is 3. Note that more threads means more memory requirement to " + @@ -195,7 +195,7 @@ object CometConf { .intConf .createWithDefault(3) - val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = { + val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = { conf("spark.comet.columnar.shuffle.async.max.thread.num") .doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " + "By default, this config is 100. 
This is the upper bound of total number of shuffle " + @@ -207,7 +207,7 @@ object CometConf { .createWithDefault(100) } - val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] = + val COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] = conf("spark.comet.columnar.shuffle.spill.threshold") .doc( "Number of rows to be spilled used for Comet columnar shuffle. " + diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java index 4417c4f29..ed3e2be66 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java @@ -146,7 +146,7 @@ public CometShuffleExternalSorter( this.numPartitions = numPartitions; this.schema = schema; this.numElementsForSpillThreshold = - (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get(); + (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get(); this.writeMetrics = writeMetrics; this.peakMemoryUsedBytes = getMemoryUsage(); @@ -158,7 +158,7 @@ public CometShuffleExternalSorter( this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get(); if (isAsync) { - this.threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get(); + this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get(); assert (this.threadNum > 0); this.threadPool = ShuffleThreadPool.getThreadPool(); } else { diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java index 5c17a643a..108e1f2e1 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java +++ 
b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java @@ -141,7 +141,7 @@ final class CometBypassMergeSortShuffleWriter extends ShuffleWriter this.partitionChecksums = createPartitionChecksums(numPartitions, conf); this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get(); - this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get(); + this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get(); if (isAsync) { logger.info("Async shuffle writer enabled"); diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java index 309fcaf69..f793874d7 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java @@ -154,7 +154,7 @@ public final class CometDiskBlockWriter { this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get(); this.numElementsForSpillThreshold = - (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get(); + (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get(); this.preferDictionaryRatio = (double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get(); diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java index 69550e47b..86d7d7dc2 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/ShuffleThreadPool.java @@ -37,7 +37,8 @@ public static synchronized ExecutorService getThreadPool() { ThreadFactory factory = new 
ThreadFactoryBuilder().setNameFormat("async-shuffle-writer-%d").build(); - int threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM().get(); + int threadNum = + (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM().get(); INSTANCE = new ThreadPoolExecutor( 0, threadNum, 1L, TimeUnit.SECONDS, new ThreadPoolQueue(threadNum), factory); diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index cb3422531..335fb065f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -248,7 +248,7 @@ object CometShuffleManager extends Logging { // Bypass merge sort if we have partition * cores fewer than // `spark.comet.columnar.shuffle.async.max.thread.num` val executorCores = conf.get(config.EXECUTOR_CORES) - val maxThreads = CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get) + val maxThreads = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get) val threadCond = dep.partitioner.numPartitions * executorCores <= maxThreads // Comet columnar shuffle buffers rows in memory. 
If too many cores are used with diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 114351fd1..600f9c44f 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -52,7 +52,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar super.test(testName, testTags: _*) { withSQLConf( CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString, - CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, + CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, CometConf.COMET_EXEC_ENABLED.key -> "false", CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", @@ -634,7 +634,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_BATCH_SIZE.key -> "10", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1", - CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") { + CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") { val table1 = (0 until 1000) .map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong)) .toDF("a", "b", "c", "d") @@ -1081,7 +1081,7 @@ class CometShuffleEncryptionSuite extends CometTestBase { class CometShuffleManagerSuite extends CometTestBase { test("should not bypass merge sort if executor cores are too high") { - withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") { + withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") { val conf = new SparkConf() conf.set("spark.executor.cores", "1")