Skip to content

Commit

Permalink
Add single node execution
Browse files Browse the repository at this point in the history
To improve latency of tiny queries running on a large cluster,
we introduce single worker execution mode: query will only use
one node to execute and plan would be optimized accordingly.
  • Loading branch information
kewang1024 committed Dec 6, 2024
1 parent ed176b9 commit 57fce4d
Show file tree
Hide file tree
Showing 14 changed files with 384 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Optional;

import static io.airlift.tpch.TpchTable.getTables;

@Test(singleThreaded = true)
public class TestDistributedQueriesSingleNode
extends AbstractTestDistributedQueries
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
ImmutableMap.Builder<String, String> coordinatorProperties = ImmutableMap.builder();
coordinatorProperties.put("single-node-execution-enabled", "true");
return HiveQueryRunner.createQueryRunner(
getTables(),
ImmutableMap.of(),
coordinatorProperties.build(),
Optional.empty());
}

@Override
protected boolean supportsNotNullColumns()
{
return false;
}

@Override
public void testDelete()
{
// Hive connector currently does not support row-by-row delete
}

@Override
public void testUpdate()
{
// Updates are not supported by the connector
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms";
public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns";
public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values";
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1824,6 +1825,11 @@ public SystemSessionProperties(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(),
false),
booleanProperty(
SINGLE_NODE_EXECUTION_ENABLED,
"Enable single node execution",
featuresConfig.isSingleNodeExecutionEnabled(),
false));
}

Expand Down Expand Up @@ -2271,6 +2277,11 @@ public static boolean isNativeExecutionEnabled(Session session)
return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isSingleNodeExecutionEnabled(Session session)
{
return session.getSystemProperty(SINGLE_NODE_EXECUTION_ENABLED, Boolean.class);
}

public static boolean isPushAggregationThroughJoin(Session session)
{
return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE;
import static com.facebook.presto.util.Failures.checkCondition;
Expand Down Expand Up @@ -326,7 +327,58 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
return scheduler;
}
else {
if (!splitSources.isEmpty()) {
if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) {
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate);
List<InternalNode> nodes = nodeSelector.selectRandomNodes(1);
BucketNodeMap bucketNodeMap = new BucketNodeMap((split) -> 0) {
@Override
public int getBucketCount()
{
return 1;
}
@Override
public Optional<InternalNode> getAssignedNode(int bucketedId)
{
return Optional.of(nodes.get(0));
}
@Override
public boolean isBucketCacheable(int bucketedId)
{
return false;
}
@Override
public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable)
{
}
@Override
public boolean isDynamic()
{
return true;
}
@Override
public boolean hasInitialMap()
{
return false;
}
@Override
public Optional<List<InternalNode>> getBucketToNode()
{
return Optional.of(nodes);
}
};
return new FixedSourcePartitionedScheduler(
stageExecution,
splitSources,
plan.getFragment().getStageExecutionDescriptor(),
plan.getFragment().getTableScanSchedulingOrder(),
nodes,
bucketNodeMap,
splitBatchSize,
getConcurrentLifespansPerNode(session),
nodeSelector,
ImmutableList.of(NOT_PARTITIONED));
}
else if (!splitSources.isEmpty()) {
// contains local source
List<PlanNodeId> schedulingOrder = plan.getFragment().getTableScanSchedulingOrder();
ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public class FeaturesConfig
private int eagerPlanValidationThreadPoolSize = 20;

private boolean prestoSparkExecutionEnvironment;
private boolean singleNodeExecutionEnabled;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2847,4 +2848,17 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec
this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment;
return this;
}

public boolean isSingleNodeExecutionEnabled()
{
return singleNodeExecutionEnabled;
}

@Config("single-node-execution-enabled")
@ConfigDescription("Enable single node execution")
public FeaturesConfig setSingleNodeExecutionEnabled(boolean singleNodeExecutionEnabled)
{
this.singleNodeExecutionEnabled = singleNodeExecutionEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
Expand Down Expand Up @@ -174,6 +175,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext<FragmentProperties>
context.get().setSingleNodeDistribution();
}

if (isSingleNodeExecutionEnabled(session)) {
context.get().setSingleNodeDistribution();
}

return context.defaultRewrite(node, context.get());
}

Expand Down Expand Up @@ -268,6 +273,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext<FragmentProperties>
@Override
public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
if (isSingleNodeExecutionEnabled(session)) {
context.get().setSingleNodeDistribution();
}

switch (exchange.getScope()) {
case LOCAL:
return context.defaultRewrite(exchange, context.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
Expand Down Expand Up @@ -94,8 +95,8 @@ public static SubPlan finalizeSubPlan(
PartitioningHandle partitioningHandle)
{
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle);
if (!noExchange) {
// grouped execution is not supported for SINGLE_DISTRIBUTION
if (!noExchange && !isSingleNodeExecutionEnabled(session)) {
// grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin;
import com.facebook.presto.sql.planner.optimizations.AddExchanges;
import com.facebook.presto.sql.planner.optimizations.AddExchangesForSingleNodeExecution;
import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges;
import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization;
import com.facebook.presto.sql.planner.optimizations.CheckSubqueryNodesAreRewritten;
Expand Down Expand Up @@ -848,6 +849,7 @@ public PlanOptimizers(
builder.add(new CteProjectionAndPredicatePushDown(metadata)); // must run before PhysicalCteOptimizer
builder.add(new PhysicalCteOptimizer(metadata)); // Must run before AddExchanges
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, partitioningProviderManager, featuresConfig.isNativeExecutionEnabled())));
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchangesForSingleNodeExecution(metadata)));
}

//noinspection UnusedAssignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled;
import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
import static com.facebook.presto.SystemSessionProperties.isScaleWriters;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled;
import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
Expand Down Expand Up @@ -170,12 +171,21 @@ public AddExchanges(Metadata metadata, PartitioningProviderManager partitioningP
this.nativeExecution = nativeExecution;
}

@Override
public boolean isEnabled(Session session)
{
return !isSingleNodeExecutionEnabled(session);
}

@Override
public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any());
boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent();
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
if (isEnabled(session)) {
PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any());
boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent();
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
}
return PlanOptimizerResult.optimizerResult(plan, false);
}

private class Rewriter
Expand Down
Loading

0 comments on commit 57fce4d

Please sign in to comment.