diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java new file mode 100644 index 0000000000000..c649e5e530650 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestDistributedQueriesSingleNode.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestDistributedQueries; +import com.google.common.collect.ImmutableMap; + +import java.util.Optional; + +import static io.airlift.tpch.TpchTable.getTables; + +/** + * Runs the AbstractTestDistributedQueries suite on a Hive query runner created with the coordinator property "single-worker-execution-enabled" set to "true". + */ +public class TestDistributedQueriesSingleNode + extends AbstractTestDistributedQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + getTables(), + ImmutableMap.of(), + ImmutableMap.of("single-worker-execution-enabled", "true"), + Optional.empty()); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTestSingleWorker.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTestSingleWorker.java new file mode 100644 index 0000000000000..4d66e819af275 --- /dev/null +++ 
b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTestSingleWorker.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.Optional; + +import static io.airlift.tpch.TpchTable.CUSTOMER; +import static io.airlift.tpch.TpchTable.LINE_ITEM; +import static io.airlift.tpch.TpchTable.NATION; +import static io.airlift.tpch.TpchTable.ORDERS; +import static io.airlift.tpch.TpchTable.PART_SUPPLIER; + +/** + * Runs the TestHiveIntegrationSmokeTest suite on a Hive query runner created with the coordinator property "single-worker-execution-enabled" set to "true". + */ +public class TestHiveIntegrationSmokeTestSingleWorker + extends TestHiveIntegrationSmokeTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER, NATION), + ImmutableMap.of(), + ImmutableMap.of("single-worker-execution-enabled", "true"), + Optional.empty()); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index f66aeaf89a2ee..3b4c45fe131d9 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ 
b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -324,6 +324,7 @@ public final class SystemSessionProperties public static final String OPTIMIZER_USE_HISTOGRAMS = "optimizer_use_histograms"; public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns"; public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values"; + public static final String SINGLE_WORKER_EXECUTION_ENABLED = "single_worker_execution_enabled"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; @@ -1824,6 +1825,11 @@ public SystemSessionProperties( NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only", queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(), + false), + booleanProperty( + SINGLE_WORKER_EXECUTION_ENABLED, + "Enable single worker execution", + featuresConfig.isSingleWorkerExecutionEnabled(), false)); } @@ -2271,6 +2277,11 @@ public static boolean isNativeExecutionEnabled(Session session) return session.getSystemProperty(NATIVE_EXECUTION_ENABLED, Boolean.class); } + public static boolean isSingleWorkerExecutionEnabled(Session session) + { + return session.getSystemProperty(SINGLE_WORKER_EXECUTION_ENABLED, Boolean.class); + } + public static boolean isPushAggregationThroughJoin(Session session) { return session.getSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index 3f885239d5d6d..9eee31487c64f 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -79,6 +79,7 @@ import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; import static com.facebook.presto.util.Failures.checkCondition; @@ -326,7 +327,60 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { return scheduler; } else { - if (!splitSources.isEmpty()) { + if (!splitSources.isEmpty() && (plan.getFragment().getPartitioning().equals(SINGLE_DISTRIBUTION))) { + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null, nodePredicate); + List<InternalNode> nodes = nodeSelector.selectRandomNodes(1); + // Guard against an empty selection: every bucket below is pinned to nodes.get(0), so fail with NO_NODES_AVAILABLE rather than an IndexOutOfBoundsException + checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available"); + BucketNodeMap bucketNodeMap = new BucketNodeMap((split) -> 0) { + @Override + public int getBucketCount() + { + return 1; + } + @Override + public Optional<InternalNode> getAssignedNode(int bucketedId) + { + return Optional.of(nodes.get(0)); + } + @Override + public boolean isBucketCacheable(int bucketedId) + { + return false; + } + @Override + public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable) + { + } + @Override + public boolean isDynamic() + { + return true; + } + @Override + public boolean hasInitialMap() + { + return false; + } + @Override + public Optional<List<InternalNode>> getBucketToNode() + { + return Optional.of(nodes); + } + }; + return new FixedSourcePartitionedScheduler( + stageExecution, + splitSources, + 
plan.getFragment().getStageExecutionDescriptor(), + plan.getFragment().getTableScanSchedulingOrder(), + nodes, + bucketNodeMap, + splitBatchSize, + getConcurrentLifespansPerNode(session), + nodeSelector, + ImmutableList.of(NOT_PARTITIONED)); + } + else if (!splitSources.isEmpty()) { // contains local source List schedulingOrder = plan.getFragment().getTableScanSchedulingOrder(); ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 5c2040e6ade8a..aeda2ebb3ebda 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -288,6 +288,7 @@ public class FeaturesConfig private int eagerPlanValidationThreadPoolSize = 20; private boolean prestoSparkExecutionEnvironment; + private boolean singleWorkerExecutionEnabled; public enum PartitioningPrecisionStrategy { @@ -2847,4 +2848,18 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; return this; } + + public boolean isSingleWorkerExecutionEnabled() + { + return singleWorkerExecutionEnabled; + } + + @Config("single-worker-execution-enabled") + @ConfigDescription("Enable single worker execution") + public FeaturesConfig setSingleWorkerExecutionEnabled(boolean singleWorkerExecutionEnabled) + { + this.singleWorkerExecutionEnabled = singleWorkerExecutionEnabled; + // Return this, as setPrestoSparkExecutionEnvironment does, so the setter can be chained + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eba7eeb25efd6..54232287096c9 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ 
b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -61,6 +61,7 @@ import java.util.Set; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; +import static com.facebook.presto.SystemSessionProperties.isSingleWorkerExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -174,6 +175,10 @@ public PlanNode visitOutput(OutputNode node, RewriteContext context.get().setSingleNodeDistribution(); } + if (isSingleWorkerExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + return context.defaultRewrite(node, context.get()); } @@ -268,6 +273,10 @@ public PlanNode visitValues(ValuesNode node, RewriteContext @Override public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) { + if (isSingleWorkerExecutionEnabled(session)) { + context.get().setSingleNodeDistribution(); + } + switch (exchange.getScope()) { case LOCAL: return context.defaultRewrite(exchange, context.get()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index d93be0f969d4d..22d0a647a0cab 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -47,6 +47,7 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleWorkerExecutionEnabled; import static 
com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; @@ -94,8 +95,8 @@ public static SubPlan finalizeSubPlan( PartitioningHandle partitioningHandle) { subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle); - if (!forceSingleNode) { - // grouped execution is not supported for SINGLE_DISTRIBUTION + if (!forceSingleNode && !isSingleWorkerExecutionEnabled(session)) { + // grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_WORKER_EXECUTION_ENABLED subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index fd4dec41e7777..f4eed3255374a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -144,6 +144,7 @@ import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin; import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin; import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin; +import com.facebook.presto.sql.planner.optimizations.AddExchangeForSingleWorkerExecution; import com.facebook.presto.sql.planner.optimizations.AddExchanges; import com.facebook.presto.sql.planner.optimizations.AddLocalExchanges; import com.facebook.presto.sql.planner.optimizations.ApplyConnectorOptimization; @@ -848,6 +849,7 @@ public PlanOptimizers( builder.add(new CteProjectionAndPredicatePushDown(metadata)); // must run before PhysicalCteOptimizer builder.add(new 
PhysicalCteOptimizer(metadata)); // Must run before AddExchanges builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchanges(metadata, partitioningProviderManager, featuresConfig.isNativeExecutionEnabled()))); + builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new AddExchangeForSingleWorkerExecution(metadata))); } //noinspection UnusedAssignment diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangeForSingleWorkerExecution.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangeForSingleWorkerExecution.java new file mode 100644 index 0000000000000..c52f66bb79f9b --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchangeForSingleWorkerExecution.java @@ -0,0 +1,160 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.sql.planner.optimizations; + +import com.facebook.presto.Session; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableFinishNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.planner.plan.ChildReplacer; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode; +import com.facebook.presto.sql.planner.plan.InternalPlanNode; +import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; +import com.facebook.presto.sql.planner.plan.StatisticsWriterNode; +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.isSingleWorkerExecutionEnabled; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.sql.planner.PlannerUtils.containsSystemTableScan; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; +import static com.facebook.presto.sql.planner.iterative.rule.PickTableLayout.pushPredicateIntoTableScan; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.ensureSourceOrderingGatheringExchange; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.gatheringExchange; +import static java.util.Objects.requireNonNull; + +public 
class AddExchangeForSingleWorkerExecution + implements PlanOptimizer +{ + private final Metadata metadata; + + public AddExchangeForSingleWorkerExecution(Metadata metadata) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public boolean isEnabled(Session session) + { + return isSingleWorkerExecutionEnabled(session); + } + + @Override + public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) + { + if (isEnabled(session)) { + AddExchangeForSingleWorkerExecution.Rewriter rewriter = new AddExchangeForSingleWorkerExecution.Rewriter(idAllocator, metadata, session); + PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null); + return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged()); + } + return PlanOptimizerResult.optimizerResult(plan, false); + } + + private class Rewriter + extends SimplePlanRewriter + { + private final PlanNodeIdAllocator idAllocator; + private final Metadata metadata; + private final Session session; + private boolean planChanged; + + private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session) + { + this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.session = requireNonNull(session, "session is null"); + } + + public boolean isPlanChanged() + { + return planChanged; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, RewriteContext context) + { + PlanNode plan = pushPredicateIntoTableScan(node, new ConstantExpression(true, BOOLEAN), true, session, idAllocator, metadata); + // Presto Java and Presto Native use different hash functions for partitioning + // An additional exchange makes sure the data flows through a native worker in case it need to be partitioned for downstream processing + if 
(containsSystemTableScan(plan)) { + plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan); + // Record the added exchange so optimize() reports the plan as changed + planChanged = true; + } + return plan; + } + + @Override + public PlanNode visitExplainAnalyze(ExplainAnalyzeNode node, RewriteContext context) + { + return addGatherExchange(node); + } + + @Override + public PlanNode visitTableFinish(TableFinishNode node, RewriteContext context) + { + PlanNode child = node.getSource(); + + ExchangeNode gather; + // In case the input is a union (see PushTableWriteThroughUnion), don't add another exchange. Instead, convert it to gather exchange + if (child instanceof ExchangeNode) { + ExchangeNode exchangeNode = (ExchangeNode) child; + gather = new ExchangeNode( + exchangeNode.getSourceLocation(), + idAllocator.getNextId(), + GATHER, + REMOTE_STREAMING, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), exchangeNode.getOutputVariables()), + exchangeNode.getSources(), + exchangeNode.getInputs(), + true, + Optional.empty()); + } + else { + gather = ensureSourceOrderingGatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child); + } + planChanged = true; + return ChildReplacer.replaceChildren(node, ImmutableList.of(gather)); + } + + @Override + public PlanNode visitStatisticsWriterNode(StatisticsWriterNode node, RewriteContext context) + { + return addGatherExchange(node); + } + + // Puts a remote gathering exchange between the node and its first source, unless that source already is a gathering exchange + private PlanNode addGatherExchange(InternalPlanNode node) + { + PlanNode child = node.getSources().get(0); + + // if the child is already a gathering exchange + if ((child instanceof ExchangeNode) && ((ExchangeNode) child).getType() == GATHER) { + return node; + } + + ExchangeNode gather = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child); + // Record the added exchange so optimize() reports the plan as changed + planChanged = true; + return ChildReplacer.replaceChildren(node, ImmutableList.of(gather)); + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java 
b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 907a83ccb3077..3bdab0f4d2fdf 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -116,6 +116,7 @@ import static com.facebook.presto.SystemSessionProperties.isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; import static com.facebook.presto.SystemSessionProperties.isScaleWriters; +import static com.facebook.presto.SystemSessionProperties.isSingleWorkerExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled; import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; @@ -170,12 +171,21 @@ public AddExchanges(Metadata metadata, PartitioningProviderManager partitioningP this.nativeExecution = nativeExecution; } + @Override + public boolean isEnabled(Session session) + { + return !isSingleWorkerExecutionEnabled(session); + } + @Override public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) { - PlanWithProperties result = new Rewriter(idAllocator, variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); - boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); - return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + if (isEnabled(session)) { + PlanWithProperties result = new Rewriter(idAllocator, 
variableAllocator, session, partitioningProviderManager, nativeExecution).accept(plan, PreferredProperties.any()); + boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isRemote()).findFirst().isPresent(); + return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered); + } + return PlanOptimizerResult.optimizerResult(plan, false); } private class Rewriter diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java index 38c29689e46f2..ee98e2f5d1798 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java @@ -15,6 +15,7 @@ import com.facebook.presto.Session; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.VariableAllocator; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.plan.EquiJoinClause; @@ -29,8 +30,10 @@ import java.util.List; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; +import static com.facebook.presto.SystemSessionProperties.isSingleWorkerExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferMergeJoinForSortedInputs; import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.plan.JoinType.INNER; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; @@ -55,6 +58,9 @@ public void setEnabledForTesting(boolean isSet) @Override 
public boolean isEnabled(Session session) { + // Merge join cannot run under single worker execution: reject only when it is actually requested, so other queries are unaffected + if (isSingleWorkerExecutionEnabled(session) && (isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session))) { + throw new PrestoException(NOT_SUPPORTED, "Merge Join is not supported when single worker execution is enabled"); + } return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java index debed00e5050f..fb9a50ac7d9ba 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.isSingleWorkerExecutionEnabled; import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; import static com.facebook.presto.spi.plan.AggregationNode.Step.INTERMEDIATE; import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; @@ -114,7 +115,7 @@ public Optional visitAggregation(AggregationNode node, Void conte // No remote repartition exchange between final and partial aggregation. // Make sure that final aggregation operators are executed on a single node. 
ActualProperties globalProperties = PropertyDerivations.derivePropertiesRecursively(node, metadata, session); - checkArgument(forceSingleNode || globalProperties.isSingleNode(), + checkArgument(isSingleWorkerExecutionEnabled(session) || forceSingleNode || globalProperties.isSingleNode(), "Final aggregation with default value not separated from partial aggregation by remote hash exchange"); if (!seenExchanges.localRepartitionExchange) { diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestWriter.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestWriter.java new file mode 100644 index 0000000000000..72d545dcff976 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestWriter.java @@ -0,0 +1,435 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import org.testng.annotations.Test; + +import java.util.UUID; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createBucketedCustomer; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createBucketedLineitemAndOrders; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createCustomer; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createEmptyTable; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNation; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrdersEx; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPart; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPartitionedNation; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPrestoBenchTables; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createRegion; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createSupplier; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public abstract class AbstractTestWriter + extends AbstractTestQueryFramework +{ + private static final String[] TABLE_FORMATS = {"DWRF"}; + @Override + protected void createTables() + { + QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner(); + createLineitem(queryRunner); + 
createCustomer(queryRunner); + createOrders(queryRunner); + createOrdersEx(queryRunner); + createNation(queryRunner); + createPartitionedNation(queryRunner); + createSupplier(queryRunner); + createBucketedCustomer(queryRunner); + createPart(queryRunner); + createRegion(queryRunner); + createEmptyTable(queryRunner); + createBucketedLineitemAndOrders(queryRunner); + createPrestoBenchTables(queryRunner); + } + + @Test + public void testCreateTableWithUnsupportedFormats() + { + // Generate temporary table name. + String tmpTableName = generateRandomTableName(); + String[] unsupportedTableFormats = {"ORC", "JSON"}; + for (String unsupportedTableFormat : unsupportedTableFormats) { + assertQueryFails(String.format("CREATE TABLE %s WITH (format = '" + unsupportedTableFormat + "') AS SELECT * FROM nation", tmpTableName), " Unsupported file format in TableWrite: \"" + unsupportedTableFormat + "\"."); + } + } + + @Test + public void testTableFormats() + { + Session session = Session.builder(getSession()) + .setSystemProperty("scale_writers", "true") + .setSystemProperty("task_writer_count", "1") + .setSystemProperty("task_partitioned_writer_count", "2") + .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true") + .setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD") + .setCatalogSessionProperty("hive", "compression_codec", "NONE") + .setCatalogSessionProperty("hive", "hive_storage_format", "PARQUET") + .setCatalogSessionProperty("hive", "respect_table_format", "false").build(); + String tmpTableName = generateRandomTableName(); + + for (String tableFormat : TABLE_FORMATS) { + try { + getQueryRunner().execute(session, String.format("CREATE TABLE %s (name VARCHAR, regionkey BIGINT, nationkey BIGINT) WITH (format = '%s', partitioned_by = ARRAY['regionkey','nationkey'])", tmpTableName, tableFormat)); + // With different storage_format for partition than table format + getQueryRunner().execute(session, String.format("INSERT INTO %s SELECT 
name, regionkey, nationkey FROM nation", tmpTableName)); + // TODO add support for presto to query each partition's format then verify written format is correct + } + finally { + dropTableIfExists(tmpTableName); + } + } + } + + @Test + public void testCreateUnpartitionedTableAsSelect() + { + // Generate temporary table name. + String tmpTableName = generateRandomTableName(); + Session writeSession = buildSessionForTableWrite(); + + try { + getQueryRunner().execute(writeSession, String.format("CREATE TABLE %s (name VARCHAR, regionkey BIGINT, nationkey BIGINT) WITH (partitioned_by = ARRAY['regionkey','nationkey'])", tmpTableName)); + // Test insert into an empty table. + getQueryRunner().execute(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName)); + assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT name, regionkey, nationkey FROM nation"); + + // Test failure on insert into existing partitions. + assertQueryFails(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName), + ".*Cannot insert into an existing partition of Hive table: regionkey=.*/nationkey=.*"); + + // Test insert into existing partitions if insert_existing_partitions_behavior is set to OVERWRITE. + Session overwriteSession = Session.builder(writeSession) + .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE") + .build(); + getQueryRunner().execute(overwriteSession, String.format("INSERT INTO %s SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation", tmpTableName)); + assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation"); + } + finally { + dropTableIfExists(tmpTableName); + } + } + + @Test + public void testCreatePartitionedTableAsSelect() + { + { + Session session = buildSessionForTableWrite(); + // Generate temporary table name for created partitioned table. 
+ String partitionedOrdersTableName = generateRandomTableName(); + + for (String tableFormat : TABLE_FORMATS) { + try { + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s WITH (format = '" + tableFormat + "', " + + "partitioned_by = ARRAY[ 'orderstatus' ]) " + + "AS SELECT custkey, comment, orderstatus FROM orders", partitionedOrdersTableName)); + assertQuery(String.format("SELECT * FROM %s", partitionedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders"); + } + finally { + dropTableIfExists(partitionedOrdersTableName); + } + } + } + } + + @Test + public void testInsertIntoPartitionedTable() + { + // Generate temporary table name. + String tmpTableName = generateRandomTableName(); + Session writeSession = buildSessionForTableWrite(); + + try { + getQueryRunner().execute(writeSession, String.format("CREATE TABLE %s (name VARCHAR, regionkey BIGINT, nationkey BIGINT) WITH (partitioned_by = ARRAY['regionkey','nationkey'])", tmpTableName)); + // Test insert into an empty table. + getQueryRunner().execute(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName)); + assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT name, regionkey, nationkey FROM nation"); + + // Test failure on insert into existing partitions. + assertQueryFails(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName), + ".*Cannot insert into an existing partition of Hive table: regionkey=.*/nationkey=.*"); + + // Test insert into existing partitions if insert_existing_partitions_behavior is set to OVERWRITE. 
+ Session overwriteSession = Session.builder(writeSession) + .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE") + .build(); + getQueryRunner().execute(overwriteSession, String.format("INSERT INTO %s SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation", tmpTableName)); + assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation"); + } + finally { + dropTableIfExists(tmpTableName); + } + } + + @Test + public void testInsertIntoSpecialPartitionName() + { + Session writeSession = buildSessionForTableWrite(); + // Generate temporary table name. + String tmpTableName = generateRandomTableName(); + try { + getQueryRunner().execute(writeSession, String.format("CREATE TABLE %s (name VARCHAR, nationkey VARCHAR) WITH (partitioned_by = ARRAY['nationkey'])", tmpTableName)); + + // For special character in partition name, without correct handling, it would throw errors like 'Invalid partition spec: nationkey=A/B' + // In this test, verify those partition names can be successfully created + String[] specialCharacters = {"\"", "#", "%", "''", "*", "/", ":", "=", "?", "\\", "\\x7F", "{", "[", "]", "^"}; // escape single quote for sql + for (String specialCharacter : specialCharacters) { + getQueryRunner().execute(writeSession, String.format("INSERT INTO %s VALUES ('name', 'A%sB')", tmpTableName, specialCharacter)); + assertQuery(String.format("SELECT nationkey FROM %s", tmpTableName), String.format("VALUES('A%sB')", specialCharacter)); + getQueryRunner().execute(writeSession, String.format("DELETE FROM %s", tmpTableName)); + } + } + finally { + dropTableIfExists(tmpTableName); + } + } + + @Test + public void testCreateBucketTableAsSelect() + { + Session session = buildSessionForTableWrite(); + // Generate temporary table name for bucketed table. 
+ String bucketedOrdersTableName = generateRandomTableName(); + + for (String tableFormat : TABLE_FORMATS) { + try { + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s WITH (format = '" + tableFormat + "', " + + "partitioned_by = ARRAY[ 'orderstatus' ], " + + "bucketed_by = ARRAY[ 'custkey' ], " + + "bucket_count = 2) " + + "AS SELECT custkey, comment, orderstatus FROM orders", bucketedOrdersTableName)); + assertQuery(String.format("SELECT * FROM %s", bucketedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders"); + assertQuery(String.format("SELECT * FROM %s where \"$bucket\" = 0", bucketedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders where custkey % 2 = 0"); + assertQuery(String.format("SELECT * FROM %s where \"$bucket\" = 1", bucketedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders where custkey % 2 = 1"); + } + finally { + dropTableIfExists(bucketedOrdersTableName); + } + } + } + + @Test + public void testCreateBucketSortedTableAsSelect() + { + Session session = buildSessionForTableWrite(); + // Generate temporary table name. 
+ String badBucketTableName = generateRandomTableName(); + + for (String tableFormat : TABLE_FORMATS) { + try { + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s WITH (format = '%s', " + + "partitioned_by = ARRAY[ 'orderstatus' ], " + + "bucketed_by=array['custkey'], " + + "bucket_count=1, " + + "sorted_by=array['orderkey']) " + + "AS SELECT custkey, orderkey, orderstatus FROM orders", badBucketTableName, tableFormat)); + assertQueryOrdered(String.format("SELECT custkey, orderkey, orderstatus FROM %s where orderstatus = '0'", badBucketTableName), "SELECT custkey, orderkey, orderstatus FROM orders where orderstatus = '0'"); + } + finally { + dropTableIfExists(badBucketTableName); + } + } + } + + @Test + public void testScaleWriters() + { + Session session = buildSessionForTableWrite(); + String tmpTableName = generateRandomTableName(); + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s AS SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, CAST(o_orderdate as VARCHAR) as o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment FROM tpchstandard.tiny.orders", tmpTableName)); + assertEquals(computeActual("SELECT count(DISTINCT \"$path\") FROM " + tmpTableName).getOnlyValue(), 1L); + dropTableIfExists(tmpTableName); + + tmpTableName = generateRandomTableName(); + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s AS SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, CAST(o_orderdate as VARCHAR) as o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment FROM tpchstandard.sf100.orders where o_orderdate > Date('1997-01-10')", tmpTableName)); + long files = (long) computeActual("SELECT count(DISTINCT \"$path\") FROM " + tmpTableName).getOnlyValue(); + long workers = (long) computeScalar("SELECT count(*) FROM system.runtime.nodes"); + assertThat(files).isBetween(2L, workers); + dropTableIfExists(tmpTableName); + } + + @Test + public void testCollectColumnStatisticsOnCreateTable() + { + 
Session session = buildSessionForTableWrite(); + String tmpTableName = generateRandomTableName(); + assertUpdate(session, format("" + + "CREATE TABLE %s " + + "WITH ( " + + " partitioned_by = ARRAY['p_varchar'] " + + ") " + + "AS " + + "SELECT c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar " + + "FROM ( " + + " VALUES " + + " (null, null, null, null, null, null, null, 'p1'), " + + " (null, null, null, null, null, null, null, 'p1'), " + + " (true, BIGINT '1', DOUBLE '2.2', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), 'p1')," + + " (false, BIGINT '0', DOUBLE '1.2', TIMESTAMP '2012-08-08 00:00', CAST('abc2' AS VARCHAR), to_ieee754_64(2), sequence(10, 20), 'p1')," + + " (null, null, null, null, null, null, null, 'p2'), " + + " (null, null, null, null, null, null, null, 'p2'), " + + " (true, BIGINT '2', DOUBLE '3.3', TIMESTAMP '2012-09-09 01:00', CAST('cba1' AS VARCHAR), to_ieee754_64(3), sequence(20, 25), 'p2'), " + + " (false, BIGINT '1', DOUBLE '2.3', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), 'p2') " + + ") AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar)", tmpTableName), 8); + + assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p1')", tmpTableName), + "SELECT * FROM (VALUES " + + "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_bigint', null, 2.0E0, 0.5E0, null, '0', '1', null), " + + "('c_double', null, 2.0E0, 0.5E0, null, '1.2', '2.2', null), " + + "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8.0 + "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + + "('c_array', 184.0E0, null, 0.5, null, null, null, null), " + // 176 + "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + + "(null, null, null, null, 4.0E0, null, null, null)) AS x 
(column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); + assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p2')", tmpTableName), + "SELECT * FROM (VALUES " + + "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_bigint', null, 2.0E0, 0.5E0, null, '1', '2', null), " + + "('c_double', null, 2.0E0, 0.5E0, null, '2.3', '3.3', null), " + + "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8 + "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + + "('c_array', 104.0E0, null, 0.5, null, null, null, null), " + // 96 + "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + + "(null, null, null, null, 4.0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); + + // non existing partition + assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p3')", tmpTableName), + "SELECT * FROM (VALUES " + + "('c_boolean', null, 0E0, 0E0, null, null, null, null), " + + "('c_bigint', null, 0E0, 0E0, null, null, null, null), " + + "('c_double', null, 0E0, 0E0, null, null, null, null), " + + "('c_timestamp', null, 0E0, 0E0, null, null, null, null), " + + "('c_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + + "('c_varbinary', null, 0E0, 0E0, null, null, null, null), " + + "('c_array', null, 0E0, 0E0, null, null, null, null), " + + "('p_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + + "(null, null, null, null, 0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); + + dropTableIfExists(tmpTableName); + } + + @Test + public void testCollectColumnStatisticsOnInsert() + { + Session session = buildSessionForTableWrite(); + String tmpTableName = generateRandomTableName(); + assertUpdate(session, 
format("" + + "CREATE TABLE %s ( " + + " c_boolean BOOLEAN, " + + " c_bigint BIGINT, " + + " c_double DOUBLE, " + + " c_timestamp TIMESTAMP, " + + " c_varchar VARCHAR, " + + " c_varbinary VARBINARY, " + + " c_array ARRAY(BIGINT), " + + " p_varchar VARCHAR " + + ") " + + "WITH ( " + + " partitioned_by = ARRAY['p_varchar'] " + + ")", tmpTableName)); + + assertUpdate(format("" + + "INSERT INTO %s " + + "SELECT c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar " + + "FROM ( " + + " VALUES " + + " (null, null, null, null, null, null, null, 'p1'), " + + " (null, null, null, null, null, null, null, 'p1'), " + + " (true, BIGINT '1', DOUBLE '2.2', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), 'p1')," + + " (false, BIGINT '0', DOUBLE '1.2', TIMESTAMP '2012-08-08 00:00', CAST('abc2' AS VARCHAR), to_ieee754_64(2), sequence(10, 20), 'p1')," + + " (null, null, null, null, null, null, null, 'p2'), " + + " (null, null, null, null, null, null, null, 'p2'), " + + " (true, BIGINT '2', DOUBLE '3.3', TIMESTAMP '2012-09-09 01:00', CAST('cba1' AS VARCHAR), to_ieee754_64(3), sequence(20, 25), 'p2'), " + + " (false, BIGINT '1', DOUBLE '2.3', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), 'p2') " + + ") AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar)", tmpTableName), 8); + + assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p1')", tmpTableName), + "SELECT * FROM (VALUES " + + "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_bigint', null, 2.0E0, 0.5E0, null, '0', '1', null), " + + "('c_double', null, 2.0E0, 0.5E0, null, '1.2', '2.2', null), " + + "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8 + "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + + "('c_array', 184.0E0, null, 
0.5E0, null, null, null, null), " + // 176 + "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + + "(null, null, null, null, 4.0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); + assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p2')", tmpTableName), + "SELECT * FROM (VALUES " + + "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_bigint', null, 2.0E0, 0.5E0, null, '1', '2', null), " + + "('c_double', null, 2.0E0, 0.5E0, null, '2.3', '3.3', null), " + + "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + + "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8 + "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + + "('c_array', 104.0E0, null, 0.5, null, null, null, null), " + // 96 + "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + + "(null, null, null, null, 4.0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); + + // non existing partition + assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p3')", tmpTableName), + "SELECT * FROM (VALUES " + + "('c_boolean', null, 0E0, 0E0, null, null, null, null), " + + "('c_bigint', null, 0E0, 0E0, null, null, null, null), " + + "('c_double', null, 0E0, 0E0, null, null, null, null), " + + "('c_timestamp', null, 0E0, 0E0, null, null, null, null), " + + "('c_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + + "('c_varbinary', null, 0E0, 0E0, null, null, null, null), " + + "('c_array', null, 0E0, 0E0, null, null, null, null), " + + "('p_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + + "(null, null, null, null, 0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); + + dropTableIfExists(tmpTableName); + } + + protected void 
dropTableIfExists(String tableName) + { + computeExpected(String.format("DROP TABLE IF EXISTS %s", tableName), ImmutableList.of(BIGINT)); + } + + protected String generateRandomTableName() + { + String tableName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", ""); + // Clean up if the temporary named table already exists. + dropTableIfExists(tableName); + return tableName; + } + + protected Session buildSessionForTableWrite() + { + return Session.builder(getSession()) + .setSystemProperty("scale_writers", "true") + .setSystemProperty("task_writer_count", "1") + .setSystemProperty("task_partitioned_writer_count", "2") + .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true") + .setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD") + .build(); + } +} \ No newline at end of file diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index bbdf7f067d2c9..6ea9880b15523 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -121,7 +121,7 @@ public static QueryRunner createQueryRunner( defaultQueryRunner.close(); - return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled); + return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false); } public static QueryRunner createJavaQueryRunner() @@ -325,7 +325,8 @@ public static QueryRunner createNativeQueryRunner( String 
storageFormat, boolean addStorageFormatToPath, Boolean failOnNestedLoopJoin, - boolean isCoordinatorSidecarEnabled) + boolean isCoordinatorSidecarEnabled, + boolean singleWorkerExecutionEnabled) throws Exception { // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner. @@ -336,6 +337,9 @@ public static QueryRunner createNativeQueryRunner( ImmutableMap.Builder coordinatorProperties = ImmutableMap.builder(); coordinatorProperties.put("native-execution-enabled", "true"); + if (singleWorkerExecutionEnabled) { + coordinatorProperties.put("single-worker-execution-enabled", "true"); + } // Make query runner with external workers for tests return HiveQueryRunner.createQueryRunner( @@ -401,7 +405,7 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds) throws Exception { - return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false); + return createNativeQueryRunner(false, DEFAULT_STORAGE_FORMAT, Optional.ofNullable(remoteFunctionServerUds), false, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift) @@ -413,16 +417,22 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift) public static QueryRunner createNativeQueryRunner(boolean useThrift, boolean failOnNestedLoopJoin) throws Exception { - return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false); + return createNativeQueryRunner(useThrift, DEFAULT_STORAGE_FORMAT, Optional.empty(), failOnNestedLoopJoin, false, false); } public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat) throws Exception { - return createNativeQueryRunner(useThrift, storageFormat, Optional.empty(), false, false); + return createNativeQueryRunner(useThrift, storageFormat, 
Optional.empty(), false, false, false); } - public static QueryRunner createNativeQueryRunner(boolean useThrift, String storageFormat, Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled) + public static QueryRunner createNativeQueryRunner( + boolean useThrift, + String storageFormat, + Optional remoteFunctionServerUds, + Boolean failOnNestedLoopJoin, + boolean isCoordinatorSidecarEnabled, + boolean singleWorkerExecutionEnabled) throws Exception { int cacheMaxSize = 0; @@ -437,7 +447,8 @@ public static QueryRunner createNativeQueryRunner(boolean useThrift, String stor storageFormat, true, failOnNestedLoopJoin, - isCoordinatorSidecarEnabled); + isCoordinatorSidecarEnabled, + singleWorkerExecutionEnabled); } // Start the remote function server. Return the UDS path used to communicate with it. diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeJoinQueriesSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeJoinQueriesSingleWorker.java new file mode 100644 index 0000000000000..aa3a7ba85591c --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeJoinQueriesSingleWorker.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeJoinQueriesSingleWorker + extends AbstractTestNativeJoinQueries +{ + @Override + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeRemoteFunctionsSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeRemoteFunctionsSingleWorker.java new file mode 100644 index 0000000000000..825220a670570 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeRemoteFunctionsSingleWorker.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeRemoteFunctionsSingleWorker + extends AbstractTestNativeRemoteFunctions +{ + @Override + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleWorker.java new file mode 100644 index 0000000000000..5afb710c0d654 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeSystemQueriesSingleWorker.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeSystemQueriesSingleWorker + extends AbstractTestNativeSystemQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpchQueriesDwrfUsingThriftSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpchQueriesDwrfUsingThriftSingleWorker.java new file mode 100644 index 0000000000000..6c1d0a1cb0b91 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpchQueriesDwrfUsingThriftSingleWorker.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeTpchQueriesDwrfUsingThriftSingleWorker + extends AbstractTestNativeTpchQueries +{ + @Override + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner("DWRF"); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpchQueriesParquetUsingJSONSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpchQueriesParquetUsingJSONSingleWorker.java new file mode 100644 index 0000000000000..44382c0349fef --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeTpchQueriesParquetUsingJSONSingleWorker.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; +import org.testng.annotations.Test; + +import java.util.Optional; + +@Test(groups = {"parquet"}) +public class TestPrestoNativeTpchQueriesParquetUsingJSONSingleWorker + extends AbstractTestNativeTpchQueries +{ + @Override + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + false, + "PARQUET", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner("PARQUET"); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWindowQueriesSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWindowQueriesSingleWorker.java new file mode 100644 index 0000000000000..d0409323623b4 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWindowQueriesSingleWorker.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +public class TestPrestoNativeWindowQueriesSingleWorker + extends AbstractTestNativeWindowQueries +{ + @Override + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + true, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java index 0e6ac20b8eb51..e7e9dabcc6efe 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java @@ -13,38 +13,12 @@ */ package com.facebook.presto.nativeworker; -import com.facebook.presto.Session; import com.facebook.presto.testing.ExpectedQueryRunner; import com.facebook.presto.testing.QueryRunner; -import com.facebook.presto.tests.AbstractTestQueryFramework; -import com.google.common.collect.ImmutableList; -import org.testng.annotations.Test; - -import java.util.UUID; - -import static com.facebook.presto.common.type.BigintType.BIGINT; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createBucketedCustomer; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createBucketedLineitemAndOrders; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createCustomer; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createEmptyTable; -import static 
com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNation; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrdersEx; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPart; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPartitionedNation; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPrestoBenchTables; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createRegion; -import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createSupplier; -import static java.lang.String.format; -import static org.assertj.core.api.Assertions.assertThat; -import static org.testng.Assert.assertEquals; public class TestPrestoNativeWriter - extends AbstractTestQueryFramework + extends AbstractTestWriter { - private static final String[] TABLE_FORMATS = {"DWRF"}; - @Override protected QueryRunner createQueryRunner() throws Exception { @@ -56,394 +30,4 @@ protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception { return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); } - - @Override - protected void createTables() - { - QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner(); - createLineitem(queryRunner); - createCustomer(queryRunner); - createOrders(queryRunner); - createOrdersEx(queryRunner); - createNation(queryRunner); - createPartitionedNation(queryRunner); - createSupplier(queryRunner); - createBucketedCustomer(queryRunner); - createPart(queryRunner); - createRegion(queryRunner); - createEmptyTable(queryRunner); - createBucketedLineitemAndOrders(queryRunner); - createPrestoBenchTables(queryRunner); - } - - @Test - public void testCreateTableWithUnsupportedFormats() - { - // Generate temporary table name. 
- String tmpTableName = generateRandomTableName(); - String[] unsupportedTableFormats = {"ORC", "JSON"}; - for (String unsupportedTableFormat : unsupportedTableFormats) { - assertQueryFails(String.format("CREATE TABLE %s WITH (format = '" + unsupportedTableFormat + "') AS SELECT * FROM nation", tmpTableName), " Unsupported file format in TableWrite: \"" + unsupportedTableFormat + "\"."); - } - } - - @Test - public void testTableFormats() - { - Session session = Session.builder(getSession()) - .setSystemProperty("scale_writers", "true") - .setSystemProperty("task_writer_count", "1") - .setSystemProperty("task_partitioned_writer_count", "2") - .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true") - .setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD") - .setCatalogSessionProperty("hive", "compression_codec", "NONE") - .setCatalogSessionProperty("hive", "hive_storage_format", "PARQUET") - .setCatalogSessionProperty("hive", "respect_table_format", "false").build(); - String tmpTableName = generateRandomTableName(); - - for (String tableFormat : TABLE_FORMATS) { - try { - getQueryRunner().execute(session, String.format("CREATE TABLE %s (name VARCHAR, regionkey BIGINT, nationkey BIGINT) WITH (format = '%s', partitioned_by = ARRAY['regionkey','nationkey'])", tmpTableName, tableFormat)); - // With different storage_format for partition than table format - getQueryRunner().execute(session, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName)); - // TODO add support for presto to query each partition's format then verify written format is correct - } - finally { - dropTableIfExists(tmpTableName); - } - } - } - - @Test - public void testCreateUnpartitionedTableAsSelect() - { - // Generate temporary table name. 
- String tmpTableName = generateRandomTableName(); - Session writeSession = buildSessionForTableWrite(); - - try { - getQueryRunner().execute(writeSession, String.format("CREATE TABLE %s (name VARCHAR, regionkey BIGINT, nationkey BIGINT) WITH (partitioned_by = ARRAY['regionkey','nationkey'])", tmpTableName)); - // Test insert into an empty table. - getQueryRunner().execute(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName)); - assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT name, regionkey, nationkey FROM nation"); - - // Test failure on insert into existing partitions. - assertQueryFails(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName), - ".*Cannot insert into an existing partition of Hive table: regionkey=.*/nationkey=.*"); - - // Test insert into existing partitions if insert_existing_partitions_behavior is set to OVERWRITE. - Session overwriteSession = Session.builder(writeSession) - .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE") - .build(); - getQueryRunner().execute(overwriteSession, String.format("INSERT INTO %s SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation", tmpTableName)); - assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation"); - } - finally { - dropTableIfExists(tmpTableName); - } - } - - @Test - public void testCreatePartitionedTableAsSelect() - { - { - Session session = buildSessionForTableWrite(); - // Generate temporary table name for created partitioned table. 
- String partitionedOrdersTableName = generateRandomTableName(); - - for (String tableFormat : TABLE_FORMATS) { - try { - getQueryRunner().execute(session, String.format( - "CREATE TABLE %s WITH (format = '" + tableFormat + "', " + - "partitioned_by = ARRAY[ 'orderstatus' ]) " + - "AS SELECT custkey, comment, orderstatus FROM orders", partitionedOrdersTableName)); - assertQuery(String.format("SELECT * FROM %s", partitionedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders"); - } - finally { - dropTableIfExists(partitionedOrdersTableName); - } - } - } - } - - @Test - public void testInsertIntoPartitionedTable() - { - // Generate temporary table name. - String tmpTableName = generateRandomTableName(); - Session writeSession = buildSessionForTableWrite(); - - try { - getQueryRunner().execute(writeSession, String.format("CREATE TABLE %s (name VARCHAR, regionkey BIGINT, nationkey BIGINT) WITH (partitioned_by = ARRAY['regionkey','nationkey'])", tmpTableName)); - // Test insert into an empty table. - getQueryRunner().execute(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName)); - assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT name, regionkey, nationkey FROM nation"); - - // Test failure on insert into existing partitions. - assertQueryFails(writeSession, String.format("INSERT INTO %s SELECT name, regionkey, nationkey FROM nation", tmpTableName), - ".*Cannot insert into an existing partition of Hive table: regionkey=.*/nationkey=.*"); - - // Test insert into existing partitions if insert_existing_partitions_behavior is set to OVERWRITE. 
- Session overwriteSession = Session.builder(writeSession) - .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE") - .build(); - getQueryRunner().execute(overwriteSession, String.format("INSERT INTO %s SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation", tmpTableName)); - assertQuery(String.format("SELECT * FROM %s", tmpTableName), "SELECT CONCAT(name, '.test'), regionkey, nationkey FROM nation"); - } - finally { - dropTableIfExists(tmpTableName); - } - } - - @Test - public void testInsertIntoSpecialPartitionName() - { - Session writeSession = buildSessionForTableWrite(); - // Generate temporary table name. - String tmpTableName = generateRandomTableName(); - try { - getQueryRunner().execute(writeSession, String.format("CREATE TABLE %s (name VARCHAR, nationkey VARCHAR) WITH (partitioned_by = ARRAY['nationkey'])", tmpTableName)); - - // For special character in partition name, without correct handling, it would throw errors like 'Invalid partition spec: nationkey=A/B' - // In this test, verify those partition names can be successfully created - String[] specialCharacters = {"\"", "#", "%", "''", "*", "/", ":", "=", "?", "\\", "\\x7F", "{", "[", "]", "^"}; // escape single quote for sql - for (String specialCharacter : specialCharacters) { - getQueryRunner().execute(writeSession, String.format("INSERT INTO %s VALUES ('name', 'A%sB')", tmpTableName, specialCharacter)); - assertQuery(String.format("SELECT nationkey FROM %s", tmpTableName), String.format("VALUES('A%sB')", specialCharacter)); - getQueryRunner().execute(writeSession, String.format("DELETE FROM %s", tmpTableName)); - } - } - finally { - dropTableIfExists(tmpTableName); - } - } - - @Test - public void testCreateBucketTableAsSelect() - { - Session session = buildSessionForTableWrite(); - // Generate temporary table name for bucketed table. 
- String bucketedOrdersTableName = generateRandomTableName(); - - for (String tableFormat : TABLE_FORMATS) { - try { - getQueryRunner().execute(session, String.format( - "CREATE TABLE %s WITH (format = '" + tableFormat + "', " + - "partitioned_by = ARRAY[ 'orderstatus' ], " + - "bucketed_by = ARRAY[ 'custkey' ], " + - "bucket_count = 2) " + - "AS SELECT custkey, comment, orderstatus FROM orders", bucketedOrdersTableName)); - assertQuery(String.format("SELECT * FROM %s", bucketedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders"); - assertQuery(String.format("SELECT * FROM %s where \"$bucket\" = 0", bucketedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders where custkey % 2 = 0"); - assertQuery(String.format("SELECT * FROM %s where \"$bucket\" = 1", bucketedOrdersTableName), "SELECT custkey, comment, orderstatus FROM orders where custkey % 2 = 1"); - } - finally { - dropTableIfExists(bucketedOrdersTableName); - } - } - } - - @Test - public void testCreateBucketSortedTableAsSelect() - { - Session session = buildSessionForTableWrite(); - // Generate temporary table name. 
- String badBucketTableName = generateRandomTableName(); - - for (String tableFormat : TABLE_FORMATS) { - try { - getQueryRunner().execute(session, String.format( - "CREATE TABLE %s WITH (format = '%s', " + - "partitioned_by = ARRAY[ 'orderstatus' ], " + - "bucketed_by=array['custkey'], " + - "bucket_count=1, " + - "sorted_by=array['orderkey']) " + - "AS SELECT custkey, orderkey, orderstatus FROM orders", badBucketTableName, tableFormat)); - assertQueryOrdered(String.format("SELECT custkey, orderkey, orderstatus FROM %s where orderstatus = '0'", badBucketTableName), "SELECT custkey, orderkey, orderstatus FROM orders where orderstatus = '0'"); - } - finally { - dropTableIfExists(badBucketTableName); - } - } - } - - @Test - public void testScaleWriters() - { - Session session = buildSessionForTableWrite(); - String tmpTableName = generateRandomTableName(); - getQueryRunner().execute(session, String.format( - "CREATE TABLE %s AS SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, CAST(o_orderdate as VARCHAR) as o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment FROM tpchstandard.tiny.orders", tmpTableName)); - assertEquals(computeActual("SELECT count(DISTINCT \"$path\") FROM " + tmpTableName).getOnlyValue(), 1L); - dropTableIfExists(tmpTableName); - - tmpTableName = generateRandomTableName(); - getQueryRunner().execute(session, String.format( - "CREATE TABLE %s AS SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, CAST(o_orderdate as VARCHAR) as o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment FROM tpchstandard.sf100.orders where o_orderdate > Date('1997-01-10')", tmpTableName)); - long files = (long) computeActual("SELECT count(DISTINCT \"$path\") FROM " + tmpTableName).getOnlyValue(); - long workers = (long) computeScalar("SELECT count(*) FROM system.runtime.nodes"); - assertThat(files).isBetween(2L, workers); - dropTableIfExists(tmpTableName); - } - - @Test - public void testCollectColumnStatisticsOnCreateTable() - { - 
Session session = buildSessionForTableWrite(); - String tmpTableName = generateRandomTableName(); - assertUpdate(session, format("" + - "CREATE TABLE %s " + - "WITH ( " + - " partitioned_by = ARRAY['p_varchar'] " + - ") " + - "AS " + - "SELECT c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar " + - "FROM ( " + - " VALUES " + - " (null, null, null, null, null, null, null, 'p1'), " + - " (null, null, null, null, null, null, null, 'p1'), " + - " (true, BIGINT '1', DOUBLE '2.2', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), 'p1')," + - " (false, BIGINT '0', DOUBLE '1.2', TIMESTAMP '2012-08-08 00:00', CAST('abc2' AS VARCHAR), to_ieee754_64(2), sequence(10, 20), 'p1')," + - " (null, null, null, null, null, null, null, 'p2'), " + - " (null, null, null, null, null, null, null, 'p2'), " + - " (true, BIGINT '2', DOUBLE '3.3', TIMESTAMP '2012-09-09 01:00', CAST('cba1' AS VARCHAR), to_ieee754_64(3), sequence(20, 25), 'p2'), " + - " (false, BIGINT '1', DOUBLE '2.3', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), 'p2') " + - ") AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar)", tmpTableName), 8); - - assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p1')", tmpTableName), - "SELECT * FROM (VALUES " + - "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_bigint', null, 2.0E0, 0.5E0, null, '0', '1', null), " + - "('c_double', null, 2.0E0, 0.5E0, null, '1.2', '2.2', null), " + - "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8.0 - "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + - "('c_array', 184.0E0, null, 0.5, null, null, null, null), " + // 176 - "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + - "(null, null, null, null, 4.0E0, null, null, null)) AS x 
(column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); - assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p2')", tmpTableName), - "SELECT * FROM (VALUES " + - "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_bigint', null, 2.0E0, 0.5E0, null, '1', '2', null), " + - "('c_double', null, 2.0E0, 0.5E0, null, '2.3', '3.3', null), " + - "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8 - "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + - "('c_array', 104.0E0, null, 0.5, null, null, null, null), " + // 96 - "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + - "(null, null, null, null, 4.0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); - - // non existing partition - assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p3')", tmpTableName), - "SELECT * FROM (VALUES " + - "('c_boolean', null, 0E0, 0E0, null, null, null, null), " + - "('c_bigint', null, 0E0, 0E0, null, null, null, null), " + - "('c_double', null, 0E0, 0E0, null, null, null, null), " + - "('c_timestamp', null, 0E0, 0E0, null, null, null, null), " + - "('c_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + - "('c_varbinary', null, 0E0, 0E0, null, null, null, null), " + - "('c_array', null, 0E0, 0E0, null, null, null, null), " + - "('p_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + - "(null, null, null, null, 0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); - - dropTableIfExists(tmpTableName); - } - - @Test - public void testCollectColumnStatisticsOnInsert() - { - Session session = buildSessionForTableWrite(); - String tmpTableName = generateRandomTableName(); - assertUpdate(session, 
format("" + - "CREATE TABLE %s ( " + - " c_boolean BOOLEAN, " + - " c_bigint BIGINT, " + - " c_double DOUBLE, " + - " c_timestamp TIMESTAMP, " + - " c_varchar VARCHAR, " + - " c_varbinary VARBINARY, " + - " c_array ARRAY(BIGINT), " + - " p_varchar VARCHAR " + - ") " + - "WITH ( " + - " partitioned_by = ARRAY['p_varchar'] " + - ")", tmpTableName)); - - assertUpdate(format("" + - "INSERT INTO %s " + - "SELECT c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar " + - "FROM ( " + - " VALUES " + - " (null, null, null, null, null, null, null, 'p1'), " + - " (null, null, null, null, null, null, null, 'p1'), " + - " (true, BIGINT '1', DOUBLE '2.2', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), sequence(0, 10), 'p1')," + - " (false, BIGINT '0', DOUBLE '1.2', TIMESTAMP '2012-08-08 00:00', CAST('abc2' AS VARCHAR), to_ieee754_64(2), sequence(10, 20), 'p1')," + - " (null, null, null, null, null, null, null, 'p2'), " + - " (null, null, null, null, null, null, null, 'p2'), " + - " (true, BIGINT '2', DOUBLE '3.3', TIMESTAMP '2012-09-09 01:00', CAST('cba1' AS VARCHAR), to_ieee754_64(3), sequence(20, 25), 'p2'), " + - " (false, BIGINT '1', DOUBLE '2.3', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), sequence(30, 35), 'p2') " + - ") AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_varbinary, c_array, p_varchar)", tmpTableName), 8); - - assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p1')", tmpTableName), - "SELECT * FROM (VALUES " + - "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_bigint', null, 2.0E0, 0.5E0, null, '0', '1', null), " + - "('c_double', null, 2.0E0, 0.5E0, null, '1.2', '2.2', null), " + - "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8 - "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + - "('c_array', 184.0E0, null, 
0.5E0, null, null, null, null), " + // 176 - "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + - "(null, null, null, null, 4.0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); - assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p2')", tmpTableName), - "SELECT * FROM (VALUES " + - "('c_boolean', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_bigint', null, 2.0E0, 0.5E0, null, '1', '2', null), " + - "('c_double', null, 2.0E0, 0.5E0, null, '2.3', '3.3', null), " + - "('c_timestamp', null, 2.0E0, 0.5E0, null, null, null, null), " + - "('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null, null), " + // 8 - "('c_varbinary', 24.0, null, 0.5E0, null, null, null, null), " + - "('c_array', 104.0E0, null, 0.5, null, null, null, null), " + // 96 - "('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null, null), " + - "(null, null, null, null, 4.0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); - - // non existing partition - assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p3')", tmpTableName), - "SELECT * FROM (VALUES " + - "('c_boolean', null, 0E0, 0E0, null, null, null, null), " + - "('c_bigint', null, 0E0, 0E0, null, null, null, null), " + - "('c_double', null, 0E0, 0E0, null, null, null, null), " + - "('c_timestamp', null, 0E0, 0E0, null, null, null, null), " + - "('c_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + - "('c_varbinary', null, 0E0, 0E0, null, null, null, null), " + - "('c_array', null, 0E0, 0E0, null, null, null, null), " + - "('p_varchar', 0E0, 0E0, 0E0, null, null, null, null), " + - "(null, null, null, null, 0E0, null, null, null)) AS x (column_name, data_size, distinct_values_count, nulls_fraction, row_count, low_value, high_value, histogram)"); - - dropTableIfExists(tmpTableName); - } - - private void 
dropTableIfExists(String tableName) - { - computeExpected(String.format("DROP TABLE IF EXISTS %s", tableName), ImmutableList.of(BIGINT)); - } - - private String generateRandomTableName() - { - String tableName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", ""); - // Clean up if the temporary named table already exists. - dropTableIfExists(tableName); - return tableName; - } - - private Session buildSessionForTableWrite() - { - return Session.builder(getSession()) - .setSystemProperty("scale_writers", "true") - .setSystemProperty("task_writer_count", "1") - .setSystemProperty("task_partitioned_writer_count", "2") - .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true") - .setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD") - .build(); - } } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriterSingleWorker.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriterSingleWorker.java new file mode 100644 index 0000000000000..fafa092371e48 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriterSingleWorker.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; + +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestPrestoNativeWriterSingleWorker + extends AbstractTestWriter +{ + @Override + protected QueryRunner createQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeQueryRunner( + false, + "DWRF", + Optional.empty(), + false, + false, + true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaQueryRunner(); + } + + @Override + public void testScaleWriters() { + Session session = buildSessionForTableWrite(); + String tmpTableName = generateRandomTableName(); + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s AS SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, CAST(o_orderdate as VARCHAR) as o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment FROM tpchstandard.tiny.orders", tmpTableName)); + assertEquals(computeActual("SELECT count(DISTINCT \"$path\") FROM " + tmpTableName).getOnlyValue(), 1L); + dropTableIfExists(tmpTableName); + + tmpTableName = generateRandomTableName(); + getQueryRunner().execute(session, String.format( + "CREATE TABLE %s AS SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, CAST(o_orderdate as VARCHAR) as o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment FROM tpchstandard.sf100.orders where o_orderdate > Date('1997-01-10')", tmpTableName)); + long files = (long) computeActual("SELECT count(DISTINCT \"$path\") FROM " + tmpTableName).getOnlyValue(); + assertThat(files).isEqualTo(1); + dropTableIfExists(tmpTableName); + } +} diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 
1ce3c7ad2f834..2b5e9f1a5c690 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 1ce3c7ad2f8348c2e0edf6fe2185086b09d8bdc7 +Subproject commit 2b5e9f1a5c6907d5b18b47d5aadbe4062bc42edf