From c2faa5cd611888e6ee69254aadd68efb6e0b9441 Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Mon, 4 Mar 2024 16:46:25 +0800
Subject: [PATCH 1/2] change batch parallelism

---
 src/frontend/src/scheduler/plan_fragmenter.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 47784fc8c4d69..c5404a7947264 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -437,6 +437,7 @@ impl QueryStage {
         &self,
         exchange_info: Option<ExchangeInfo>,
         source_info: SourceScanInfo,
+        task_parallelism: u32,
     ) -> Self {
         assert!(matches!(source_info, SourceScanInfo::Complete(_)));
         let exchange_info = if let Some(exchange_info) = exchange_info {
@@ -450,7 +451,7 @@ impl QueryStage {
             id: self.id,
             root: self.root.clone(),
             exchange_info,
-            parallelism: Some(source_info.split_info().unwrap().len() as u32),
+            parallelism: Some(task_parallelism),
             table_scan_info: self.table_scan_info.clone(),
             source_info: Some(source_info),
             has_lookup_join: self.has_lookup_join,
@@ -660,9 +661,11 @@ impl StageGraph {
             .complete(self.batch_parallelism)
             .await?;
 
+        // To cooperate with the batch reading of the file source, all the splits to be read are divided into several parts, to prevent a single task from taking up too many resources.
         let complete_stage = Arc::new(stage.clone_with_exchange_info_and_complete_source_info(
             exchange_info,
             complete_source_info,
+            (self.batch_parallelism / 2) as u32,
         ));
         let parallelism = complete_stage.parallelism;
         complete_stages.insert(stage.id, complete_stage);

From 6b5649ff1d19766f06b050134da9bd8b272637a3 Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Tue, 5 Mar 2024 13:22:14 +0800
Subject: [PATCH 2/2] minor

---
 src/frontend/src/scheduler/plan_fragmenter.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index c5404a7947264..7afd66e0e18b6 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -52,6 +52,8 @@ use crate::optimizer::PlanRef;
 use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 use crate::scheduler::SchedulerResult;
 
+const TASK_MIN_PARALLELISM: u32 = 32;
+
 #[derive(Clone, Debug, Hash, Eq, PartialEq)]
 pub struct QueryId {
     pub id: String,
@@ -662,10 +664,13 @@ impl StageGraph {
             .await?;
 
         // To cooperate with the batch reading of the file source, all the splits to be read are divided into several parts, to prevent a single task from taking up too many resources.
+        // To prevent the number of tasks from being too low, take the maximum of half the batch parallelism and `TASK_MIN_PARALLELISM`.
+        let task_parallelism =
+            std::cmp::max((self.batch_parallelism / 2) as u32, TASK_MIN_PARALLELISM);
         let complete_stage = Arc::new(stage.clone_with_exchange_info_and_complete_source_info(
             exchange_info,
             complete_source_info,
-            (self.batch_parallelism / 2) as u32,
+            task_parallelism,
         ));
         let parallelism = complete_stage.parallelism;
         complete_stages.insert(stage.id, complete_stage);
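
For reference, the parallelism rule introduced by these two patches can be read in isolation as the minimal sketch below. Only the constant and the max-of-half-the-batch-parallelism formula come from the diff; the helper name derive_task_parallelism, the usize argument type, and the example values are illustrative assumptions, not part of the patches.

    // Standalone sketch of the task-parallelism rule from the patches above.
    // `derive_task_parallelism` and the sample inputs are hypothetical; the
    // constant and the formula mirror the diff.
    const TASK_MIN_PARALLELISM: u32 = 32;

    /// Half the configured batch parallelism, but never below TASK_MIN_PARALLELISM.
    fn derive_task_parallelism(batch_parallelism: usize) -> u32 {
        std::cmp::max((batch_parallelism / 2) as u32, TASK_MIN_PARALLELISM)
    }

    fn main() {
        // A small batch parallelism hits the floor: 8 / 2 = 4, raised to 32.
        assert_eq!(derive_task_parallelism(8), 32);
        // A large batch parallelism is simply halved: 128 / 2 = 64.
        assert_eq!(derive_task_parallelism(128), 64);
    }

Halving the batch parallelism keeps a single file-source scan stage from claiming too many resources, while the TASK_MIN_PARALLELISM floor keeps enough tasks around to spread the splits; these are the two concerns called out in the comments added by the patches.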