Add single node execution #24172
base: master
0d9137e to d71d256
Can you help explain why this feature is, or has to be, tied to native execution? Perhaps describe the background and motivation?
High-level comments
- Do we want to support single node execution on a per-query basis?
This can be useful to improve the latency of tiny queries running on a large cluster. For example, a user who knows that a query is small may decide to run it in single node mode on a multi-node cluster.
If we decide to support it, we need to make sure the session property is used consistently throughout the code.
If we decide not to support it for now, I think the session property should be removed and only a configuration property should be used.
- Should the single node execution mode be native specific?
When running a Java cluster deployment with a dedicated coordinator and one or more dedicated workers, additional exchanges at the worker-coordinator boundary are necessary.
I wonder whether a simpler mental model would be to always add coordinator-to-worker exchanges when single node execution is requested.
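The per-query option raised in the first point amounts to resolving the flag from the session first and falling back to the cluster-wide configuration value. Below is a minimal sketch of that lookup, assuming a plain map stands in for the session properties. The class `SingleNodeFlagSketch`, its helper method, and the property name `single_node_execution_enabled` (borrowed from the naming suggestion later in this review) are hypothetical and are not Presto's actual API.

```java
import java.util.Map;

// Hypothetical sketch: this is not Presto's SystemSessionProperties API.
final class SingleNodeFlagSketch
{
    // Assumed property name, taken from the naming discussion in this review.
    static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";

    private SingleNodeFlagSketch() {}

    // A per-query session value wins; otherwise the cluster-wide config default applies.
    static boolean isSingleNodeExecutionEnabled(Map<String, String> sessionProperties, boolean configDefault)
    {
        String value = sessionProperties.get(SINGLE_NODE_EXECUTION_ENABLED);
        return value == null ? configDefault : Boolean.parseBoolean(value);
    }
}
```

If every planner and scheduler decision goes through one helper like this, the session property is applied consistently; dropping the session lookup leaves a configuration-only switch.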
@@ -328,6 +328,7 @@ public final class SystemSessionProperties
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
private static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled";
private static final String NATIVE_SINGLE_WORKER_EXECUTION = "native_single_worker_execution";
nit: Should we stay consistent and use `node` instead of `worker` (e.g.: `query_max_memory_per_node`, `force_single_node_output`, etc.)? Also maybe add the `_enabled` suffix to make it sound more natural, e.g.: `single_node_execution_enabled`, `isSingleNodeExecutionEnabled(...)`
I tried `singleNodeExecutionEnabled`, but then realized it would cause confusion with `forceSingleNode`.
A node can be either a worker or a coordinator, but what we want is explicitly a worker, so `singleWorkerExecutionEnabled` makes more sense.
}

@Override
public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Void> context)
There are other nodes that need to be run on coordinator:
https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java#L181
https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java#L235
Do we need to add an exchange to support those as well?
setCoordinatorOnlyDistribution currently applies to ExplainAnalyze, TableFinish, MetadataDelete and StatisticsWriterNode.
For MetadataDelete we don't need to add an exchange (it looks like a metadata-only operation: 02b1bf7).
I have added the exchange for the rest.
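The rule described in this reply can be illustrated with a toy plan tree: coordinator-only nodes other than MetadataDelete get an exchange placed directly below them, so the worker-side subtree stays separate from the coordinator-side node. This is only a sketch under stated assumptions. `CoordinatorExchangeSketch`, its `Node` and `Kind` types, and `rewrite` are hypothetical stand-ins for Presto's `PlanNode` visitors, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of a plan tree; Presto's real PlanNode classes are not used here.
final class CoordinatorExchangeSketch
{
    enum Kind { TABLE_SCAN, TABLE_WRITER, TABLE_FINISH, EXPLAIN_ANALYZE, STATISTICS_WRITER, METADATA_DELETE, EXCHANGE }

    static final class Node
    {
        final Kind kind;
        final List<Node> sources;

        Node(Kind kind, Node... sources)
        {
            this.kind = kind;
            this.sources = Arrays.asList(sources);
        }

        @Override
        public String toString()
        {
            return sources.isEmpty() ? kind.name() : kind.name() + sources;
        }
    }

    // Coordinator-only nodes that need an exchange below them; METADATA_DELETE is deliberately excluded.
    static final Set<Kind> NEEDS_EXCHANGE =
            EnumSet.of(Kind.TABLE_FINISH, Kind.EXPLAIN_ANALYZE, Kind.STATISTICS_WRITER);

    private CoordinatorExchangeSketch() {}

    // Rewrites the tree bottom-up, wrapping each source of a coordinator-only node in an EXCHANGE.
    static Node rewrite(Node node)
    {
        List<Node> rewritten = new ArrayList<>();
        for (Node source : node.sources) {
            Node child = rewrite(source);
            if (NEEDS_EXCHANGE.contains(node.kind) && child.kind != Kind.EXCHANGE) {
                child = new Node(Kind.EXCHANGE, child);
            }
            rewritten.add(child);
        }
        return new Node(node.kind, rewritten.toArray(new Node[0]));
    }
}
```

In this model, a TABLE_FINISH over TABLE_WRITER over TABLE_SCAN plan gains an EXCHANGE between TABLE_FINISH and TABLE_WRITER, while a METADATA_DELETE plan keeps its shape.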
@@ -813,7 +814,11 @@ public PlanOptimizers(
costCalculator,
ImmutableSet.of(new ScaledWriterRule())));

if (!forceSingleNode) {
if (featuresConfig.isNativeExecutionEnabled() && featuresConfig.isNativeSingleWorkerExecution()) {
I wonder if a simpler mental model would be to always add worker-to-coordinator exchanges when single worker execution is enabled.
This way:
- single worker execution can be enabled on a per-query basis for normal clusters with more than a single worker and a single coordinator (with scheduling on the coordinator disabled)
- for Java execution, if coordinator scheduling is enabled, an extra exchange is not going to hurt
For example, this condition can be kept as `isSingleNodeExecutionEnabled(session)`, and we can rename `AddExchangeForNativeSingleWorker` to `AddWorkerToCoordinatorExchanges`.
But in some cases it would be an exchange from coordinator to worker, for example when scanning a system table:
Aggregation [Worker]
|
Exchange
|
TableScan (system table) [Coordinator]
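The plan shape above can be reproduced with a small sketch that detects a system table scan and places an exchange directly above it, so the coordinator-side scan feeds the worker-side aggregation. The name `containsSystemTableScan` echoes a helper that appears in this PR's diff, but the implementation here, the `Node` type, and `addExchangeAboveSystemScans` are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model; not the optimizer added in this PR.
final class SystemTableExchangeSketch
{
    static final class Node
    {
        final String name;
        final boolean systemTableScan;
        final List<Node> sources;

        Node(String name, boolean systemTableScan, Node... sources)
        {
            this.name = name;
            this.systemTableScan = systemTableScan;
            this.sources = Arrays.asList(sources);
        }
    }

    private SystemTableExchangeSketch() {}

    // True if this node or any descendant scans a system table.
    static boolean containsSystemTableScan(Node node)
    {
        if (node.systemTableScan) {
            return true;
        }
        for (Node source : node.sources) {
            if (containsSystemTableScan(source)) {
                return true;
            }
        }
        return false;
    }

    // Places an Exchange directly above every system table scan (coordinator -> worker boundary).
    static Node addExchangeAboveSystemScans(Node node)
    {
        if (node.systemTableScan) {
            return new Node("Exchange", false, node);
        }
        List<Node> rewritten = new ArrayList<>();
        for (Node source : node.sources) {
            rewritten.add(addExchangeAboveSystemScans(source));
        }
        return new Node(node.name, false, rewritten.toArray(new Node[0]));
    }

    // Renders the tree as Name(child, child) for easy inspection.
    static String render(Node node)
    {
        if (node.sources.isEmpty()) {
            return node.name;
        }
        StringBuilder builder = new StringBuilder(node.name).append('(');
        for (int i = 0; i < node.sources.size(); i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(render(node.sources.get(i)));
        }
        return builder.append(')').toString();
    }
}
```

A plan with only regular table scans passes through this rewrite unchanged, which matches the idea that no exchange is needed when everything runs on the single worker.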
}

@Override
public void testScaleWriters() {
Scale writers should have no effect in single node mode? Do we need this test?
I want to use this test to check that, under single worker execution mode, the scaled writer does not scale out to multiple worker tasks.
We can also potentially use HBO/CBO to decide to run some queries in single node mode.
15bf3b5 to 5aa836f
LGTM % nits and fixing test failures
7685df7 to 3de09d3
3de09d3 to 8aa6631
46d7ec3 to 1ee77a7
Thanks for the release note entry! Some formatting nits.
Also, consider adding documentation for the new configuration property and session property to either the Presto [Configuration, Session] Properties pages, or the Presto C++ pages, as appropriate.
if (containsSystemTableScan(plan)) {
plan = gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, plan);
}
Are these three lines of code the reason we've extended so many test cases to apply to Presto - single node - native? If so, I'm wondering if a few targeted, example-based tests would be more appropriate. I'm concerned the additional tests don't add value and will make our CI slower and more expensive.
Not only that. Under single-node mode we do not use the addExchange optimizer at all, so in theory we should check that the many cases that originally had a remote exchange (especially a partitioned one) still return correct results under single-node mode.
Scheduling also changes accordingly, and one of the test cases actually caught an issue in it.
I understand your concern, so I removed some of the tests (those I think could be redundant in terms of exchange pattern).
2024bac to 55ca461
ac7073b to 57fce4d
It won't let me rerun some sporadic flaky tests; I have to push to force a rerun :(
@kewang1024 this feature could be useful for lower latency deployments to support canned or bounded queries, where latency is expected to be low. Could you please add some documentation for it as @steveburnett requested?
@kewang1024 / @kaikalur can you create an issue for this so we don't lose track of this suggestion? I think we could also consider toggling this feature via a resource group, similar to per-query limits in resource groups. This might make the feature more convenient to toggle, and in a multi-node cluster it would give a more accurate value for the hard/soft concurrency limit (and perhaps make it safer to run in a multitenant deployment). I can create an issue for that; I don't think it needs to be added here.
@steveburnett the Presto C++ pages are not a good place since this is not limited to C++. I failed to find the [Configuration, Session] one you're referring to; can you give me a pointer?
Of course! I was referring to the Presto Configuration Properties page or the Presto Session Properties page.
57fce4d to fb4ad58
fb4ad58 to b91f0f5
Thanks @steveburnett for the prompt response, updated the doc.
040866e to 61d73da
@tdcmeehan Updated accordingly, can you help take another look? Thanks!
Overall, wondering what's preventing us from using `FixedBucketNodeMap`, since it seems that `BucketNodeMap` is aligned with this use case (only using a single node-bucket).
@Override
public boolean isDynamic()
{
return true;
Just curious, why is this true? Shouldn't this be false, since I think there is a single node and a single task?
It is only applicable for grouped execution. Grouped execution is currently not supported for single node mode. It can be supported if needed, but generally the idea is that only small queries should run single node, while grouped execution is generally applicable for very large queries.
@Override
public boolean hasInitialMap()
{
return false;
Can't this be true? Since there's only one node being returned in `getBucketToNode`?
This is true. @kewang1024 I wonder if instead we can simply use a `DynamicBucketNodeMap((split) -> 0, 1)`. Basically pretending there's only a single bucket for all splits to avoid a custom override?
Sure, can make that change now
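The single-bucket idea suggested above can be sketched independently of Presto's classes: a split-to-bucket function that always returns 0, combined with a bucket count of 1, sends every split to whichever node is later assigned to that one bucket. `SingleBucketNodeMapSketch` and all of its methods are hypothetical and only mirror the shape of the `DynamicBucketNodeMap((split) -> 0, 1)` call quoted in the comment; nodes are represented as plain strings for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToIntFunction;

// Hypothetical sketch of a dynamically assigned bucket-to-node map; not Presto's DynamicBucketNodeMap.
final class SingleBucketNodeMapSketch<S>
{
    private final ToIntFunction<S> splitToBucket;
    private final int bucketCount;
    private final Map<Integer, String> bucketToNode = new HashMap<>();

    SingleBucketNodeMapSketch(ToIntFunction<S> splitToBucket, int bucketCount)
    {
        if (bucketCount < 1) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        this.splitToBucket = splitToBucket;
        this.bucketCount = bucketCount;
    }

    // Every split maps to bucket 0 and there is exactly one bucket.
    static <S> SingleBucketNodeMapSketch<S> singleBucket()
    {
        return new SingleBucketNodeMapSketch<>(split -> 0, 1);
    }

    int getBucketCount()
    {
        return bucketCount;
    }

    int getBucket(S split)
    {
        return splitToBucket.applyAsInt(split);
    }

    // Buckets are assigned lazily, which is what makes the map "dynamic"; the first assignment wins.
    void assignBucketToNode(int bucket, String node)
    {
        if (bucket < 0 || bucket >= bucketCount) {
            throw new IllegalArgumentException("bucket out of range: " + bucket);
        }
        bucketToNode.putIfAbsent(bucket, node);
    }

    Optional<String> getAssignedNode(S split)
    {
        return Optional.ofNullable(bucketToNode.get(getBucket(split)));
    }
}
```

Once bucket 0 is assigned to a worker, every split resolves to that worker, so no custom override of the bucket map is needed in this model.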
8376f0c to 6ae5e35
To improve performance for small queries that can be executed within a single node, we introduce a single worker execution mode: the query uses only one node to execute, and the plan is optimized accordingly.
6ae5e35 to 58a7e6c
Description