From a64b53fbf465f510da0a5250afedcb38a0d687fe Mon Sep 17 00:00:00 2001 From: cjj2010 <2449402815@qq.com> Date: Thu, 26 Sep 2024 17:30:03 +0800 Subject: [PATCH] [enhance](Cooldown) add SessionVariable for enableCooldownReplicaAffinity --- .../org/apache/doris/planner/OlapScanNode.java | 5 +++-- .../java/org/apache/doris/qe/SessionVariable.java | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index afdffc748c01a6..3d00cb477b0969 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -978,10 +978,11 @@ private void addScanRangeLocations(Partition partition, private boolean isEnableCooldownReplicaAffinity() { ConnectContext connectContext = ConnectContext.get(); - if (connectContext != null) { + if (connectContext == null) { + return Config.enable_cooldown_replica_affinity; + } else { return connectContext.getSessionVariable().isEnableCooldownReplicaAffinity(); } - return true; } private void computePartitionInfo() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b05622a5c88b13..ce9f9799a52d35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -667,6 +667,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PHRASE_QUERY_SEQUENYIAL_OPT = "enable_phrase_query_sequential_opt"; + public static final String ENABLE_COOLDOWN_REPLICA_AFFINITY = + "enable_cooldown_replica_affinity"; /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */ @@ -946,6 +948,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION) public boolean enableOdbcTransaction = false; + @VariableMgr.VarAttr(name = ENABLE_SCAN_RUN_SERIAL, description = { + "是否开启ScanNode串行读,以避免limit较小的情况下的读放大,可以提高查询的并发能力", + "Whether to enable ScanNode serial reading to avoid read amplification in cases of small limits" + + "which can improve query concurrency. default is false."}) + public boolean enableScanRunSerial = false; + @VariableMgr.VarAttr(name = ENABLE_SQL_CACHE) public boolean enableSqlCache = false; @@ -2159,7 +2167,7 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { @VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = { "开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于" - + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scanner 的并行度将会被设置为 1", + + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 的并行度将会被设置为 1", "When enabled, the pipeline task concurrency will be adjusted automatically. When the scan node has no filter " + "conditions and the limit parameter is less than the number of rows specified in " + "adaptive_pipeline_task_serial_read_on_limit, the parallelism of the scan will be set to 1." @@ -2987,6 +2995,10 @@ public void setShowHiddenColumns(boolean showHiddenColumns) { this.showHiddenColumns = showHiddenColumns; } + public boolean isEnableScanRunSerial() { + return enableScanRunSerial; + } + public boolean skipStorageEngineMerge() { return skipStorageEngineMerge; } @@ -3671,6 +3683,7 @@ public TQueryOptions toThrift() { tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); tResult.setEnableHashJoinEarlyStartProbe(enableHashJoinEarlyStartProbe); + tResult.setEnableScanNodeRunSerial(enableScanRunSerial); tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);