diff --git a/src/query/sql/src/planner/optimizer/cascades/cascade.rs b/src/query/sql/src/planner/optimizer/cascades/cascade.rs index 147bb4782d20..762d622f788a 100644 --- a/src/query/sql/src/planner/optimizer/cascades/cascade.rs +++ b/src/query/sql/src/planner/optimizer/cascades/cascade.rs @@ -19,6 +19,7 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use log::debug; +use log::info; use super::explore_rules::get_explore_rule_set; use crate::optimizer::cascades::scheduler::Scheduler; @@ -87,10 +88,29 @@ impl CascadesOptimizer { .group_index; let root_task = OptimizeGroupTask::new(self.ctx.clone(), root_index); - let mut scheduler = Scheduler::new(); + + let start_time = std::time::Instant::now(); + let mut num_task_apply_rule = 0; + let mut scheduler = Scheduler::new().with_callback(|task| { + if let Task::ApplyRule(_) = task { + num_task_apply_rule += 1; + } + }); scheduler.add_task(Task::OptimizeGroup(root_task)); scheduler.run(self)?; + let scheduled_task_count = scheduler.scheduled_task_count(); + drop(scheduler); + let elapsed = start_time.elapsed(); + + info!( + "optimizer stats - total task number: {:#?}, total execution time: {:.3}ms, average execution time: {:.3}ms, apply rule task number: {:#?}", + scheduled_task_count, + elapsed.as_millis() as f64, + elapsed.as_millis() as f64 / scheduled_task_count as f64, + num_task_apply_rule, + ); + debug!("Memo:\n{}", display_memo(&self.memo, &self.best_cost_map)?); self.find_optimal_plan(root_index) diff --git a/src/query/sql/src/planner/optimizer/cascades/scheduler.rs b/src/query/sql/src/planner/optimizer/cascades/scheduler.rs index a35379a9e753..8ff063743181 100644 --- a/src/query/sql/src/planner/optimizer/cascades/scheduler.rs +++ b/src/query/sql/src/planner/optimizer/cascades/scheduler.rs @@ -20,29 +20,69 @@ use log::debug; use super::tasks::Task; use super::CascadesOptimizer; -pub struct Scheduler { +#[allow(clippy::type_complexity)] +pub struct Scheduler<'a> { task_queue: VecDeque, /// A counter to track the number of tasks /// that have been scheduled. scheduled_task_count: u64, + + /// The maximum number of tasks that can be scheduled. + /// If the number of scheduled tasks exceeds this limit, + /// the scheduler will stop scheduling new tasks. + task_limit: u64, + + /// Task callback functions invoked before a task is executed. + callback: Option>, } -impl Scheduler { +impl<'a> Scheduler<'a> { pub fn new() -> Self { Self { task_queue: Default::default(), scheduled_task_count: 0, + task_limit: u64::MAX, + callback: None, } } + /// Set the maximum number of tasks that can be scheduled. + #[allow(dead_code)] + pub fn with_task_limit(mut self, task_limit: u64) -> Self { + self.task_limit = task_limit; + self + } + + /// Add a callback function that will be invoked before a task is executed. + pub fn with_callback(mut self, callback: impl FnMut(&Task) + 'a) -> Self { + self.callback = Some(Box::new(callback)); + self + } + pub fn run(&mut self, optimizer: &mut CascadesOptimizer) -> Result<()> { - while let Some(task) = self.task_queue.pop_front() { + while let Some(mut task) = self.task_queue.pop_front() { + if self.scheduled_task_count > self.task_limit { + // Skip explore tasks if the task limit is reached. + match task { + Task::ExploreGroup(t) => { + task = Task::ExploreGroup(t.with_termination()); + } + Task::ExploreExpr(t) => { + task = Task::ExploreExpr(t.with_termination()); + } + _ => {} + } + } + if task.ref_count() > 0 { // The task is still referenced by other tasks, requeue it. self.task_queue.push_back(task); continue; } + if let Some(callback) = &mut self.callback { + callback(&task); + } task.execute(optimizer, self)?; // Update the counter @@ -60,4 +100,8 @@ impl Scheduler { pub fn add_task(&mut self, task: Task) { self.task_queue.push_back(task); } + + pub fn scheduled_task_count(&self) -> u64 { + self.scheduled_task_count + } } diff --git a/src/query/sql/src/planner/optimizer/cascades/tasks/explore_expr.rs b/src/query/sql/src/planner/optimizer/cascades/tasks/explore_expr.rs index 2bdc558f7524..8a18b23ca773 100644 --- a/src/query/sql/src/planner/optimizer/cascades/tasks/explore_expr.rs +++ b/src/query/sql/src/planner/optimizer/cascades/tasks/explore_expr.rs @@ -56,6 +56,8 @@ pub struct ExploreExprTask { pub ref_count: Rc, pub parent: Option>, + + should_terminate: bool, } impl ExploreExprTask { @@ -71,29 +73,32 @@ impl ExploreExprTask { m_expr_index, ref_count: Rc::new(SharedCounter::new()), parent: None, + should_terminate: false, } } - pub fn with_parent( - ctx: Arc, - group_index: IndexType, - m_expr_index: IndexType, - parent: &Rc, - ) -> Self { - let mut task = Self::new(ctx, group_index, m_expr_index); + pub fn with_parent(mut self, parent: &Rc) -> Self { parent.inc(); - task.parent = Some(parent.clone()); - task + self.parent = Some(parent.clone()); + self + } + + pub fn with_termination(mut self) -> Self { + self.should_terminate = true; + self } pub fn execute( mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result<()> { if matches!(self.state, ExploreExprState::ExploredSelf) { return Ok(()); } + if self.should_terminate { + return self.terminate(); + } self.transition(optimizer, scheduler)?; scheduler.add_task(Task::ExploreExpr(self)); Ok(()) @@ -102,7 +107,7 @@ impl ExploreExprTask { fn transition( &mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result<()> { let event = match self.state { ExploreExprState::Init => self.explore_children(optimizer, scheduler)?, @@ -129,7 +134,7 @@ impl ExploreExprTask { fn explore_children( &mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result { let m_expr = optimizer .memo @@ -142,7 +147,7 @@ impl ExploreExprTask { // If the child group isn't explored, then schedule a `ExploreGroupTask` for it. all_children_explored = false; let explore_group_task = - ExploreGroupTask::with_parent(self.ctx.clone(), *child, &self.ref_count); + ExploreGroupTask::new(self.ctx.clone(), *child).with_parent(&self.ref_count); scheduler.add_task(Task::ExploreGroup(explore_group_task)); } } @@ -157,7 +162,7 @@ impl ExploreExprTask { fn explore_self( &mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result { let m_expr = optimizer .memo @@ -181,4 +186,14 @@ impl ExploreExprTask { } Ok(ExploreExprEvent::ExploredSelf) } + + fn terminate(&mut self) -> Result<()> { + if let Some(parent) = &self.parent { + parent.dec(); + } + + self.state = ExploreExprState::ExploredSelf; + + Ok(()) + } } diff --git a/src/query/sql/src/planner/optimizer/cascades/tasks/explore_group.rs b/src/query/sql/src/planner/optimizer/cascades/tasks/explore_group.rs index ed7a82d468fa..d4f0f264c624 100644 --- a/src/query/sql/src/planner/optimizer/cascades/tasks/explore_group.rs +++ b/src/query/sql/src/planner/optimizer/cascades/tasks/explore_group.rs @@ -52,6 +52,8 @@ pub struct ExploreGroupTask { pub ref_count: Rc, pub parent: Option>, + + pub should_terminate: bool, } impl ExploreGroupTask { @@ -63,28 +65,33 @@ impl ExploreGroupTask { last_explored_m_expr: None, ref_count: Rc::new(SharedCounter::new()), parent: None, + should_terminate: false, } } - pub fn with_parent( - ctx: Arc, - group_index: IndexType, - parent: &Rc, - ) -> Self { - let mut task = Self::new(ctx, group_index); + pub fn with_parent(mut self, parent: &Rc) -> Self { parent.inc(); - task.parent = Some(parent.clone()); - task + self.parent = Some(parent.clone()); + self + } + + /// Mark this task as a termination task. + pub fn with_termination(mut self) -> Self { + self.should_terminate = true; + self } pub fn execute( mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result<()> { if matches!(self.state, ExploreGroupState::Explored) { return Ok(()); } + if self.should_terminate { + return self.terminate(optimizer); + } self.transition(optimizer, scheduler)?; scheduler.add_task(Task::ExploreGroup(self)); Ok(()) @@ -93,7 +100,7 @@ impl ExploreGroupTask { pub fn transition( &mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result<()> { let event = match self.state { ExploreGroupState::Init => self.explore_group(optimizer, scheduler)?, @@ -115,7 +122,7 @@ impl ExploreGroupTask { fn explore_group( &mut self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result { let group = optimizer.memo.group_mut(self.group_index)?; @@ -130,12 +137,8 @@ impl ExploreGroupTask { } for m_expr in group.m_exprs.iter().skip(start_index) { - let task = ExploreExprTask::with_parent( - self.ctx.clone(), - m_expr.group_index, - m_expr.index, - &self.ref_count, - ); + let task = ExploreExprTask::new(self.ctx.clone(), m_expr.group_index, m_expr.index) + .with_parent(&self.ref_count); scheduler.add_task(Task::ExploreExpr(task)); } @@ -143,4 +146,15 @@ impl ExploreGroupTask { Ok(ExploreGroupEvent::Exploring) } + + pub fn terminate(&mut self, cascades_optimizer: &mut CascadesOptimizer) -> Result<()> { + if let Some(parent) = &self.parent { + parent.dec(); + } + + let group = cascades_optimizer.memo.group_mut(self.group_index)?; + group.set_state(GroupState::Explored); + + Ok(()) + } } diff --git a/src/query/sql/src/planner/optimizer/cascades/tasks/mod.rs b/src/query/sql/src/planner/optimizer/cascades/tasks/mod.rs index fce630489464..29c2b7d3fb21 100644 --- a/src/query/sql/src/planner/optimizer/cascades/tasks/mod.rs +++ b/src/query/sql/src/planner/optimizer/cascades/tasks/mod.rs @@ -70,7 +70,7 @@ impl Task { pub fn execute( self, optimizer: &mut CascadesOptimizer, - scheduler: &mut Scheduler, + scheduler: &mut Scheduler<'_>, ) -> Result<()> { match self { Task::ApplyRule(task) => task.execute(optimizer), diff --git a/src/query/sql/src/planner/optimizer/cascades/tasks/optimize_group.rs b/src/query/sql/src/planner/optimizer/cascades/tasks/optimize_group.rs index 08cb332fb5c5..fe6d717ab381 100644 --- a/src/query/sql/src/planner/optimizer/cascades/tasks/optimize_group.rs +++ b/src/query/sql/src/planner/optimizer/cascades/tasks/optimize_group.rs @@ -126,8 +126,8 @@ impl OptimizeGroupTask { ) -> Result { let group = optimizer.memo.group(self.group_index)?; if !group.state.explored() { - let task = - ExploreGroupTask::with_parent(self.ctx.clone(), group.group_index, &self.ref_count); + let task = ExploreGroupTask::new(self.ctx.clone(), group.group_index) + .with_parent(&self.ref_count); scheduler.add_task(Task::ExploreGroup(task)); Ok(OptimizeGroupEvent::Exploring) } else { diff --git a/src/query/sql/src/planner/optimizer/memo.rs b/src/query/sql/src/planner/optimizer/memo.rs index 288da7bbda5f..b0da24c32583 100644 --- a/src/query/sql/src/planner/optimizer/memo.rs +++ b/src/query/sql/src/planner/optimizer/memo.rs @@ -149,4 +149,17 @@ impl Memo { self.groups.push(group); group_index } + + /// Get an estimate of the memory size of the memo. + pub fn mem_size(&self) -> usize { + // Since all the `RelOperator` are interned, + // we only need to count the size of `m_expr_lookup_table`. + // We assume the `RelOperator`s are the major part of the memo. + self.m_expr_lookup_table.len() * std::mem::size_of::() + } + + /// Get the number of groups in the memo. + pub fn num_groups(&self) -> usize { + self.groups.len() + } }