Skip to content

Commit

Permalink
feat(planner): support timeout termination for query optimization (#1โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ3955)

add stats for cascades optimizer
  • Loading branch information
leiysky authored Dec 7, 2023
1 parent e9934ab commit 4e6515b
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 38 deletions.
22 changes: 21 additions & 1 deletion src/query/sql/src/planner/optimizer/cascades/cascade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use log::debug;
use log::info;

use super::explore_rules::get_explore_rule_set;
use crate::optimizer::cascades::scheduler::Scheduler;
Expand Down Expand Up @@ -87,10 +88,29 @@ impl CascadesOptimizer {
.group_index;

let root_task = OptimizeGroupTask::new(self.ctx.clone(), root_index);
let mut scheduler = Scheduler::new();

let start_time = std::time::Instant::now();
let mut num_task_apply_rule = 0;
let mut scheduler = Scheduler::new().with_callback(|task| {
if let Task::ApplyRule(_) = task {
num_task_apply_rule += 1;
}
});
scheduler.add_task(Task::OptimizeGroup(root_task));
scheduler.run(self)?;

let scheduled_task_count = scheduler.scheduled_task_count();
drop(scheduler);
let elapsed = start_time.elapsed();

info!(
"optimizer stats - total task number: {:#?}, total execution time: {:.3}ms, average execution time: {:.3}ms, apply rule task number: {:#?}",
scheduled_task_count,
elapsed.as_millis() as f64,
elapsed.as_millis() as f64 / scheduled_task_count as f64,
num_task_apply_rule,
);

debug!("Memo:\n{}", display_memo(&self.memo, &self.best_cost_map)?);

self.find_optimal_plan(root_index)
Expand Down
50 changes: 47 additions & 3 deletions src/query/sql/src/planner/optimizer/cascades/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,69 @@ use log::debug;
use super::tasks::Task;
use super::CascadesOptimizer;

pub struct Scheduler {
#[allow(clippy::type_complexity)]
pub struct Scheduler<'a> {
task_queue: VecDeque<Task>,

/// A counter to track the number of tasks
/// that have been scheduled.
scheduled_task_count: u64,

/// The maximum number of tasks that can be scheduled.
/// If the number of scheduled tasks exceeds this limit,
/// the scheduler will stop scheduling new tasks.
task_limit: u64,

/// Task callback functions invoked before a task is executed.
callback: Option<Box<dyn FnMut(&Task) + 'a>>,
}

impl Scheduler {
impl<'a> Scheduler<'a> {
pub fn new() -> Self {
Self {
task_queue: Default::default(),
scheduled_task_count: 0,
task_limit: u64::MAX,
callback: None,
}
}

/// Set the maximum number of tasks that can be scheduled.
#[allow(dead_code)]
pub fn with_task_limit(mut self, task_limit: u64) -> Self {
self.task_limit = task_limit;
self
}

/// Add a callback function that will be invoked before a task is executed.
pub fn with_callback(mut self, callback: impl FnMut(&Task) + 'a) -> Self {
self.callback = Some(Box::new(callback));
self
}

pub fn run(&mut self, optimizer: &mut CascadesOptimizer) -> Result<()> {
while let Some(task) = self.task_queue.pop_front() {
while let Some(mut task) = self.task_queue.pop_front() {
if self.scheduled_task_count > self.task_limit {
// Skip explore tasks if the task limit is reached.
match task {
Task::ExploreGroup(t) => {
task = Task::ExploreGroup(t.with_termination());
}
Task::ExploreExpr(t) => {
task = Task::ExploreExpr(t.with_termination());
}
_ => {}
}
}

if task.ref_count() > 0 {
// The task is still referenced by other tasks, requeue it.
self.task_queue.push_back(task);
continue;
}
if let Some(callback) = &mut self.callback {
callback(&task);
}
task.execute(optimizer, self)?;

// Update the counter
Expand All @@ -60,4 +100,8 @@ impl Scheduler {
pub fn add_task(&mut self, task: Task) {
self.task_queue.push_back(task);
}

pub fn scheduled_task_count(&self) -> u64 {
self.scheduled_task_count
}
}
43 changes: 29 additions & 14 deletions src/query/sql/src/planner/optimizer/cascades/tasks/explore_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct ExploreExprTask {

pub ref_count: Rc<SharedCounter>,
pub parent: Option<Rc<SharedCounter>>,

should_terminate: bool,
}

impl ExploreExprTask {
Expand All @@ -71,29 +73,32 @@ impl ExploreExprTask {
m_expr_index,
ref_count: Rc::new(SharedCounter::new()),
parent: None,
should_terminate: false,
}
}

pub fn with_parent(
ctx: Arc<dyn TableContext>,
group_index: IndexType,
m_expr_index: IndexType,
parent: &Rc<SharedCounter>,
) -> Self {
let mut task = Self::new(ctx, group_index, m_expr_index);
pub fn with_parent(mut self, parent: &Rc<SharedCounter>) -> Self {
parent.inc();
task.parent = Some(parent.clone());
task
self.parent = Some(parent.clone());
self
}

pub fn with_termination(mut self) -> Self {
self.should_terminate = true;
self
}

pub fn execute(
mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<()> {
if matches!(self.state, ExploreExprState::ExploredSelf) {
return Ok(());
}
if self.should_terminate {
return self.terminate();
}
self.transition(optimizer, scheduler)?;
scheduler.add_task(Task::ExploreExpr(self));
Ok(())
Expand All @@ -102,7 +107,7 @@ impl ExploreExprTask {
fn transition(
&mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<()> {
let event = match self.state {
ExploreExprState::Init => self.explore_children(optimizer, scheduler)?,
Expand All @@ -129,7 +134,7 @@ impl ExploreExprTask {
fn explore_children(
&mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<ExploreExprEvent> {
let m_expr = optimizer
.memo
Expand All @@ -142,7 +147,7 @@ impl ExploreExprTask {
// If the child group isn't explored, then schedule a `ExploreGroupTask` for it.
all_children_explored = false;
let explore_group_task =
ExploreGroupTask::with_parent(self.ctx.clone(), *child, &self.ref_count);
ExploreGroupTask::new(self.ctx.clone(), *child).with_parent(&self.ref_count);
scheduler.add_task(Task::ExploreGroup(explore_group_task));
}
}
Expand All @@ -157,7 +162,7 @@ impl ExploreExprTask {
fn explore_self(
&mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<ExploreExprEvent> {
let m_expr = optimizer
.memo
Expand All @@ -181,4 +186,14 @@ impl ExploreExprTask {
}
Ok(ExploreExprEvent::ExploredSelf)
}

fn terminate(&mut self) -> Result<()> {
if let Some(parent) = &self.parent {
parent.dec();
}

self.state = ExploreExprState::ExploredSelf;

Ok(())
}
}
48 changes: 31 additions & 17 deletions src/query/sql/src/planner/optimizer/cascades/tasks/explore_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct ExploreGroupTask {

pub ref_count: Rc<SharedCounter>,
pub parent: Option<Rc<SharedCounter>>,

pub should_terminate: bool,
}

impl ExploreGroupTask {
Expand All @@ -63,28 +65,33 @@ impl ExploreGroupTask {
last_explored_m_expr: None,
ref_count: Rc::new(SharedCounter::new()),
parent: None,
should_terminate: false,
}
}

pub fn with_parent(
ctx: Arc<dyn TableContext>,
group_index: IndexType,
parent: &Rc<SharedCounter>,
) -> Self {
let mut task = Self::new(ctx, group_index);
pub fn with_parent(mut self, parent: &Rc<SharedCounter>) -> Self {
parent.inc();
task.parent = Some(parent.clone());
task
self.parent = Some(parent.clone());
self
}

/// Mark this task as a termination task.
pub fn with_termination(mut self) -> Self {
self.should_terminate = true;
self
}

pub fn execute(
mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<()> {
if matches!(self.state, ExploreGroupState::Explored) {
return Ok(());
}
if self.should_terminate {
return self.terminate(optimizer);
}
self.transition(optimizer, scheduler)?;
scheduler.add_task(Task::ExploreGroup(self));
Ok(())
Expand All @@ -93,7 +100,7 @@ impl ExploreGroupTask {
pub fn transition(
&mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<()> {
let event = match self.state {
ExploreGroupState::Init => self.explore_group(optimizer, scheduler)?,
Expand All @@ -115,7 +122,7 @@ impl ExploreGroupTask {
fn explore_group(
&mut self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<ExploreGroupEvent> {
let group = optimizer.memo.group_mut(self.group_index)?;

Expand All @@ -130,17 +137,24 @@ impl ExploreGroupTask {
}

for m_expr in group.m_exprs.iter().skip(start_index) {
let task = ExploreExprTask::with_parent(
self.ctx.clone(),
m_expr.group_index,
m_expr.index,
&self.ref_count,
);
let task = ExploreExprTask::new(self.ctx.clone(), m_expr.group_index, m_expr.index)
.with_parent(&self.ref_count);
scheduler.add_task(Task::ExploreExpr(task));
}

self.last_explored_m_expr = Some(group.num_exprs());

Ok(ExploreGroupEvent::Exploring)
}

pub fn terminate(&mut self, cascades_optimizer: &mut CascadesOptimizer) -> Result<()> {
if let Some(parent) = &self.parent {
parent.dec();
}

let group = cascades_optimizer.memo.group_mut(self.group_index)?;
group.set_state(GroupState::Explored);

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/optimizer/cascades/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Task {
pub fn execute(
self,
optimizer: &mut CascadesOptimizer,
scheduler: &mut Scheduler,
scheduler: &mut Scheduler<'_>,
) -> Result<()> {
match self {
Task::ApplyRule(task) => task.execute(optimizer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ impl OptimizeGroupTask {
) -> Result<OptimizeGroupEvent> {
let group = optimizer.memo.group(self.group_index)?;
if !group.state.explored() {
let task =
ExploreGroupTask::with_parent(self.ctx.clone(), group.group_index, &self.ref_count);
let task = ExploreGroupTask::new(self.ctx.clone(), group.group_index)
.with_parent(&self.ref_count);
scheduler.add_task(Task::ExploreGroup(task));
Ok(OptimizeGroupEvent::Exploring)
} else {
Expand Down
13 changes: 13 additions & 0 deletions src/query/sql/src/planner/optimizer/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,17 @@ impl Memo {
self.groups.push(group);
group_index
}

/// Get an estimate of the memory size of the memo.
pub fn mem_size(&self) -> usize {
// Since all the `RelOperator` are interned,
// we only need to count the size of `m_expr_lookup_table`.
// We assume the `RelOperator`s are the major part of the memo.
self.m_expr_lookup_table.len() * std::mem::size_of::<RelOperator>()
}

/// Get the number of groups in the memo.
pub fn num_groups(&self) -> usize {
self.groups.len()
}
}

0 comments on commit 4e6515b

Please sign in to comment.