Skip to content

Commit

Permalink
Add single node execution
Browse files Browse the repository at this point in the history
To improve performance for small queries which can be executed
within a single node, we introduce single worker execution mode:
query will only use one node to execute and plan would be optimized
accordingly.
  • Loading branch information
kewang1024 committed Dec 9, 2024
1 parent ed176b9 commit 61d73da
Show file tree
Hide file tree
Showing 16 changed files with 408 additions and 17 deletions.
12 changes: 12 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ Number of local parallel table writer threads per worker for partitioned writes.
set, the number set by ``task_writer_count`` will be used. It is required to be a power
of two for a Java query engine.

``single_node_execution_enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

This property ensures that queries scheduled in this cluster use only a single
node for execution, which may improve performance for small queries which can
be executed within a single node.

The corresponding configuration property is :ref:`admin/properties:\`\`single-node-execution-enabled\`\``.

Spilling Properties
-------------------

Expand Down
12 changes: 12 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ When enabled, the logical plan will begin to be built and validated before
queueing and allocation of cluster resources so that any errors or
incompatibilities in the query plan will fail quickly and inform the user.

``single-node-execution-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

This property ensures that queries scheduled in this cluster use only a single
node for execution, which may improve performance for small queries which can
be executed within a single node.

The corresponding session property is :ref:`admin/properties-session:\`\`single_node_execution_enabled\`\``.

.. _tuning-memory:

Memory Management Properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Optional;

import static io.airlift.tpch.TpchTable.getTables;

@Test(singleThreaded = true)
public class TestDistributedQueriesSingleNode
extends AbstractTestDistributedQueries
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
ImmutableMap.Builder<String, String> coordinatorProperties = ImmutableMap.builder();
coordinatorProperties.put("single-node-execution-enabled", "true");
return HiveQueryRunner.createQueryRunner(
getTables(),
ImmutableMap.of(),
coordinatorProperties.build(),
Optional.empty());
}

@Override
protected boolean supportsNotNullColumns()
{
return false;
}

@Override
public void testDelete()
{
// Hive connector currently does not support row-by-row delete
}

@Override
public void testUpdate()
{
// Updates are not supported by the connector
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms";
public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns";
public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values";
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1824,6 +1825,11 @@ public SystemSessionProperties(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(),
false),
booleanProperty(
SINGLE_NODE_EXECUTION_ENABLED,
"Enable single node execution",
featuresConfig.isSingleNodeExecutionEnabled(),
false));
}

Expand Down Expand Up @@ -2271,6 +2277,11 @@ public static boolean isNativeExecutionEnabled(Session session)
return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isSingleNodeExecutionEnabled(Session session)
{
return session.getSystemProperty(SINGLE_NODE_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isPushAggregationThroughJoin(Session session)
{
return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE;
import static com.facebook.presto.util.Failures.checkCondition;
Expand Down Expand Up @@ -326,7 +327,58 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
return scheduler;
}
else {
if (!splitSources.isEmpty()) {
if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) {
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate);
List<InternalNode> nodes = nodeSelector.selectRandomNodes(1);
BucketNodeMap bucketNodeMap = new BucketNodeMap((split) -> 0) {
@Override
public int getBucketCount()
{
return 1;
}
@Override
public Optional<InternalNode> getAssignedNode(int bucketedId)
{
return Optional.of(nodes.get(0));
}
@Override
public boolean isBucketCacheable(int bucketedId)
{
return false;
}
@Override
public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable)
{
}
@Override
public boolean isDynamic()
{
return true;
}
@Override
public boolean hasInitialMap()
{
return false;
}
@Override
public Optional<List<InternalNode>> getBucketToNode()
{
return Optional.of(nodes);
}
};
return new FixedSourcePartitionedScheduler(
stageExecution,
splitSources,
plan.getFragment().getStageExecutionDescriptor(),
plan.getFragment().getTableScanSchedulingOrder(),
nodes,
bucketNodeMap,
splitBatchSize,
getConcurrentLifespansPerNode(session),
nodeSelector,
ImmutableList.of(NOT_PARTITIONED));
}
else if (!splitSources.isEmpty()) {
// contains local source
List<PlanNodeId> schedulingOrder = plan.getFragment().getTableScanSchedulingOrder();
ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public class FeaturesConfig
private int eagerPlanValidationThreadPoolSize = 20;

private boolean prestoSparkExecutionEnvironment;
private boolean singleNodeExecutionEnabled;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2847,4 +2848,17 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec
this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment;
return this;
}

public boolean isSingleNodeExecutionEnabled()
{
return singleNodeExecutionEnabled;
}

@Config("single-node-execution-enabled")
@ConfigDescription("Enable single node execution")
public FeaturesConfig setSingleNodeExecutionEnabled(boolean singleNodeExecutionEnabled)
{
this.singleNodeExecutionEnabled = singleNodeExecutionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
Expand Down Expand Up @@ -174,6 +175,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext<FragmentProperties>
context.get().setSingleNodeDistribution();
}

if (isSingleNodeExecutionEnabled(session)) {
context.get().setSingleNodeDistribution();
}

return context.defaultRewrite(node, context.get());
}

Expand Down Expand Up @@ -268,6 +273,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext<FragmentProperties>
@Override
public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
if (isSingleNodeExecutionEnabled(session)) {
context.get().setSingleNodeDistribution();
}

switch (exchange.getScope()) {
case LOCAL:
return context.defaultRewrite(exchange, context.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
Expand Down Expand Up @@ -94,8 +95,8 @@ public static SubPlan finalizeSubPlan(
PartitioningHandle partitioningHandle)
{
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle);
if (!noExchange) {
// grouped execution is not supported for SINGLE_DISTRIBUTION
if (!noExchange && !isSingleNodeExecutionEnabled(session)) {
// grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin;
import com.facebook.presto.sql.planner.optimizations.AddExchanges;
import com.facebook.presto.sql.planner.optimizations.AddExchangesForSingleNodeExecution;
import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges;
import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization;
import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten;
Expand Down Expand Up @@ -848,6 +849,7 @@ public PlanOptimizers(
builder.add(new CteProjectionAndPredicatePushDown(metadata)); // must run before PhysicalCteOptimizer
builder.add(new PhysicalCteOptimizer(metadata)); // Must run before AddExchanges
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, partitioningProviderManager, featuresConfig.isNativeExecutionEnabled())));
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchangesForSingleNodeExecution(metadata)));
}

//noinspection UnusedAssignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled;
import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
import static com.facebook.presto.SystemSessionProperties.isScaleWriters;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled;
import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
Expand Down Expand Up @@ -170,12 +171,21 @@ public AddExchanges(Metadata metadata, PartitioningProviderManager partitioningP
this.nativeExecution = nativeExecution;
}

@Override
public boolean isEnabled(Session session)
{
return !isSingleNodeExecutionEnabled(session);
}

@Override
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any());
boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent();
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
if (isEnabled(session)) {
PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any());
boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent();
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
}
return PlanOptimizerResult.optimizerResult(plan, false);
}

private class Rewriter
Expand Down
Loading

0 comments on commit 61d73da

Please sign in to comment.