Skip to content

Commit

Permalink
[SPARK-49919][SQL] Add special limits support for return content as JSON dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin committed Oct 10, 2024
1 parent d7772f2 commit 81cc9de
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,47 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
object SpecialLimits extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  // Match the serialization pattern produced by `Dataset.toJSON`:
  // SerializeFromObject(MapPartitions(DeserializeToObject(child))).
  // Plan the special limit on the underlying `child` so that CollectLimit /
  // TakeOrderedAndProject still apply even though the limit sits below the
  // serialization wrapper. Try `planTakeOrdered` first, since it matches a
  // larger plan (sort + limit) than `planCollectLimit`.
  case ReturnAnswer(
      SerializeFromObject(
        serializer,
        MapPartitions(
          f,
          objAttr1,
          DeserializeToObject(deserializer, objAttr2, child)))) =>
    SerializeFromObjectExec(
      serializer,
      MapPartitionsExec(
        f,
        objAttr1,
        DeserializeToObjectExec(
          deserializer,
          objAttr2,
          planTakeOrdered(child).getOrElse(planCollectLimit(child))))) :: Nil
  // Plain collect path: prefer an ordered-limit plan, fall back to
  // CollectLimit-style planning of limit/offset/tail.
  case ReturnAnswer(rootPlan) =>
    planTakeOrdered(rootPlan).getOrElse(planCollectLimit(rootPlan)) :: Nil

  // Not the root of the query: only the take-ordered optimization applies.
  case other => planTakeOrdered(other).toSeq
}

// Plans a collect-style limit/offset/tail over `plan`. The cases that combine
// limit with offset must be tried before the single-operator cases: planning
// the pair as one CollectLimitExec yields a better physical plan than planning
// the two operators independently.
private def planCollectLimit(plan: LogicalPlan): SparkPlan = plan match {
  case LimitAndOffset(n, skip, child) =>
    CollectLimitExec(n, planLater(child), skip)
  case OffsetAndLimit(skip, n, child) =>
    // 'Offset a' followed by 'Limit b' is equivalent to 'Limit a + b'
    // followed by 'Offset a'.
    CollectLimitExec(skip + n, planLater(child), skip)
  case Limit(IntegerLiteral(n), child) =>
    CollectLimitExec(n, planLater(child))
  case logical.Offset(IntegerLiteral(skip), child) =>
    CollectLimitExec(child = planLater(child), offset = skip)
  case Tail(IntegerLiteral(n), child) =>
    CollectTailExec(n, planLater(child))
  case other =>
    planLater(other)
}

private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match {
// We should match the combination of limit and offset first, to get the optimal physical
// plan, instead of planning limit and offset separately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,19 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
}

test("SPARK-49919: CollectLimit can be applied with toJSON") {
  // limit(2).toJSON should still plan a CollectLimitExec beneath the
  // JSON serialization wrapper rather than a full global limit.
  val query = testData.select($"value").limit(2).toJSON
  val planned = query.queryExecution.sparkPlan
  assert(planned.exists(_.isInstanceOf[CollectLimitExec]))
}

test("SPARK-49919: TakeOrderedAndProject can be applied with toJSON") {
  // sort + limit under toJSON should still collapse into a single
  // TakeOrderedAndProjectExec at the root of the executed plan.
  val df = testData.select($"key", $"value").sort($"key").limit(2).toJSON
  val rootPlan = df.queryExecution.executedPlan
  assert(rootPlan.isInstanceOf[execution.TakeOrderedAndProjectExec])
}

test("PartitioningCollection") {
withTempView("normal", "small", "tiny") {
testData.createOrReplaceTempView("normal")
Expand Down

0 comments on commit 81cc9de

Please sign in to comment.