feat: Enable columnar shuffle by default #250

Merged
merged 32 commits into from
May 9, 2024
Changes from 28 commits
Commits
da9d7df
feat: Enable columnar shuffle by default
viirya Apr 8, 2024
2ae45fb
Update plan stability
viirya Apr 9, 2024
0ab3225
Fix
viirya Apr 10, 2024
b2d9dac
Update diff
viirya Apr 10, 2024
c43488a
Add Comet memoryoverhead for Spark SQL tests
viirya Apr 11, 2024
8815eaa
Update plan stability
viirya Apr 13, 2024
343dabe
Update diff
viirya Apr 14, 2024
8f0875f
Update more diff
viirya Apr 15, 2024
2d6a7ea
Update DataFusion commit
viirya Apr 15, 2024
1fbeda2
Update diff
viirya Apr 16, 2024
7e2c2c0
Update diff
viirya Apr 19, 2024
dd196c8
Update diff
viirya Apr 20, 2024
2b0bc0b
Update diff
viirya Apr 21, 2024
733991b
Update diff
viirya Apr 22, 2024
eb41802
Fix more tests
viirya Apr 30, 2024
6b2315d
Fix more
viirya Apr 30, 2024
1059ef2
Fix
viirya Apr 30, 2024
5cb2c54
Fix more
viirya Apr 30, 2024
b342b66
Fix more
viirya May 2, 2024
6d27a5f
Fix more
viirya May 2, 2024
508cb1a
Fix more
viirya May 2, 2024
5e03881
Fix more
viirya May 3, 2024
1e560d3
Update diff
viirya May 3, 2024
126675e
Merge remote-tracking branch 'upstream/main' into columnar_shuffle_de…
viirya May 3, 2024
b37070d
Fix memory leak
viirya May 4, 2024
557b753
Update plan stability
viirya May 5, 2024
65a9515
Restore diff
viirya May 5, 2024
70c53b4
Merge remote-tracking branch 'upstream/main' into columnar_shuffle_de…
viirya May 6, 2024
c9633ba
Update core/src/execution/datafusion/planner.rs
viirya May 8, 2024
a61dc70
Update core/src/execution/datafusion/planner.rs
viirya May 8, 2024
cceb552
Fix style
viirya May 8, 2024
cbc5305
Use ShuffleExchangeLike instead
viirya May 8, 2024
16 changes: 8 additions & 8 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -140,14 +140,14 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.columnar.shuffle.enabled")
-    .doc(
-      "Force Comet to only use columnar shuffle for CometScan and Spark regular operators. " +
-        "If this is enabled, Comet native shuffle will not be enabled but only Arrow shuffle. " +
-        "By default, this config is false.")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.columnar.shuffle.enabled")
+      .doc(
+        "Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. " +
+          "If this is enabled, Comet prefers columnar shuffle than native shuffle. " +
+          "By default, this config is true.")
+      .booleanConf
+      .createWithDefault(true)

   val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.shuffle.enforceMode.enabled")
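Because this hunk flips the default of `spark.comet.columnar.shuffle.enabled` from `false` to `true`, a job that wants the previous behavior now has to set the key explicitly. The fragment below is a minimal sketch of that, assuming Spark is on the classpath and Comet is otherwise already configured. Only the config key comes from the diff; the application name is a placeholder.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: opts out of the new default for a single session.
// "comet-shuffle-example" is a hypothetical application name.
val spark = SparkSession
  .builder()
  .appName("comet-shuffle-example")
  // Key taken from CometConf in this PR; the default is now "true".
  .config("spark.comet.columnar.shuffle.enabled", "false")
  .getOrCreate()

// Read the value back to confirm what this session will use.
println(spark.conf.get("spark.comet.columnar.shuffle.enabled"))
```

Setting the key explicitly also keeps a job's behavior stable if the default changes again.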
@@ -66,7 +66,7 @@ class NativeUtil {
           val arrowArray = ArrowArray.allocateNew(allocator)
           Data.exportVector(
             allocator,
-            getFieldVector(valueVector),
+            getFieldVector(valueVector, "export"),
             provider,
             arrowArray,
             arrowSchema)
@@ -242,7 +242,7 @@ object Utils {
             }
           }

-          getFieldVector(valueVector)
+          getFieldVector(valueVector, "serialize")

         case c =>
           throw new SparkException(
@@ -253,14 +253,15 @@ object Utils {
     (fieldVectors, provider)
   }

-  def getFieldVector(valueVector: ValueVector): FieldVector = {
+  def getFieldVector(valueVector: ValueVector, reason: String): FieldVector = {
     valueVector match {
       case v @ (_: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector |
           _: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
           _: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
           _: FixedSizeBinaryVector | _: TimeStampMicroVector) =>
         v.asInstanceOf[FieldVector]
-      case _ => throw new SparkException(s"Unsupported Arrow Vector: ${valueVector.getClass}")
+      case _ =>
+        throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
     }
   }
 }
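The `reason` parameter added to `getFieldVector` lets the error message say which call path ("export" or "serialize") hit an unsupported vector type. The sketch below illustrates that pattern in isolation. It does not use the Arrow API: `Vec`, `IntVec`, `StructVec`, `ReasonSketch`, `requireSupported`, and `messageFor` are hypothetical stand-ins, and `IllegalArgumentException` replaces `SparkException` so the snippet has no dependencies.

```scala
// Hypothetical stand-ins for Arrow vector classes; not the real Arrow API.
sealed trait Vec
final class IntVec extends Vec
final class StructVec extends Vec

object ReasonSketch {
  // Same shape as getFieldVector(valueVector, reason) in the diff:
  // return supported vectors unchanged, otherwise fail with a message
  // that names the caller-supplied reason.
  def requireSupported(v: Vec, reason: String): Vec = v match {
    case i: IntVec => i
    case _ =>
      throw new IllegalArgumentException(
        s"Unsupported vector for $reason: ${v.getClass}")
  }

  // Helper for demonstration: returns "ok" or the failure message.
  def messageFor(v: Vec, reason: String): String =
    try {
      requireSupported(v, reason)
      "ok"
    } catch {
      case e: IllegalArgumentException => e.getMessage
    }
}
```

Calling `ReasonSketch.messageFor(new StructVec, "serialize")` yields a message containing `for serialize`, so a failure can be traced to the serialization path rather than the export path without reading a stack trace.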