Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support CollectLimit operator #100

Merged
merged 5 commits into from
Feb 28, 2024

Conversation

advancedxy
Copy link
Contributor

Which issue does this PR close?

Closes #37 .

Rationale for this change

Better operator coverage

What changes are included in this PR?

Add CometCollectLimitExec

How are these changes tested?

Add new test and some manual verification.

@advancedxy advancedxy force-pushed the support_collect_limit_exec branch from 673bb7f to 2d0fbb2 Compare February 24, 2024 07:08
@advancedxy advancedxy marked this pull request as draft February 24, 2024 09:15
@advancedxy advancedxy force-pushed the support_collect_limit_exec branch 2 times, most recently from 009b7e2 to b07f8de Compare February 26, 2024 01:16
@advancedxy advancedxy marked this pull request as ready for review February 26, 2024 01:18
object CometCoalesceExec {

/** A simple RDD with no data, but with the given number of partitions. */
class EmptyRDDWithPartitions(@transient private val sc: SparkContext, numPartitions: Int)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's moving to CometExecUtils, So it can be reused for several places.

@advancedxy
Copy link
Contributor Author

All cleared. cc @sunchao @viirya

@@ -32,4 +33,9 @@ trait ShimCometSparkSessionExtensions {
.map { a => a.setAccessible(true); a }
.flatMap(_.get(scan).asInstanceOf[Option[Aggregation]])
.headOption

def getOffset(limit: LimitExec): Option[Int] = limit.getClass.getDeclaredFields
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I wonder if we can just return 0 if there is no offset in this method, so that we don't have to do getOffset(op).getOrElse(0) in a few places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered it too. But it seems more nature to define a Option[Int] to access a potential non-existed field.

Let me reconsider this part.

// `CometCollectLimitExec` which overrides `executeCollect`, the redundant `ColumnarToRowExec`
// makes the override ineffective. The purpose of this rule is to eliminate the redundant
// `ColumnarToRowExec` for such operators.
case class EliminateRedundantColumnarToRow(session: SparkSession) extends Rule[SparkPlan] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm trying to understand why this is necessary. The test passes even if I remove this rule.

Copy link
Contributor Author

@advancedxy advancedxy Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I didn't add a test case for this part. Like noted in the comment, it's correct to add or remove the ColumnarToRowExec on top of a CometExec.

CollectLimitExec's executeCollect is optimized by using executeTake to take rows from child operator. Unlike CollectLimitExec.doExecute() or TakeOrderedAndProjectExec.doExecute(), which would shuffle all the data into a single partition and then get the limited data from shuffled partition, executeTake will retrieves rows directly from child's RDD without shuffle by partitions.

Take the following code for an example: sql("select * from a_very_large_table limit 100").collect(). CollectLimitExec's executeCollect will try to get the first 100 rows in the first partition, then the next 2 partitions if the previous partition doesn't contains 100 rows, then the next 4 partitions .... without shuffle.

I modeled this behavior(see https://github.com/apache/arrow-datafusion-comet/pull/100/files#diff-50c88b1d9b68e7ba24cb6fad9a4f20ea1b8fa63c3c868578db151b83182c627fR57) in CometCollectLimitExec as well. However, without this rule, an additional ColumnarToRowExec operator is wrapped on top of CometCollectLimitExec, which makes the override ineffective.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an assert in the test file, which should illustrate the basic idea.

          // make sure the root node is CometCollectLimitExec
          assert(qe.executedPlan.isInstanceOf[CometCollectLimitExec])

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so if we have the extra ColumnarToRowExec, the code will go through its executeCollect instead which will call doExecuteÇolumnar, instead of calling the executeCollect in the CollectLimitExec itself.

I think we can probably do the same for CometTakeOrderedAndProjectExec too - Spark has an executeCollect implementation for this too. However, I don't know how useful it is since executeCollect is not often used? cc @viirya

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so if we have the extra ColumnarToRowExec, the code will go through its executeCollect instead which will call doExecuteÇolumnar, instead of calling the executeCollect in the CollectLimitExec itself.

Yeah, exactly.

I think we can probably do the same for CometTakeOrderedAndProjectExec too - Spark has an executeCollect implementation for this too.

I checked the implementation of TakeOrderedAndProjectExec.executeCollect when reviewing CometTakeOrderedAndProjectExec, it still shuffles all data into a single partition which is necessary to satisfy the ordering semantic. Hence it's not necessary to do the same for CometTakeOrderedAndProjectExec.

However, I don't know how useful it is since executeCollect is not often used

It's used in API/df scenarios, it's quite often for data scientists to collect and explore the data via collect with limit set. For the pure SQL and ETL scenario, I believe it's rarely used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, makes sense. This also removes ColumnarToRowExec from the plan even if instead of executeCollect, doExecute is used, but I think it is OK since doExecute itself calls ColumnarToRowExec

@advancedxy advancedxy force-pushed the support_collect_limit_exec branch from b07f8de to afb6513 Compare February 27, 2024 15:05
/**
* TODO: delete after dropping Spark 3.2 and 3.3 support
*/
def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you like this? I think we should expose the getOffset method to accept LimitExec only and it could return Int directly.

The actual implementation could be generic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good 👍

CometExecUtils.getLimitNativePlan(output, limit).get
CometExec.getCometIterator(Seq(iter), limitOp)
}
CometExecUtils.toNativeLimitedPerPartition(childRDD, output, limit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor to use the utility method.

If not appropriate, I can revert this.

Copy link
Member

@sunchao sunchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

/**
* TODO: delete after dropping Spark 3.2 and 3.3 support
*/
def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good 👍

// `CometCollectLimitExec` which overrides `executeCollect`, the redundant `ColumnarToRowExec`
// makes the override ineffective. The purpose of this rule is to eliminate the redundant
// `ColumnarToRowExec` for such operators.
case class EliminateRedundantColumnarToRow(session: SparkSession) extends Rule[SparkPlan] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, makes sense. This also removes ColumnarToRowExec from the plan even if instead of executeCollect, doExecute is used, but I think it is OK since doExecute itself calls ColumnarToRowExec

Comment on lines 483 to 484
plan.transform { case ColumnarToRowExec(child: CometCollectLimitExec) =>
child
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this looks like a bit dangerous if ColumnarToRowExec + CometCollectLimitExec is not end of the query.

I think the assumption here is the query is to collect data from ColumnarToRowExec + CometCollectLimitExec. So executeCollect is called on ColumnarToRowExec which makes ineffective of CometCollectLimitExec's executeCollect.

The more correct one maybe:

plan match {
  case ColumnarToRowExec(child: CometCollectLimitExec) =>
        child
  case other => other
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the suggest one is better.

Hmm, this looks like a bit dangerous if ColumnarToRowExec + CometCollectLimitExec is not end of the query.

I'd like to point out that ColumnarToRowExec + CometCollectLimitExec will always be the end of the query as CollectLimitExec is the end of query. You can see the SpecialLimits rule which only translate the end of query to a CollectLimitExec.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to point out that ColumnarToRowExec + CometCollectLimitExec will always be the end of the query as CollectLimitExec is the end of query.

Yes, this is usually the case. But I remember that at some special cases, users can produce a query tree that some others operators on top of CollectLimitExec. I think this is why CollectLimitExec still implements doExecute not just executeCollect.

Comment on lines 38 to 42
*/
def createEmptyColumnarRDDWithSinglePartition(
sparkContext: SparkContext): RDD[ColumnarBatch] = {
new EmptyRDDWithPartitions(sparkContext, 1)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name is too long. This doesn't save the number of words. 😂

Maybe just keep original one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

if (!plan.exists(op => planClass.isAssignableFrom(op.getClass))) {
assert(
false,
s"Expected plan to contain ${planClass.getSimpleName}.\n" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s"Expected plan to contain ${planClass.getSimpleName}.\n" +
s"Expected plan to contain ${planClass.getSimpleName} but not.\n" +

new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))

override def executeCollect(): Array[InternalRow] = {
ColumnarToRowExec(child).executeTake(limit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to handle limit < 0 case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when offset = 0, limit cannot be limit < 0.
See CollectLimitExec's assert.

image

Let's handle that case when we are adding offset support?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

@sunchao sunchao merged commit 313111d into apache:main Feb 28, 2024
10 checks passed
@sunchao
Copy link
Member

sunchao commented Feb 28, 2024

Merged, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support CollectLimit operator
3 participants