Skip to content

Commit

Permalink
refactor heuristic optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky committed Dec 14, 2023
1 parent 457baeb commit 131a8d2
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 135 deletions.
31 changes: 10 additions & 21 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ use common_sql::executor::physical_plans::Exchange;
use common_sql::executor::physical_plans::FragmentKind;
use common_sql::executor::physical_plans::MutationKind;
use common_sql::executor::PhysicalPlan;
use common_sql::optimizer::CascadesOptimizer;
use common_sql::optimizer::DPhpy;
use common_sql::optimizer::HeuristicOptimizer;
use common_sql::optimizer::optimize_query;
use common_sql::optimizer::OptimizerContext;
use common_sql::optimizer::SExpr;
use common_sql::optimizer::DEFAULT_REWRITE_RULES;
use common_sql::optimizer::RESIDUAL_RULES;
use common_sql::plans::BoundColumnRef;
use common_sql::plans::ConstantExpr;
use common_sql::plans::EvalScalar;
Expand Down Expand Up @@ -298,7 +295,7 @@ pub async fn subquery_filter(
// Select `_row_id` column
let input_expr = subquery_desc.input_expr.clone();

let expr = SExpr::create_unary(
let mut s_expr = SExpr::create_unary(
Arc::new(RelOperator::EvalScalar(EvalScalar {
items: vec![ScalarItem {
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
Expand All @@ -314,26 +311,18 @@ pub async fn subquery_filter(
let mut bind_context = Box::new(BindContext::new());
bind_context.add_column_binding(row_id_column_binding.clone());

let heuristic = HeuristicOptimizer::new(ctx.get_function_context()?, metadata.clone());
let mut expr = heuristic.optimize(expr, &DEFAULT_REWRITE_RULES)?;
let mut dphyp_optimized = false;
if ctx.get_settings().get_enable_dphyp()? {
let (dp_res, optimized) =
DPhpy::new(ctx.clone(), metadata.clone()).optimize(Arc::new(expr.clone()))?;
if optimized {
expr = (*dp_res).clone();
dphyp_optimized = true;
}
}
let mut cascades = CascadesOptimizer::create(ctx.clone(), metadata.clone(), dphyp_optimized)?;
expr = cascades.optimize(expr)?;
expr = heuristic.optimize(expr, &RESIDUAL_RULES)?;
let opt_ctx = OptimizerContext::new(ctx.clone(), metadata.clone())
.with_enable_distributed_optimization(false)
.with_enable_join_reorder(unsafe { !ctx.get_settings().get_disable_join_reorder()? })
.with_enable_dphyp(ctx.get_settings().get_enable_dphyp()?);

s_expr = optimize_query(opt_ctx, s_expr.clone())?;

// Create `input_expr` pipeline and execute it to get `_row_id` data block.
let select_interpreter = SelectInterpreter::try_create(
ctx.clone(),
*bind_context,
expr,
s_expr,
metadata.clone(),
None,
false,
Expand Down
9 changes: 2 additions & 7 deletions src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_ast::ast::CreateIndexStmt;
use common_ast::ast::DropIndexStmt;
use common_ast::ast::ExplainKind;
Expand All @@ -38,7 +36,6 @@ use storages_common_table_meta::meta::Location;

use crate::binder::Binder;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerConfig;
use crate::optimizer::OptimizerContext;
use crate::plans::CreateIndexPlan;
use crate::plans::DropIndexPlan;
Expand Down Expand Up @@ -289,10 +286,8 @@ impl Binder {
bind_context.planning_agg_index = true;
let plan = if let Statement::Query(_) = &stmt {
let select_plan = self.bind_statement(bind_context, &stmt).await?;
let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
}));
Ok(optimize(self.ctx.clone(), opt_ctx, select_plan)?)
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
Ok(optimize(opt_ctx, select_plan)?)
} else {
Err(ErrorCode::UnsupportedIndex("statement is not query"))
};
Expand Down
5 changes: 2 additions & 3 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ use crate::binder::Binder;
use crate::binder::ColumnBindingBuilder;
use crate::binder::Visibility;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerConfig;
use crate::optimizer::OptimizerContext;
use crate::parse_computed_expr_to_string;
use crate::parse_default_expr_to_string;
Expand Down Expand Up @@ -632,8 +631,8 @@ impl Binder {
let stmt = Statement::Query(Box::new(*query.clone()));
let select_plan = self.bind_statement(&mut bind_context, &stmt).await?;
// Don't enable distributed optimization for `CREATE TABLE ... AS SELECT ...` for now
let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig::default()));
let optimized_plan = optimize(self.ctx.clone(), opt_ctx, select_plan)?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone());
let optimized_plan = optimize(opt_ctx, select_plan)?;
Some(Box::new(optimized_plan))
} else {
None
Expand Down
8 changes: 3 additions & 5 deletions src/query/sql/src/planner/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use common_meta_app::principal::OnErrorMode;
use crate::binder::Binder;
use crate::normalize_identifier;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerConfig;
use crate::optimizer::OptimizerContext;
use crate::plans::CopyIntoTableMode;
use crate::plans::Insert;
Expand Down Expand Up @@ -150,9 +149,8 @@ impl Binder {
InsertSource::Select { query } => {
let statement = Statement::Query(query);
let select_plan = self.bind_statement(bind_context, &statement).await?;
let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
enable_distributed_optimization: !self.ctx.get_cluster().is_empty(),
}));
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
.with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty());

if let Plan::Query { s_expr, .. } = &select_plan {
if !self.check_sexpr_top(s_expr)? {
Expand All @@ -162,7 +160,7 @@ impl Binder {
}
}

let optimized_plan = optimize(self.ctx.clone(), opt_ctx, select_plan)?;
let optimized_plan = optimize(opt_ctx, select_plan)?;
Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
}
};
Expand Down
9 changes: 3 additions & 6 deletions src/query/sql/src/planner/binder/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use common_meta_app::principal::OnErrorMode;
use crate::binder::Binder;
use crate::normalize_identifier;
use crate::optimizer::optimize;
use crate::optimizer::OptimizerConfig;
use crate::optimizer::OptimizerContext;
use crate::plans::CopyIntoTableMode;
use crate::plans::InsertInputSource;
Expand Down Expand Up @@ -144,11 +143,9 @@ impl Binder {
));
}
}
let enable_distributed_optimization = false;
let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
enable_distributed_optimization,
}));
let optimized_plan = optimize(self.ctx.clone(), opt_ctx, select_plan)?;
let opt_ctx = OptimizerContext::new(self.ctx.clone(), self.metadata.clone())
.with_enable_distributed_optimization(false);
let optimized_plan = optimize(opt_ctx, select_plan)?;
Ok(InsertInputSource::SelectPlan(Box::new(optimized_plan)))
}
};
Expand Down
8 changes: 3 additions & 5 deletions src/query/sql/src/planner/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use common_expression::DataSchemaRef;
use parking_lot::RwLock;

use crate::optimizer::optimize;
use crate::optimizer::OptimizerConfig;
use crate::optimizer::OptimizerContext;
use crate::planner::optimizer::s_expr::SExpr;
use crate::plans::Limit;
Expand Down Expand Up @@ -553,10 +552,9 @@ impl Dataframe {
ignore_result: false,
formatted_ast: None,
};
let opt_ctx = Arc::new(OptimizerContext::new(OptimizerConfig {
enable_distributed_optimization,
}));
optimize(self.query_ctx, opt_ctx, plan)
let opt_ctx = OptimizerContext::new(self.query_ctx.clone(), self.binder.metadata.clone())
.with_enable_distributed_optimization(enable_distributed_optimization);
optimize(opt_ctx, plan)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use common_expression::types::DataType;
use crate::binder::ColumnBindingBuilder;
use crate::binder::JoinPredicate;
use crate::binder::Visibility;
use crate::optimizer::heuristic::subquery_rewriter::FlattenInfo;
use crate::optimizer::heuristic::subquery_rewriter::SubqueryRewriter;
use crate::optimizer::heuristic::subquery_rewriter::UnnestResult;
use crate::optimizer::decorrelate::subquery_rewriter::FlattenInfo;
use crate::optimizer::decorrelate::subquery_rewriter::SubqueryRewriter;
use crate::optimizer::decorrelate::subquery_rewriter::UnnestResult;
use crate::optimizer::ColumnSet;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_expression::types::DataType;

use crate::binder::ColumnBindingBuilder;
use crate::binder::Visibility;
use crate::optimizer::heuristic::subquery_rewriter::FlattenInfo;
use crate::optimizer::decorrelate::subquery_rewriter::FlattenInfo;
use crate::optimizer::ColumnSet;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@
mod decorrelate;
mod flatten_plan;
mod flatten_scalar;
mod subquery_rewriter;

pub use decorrelate::decorrelate_subquery;
pub use subquery_rewriter::FlattenInfo;
pub use subquery_rewriter::SubqueryRewriter;
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/heuristic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod decorrelate;
mod heuristic;
mod subquery_rewriter;

pub use decorrelate::decorrelate_subquery;
pub use heuristic::HeuristicOptimizer;
pub use heuristic::DEFAULT_REWRITE_RULES;
pub use heuristic::RESIDUAL_RULES;
Expand Down
14 changes: 7 additions & 7 deletions src/query/sql/src/planner/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ mod cost;
mod distributed;
mod format;
mod group;
mod heuristic;
// mod heuristic;
mod decorrelate;
mod hyper_dp;
mod m_expr;
mod memo;
Expand All @@ -30,16 +31,13 @@ pub mod s_expr;
mod util;

pub use cascades::CascadesOptimizer;
pub use heuristic::FlattenInfo;
pub use heuristic::HeuristicOptimizer;
pub use heuristic::SubqueryRewriter;
pub use heuristic::DEFAULT_REWRITE_RULES;
pub use heuristic::RESIDUAL_RULES;
pub use decorrelate::FlattenInfo;
pub use decorrelate::SubqueryRewriter;
pub use hyper_dp::DPhpy;
pub use m_expr::MExpr;
pub use memo::Memo;
pub use optimizer::optimize;
pub use optimizer::OptimizerConfig;
pub use optimizer::optimize_query;
pub use optimizer::OptimizerContext;
pub use pattern_extractor::PatternExtractor;
pub use property::*;
Expand All @@ -48,5 +46,7 @@ pub use rule::try_push_down_filter_join;
pub use rule::RuleFactory;
pub use rule::RuleID;
pub use rule::RuleSet;
pub use rule::DEFAULT_REWRITE_RULES;
pub use rule::RESIDUAL_RULES;
pub use s_expr::get_udf_names;
pub use s_expr::SExpr;
Loading

0 comments on commit 131a8d2

Please sign in to comment.