feat: Support CollectLimit operator #100
ShimCometSparkSessionExtensions.scala (package `org.apache.comet.shims`):

@@ -20,9 +20,11 @@

```scala
package org.apache.comet.shims

import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

trait ShimCometSparkSessionExtensions {
  import org.apache.comet.shims.ShimCometSparkSessionExtensions._

  /**
   * TODO: delete after dropping Spark 3.2.0 support and directly call scan.pushedAggregate
```

@@ -32,4 +34,19 @@ trait ShimCometSparkSessionExtensions {

```scala
      .map { a => a.setAccessible(true); a }
      .flatMap(_.get(scan).asInstanceOf[Option[Aggregation]])
      .headOption

  /**
   * TODO: delete after dropping Spark 3.2 and 3.3 support
   */
  def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
```
Review thread on `getOffset`:

Author: How do you like this? I think we should expose the typed `getOffset` API here; the actual implementation could be generic.

Reviewer: Looks good 👍
```scala
}

object ShimCometSparkSessionExtensions {
  private def getOffsetOpt(plan: SparkPlan): Option[Int] = plan.getClass.getDeclaredFields
    .filter(_.getName == "offset")
    .map { a => a.setAccessible(true); a.get(plan) }
    .filter(_.isInstanceOf[Int])
    .map(_.asInstanceOf[Int])
    .headOption
}
```
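For context on why the shim resorts to reflection: the `offset` field only exists on limit operators in newer Spark versions (hence the TODO about dropping Spark 3.2/3.3 support), so the shim cannot reference it directly while older versions are on the classpath. Below is a minimal, self-contained sketch of the same reflection pattern; `WithOffset` and `WithoutOffset` are hypothetical stand-ins for plan nodes with and without the field:

```scala
object ReflectOffsetDemo {
  // Hypothetical stand-ins for plan nodes with and without an `offset` field.
  case class WithOffset(limit: Int, offset: Int)
  case class WithoutOffset(limit: Int)

  // Same pattern as getOffsetOpt: look up a declared `offset` field via
  // reflection and keep its value only if it is an Int.
  def offsetOpt(node: AnyRef): Option[Int] = node.getClass.getDeclaredFields
    .filter(_.getName == "offset")
    .map { f => f.setAccessible(true); f.get(node) }
    .collect { case i: java.lang.Integer => i.intValue() }
    .headOption

  def main(args: Array[String]): Unit = {
    assert(offsetOpt(WithOffset(limit = 100, offset = 10)).contains(10))
    assert(offsetOpt(WithoutOffset(limit = 100)).isEmpty) // getOffset would fall back to 0
    println("ok")
  }
}
```

An absent (or non-`Int`) field simply yields `None`, which `getOffset` maps to `0`, so the same binary runs unchanged against Spark versions that lack the field.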
CometCoalesceExec.scala (package `org.apache.spark.sql.comet`):

@@ -19,7 +19,6 @@

```scala
package org.apache.spark.sql.comet

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
```

@@ -42,7 +41,7 @@ case class CometCoalesceExec(

```scala
    if (numPartitions == 1 && rdd.getNumPartitions < 1) {
      // Make sure we don't output an RDD with 0 partitions, when claiming that we have a
      // `SinglePartition`.
      CometExecUtils.emptyRDDWithPartitions(sparkContext, 1)
    } else {
      rdd.coalesce(numPartitions, shuffle = false)
    }
```

Here the old `new CometCoalesceExec.EmptyRDDWithPartitions(sparkContext, numPartitions)` is replaced by the shared `CometExecUtils.emptyRDDWithPartitions(sparkContext, 1)` helper.

@@ -67,20 +66,3 @@ case class CometCoalesceExec(

```scala
  override def hashCode(): Int = Objects.hashCode(numPartitions: java.lang.Integer, child)
}
```

The companion object is removed from this file:

```scala
object CometCoalesceExec {

  /** A simple RDD with no data, but with the given number of partitions. */
  class EmptyRDDWithPartitions(@transient private val sc: SparkContext, numPartitions: Int)
      extends RDD[ColumnarBatch](sc, Nil) {

    override def getPartitions: Array[Partition] =
      Array.tabulate(numPartitions)(i => EmptyPartition(i))

    override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
      Iterator.empty
    }
  }

  case class EmptyPartition(index: Int) extends Partition
}
```

Review comment on `EmptyRDDWithPartitions`: It's moving to `CometExecUtils`, so it can be reused in several places.
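`CometExecUtils.emptyRDDWithPartitions` itself is not shown in this excerpt. Based on the call sites above and the class that was just removed, a plausible shape of the helper is sketched below; the object name and exact placement are assumptions, while the bodies mirror the removed code:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch only: the real helper lives in CometExecUtils; signature inferred
// from the calls emptyRDDWithPartitions(sparkContext, 1) in this PR.
object CometExecUtilsSketch {

  /** An RDD with no data but the requested number of (empty) partitions. */
  def emptyRDDWithPartitions(sc: SparkContext, numPartitions: Int): RDD[ColumnarBatch] =
    new EmptyRDDWithPartitions(sc, numPartitions)

  private case class EmptyPartition(index: Int) extends Partition

  private class EmptyRDDWithPartitions(
      @transient private val sc: SparkContext,
      numPartitions: Int)
      extends RDD[ColumnarBatch](sc, Nil) {

    override def getPartitions: Array[Partition] =
      Array.tabulate(numPartitions)(i => EmptyPartition(i))

    override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] =
      Iterator.empty
  }
}
```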
CometCollectLimitExec.scala (new file, package `org.apache.spark.sql.comet`):

@@ -0,0 +1,112 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.sql.comet

import java.util.Objects

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * Comet physical plan node for Spark `CollectLimitExec`.
 *
 * Similar to `CometTakeOrderedAndProjectExec`, it contains two native executions separated by a
 * Comet shuffle.
 *
 * TODO: support offset semantics
 */
case class CometCollectLimitExec(
    override val originalPlan: SparkPlan,
    limit: Int,
    offset: Int,
    child: SparkPlan)
    extends CometExec
    with UnaryExecNode {

  private lazy val writeMetrics =
    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
  private lazy val readMetrics =
    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
  override lazy val metrics: Map[String, SQLMetric] = Map(
    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
    "shuffleReadElapsedCompute" ->
      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
    "numPartitions" -> SQLMetrics.createMetric(
      sparkContext,
      "number of partitions")) ++ readMetrics ++ writeMetrics

  private lazy val serializer: Serializer =
    new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))

  override def executeCollect(): Array[InternalRow] = {
    ColumnarToRowExec(child).executeTake(limit)
  }

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val childRDD = child.executeColumnar()
    if (childRDD.getNumPartitions == 0) {
      CometExecUtils.emptyRDDWithPartitions(sparkContext, 1)
    } else {
      val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
        childRDD
      } else {
        val localLimitedRDD = if (limit >= 0) {
          CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
        } else {
          childRDD
        }
        // Shuffle to Single Partition using Comet shuffle
        val dep = CometShuffleExchangeExec.prepareShuffleDependency(
          localLimitedRDD,
          child.output,
          outputPartitioning,
          serializer,
          metrics)
        metrics("numPartitions").set(dep.partitioner.numPartitions)

        new CometShuffledBatchRDD(dep, readMetrics)
      }
      CometExecUtils.getNativeLimitRDD(singlePartitionRDD, output, limit)
    }
  }

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    this.copy(child = newChild)

  override def stringArgs: Iterator[Any] = Iterator(limit, offset, child)

  override def equals(obj: Any): Boolean = {
    obj match {
      case other: CometCollectLimitExec =>
        this.limit == other.limit && this.offset == other.offset &&
          this.child == other.child
      case _ =>
        false
    }
  }

  override def hashCode(): Int =
    Objects.hashCode(limit: java.lang.Integer, offset: java.lang.Integer, child)
}
```

Review thread on `executeCollect`:

Reviewer: Maybe we need to handle `offset` here as well?

Author: Okay
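To make the shape of `doExecuteColumnar` above concrete: it is a classic two-phase limit, i.e. cap each partition locally, shuffle the survivors into one partition, then apply the limit once more globally. A minimal plain-Scala model of that strategy, with a hypothetical `partitions` value standing in for the child RDD's partitions:

```scala
object TwoPhaseLimitDemo {
  def main(args: Array[String]): Unit = {
    val limit = 3
    // Hypothetical stand-in for a child RDD with several partitions of rows.
    val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7, 8, 9))

    // Phase 1: each partition keeps at most `limit` rows before the shuffle
    // (CometExecUtils.getNativeLimitRDD plays this role, natively, in the plan).
    val locallyLimited = partitions.map(_.take(limit))

    // Shuffle everything into a single partition (modeled here as a flatten).
    val singlePartition = locallyLimited.flatten

    // Phase 2: apply the limit once more, now globally.
    val result = singlePartition.take(limit)

    assert(result == Seq(1, 2, 3))
    println(result)
  }
}
```

The phase-1 cap is purely an optimization: it bounds the shuffled data at `limit` rows per partition without changing the final answer, which phase 2 alone would already make correct.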
Review discussion on the accompanying planner rule:

Reviewer: Hmm, I'm trying to understand why this is necessary. The test passes even if I remove this rule.

Author: Yea, I didn't add a test case for this part. Like noted in the comment, it's correct to add or remove the `ColumnarToRowExec` on top of a `CometExec`. `CollectLimitExec`'s `executeCollect` is optimized by using `executeTake` to take rows from the child operator. Unlike `CollectLimitExec.doExecute()` or `TakeOrderedAndProjectExec.doExecute()`, which would shuffle all the data into a single partition and then get the limited data from the shuffled partition, `executeTake` retrieves rows directly from the child's RDD, partition by partition, without a shuffle.

Take the following code as an example: `sql("select * from a_very_large_table limit 100").collect()`. `CollectLimitExec`'s `executeCollect` will try to get the first 100 rows from the first partition, then from the next 2 partitions if the first partition doesn't contain 100 rows, then the next 4 partitions, and so on, all without a shuffle.

I modeled this behavior (see https://github.com/apache/arrow-datafusion-comet/pull/100/files#diff-50c88b1d9b68e7ba24cb6fad9a4f20ea1b8fa63c3c868578db151b83182c627fR57) in `CometCollectLimitExec` as well. However, without this rule, an additional `ColumnarToRowExec` operator is wrapped on top of `CometCollectLimitExec`, which makes the override ineffective.

Author: I added an assert in the test file, which should illustrate the basic idea.

Reviewer: I see, so if we have the extra `ColumnarToRowExec`, the code will go through its `executeCollect` instead, which will call `doExecuteColumnar`, instead of calling the `executeCollect` in the `CollectLimitExec` itself.

I think we can probably do the same for `CometTakeOrderedAndProjectExec` too - Spark has an `executeCollect` implementation for that too. However, I don't know how useful it is, since `executeCollect` is not often used? cc @viirya

Author: Yeah, exactly.

I checked the implementation of `TakeOrderedAndProjectExec.executeCollect` when reviewing `CometTakeOrderedAndProjectExec`; it still shuffles all data into a single partition, which is necessary to satisfy the ordering semantics. Hence it's not necessary to do the same for `CometTakeOrderedAndProjectExec`.

`executeCollect` is used in API/DataFrame scenarios; it's quite common for data scientists to collect and explore data via `collect` with a limit set. For the pure SQL and ETL scenario, I believe it's rarely used.

Reviewer: Thanks, makes sense. This also removes `ColumnarToRowExec` from the plan even if `doExecute` is used instead of `executeCollect`, but I think that is OK since `doExecute` itself calls `ColumnarToRowExec`.
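A small model of the `executeTake` scan pattern described in this thread (plain Scala, no Spark). The 1, then 2, then 4 partition scaling follows the description above; Spark's real heuristic is more involved (it is governed by `spark.sql.limit.scaleUpFactor`):

```scala
import scala.collection.mutable.ArrayBuffer

object ExecuteTakeDemo {

  /** Take `n` rows by scanning 1, then 2, then 4, ... partitions, without a shuffle. */
  def take(partitions: Seq[Seq[Int]], n: Int): Seq[Int] = {
    val buf = ArrayBuffer.empty[Int]
    var scanned = 0 // partitions consumed so far
    var batch = 1   // how many partitions the next pass reads
    while (buf.size < n && scanned < partitions.size) {
      // In Spark this pass would be one job over just these partitions.
      val pass = partitions.slice(scanned, scanned + batch)
      buf ++= pass.flatten.take(n - buf.size)
      scanned += batch
      batch *= 2 // widen the scan if we still don't have enough rows
    }
    buf.toSeq
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1), Seq(2, 3), Seq(4, 5, 6), Seq(7, 8))
    // First pass reads partition 0 (1 row); second pass reads partitions 1-2 and stops.
    assert(take(parts, 4) == Seq(1, 2, 3, 4))
    println(take(parts, 4))
  }
}
```

This is why the rule matters: only when `CometCollectLimitExec.executeCollect` is the operator actually invoked does the query short-circuit like this instead of paying for a full shuffle.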