diff --git a/presto-docs/src/main/sphinx/admin/properties-session.rst b/presto-docs/src/main/sphinx/admin/properties-session.rst index e0a3f230df99..b6f864d06a25 100644 --- a/presto-docs/src/main/sphinx/admin/properties-session.rst +++ b/presto-docs/src/main/sphinx/admin/properties-session.rst @@ -81,6 +81,18 @@ Number of local parallel table writer threads per worker for partitioned writes. set, the number set by ``task_writer_count`` will be used. It is required to be a power of two for a Java query engine. +``single_node_execution_enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +This property ensures that queries scheduled in this cluster use only a single +node for execution, which may improve performance for small queries which can +be executed within a single node. + +The corresponding configuration property is :ref:`admin/properties:\`\`single-node-execution-enabled\`\``. + Spilling Properties ------------------- diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index d28103662a9c..de49d8bcaa87 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -70,6 +70,18 @@ When enabled, the logical plan will begin to be built and validated before queueing and allocation of cluster resources so that any errors or incompatibilities in the query plan will fail quickly and inform the user. +``single-node-execution-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``boolean`` +* **Default value:** ``false`` + +This property ensures that queries scheduled in this cluster use only a single +node for execution, which may improve performance for small queries which can +be executed within a single node. + +The corresponding session property is :ref:`admin/properties-session:\`\`single_node_execution_enabled\`\``. + .. _tuning-memory: Memory Management Properties diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java new file mode 100644 index 000000000000..bfaec5f22852 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestDistributedQueries; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static io.airlift.tpch.TpchTable.getTables; + +@Test(singleThreaded = true) +public class TestDistributedQueriesSingleNode + extends AbstractTestDistributedQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); + coordinatorProperties.put("single-node-execution-enabled", "true"); + return HiveQueryRunner.createQueryRunner( + getTables(), + ImmutableMap.of(), + coordinatorProperties.build(), + Optional.empty()); + } + + @Override + protected boolean supportsNotNullColumns() + { + return false; + } + + @Override + public void testDelete() + { + // Hive connector currently does not support row-by-row delete + } + + @Override + public void testUpdate() + { + // Updates are not supported by the connector + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 28165edd441f..8ab8d397d521 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -325,6 +325,7 @@ public final class SystemSessionProperties public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns"; public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values"; public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer"; + public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; @@ -1829,6 +1830,11 @@ public SystemSessionProperties( NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only", queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(), + false), + booleanProperty( + SINGLE_NODE_EXECUTION_ENABLED, + "Enable single node execution", + featuresConfig.isSingleNodeExecutionEnabled(), false)); } @@ -2276,6 +2282,11 @@ public static boolean isNativeExecutionEnabled(Session session) return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class); } + public static boolean isSingleNodeExecutionEnabled(Session session) + { + return session.getSystemProperty(SINGLE_NODE_EXECUTION_ENABLED, Boolean.class); + } + public static boolean isPushAggregationThroughJoin(Session session) { return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index 3f885239d5d6..5cd18d03c4d9 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -25,6 +25,7 @@ import com.facebook.presto.execution.StageId; import com.facebook.presto.execution.TaskStatus; import com.facebook.presto.execution.buffer.OutputBuffers; +import com.facebook.presto.execution.scheduler.group.DynamicBucketNodeMap; import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelector; import com.facebook.presto.failureDetector.FailureDetector; import com.facebook.presto.metadata.InternalNode; @@ -79,6 +80,7 @@ import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; import static com.facebook.presto.util.Failures.checkCondition; @@ -326,7 +328,22 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { return scheduler; } else { - if (!splitSources.isEmpty()) { + if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) { + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate); + List nodes = nodeSelector.selectRandomNodes(1); + return new FixedSourcePartitionedScheduler( + stageExecution, + splitSources, + plan.getFragment().getStageExecutionDescriptor(), + plan.getFragment().getTableScanSchedulingOrder(), + nodes, + new DynamicBucketNodeMap((split) -> 0, 1), + splitBatchSize, + getConcurrentLifespansPerNode(session), + nodeSelector, + ImmutableList.of(NOT_PARTITIONED)); + } + else if (!splitSources.isEmpty()) { // contains local source List schedulingOrder = plan.getFragment().getTableScanSchedulingOrder(); ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 94a735c4b1a8..f7ea9174fe8e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -289,6 +289,7 @@ public class FeaturesConfig private int eagerPlanValidationThreadPoolSize = 20; private boolean prestoSparkExecutionEnvironment; + private boolean singleNodeExecutionEnabled; public enum PartitioningPrecisionStrategy { @@ -2861,4 +2862,17 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; return this; } + + public boolean isSingleNodeExecutionEnabled() + { + return singleNodeExecutionEnabled; + } + + @Config("single-node-execution-enabled") + @ConfigDescription("Enable single node execution") + public FeaturesConfig setSingleNodeExecutionEnabled(boolean singleNodeExecutionEnabled) + { + this.singleNodeExecutionEnabled = singleNodeExecutionEnabled; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eba7eeb25efd..190271f3e39a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -61,6 +61,7 @@ import java.util.Set; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -174,6 +175,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext context.get().setSingleNodeDistribution(); } + if (isSingleNodeExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + return context.defaultRewrite(node, context.get()); } @@ -268,6 +273,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext @Override public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) { + if (isSingleNodeExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + switch (exchange.getScope()) { case LOCAL: return context.defaultRewrite(exchange, context.get()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index 4a226fea5ef4..2ad78df0be9c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -47,6 +47,7 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; @@ -94,8 +95,8 @@ public static SubPlan finalizeSubPlan( PartitioningHandle partitioningHandle) { subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle); - if (!noExchange) { - // grouped execution is not supported for SINGLE_DISTRIBUTION + if (!noExchange && !isSingleNodeExecutionEnabled(session)) { + // grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index b03a6f52c571..a900bb364d16 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -145,6 +145,7 @@ import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin; import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin; import com.facebook.presto.sql.planner.optimizations.AddExchanges; +import com.facebook.presto.sql.planner.optimizations.AddExchangesForSingleNodeExecution; import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges; import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization; import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten; @@ -848,6 +849,7 @@ public PlanOptimizers( builder.add(new CteProjectionAndPredicatePushDown(metadata)); // must run before PhysicalCteOptimizer builder.add(new PhysicalCteOptimizer(metadata)); // Must run before AddExchanges builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, partitioningProviderManager, featuresConfig.isNativeExecutionEnabled()))); + builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchangesForSingleNodeExecution(metadata))); } //noinspection UnusedAssignment diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 907a83ccb307..b099f7766621 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -116,6 +116,7 @@ import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; import static com.facebook.presto.SystemSessionProperties.isScaleWriters; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled; import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; @@ -170,12 +171,21 @@ public AddExchanges(Metadata metadata, PartitioningProviderManager partitioningP this.nativeExecution = nativeExecution; } + @Override + public boolean isEnabled(Session session) + { + return !isSingleNodeExecutionEnabled(session); + } + @Override public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) { - PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); - boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); - return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + if (isEnabled(session)) { + PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); + boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); + return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + } + return PlanOptimizerResult.optimizerResult(plan, false); } private class Rewriter diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java new file mode 100644 index 000000000000..228a5bbc16a0 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangesForSingleNodeExecution.java @@ -0,0 +1,150 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.optimizations; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableFinishNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.plan.ChildReplacer; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; +import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; +import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.iterative.rule.PickTableLayout.pushPredicateIntoTableScan; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange; +import static java.util.Objects.requireNonNull; + +public class AddExchangesForSingleNodeExecution + implements PlanOptimizer +{ + private final Metadata metadata; + + public AddExchangesForSingleNodeExecution(Metadata metadata) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public boolean isEnabled(Session session) + { + return isSingleNodeExecutionEnabled(session); + } + + @Override + public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) + { + if (isEnabled(session)) { + AddExchangesForSingleNodeExecution.Rewriter rewriter = new AddExchangesForSingleNodeExecution.Rewriter(idAllocator, metadata, session); + PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null); + return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged()); + } + return PlanOptimizerResult.optimizerResult(plan, false); + } + + private class Rewriter + extends SimplePlanRewriter + { + private final PlanNodeIdAllocator idAllocator; + private final Metadata metadata; + private final Session session; + private boolean planChanged; + + private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session) + { + this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.session = requireNonNull(session, "session is null"); + } + + public boolean isPlanChanged() + { + return planChanged; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, RewriteContext context) + { + PlanNode plan = pushPredicateIntoTableScan(node, new ConstantExpression(true, BOOLEAN), true, session, idAllocator, metadata); + // Presto Java and Presto Native use different hash functions for partitioning + // An additional exchange makes sure the data flows through a native worker in case it need to be partitioned for downstream processing + if (containsSystemTableScan(plan)) { + plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); + } + return plan; + } + + @Override + public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + @Override + public PlanNode visitTableFinish(TableFinishNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + @Override + public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext context) + { + return addGatherExchange(node, context); + } + + private PlanNode addGatherExchange(PlanNode node, RewriteContext context) + { + PlanNode child = node.getSources().get(0).accept(this, context); + + ExchangeNode gather; + // In case the child is an exchange, don't add another exchange. Instead, convert it to gather exchange. + if (child instanceof ExchangeNode) { + ExchangeNode exchangeNode = (ExchangeNode) child; + gather = new ExchangeNode( + exchangeNode.getSourceLocation(), + idAllocator.getNextId(), + GATHER, + REMOTE_STREAMING, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), exchangeNode.getOutputVariables()), + exchangeNode.getSources(), + exchangeNode.getInputs(), + true, + Optional.empty()); + } + else { + gather = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child); + } + planChanged = true; + return ChildReplacer.replaceChildren(node, ImmutableList.of(gather)); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java index 38c29689e46f..4b0e6b7564a7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java @@ -29,6 +29,7 @@ import java.util.List; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferMergeJoinForSortedInputs; import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST; import static com.facebook.presto.spi.plan.JoinType.INNER; @@ -55,7 +56,7 @@ public void setEnabledForTesting(boolean isSet) @Override public boolean isEnabled(Session session) { - return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session); + return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java index 51fd2c466827..7ff225916331 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; import static com.facebook.presto.spi.plan.AggregationNode.Step.INTERMEDIATE; import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; @@ -114,7 +115,7 @@ public Optional visitAggregation(AggregationNode node, Void conte // No remote repartition exchange between final and partial aggregation. // Make sure that final aggregation operators are executed on a single node. ActualProperties globalProperties = PropertyDerivations.derivePropertiesRecursively(node, metadata, session); - checkArgument(noExchange || globalProperties.isSingleNode(), + checkArgument(isSingleNodeExecutionEnabled(session) || noExchange || globalProperties.isSingleNode(), "Final aggregation with default value not separated from partial aggregation by remote hash exchange"); if (!seenExchanges.localRepartitionExchange) { diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 0fe7a44088a9..ff8154866a60 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -247,7 +247,8 @@ public void testDefaults() .setIncludeValuesNodeInConnectorOptimizer(true) .setEagerPlanValidationEnabled(false) .setEagerPlanValidationThreadPoolSize(20) - .setPrestoSparkExecutionEnvironment(false)); + .setPrestoSparkExecutionEnvironment(false) + .setSingleNodeExecutionEnabled(false)); } @Test @@ -444,6 +445,7 @@ public void testExplicitPropertyMappings() .put("eager-plan-validation-enabled", "true") .put("eager-plan-validation-thread-pool-size", "2") .put("presto-spark-execution-environment", "true") + .put("single-node-execution-enabled", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -637,7 +639,8 @@ public void testExplicitPropertyMappings() .setIncludeValuesNodeInConnectorOptimizer(false) .setEagerPlanValidationEnabled(true) .setEagerPlanValidationThreadPoolSize(2) - .setPrestoSparkExecutionEnvironment(true); + .setPrestoSparkExecutionEnvironment(true) + .setSingleNodeExecutionEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index 589e8de462e1..58431a64f7ca 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -121,7 +121,7 @@ public static QueryRunner createQueryRunner( defaultQueryRunner.close(); - return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled); + return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false); } public static QueryRunner createJavaQueryRunner() @@ -325,7 +325,8 @@ public static QueryRunner createNativeQueryRunner( String storageFormat, boolean addStorageFormatToPath, Boolean failOnNestedLoopJoin, - boolean isCoordinatorSidecarEnabled) + boolean isCoordinatorSidecarEnabled, + boolean singleNodeExecutionEnabled) throws Exception { // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner. @@ -336,6 +337,9 @@ public static QueryRunner createNativeQueryRunner( ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); coordinatorProperties.put("native-execution-enabled", "true"); + if (singleNodeExecutionEnabled) { + coordinatorProperties.put("single-node-execution-enabled", "true"); + } // Make query runner with external workers for tests return HiveQueryRunner.createQueryRunner( @@ -401,7 +405,7 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds) throws Exception { - return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false); + return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift) @@ -413,16 +417,22 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift) public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin) throws Exception { - return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false); + return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat) throws Exception { - return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false); + return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false, false); } - public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled) + public static QueryRunner createNativeQueryRunner( + boolean useThrift, + String storageFormat, + Optional remoteFunctionServerUds, + Boolean failOnNestedLoopJoin, + boolean isCoordinatorSidecarEnabled, + boolean singleNodeExecutionEnabled) throws Exception { int cacheMaxSize = 0; @@ -437,7 +447,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor storageFormat, true, failOnNestedLoopJoin, - isCoordinatorSidecarEnabled); + isCoordinatorSidecarEnabled, + singleNodeExecutionEnabled); } // Start the remote function server. Return the UDS path used to communicate with it. diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java new file mode 100644 index 000000000000..10cba774d2dd --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleNode.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeSystemQueriesSingleNode + extends AbstractTestNativeSystemQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +}