Skip to content

Commit

Permalink
chore: Rename some columnar shuffle configs for code consistently (ap…
Browse files Browse the repository at this point in the history
…ache#418)

(cherry picked from commit 5430f63)
  • Loading branch information
leoluan2009 authored and Huaxin Gao committed May 23, 2024
1 parent 340d7d9 commit b2adeaf
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 12 deletions.
6 changes: 3 additions & 3 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.async.thread.num")
.doc("Number of threads used for Comet async columnar shuffle per shuffle task. " +
"By default, this config is 3. Note that more threads means more memory requirement to " +
Expand All @@ -195,7 +195,7 @@ object CometConf {
.intConf
.createWithDefault(3)

val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
conf("spark.comet.columnar.shuffle.async.max.thread.num")
.doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " +
"By default, this config is 100. This is the upper bound of total number of shuffle " +
Expand All @@ -207,7 +207,7 @@ object CometConf {
.createWithDefault(100)
}

val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
val COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.spill.threshold")
.doc(
"Number of rows to be spilled used for Comet columnar shuffle. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public CometShuffleExternalSorter(
this.numPartitions = numPartitions;
this.schema = schema;
this.numElementsForSpillThreshold =
(int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
(int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
this.writeMetrics = writeMetrics;

this.peakMemoryUsedBytes = getMemoryUsage();
Expand All @@ -158,7 +158,7 @@ public CometShuffleExternalSorter(
this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();

if (isAsync) {
this.threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
this.threadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
assert (this.threadNum > 0);
this.threadPool = ShuffleThreadPool.getThreadPool();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ final class CometBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V>
this.partitionChecksums = createPartitionChecksums(numPartitions, conf);

this.isAsync = (boolean) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM().get();
this.asyncThreadNum = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();

if (isAsync) {
logger.info("Async shuffle writer enabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public final class CometDiskBlockWriter {
this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();

this.numElementsForSpillThreshold =
(int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
(int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();

this.preferDictionaryRatio =
(double) CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static synchronized ExecutorService getThreadPool() {
ThreadFactory factory =
new ThreadFactoryBuilder().setNameFormat("async-shuffle-writer-%d").build();

int threadNum = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
int threadNum =
(int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM().get();
INSTANCE =
new ThreadPoolExecutor(
0, threadNum, 1L, TimeUnit.SECONDS, new ThreadPoolQueue(threadNum), factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ object CometShuffleManager extends Logging {
// Bypass merge sort if we have partition * cores fewer than
// `spark.comet.columnar.shuffle.async.max.thread.num`
val executorCores = conf.get(config.EXECUTOR_CORES)
val maxThreads = CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
val maxThreads = CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.get(SQLConf.get)
val threadCond = dep.partitioner.numPartitions * executorCores <= maxThreads

// Comet columnar shuffle buffers rows in memory. If too many cores are used with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
super.test(testName, testTags: _*) {
withSQLConf(
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
Expand Down Expand Up @@ -634,7 +634,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
CometConf.COMET_BATCH_SIZE.key -> "10",
CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1",
CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
val table1 = (0 until 1000)
.map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong))
.toDF("a", "b", "c", "d")
Expand Down Expand Up @@ -1081,7 +1081,7 @@ class CometShuffleEncryptionSuite extends CometTestBase {
class CometShuffleManagerSuite extends CometTestBase {

test("should not bypass merge sort if executor cores are too high") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM.key -> "100") {
val conf = new SparkConf()
conf.set("spark.executor.cores", "1")

Expand Down

0 comments on commit b2adeaf

Please sign in to comment.