diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 47784fc8c4d69..7afd66e0e18b6 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -52,6 +52,8 @@ use crate::optimizer::PlanRef;
 use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 use crate::scheduler::SchedulerResult;
 
+const TASK_MIN_PARALLELISM: u32 = 32;
+
 #[derive(Clone, Debug, Hash, Eq, PartialEq)]
 pub struct QueryId {
     pub id: String,
@@ -437,6 +439,7 @@ impl QueryStage {
         &self,
         exchange_info: Option<ExchangeInfo>,
         source_info: SourceScanInfo,
+        task_parallelism: u32,
     ) -> Self {
         assert!(matches!(source_info, SourceScanInfo::Complete(_)));
         let exchange_info = if let Some(exchange_info) = exchange_info {
@@ -450,7 +453,7 @@ impl QueryStage {
             id: self.id,
             root: self.root.clone(),
             exchange_info,
-            parallelism: Some(source_info.split_info().unwrap().len() as u32),
+            parallelism: Some(task_parallelism),
             table_scan_info: self.table_scan_info.clone(),
             source_info: Some(source_info),
             has_lookup_join: self.has_lookup_join,
@@ -660,9 +663,14 @@ impl StageGraph {
                 .complete(self.batch_parallelism)
                 .await?;
 
+            // In order to cooperate with the batch reading of the file source, all the splits to be read are divided into several parts, so that a single task does not take up too many resources.
+            // To prevent the number of tasks from being too small, the task parallelism is taken as the maximum of half the batch parallelism and `TASK_MIN_PARALLELISM`.
+            let task_parallelism =
+                std::cmp::max((self.batch_parallelism / 2) as u32, TASK_MIN_PARALLELISM);
             let complete_stage = Arc::new(stage.clone_with_exchange_info_and_complete_source_info(
                 exchange_info,
                 complete_source_info,
+                task_parallelism,
             ));
             let parallelism = complete_stage.parallelism;
             complete_stages.insert(stage.id, complete_stage);
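
For reference, a minimal standalone sketch of the task-parallelism formula the patch introduces. It is not part of the patch; the free function `task_parallelism` and the example `batch_parallelism` values are assumptions chosen purely for illustration, and `batch_parallelism` is assumed to be a `usize` (the `as u32` cast in the patch suggests it is not already `u32`).

    // Hypothetical illustration of max(batch_parallelism / 2, TASK_MIN_PARALLELISM).
    const TASK_MIN_PARALLELISM: u32 = 32;

    fn task_parallelism(batch_parallelism: usize) -> u32 {
        std::cmp::max((batch_parallelism / 2) as u32, TASK_MIN_PARALLELISM)
    }

    fn main() {
        // A small batch parallelism is clamped up to the floor of 32 tasks...
        assert_eq!(task_parallelism(8), 32);
        // ...while a large one yields half the batch parallelism.
        assert_eq!(task_parallelism(128), 64);
    }

The effect is that the splits of a completed file-source scan are spread over a bounded number of tasks, instead of the previous behavior of one task per split (`source_info.split_info().unwrap().len()`).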