From 57fce4da6c8dc2cb8d215ba15fa0375ee59c2722 Mon Sep 17 00:00:00 2001 From: Ke Date: Thu, 5 Dec 2024 10:46:26 -0800 Subject: [PATCH] Add single node execution To improve latency of tiny queries running on a large cluster, we introduce single worker execution mode: query will only use one node to execute and plan would be optimized accordingly. --- .../TestDistributedQueriesSingleNode.java | 59 +++++++ .../presto/SystemSessionProperties.java | 11 ++ .../scheduler/SectionExecutionFactory.java | 54 ++++++- .../presto/sql/analyzer/FeaturesConfig.java | 14 ++ .../sql/planner/BasePlanFragmenter.java | 9 ++ .../sql/planner/PlanFragmenterUtils.java | 5 +- .../presto/sql/planner/PlanOptimizers.java | 2 + .../planner/optimizations/AddExchanges.java | 16 +- .../AddExchangesForSingleNodeExecution.java | 150 ++++++++++++++++++ .../MergeJoinForSortedInputOptimizer.java | 3 +- ...ValidateAggregationsWithDefaultValues.java | 3 +- .../sql/analyzer/TestFeaturesConfig.java | 7 +- .../PrestoNativeQueryRunnerUtils.java | 25 ++- ...stPrestoNativeSystemQueriesSingleNode.java | 43 +++++ 14 files changed, 384 insertions(+), 17 deletions(-) create mode 100644 presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java new file mode 100644 index 0000000000000..bfaec5f22852d --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestDistributedQueries; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static io.airlift.tpch.TpchTable.getTables; + +@Test(singleThreaded = true) +public class TestDistributedQueriesSingleNode + extends AbstractTestDistributedQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); + coordinatorProperties.put("single-node-execution-enabled", "true"); + return HiveQueryRunner.createQueryRunner( + getTables(), + ImmutableMap.of(), + coordinatorProperties.build(), + Optional.empty()); + } + + @Override + protected boolean supportsNotNullColumns() + { + return false; + } + + @Override + public void testDelete() + { + // Hive connector currently does not support row-by-row delete + } + + @Override + public void testUpdate() + { + // Updates are not supported by the connector + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index f66aeaf89a2ee..058ce20281284 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -324,6 +324,7 @@ public final class SystemSessionProperties public static final String OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms"; public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns"; public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values"; + public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; @@ -1824,6 +1825,11 @@ public SystemSessionProperties( NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only", queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(), + false), + booleanProperty( + SINGLE_NODE_EXECUTION_ENABLED, + "Enable single node execution", + featuresConfig.isSingleNodeExecutionEnabled(), false)); } @@ -2271,6 +2277,11 @@ public static boolean isNativeExecutionEnabled(Session session) return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class); } + public static boolean isSingleNodeExecutionEnabled(Session session) + { + return session.getSystemProperty(SINGLE_NODE_EXECUTION_ENABLED, Boolean.class); + } + public static boolean isPushAggregationThroughJoin(Session session) { return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index 3f885239d5d6d..9eee31487c64f 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -79,6 +79,7 @@ import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; import static com.facebook.presto.util.Failures.checkCondition; @@ -326,7 +327,58 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { return scheduler; } else { - if (!splitSources.isEmpty()) { + if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) { + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate); + List nodes = nodeSelector.selectRandomNodes(1); + BucketNodeMap bucketNodeMap = new BucketNodeMap((split) -> 0) { + @Override + public int getBucketCount() + { + return 1; + } + @Override + public Optional getAssignedNode(int bucketedId) + { + return Optional.of(nodes.get(0)); + } + @Override + public boolean isBucketCacheable(int bucketedId) + { + return false; + } + @Override + public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable) + { + } + @Override + public boolean isDynamic() + { + return true; + } + @Override + public boolean hasInitialMap() + { + return false; + } + @Override + public Optional> getBucketToNode() + { + return Optional.of(nodes); + } + }; + return new FixedSourcePartitionedScheduler( + stageExecution, + splitSources, + plan.getFragment().getStageExecutionDescriptor(), + plan.getFragment().getTableScanSchedulingOrder(), + nodes, + bucketNodeMap, + splitBatchSize, + getConcurrentLifespansPerNode(session), + nodeSelector, + ImmutableList.of(NOT_PARTITIONED)); + } + else if (!splitSources.isEmpty()) { // contains local source List schedulingOrder = plan.getFragment().getTableScanSchedulingOrder(); ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 5c2040e6ade8a..9c2cbd9bfc753 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -288,6 +288,7 @@ public class FeaturesConfig private int eagerPlanValidationThreadPoolSize = 20; private boolean prestoSparkExecutionEnvironment; + private boolean singleNodeExecutionEnabled; public enum PartitioningPrecisionStrategy { @@ -2847,4 +2848,17 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; return this; } + + public boolean isSingleNodeExecutionEnabled() + { + return singleNodeExecutionEnabled; + } + + @Config("single-node-execution-enabled") + @ConfigDescription("Enable single node execution") + public FeaturesConfig setSingleNodeExecutionEnabled(boolean singleNodeExecutionEnabled) + { + this.singleNodeExecutionEnabled = singleNodeExecutionEnabled; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eba7eeb25efd6..190271f3e39ae 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -61,6 +61,7 @@ import java.util.Set; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -174,6 +175,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext context.get().setSingleNodeDistribution(); } + if (isSingleNodeExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + return context.defaultRewrite(node, context.get()); } @@ -268,6 +273,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext @Override public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) { + if (isSingleNodeExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + switch (exchange.getScope()) { case LOCAL: return context.defaultRewrite(exchange, context.get()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index 4a226fea5ef4e..2ad78df0be9ce 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -47,6 +47,7 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; @@ -94,8 +95,8 @@ public static SubPlan finalizeSubPlan( PartitioningHandle partitioningHandle) { subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle); - if (!noExchange) { - // grouped execution is not supported for SINGLE_DISTRIBUTION + if (!noExchange && !isSingleNodeExecutionEnabled(session)) { + // grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index b03a6f52c5717..a900bb364d163 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -145,6 +145,7 @@ import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin; import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin; import com.facebook.presto.sql.planner.optimizations.AddExchanges; +import com.facebook.presto.sql.planner.optimizations.AddExchangesForSingleNodeExecution; import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges; import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization; import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten; @@ -848,6 +849,7 @@ public PlanOptimizers( builder.add(new CteProjectionAndPredicatePushDown(metadata)); // must run before PhysicalCteOptimizer builder.add(new PhysicalCteOptimizer(metadata)); // Must run before AddExchanges builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, partitioningProviderManager, featuresConfig.isNativeExecutionEnabled()))); + builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchangesForSingleNodeExecution(metadata))); } //noinspection UnusedAssignment diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 907a83ccb3077..b099f7766621a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -116,6 +116,7 @@ import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; import static com.facebook.presto.SystemSessionProperties.isScaleWriters; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled; import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; @@ -170,12 +171,21 @@ public AddExchanges(Metadata metadata, PartitioningProviderManager partitioningP this.nativeExecution = nativeExecution; } + @Override + public boolean isEnabled(Session session) + { + return !isSingleNodeExecutionEnabled(session); + } + @Override public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) { - PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); - boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); - return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + if (isEnabled(session)) { + PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); + boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); + return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + } + return PlanOptimizerResult.optimizerResult(plan, false); } private class Rewriter diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java new file mode 100644 index 0000000000000..228a5bbc16a07 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java @@ -0,0 +1,150 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.optimizations; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableFinishNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.plan.ChildReplacer; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; +import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; +import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.iterative.rule.PickTableLayout.pushPredicateIntoTableScan; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange; +import static java.util.Objects.requireNonNull; + +public class AddExchangesForSingleNodeExecution + implements PlanOptimizer +{ + private final Metadata metadata; + + public AddExchangesForSingleNodeExecution(Metadata metadata) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public boolean isEnabled(Session session) + { + return isSingleNodeExecutionEnabled(session); + } + + @Override + public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) + { + if (isEnabled(session)) { + AddExchangesForSingleNodeExecution.Rewriter rewriter = new AddExchangesForSingleNodeExecution.Rewriter(idAllocator, metadata, session); + PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null); + return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged()); + } + return PlanOptimizerResult.optimizerResult(plan, false); + } + + private class Rewriter + extends SimplePlanRewriter + { + private final PlanNodeIdAllocator idAllocator; + private final Metadata metadata; + private final Session session; + private boolean planChanged; + + private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session) + { + this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.session = requireNonNull(session, "session is null"); + } + + public boolean isPlanChanged() + { + return planChanged; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, RewriteContext context) + { + PlanNode plan = pushPredicateIntoTableScan(node, new ConstantExpression(true, BOOLEAN), true, session, idAllocator, metadata); + // Presto Java and Presto Native use different hash functions for partitioning + // An additional exchange makes sure the data flows through a native worker in case it need to be partitioned for downstream processing + if (containsSystemTableScan(plan)) { + plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); + } + return plan; + } + + @Override + public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + @Override + public PlanNode visitTableFinish(TableFinishNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + @Override + public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + private PlanNode addGatherExchange(PlanNode node, RewriteContext context) + { + PlanNode child = node.getSources().get(0).accept(this, context); + + ExchangeNode gather; + // In case the child is an exchange, don't add another exchange. Instead, convert it to gather exchange. + if (child instanceof ExchangeNode) { + ExchangeNode exchangeNode = (ExchangeNode) child; + gather = new ExchangeNode( + exchangeNode.getSourceLocation(), + idAllocator.getNextId(), + GATHER, + REMOTE_STREAMING, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), exchangeNode.getOutputVariables()), + exchangeNode.getSources(), + exchangeNode.getInputs(), + true, + Optional.empty()); + } + else { + gather = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child); + } + planChanged = true; + return ChildReplacer.replaceChildren(node, ImmutableList.of(gather)); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java index 38c29689e46f2..4b0e6b7564a7e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java @@ -29,6 +29,7 @@ import java.util.List; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferMergeJoinForSortedInputs; import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST; import static com.facebook.presto.spi.plan.JoinType.INNER; @@ -55,7 +56,7 @@ public void setEnabledForTesting(boolean isSet) @Override public boolean isEnabled(Session session) { - return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session); + return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java index 51fd2c4668277..7ff225916331b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; import static com.facebook.presto.spi.plan.AggregationNode.Step.INTERMEDIATE; import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; @@ -114,7 +115,7 @@ public Optional visitAggregation(AggregationNode node, Void conte // No remote repartition exchange between final and partial aggregation. // Make sure that final aggregation operators are executed on a single node. ActualProperties globalProperties = PropertyDerivations.derivePropertiesRecursively(node, metadata, session); - checkArgument(noExchange || globalProperties.isSingleNode(), + checkArgument(isSingleNodeExecutionEnabled(session) || noExchange || globalProperties.isSingleNode(), "Final aggregation with default value not separated from partial aggregation by remote hash exchange"); if (!seenExchanges.localRepartitionExchange) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 4ec67782a1ab7..1844bf4d85663 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -246,7 +246,8 @@ public void testDefaults() .setInlineProjectionsOnValues(false) .setEagerPlanValidationEnabled(false) .setEagerPlanValidationThreadPoolSize(20) - .setPrestoSparkExecutionEnvironment(false)); + .setPrestoSparkExecutionEnvironment(false) + .setSingleNodeExecutionEnabled(false)); } @Test @@ -442,6 +443,7 @@ public void testExplicitPropertyMappings() .put("eager-plan-validation-enabled", "true") .put("eager-plan-validation-thread-pool-size", "2") .put("presto-spark-execution-environment", "true") + .put("single-node-execution-enabled", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -634,7 +636,8 @@ public void testExplicitPropertyMappings() .setInlineProjectionsOnValues(true) .setEagerPlanValidationEnabled(true) .setEagerPlanValidationThreadPoolSize(2) - .setPrestoSparkExecutionEnvironment(true); + .setPrestoSparkExecutionEnvironment(true) + .setSingleNodeExecutionEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index bbdf7f067d2c9..d0706c4aded9d 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -121,7 +121,7 @@ public static QueryRunner createQueryRunner( defaultQueryRunner.close(); - return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled); + return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false); } public static QueryRunner createJavaQueryRunner() @@ -325,7 +325,8 @@ public static QueryRunner createNativeQueryRunner( String storageFormat, boolean addStorageFormatToPath, Boolean failOnNestedLoopJoin, - boolean isCoordinatorSidecarEnabled) + boolean isCoordinatorSidecarEnabled, + boolean singleNodeExecutionEnabled) throws Exception { // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner. @@ -336,6 +337,9 @@ public static QueryRunner createNativeQueryRunner( ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); coordinatorProperties.put("native-execution-enabled", "true"); + if (singleNodeExecutionEnabled) { + coordinatorProperties.put("single-node-execution-enabled", "true"); + } // Make query runner with external workers for tests return HiveQueryRunner.createQueryRunner( @@ -401,7 +405,7 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds) throws Exception { - return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false); + return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift) @@ -413,16 +417,22 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift) public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin) throws Exception { - return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false); + return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat) throws Exception { - return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false); + return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false, false); } - public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled) + public static QueryRunner createNativeQueryRunner( + boolean useThrift, + String storageFormat, + Optional remoteFunctionServerUds, + Boolean failOnNestedLoopJoin, + boolean isCoordinatorSidecarEnabled, + boolean singleNodeExecutionEnabled) throws Exception { int cacheMaxSize = 0; @@ -437,7 +447,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor storageFormat, true, failOnNestedLoopJoin, - isCoordinatorSidecarEnabled); + isCoordinatorSidecarEnabled, + singleNodeExecutionEnabled); } // Start the remote function server. Return the UDS path used to communicate with it. diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java new file mode 100644 index 0000000000000..10cba774d2ddc --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeSystemQueriesSingleNode + extends AbstractTestNativeSystemQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +}