From bb73e4b3ddff706b08597b89d32b6748f36a2650 Mon Sep 17 00:00:00 2001
From: Kunshang Ji
Date: Fri, 14 May 2021 14:31:47 +0800
Subject: [PATCH 1/5] fix issue where APE still pushes down aggregates when
 aggPushdown is set to false.

---
 .../sql/execution/datasources/FileSourceStrategy.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index d87c892ad..6564bd09a 100644
--- a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
 import org.apache.spark.sql.catalyst.planning.{AggScanOperation, ScanOperation}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
-import org.apache.spark.sql.internal.SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED
+import org.apache.spark.sql.internal.SQLConf.{APE_AGGREGATION_PUSHDOWN_ENABLED, PARQUET_FILTER_PUSHDOWN_ENABLED}
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -146,9 +146,11 @@ object FileSourceStrategy extends Strategy with Logging {
       // match Aggregate node when apply FileSourceStrategy, will return a null node,
       // but will keep agg info in fsRelation. Ideally, following filter/ projection
       // will apply FileSourceStrategy very soon.
-      fsRelation.groupExpr = Some(groupingExpr)
-      fsRelation.resultExpr = Some(aggExpr
-        .map(expr => expr.asInstanceOf[AggregateExpression]))
+      if (SparkSession.getActiveSession.get.conf.get(APE_AGGREGATION_PUSHDOWN_ENABLED, false)) {
+        fsRelation.groupExpr = Some(groupingExpr)
+        fsRelation.resultExpr = Some(aggExpr
+          .map(expr => expr.asInstanceOf[AggregateExpression]))
+      }
       Seq()
 
     case ScanOperation(projects, filters,

From 64a3bd2368c8b8a7546c981410263e294a33b7a8 Mon Sep 17 00:00:00 2001
From: Kunshang Ji
Date: Mon, 24 May 2021 13:06:19 +0800
Subject: [PATCH 2/5] fix q54

---
 .../datasources/util/AggregateConvertor.java  |  5 +-
 .../sql/execution/aggregate/AggUtils.scala    | 46 ++++++++++++++-----
 .../datasources/DataSourceStrategy.scala      |  1 +
 .../datasources/FileSourceStrategy.scala      | 23 +++++++---
 4 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/oap-ape/ape-java/ape-spark/src/main/java/org/apache/spark/sql/execution/datasources/util/AggregateConvertor.java b/oap-ape/ape-java/ape-spark/src/main/java/org/apache/spark/sql/execution/datasources/util/AggregateConvertor.java
index 66780257b..de31ecd5b 100644
--- a/oap-ape/ape-java/ape-spark/src/main/java/org/apache/spark/sql/execution/datasources/util/AggregateConvertor.java
+++ b/oap-ape/ape-java/ape-spark/src/main/java/org/apache/spark/sql/execution/datasources/util/AggregateConvertor.java
@@ -128,10 +128,13 @@ private static JsonNode constructTree(Expression expr, JsonNode rootNode) {
       ((ObjectNode) tmpNode).put("dataType", tmpExpr.dataType().toString());
       ((ObjectNode) tmpNode).put("value", tmpExpr.value().toString());
       return tmpNode;
+    } else if (expr instanceof UnscaledValue) {
+      // TODO: cast to Long?
+      return constructTree(exprs.get(0), tmpNode);
     } else {
       //TODO: will include other type?
-      throw new UnsupportedOperationException("should not reach here.");
+      throw new UnsupportedOperationException("should not reach here. Expr: " + expr.toString());
     }

diff --git a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 89dd1f2cd..479cfe3b5 100644
--- a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}
 import org.apache.spark.sql.execution.{PlanLater, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StateStoreSaveExec}
 
 /**
@@ -89,6 +89,14 @@ object AggUtils {
     }
   }
 
+  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
+  }
+
   def planAggregateWithoutDistinct(
       groupingExpressions: Seq[NamedExpression],
       aggregateExpressions: Seq[AggregateExpression],
@@ -142,17 +150,31 @@ object AggUtils {
         }
       case filter: Filter =>
         filter.child match {
-          case LogicalRelation (fsRelation: HadoopFsRelation, _, _, _) =>
-            if (!fsRelation.resultExpr.isEmpty) {
-              val agg = createAggregate(
-                requiredChildDistributionExpressions = Some(groupingAttributes),
-                groupingExpressions = groupingAttributes,
-                aggregateExpressions = finalAggregateExpressions,
-                aggregateAttributes = finalAggregateAttributes,
-                initialInputBufferOffset = groupingExpressions.length,
-                resultExpressions = resultExpressions,
-                child = child)
-              return agg :: Nil
+          case l@LogicalRelation (fsRelation: HadoopFsRelation, _, _, _) =>
+            if (!fsRelation.resultExpr.isEmpty && fsRelation.resultExpr.get.size != 0) {
+              // check whether all filters can be pushed down here; if not, we will NOT
+              // push down the agg (i.e. keep the partial agg instead of eliding it)
+              val filterExpressions = splitConjunctivePredicates(filter.condition)
+              val normalizedFilter = DataSourceStrategy.normalizeExprs(
+                filterExpressions.filter(_.deterministic), l.output)
+              val pushedFilters =
+                normalizedFilter.flatMap(DataSourceStrategy.translateFilter(_, true))
+
+              if(pushedFilters.size == normalizedFilter.size) {
+                // scalastyle:off println
+                System.err.println("ignore partial agg here!")
+                System.err.println("resultExpr: " + fsRelation.resultExpr.mkString)
+                // scalastyle:on println
+                val agg = createAggregate(
+                  requiredChildDistributionExpressions = Some(groupingAttributes),
+                  groupingExpressions = groupingAttributes,
+                  aggregateExpressions = finalAggregateExpressions,
+                  aggregateAttributes = finalAggregateAttributes,
+                  initialInputBufferOffset = groupingExpressions.length,
+                  resultExpressions = resultExpressions,
+                  child = child)
+                return agg :: Nil
+              }
             }
           case _ => // do nothing, fix queries like tpc-ds q45
         }

diff --git a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 12de9e9e5..6c1a37a54 100644
--- a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -546,6 +546,7 @@ object DataSourceStrategy {
    */
   protected[sql] def translateAggregate(groupingExpressions: Seq[Expression],
       aggregateExpressions: Seq[AggregateExpression]): String = {
+    if (groupingExpressions.size > 0 && aggregateExpressions.size == 0) return ""
     AggregateConvertor.toJsonString(
       scala.collection.JavaConverters.seqAsJavaList(groupingExpressions),
       scala.collection.JavaConverters.seqAsJavaList(aggregateExpressions))

diff --git a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 6564bd09a..6b734cf94 100644
--- a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -147,9 +147,13 @@ object FileSourceStrategy extends Strategy with Logging {
       // but will keep agg info in fsRelation. Ideally, following filter/ projection
       // will apply FileSourceStrategy very soon.
       if (SparkSession.getActiveSession.get.conf.get(APE_AGGREGATION_PUSHDOWN_ENABLED, false)) {
-        fsRelation.groupExpr = Some(groupingExpr)
-        fsRelation.resultExpr = Some(aggExpr
-          .map(expr => expr.asInstanceOf[AggregateExpression]))
+        var withoutDistinct = true
+        aggExpr.foreach(expr => if (expr.isDistinct) withoutDistinct = false)
+
+        if (withoutDistinct) {
+          fsRelation.groupExpr = Some(groupingExpr)
+          fsRelation.resultExpr = Some(aggExpr)
+        }
       }
       Seq()
 
@@ -241,14 +245,21 @@ object FileSourceStrategy extends Strategy with Logging {
         val partialResultExpressions =
           groupingAttributes ++
             partialAggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
 
-        val aggregateExpression = DataSourceStrategy.translateAggregate(gExpression, aggExpressions)
+        val aggregateExpression = {
+          if (afterScanFilters.isEmpty) DataSourceStrategy.translateAggregate(gExpression, aggExpressions)
+          else ""
+        }
+        logInfo("agg pd info: " + gExpression.mkString +
+          " " + aggExpressions.mkString)
         // set null to avoid influence later node/plan.
         fsRelation.groupExpr = None
         fsRelation.resultExpr = None
 
+        val onlyGroup = (aggExpressions.size == 0) && (gExpression.size > 0)
         val outAttributes: Seq[Attribute] =
-          if (!partialResultExpressions.isEmpty) partialResultExpressions
-          else outputAttributes
+          if (!onlyGroup && !partialResultExpressions.isEmpty && afterScanFilters.isEmpty) {
+            partialResultExpressions
+          } else outputAttributes
         val schema = outAttributes.toStructType
         logInfo(s"Output Data Schema after agg pd: ${schema.simpleString}")

From 03e7e84770bd16f3dba9407233eaac2eb1f66a11 Mon Sep 17 00:00:00 2001
From: Kunshang Ji
Date: Tue, 25 May 2021 13:13:29 +0800
Subject: [PATCH 3/5] fix issues: 1) SparkOptimizer will add an UnscaledValue
 node for some aggregate cases (sum/average); it may return a double type
 rather than long/decimal, so add a dump workaround.
 2) fix count node null issue

---
 oap-ape/ape-native/src/reader.cc              | 12 +++++++-----
 oap-ape/ape-native/src/utils/AggExpression.cc |  9 +++++----
 oap-ape/ape-native/src/utils/ApeDecimal.h     |  1 +
 oap-ape/ape-native/src/utils/DumpUtils.h      |  5 ++++-
 4 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc
index 90fb9afe5..8224a1dc5 100644
--- a/oap-ape/ape-native/src/reader.cc
+++ b/oap-ape/ape-native/src/reader.cc
@@ -27,7 +27,9 @@
 
 namespace ape {
 
-Reader::Reader() {}
+Reader::Reader() {
+  arrow::util::ArrowLog::StartArrowLog("", arrow::util::ArrowLogLevel::ARROW_DEBUG);
+}
 
 void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort,
                   std::string requiredSchema, int firstRowGroup, int rowGroupToRead) {
@@ -193,7 +195,10 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
     int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
     ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;
 
-    rowsRet = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
+    int tmp = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
+    // if the last batch is empty after filtering, it will return 0 regardless of the
+    // number of groups
+    if (tmp != 0) rowsRet = tmp;
   }
 
   if (aggExprs.size()) {
@@ -370,9 +375,6 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector& keys
           std::dynamic_pointer_cast(agg)->getResult(
               results[i], keys.size(), indexes);
         } else {
-          if (results[i].nullVector->size() == 0) {
-            results[i].nullVector->resize(1);
-          }
           std::dynamic_pointer_cast(agg)->getResult(results[i]);
         }
       } else {

diff --git a/oap-ape/ape-native/src/utils/AggExpression.cc b/oap-ape/ape-native/src/utils/AggExpression.cc
index f55ad4195..dfd558453 100644
--- a/oap-ape/ape-native/src/utils/AggExpression.cc
+++ b/oap-ape/ape-native/src/utils/AggExpression.cc
@@ -71,9 +71,10 @@ int AggExpression::ExecuteWithParam(int batchSize,
 
 void Count::getResultInternal(DecimalVector& result) {
   result.type = ResultType::LongType;
-  if (result.nullVector->size() == 0) {
-    result.nullVector->resize(1);
-    result.nullVector->at(0) = 1;
+  result.nullVector->resize(1);
+  result.nullVector->at(0) = 1;
+  if (result.data.size() == 0) {
+    result.data.push_back(arrow::BasicDecimal128(0));
   }
   if (typeid(*child) == typeid(LiteralExpression)) {  // for count(*) or count(1)
     result.data[0] += arrow::BasicDecimal128(batchSize_);
@@ -84,7 +85,7 @@ void Count::getResultInternal(DecimalVector& result) {
     auto tmp = DecimalVector();
     child->getResult(tmp);
     for (int i = 0; i < tmp.data.size(); i++) {
-      if (tmp.nullVector->at(i)) result.data[0] += 1;
+      if (tmp.nullVector->at(i)) result.data[0] += arrow::BasicDecimal128(1);
     }
   }
 }

diff --git a/oap-ape/ape-native/src/utils/ApeDecimal.h b/oap-ape/ape-native/src/utils/ApeDecimal.h
index e8b67a21f..67945ff58 100644
--- a/oap-ape/ape-native/src/utils/ApeDecimal.h
+++ b/oap-ape/ape-native/src/utils/ApeDecimal.h
@@ -19,6 +19,7 @@
 
 #include
 #include
+#include
 #include
 
 namespace ape {

diff --git a/oap-ape/ape-native/src/utils/DumpUtils.h b/oap-ape/ape-native/src/utils/DumpUtils.h
index be3a1df2d..dc3da03b8 100644
--- a/oap-ape/ape-native/src/utils/DumpUtils.h
+++ b/oap-ape/ape-native/src/utils/DumpUtils.h
@@ -89,7 +89,10 @@ class DumpUtils {
           break;
         }
         case ResultType::DoubleType: {
-          // TODO: convert
+          // TODO: this only covers the UnscaledValue case; if the data type is actually
+          // Double, this will not work
+          arrow::Decimal128 tmp(result.data[i]);
+          *((double*)bufferAddr + i) = tmp.ToDouble(0);
           break;
         }
         case ResultType::Decimal64Type: {

From 8ca9343408c2740cebf9bc5735bb60f24ac06631 Mon Sep 17 00:00:00 2001
From: Kunshang Ji
Date: Tue, 25 May 2021 13:16:28 +0800
Subject: [PATCH 4/5] remove logs

---
 .../org/apache/spark/sql/execution/aggregate/AggUtils.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 479cfe3b5..83ebd6bc1 100644
--- a/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/oap-ape/ape-java/ape-spark/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -161,10 +161,6 @@ object AggUtils {
                 normalizedFilter.flatMap(DataSourceStrategy.translateFilter(_, true))
 
               if(pushedFilters.size == normalizedFilter.size) {
-                // scalastyle:off println
-                System.err.println("ignore partial agg here!")
-                System.err.println("resultExpr: " + fsRelation.resultExpr.mkString)
-                // scalastyle:on println
                 val agg = createAggregate(
                   requiredChildDistributionExpressions = Some(groupingAttributes),
                   groupingExpressions = groupingAttributes,

From 0127f756784d82350c785bd4799420e0ed8249bb Mon Sep 17 00:00:00 2001
From: Kunshang Ji
Date: Tue, 25 May 2021 13:39:27 +0800
Subject: [PATCH 5/5] remove log

---
 oap-ape/ape-native/src/reader.cc | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc
index 8224a1dc5..32a31e357 100644
--- a/oap-ape/ape-native/src/reader.cc
+++ b/oap-ape/ape-native/src/reader.cc
@@ -27,9 +27,7 @@
 
 namespace ape {
 
-Reader::Reader() {
-  arrow::util::ArrowLog::StartArrowLog("", arrow::util::ArrowLogLevel::ARROW_DEBUG);
-}
+Reader::Reader() {}
 
 void Reader::init(std::string fileName, std::string hdfsHost, int hdfsPort,
                   std::string requiredSchema, int firstRowGroup, int rowGroupToRead) {
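
Taken together, patches 1 and 2 make aggregate pushdown conditional: it only happens when the APE config flag enables it, the aggregate contains no DISTINCT, and the partial aggregate is only elided when every deterministic filter predicate can itself be translated and pushed to the reader. The sketch below models that decision rule as plain, self-contained Scala; the Predicate and AggInfo types and the example predicates (including my_udf) are hypothetical stand-ins for illustration, not Spark's Expression/AggregateExpression APIs.

object AggPushdownDecision {
  // Hypothetical stand-ins for Spark's filter and aggregate expressions.
  case class Predicate(sql: String, deterministic: Boolean, translatable: Boolean)
  case class AggInfo(groupingCols: Seq[String], aggFuncs: Seq[String], hasDistinct: Boolean)

  // Patch 1: pushdown must be explicitly enabled via configuration.
  // Patch 2 (FileSourceStrategy): aggregates containing DISTINCT are never pushed down.
  // Patch 2 (AggUtils): the partial aggregate may only be skipped when every
  // deterministic filter predicate can also be pushed down to the native reader.
  def shouldPushDownAgg(aggPushdownEnabled: Boolean,
                        agg: AggInfo,
                        filters: Seq[Predicate]): Boolean = {
    if (!aggPushdownEnabled || agg.hasDistinct) {
      false
    } else {
      val deterministic = filters.filter(_.deterministic)
      val pushable = deterministic.filter(_.translatable)
      // only when the scan handles every filter is it safe to skip the partial aggregate
      pushable.size == deterministic.size
    }
  }

  def main(args: Array[String]): Unit = {
    val agg = AggInfo(Seq("ss_store_sk"), Seq("sum(ss_net_paid)"), hasDistinct = false)
    val filters = Seq(
      Predicate("ss_sold_date_sk > 2451000", deterministic = true, translatable = true),
      Predicate("my_udf(ss_item_sk) = 1", deterministic = true, translatable = false))
    // prints false: one filter cannot be pushed down, so the partial aggregate must be kept
    println(shouldPushDownAgg(aggPushdownEnabled = true, agg = agg, filters = filters))
  }
}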