From fb4ad5884e9bfeb4003057c1a794b02e0108f523 Mon Sep 17 00:00:00 2001 From: Ke Date: Thu, 5 Dec 2024 10:46:26 -0800 Subject: [PATCH] Add single node execution To improve the latency of tiny queries running on a large cluster, we introduce a single node execution mode: the query uses only one node to execute, and the plan is optimized accordingly. --- .../main/sphinx/admin/properties-session.rst | 9 ++ .../src/main/sphinx/admin/properties.rst | 9 ++ .../TestDistributedQueriesSingleNode.java | 59 +++++++ .../presto/SystemSessionProperties.java | 11 ++ .../scheduler/SectionExecutionFactory.java | 54 ++++++- .../presto/sql/analyzer/FeaturesConfig.java | 14 ++ .../sql/planner/BasePlanFragmenter.java | 9 ++ .../sql/planner/PlanFragmenterUtils.java | 5 +- .../presto/sql/planner/PlanOptimizers.java | 2 + .../planner/optimizations/AddExchanges.java | 16 +- .../AddExchangesForSingleNodeExecution.java | 150 ++++++++++++++++++ .../MergeJoinForSortedInputOptimizer.java | 3 +- ...ValidateAggregationsWithDefaultValues.java | 3 +- .../sql/analyzer/TestFeaturesConfig.java | 7 +- .../PrestoNativeQueryRunnerUtils.java | 25 ++- ...stPrestoNativeSystemQueriesSingleNode.java | 43 +++++ 16 files changed, 402 insertions(+), 17 deletions(-) create mode 100644 presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java create mode 100644 presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java diff --git a/presto-docs/src/main/sphinx/admin/properties-session.rst b/presto-docs/src/main/sphinx/admin/properties-session.rst index e0a3f230df991..f1c686786c96b 100644 --- a/presto-docs/src/main/sphinx/admin/properties-session.rst +++ b/presto-docs/src/main/sphinx/admin/properties-session.rst @@ -81,6 +81,15 @@ Number of local parallel table writer threads per worker for partitioned 
writes. set, the number set by ``task_writer_count`` will be used. It is required to be a power of two for a Java query engine. +``single_node_execution_enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +This property enables running a query on a single node, even in a large cluster setup. +The corresponding configuration property is :ref:`admin/properties:\`\`single-node-execution-enabled\`\``. + Spilling Properties ------------------- diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index d28103662a9c4..9188ce93738ea 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -70,6 +70,15 @@ When enabled, the logical plan will begin to be built and validated before queueing and allocation of cluster resources so that any errors or incompatibilities in the query plan will fail quickly and inform the user. +``single-node-execution-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +This property enables running a query on a single node, even in a large cluster setup. +This feature benefits low-latency use cases such as tiny queries. + .. _tuning-memory: Memory Management Properties diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java new file mode 100644 index 0000000000000..bfaec5f22852d --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestDistributedQueries; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static io.airlift.tpch.TpchTable.getTables; + +@Test(singleThreaded = true) +public class TestDistributedQueriesSingleNode + extends AbstractTestDistributedQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); + coordinatorProperties.put("single-node-execution-enabled", "true"); + return HiveQueryRunner.createQueryRunner( + getTables(), + ImmutableMap.of(), + coordinatorProperties.build(), + Optional.empty()); + } + + @Override + protected boolean supportsNotNullColumns() + { + return false; + } + + @Override + public void testDelete() + { + // Hive connector currently does not support row-by-row delete + } + + @Override + public void testUpdate() + { + // Updates are not supported by the connector + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index f66aeaf89a2ee..058ce20281284 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -324,6 +324,7 @@ public final class SystemSessionProperties public static final String 
OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms"; public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns"; public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values"; + public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; @@ -1824,6 +1825,11 @@ public SystemSessionProperties( NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only", queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(), + false), + booleanProperty( + SINGLE_NODE_EXECUTION_ENABLED, + "Enable single node execution", + featuresConfig.isSingleNodeExecutionEnabled(), false)); } @@ -2271,6 +2277,11 @@ public static boolean isNativeExecutionEnabled(Session session) return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class); } + public static boolean isSingleNodeExecutionEnabled(Session session) + { + return session.getSystemProperty(SINGLE_NODE_EXECUTION_ENABLED, Boolean.class); + } + public static boolean isPushAggregationThroughJoin(Session session) { return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index 3f885239d5d6d..9eee31487c64f 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -79,6 +79,7 @@ import static 
com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; import static com.facebook.presto.util.Failures.checkCondition; @@ -326,7 +327,58 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { return scheduler; } else { - if (!splitSources.isEmpty()) { + if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) { + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate); + List nodes = nodeSelector.selectRandomNodes(1); + BucketNodeMap bucketNodeMap = new BucketNodeMap((split) -> 0) { + @Override + public int getBucketCount() + { + return 1; + } + @Override + public Optional getAssignedNode(int bucketedId) + { + return Optional.of(nodes.get(0)); + } + @Override + public boolean isBucketCacheable(int bucketedId) + { + return false; + } + @Override + public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable) + { + } + @Override + public boolean isDynamic() + { + return true; + } + @Override + public boolean hasInitialMap() + { + return false; + } + @Override + public Optional> getBucketToNode() + { + return Optional.of(nodes); + } + }; + return new FixedSourcePartitionedScheduler( + stageExecution, + splitSources, + plan.getFragment().getStageExecutionDescriptor(), + plan.getFragment().getTableScanSchedulingOrder(), + nodes, + bucketNodeMap, + splitBatchSize, + getConcurrentLifespansPerNode(session), + nodeSelector, + ImmutableList.of(NOT_PARTITIONED)); + } + else if 
(!splitSources.isEmpty()) { // contains local source List schedulingOrder = plan.getFragment().getTableScanSchedulingOrder(); ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 5c2040e6ade8a..9c2cbd9bfc753 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -288,6 +288,7 @@ public class FeaturesConfig private int eagerPlanValidationThreadPoolSize = 20; private boolean prestoSparkExecutionEnvironment; + private boolean singleNodeExecutionEnabled; public enum PartitioningPrecisionStrategy { @@ -2847,4 +2848,17 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; return this; } + + public boolean isSingleNodeExecutionEnabled() + { + return singleNodeExecutionEnabled; + } + + @Config("single-node-execution-enabled") + @ConfigDescription("Enable single node execution") + public FeaturesConfig setSingleNodeExecutionEnabled(boolean singleNodeExecutionEnabled) + { + this.singleNodeExecutionEnabled = singleNodeExecutionEnabled; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eba7eeb25efd6..190271f3e39ae 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -61,6 +61,7 @@ import java.util.Set; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; +import static 
com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -174,6 +175,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext context.get().setSingleNodeDistribution(); } + if (isSingleNodeExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + return context.defaultRewrite(node, context.get()); } @@ -268,6 +273,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext @Override public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) { + if (isSingleNodeExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + switch (exchange.getScope()) { case LOCAL: return context.defaultRewrite(exchange, context.get()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index 4a226fea5ef4e..2ad78df0be9ce 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -47,6 +47,7 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; @@ -94,8 +95,8 
@@ public static SubPlan finalizeSubPlan( PartitioningHandle partitioningHandle) { subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle); - if (!noExchange) { - // grouped execution is not supported for SINGLE_DISTRIBUTION + if (!noExchange && !isSingleNodeExecutionEnabled(session)) { + // grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index b03a6f52c5717..a900bb364d163 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -145,6 +145,7 @@ import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin; import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin; import com.facebook.presto.sql.planner.optimizations.AddExchanges; +import com.facebook.presto.sql.planner.optimizations.AddExchangesForSingleNodeExecution; import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges; import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization; import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten; @@ -848,6 +849,7 @@ public PlanOptimizers( builder.add(new CteProjectionAndPredicatePushDown(metadata)); // must run before PhysicalCteOptimizer builder.add(new PhysicalCteOptimizer(metadata)); // Must run before AddExchanges builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, partitioningProviderManager, featuresConfig.isNativeExecutionEnabled()))); + builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new 
AddExchangesForSingleNodeExecution(metadata))); } //noinspection UnusedAssignment diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 907a83ccb3077..b099f7766621a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -116,6 +116,7 @@ import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; import static com.facebook.presto.SystemSessionProperties.isScaleWriters; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled; import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; @@ -170,12 +171,21 @@ public AddExchanges(Metadata metadata, PartitioningProviderManager partitioningP this.nativeExecution = nativeExecution; } + @Override + public boolean isEnabled(Session session) + { + return !isSingleNodeExecutionEnabled(session); + } + @Override public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) { - PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); - boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) 
node).getScope().isRemote()).findFirst().isPresent(); - return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + if (isEnabled(session)) { + PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); + boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); + return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + } + return PlanOptimizerResult.optimizerResult(plan, false); } private class Rewriter diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java new file mode 100644 index 0000000000000..228a5bbc16a07 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java @@ -0,0 +1,150 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.sql.planner.optimizations; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableFinishNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.plan.ChildReplacer; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; +import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; +import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.iterative.rule.PickTableLayout.pushPredicateIntoTableScan; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange; +import static java.util.Objects.requireNonNull; + +public class AddExchangesForSingleNodeExecution + implements PlanOptimizer +{ + private final Metadata metadata; + + public AddExchangesForSingleNodeExecution(Metadata 
metadata) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public boolean isEnabled(Session session) + { + return isSingleNodeExecutionEnabled(session); + } + + @Override + public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) + { + if (isEnabled(session)) { + AddExchangesForSingleNodeExecution.Rewriter rewriter = new AddExchangesForSingleNodeExecution.Rewriter(idAllocator, metadata, session); + PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null); + return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged()); + } + return PlanOptimizerResult.optimizerResult(plan, false); + } + + private class Rewriter + extends SimplePlanRewriter + { + private final PlanNodeIdAllocator idAllocator; + private final Metadata metadata; + private final Session session; + private boolean planChanged; + + private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session) + { + this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.session = requireNonNull(session, "session is null"); + } + + public boolean isPlanChanged() + { + return planChanged; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, RewriteContext context) + { + PlanNode plan = pushPredicateIntoTableScan(node, new ConstantExpression(true, BOOLEAN), true, session, idAllocator, metadata); + // Presto Java and Presto Native use different hash functions for partitioning + // An additional exchange makes sure the data flows through a native worker in case it need to be partitioned for downstream processing + if (containsSystemTableScan(plan)) { + plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); + } + return plan; + } + + @Override + public PlanNode 
visitExplainAnalyze(ExplainAnalyzeNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + @Override + public PlanNode visitTableFinish(TableFinishNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + @Override + public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + private PlanNode addGatherExchange(PlanNode node, RewriteContext context) + { + PlanNode child = node.getSources().get(0).accept(this, context); + + ExchangeNode gather; + // In case the child is an exchange, don't add another exchange. Instead, convert it to gather exchange. + if (child instanceof ExchangeNode) { + ExchangeNode exchangeNode = (ExchangeNode) child; + gather = new ExchangeNode( + exchangeNode.getSourceLocation(), + idAllocator.getNextId(), + GATHER, + REMOTE_STREAMING, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), exchangeNode.getOutputVariables()), + exchangeNode.getSources(), + exchangeNode.getInputs(), + true, + Optional.empty()); + } + else { + gather = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child); + } + planChanged = true; + return ChildReplacer.replaceChildren(node, ImmutableList.of(gather)); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java index 38c29689e46f2..4b0e6b7564a7e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java @@ -29,6 +29,7 @@ import java.util.List; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; +import static 
com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferMergeJoinForSortedInputs; import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST; import static com.facebook.presto.spi.plan.JoinType.INNER; @@ -55,7 +56,7 @@ public void setEnabledForTesting(boolean isSet) @Override public boolean isEnabled(Session session) { - return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session); + return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java index 51fd2c4668277..7ff225916331b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; import static com.facebook.presto.spi.plan.AggregationNode.Step.INTERMEDIATE; import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; @@ -114,7 +115,7 @@ public Optional visitAggregation(AggregationNode node, Void conte // No remote repartition exchange between final and partial aggregation. // Make sure that final aggregation operators are executed on a single node. 
ActualProperties globalProperties = PropertyDerivations.derivePropertiesRecursively(node, metadata, session); - checkArgument(noExchange || globalProperties.isSingleNode(), + checkArgument(isSingleNodeExecutionEnabled(session) || noExchange || globalProperties.isSingleNode(), "Final aggregation with default value not separated from partial aggregation by remote hash exchange"); if (!seenExchanges.localRepartitionExchange) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 4ec67782a1ab7..1844bf4d85663 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -246,7 +246,8 @@ public void testDefaults() .setInlineProjectionsOnValues(false) .setEagerPlanValidationEnabled(false) .setEagerPlanValidationThreadPoolSize(20) - .setPrestoSparkExecutionEnvironment(false)); + .setPrestoSparkExecutionEnvironment(false) + .setSingleNodeExecutionEnabled(false)); } @Test @@ -442,6 +443,7 @@ public void testExplicitPropertyMappings() .put("eager-plan-validation-enabled", "true") .put("eager-plan-validation-thread-pool-size", "2") .put("presto-spark-execution-environment", "true") + .put("single-node-execution-enabled", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -634,7 +636,8 @@ public void testExplicitPropertyMappings() .setInlineProjectionsOnValues(true) .setEagerPlanValidationEnabled(true) .setEagerPlanValidationThreadPoolSize(2) - .setPrestoSparkExecutionEnvironment(true); + .setPrestoSparkExecutionEnvironment(true) + .setSingleNodeExecutionEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java 
b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index bbdf7f067d2c9..d0706c4aded9d 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -121,7 +121,7 @@ public static QueryRunner createQueryRunner( defaultQueryRunner.close(); - return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled); + return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false); } public static QueryRunner createJavaQueryRunner() @@ -325,7 +325,8 @@ public static QueryRunner createNativeQueryRunner( String storageFormat, boolean addStorageFormatToPath, Boolean failOnNestedLoopJoin, - boolean isCoordinatorSidecarEnabled) + boolean isCoordinatorSidecarEnabled, + boolean singleNodeExecutionEnabled) throws Exception { // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner. 
@@ -336,6 +337,9 @@ public static QueryRunner createNativeQueryRunner( ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); coordinatorProperties.put("native-execution-enabled", "true"); + if (singleNodeExecutionEnabled) { + coordinatorProperties.put("single-node-execution-enabled", "true"); + } // Make query runner with external workers for tests return HiveQueryRunner.createQueryRunner( @@ -401,7 +405,7 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds) throws Exception { - return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false); + return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift) @@ -413,16 +417,22 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift) public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin) throws Exception { - return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false); + return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat) throws Exception { - return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false); + return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false, false); } - public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled) + public static QueryRunner createNativeQueryRunner( + boolean useThrift, + String storageFormat, + Optional 
remoteFunctionServerUds, + Boolean failOnNestedLoopJoin, + boolean isCoordinatorSidecarEnabled, + boolean singleNodeExecutionEnabled) throws Exception { int cacheMaxSize = 0; @@ -437,7 +447,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor storageFormat, true, failOnNestedLoopJoin, - isCoordinatorSidecarEnabled); + isCoordinatorSidecarEnabled, + singleNodeExecutionEnabled); } // Start the remote function server. Return the UDS path used to communicate with it. diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java new file mode 100644 index 0000000000000..10cba774d2ddc --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeSystemQueriesSingleNode + extends AbstractTestNativeSystemQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +}