From 33051f014b4eaf90e46add27ae3044d97ddae0c1 Mon Sep 17 00:00:00 2001 From: Sky Fan <3374614481@qq.com> Date: Fri, 3 Nov 2023 23:29:56 +0800 Subject: [PATCH] feat: partition by left most cluster key when building merge into filter (#13547) * chore: support collect statistics of multi join expr in merge into * partition by first join condition * fix filter * update * fix clippy * fix * fix bind * fix and add log * fix stackoverflow --- .../interpreters/interpreter_merge_into.rs | 158 ++++++++++++++++-- .../sql/src/planner/expression_parser.rs | 17 +- .../sql/src/planner/plans/scalar_expr.rs | 29 ++++ 3 files changed, 183 insertions(+), 21 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 4618cde7a930..a4623664ec09 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -17,6 +17,9 @@ use std::sync::Arc; use std::time::Instant; use std::u64::MAX; +use common_ast::parser::parse_comma_separated_exprs; +use common_ast::parser::tokenize_sql; +use common_ast::Dialect; use common_catalog::lock::Lock; use common_catalog::table::TableExt; use common_exception::ErrorCode; @@ -30,6 +33,7 @@ use common_expression::SendableDataBlockStream; use common_expression::ROW_NUMBER_COL_NAME; use common_functions::BUILTIN_FUNCTIONS; use common_meta_app::schema::TableInfo; +use common_sql::bind_one_table; use common_sql::executor::CommitSink; use common_sql::executor::Exchange; use common_sql::executor::FragmentKind::Merge; @@ -54,16 +58,20 @@ use common_sql::plans::RelOperator; use common_sql::plans::ScalarItem; use common_sql::plans::UpdatePlan; use common_sql::BindContext; +use common_sql::ColumnBinding; use common_sql::ColumnBindingBuilder; use common_sql::IndexType; use common_sql::MetadataRef; +use common_sql::NameResolutionContext; use common_sql::ScalarExpr; use common_sql::TypeCheck; +use common_sql::TypeChecker; use common_sql::Visibility; use common_storages_factory::Table; use common_storages_fuse::FuseTable; use common_storages_fuse::TableContext; use itertools::Itertools; +use log::info; use storages_common_locks::LockManager; use storages_common_table_meta::meta::TableSnapshot; use tokio_stream::StreamExt; @@ -111,6 +119,20 @@ impl MergeStyleJoin<'_> { target_sexpr, } } + + pub fn collect_column_map(&self) -> HashMap { + let mut column_map = HashMap::new(); + for (t, s) in self + .target_conditions + .iter() + .zip(self.source_conditions.iter()) + { + if let (ScalarExpr::BoundColumnRef(t_col), ScalarExpr::BoundColumnRef(s_col)) = (t, s) { + column_map.insert(t_col.column.column_name.clone(), s_col.column.clone()); + } + } + column_map + } } impl MergeIntoInterpreter { @@ -196,7 +218,7 @@ impl MergeIntoInterpreter { }; let optimized_input = self - .build_static_filter(&input, meta_data, self.ctx.clone()) + .build_static_filter(&input, meta_data, self.ctx.clone(), check_table) .await?; let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false); @@ -454,6 +476,7 @@ impl MergeIntoInterpreter { join: &SExpr, metadata: &MetadataRef, ctx: Arc, + table: Arc, ) -> Result> { // 1. collect statistics from the source side // plan of source table is extended to: @@ -469,9 +492,71 @@ impl MergeIntoInterpreter { let mut eval_scalar_items = Vec::with_capacity(m_join.source_conditions.len()); let mut min_max_binding = Vec::with_capacity(m_join.source_conditions.len() * 2); let mut min_max_scalar_items = Vec::with_capacity(m_join.source_conditions.len() * 2); + let mut group_items = vec![]; if m_join.source_conditions.is_empty() { return Ok(Box::new(join.clone())); } + let column_map = m_join.collect_column_map(); + let fuse_table = + table + .as_any() + .downcast_ref::() + .ok_or(ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support MERGE INTO", + table.name(), + table.get_table_info().engine(), + )))?; + let mut group_exprs = vec![]; + if let Some(cluster_key_str) = fuse_table.cluster_key_str() { + let sql_dialect = Dialect::MySQL; + let tokens = tokenize_sql(cluster_key_str)?; + let ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?; + let (mut bind_context, metadata) = bind_one_table(table)?; + if !ast_exprs.is_empty() { + let ast_expr = &ast_exprs[0]; + let name_resolution_ctx = + NameResolutionContext::try_from(ctx.get_settings().as_ref())?; + let mut type_checker = TypeChecker::new( + &mut bind_context, + ctx.clone(), + &name_resolution_ctx, + metadata.clone(), + &[], + false, + false, + ); + let (scalar_expr, _) = *type_checker.resolve(ast_expr).await?; + let projected = scalar_expr.try_project_column_binding(|binding| { + column_map.get(&binding.column_name).cloned() + }); + if let Some(p) = projected { + group_exprs.push(p); + } + } + } + for group_expr in group_exprs { + let index = metadata + .write() + .add_derived_column("".to_string(), group_expr.data_type()?); + let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: ColumnBindingBuilder::new( + "".to_string(), + index, + Box::new(group_expr.data_type()?), + Visibility::Visible, + ) + .build(), + }); + eval_scalar_items.push(ScalarItem { + scalar: group_expr.clone(), + index, + }); + group_items.push(ScalarItem { + scalar: evaled.clone(), + index, + }); + } for source_side_expr in m_join.source_conditions { // eval source side join expr let index = metadata @@ -543,8 +628,6 @@ impl MergeIntoInterpreter { min_max_scalar_items.push(max); } - let group_item = eval_scalar_items[0].clone(); - let eval_source_side_join_expr_op = EvalScalar { items: eval_scalar_items, }; @@ -568,7 +651,7 @@ impl MergeIntoInterpreter { let agg_partial_op = Aggregate { mode: AggregateMode::Partial, - group_items: vec![group_item.clone()], + group_items: group_items.clone(), aggregate_functions: min_max_scalar_items.clone(), from_distinct: false, limit: None, @@ -580,7 +663,7 @@ impl MergeIntoInterpreter { ); let agg_final_op = Aggregate { mode: AggregateMode::Final, - group_items: vec![group_item], + group_items, aggregate_functions: min_max_scalar_items, from_distinct: false, limit: None, @@ -643,16 +726,7 @@ impl MergeIntoInterpreter { filter_parts.push(and); } } - let mut or = filter_parts[0].clone(); - for filter_part in filter_parts.iter().skip(1) { - or = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "or".to_string(), - params: vec![], - arguments: vec![or, filter_part.clone()], - }); - } - filters.push(or); + filters.extend(Self::combine_filter_parts(&filter_parts).into_iter()); } let mut target_plan = m_join.target_sexpr.clone(); Self::push_down_filters(&mut target_plan, &filters)?; @@ -661,10 +735,64 @@ impl MergeIntoInterpreter { Ok(Box::new(new_sexpr)) } + fn combine_filter_parts(filter_parts: &[ScalarExpr]) -> Option { + match filter_parts.len() { + 0 => None, + 1 => Some(filter_parts[0].clone()), + _ => { + let mid = filter_parts.len() / 2; + let left = Self::combine_filter_parts(&filter_parts[0..mid]); + let right = Self::combine_filter_parts(&filter_parts[mid..]); + if let Some(left) = left { + if let Some(right) = right { + Some(ScalarExpr::FunctionCall(FunctionCall { + span: None, + func_name: "or".to_string(), + params: vec![], + arguments: vec![left, right], + })) + } else { + Some(left) + } + } else { + right + } + } + } + } + + fn display_scalar_expr(s: &ScalarExpr) -> String { + match s { + ScalarExpr::BoundColumnRef(x) => x.column.column_name.clone(), + ScalarExpr::ConstantExpr(x) => x.value.to_string(), + ScalarExpr::WindowFunction(x) => format!("{:?}", x), + ScalarExpr::AggregateFunction(x) => format!("{:?}", x), + ScalarExpr::LambdaFunction(x) => format!("{:?}", x), + ScalarExpr::FunctionCall(x) => match x.func_name.as_str() { + "and" | "or" | "gte" | "lte" => { + format!( + "({} {} {})", + Self::display_scalar_expr(&x.arguments[0]), + x.func_name, + Self::display_scalar_expr(&x.arguments[1]) + ) + } + _ => format!("{:?}", x), + }, + ScalarExpr::CastExpr(x) => format!("{:?}", x), + ScalarExpr::SubqueryExpr(x) => format!("{:?}", x), + ScalarExpr::UDFServerCall(x) => format!("{:?}", x), + } + } + fn push_down_filters(s_expr: &mut SExpr, filters: &[ScalarExpr]) -> Result<()> { match s_expr.plan() { RelOperator::Scan(s) => { let mut new_scan = s.clone(); + info!("push down {} filters:", filters.len()); + for filter in filters { + info!("{}", Self::display_scalar_expr(filter)); + } if let Some(preds) = new_scan.push_down_predicates { new_scan.push_down_predicates = Some(preds.iter().chain(filters).cloned().collect()); diff --git a/src/query/sql/src/planner/expression_parser.rs b/src/query/sql/src/planner/expression_parser.rs index 545cf7c28df6..202f044dd50e 100644 --- a/src/query/sql/src/planner/expression_parser.rs +++ b/src/query/sql/src/planner/expression_parser.rs @@ -56,15 +56,11 @@ use crate::BaseTableColumn; use crate::ColumnEntry; use crate::IdentifierNormalizer; use crate::Metadata; +use crate::MetadataRef; use crate::ScalarExpr; use crate::Visibility; -pub fn parse_exprs( - ctx: Arc, - table_meta: Arc, - sql: &str, -) -> Result> { - let settings = Settings::create("".to_string()); +pub fn bind_one_table(table_meta: Arc) -> Result<(BindContext, MetadataRef)> { let mut bind_context = BindContext::new(); let metadata = Arc::new(RwLock::new(Metadata::default())); let table_index = metadata.write().add_table( @@ -111,7 +107,16 @@ pub fn parse_exprs( bind_context.add_column_binding(column_binding); } + Ok((bind_context, metadata)) +} +pub fn parse_exprs( + ctx: Arc, + table_meta: Arc, + sql: &str, +) -> Result> { + let (mut bind_context, metadata) = bind_one_table(table_meta)?; + let settings = Settings::create("".to_string()); let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; let mut type_checker = TypeChecker::new( &mut bind_context, diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index 75c5ab411b7a..2d894afdae37 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -180,6 +180,35 @@ impl ScalarExpr { ScalarExpr::CastExpr(expr) => expr.argument.evaluable(), } } + + pub fn try_project_column_binding( + &self, + f: impl Fn(&ColumnBinding) -> Option + Copy, + ) -> Option { + match self { + ScalarExpr::BoundColumnRef(expr) => f(&expr.column).map(|x| { + ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: x, + }) + }), + ScalarExpr::FunctionCall(expr) => { + // Any of the arguments return None, then return None + let arguments = expr + .arguments + .iter() + .map(|x| x.try_project_column_binding(f)) + .collect::>>()?; + Some(ScalarExpr::FunctionCall(FunctionCall { + span: None, + func_name: expr.func_name.clone(), + params: expr.params.clone(), + arguments, + })) + } + _ => None, + } + } } impl From for ScalarExpr {