diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh index 5543093ff6..1f97d2d4a7 100755 --- a/dev/ensure-jars-have-correct-contents.sh +++ b/dev/ensure-jars-have-correct-contents.sh @@ -78,6 +78,8 @@ allowed_expr+="|^org/apache/spark/shuffle/sort/CometShuffleExternalSorter.*$" allowed_expr+="|^org/apache/spark/shuffle/sort/RowPartition.class$" allowed_expr+="|^org/apache/spark/shuffle/comet/.*$" allowed_expr+="|^org/apache/spark/sql/$" +# allow ExtendedExplainGenerator trait since it may not be available in older Spark versions +allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$" allowed_expr+="|^org/apache/spark/CometPlugin.class$" allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$" allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$" diff --git a/pom.xml b/pom.xml index 3ff5b2f65b..f48bb7d9c1 100644 --- a/pom.xml +++ b/pom.xml @@ -959,6 +959,9 @@ under the License. javax.annotation.meta.TypeQualifierValidator org.apache.parquet.filter2.predicate.SparkFilterApi + + + org.apache.spark.sql.ExtendedExplainGenerator diff --git a/spark/inspections/CometTPCDSQueriesList-results.txt b/spark/inspections/CometTPCDSQueriesList-results.txt new file mode 100644 index 0000000000..13f99a1ac9 --- /dev/null +++ b/spark/inspections/CometTPCDSQueriesList-results.txt @@ -0,0 +1,838 @@ +Query: q1. Comet Exec: Enabled (CometFilter, CometProject) +Query: q1: ExplainInfo: +ObjectHashAggregate is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q2. Comet Exec: Enabled (CometFilter, CometProject, CometUnion) +Query: q2: ExplainInfo: +ObjectHashAggregate is not supported +xxhash64 is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q3. Comet Exec: Enabled (CometFilter, CometProject) +Query: q3: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q4. Comet Exec: Enabled (CometFilter, CometProject) +Query: q4: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q5. Comet Exec: Enabled (CometFilter, CometProject, CometUnion) +Query: q5: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q6. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q6: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q8. 
Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q8: ExplainInfo: +ObjectHashAggregate is not supported +getstructfield is not supported +xxhash64 is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q9. Comet Exec: Enabled (CometFilter) +Query: q9: ExplainInfo: +named_struct is not supported +getstructfield is not supported + +Query: q10. Comet Exec: Enabled (CometFilter, CometProject) +Query: q10: ExplainInfo: +ObjectHashAggregate is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q11. Comet Exec: Enabled (CometFilter, CometProject) +Query: q11: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q12. Comet Exec: Enabled (CometFilter, CometProject) +Query: q12: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q13. Comet Exec: Enabled (CometFilter, CometProject) +Query: q13: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q14a. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q14a: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +Union disabled because not all child plans are native + +Query: q14b. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q14b: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +Union disabled because not all child plans are native + +Query: q15. Comet Exec: Enabled (CometFilter, CometProject) +Query: q15: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q16. Comet Exec: Enabled (CometFilter, CometProject) +Query: q16: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q17. Comet Exec: Enabled (CometFilter, CometProject) +Query: q17: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q18. Comet Exec: Enabled (CometFilter, CometProject) +Query: q18: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q19. Comet Exec: Enabled (CometFilter, CometProject) +Query: q19: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q20. Comet Exec: Enabled (CometFilter, CometProject) +Query: q20: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q21. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q21: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q22. Comet Exec: Enabled (CometFilter, CometProject) +Query: q22: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q23a. Comet Exec: Enabled (CometFilter, CometProject) +Query: q23a: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q23b. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q23b: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q24a. Comet Exec: Enabled (CometFilter, CometProject) +Query: q24a: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q24b. Comet Exec: Enabled (CometFilter, CometProject) +Query: q24b: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q25. Comet Exec: Enabled (CometFilter, CometProject) +Query: q25: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q26. Comet Exec: Enabled (CometFilter, CometProject) +Query: q26: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q27. Comet Exec: Enabled (CometFilter, CometProject) +Query: q27: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q28. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q28: ExplainInfo: +Unsupported aggregation mode PartialMerge +BroadcastExchange is not supported + +Query: q29. Comet Exec: Enabled (CometFilter, CometProject) +Query: q29: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q30. Comet Exec: Enabled (CometFilter, CometProject) +Query: q30: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q31. Comet Exec: Enabled (CometFilter) +Query: q31: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q32. Comet Exec: Enabled (CometFilter, CometProject) +Query: q32: ExplainInfo: +ObjectHashAggregate is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q33. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q33: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q34. Comet Exec: Enabled (CometFilter, CometProject) +Query: q34: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q35. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q35: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q36. Comet Exec: Enabled (CometFilter, CometProject) +Query: q36: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q37. Comet Exec: Enabled (CometFilter, CometProject) +Query: q37: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q38. Comet Exec: Enabled (CometFilter, CometProject) +Query: q38: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q39a. Comet Exec: Enabled (CometFilter, CometProject) +Query: q39a: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q39b. Comet Exec: Enabled (CometFilter, CometProject) +Query: q39b: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q40. Comet Exec: Enabled (CometFilter, CometProject) +Query: q40: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q41. Comet Exec: Enabled (CometFilter, CometProject) +Query: q41: ExplainInfo: +ObjectHashAggregate is not supported +xxhash64 is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q42. Comet Exec: Enabled (CometFilter, CometProject) +Query: q42: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q43. Comet Exec: Enabled (CometFilter, CometProject) +Query: q43: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q44. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject, CometSort) +Query: q44: ExplainInfo: +Window is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q45. Comet Exec: Enabled (CometFilter, CometProject) +Query: q45: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q46. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q46: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q47. Comet Exec: Enabled (CometFilter, CometProject) +Query: q47: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q48. Comet Exec: Enabled (CometFilter, CometProject) +Query: q48: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q49. Comet Exec: Enabled (CometFilter, CometProject) +Query: q49: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +Union disabled because not all child plans are native + +Query: q50. Comet Exec: Enabled (CometFilter, CometProject) +Query: q50: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q51. Comet Exec: Enabled (CometFilter, CometProject) +Query: q51: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q52. Comet Exec: Enabled (CometFilter, CometProject) +Query: q52: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q53. Comet Exec: Enabled (CometFilter, CometProject) +Query: q53: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q54. Comet Exec: Enabled (CometFilter, CometProject, CometUnion) +Query: q54: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q55. Comet Exec: Enabled (CometFilter, CometProject) +Query: q55: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q56. Comet Exec: Enabled (CometFilter, CometProject) +Query: q56: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q57. Comet Exec: Enabled (CometFilter, CometProject) +Query: q57: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q58. Comet Exec: Enabled (CometFilter, CometProject) +Query: q58: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q59. Comet Exec: Enabled (CometFilter, CometProject) +Query: q59: ExplainInfo: +ObjectHashAggregate is not supported +xxhash64 is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q60. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q60: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q61. Comet Exec: Enabled (CometFilter, CometProject) +Query: q61: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q62. Comet Exec: Enabled (CometFilter, CometProject) +Query: q62: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q63. Comet Exec: Enabled (CometFilter, CometProject) +Query: q63: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q64. Comet Exec: Enabled (CometFilter, CometProject) +Query: q64: ExplainInfo: +BroadcastExchange is not supported +ObjectHashAggregate is not supported +BroadcastHashJoin disabled because not all child plans are native +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q65. Comet Exec: Enabled (CometFilter, CometProject) +Query: q65: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q66. Comet Exec: Enabled (CometFilter, CometProject) +Query: q66: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q67. Comet Exec: Enabled (CometFilter, CometProject) +Query: q67: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q68. Comet Exec: Enabled (CometFilter, CometProject) +Query: q68: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q69. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q69: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q70. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q70: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q71. Comet Exec: Enabled (CometFilter, CometProject) +Query: q71: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q72. Comet Exec: Enabled (CometFilter, CometProject) +Query: q72: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q73. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q73: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q74. Comet Exec: Enabled (CometFilter, CometProject) +Query: q74: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q75. Comet Exec: Enabled (CometFilter, CometProject) +Query: q75: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q76. Comet Exec: Enabled (CometFilter, CometProject) +Query: q76: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q77. Comet Exec: Enabled (CometFilter, CometProject) +Query: q77: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q78. Comet Exec: Enabled (CometFilter) +Query: q78: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q79. Comet Exec: Enabled (CometFilter, CometProject) +Query: q79: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q80. Comet Exec: Enabled (CometFilter, CometProject) +Query: q80: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q81. Comet Exec: Enabled (CometFilter, CometProject) +Query: q81: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q82. Comet Exec: Enabled (CometFilter, CometProject) +Query: q82: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q83. Comet Exec: Enabled (CometFilter, CometProject) +Query: q83: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q84. Comet Exec: Enabled (CometFilter, CometProject) +Query: q84: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q85. Comet Exec: Enabled (CometFilter, CometProject) +Query: q85: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q86. Comet Exec: Enabled (CometFilter, CometProject) +Query: q86: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q87. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q87: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q88. Comet Exec: Enabled (CometFilter, CometProject) +Query: q88: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q89. Comet Exec: Enabled (CometFilter, CometProject) +Query: q89: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q90. Comet Exec: Enabled (CometFilter, CometProject) +Query: q90: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q91. Comet Exec: Enabled (CometFilter, CometProject) +Query: q91: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q92. Comet Exec: Enabled (CometFilter, CometProject) +Query: q92: ExplainInfo: +ObjectHashAggregate is not supported +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q93. Comet Exec: Enabled (CometFilter, CometProject) +Query: q93: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q94. Comet Exec: Enabled (CometFilter, CometProject) +Query: q94: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q95. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q95: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q96. Comet Exec: Enabled (CometFilter, CometProject) +Query: q96: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q97. Comet Exec: Enabled (CometFilter, CometProject) +Query: q97: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q98. Comet Exec: Enabled (CometFilter, CometProject) +Query: q98: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q99. Comet Exec: Enabled (CometFilter, CometProject) +Query: q99: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q5a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometUnion) +Query: q5a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q6-v2.7. 
Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q6-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q10a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q10a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q11-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q11-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q12-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q12-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q14-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q14-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +Union disabled because not all child plans are native + +Query: q14a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q14a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +xxhash64 is not supported +Union disabled because not all child plans are native + +Query: q18a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q18a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q20-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q20-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported + +Query: q22-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q22-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Native Broadcast is not enabled + +Query: q22a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q22a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q24-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q24-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q27a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q27a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q34-v2.7. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q34-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q35-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q35-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q35a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q35a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q36a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q36a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +Window is not supported + +Query: q47-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q47-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q49-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q49-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +Union disabled because not all child plans are native + +Query: q51a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q51a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q57-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q57-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native + +Query: q64-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q64-v2.7: ExplainInfo: +BroadcastExchange is not supported +ObjectHashAggregate is not supported +BroadcastHashJoin disabled because not all child plans are native +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q67a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q67a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +Window is not supported + +Query: q70a-v2.7. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q70a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +SortMergeJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q72-v2.7. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q72-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q74-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q74-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q75-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q75-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q77a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q77a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q78-v2.7. Comet Exec: Enabled (CometFilter) +Query: q78-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q80a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q80a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native + +Query: q86a-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q86a-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Union disabled because not all child plans are native +Window is not supported + +Query: q98-v2.7. Comet Exec: Enabled (CometFilter, CometProject) +Query: q98-v2.7: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Window is not supported +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + diff --git a/spark/inspections/CometTPCHQueriesList-results.txt b/spark/inspections/CometTPCHQueriesList-results.txt new file mode 100644 index 0000000000..b51286d807 --- /dev/null +++ b/spark/inspections/CometTPCHQueriesList-results.txt @@ -0,0 +1,142 @@ +Query: q1 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q1 TPCH Snappy: ExplainInfo: +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q2 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q2 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +SortMergeJoin disabled because not all child plans are native +xxhash64 is not supported + +Query: q3 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q3 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q4 TPCH Snappy. 
Comet Exec: Enabled (CometFilter, CometProject) +Query: q4 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q5 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q5 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +SortMergeJoin disabled because not all child plans are native +xxhash64 is not supported +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q6 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q6 TPCH Snappy: ExplainInfo: + + +Query: q7 TPCH Snappy. Comet Exec: Enabled (CometFilter) +Query: q7 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q8 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q8 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q9 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q9 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q10 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q10 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q11 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q11 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q12 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q12 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q13 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q13 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q14 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q14 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q15 TPCH Snappy. 
Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q15 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q16 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q16 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q17 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject, CometSort) +Query: q17 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q18 TPCH Snappy. Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q18 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +SortMergeJoin disabled because not all child plans are native + +Query: q19 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q19 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q20 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject) +Query: q20 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +ObjectHashAggregate is not supported +SortMergeJoin disabled because not all child plans are native +xxhash64 is not supported +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q21 TPCH Snappy. Comet Exec: Enabled (CometFilter, CometProject, CometSort) +Query: q21 TPCH Snappy: ExplainInfo: +ObjectHashAggregate is not supported +Sort merge join with a join condition is not supported +xxhash64 is not supported +SortMergeJoin disabled because not all child plans are native +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native + +Query: q22 TPCH Snappy. Comet Exec: Enabled (CometFilter) +Query: q22 TPCH Snappy: ExplainInfo: +BroadcastExchange is not supported +BroadcastHashJoin disabled because not all child plans are native +Shuffle: unsupported Spark partitioning: org.apache.spark.sql.catalyst.plans.physical.RangePartitioning + +Query: q1 TPCH Extended Snappy. 
Comet Exec: Enabled (CometHashAggregate, CometFilter, CometProject) +Query: q1 TPCH Extended Snappy: ExplainInfo: + + diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 12ec3b9c84..fc0d1336f2 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -24,10 +24,11 @@ import java.nio.ByteOrder import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.comet._ import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle} import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec @@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf._ -import org.apache.comet.CometSparkSessionExtensions.{isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -75,8 +76,14 @@ class CometSparkSessionExtensions case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - if (!isCometEnabled(conf) || !isCometScanEnabled(conf)) plan - else { + if (!isCometEnabled(conf) || !isCometScanEnabled(conf)) { + if (!isCometEnabled(conf)) { + withInfo(plan, "Comet is not enabled") + } else if (!isCometScanEnabled(conf)) { + withInfo(plan, "Comet Scan is not enabled") + } + plan + } else { plan.transform { // data source V2 case scanExec: BatchScanExec @@ -91,6 +98,27 @@ class CometSparkSessionExtensions scanExec.copy(scan = cometScan), runtimeFilters = scanExec.runtimeFilters) + // unsupported parquet data source V2 + case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[ParquetScan] => + val requiredSchema = scanExec.scan.asInstanceOf[ParquetScan].readDataSchema + val info1 = createMessage( + !isSchemaSupported(requiredSchema), + s"Schema $requiredSchema is not supported") + val readPartitionSchema = scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema + val info2 = createMessage( + !isSchemaSupported(readPartitionSchema), + s"Schema $readPartitionSchema is not supported") + // Comet does not support pushedAggregate + val info3 = createMessage( + getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined, + "Comet does not support pushed 
aggregate") + withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString("\n")) + scanExec + + case scanExec: BatchScanExec if !scanExec.scan.isInstanceOf[ParquetScan] => + withInfo(scanExec, "Comet Scan only supports Parquet") + scanExec + // iceberg scan case scanExec: BatchScanExec => if (isSchemaSupported(scanExec.scan.readSchema())) { @@ -103,22 +131,24 @@ class CometSparkSessionExtensions scanExec.clone().asInstanceOf[BatchScanExec], runtimeFilters = scanExec.runtimeFilters) case _ => - logInfo( - "Comet extension is not enabled for " + - s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side") + val msg = "Comet extension is not enabled for " + + s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side" + logInfo(msg) + withInfo(scanExec, msg) scanExec } } else { - logInfo( - "Comet extension is not enabled for " + - s"${scanExec.scan.getClass.getSimpleName}: Schema not supported") + val msg = "Comet extension is not enabled for " + + s"${scanExec.scan.getClass.getSimpleName}: Schema not supported" + logInfo(msg) + withInfo(scanExec, msg) scanExec } // data source V1 case scanExec @ FileSourceScanExec( HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _), - _: Seq[_], + _: Seq[AttributeReference], requiredSchema, _, _, @@ -128,6 +158,26 @@ class CometSparkSessionExtensions _) if isSchemaSupported(requiredSchema) && isSchemaSupported(partitionSchema) => logInfo("Comet extension enabled for v1 Scan") CometScanExec(scanExec, session) + + // data source v1 not supported case + case scanExec @ FileSourceScanExec( + HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _), + _: Seq[AttributeReference], + requiredSchema, + _, + _, + _, + _, + _, + _) => + val info1 = createMessage( + !isSchemaSupported(requiredSchema), + s"Schema $requiredSchema is not supported") + val info2 = createMessage( + !isSchemaSupported(partitionSchema), + s"Partition schema $partitionSchema is not supported") + withInfo(scanExec, Seq(info1, info2).flatten.mkString(",")) + scanExec } } } @@ -138,7 +188,7 @@ class CometSparkSessionExtensions plan.transformUp { case s: ShuffleExchangeExec if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning) => + QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 => logInfo("Comet extension enabled for Native Shuffle") // Switch to use Decimal128 regardless of precision, since Arrow native execution @@ -151,7 +201,7 @@ class CometSparkSessionExtensions case s: ShuffleExchangeExec if (!s.child.supportsColumnar || isCometPlan( s.child)) && isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioningTypes(s.child.output) && + QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 && !isShuffleOperator(s.child) => logInfo("Comet extension enabled for JVM Columnar Shuffle") CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle) @@ -380,6 +430,14 @@ class CometSparkSessionExtensions op } + case op: ShuffledHashJoinExec if !isCometOperatorEnabled(conf, "hash_join") => + withInfo(op, "ShuffleHashJoin is not enabled") + op + + case op: ShuffledHashJoinExec if !op.children.forall(isCometNative(_)) => + withInfo(op, "ShuffleHashJoin disabled because not all child plans are native") + op + case op: BroadcastHashJoinExec if isCometOperatorEnabled(conf, "broadcast_hash_join") && op.children.forall(isCometNative(_)) => @@ -401,6 +459,10 @@ class CometSparkSessionExtensions op } + case op: 
BroadcastHashJoinExec if !isCometOperatorEnabled(conf, "broadcast_hash_join") => + withInfo(op, "BroadcastHashJoin is not enabled") + op + case op: SortMergeJoinExec if isCometOperatorEnabled(conf, "sort_merge_join") && op.children.forall(isCometNative(_)) => @@ -420,6 +482,13 @@ class CometSparkSessionExtensions case None => op } + case op: SortMergeJoinExec if !isCometOperatorEnabled(conf, "sort_merge_join") => + withInfo(op, "SortMergeJoin is not enabled") + op + + case op: SortMergeJoinExec if !op.children.forall(isCometNative(_)) => + withInfo(op, "SortMergeJoin disabled because not all child plans are native") + op case c @ CoalesceExec(numPartitions, child) if isCometOperatorEnabled(conf, "coalesce") @@ -432,6 +501,14 @@ class CometSparkSessionExtensions c } + case c @ CoalesceExec(_, _) if !isCometOperatorEnabled(conf, "coalesce") => + withInfo(c, "Coalesce is not enabled") + c + + case op: CoalesceExec if !op.children.forall(isCometNative(_)) => + withInfo(op, "Coalesce disabled because not all child plans are native") + op + case s: TakeOrderedAndProjectExec if isCometNative(s.child) && isCometOperatorEnabled(conf, "takeOrderedAndProjectExec") && isCometShuffleEnabled(conf) && @@ -445,6 +522,16 @@ class CometSparkSessionExtensions s } + case s: TakeOrderedAndProjectExec => + val info1 = createMessage( + !isCometOperatorEnabled(conf, "takeOrderedAndProjectExec"), + "TakeOrderedAndProject is not enabled") + val info2 = createMessage( + !isCometShuffleEnabled(conf), + "TakeOrderedAndProject requires shuffle to be enabled") + withInfo(s, Seq(info1, info2).flatten.mkString(",")) + s + case u: UnionExec if isCometOperatorEnabled(conf, "union") && u.children.forall(isCometNative) => @@ -456,6 +543,14 @@ class CometSparkSessionExtensions u } + case u: UnionExec if !isCometOperatorEnabled(conf, "union") => + withInfo(u, "Union is not enabled") + u + + case op: UnionExec if !op.children.forall(isCometNative(_)) => + withInfo(op, "Union disabled because not all child plans are native") + op + // For AQE broadcast stage on a Comet broadcast exchange case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => val newOp = transform1(s) @@ -488,12 +583,24 @@ class CometSparkSessionExtensions if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) { newPlan } else { + if (!isCometOperatorEnabled( + conf, + "broadcastExchangeExec") && !isCometBroadCastForceEnabled(conf)) { + withInfo(plan, "Native Broadcast is not enabled") + } plan } } else { plan } + // this case should be checked only after the previous case checking for a + // child BroadcastExchange has been applied, otherwise that transform + // never gets applied + case op: BroadcastHashJoinExec if !op.children.forall(isCometNative(_)) => + withInfo(op, "BroadcastHashJoin disabled because not all child plans are native") + op + // For AQE shuffle stage on a Comet shuffle exchange case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => val newOp = transform1(s) @@ -523,7 +630,7 @@ class CometSparkSessionExtensions case s: ShuffleExchangeExec if isCometShuffleEnabled(conf) && !isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning) => + QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 => logInfo("Comet extension enabled for Native Shuffle") val newOp = transform1(s) @@ -544,7 +651,7 @@ class CometSparkSessionExtensions // convert it to CometColumnarShuffle, case s: ShuffleExchangeExec if isCometShuffleEnabled(conf) && 
isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioningTypes(s.child.output) && + QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 && !isShuffleOperator(s.child) => logInfo("Comet extension enabled for JVM Columnar Shuffle") @@ -562,9 +669,32 @@ class CometSparkSessionExtensions s } + case s: ShuffleExchangeExec => + val isShuffleEnabled = isCometShuffleEnabled(conf) + val msg1 = createMessage(!isShuffleEnabled, "Native shuffle is not enabled") + val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf) + val msg2 = createMessage( + isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde + .supportPartitioning(s.child.output, s.outputPartitioning) + ._1, + "Shuffle: " + + s"${QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._2}") + val msg3 = createMessage( + isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde + .supportPartitioningTypes(s.child.output) + ._1, + s"Columnar shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}") + withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(",")) + s + case op => // An operator that is not supported by Comet - op + op match { + case _: CometExec | _: CometBroadcastExchangeExec | _: CometShuffleExchangeExec => op + case o => + withInfo(o, s"${o.nodeName} is not supported") + o + } } } @@ -853,4 +983,63 @@ object CometSparkSessionExtensions extends Logging { ByteUnit.MiB.toBytes(shuffleMemorySize) } } + + /** + * Attaches explain information to a TreeNode, rolling up the corresponding information tags + * from any child nodes + * + * @param node + * The node to attach the explain information to. Typically a SparkPlan + * @param info + * Information text. Optional, may be null or empty. If not provided, then only information + * from child nodes will be included. + * @param exprs + * Child nodes. Information attached in these nodes will be included in the information + * attached to @node + * @tparam T + * The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression + * @return + * The node with information (if any) attached + */ + def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = { + val exprInfo = exprs + .flatMap { e => Seq(e.getTagValue(CometExplainInfo.EXTENSION_INFO)) } + .flatten + .mkString("\n") + if (info != null && info.nonEmpty && exprInfo.nonEmpty) { + node.setTagValue(CometExplainInfo.EXTENSION_INFO, Seq(exprInfo, info).mkString("\n")) + } else if (exprInfo.nonEmpty) { + node.setTagValue(CometExplainInfo.EXTENSION_INFO, exprInfo) + } else if (info != null && info.nonEmpty) { + node.setTagValue(CometExplainInfo.EXTENSION_INFO, info) + } + node + } + + /** + * Attaches explain information to a TreeNode, rolling up the corresponding information tags + * from any child nodes + * + * @param node + * The node to attach the explain information to. Typically a SparkPlan + * @param exprs + * Child nodes. Information attached in these nodes will be included in the information + * attached to @node + * @tparam T + * The type of the TreeNode. 
Typically SparkPlan, AggregateExpression, or Expression + * @return + * The node with information (if any) attached + */ + def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = { + withInfo(node, "", exprs: _*) + } + + // Helper to reduce boilerplate + def createMessage(condition: Boolean, message: => String): Option[String] = { + if (condition) { + Some(message) + } else { + None + } + } } diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala new file mode 100644 index 0000000000..8d27501c8a --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import scala.collection.mutable + +import org.apache.spark.sql.ExtendedExplainGenerator +import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag} +import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} + +class ExtendedExplainInfo extends ExtendedExplainGenerator { + + override def title: String = "Comet" + + override def generateExtendedInfo(plan: SparkPlan): String = { + val info = extensionInfo(plan) + info.distinct.mkString("\n").trim + } + + private def getActualPlan(node: TreeNode[_]): TreeNode[_] = { + node match { + case p: AdaptiveSparkPlanExec => getActualPlan(p.executedPlan) + case p: InputAdapter => getActualPlan(p.child) + case p: QueryStageExec => getActualPlan(p.plan) + case p: WholeStageCodegenExec => getActualPlan(p.child) + case p => p + } + } + + private def extensionInfo(node: TreeNode[_]): mutable.Seq[String] = { + var info = mutable.Seq[String]() + val sorted = sortup(node) + sorted.foreach { p => + val all: Array[String] = + getActualPlan(p).getTagValue(CometExplainInfo.EXTENSION_INFO).getOrElse("").split("\n") + for (s <- all) { + info = info :+ s + } + } + info.filter(!_.contentEquals("\n")) + } + + // get all plan nodes, breadth first traversal, then returned the reversed list so + // leaf nodes are first + private def sortup(node: TreeNode[_]): mutable.Queue[TreeNode[_]] = { + val ordered = new mutable.Queue[TreeNode[_]]() + val traversed = mutable.Queue[TreeNode[_]](getActualPlan(node)) + while (traversed.nonEmpty) { + val s = traversed.dequeue() + ordered += s + if (s.innerChildren.nonEmpty) { + s.innerChildren.foreach { + case c @ (_: TreeNode[_]) => traversed.enqueue(getActualPlan(c)) + case _ => + } + } + if (s.children.nonEmpty) { + s.children.foreach { + case c @ (_: TreeNode[_]) => traversed.enqueue(getActualPlan(c)) + case _ => + } + } + } + ordered.reverse + } +} + +object CometExplainInfo { + val EXTENSION_INFO = 
new TreeNodeTag[String]("CometExtensionInfo") +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index cb767d5fe2..0bc2c1d3c7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus} +import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo} import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, JoinType, Operator} @@ -52,7 +52,7 @@ import org.apache.comet.shims.ShimQueryPlanSerde */ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { def emitWarning(reason: String): Unit = { - logWarning(s"Comet native execution: $reason") + logWarning(s"Comet native execution is disabled due to: $reason") } def supportedDataType(dt: DataType): Boolean = dt match { @@ -218,6 +218,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setSum(sumBuilder) .build()) } else { + if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${s.dataType} is not supported", child) + } else { + withInfo(aggExpr, child) + } None } case s @ Average(child, _) if avgDataTypeSupported(s.dataType) && isLegacyMode(s) => @@ -249,7 +254,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setAvg(builder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${s.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case Count(children) => @@ -265,6 +274,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setCount(countBuilder) .build()) } else { + withInfo(aggExpr, children: _*) None } case min @ Min(child) if minMaxDataTypeSupported(min.dataType) => @@ -281,7 +291,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setMin(minBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${min.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case max @ Max(child) if minMaxDataTypeSupported(max.dataType) => @@ -298,7 +312,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setMax(maxBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${max.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case first @ First(child, ignoreNulls) @@ -316,7 +334,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setFirst(firstBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${first.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case last @ Last(child, ignoreNulls) @@ -334,7 +356,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setLast(lastBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${last.dataType} is not 
supported", child) + None } else { + withInfo(aggExpr, child) None } case bitAnd @ BitAndAgg(child) if bitwiseAggTypeSupported(bitAnd.dataType) => @@ -351,7 +377,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setBitAndAgg(bitAndBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${bitAnd.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case bitOr @ BitOrAgg(child) if bitwiseAggTypeSupported(bitOr.dataType) => @@ -368,7 +398,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setBitOrAgg(bitOrBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${bitOr.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case bitXor @ BitXorAgg(child) if bitwiseAggTypeSupported(bitXor.dataType) => @@ -385,7 +419,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .newBuilder() .setBitXorAgg(bitXorBuilder) .build()) + } else if (dataType.isEmpty) { + withInfo(aggExpr, s"datatype ${bitXor.dataType} is not supported", child) + None } else { + withInfo(aggExpr, child) None } case cov @ CovSample(child1, child2, _) => @@ -427,7 +465,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { None } case fn => - emitWarning(s"unsupported Spark aggregate function: $fn") + val msg = s"unsupported Spark aggregate function: ${fn.prettyName}" + emitWarning(msg) + withInfo(aggExpr, msg, fn.children: _*) None } } @@ -470,6 +510,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setCast(castBuilder) .build()) } else { + if (!dataType.isDefined) { + withInfo(expr, s"Unsupported datatype ${dt}") + } else { + withInfo(expr, s"Unsupported expression $childExpr") + } None } } @@ -478,7 +523,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { SQLConf.get expr match { case a @ Alias(_, _) => - exprToProtoInternal(a.child, inputs) + val r = exprToProtoInternal(a.child, inputs) + if (r.isEmpty) { + withInfo(expr, a.child) + } + r case cast @ Cast(_: Literal, dataType, _, _) => // This can happen after promoting decimal precisions @@ -487,14 +536,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case Cast(child, dt, timeZoneId, evalMode) => val childExpr = exprToProtoInternal(child, inputs) - val evalModeStr = if (evalMode.isInstanceOf[Boolean]) { - // Spark 3.2 & 3.3 has ansiEnabled boolean - if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY" + if (childExpr.isDefined) { + val evalModeStr = if (evalMode.isInstanceOf[Boolean]) { + // Spark 3.2 & 3.3 has ansiEnabled boolean + if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY" + } else { + // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY + evalMode.toString + } + castToProto(timeZoneId, dt, childExpr, evalModeStr) } else { - // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY - evalMode.toString + withInfo(expr, child) + None } - castToProto(timeZoneId, dt, childExpr, evalModeStr) case add @ Add(left, right, _) if supportedDataType(left.dataType) => val leftExpr = exprToProtoInternal(left, inputs) @@ -515,9 +569,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setAdd(addBuilder) .build()) } else { + withInfo(add, left, right) None } + case add @ Add(left, _, _) if !supportedDataType(left.dataType) => + withInfo(add, s"Unsupported datatype ${left.dataType}") + None + case sub @ Subtract(left, right, _) if supportedDataType(left.dataType) 
=> val leftExpr = exprToProtoInternal(left, inputs) val rightExpr = exprToProtoInternal(right, inputs) @@ -537,9 +596,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setSubtract(builder) .build()) } else { + withInfo(sub, left, right) None } + case sub @ Subtract(left, _, _) if !supportedDataType(left.dataType) => + withInfo(sub, s"Unsupported datatype ${left.dataType}") + None + case mul @ Multiply(left, right, _) if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) => val leftExpr = exprToProtoInternal(left, inputs) @@ -560,9 +624,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setMultiply(builder) .build()) } else { + withInfo(mul, left, right) None } + case mul @ Multiply(left, right, _) => + if (!supportedDataType(left.dataType)) { + withInfo(mul, s"Unsupported datatype ${left.dataType}") + } + if (decimalBeforeSpark34(left.dataType)) { + withInfo(mul, "Decimal support requires Spark 3.4 or later") + } + None + case div @ Divide(left, right, _) if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) => val leftExpr = exprToProtoInternal(left, inputs) @@ -586,8 +660,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setDivide(builder) .build()) } else { + withInfo(div, left, right) None } + case div @ Divide(left, right, _) => + if (!supportedDataType(left.dataType)) { + withInfo(div, s"Unsupported datatype ${left.dataType}") + } + if (decimalBeforeSpark34(left.dataType)) { + withInfo(div, "Decimal support requires Spark 3.4 or later") + } + None case rem @ Remainder(left, right, _) if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) => @@ -609,8 +692,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setRemainder(builder) .build()) } else { + withInfo(rem, left, right) None } + case rem @ Remainder(left, _, _) => + if (!supportedDataType(left.dataType)) { + withInfo(rem, s"Unsupported datatype ${left.dataType}") + } + if (decimalBeforeSpark34(left.dataType)) { + withInfo(rem, "Decimal support requires Spark 3.4 or later") + } + None case EqualTo(left, right) => val leftExpr = exprToProtoInternal(left, inputs) @@ -627,6 +719,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setEq(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -645,6 +738,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setNeq(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -663,6 +757,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setEqNullSafe(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -681,6 +776,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setNeqNullSafe(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -699,6 +795,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setGt(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -717,6 +814,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setGtEq(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -735,6 +833,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setLt(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -753,6 +852,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setLtEq(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -800,8 +900,12 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde { .setLiteral(exprBuilder) .build()) } else { + withInfo(expr, s"Unsupported datatype $dataType") None } + case Literal(_, dataType) if !supportedDataType(dataType) => + withInfo(expr, s"Unsupported datatype $dataType") + None case Substring(str, Literal(pos, _), Literal(len, _)) => val strExpr = exprToProtoInternal(str, inputs) @@ -818,6 +922,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setSubstring(builder) .build()) } else { + withInfo(expr, str) None } @@ -837,27 +942,28 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setLike(builder) .build()) } else { + withInfo(expr, left, right) None } // TODO waiting for arrow-rs update - // case RLike(left, right) => - // val leftExpr = exprToProtoInternal(left, inputs) - // val rightExpr = exprToProtoInternal(right, inputs) - // - // if (leftExpr.isDefined && rightExpr.isDefined) { - // val builder = ExprOuterClass.RLike.newBuilder() - // builder.setLeft(leftExpr.get) - // builder.setRight(rightExpr.get) - // - // Some( - // ExprOuterClass.Expr - // .newBuilder() - // .setRlike(builder) - // .build()) - // } else { - // None - // } +// case RLike(left, right) => +// val leftExpr = exprToProtoInternal(left, inputs) +// val rightExpr = exprToProtoInternal(right, inputs) +// +// if (leftExpr.isDefined && rightExpr.isDefined) { +// val builder = ExprOuterClass.RLike.newBuilder() +// builder.setLeft(leftExpr.get) +// builder.setRight(rightExpr.get) +// +// Some( +// ExprOuterClass.Expr +// .newBuilder() +// .setRlike(builder) +// .build()) +// } else { +// None +// } case StartsWith(left, right) => val leftExpr = exprToProtoInternal(left, inputs) @@ -874,6 +980,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setStartsWith(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -892,6 +999,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setEndsWith(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -910,6 +1018,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setContains(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -926,6 +1035,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setStringSpace(builder) .build()) } else { + withInfo(expr, child) None } @@ -945,6 +1055,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setHour(builder) .build()) } else { + withInfo(expr, child) None } @@ -964,6 +1075,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setMinute(builder) .build()) } else { + withInfo(expr, child) None } @@ -982,6 +1094,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setTruncDate(builder) .build()) } else { + withInfo(expr, child, format) None } @@ -1003,6 +1116,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setTruncTimestamp(builder) .build()) } else { + withInfo(expr, child, format) None } @@ -1022,13 +1136,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setSecond(builder) .build()) } else { + withInfo(expr, child) None } case Year(child) => val periodType = exprToProtoInternal(Literal("year"), inputs) val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("datepart", Seq(periodType, childExpr): _*) + val optExpr = scalarExprToProto("datepart", Seq(periodType, childExpr): _*) .map(e => { Expr .newBuilder() @@ -1041,6 +1156,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .build()) .build() }) + 
optExprWithInfo(optExpr, expr, child) case IsNull(child) => val childExpr = exprToProtoInternal(child, inputs) @@ -1055,6 +1171,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setIsNull(castBuilder) .build()) } else { + withInfo(expr, child) None } @@ -1071,6 +1188,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setIsNotNull(castBuilder) .build()) } else { + withInfo(expr, child) None } @@ -1097,6 +1215,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setSortOrder(sortOrderBuilder) .build()) } else { + withInfo(expr, child) None } @@ -1115,6 +1234,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setAnd(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -1133,6 +1253,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setOr(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -1159,6 +1280,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setCheckOverflow(builder) .build()) } else { + withInfo(expr, child) None } @@ -1195,35 +1317,44 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .build()) } } else { + withInfo(attr, s"unsupported datatype: ${attr.dataType}") None } case Abs(child, _) => - exprToProtoInternal(child, inputs).map(childExpr => { + val childExpr = exprToProtoInternal(child, inputs) + if (childExpr.isDefined) { val abs = ExprOuterClass.Abs .newBuilder() - .setChild(childExpr) + .setChild(childExpr.get) .build() - Expr.newBuilder().setAbs(abs).build() - }) + Some(Expr.newBuilder().setAbs(abs).build()) + } else { + withInfo(expr, child) + None + } case Acos(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("acos", childExpr) + val optExpr = scalarExprToProto("acos", childExpr) + optExprWithInfo(optExpr, expr, child) case Asin(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("asin", childExpr) + val optExpr = scalarExprToProto("asin", childExpr) + optExprWithInfo(optExpr, expr, child) case Atan(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("atan", childExpr) + val optExpr = scalarExprToProto("atan", childExpr) + optExprWithInfo(optExpr, expr, child) case Atan2(left, right) => val leftExpr = exprToProtoInternal(left, inputs) val rightExpr = exprToProtoInternal(right, inputs) - scalarExprToProto("atan2", leftExpr, rightExpr) + val optExpr = scalarExprToProto("atan2", leftExpr, rightExpr) + optExprWithInfo(optExpr, expr, left, right) case e @ Ceil(child) => val childExpr = exprToProtoInternal(child, inputs) @@ -1231,18 +1362,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case t: DecimalType if t.scale == 0 => // zero scale is no-op childExpr case t: DecimalType if t.scale < 0 => // Spark disallows negative scale SPARK-30252 + withInfo(e, s"Decimal type $t has negative scale") None case _ => - scalarExprToProtoWithReturnType("ceil", e.dataType, childExpr) + val optExpr = scalarExprToProtoWithReturnType("ceil", e.dataType, childExpr) + optExprWithInfo(optExpr, expr, child) } case Cos(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("cos", childExpr) + val optExpr = scalarExprToProto("cos", childExpr) + optExprWithInfo(optExpr, expr, child) case Exp(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("exp", childExpr) + val optExpr = scalarExprToProto("exp", childExpr) + optExprWithInfo(optExpr, expr, child) case e @ Floor(child) => val 
childExpr = exprToProtoInternal(child, inputs) @@ -1250,27 +1385,33 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case t: DecimalType if t.scale == 0 => // zero scale is no-op childExpr case t: DecimalType if t.scale < 0 => // Spark disallows negative scale SPARK-30252 + withInfo(e, s"Decimal type $t has negative scale") None case _ => - scalarExprToProtoWithReturnType("floor", e.dataType, childExpr) + val optExpr = scalarExprToProtoWithReturnType("floor", e.dataType, childExpr) + optExprWithInfo(optExpr, expr, child) } case Log(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("ln", childExpr) + val optExpr = scalarExprToProto("ln", childExpr) + optExprWithInfo(optExpr, expr, child) case Log10(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("log10", childExpr) + val optExpr = scalarExprToProto("log10", childExpr) + optExprWithInfo(optExpr, expr, child) case Log2(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("log2", childExpr) + val optExpr = scalarExprToProto("log2", childExpr) + optExprWithInfo(optExpr, expr, child) case Pow(left, right) => val leftExpr = exprToProtoInternal(left, inputs) val rightExpr = exprToProtoInternal(right, inputs) - scalarExprToProto("pow", leftExpr, rightExpr) + val optExpr = scalarExprToProto("pow", leftExpr, rightExpr) + optExprWithInfo(optExpr, expr, left, right) // round function for Spark 3.2 does not allow negative round target scale. In addition, // it has different result precision/scale for decimals. Supporting only 3.3 and above. @@ -1282,6 +1423,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { lazy val childExpr = exprToProtoInternal(r.child, inputs) r.child.dataType match { case t: DecimalType if t.scale < 0 => // Spark disallows negative scale SPARK-30252 + withInfo(r, "Decimal type has negative scale") None case _ if scaleV == null => exprToProtoInternal(Literal(null), inputs) @@ -1302,36 +1444,47 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // I.e. 6.13171162472835E18 == 6.1317116247283497E18. However, toString() does not. // That results in round(6.1317116247283497E18, -5) == 6.1317116247282995E18 instead // of 6.1317116247283999E18. 
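+              // A quick sketch of the mismatch described above, assuming scala.math.BigDecimal
+              // (the digits here are illustrative):
+              //   val d = 6.1317116247283497e18
+              //   BigDecimal(d, java.math.MathContext.DECIMAL64) // sees the double's full precision
+              //   BigDecimal(d.toString)                         // sees only the shortest representation
+              // Rounding those two values at scale -5 can disagree, hence the fallback below.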
+            withInfo(r, "Comet does not support Spark's BigDecimal rounding")
             None
           case _ =>
             // `scale` must be Int64 type in DataFusion
             val scaleExpr = exprToProtoInternal(Literal(_scale.toLong, LongType), inputs)
-            scalarExprToProtoWithReturnType("round", r.dataType, childExpr, scaleExpr)
+            val optExpr =
+              scalarExprToProtoWithReturnType("round", r.dataType, childExpr, scaleExpr)
+            optExprWithInfo(optExpr, expr, r.child)
         }
 
       case Signum(child) =>
         val childExpr = exprToProtoInternal(child, inputs)
-        scalarExprToProto("signum", childExpr)
+        val optExpr = scalarExprToProto("signum", childExpr)
+        optExprWithInfo(optExpr, expr, child)
 
       case Sin(child) =>
         val childExpr = exprToProtoInternal(child, inputs)
-        scalarExprToProto("sin", childExpr)
+        val optExpr = scalarExprToProto("sin", childExpr)
+        optExprWithInfo(optExpr, expr, child)
 
       case Sqrt(child) =>
         val childExpr = exprToProtoInternal(child, inputs)
-        scalarExprToProto("sqrt", childExpr)
+        val optExpr = scalarExprToProto("sqrt", childExpr)
+        optExprWithInfo(optExpr, expr, child)
 
       case Tan(child) =>
         val childExpr = exprToProtoInternal(child, inputs)
-        scalarExprToProto("tan", childExpr)
+        val optExpr = scalarExprToProto("tan", childExpr)
+        optExprWithInfo(optExpr, expr, child)
 
       case Ascii(child) =>
-        val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-        scalarExprToProto("ascii", childExpr)
+        val castExpr = Cast(child, StringType)
+        val childExpr = exprToProtoInternal(castExpr, inputs)
+        val optExpr = scalarExprToProto("ascii", childExpr)
+        optExprWithInfo(optExpr, expr, castExpr)
 
       case BitLength(child) =>
-        val childExpr = exprToProtoInternal(Cast(child, StringType), inputs)
-        scalarExprToProto("bit_length", childExpr)
+        val castExpr = Cast(child, StringType)
+        val childExpr = exprToProtoInternal(castExpr, inputs)
+        val optExpr = scalarExprToProto("bit_length", childExpr)
+        optExprWithInfo(optExpr, expr, castExpr)
 
       case If(predicate, trueValue, falseValue) =>
         val predicateExpr = exprToProtoInternal(predicate, inputs)
@@ -1348,22 +1501,32 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setIf(builder)
             .build())
         } else {
+          withInfo(expr, predicate, trueValue, falseValue)
           None
         }
 
       case CaseWhen(branches, elseValue) =>
-        val whenSeq = branches.map(elements => exprToProtoInternal(elements._1, inputs))
-        val thenSeq = branches.map(elements => exprToProtoInternal(elements._2, inputs))
+        var allBranches: Seq[Expression] = Seq()
+        val whenSeq = branches.map(elements => {
+          allBranches = allBranches :+ elements._1
+          exprToProtoInternal(elements._1, inputs)
+        })
+        val thenSeq = branches.map(elements => {
+          allBranches = allBranches :+ elements._2
+          exprToProtoInternal(elements._2, inputs)
+        })
         assert(whenSeq.length == thenSeq.length)
         if (whenSeq.forall(_.isDefined) && thenSeq.forall(_.isDefined)) {
           val builder = ExprOuterClass.CaseWhen.newBuilder()
           builder.addAllWhen(whenSeq.map(_.get).asJava)
           builder.addAllThen(thenSeq.map(_.get).asJava)
           if (elseValue.isDefined) {
-            val elseValueExpr = exprToProtoInternal(elseValue.get, inputs)
+            val elseValueExpr =
+              exprToProtoInternal(elseValue.get, inputs)
             if (elseValueExpr.isDefined) {
               builder.setElseExpr(elseValueExpr.get)
             } else {
+              withInfo(expr, elseValue.get)
               return None
             }
           }
@@ -1373,78 +1536,113 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setCaseWhen(builder)
             .build())
         } else {
+          withInfo(expr, allBranches: _*)
           None
         }
 
       case ConcatWs(children) =>
-        val exprs = children.map(e => exprToProtoInternal(Cast(e, StringType), inputs))
-        scalarExprToProto("concat_ws", exprs:
_*) + var childExprs: Seq[Expression] = Seq() + val exprs = children.map(e => { + val castExpr = Cast(e, StringType) + childExprs = childExprs :+ castExpr + exprToProtoInternal(castExpr, inputs) + }) + val optExpr = scalarExprToProto("concat_ws", exprs: _*) + optExprWithInfo(optExpr, expr, childExprs: _*) case Chr(child) => val childExpr = exprToProtoInternal(child, inputs) - scalarExprToProto("chr", childExpr) + val optExpr = scalarExprToProto("chr", childExpr) + optExprWithInfo(optExpr, expr, child) case InitCap(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("initcap", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("initcap", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case Length(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("length", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("length", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case Lower(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("lower", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("lower", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case Md5(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("md5", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("md5", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case OctetLength(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("octet_length", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("octet_length", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case Reverse(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("reverse", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("reverse", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case StringInstr(str, substr) => - val leftExpr = exprToProtoInternal(Cast(str, StringType), inputs) - val rightExpr = exprToProtoInternal(Cast(substr, StringType), inputs) - scalarExprToProto("strpos", leftExpr, rightExpr) + val leftCast = Cast(str, StringType) + val rightCast = Cast(substr, StringType) + val leftExpr = exprToProtoInternal(leftCast, inputs) + val rightExpr = exprToProtoInternal(rightCast, inputs) + val optExpr = scalarExprToProto("strpos", leftExpr, rightExpr) + optExprWithInfo(optExpr, expr, leftCast, rightCast) case StringRepeat(str, times) => - val leftExpr = exprToProtoInternal(Cast(str, StringType), inputs) - val rightExpr = exprToProtoInternal(Cast(times, LongType), inputs) - scalarExprToProto("repeat", leftExpr, rightExpr) + val leftCast = Cast(str, StringType) + val rightCast = Cast(times, LongType) + val leftExpr = exprToProtoInternal(leftCast, inputs) + val rightExpr = exprToProtoInternal(rightCast, inputs) + val optExpr = scalarExprToProto("repeat", leftExpr, rightExpr) + optExprWithInfo(optExpr, expr, leftCast, 
rightCast) case StringReplace(src, search, replace) => - val srcExpr = exprToProtoInternal(Cast(src, StringType), inputs) - val searchExpr = exprToProtoInternal(Cast(search, StringType), inputs) - val replaceExpr = exprToProtoInternal(Cast(replace, StringType), inputs) - scalarExprToProto("replace", srcExpr, searchExpr, replaceExpr) + val srcCast = Cast(src, StringType) + val searchCast = Cast(search, StringType) + val replaceCast = Cast(replace, StringType) + val srcExpr = exprToProtoInternal(srcCast, inputs) + val searchExpr = exprToProtoInternal(searchCast, inputs) + val replaceExpr = exprToProtoInternal(replaceCast, inputs) + val optExpr = scalarExprToProto("replace", srcExpr, searchExpr, replaceExpr) + optExprWithInfo(optExpr, expr, srcCast, searchCast, replaceCast) case StringTranslate(src, matching, replace) => - val srcExpr = exprToProtoInternal(Cast(src, StringType), inputs) - val matchingExpr = exprToProtoInternal(Cast(matching, StringType), inputs) - val replaceExpr = exprToProtoInternal(Cast(replace, StringType), inputs) - scalarExprToProto("translate", srcExpr, matchingExpr, replaceExpr) + val srcCast = Cast(src, StringType) + val matchingCast = Cast(matching, StringType) + val replaceCast = Cast(replace, StringType) + val srcExpr = exprToProtoInternal(srcCast, inputs) + val matchingExpr = exprToProtoInternal(matchingCast, inputs) + val replaceExpr = exprToProtoInternal(replaceCast, inputs) + val optExpr = scalarExprToProto("translate", srcExpr, matchingExpr, replaceExpr) + optExprWithInfo(optExpr, expr, srcCast, matchingCast, replaceCast) case StringTrim(srcStr, trimStr) => - trim(srcStr, trimStr, inputs, "trim") + trim(expr, srcStr, trimStr, inputs, "trim") case StringTrimLeft(srcStr, trimStr) => - trim(srcStr, trimStr, inputs, "ltrim") + trim(expr, srcStr, trimStr, inputs, "ltrim") case StringTrimRight(srcStr, trimStr) => - trim(srcStr, trimStr, inputs, "rtrim") + trim(expr, srcStr, trimStr, inputs, "rtrim") case StringTrimBoth(srcStr, trimStr, _) => - trim(srcStr, trimStr, inputs, "btrim") + trim(expr, srcStr, trimStr, inputs, "btrim") case Upper(child) => - val childExpr = exprToProtoInternal(Cast(child, StringType), inputs) - scalarExprToProto("upper", childExpr) + val castExpr = Cast(child, StringType) + val childExpr = exprToProtoInternal(castExpr, inputs) + val optExpr = scalarExprToProto("upper", childExpr) + optExprWithInfo(optExpr, expr, castExpr) case BitwiseAnd(left, right) => val leftExpr = exprToProtoInternal(left, inputs) @@ -1461,6 +1659,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setBitwiseAnd(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -1477,6 +1676,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setBitwiseNot(builder) .build()) } else { + withInfo(expr, child) None } @@ -1495,6 +1695,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setBitwiseOr(builder) .build()) } else { + withInfo(expr, left, right) None } @@ -1513,18 +1714,20 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setBitwiseXor(builder) .build()) } else { + withInfo(expr, left, right) None } case ShiftRight(left, right) => val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = if (left.dataType == LongType) { - // DataFusion bitwise shift right expression requires - // same data type between left and right side - exprToProtoInternal(Cast(right, LongType), inputs) + // DataFusion bitwise shift right expression requires + // same data type between left and right side + val 
rightExpression = if (left.dataType == LongType) {
+          Cast(right, LongType)
         } else {
-          exprToProtoInternal(right, inputs)
+          right
         }
+        val rightExpr = exprToProtoInternal(rightExpression, inputs)
 
         if (leftExpr.isDefined && rightExpr.isDefined) {
           val builder = ExprOuterClass.BitwiseShiftRight.newBuilder()
@@ -1537,18 +1740,20 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setBitwiseShiftRight(builder)
             .build())
         } else {
+          withInfo(expr, left, rightExpression)
           None
         }
 
       case ShiftLeft(left, right) =>
         val leftExpr = exprToProtoInternal(left, inputs)
-        val rightExpr = if (left.dataType == LongType) {
-          // DataFusion bitwise shift left expression requires
-          // same data type between left and right side
-          exprToProtoInternal(Cast(right, LongType), inputs)
+        // DataFusion bitwise shift left expression requires
+        // same data type between left and right side
+        val rightExpression = if (left.dataType == LongType) {
+          Cast(right, LongType)
         } else {
-          exprToProtoInternal(right, inputs)
+          right
         }
+        val rightExpr = exprToProtoInternal(rightExpression, inputs)
 
         if (leftExpr.isDefined && rightExpr.isDefined) {
           val builder = ExprOuterClass.BitwiseShiftLeft.newBuilder()
@@ -1561,11 +1766,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setBitwiseShiftLeft(builder)
             .build())
         } else {
+          withInfo(expr, left, rightExpression)
           None
         }
 
       case In(value, list) =>
-        in(value, list, inputs, false)
+        in(expr, value, list, inputs, false)
 
       case InSet(value, hset) =>
         val valueDataType = value.dataType
@@ -1574,10 +1780,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
         }.toSeq
         // Change `InSet` to `In` expression
         // We do Spark `InSet` optimization in native (DataFusion) side.
-        in(value, list, inputs, false)
+        in(expr, value, list, inputs, false)
 
       case Not(In(value, list)) =>
-        in(value, list, inputs, true)
+        in(expr, value, list, inputs, true)
 
       case Not(child) =>
         val childExpr = exprToProtoInternal(child, inputs)
@@ -1590,6 +1796,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setNot(builder)
             .build())
         } else {
+          withInfo(expr, child)
           None
         }
 
@@ -1604,6 +1811,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setNegative(builder)
             .build())
         } else {
+          withInfo(expr, child)
           None
         }
 
@@ -1612,20 +1820,25 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
         val childExpr = scalarExprToProto("coalesce", exprChildren: _*)
         // TODO: Remove this once we have new DataFusion release which includes
         // the fix: https://github.com/apache/arrow-datafusion/pull/9459
-        castToProto(None, a.dataType, childExpr, "LEGACY")
+        if (childExpr.isDefined) {
+          castToProto(None, a.dataType, childExpr, "LEGACY")
+        } else {
+          withInfo(expr, a.children: _*)
+          None
+        }
 
       // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
       // char types. Use rpad to achieve the behavior.
      // See https://github.com/apache/spark/pull/38151
       case StaticInvoke(
-            clz: Class[_],
+            _: Class[CharVarcharCodegenUtils],
             _: StringType,
             "readSidePadding",
             arguments,
             _,
             true,
             false,
-            true) if clz == classOf[CharVarcharCodegenUtils] && arguments.size == 2 =>
+            true) if arguments.size == 2 =>
         val argsExpr = Seq(
           exprToProtoInternal(Cast(arguments(0), StringType), inputs),
           exprToProtoInternal(arguments(1), inputs))
@@ -1637,15 +1850,18 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
 
           Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build())
         } else {
+          withInfo(expr, arguments: _*)
           None
         }
 
       case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) =>
         val dataType = serializeDataType(expr.dataType)
         if (dataType.isEmpty) {
+          withInfo(expr, s"Unsupported datatype ${expr.dataType}")
           return None
         }
-        exprToProtoInternal(expr, inputs).map { child =>
+        val ex = exprToProtoInternal(expr, inputs)
+        ex.map { child =>
           val builder = ExprOuterClass.NormalizeNaNAndZero
             .newBuilder()
             .setChild(child)
@@ -1656,6 +1872,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
       case s @ execution.ScalarSubquery(_, _) =>
         val dataType = serializeDataType(s.dataType)
         if (dataType.isEmpty) {
+          withInfo(s, s"Scalar subquery returns unsupported datatype ${s.dataType}")
           return None
         }
 
@@ -1667,14 +1884,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
 
       case UnscaledValue(child) =>
         val childExpr = exprToProtoInternal(child, inputs)
-        scalarExprToProtoWithReturnType("unscaled_value", LongType, childExpr)
+        val optExpr = scalarExprToProtoWithReturnType("unscaled_value", LongType, childExpr)
+        optExprWithInfo(optExpr, expr, child)
 
       case MakeDecimal(child, precision, scale, true) =>
         val childExpr = exprToProtoInternal(child, inputs)
-        scalarExprToProtoWithReturnType(
+        val optExpr = scalarExprToProtoWithReturnType(
           "make_decimal",
           DecimalType(precision, scale),
           childExpr)
+        optExprWithInfo(optExpr, expr, child)
+
       case b @ BinaryExpression(_, _) if isBloomFilterMightContain(b) =>
         val bloomFilter = b.left
         val value = b.right
@@ -1690,30 +1910,37 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
             .setBloomFilterMightContain(builder)
             .build())
         } else {
+          withInfo(expr, bloomFilter, value)
           None
         }
 
-      case e =>
-        emitWarning(s"unsupported Spark expression: '$e' of class '${e.getClass.getName}")
+      case _ =>
+        withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
     }
   }
 
   def trim(
+      expr: Expression, // parent expression
       srcStr: Expression,
       trimStr: Option[Expression],
       inputs: Seq[Attribute],
      trimType: String): Option[Expr] = {
-    val srcExpr = exprToProtoInternal(Cast(srcStr, StringType), inputs)
+    val srcCast = Cast(srcStr, StringType)
+    val srcExpr = exprToProtoInternal(srcCast, inputs)
     if (trimStr.isDefined) {
-      val trimExpr = exprToProtoInternal(Cast(trimStr.get, StringType), inputs)
-      scalarExprToProto(trimType, srcExpr, trimExpr)
+      val trimCast = Cast(trimStr.get, StringType)
+      val trimExpr = exprToProtoInternal(trimCast, inputs)
+      val optExpr = scalarExprToProto(trimType, srcExpr, trimExpr)
+      optExprWithInfo(optExpr, expr, srcCast, trimCast)
     } else {
-      scalarExprToProto(trimType, srcExpr)
+      val optExpr = scalarExprToProto(trimType, srcExpr)
+      optExprWithInfo(optExpr, expr, srcCast)
     }
   }
 
   def in(
+      expr: Expression,
       value: Expression,
       list: Seq[Expression],
       inputs: Seq[Attribute],
@@ -1731,6 +1958,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
         .setIn(builder)
         .build())
     } else {
+      val allExprs = list
++ Seq(value) + withInfo(expr, allExprs: _*) None } } @@ -1764,7 +1993,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { args: Option[Expr]*): Option[Expr] = { args.foreach { case Some(a) => builder.addArgs(a) - case _ => return None + case _ => + return None } Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build()) } @@ -1808,6 +2038,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .addAllProjectList(exprs.map(_.get).asJava) Some(result.setProjection(projectBuilder).build()) } else { + withInfo(op, projectList: _*) None } @@ -1818,6 +2049,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { val filterBuilder = OperatorOuterClass.Filter.newBuilder().setPredicate(cond.get) Some(result.setFilter(filterBuilder).build()) } else { + withInfo(op, condition, child) None } @@ -1830,6 +2062,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .addAllSortOrders(sortOrders.map(_.get).asJava) Some(result.setSort(sortBuilder).build()) } else { + withInfo(op, sortOrder: _*) None } @@ -1843,6 +2076,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setOffset(0) Some(result.setLimit(limitBuilder).build()) } else { + withInfo(op, "No child operator") None } @@ -1860,11 +2094,16 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { Some(result.setLimit(limitBuilder).build()) } else { + withInfo(op, "No child operator") None } case ExpandExec(projections, _, child) if isCometOperatorEnabled(op.conf, "expand") => - val projExprs = projections.flatMap(_.map(exprToProto(_, child.output))) + var allProjExprs: Seq[Expression] = Seq() + val projExprs = projections.flatMap(_.map(e => { + allProjExprs = allProjExprs :+ e + exprToProto(e, child.output) + })) if (projExprs.forall(_.isDefined) && childOp.nonEmpty) { val expandBuilder = OperatorOuterClass.Expand @@ -1873,6 +2112,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .setNumExprPerProject(projections.head.size) Some(result.setExpand(expandBuilder).build()) } else { + withInfo(op, allProjExprs: _*) None } @@ -1887,11 +2127,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { resultExpressions, child) if isCometOperatorEnabled(op.conf, "aggregate") => if (groupingExpressions.isEmpty && aggregateExpressions.isEmpty) { + withInfo(op, "No group by or aggregation") return None } // Aggregate expressions with filter are not supported yet. if (aggregateExpressions.exists(_.filter.isDefined)) { + withInfo(op, "Aggregate expression with filter is not supported") return None } @@ -1917,7 +2159,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { val attributes = groupingExpressions.map(_.toAttribute) ++ aggregateAttributes val resultExprs = resultExpressions.map(exprToProto(_, attributes)) if (resultExprs.exists(_.isEmpty)) { - emitWarning(s"Unsupported result expressions found in: ${resultExpressions}") + val msg = s"Unsupported result expressions found in: ${resultExpressions}" + emitWarning(msg) + withInfo(op, msg, resultExpressions: _*) return None } hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava) @@ -1928,13 +2172,16 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { if (modes.size != 1) { // This shouldn't happen as all aggregation expressions should share the same mode. // Fallback to Spark nevertheless here. 
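+          // e.g. a (hypothetical) plan whose aggregate expressions mix Partial and
+          // Final modes would reach this branch; the serialized HashAggregate message
+          // carries exactly one CometAggregateMode, so such plans cannot be expressed.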
+ withInfo(op, "All aggregate expressions do not have the same mode") return None } val mode = modes.head match { case Partial => CometAggregateMode.Partial case Final => CometAggregateMode.Final - case _ => return None + case _ => + withInfo(op, s"Unsupported aggregation mode ${modes.head}") + return None } // In final mode, the aggregate expressions are bound to the output of the @@ -1945,7 +2192,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // `output` is only used when `binding` is true (i.e., non-Final) val output = child.output - val aggExprs = aggregateExpressions.map(aggExprToProto(_, output, binding)) + val aggExprs = + aggregateExpressions.map(aggExprToProto(_, output, binding)) if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) && aggExprs.forall(_.isDefined)) { val hashAggBuilder = OperatorOuterClass.HashAggregate.newBuilder() @@ -1955,7 +2203,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { val attributes = groupingExpressions.map(_.toAttribute) ++ aggregateAttributes val resultExprs = resultExpressions.map(exprToProto(_, attributes)) if (resultExprs.exists(_.isEmpty)) { - emitWarning(s"Unsupported result expressions found in: ${resultExpressions}") + val msg = s"Unsupported result expressions found in: ${resultExpressions}" + emitWarning(msg) + withInfo(op, msg, resultExpressions: _*) return None } hashAggBuilder.addAllResultExprs(resultExprs.map(_.get).asJava) @@ -1963,6 +2213,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { hashAggBuilder.setModeValue(mode.getNumber) Some(result.setHashAgg(hashAggBuilder).build()) } else { + val allChildren: Seq[Expression] = + groupingExpressions ++ aggregateExpressions ++ aggregateAttributes + withInfo(op, allChildren: _*) None } } @@ -1974,18 +2227,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { join.isInstanceOf[ShuffledHashJoinExec]) && !(isCometOperatorEnabled(op.conf, "broadcast_hash_join") && join.isInstanceOf[BroadcastHashJoinExec])) { + withInfo(join, s"Invalid hash join type ${join.nodeName}") return None } if (join.buildSide == BuildRight) { // DataFusion HashJoin assumes build side is always left. 
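+      // Supporting BuildRight would presumably require swapping the join children
+      // (and mirroring the join type) before serialization; until then we fall back.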
// TODO: support BuildRight + withInfo(join, "BuildRight is not supported") return None } val condition = join.condition.map { cond => val condProto = exprToProto(cond, join.left.output ++ join.right.output) if (condProto.isEmpty) { + withInfo(join, cond) return None } condProto.get @@ -1998,7 +2254,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case FullOuter => JoinType.FullOuter case LeftSemi => JoinType.LeftSemi case LeftAnti => JoinType.LeftAnti - case _ => return None // Spark doesn't support other join types + case _ => + // Spark doesn't support other join types + withInfo(join, s"Unsupported join type ${join.joinType}") + return None } val leftKeys = join.leftKeys.map(exprToProto(_, join.left.output)) @@ -2015,6 +2274,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { condition.foreach(joinBuilder.setCondition) Some(result.setHashJoin(joinBuilder).build()) } else { + val allExprs: Seq[Expression] = join.leftKeys ++ join.rightKeys + withInfo(join, allExprs: _*) None } @@ -2040,6 +2301,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // TODO: Support SortMergeJoin with join condition after new DataFusion release if (join.condition.isDefined) { + withInfo(op, "Sort merge join with a join condition is not supported") return None } @@ -2050,7 +2312,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case FullOuter => JoinType.FullOuter case LeftSemi => JoinType.LeftSemi case LeftAnti => JoinType.LeftAnti - case _ => return None // Spark doesn't support other join types + case _ => + // Spark doesn't support other join types + withInfo(op, s"Unsupported join type ${join.joinType}") + return None } val leftKeys = join.leftKeys.map(exprToProto(_, join.left.output)) @@ -2071,9 +2336,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { .addAllRightJoinKeys(rightKeys.map(_.get).asJava) Some(result.setSortMergeJoin(joinBuilder).build()) } else { + val allExprs: Seq[Expression] = join.leftKeys ++ join.rightKeys + withInfo(join, allExprs: _*) None } + case join: SortMergeJoinExec if !isCometOperatorEnabled(op.conf, "sort_merge_join") => + withInfo(join, "SortMergeJoin is not enabled") + None + case op if isCometSink(op) => // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() @@ -2091,8 +2362,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { Some(result.setScan(scanBuilder).build()) } else { // There are unsupported scan type - emitWarning( - s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above") + val msg = + s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above" + emitWarning(msg) + withInfo(op, msg) None } @@ -2101,7 +2374,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // 1. it is not Spark shuffle operator, which is handled separately // 2. it is not a Comet operator if (!op.nodeName.contains("Comet") && !op.isInstanceOf[ShuffleExchangeExec]) { - emitWarning(s"unsupported Spark operator: ${op.nodeName}") + val msg = s"unsupported Spark operator: ${op.nodeName}" + emitWarning(msg) + withInfo(op, msg) } None } @@ -2145,7 +2420,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { * Check if the datatypes of shuffle input are supported. This is used for Columnar shuffle * which supports struct/array. 
*/ - def supportPartitioningTypes(inputs: Seq[Attribute]): Boolean = { + def supportPartitioningTypes(inputs: Seq[Attribute]): (Boolean, String) = { def supportedDataType(dt: DataType): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType | @@ -2170,17 +2445,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { } // Check if the datatypes of shuffle input are supported. + var msg = "" val supported = inputs.forall(attr => supportedDataType(attr.dataType)) if (!supported) { - emitWarning(s"unsupported Spark partitioning: ${inputs.map(_.dataType)}") + msg = s"unsupported Spark partitioning: ${inputs.map(_.dataType)}" + emitWarning(msg) } - supported + (supported, msg) } /** * Whether the given Spark partitioning is supported by Comet. */ - def supportPartitioning(inputs: Seq[Attribute], partitioning: Partitioning): Boolean = { + def supportPartitioning( + inputs: Seq[Attribute], + partitioning: Partitioning): (Boolean, String) = { def supportedDataType(dt: DataType): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType | @@ -2195,17 +2474,33 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { val supported = inputs.forall(attr => supportedDataType(attr.dataType)) if (!supported) { - emitWarning(s"unsupported Spark partitioning: ${inputs.map(_.dataType)}") - false + val msg = s"unsupported Spark partitioning: ${inputs.map(_.dataType)}" + emitWarning(msg) + (false, msg) } else { partitioning match { case HashPartitioning(expressions, _) => - expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) - case SinglePartition => true + (expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined), null) + case SinglePartition => (true, null) case other => - emitWarning(s"unsupported Spark partitioning: ${other.getClass.getName}") - false + val msg = s"unsupported Spark partitioning: ${other.getClass.getName}" + emitWarning(msg) + (false, msg) } } } + + // Utility method. 
Adds explain info if the result of calling exprToProto is None
+  private def optExprWithInfo(
+      optExpr: Option[Expr],
+      expr: Expression,
+      childExpr: Expression*): Option[Expr] = {
+    optExpr match {
+      case None =>
+        withInfo(expr, childExpr: _*)
+        None
+      case o => o
+    }
+  }
 }
diff --git a/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 85c6413e13..ffec1bd402 100644
--- a/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -20,7 +20,7 @@
 package org.apache.comet.shims
 
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 
 trait ShimCometSparkSessionExtensions {
@@ -49,4 +49,19 @@ object ShimCometSparkSessionExtensions {
     .filter(_.isInstanceOf[Int])
     .map(_.asInstanceOf[Int])
     .headOption
+
+  // Extended info is available only since Spark 4.0.0
+  // (https://issues.apache.org/jira/browse/SPARK-47289)
+  def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
+    try {
+      // Look for QueryExecution.extendedExplainInfo(scala.Function1[String, Unit], SparkPlan)
+      qe.getClass.getDeclaredMethod(
+        "extendedExplainInfo",
+        classOf[String => Unit],
+        classOf[SparkPlan])
+    } catch {
+      case _: NoSuchMethodException | _: SecurityException => return false
+    }
+    true
+  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/ExtendedExplainGenerator.scala b/spark/src/main/scala/org/apache/spark/sql/ExtendedExplainGenerator.scala
new file mode 100644
index 0000000000..3c5ae95e29
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/ExtendedExplainGenerator.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * A trait for a session extension to implement that provides additional explain plan
+ * information. We copy this from Spark 4.0 since this trait is not available in Spark 3.x. We
+ * can remove this
+ * after dropping Spark 3.x support.
+ */ + +trait ExtendedExplainGenerator { + def title: String + + def generateExtendedInfo(plan: SparkPlan): String +} diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index bbe7edd3cb..ce9a7cd950 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1357,4 +1357,55 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { testCastedColumn(inputValues = Seq("car", "Truck")) } + test("explain comet") { + assume(isSpark34Plus) + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "false", + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true", + CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", + "spark.sql.extendedExplainProvider" -> "org.apache.comet.ExtendedExplainInfo") { + val table = "test" + withTable(table) { + sql(s"create table $table(c0 int, c1 int , c2 float) using parquet") + sql(s"insert into $table values(0, 1, 100.000001)") + + Seq( + ( + s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table", + "make_interval is not supported"), + ( + "SELECT " + + "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))" + + " + " + + "date_part('MONTH', make_interval(c0, c1, c0, c1, c0, c0, c2))" + + s" as yrs_and_mths from $table", + "extractintervalyears is not supported\n" + + "extractintervalmonths is not supported"), + ( + s"SELECT sum(c0), sum(c2) from $table group by c1", + "Native shuffle is not enabled\n" + + "AQEShuffleRead is not supported"), + ( + "SELECT A.c1, A.sum_c0, A.sum_c2, B.casted from " + + s"(SELECT c1, sum(c0) as sum_c0, sum(c2) as sum_c2 from $table group by c1) as A, " + + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B " + + "where A.c1 = B.c1 ", + "Native shuffle is not enabled\n" + + "AQEShuffleRead is not supported\n" + + "make_interval is not supported\n" + + "BroadcastExchange is not supported\n" + + "BroadcastHashJoin disabled because not all child plans are native")) + .foreach(test => { + val qry = test._1 + val expected = test._2 + val df = sql(qry) + df.collect() // force an execution + checkSparkAnswerAndCompareExplainPlan(df, expected) + }) + } + } + } + } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala index 8f83ac04b3..ad989f58cc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala @@ -46,6 +46,9 @@ trait CometTPCQueryBase extends Logging { .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString) .set("spark.sql.crossJoin.enabled", "true") .setIfMissing("parquet.enable.dictionary", "true") + .set( + "spark.shuffle.manager", + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") val sparkSession = SparkSession.builder .config(conf) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala index ac3c047023..6a58320606 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala @@ -31,12 +31,14 @@ import org.apache.spark.sql.comet.CometExec import 
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index bbe7edd3cb..ce9a7cd950 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1357,4 +1357,55 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     testCastedColumn(inputValues = Seq("car", "Truck"))
   }
 
+  test("explain comet") {
+    assume(isSpark34Plus)
+    withSQLConf(
+      SQLConf.ANSI_ENABLED.key -> "false",
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+      "spark.sql.extendedExplainProvider" -> "org.apache.comet.ExtendedExplainInfo") {
+      val table = "test"
+      withTable(table) {
+        sql(s"create table $table(c0 int, c1 int, c2 float) using parquet")
+        sql(s"insert into $table values(0, 1, 100.000001)")
+
+        Seq(
+          (
+            s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table",
+            "make_interval is not supported"),
+          (
+            "SELECT " +
+              "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))" +
+              " + " +
+              "date_part('MONTH', make_interval(c0, c1, c0, c1, c0, c0, c2))" +
+              s" as yrs_and_mths from $table",
+            "extractintervalyears is not supported\n" +
+              "extractintervalmonths is not supported"),
+          (
+            s"SELECT sum(c0), sum(c2) from $table group by c1",
+            "Native shuffle is not enabled\n" +
+              "AQEShuffleRead is not supported"),
+          (
+            "SELECT A.c1, A.sum_c0, A.sum_c2, B.casted from " +
+              s"(SELECT c1, sum(c0) as sum_c0, sum(c2) as sum_c2 from $table group by c1) as A, " +
+              s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B " +
+              "where A.c1 = B.c1 ",
+            "Native shuffle is not enabled\n" +
+              "AQEShuffleRead is not supported\n" +
+              "make_interval is not supported\n" +
+              "BroadcastExchange is not supported\n" +
+              "BroadcastHashJoin disabled because not all child plans are native"))
+          .foreach(test => {
+            val qry = test._1
+            val expected = test._2
+            val df = sql(qry)
+            df.collect() // force an execution
+            checkSparkAnswerAndCompareExplainPlan(df, expected)
+          })
+      }
+    }
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala
index 8f83ac04b3..ad989f58cc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryBase.scala
@@ -46,6 +46,9 @@ trait CometTPCQueryBase extends Logging {
       .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
       .set("spark.sql.crossJoin.enabled", "true")
       .setIfMissing("parquet.enable.dictionary", "true")
+      .set(
+        "spark.shuffle.manager",
+        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
 
     val sparkSession = SparkSession.builder
       .config(conf)
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala
index ac3c047023..6a58320606 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala
@@ -31,12 +31,14 @@ import org.apache.spark.sql.comet.CometExec
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, ExtendedExplainInfo}
+import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
 trait CometTPCQueryListBase
     extends CometTPCQueryBase
     with AdaptiveSparkPlanHelper
-    with SQLHelper {
+    with SQLHelper
+    with ShimCometSparkSessionExtensions {
   var output: Option[OutputStream] = None
 
   def main(args: Array[String]): Unit = {
@@ -84,11 +86,16 @@ trait CometTPCQueryListBase
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
         CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+        // Lower bloom filter thresholds to allow us to simulate the plan produced at a larger scale.
+        "spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold" -> "1MB",
+        "spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold" -> "1MB") {
         val df = cometSpark.sql(queryString)
         val cometPlans = mutable.HashSet.empty[String]
-        stripAQEPlan(df.queryExecution.executedPlan).foreach {
+        val executedPlan = df.queryExecution.executedPlan
+        stripAQEPlan(executedPlan).foreach {
           case op: CometExec =>
             cometPlans += s"${op.nodeName}"
           case _ =>
@@ -100,6 +107,9 @@ trait CometTPCQueryListBase
         } else {
           out.println(s"Query: $name$nameSuffix. Comet Exec: Disabled")
         }
+        out.println(
+          s"Query: $name$nameSuffix: ExplainInfo:\n" +
+            s"${new ExtendedExplainInfo().generateExtendedInfo(executedPlan)}\n")
       }
     }
   }
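The suites above obtain the fallback reasons either directly from ExtendedExplainInfo or through Spark's spark.sql.extendedExplainProvider conf. A usage sketch, assuming a session with Comet on the classpath and a Spark version for which supportsExtendedExplainInfo returns true (on older versions the conf is ignored and only the direct generateExtendedInfo call works):

    // Sketch: surface Comet fallback reasons in EXPLAIN output.
    spark.conf.set(
      "spark.sql.extendedExplainProvider",
      "org.apache.comet.ExtendedExplainInfo")
    spark.sql("SELECT xxhash64('x') AS h").explain("extended")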
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 85e58824c9..ef64d666ea 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.example.data.Group
@@ -37,7 +38,7 @@ import org.apache.spark._
 import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometBroadcastExchangeExec, CometExec, CometRowToColumnarExec, CometScanExec, CometScanWrapper, CometSinkPlaceHolder}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExtendedMode, InputAdapter, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal._
 import org.apache.spark.sql.test._
@@ -46,6 +47,8 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.comet._
 import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
+import org.apache.comet.shims.ShimCometSparkSessionExtensions
+import org.apache.comet.shims.ShimCometSparkSessionExtensions.supportsExtendedExplainInfo
 
 /**
  * Base class for testing. This exists in `org.apache.spark.sql` since [[SQLTestUtils]] is
@@ -55,7 +58,8 @@ abstract class CometTestBase
     extends QueryTest
     with SQLTestUtils
     with BeforeAndAfterEach
-    with AdaptiveSparkPlanHelper {
+    with AdaptiveSparkPlanHelper
+    with ShimCometSparkSessionExtensions {
   import testImplicits._
 
   protected val shuffleManager: String =
@@ -227,6 +231,30 @@ abstract class CometTestBase
     (expected.get.getCause, actual.getCause)
   }
 
+  protected def checkSparkAnswerAndCompareExplainPlan(
+      df: DataFrame,
+      expectedInfo: String): Unit = {
+    var expected: Array[Row] = Array.empty
+    var dfSpark: Dataset[Row] = null
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "false",
+      "spark.sql.extendedExplainProvider" -> "") {
+      dfSpark = Dataset.ofRows(spark, df.logicalPlan)
+      expected = dfSpark.collect()
+    }
+    val dfComet = Dataset.ofRows(spark, df.logicalPlan)
+    checkAnswer(dfComet, expected)
+    val diff = StringUtils.difference(
+      dfSpark.queryExecution.explainString(ExtendedMode),
+      dfComet.queryExecution.explainString(ExtendedMode))
+    if (supportsExtendedExplainInfo(dfSpark.queryExecution)) {
+      assert(diff.contains(expectedInfo))
+    }
+    val extendedInfo =
+      new ExtendedExplainInfo().generateExtendedInfo(dfComet.queryExecution.executedPlan)
+    assert(extendedInfo.equalsIgnoreCase(expectedInfo))
+  }
+
   private var _spark: SparkSession = _
   protected implicit def spark: SparkSession = _spark
   protected implicit def sqlContext: SQLContext = _spark.sqlContext