diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 53c335c1eced6..103ece09f6425 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -82,27 +82,47 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + // Match the serialization pattern for case of Dataset.toJSON // Call `planTakeOrdered` first which matches a larger plan. - case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match { - // We should match the combination of limit and offset first, to get the optimal physical - // plan, instead of planning limit and offset separately. - case LimitAndOffset(limit, offset, child) => - CollectLimitExec(limit = limit, child = planLater(child), offset = offset) - case OffsetAndLimit(offset, limit, child) => - // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. - CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset) - case Limit(IntegerLiteral(limit), child) => - CollectLimitExec(limit = limit, child = planLater(child)) - case logical.Offset(IntegerLiteral(offset), child) => - CollectLimitExec(child = planLater(child), offset = offset) - case Tail(IntegerLiteral(limit), child) => - CollectTailExec(limit, planLater(child)) - case other => planLater(other) - }) :: Nil + case ReturnAnswer( + SerializeFromObject( + serializer, + MapPartitions( + f, + objAttr1, + DeserializeToObject(deserializer, objAttr2, child)))) => + SerializeFromObjectExec( + serializer, + MapPartitionsExec( + f, + objAttr1, + DeserializeToObjectExec( + deserializer, + objAttr2, + planTakeOrdered(child).getOrElse(planCollectLimit(child))))) :: Nil + case ReturnAnswer(rootPlan) => + planTakeOrdered(rootPlan).getOrElse(planCollectLimit(rootPlan)) :: Nil case other => planTakeOrdered(other).toSeq } + private def planCollectLimit(plan: LogicalPlan): SparkPlan = plan match { + // We should match the combination of limit and offset first, to get the optimal physical + // plan, instead of planning limit and offset separately. + case LimitAndOffset(limit, offset, child) => + CollectLimitExec(limit = limit, child = planLater(child), offset = offset) + case OffsetAndLimit(offset, limit, child) => + // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. + CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset) + case Limit(IntegerLiteral(limit), child) => + CollectLimitExec(limit = limit, child = planLater(child)) + case logical.Offset(IntegerLiteral(offset), child) => + CollectLimitExec(child = planLater(child), offset = offset) + case Tail(IntegerLiteral(limit), child) => + CollectTailExec(limit, planLater(child)) + case other => planLater(other) + } + private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match { // We should match the combination of limit and offset first, to get the optimal physical // plan, instead of planning limit and offset separately. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 1400ee25f4319..6584ed21be7d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -262,6 +262,19 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } + test("SPARK-49919: CollectLimit can be applied with toJSON") { + val query = testData.select($"value").limit(2).toJSON + val planned = query.queryExecution.sparkPlan + assert(planned.exists(_.isInstanceOf[CollectLimitExec])) + val t = testData.select($"value").toJSON.logicalPlan.output + } + + test("SPARK-49919: TakeOrderedAndProject can be applied with toJSON") { + val query = testData.select($"key", $"value").sort($"key").limit(2).toJSON + val planned = query.queryExecution.executedPlan + assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec]) + } + test("PartitioningCollection") { withTempView("normal", "small", "tiny") { testData.createOrReplaceTempView("normal")