Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add single worker execution mode for native execution #24172

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public final class SystemSessionProperties
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
private static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled";
private static final String NATIVE_SINGLE_WORKER_EXECUTION = "native_single_worker_execution";
private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
Expand Down Expand Up @@ -1536,6 +1537,11 @@ public SystemSessionProperties(
"Enable execution on native engine",
featuresConfig.isNativeExecutionEnabled(),
true),
booleanProperty(
NATIVE_SINGLE_WORKER_EXECUTION,
"Enable single worker execution on native engine",
featuresConfig.isNativeSingleWorkerExecution(),
true),
booleanProperty(
NATIVE_EXECUTION_PROCESS_REUSE_ENABLED,
"Enable reuse the native process within the same JVM",
Expand Down Expand Up @@ -2271,6 +2277,11 @@ public static boolean isNativeExecutionEnabled(Session session)
return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class);
}

/**
 * Reads the {@code native_single_worker_execution} system session property
 * ("Enable single worker execution on native engine") for the given session.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a boolean
 */
public static boolean isNativeSingleWorkerExecution(Session session)
{
return session.getSystemProperty(NATIVE_SINGLE_WORKER_EXECUTION, Boolean.class);
}

public static boolean isPushAggregationThroughJoin(Session session)
{
return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static class NetworkTopologyType
}

private int minCandidates = 10;
private boolean includeCoordinator = true;
private boolean includeCoordinator;
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerTask = 10;
private int maxUnacknowledgedSplitsPerTask = 500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE;
import static com.facebook.presto.util.Failures.checkCondition;
Expand Down Expand Up @@ -326,7 +327,58 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
return scheduler;
}
else {
if (!splitSources.isEmpty()) {
if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) {
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate);
List<InternalNode> nodes = nodeSelector.selectRandomNodes(1);
BucketNodeMap bucketNodeMap = new BucketNodeMap((split) -> 0) {
@Override
public int getBucketCount()
{
return 1;
}
@Override
public Optional<InternalNode> getAssignedNode(int bucketedId)
{
return Optional.of(nodes.get(0));
}
@Override
public boolean isBucketCacheable(int bucketedId)
{
return false;
}
@Override
public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable)
{
}
@Override
public boolean isDynamic()
{
return true;
}
@Override
public boolean hasInitialMap()
{
return false;
}
@Override
public Optional<List<InternalNode>> getBucketToNode()
{
return Optional.of(nodes);
}
};
return new FixedSourcePartitionedScheduler(
stageExecution,
splitSources,
plan.getFragment().getStageExecutionDescriptor(),
plan.getFragment().getTableScanSchedulingOrder(),
nodes,
bucketNodeMap,
splitBatchSize,
getConcurrentLifespansPerNode(session),
nodeSelector,
ImmutableList.of(NOT_PARTITIONED));
}
else if (!splitSources.isEmpty()) {
// contains local source
List<PlanNodeId> schedulingOrder = plan.getFragment().getTableScanSchedulingOrder();
ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public class FeaturesConfig
private boolean pushRemoteExchangeThroughGroupId;
private boolean isOptimizeMultipleApproxPercentileOnSameFieldEnabled = true;
private boolean nativeExecutionEnabled;
private boolean nativeSingleWorkerExecution = true;
private boolean disableTimeStampWithTimeZoneForNative;
private boolean disableIPAddressForNative;
private String nativeExecutionExecutablePath = "./presto_server";
Expand Down Expand Up @@ -2251,6 +2252,18 @@ public boolean isNativeExecutionEnabled()
return this.nativeExecutionEnabled;
}

/**
 * Returns the configured value of {@code native-single-worker-execution}.
 * The backing field is initialized to {@code true}.
 */
public boolean isNativeSingleWorkerExecution()
{
return nativeSingleWorkerExecution;
}

/**
 * Sets whether native execution should run in single worker mode.
 *
 * @param nativeSingleWorkerExecution new value for {@code native-single-worker-execution}
 * @return this config instance, so the setter can be chained like the other
 *         {@code FeaturesConfig} setters
 */
@Config("native-single-worker-execution")
@ConfigDescription("Enable execution on native engine for single worker")
public FeaturesConfig setNativeSingleWorkerExecution(boolean nativeSingleWorkerExecution)
{
    this.nativeSingleWorkerExecution = nativeSingleWorkerExecution;
    return this;
}

@Config("disable-timestamp-with-timezone-for-native-execution")
@ConfigDescription("Disable timestamp with timezone type on native engine")
public FeaturesConfig setDisableTimeStampWithTimeZoneForNative(boolean disableTimeStampWithTimeZoneForNative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeSingleWorkerExecution;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
Expand Down Expand Up @@ -174,6 +176,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext<FragmentProperties>
context.get().setSingleNodeDistribution();
}

if (isNativeExecutionEnabled(session) && isNativeSingleWorkerExecution(session)) {
context.get().setSingleNodeDistribution();
}

return context.defaultRewrite(node, context.get());
}

Expand Down Expand Up @@ -268,6 +274,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext<FragmentProperties>
@Override
public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
if (isNativeExecutionEnabled(session) && isNativeSingleWorkerExecution(session)) {
context.get().setSingleNodeDistribution();
}

switch (exchange.getScope()) {
case LOCAL:
return context.defaultRewrite(exchange, context.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeSingleWorkerExecution;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES;
Expand Down Expand Up @@ -94,7 +96,7 @@ public static SubPlan finalizeSubPlan(
PartitioningHandle partitioningHandle)
{
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle);
if (!forceSingleNode) {
if (!forceSingleNode && !(isNativeExecutionEnabled(session) && isNativeSingleWorkerExecution(session))) {
// grouped execution is not supported for SINGLE_DISTRIBUTION
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin;
import com.facebook.presto.sql.planner.optimizations.AddExchangeForNativeSingleWorker;
import com.facebook.presto.sql.planner.optimizations.AddExchanges;
import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges;
import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization;
Expand Down Expand Up @@ -813,7 +814,11 @@ public PlanOptimizers(
costCalculator,
ImmutableSet.of(new ScaledWriterRule())));

if (!forceSingleNode) {
if (featuresConfig.isNativeExecutionEnabled() && featuresConfig.isNativeSingleWorkerExecution()) {
// For native single worker execution
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchangeForNativeSingleWorker(metadata, partitioningProviderManager)));
}
else if (!forceSingleNode) {
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges
builder.add(new IterativeOptimizer(
metadata,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.MergeJoinNode;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.ChildReplacer;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.google.common.collect.ImmutableList;

import java.util.Optional;

import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.ensureSourceOrderingGatheringExchange;
import static java.util.Objects.requireNonNull;

/**
 * Plan optimizer registered for native single worker execution.
 *
 * <p>For every {@link TableFinishNode} it places a {@code GATHER} /
 * {@code REMOTE_STREAMING} exchange directly below the node. Only
 * {@code TableFinishNode} is handled explicitly; all other nodes go through the
 * default {@link SimplePlanRewriter} behavior.
 */
public class AddExchangeForNativeSingleWorker
        implements PlanOptimizer
{
    // Validated at construction time; the current rewrite does not read them.
    private final Metadata metadata;
    private final PartitioningProviderManager partitioningProviderManager;

    public AddExchangeForNativeSingleWorker(Metadata metadata, PartitioningProviderManager partitioningProviderManager)
    {
        this.metadata = requireNonNull(metadata, "metadata is null");
        this.partitioningProviderManager = requireNonNull(partitioningProviderManager, "partitioningProviderManager is null");
    }

    /**
     * Rewrites {@code plan} and reports whether any {@link TableFinishNode} was rewritten.
     */
    @Override
    public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
    {
        Rewriter rewriter = new Rewriter(idAllocator);
        PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null);
        return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged());
    }

    /**
     * Static nested class: the rewriter only needs the id allocator, so it must not
     * capture the enclosing optimizer instance.
     */
    private static final class Rewriter
            extends SimplePlanRewriter<Void>
    {
        private final PlanNodeIdAllocator idAllocator;
        private boolean planChanged;

        private Rewriter(PlanNodeIdAllocator idAllocator)
        {
            this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
        }

        public boolean isPlanChanged()
        {
            return planChanged;
        }

        /**
         * Replaces the source of a {@link TableFinishNode} with a gathering remote exchange.
         * The subtree below the source is reused as-is; it is not rewritten by this visitor.
         */
        @Override
        public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Void> context)
        {
            PlanNode child = node.getSource();

            ExchangeNode gather;
            if (child instanceof ExchangeNode) {
                // The input is already an exchange (e.g. a union, see PushTableWriteThroughUnion):
                // instead of stacking a second exchange on top, rebuild it as a single-distribution
                // gather over the same sources and inputs.
                ExchangeNode exchangeNode = (ExchangeNode) child;
                gather = new ExchangeNode(
                        exchangeNode.getSourceLocation(),
                        idAllocator.getNextId(),
                        GATHER,
                        REMOTE_STREAMING,
                        new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), exchangeNode.getOutputVariables()),
                        exchangeNode.getSources(),
                        exchangeNode.getInputs(),
                        true,
                        Optional.empty());
            }
            else {
                gather = ensureSourceOrderingGatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child);
            }
            planChanged = true;
            return ChildReplacer.replaceChildren(node, ImmutableList.of(gather));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;

import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isNativeSingleWorkerExecution;
import static com.facebook.presto.SystemSessionProperties.preferMergeJoinForSortedInputs;
import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST;
import static com.facebook.presto.spi.plan.JoinType.INNER;
Expand All @@ -55,7 +57,7 @@ public void setEnabledForTesting(boolean isSet)
/**
 * The rule is enabled when forced on for testing, or when grouped execution and
 * merge-join-for-sorted-inputs are both enabled and the session is not running in
 * native single worker execution mode.
 */
@Override
public boolean isEnabled(Session session)
{
    // Only the updated condition is kept: the superseded return statement that preceded it
    // made this one unreachable.
    boolean nativeSingleWorker = isNativeExecutionEnabled(session) && isNativeSingleWorkerExecution(session);
    return isEnabledForTesting
            || (isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !nativeSingleWorker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class PlanChecker
/**
 * Injection constructor. Forces single-node plan checking when both native execution
 * and native single worker execution are enabled in {@code featuresConfig}.
 */
@Inject
public PlanChecker(FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
{
    // A constructor may delegate only once; the superseded this(featuresConfig, false, ...) call is dropped.
    this(featuresConfig, featuresConfig.isNativeExecutionEnabled() && featuresConfig.isNativeSingleWorkerExecution(), planCheckerProviderManager);
}

public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanCheckerProviderManager planCheckerProviderManager)
Expand Down
Loading
Loading