From 10773eeb4d01ca36d322b95adcf595c49e82ca0f Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 7 Dec 2023 18:16:51 +0800 Subject: [PATCH 01/32] add optimize comment --- src/query/sql/src/planner/optimizer/optimizer.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 59ef8b0db8f7..1711fef2594f 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -127,6 +127,13 @@ pub fn optimize( Ok(Plan::CopyIntoTable(plan)) } Plan::MergeInto(plan) => { + // before, we think source table is always the small table. + // 1. for matched only, we use inner join + // 2. for insert only, we use right anti join + // 3. for full merge into, we use right outer join + // for now, let's import the statistic info to determine + // left join or outer join + // optimize source :fix issue #13733 // reason: if there is subquery,windowfunc exprs etc. see // src/planner/semantic/lowering.rs `as_raw_expr()`, we will From fe324c07d2f144ec6888681bd3b6c3f58db0b32f Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Tue, 12 Dec 2023 17:01:24 +0800 Subject: [PATCH 02/32] add join optimizer top rule --- .../sql/src/planner/optimizer/optimizer.rs | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 1711fef2594f..0dc4f1f9e0fb 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -24,7 +24,9 @@ use log::info; use super::cost::CostContext; use super::distributed::MergeSourceOptimizer; use super::format::display_memo; +use super::rule::TransformResult; use super::Memo; +use super::RuleFactory; use crate::optimizer::cascades::CascadesOptimizer; use crate::optimizer::distributed::optimize_distributed_query; use crate::optimizer::hyper_dp::DPhpy; @@ -127,13 +129,6 @@ pub fn optimize( Ok(Plan::CopyIntoTable(plan)) } Plan::MergeInto(plan) => { - // before, we think source table is always the small table. - // 1. for matched only, we use inner join - // 2. for insert only, we use right anti join - // 3. for full merge into, we use right outer join - // for now, let's import the statistic info to determine - // left join or outer join - // optimize source :fix issue #13733 // reason: if there is subquery,windowfunc exprs etc. see // src/planner/semantic/lowering.rs `as_raw_expr()`, we will @@ -163,6 +158,19 @@ pub fn optimize( Arc::new(right_source), ])); + // before, we think source table is always the small table. + // 1. for matched only, we use inner join + // 2. for insert only, we use right anti join + // 3. for full merge into, we use right outer join + // for now, let's import the statistic info to determine left join or outer join + // we just do optimization for the top join (target and source),won't do recursive optimize + let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?; + let mut state = TransformResult::new(); + // we will reorder the join order according to the cardinality of target and source. + rule.apply(&join_sexpr, &mut state)?; + assert_eq!(state.results().len(), 1); + join_sexpr = Box::new(state.results()[0].clone()); + // try to optimize distributed join if opt_ctx.config.enable_distributed_optimization && ctx.get_settings().get_enable_distributed_merge_into()? 
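The join-type mapping spelled out in the comment above, together with the swap the CommuteJoin rule performs when statistics favour the target table, can be restated as a small self-contained sketch. The enums below are local stand-ins for the planner's MergeIntoType and JoinType (not the actual types), and `target_is_build` plays the role of the `change_join_order` flag introduced by this series:

```rust
enum MergeVariant {
    FullOperation, // both matched and not-matched clauses
    MatchedOnly,
    InsertOnly,
}

enum JoinKind {
    Inner,
    RightAnti,
    RightOuter,
    LeftAnti,
    LeftOuter,
}

// When the optimizer decides the target table is the smaller (build) side,
// the right-* joins flip to their left-* counterparts; an inner join is
// symmetric and stays as-is.
fn merge_join_kind(variant: &MergeVariant, target_is_build: bool) -> JoinKind {
    match (variant, target_is_build) {
        (MergeVariant::MatchedOnly, _) => JoinKind::Inner,
        (MergeVariant::InsertOnly, false) => JoinKind::RightAnti,
        (MergeVariant::InsertOnly, true) => JoinKind::LeftAnti,
        (MergeVariant::FullOperation, false) => JoinKind::RightOuter,
        (MergeVariant::FullOperation, true) => JoinKind::LeftOuter,
    }
}

fn main() {
    assert!(matches!(
        merge_join_kind(&MergeVariant::FullOperation, true),
        JoinKind::LeftOuter
    ));
}
```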
@@ -236,7 +244,10 @@ pub fn optimize_query( if unsafe { ctx.get_settings().get_disable_join_reorder()? } { return heuristic.optimize_expression(&result, &[RuleID::EliminateEvalScalar]); } - heuristic.optimize_expression(&result, &RESIDUAL_RULES) + println!("result: {:?}\n", result); + let result = heuristic.optimize_expression(&result, &RESIDUAL_RULES)?; + println!("result2: {:?}\n", result); + Ok(result) } // TODO(leiysky): reuse the optimization logic with `optimize_query` From 07d5bac247067f1130b96337bdb0ccd37b8ebf86 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Tue, 12 Dec 2023 20:46:36 +0800 Subject: [PATCH 03/32] add distributed optmize rule --- .../distributed/distributed_merge.rs | 46 +++++++++++++++---- .../sql/src/planner/optimizer/optimizer.rs | 14 +++--- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs index 5fbb3537ea2e..3943687136e2 100644 --- a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs +++ b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs @@ -34,13 +34,17 @@ impl MergeSourceOptimizer { } } - // rewrite plan: for now, we use right join, and the default + // rewrite plan: + // 1. if use right join, and the default // distributed right join will use shuffle hash join, but its // performance is very slow and poor. So we need to rewrite it. // In new distributed plan, target partitions will be shuffled // to query nodes, and source will be broadcasted to all nodes // and build hashtable. It means all nodes hold the same hashtable. - pub fn optimize(&self, s_expr: &SExpr) -> Result { + // 2. if use left outer join, we will broadcast target table(target + // table is build side), and source is probe side, the source will + // be distributed to nodes randomly. 
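For quick reference, the two distributed plan shapes that MergeSourceOptimizer::optimize builds (described in the comment above and in the ASCII diagrams further down) can be summarized in one line per case. This is a descriptive sketch only, assuming the hash-join build side is the join's right child as in the surrounding code; the real optimizer assembles Exchange and AddRowNumber operators rather than strings:

```rust
fn distributed_merge_plan_shape(target_is_build: bool) -> &'static str {
    if target_is_build {
        // the (smaller) target is broadcast to every node as the build side;
        // source rows are probed wherever they already live
        "Join(probe: source, build: Broadcast(target))"
    } else {
        // the source is broadcast as the build side with row numbers attached,
        // while target partitions are shuffled across the query nodes
        "Join(probe: target, build: Broadcast(AddRowNumber(source)))"
    }
}
```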
+ pub fn optimize(&self, s_expr: &SExpr, change_join_order: bool) -> Result { let join_s_expr = s_expr.child(0)?; let left_exchange = join_s_expr.child(0)?; @@ -50,17 +54,28 @@ impl MergeSourceOptimizer { let right_exchange = join_s_expr.child(1)?; assert!(right_exchange.children.len() == 1); let right_exchange_input = right_exchange.child(0)?; - - let new_join_children = vec![ - Arc::new(left_exchange_input.clone()), - Arc::new(SExpr::create_unary( - Arc::new(RelOperator::Exchange(Broadcast)), + // target is build side + let new_join_children = if change_join_order { + vec![ + Arc::new(left_exchange_input.clone()), Arc::new(SExpr::create_unary( - Arc::new(RelOperator::AddRowNumber(AddRowNumber)), + Arc::new(RelOperator::Exchange(Broadcast)), Arc::new(right_exchange_input.clone()), )), - )), - ]; + ] + } else { + // source is build side + vec![ + Arc::new(left_exchange_input.clone()), + Arc::new(SExpr::create_unary( + Arc::new(RelOperator::Exchange(Broadcast)), + Arc::new(SExpr::create_unary( + Arc::new(RelOperator::AddRowNumber(AddRowNumber)), + Arc::new(right_exchange_input.clone()), + )), + )), + ] + }; let mut join: Join = join_s_expr.plan().clone().try_into()?; join.need_hold_hash_table = true; @@ -79,6 +94,7 @@ impl MergeSourceOptimizer { // Exchange Exchange(Shuffle) // | | // * * + // if source is build we will get below: // Output: // Exchange // | @@ -88,6 +104,16 @@ impl MergeSourceOptimizer { // * Exchange(Broadcast) // | // AddRowNumber + // if target is build we will get below: + // Output: + // Exchange + // | + // Join + // / \ + // / \ + // / \ + // / \ + // * Exchange(Broadcast) SExpr::create_unary( Arc::new( PatternPlan { diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 0dc4f1f9e0fb..dec4ffe7c405 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -162,21 +162,22 @@ pub fn optimize( // 1. for matched only, we use inner join // 2. for insert only, we use right anti join // 3. for full merge into, we use right outer join - // for now, let's import the statistic info to determine left join or outer join - // we just do optimization for the top join (target and source),won't do recursive optimize + // for now, let's import the statistic info to determine left join or right join + // we just do optimization for the top join (target and source),won't do recursive optimization. let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?; let mut state = TransformResult::new(); // we will reorder the join order according to the cardinality of target and source. rule.apply(&join_sexpr, &mut state)?; assert_eq!(state.results().len(), 1); + // we need to check whether we do swap left and right. + let old_left = join_sexpr.child(0)?; + let new_left = state.results()[0].child(0)?; + let change_join_order = (old_left == new_left); join_sexpr = Box::new(state.results()[0].clone()); - // try to optimize distributed join if opt_ctx.config.enable_distributed_optimization && ctx.get_settings().get_enable_distributed_merge_into()? { - // Todo(JackTan25): We should use optimizer to make a decision to use - // left join and right join. 
// input is a Join_SExpr let merge_into_join_sexpr = optimize_distributed_query(ctx.clone(), &join_sexpr)?; // after optimize source, we need to add @@ -188,7 +189,8 @@ pub fn optimize( (merge_into_join_sexpr.clone(), false) } else { ( - merge_source_optimizer.optimize(&merge_into_join_sexpr)?, + merge_source_optimizer + .optimize(&merge_into_join_sexpr, change_join_order)?, true, ) }; From 782546aace8e05bffdab1cac5cddb6b7cfbe0772 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Tue, 12 Dec 2023 21:03:06 +0800 Subject: [PATCH 04/32] add change join order flag --- src/query/service/src/interpreters/interpreter_merge_into.rs | 4 ++++ .../service/src/pipelines/builders/builder_merge_into.rs | 1 + .../sql/src/executor/physical_plans/physical_merge_into.rs | 2 ++ src/query/sql/src/planner/binder/merge_into.rs | 1 + src/query/sql/src/planner/optimizer/optimizer.rs | 4 +++- src/query/sql/src/planner/plans/merge_into.rs | 1 + 6 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index dd37d8bd57e1..372bedb4264a 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -149,6 +149,7 @@ impl MergeIntoInterpreter { field_index_map, merge_type, distributed, + change_join_order, .. } = &self.plan; @@ -386,6 +387,7 @@ impl MergeIntoInterpreter { distributed: false, output_schema: DataSchemaRef::default(), merge_type: merge_type.clone(), + change_join_order: *change_join_order, })) } else { let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto { @@ -402,6 +404,7 @@ impl MergeIntoInterpreter { join_output_schema.fields[row_number_idx.unwrap()].clone(), ])), merge_type: merge_type.clone(), + change_join_order: *change_join_order, })); PhysicalPlan::MergeIntoAppendNotMatched(Box::new(MergeIntoAppendNotMatched { @@ -418,6 +421,7 @@ impl MergeIntoInterpreter { unmatched: unmatched.clone(), input_schema: merge_into_source.output_schema()?, merge_type: merge_type.clone(), + change_join_order: *change_join_order, })) }; diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 8b8ee2c2ce0a..c44240e02a82 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -97,6 +97,7 @@ impl PipelineBuilder { unmatched, input_schema, merge_type, + .. 
} = merge_into_append_not_macted; // in common case: receive row numbers and MutationLogs diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs index d817758ada36..af1a2db388c9 100644 --- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs +++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs @@ -51,6 +51,7 @@ pub struct MergeInto { pub output_schema: DataSchemaRef, pub distributed: bool, pub merge_type: MergeIntoType, + pub change_join_order: bool, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -62,4 +63,5 @@ pub struct MergeIntoAppendNotMatched { pub unmatched: Vec<(DataSchemaRef, Option, Vec)>, pub input_schema: DataSchemaRef, pub merge_type: MergeIntoType, + pub change_join_order: bool, } diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index f855005aefe6..ab75d8641686 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -408,6 +408,7 @@ impl Binder { field_index_map, merge_type, distributed: false, + change_join_order: false, }) } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index dec4ffe7c405..8bd13eff4c7e 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -172,7 +172,7 @@ pub fn optimize( // we need to check whether we do swap left and right. let old_left = join_sexpr.child(0)?; let new_left = state.results()[0].child(0)?; - let change_join_order = (old_left == new_left); + let change_join_order = old_left == new_left; join_sexpr = Box::new(state.results()[0].clone()); // try to optimize distributed join if opt_ctx.config.enable_distributed_optimization @@ -198,11 +198,13 @@ pub fn optimize( Ok(Plan::MergeInto(Box::new(MergeInto { input: Box::new(optimized_distributed_merge_into_join_sexpr), distributed, + change_join_order, ..*plan }))) } else { Ok(Plan::MergeInto(Box::new(MergeInto { input: join_sexpr, + change_join_order, ..*plan }))) } diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index a553dc9fd613..9eb90a19df34 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -67,6 +67,7 @@ pub struct MergeInto { pub field_index_map: HashMap, pub merge_type: MergeIntoType, pub distributed: bool, + pub change_join_order: bool, } impl std::fmt::Debug for MergeInto { From 429b7b08df014a3f283ff8e88b264d0bbe3bc53f Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 13 Dec 2023 15:28:56 +0800 Subject: [PATCH 05/32] fix not append plan --- .../interpreters/interpreter_merge_into.rs | 12 +- .../pipelines/builders/builder_merge_into.rs | 299 +++++++++++------- .../physical_plans/physical_merge_into.rs | 1 + ...sor_merge_into_split_row_number_and_log.rs | 6 + 4 files changed, 197 insertions(+), 121 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 372bedb4264a..2b4453cef229 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -398,7 +398,7 @@ impl MergeIntoInterpreter { matched, field_index_of_input_schema, row_id_idx, - segments, + segments: segments.clone(), distributed: true, 
output_schema: DataSchemaRef::new(DataSchema::new(vec![ join_output_schema.fields[row_number_idx.unwrap()].clone(), @@ -406,7 +406,14 @@ impl MergeIntoInterpreter { merge_type: merge_type.clone(), change_join_order: *change_join_order, })); - + // if change_join_order = true, it means the target is build side, + // in this way, we will do matched operation and not matched operation + // locally in every node, and the main node just recieve rowids to apply. + let segments = if *change_join_order { + segments.clone() + } else { + vec![] + }; PhysicalPlan::MergeIntoAppendNotMatched(Box::new(MergeIntoAppendNotMatched { input: Box::new(PhysicalPlan::Exchange(Exchange { plan_id: 0, @@ -422,6 +429,7 @@ impl MergeIntoInterpreter { input_schema: merge_into_source.output_schema()?, merge_type: merge_type.clone(), change_join_order: *change_join_order, + segments, })) }; diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index c44240e02a82..83a4621ab272 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -86,6 +86,7 @@ impl PipelineBuilder { } // build merge into append not matched pipeline. + // it must be distributed merge into execution pub(crate) fn build_merge_into_append_not_matched( &mut self, merge_into_append_not_macted: &MergeIntoAppendNotMatched, @@ -97,149 +98,209 @@ impl PipelineBuilder { unmatched, input_schema, merge_type, + change_join_order, + segments, .. } = merge_into_append_not_macted; - // in common case: receive row numbers and MutationLogs + // there are two cases: + // 1. if source is build side (change_join_order = false). + // receive row numbers and MutationLogs, excatly below: + // 1.1 full operation: row numbers and MutationLogs + // 1.2 matched only: MutationLogs + // 1.3 insert only: row numbers and MutationLogs + // 2. if target table is build side (change_join_order = true). + // receive rowids and MutationLogs,excatly below: + // 2.1 full operation: rowids and MutationLogs + // 2.2 matched only: rowids and MutationLogs + // 2.3 insert only: MutationLogs self.build_pipeline(input)?; self.main_pipeline.try_resize(1)?; - // we will receive MutationLogs only without row_number. - if let MergeIntoType::MatechedOnly = merge_type { - return Ok(()); - } - - assert!(self.join_state.is_some()); - assert!(self.probe_data_fields.is_some()); - - let join_state = self.join_state.clone().unwrap(); - // split row_number and log - // output_port_row_number - // output_port_log - self.main_pipeline - .add_pipe(RowNumberAndLogSplitProcessor::create()?.into_pipe()); - - // accumulate source data which is not matched from hashstate - let pipe_items = vec![ - DeduplicateRowNumber::create()?.into_pipe_item(), - create_dummy_item(), - ]; - self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); - - let pipe_items = vec![ - ExtractHashTableByRowNumber::create( - join_state, - self.probe_data_fields.clone().unwrap(), - merge_type.clone(), - )? 
- .into_pipe_item(), - create_dummy_item(), - ]; - self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); - - // not macthed operation - let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( - unmatched.clone(), - input_schema.clone(), - self.func_ctx.clone(), - self.ctx.clone(), - )?; - let pipe_items = vec![ - merge_into_not_matched_processor.into_pipe_item(), - create_dummy_item(), - ]; - self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); - - // split row_number and log - // output_port_not_matched_data - // output_port_log - // start to append data let tbl = self .ctx .build_table_by_table_info(catalog_info, table_info, None)?; - // 1.fill default columns - let table_default_schema = &tbl.schema().remove_computed_fields(); - let mut builder = self.main_pipeline.add_transform_with_specified_len( - |transform_input_port, transform_output_port| { - TransformResortAddOnWithoutSourceSchema::try_create( - self.ctx.clone(), - transform_input_port, - transform_output_port, - Arc::new(DataSchema::from(table_default_schema)), - tbl.clone(), - ) - }, - 1, - )?; - builder.add_items(vec![create_dummy_item()]); - self.main_pipeline.add_pipe(builder.finalize()); + let table = FuseTable::try_from_table(tbl.as_ref())?; + // case 1 + if !*change_join_order { + if let MergeIntoType::MatechedOnly = merge_type { + // we will receive MutationLogs only without row_number. + return Ok(()); + } + assert!(self.join_state.is_some()); + assert!(self.probe_data_fields.is_some()); - // 2.fill computed columns - let table_computed_schema = &tbl.schema().remove_virtual_computed_fields(); - let default_schema: DataSchemaRef = Arc::new(table_default_schema.into()); - let computed_schema: DataSchemaRef = Arc::new(table_computed_schema.into()); - if default_schema != computed_schema { - builder = self.main_pipeline.add_transform_with_specified_len( + let join_state = self.join_state.clone().unwrap(); + // split row_number and log + // output_port_row_number + // output_port_log + self.main_pipeline + .add_pipe(RowNumberAndLogSplitProcessor::create()?.into_pipe()); + + // accumulate source data which is not matched from hashstate + let pipe_items = vec![ + DeduplicateRowNumber::create()?.into_pipe_item(), + create_dummy_item(), + ]; + self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + + let pipe_items = vec![ + ExtractHashTableByRowNumber::create( + join_state, + self.probe_data_fields.clone().unwrap(), + merge_type.clone(), + )? 
+ .into_pipe_item(), + create_dummy_item(), + ]; + self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + + // not macthed operation + let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( + unmatched.clone(), + input_schema.clone(), + self.func_ctx.clone(), + self.ctx.clone(), + )?; + let pipe_items = vec![ + merge_into_not_matched_processor.into_pipe_item(), + create_dummy_item(), + ]; + self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + + // split row_number and log + // output_port_not_matched_data + // output_port_log + // start to append data + + // 1.fill default columns + let table_default_schema = &tbl.schema().remove_computed_fields(); + let mut builder = self.main_pipeline.add_transform_with_specified_len( |transform_input_port, transform_output_port| { - TransformAddComputedColumns::try_create( + TransformResortAddOnWithoutSourceSchema::try_create( self.ctx.clone(), transform_input_port, transform_output_port, - default_schema.clone(), - computed_schema.clone(), + Arc::new(DataSchema::from(table_default_schema)), + tbl.clone(), ) }, 1, )?; builder.add_items(vec![create_dummy_item()]); self.main_pipeline.add_pipe(builder.finalize()); - } - // 3. cluster sort - let table = FuseTable::try_from_table(tbl.as_ref())?; - let block_thresholds = table.get_block_thresholds(); - table.cluster_gen_for_append_with_specified_len( - self.ctx.clone(), - &mut self.main_pipeline, - block_thresholds, - 1, - 1, - )?; + // 2.fill computed columns + let table_computed_schema = &tbl.schema().remove_virtual_computed_fields(); + let default_schema: DataSchemaRef = Arc::new(table_default_schema.into()); + let computed_schema: DataSchemaRef = Arc::new(table_computed_schema.into()); + if default_schema != computed_schema { + builder = self.main_pipeline.add_transform_with_specified_len( + |transform_input_port, transform_output_port| { + TransformAddComputedColumns::try_create( + self.ctx.clone(), + transform_input_port, + transform_output_port, + default_schema.clone(), + computed_schema.clone(), + ) + }, + 1, + )?; + builder.add_items(vec![create_dummy_item()]); + self.main_pipeline.add_pipe(builder.finalize()); + } - // 4. serialize block - let cluster_stats_gen = - table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)?; - let serialize_block_transform = TransformSerializeBlock::try_create( - self.ctx.clone(), - InputPort::create(), - OutputPort::create(), - table, - cluster_stats_gen, - MutationKind::MergeInto, - )?; + // 3. cluster sort - let pipe_items = vec![ - serialize_block_transform.into_pipe_item(), - create_dummy_item(), - ]; - self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + let block_thresholds = table.get_block_thresholds(); + table.cluster_gen_for_append_with_specified_len( + self.ctx.clone(), + &mut self.main_pipeline, + block_thresholds, + 1, + 1, + )?; - // 5. serialize segment - let serialize_segment_transform = TransformSerializeSegment::new( - self.ctx.clone(), - InputPort::create(), - OutputPort::create(), - table, - block_thresholds, - ); - let pipe_items = vec![ - serialize_segment_transform.into_pipe_item(), - create_dummy_item(), - ]; - self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + // 4. 
serialize block + let cluster_stats_gen = + table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)?; + let serialize_block_transform = TransformSerializeBlock::try_create( + self.ctx.clone(), + InputPort::create(), + OutputPort::create(), + table, + cluster_stats_gen, + MutationKind::MergeInto, + )?; - // resize to one, because they are all mutation logs now. - self.main_pipeline.try_resize(1)?; + let pipe_items = vec![ + serialize_block_transform.into_pipe_item(), + create_dummy_item(), + ]; + self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + + // 5. serialize segment + let serialize_segment_transform = TransformSerializeSegment::new( + self.ctx.clone(), + InputPort::create(), + OutputPort::create(), + table, + block_thresholds, + ); + let pipe_items = vec![ + serialize_segment_transform.into_pipe_item(), + create_dummy_item(), + ]; + self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items)); + + // resize to one, because they are all mutation logs now. + self.main_pipeline.try_resize(1)?; + } else { + // case 2 + if let MergeIntoType::InsertOnly = merge_type { + // we will receive MutationLogs only without rowids. + return Ok(()); + } + // we will recieve MutationLogs and rowids. So we should apply + // rowids firstly and then send all mutation logs to commit sink. + // we need to spilt rowid and mutationlogs, and we can get pipeitems: + // 1.rowid_port + // 2.logs_port + self.main_pipeline + .add_pipe(RowNumberAndLogSplitProcessor::create()?.into_pipe()); + let cluster_stats_gen = table.get_cluster_stats_gen( + self.ctx.clone(), + 0, + table.get_block_thresholds(), + None, + )?; + let block_builder = TransformSerializeBlock::try_create( + self.ctx.clone(), + InputPort::create(), + OutputPort::create(), + table, + cluster_stats_gen.clone(), + MutationKind::MergeInto, + )? + .get_block_builder(); + let max_threads = self.settings.get_max_threads()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); + // MutationsLogs port0 + // MutationsLogs port1 + assert_eq!(self.main_pipeline.output_len(), 2); + self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![ + table.rowid_aggregate_mutator( + self.ctx.clone(), + block_builder, + io_request_semaphore, + segments.clone(), + )?, + create_dummy_item(), + ])); + assert_eq!(self.main_pipeline.output_len(), 2); + self.main_pipeline.try_resize(1)?; + } Ok(()) } @@ -736,7 +797,7 @@ impl PipelineBuilder { // output_port1: MutationLogs // output_port2: row_numbers // 1.for matched only, there are no row_numbers - // 2.for unmatched only/insert only, there is no output_port0. + // 2.for insert only, there is no output_port0. 
self.main_pipeline.add_pipe(Pipe::create( self.main_pipeline.output_len(), get_output_len(&pipe_items), diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs index af1a2db388c9..b2c6a0d16063 100644 --- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs +++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs @@ -64,4 +64,5 @@ pub struct MergeIntoAppendNotMatched { pub input_schema: DataSchemaRef, pub merge_type: MergeIntoType, pub change_join_order: bool, + pub segments: Vec<(usize, Location)>, } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs index 53e260a4af9d..4ca785e0342b 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs @@ -67,6 +67,8 @@ impl RowNumberAndLogSplitProcessor { } } +// we will also use RowNumberAndLogSplitProcessor to +// split rowids and logs although it's named with 'RowNumber' impl Processor for RowNumberAndLogSplitProcessor { fn name(&self) -> String { "RowNumberAndLogSplit".to_owned() @@ -132,6 +134,7 @@ impl Processor for RowNumberAndLogSplitProcessor { fn process(&mut self) -> Result<()> { if let Some(data_block) = self.input_data.take() { // all matched or logs + // if it's rowid, the meta will be none if data_block.get_meta().is_some() { if SourceFullMatched::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { self.output_data_row_number = Some(data_block) @@ -140,6 +143,9 @@ impl Processor for RowNumberAndLogSplitProcessor { self.output_data_log = Some(data_block); } } else { + // when we use source as probe side and do distributed + // execution,it could be rowid but we use output_data_row_number. + // it doesn't matter. self.output_data_row_number = Some(data_block) } } From 78ae91f6f2fe2f0cde23e58847e0af71f50e4f89 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 13 Dec 2023 22:52:18 +0800 Subject: [PATCH 06/32] fix typo --- .../service/src/interpreters/interpreter_merge_into.rs | 2 +- .../service/src/pipelines/builders/builder_merge_into.rs | 6 +++--- src/query/sql/src/planner/plans/merge_into.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 2b4453cef229..3ab5d57b24c4 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -408,7 +408,7 @@ impl MergeIntoInterpreter { })); // if change_join_order = true, it means the target is build side, // in this way, we will do matched operation and not matched operation - // locally in every node, and the main node just recieve rowids to apply. + // locally in every node, and the main node just receive rowids to apply. 
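Summarizing the case analysis used here and in build_merge_into_append_not_matched (cases 1.1 through 2.3 in the pipeline builder): what the coordinating node receives depends on which side builds the hash table and which clauses the statement has. A hedged sketch, with illustrative stream names only:

```rust
fn streams_received_by_main_node(
    target_is_build: bool,
    has_matched: bool,
    has_not_matched: bool,
) -> &'static [&'static str] {
    match (target_is_build, has_matched, has_not_matched) {
        // source is build side: unmatched source rows travel back as row numbers
        (false, true, false) => &["mutation_logs"],
        (false, _, true) => &["row_numbers", "mutation_logs"],
        // target is build side: inserts happen on every node, only row ids return
        (true, false, true) => &["mutation_logs"],
        (true, true, _) => &["row_ids", "mutation_logs"],
        // a merge statement always has at least one clause; treat the rest as logs only
        _ => &["mutation_logs"],
    }
}
```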
let segments = if *change_join_order { segments.clone() } else { diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 83a4621ab272..9134fbd06bdb 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -105,12 +105,12 @@ impl PipelineBuilder { // there are two cases: // 1. if source is build side (change_join_order = false). - // receive row numbers and MutationLogs, excatly below: + // receive row numbers and MutationLogs, exactly below: // 1.1 full operation: row numbers and MutationLogs // 1.2 matched only: MutationLogs // 1.3 insert only: row numbers and MutationLogs // 2. if target table is build side (change_join_order = true). - // receive rowids and MutationLogs,excatly below: + // receive rowids and MutationLogs,exactly below: // 2.1 full operation: rowids and MutationLogs // 2.2 matched only: rowids and MutationLogs // 2.3 insert only: MutationLogs @@ -262,7 +262,7 @@ impl PipelineBuilder { // we will receive MutationLogs only without rowids. return Ok(()); } - // we will recieve MutationLogs and rowids. So we should apply + // we will receive MutationLogs and rowids. So we should apply // rowids firstly and then send all mutation logs to commit sink. // we need to spilt rowid and mutationlogs, and we can get pipeitems: // 1.rowid_port diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index 9eb90a19df34..05c7893f4dae 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -85,7 +85,7 @@ impl std::fmt::Debug for MergeInto { } } -pub const INSERT_NAME: &str = "number of rows insertd"; +pub const INSERT_NAME: &str = "number of rows inserted"; pub const UPDTAE_NAME: &str = "number of rows updated"; pub const DELETE_NAME: &str = "number of rows deleted"; From 93d259df4b5ca9c0932526e89fe526e23643bda4 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 13:46:41 +0800 Subject: [PATCH 07/32] add more cluster test --- .../pipelines/builders/builder_merge_into.rs | 7 ++- ...distributed_merge_into_without_enable.test | 10 ++-- ...34_pr13848_without_distributed_enable.test | 6 +- ...e_pipeline_without_distributed_enable.test | 60 ++++++++++++++++++- ...merge_into_without_distributed_enable.test | 52 ++++++++-------- 5 files changed, 99 insertions(+), 36 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 8b8ee2c2ce0a..3abcbbaf2d00 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -624,7 +624,12 @@ impl PipelineBuilder { // with row_id and row_number (output_lens - 2, 1) } else { - // (with row_id and without row_number) or (without row_id and with row_number) + // I. (with row_id and without row_number) (need_match and !need_unmatch) + // II. (without row_id and with row_number) (!need_match and need_unmatch) + // in fact for II, it should be (output_lens-1,1), but in this case, the + // output_lens = 1, so it will be (0,1), and we just need to append a dummy_item. 
+ // but we use (output_lens - 1, 0) instead of (output_lens-1,1), because they will + // arrive the same result (that's appending only one dummy item) (output_lens - 1, 0) }; table.cluster_gen_for_append_with_specified_len( diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test index 9116e5107178..4ec2b86f685c 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test @@ -10,7 +10,7 @@ statement ok drop table if exists distributed_source_test; statement ok -create table distributed_target_test(a int,b string); +create table distributed_target_test(a int,b string) cluster by(a,b); ## multi blocks statement ok @@ -42,7 +42,7 @@ select * from distributed_target_test order by a; 8 x statement ok -create table distributed_source_test(a int,b string,is_databend_deleted bool); +create table distributed_source_test(a int,b string,is_databend_deleted bool) cluster by(a,b); statement ok insert into distributed_source_test values(1,'d',true),(2,'e',true),(3,'f',false),(4,'e',true),(5,'f',false); @@ -82,13 +82,13 @@ statement ok drop table if exists corner_target_table; statement ok -create table corner_target_table(a int,b string,c string); +create table corner_target_table(a int,b string,c string) cluster by(a,b); statement ok drop table if exists corner_source_table; statement ok -create table corner_source_table(a int,b string,c string); +create table corner_source_table(a int,b string,c string) cluster by(a,b); ## add block1 statement ok @@ -164,7 +164,7 @@ statement ok merge into distributed_test_order as t using (select id,34 as id1,238 as id2, id3, id4, id5, id6, id7,s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,insert_time,insert_time1,insert_time2,insert_time3,i from distributed_random_store) as s on t.id = s.id and t.insert_time = s.insert_time when matched then update * when not matched then insert *; statement ok -create table orders2(a int,b string,c string); +create table orders2(a int,b string,c string) cluster by(a,b); statement ok insert into orders2 values(1,'a1','b1'),(2,'a2','b2'),(3,'a3','b3'); diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test index a8237fab04c2..1f78e89ce6b5 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test @@ -3,10 +3,10 @@ statement ok set enable_experimental_merge_into = 1; statement ok -create table merge_target_0(a int,b string); +create table merge_target_0(a int,b string) cluster by(a,b); statement ok -create table merge_source_0(a int,b string); +create table merge_source_0(a int,b string) cluster by(a,b); statement ok insert into merge_target_0 values(1,'a1'),(2,'b1'); @@ -87,7 +87,7 @@ select * from merge_target_0 order by a,b; ### test copy into table unsupport statement ok -create table copy_table_test0(a int,b string); +create table copy_table_test0(a int,b string) cluster by(a,b); statement ok create stage parquet_table0 FILE_FORMAT = (TYPE = PARQUET); diff --git 
a/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test index 939de478f5c0..4ddcec87749a 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test @@ -8,7 +8,7 @@ statement ok drop table if exists t2_separate; statement ok -create table t1_separate(a int,b string, c string); +create table t1_separate(a int,b string, c string) cluster by(a,b); statement ok create table t2_separate(a int,b string, c string); @@ -92,5 +92,63 @@ select * from t1_separate order by a,b,c; 8 a8 b8 9 a9 b9 +## test insert-only cluster by +statement ok +truncate table t1_separate; + +statement ok +truncate table t2_separate; + +statement ok +insert into t2_separate values(8,'a8','b8'),(9,'a9','b9'),(1,'a5','b5'),(3,'a6','b6'); + +query T +merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t2.a when not matched then insert *; +---- +4 + +## without order by +query TTT +select * from t1_separate; +---- +1 a5 b5 +3 a6 b6 +8 a8 b8 +9 a9 b9 + +## test macthed-only cluster by +query T +merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t2.a when matched then update *; +---- +4 + +query TTT +select * from t1_separate; +---- +1 a5 b5 +3 a6 b6 +8 a8 b8 +9 a9 b9 + +## test full operation cluster by +statement ok +insert into t2_separate values(5,'a5','b5'),(7,'a7','b7'); + +query TT +merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +2 4 + +## we will do compact +query TTT +select * from t1_separate; +---- +1 a5 b5 +3 a6 b6 +5 a5 b5 +7 a7 b7 +8 a8 b8 +9 a9 b9 + statement ok set enable_experimental_merge_into = 0; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index b80104e9f28f..ccfcf2889bc8 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -8,10 +8,10 @@ statement ok drop table if exists t2; statement ok -create table t1(a int,b string, c string); +create table t1(a int,b string, c string) cluster by(a,b); statement ok -create table t2(a int,b string, c string); +create table t2(a int,b string, c string) cluster by(a,b); statement ok insert into t1 values(1,'b1','c1'),(2,'b2','c2'); @@ -276,7 +276,7 @@ statement ok drop table if exists target_table; statement ok -create table target_table(a int,b string,c string); +create table target_table(a int,b string,c string) cluster by(a,b); statement ok insert into target_table values(1,'a_1','b_1'),(2,'a_2','b_2'); @@ -288,7 +288,7 @@ select * from target_table order by a,b,c; 2 a_2 b_2 statement ok -create table test_stage(a int,b string,c string); +create table test_stage(a int,b string,c string) cluster by(a,b); statement ok insert into test_stage values(1,'a1','b1'),(2,'a2','b2'),(3,'a3','b3'); @@ -468,10 +468,10 @@ select * from t1 order by a,b,c; 1 a1 b1 statement ok -CREATE TABLE employees (employee_id INT, employee_name 
VARCHAR(255),department VARCHAR(255)); +CREATE TABLE employees (employee_id INT, employee_name VARCHAR(255),department VARCHAR(255)) cluster by(employee_id,employee_name); statement ok -CREATE TABLE salaries (employee_id INT,salary DECIMAL(10, 2)); +CREATE TABLE salaries (employee_id INT,salary DECIMAL(10, 2)) cluster by(employee_id,salary); statement ok INSERT INTO employees VALUES(1, 'Alice', 'HR'),(2, 'Bob', 'IT'),(3, 'Charlie', 'Finance'),(4, 'David', 'HR'); @@ -494,10 +494,10 @@ select * from salaries order by employee_id; ## null cast bug fix statement ok -create table t1_target(a int not null); +create table t1_target(a int not null) cluster by(a); statement ok -create table t2_source(a int not null); +create table t2_source(a int not null) cluster by(a); statement ok insert into t1_target values(1); @@ -559,13 +559,13 @@ statement ok drop table if exists source_test; statement ok -create table target_test(a int,b string); +create table target_test(a int,b string) cluster by(a,b); statement ok insert into target_test values(1,'a'),(2,'b'),(3,'c'); statement ok -create table source_test(a int,b string,delete_flag bool); +create table source_test(a int,b string,delete_flag bool) cluster by(a,b);; statement ok insert into source_test values(1,'d',true),(2,'e',true),(3,'f',false),(4,'e',true),(5,'f',false); @@ -609,10 +609,10 @@ merge into test_order as t using (select id,34 as id1,238 as id2, id3, id4, id5, ## test update list #13297 statement ok -create table t11(a int,b string, c string); +create table t11(a int,b string, c string) cluster by(a,b); statement ok -create table t12(a int,b string, c string); +create table t12(a int,b string, c string) cluster by(a,b); statement ok insert into t11 values(1,'b1','c1'),(2,'b2','c2'); @@ -628,7 +628,7 @@ merge into t11 using (select a, c from t12) as t12 on t11.a = t12.a when matched ## test issue #13287 statement ok -create table tt1 (a int, b int); +create table tt1 (a int, b int) cluster by(a,b); statement error 1065 merge into tt1 using(select 10, 20) as tt2 on tt1.a = 1 when not matched and tt1.b = 2 then insert values (10, 20); @@ -645,7 +645,7 @@ select count(*) from tt1; ## test issue #13367 statement ok -create table tt2(a bool, b variant, c map(string, string)); +create table tt2(a bool, b variant, c map(string, string)) cluster by(a,b); statement ok insert into tt2 values (true, '10', {'k1':'v1'}), (false, '20', {'k2':'v2'}) @@ -669,10 +669,10 @@ statement ok drop table if exists t2; statement ok -create table t1(a int); +create table t1(a int) cluster by(a); statement ok -create table t2(a int); +create table t2(a int) cluster by(a); statement ok insert into t1 values(1); @@ -697,10 +697,10 @@ statement ok drop table if exists t2; statement ok -create table t1(b int); +create table t1(b int) cluster by(b); statement ok -create table t2(a int); +create table t2(a int) cluster by(a); statement ok insert into t1 values(1); @@ -719,10 +719,10 @@ statement ok drop table if exists t2; statement ok -create table t1(a int,b string,c bool); +create table t1(a int,b string,c bool) cluster by(a,b); statement ok -create table t2(a int,b string,c bool); +create table t2(a int,b string,c bool) cluster by(a,b); statement ok insert into t1 values(1,'a1',true),(2,'a2',false),(3,'a3',true); @@ -783,7 +783,7 @@ statement ok drop table if exists tt1; statement ok -create table tt1(a bool, b int); +create table tt1(a bool, b int) cluster by(a,b); statement ok insert into tt1 values (true, 1), (false, 2); @@ -806,10 +806,10 @@ statement ok drop table 
if exists t12; statement ok -create table t12 (a int, b int); +create table t12 (a int, b int) cluster by(a,b); statement ok -create table t11 (a int, b int); +create table t11 (a int, b int) cluster by(a,b); statement ok insert into t11 values (1, 10),(2, 20),(3, 30),(4, 40); @@ -1006,7 +1006,7 @@ FROM orders; 64.16764110 6.416764110000 1.97683658 19.29134884 statement ok -create table tb_01 (id int,c1 varchar,c2 datetime(0),c3 json); +create table tb_01 (id int,c1 varchar,c2 datetime(0),c3 json) cluster by(c1,c2); statement ok create table tmp_01 like tb_01; @@ -1026,10 +1026,10 @@ select id,c1,to_date(c2),c3 from tb_01; ## test #issue13932 statement ok -create table null_target(a int not null,b text); +create table null_target(a int not null,b text) cluster by(a,b); statement ok -create table null_source(a int not null,b text); +create table null_source(a int not null,b text) cluster by(a,b); statement ok insert into null_target values(1,'a1'); From e9e2d625be8865006dee4db08861479847253221 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 15:10:10 +0800 Subject: [PATCH 08/32] fix test --- .../09_0036_merge_into_without_distributed_enable.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index ccfcf2889bc8..b04df8e87474 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -565,7 +565,7 @@ statement ok insert into target_test values(1,'a'),(2,'b'),(3,'c'); statement ok -create table source_test(a int,b string,delete_flag bool) cluster by(a,b);; +create table source_test(a int,b string,delete_flag bool) cluster by(a,b); statement ok insert into source_test values(1,'d',true),(2,'e',true),(3,'f',false),(4,'e',true),(5,'f',false); From 481f54494e09a8d3dae3a75b27ccbfa0c52e2076 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 15:45:58 +0800 Subject: [PATCH 09/32] finish new distributed pipeline --- .../pipelines/builders/builder_merge_into.rs | 99 +++++++++++++++---- 1 file changed, 78 insertions(+), 21 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 9134fbd06bdb..55835f836d7e 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -374,6 +374,7 @@ impl PipelineBuilder { segments, distributed, merge_type, + change_join_order, .. } = merge_into; @@ -458,7 +459,10 @@ impl PipelineBuilder { } if need_unmatch { - if !*distributed { + // distributed: false, standalone mode, we need to add insert processor + // (distirbuted,change join order):(true,true) target is build side, we + // need to support insert in local node. + if !*distributed || *distributed && *change_join_order { let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( unmatched.clone(), input.output_schema()?, @@ -545,6 +549,7 @@ impl PipelineBuilder { self.resize_row_id(2)?; } } else { + // I. if change_join_order is false, it means source is build side. 
// the complete pipeline(with matched and unmatched) below: // shuffle outputs and resize row_id // ---------------------------------------------------------------------- @@ -560,6 +565,22 @@ impl PipelineBuilder { // ---------------------------------------------------------------------- // 1.for matched only, there is no row_number // 2.for unmatched only/insert only, there is no row_id and matched data. + // II. if change_join_order is true, it means target is build side. + // the complete pipeline(with matched and unmatched) below: + // shuffle outputs and resize row_id + // ---------------------------------------------------------------------- + // row_id port0_1 row_id port0_1 row_id port + // matched data port0_2 row_id port1_1 matched data port0_2 + // unmatched port0_3 matched data port0_2 matched data port1_2 + // row_id port1_1 matched data port1_2 ...... + // matched data port1_2 ===> ..... ====> ...... + // unmatched port1_3 ..... ...... + // unmatched port0_3 unmatched port + // ...... unmatched port1_3 + // ...... ..... + // ---------------------------------------------------------------------- + // 1.for matched only, there is no unmatched port + // 2.for unmatched only/insert only, there is no row_id and matched data. // do shuffle let mut rules = Vec::with_capacity(self.main_pipeline.output_len()); if need_match && need_unmatch { @@ -580,7 +601,7 @@ impl PipelineBuilder { self.main_pipeline.reorder_inputs(rules); self.resize_row_id(2)?; } else if !need_match && need_unmatch { - // insert-only, there are only row_numbers + // insert-only, there are only row_numbers/unmatched ports self.main_pipeline.try_resize(1)?; } } @@ -594,13 +615,23 @@ impl PipelineBuilder { } } else if need_match && need_unmatch { // remove first row_id port and last row_number_port - self.main_pipeline.output_len() - 2 + if !*change_join_order { + self.main_pipeline.output_len() - 2 + } else { + // remove first row_id port + self.main_pipeline.output_len() - 1 + } } else if need_match && !need_unmatch { // remove first row_id port self.main_pipeline.output_len() - 1 } else { - // there are only row_number ports - 0 + // there are only row_number + if !*change_join_order { + 0 + } else { + // unmatched prot + 1 + } }; // fill default columns @@ -628,7 +659,7 @@ impl PipelineBuilder { // receive row_id builder.add_items_prepend(vec![create_dummy_item()]); } - if need_unmatch { + if need_unmatch && !*change_join_order { // receive row_number builder.add_items(vec![create_dummy_item()]); } @@ -683,11 +714,21 @@ impl PipelineBuilder { mid_len } else { let (mid_len, last_len) = if need_match && need_unmatch { - // with row_id and row_number - (output_lens - 2, 1) + if !*change_join_order { + // with row_id and row_number + (output_lens - 2, 1) + } else { + // with row_id + (output_lens - 1, 0) + } } else { - // (with row_id and without row_number) or (without row_id and with row_number) - (output_lens - 1, 0) + if *change_join_order && !need_match && need_unmatch { + // insert only + (output_lens, 0) + } else { + // (with row_id and without row_number/unmatched) or (without row_id and with row_number/unmatched) + (output_lens - 1, 0) + } }; table.cluster_gen_for_append_with_specified_len( self.ctx.clone(), @@ -701,12 +742,17 @@ impl PipelineBuilder { pipe_items.clear(); if need_match { - pipe_items.push(table.rowid_aggregate_mutator( - self.ctx.clone(), - block_builder, - io_request_semaphore, - segments.clone(), - )?); + // rowid should be accumulated in main node. 
+ if *change_join_order && *distributed { + pipe_items.push(create_dummy_item()) + } else { + pipe_items.push(table.rowid_aggregate_mutator( + self.ctx.clone(), + block_builder, + io_request_semaphore, + segments.clone(), + )?); + } } for _ in 0..serialize_len { @@ -722,7 +768,7 @@ impl PipelineBuilder { } // receive row_number - if *distributed && need_unmatch { + if *distributed && need_unmatch && !*change_join_order { pipe_items.push(create_dummy_item()); } @@ -732,6 +778,7 @@ impl PipelineBuilder { pipe_items, )); + // complete pipeline(if change_join_order is true, there is no row_number_port): // resize block ports // aggregate_mutator port/dummy_item port aggregate_mutator port/ dummy_item (this depends on apply_row_id) // serialize_block port0 ======> @@ -757,7 +804,7 @@ impl PipelineBuilder { } // with row_number - if *distributed && need_unmatch { + if *distributed && need_unmatch && !change_join_order { ranges.push(vec![self.main_pipeline.output_len() - 1]); } @@ -781,13 +828,14 @@ impl PipelineBuilder { vec.push(serialize_segment_transform.into_pipe_item()); } - if need_unmatch { + if need_unmatch && !*change_join_order { vec.push(create_dummy_item()) } vec }; // the complete pipeline(with matched and unmatched) below: + // I. change_join_order: false // distributed: false // output_port0: MutationLogs // output_port1: MutationLogs @@ -797,7 +845,16 @@ impl PipelineBuilder { // output_port1: MutationLogs // output_port2: row_numbers // 1.for matched only, there are no row_numbers - // 2.for insert only, there is no output_port0. + // 2.for insert only, there is no output_port0,output_port1. + // II. change_join_order: true + // distributed: false + // output_port0: MutationLogs + // output_port1: MutationLogs + // + // distributed: true + // output_port0: rowid + // output_port1: MutationLogs + // 1.for insert only, there is no output_port0. self.main_pipeline.add_pipe(Pipe::create( self.main_pipeline.output_len(), get_output_len(&pipe_items), @@ -805,7 +862,7 @@ impl PipelineBuilder { )); // accumulate row_number - if *distributed && need_unmatch { + if *distributed && need_unmatch && !*change_join_order { let pipe_items = if need_match { vec![ create_dummy_item(), From 0f1442cd9ebde10dce64068c00b5423dc619327a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 16:00:57 +0800 Subject: [PATCH 10/32] fix typo --- src/query/service/src/pipelines/builders/builder_merge_into.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 55835f836d7e..b281d5056847 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -460,7 +460,7 @@ impl PipelineBuilder { if need_unmatch { // distributed: false, standalone mode, we need to add insert processor - // (distirbuted,change join order):(true,true) target is build side, we + // (distributed,change join order):(true,true) target is build side, we // need to support insert in local node. 
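The two per-node placement decisions that the change_join_order flag drives in this builder can be condensed into a small sketch that mirrors the conditions in the surrounding code (it is not an actual API):

```rust
struct NodePlacement {
    // whether this node's pipeline builds the "not matched -> insert" branch
    insert_locally: bool,
    // whether the row-id aggregate mutator runs here; otherwise matched row ids
    // are shipped back and applied on the main node
    apply_row_ids_locally: bool,
}

fn placement(distributed: bool, target_is_build: bool) -> NodePlacement {
    NodePlacement {
        // standalone plans always insert locally; distributed plans do so only
        // when the target table is the build side
        insert_locally: !distributed || target_is_build,
        // with a distributed plan whose build side is the target, row ids are
        // accumulated on the main node instead of being applied here
        apply_row_ids_locally: !(distributed && target_is_build),
    }
}
```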
if !*distributed || *distributed && *change_join_order { let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( From d66d5c00b1b553ad9d18e6904ddb7ccde00bf4cf Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 16:51:53 +0800 Subject: [PATCH 11/32] fix lint --- .../src/pipelines/builders/builder_merge_into.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index b281d5056847..b3d7f793e8f5 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -462,7 +462,7 @@ impl PipelineBuilder { // distributed: false, standalone mode, we need to add insert processor // (distributed,change join order):(true,true) target is build side, we // need to support insert in local node. - if !*distributed || *distributed && *change_join_order { + if !*distributed || *change_join_order { let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( unmatched.clone(), input.output_schema()?, @@ -721,14 +721,12 @@ impl PipelineBuilder { // with row_id (output_lens - 1, 0) } + } else if *change_join_order && !need_match && need_unmatch { + // insert only + (output_lens, 0) } else { - if *change_join_order && !need_match && need_unmatch { - // insert only - (output_lens, 0) - } else { - // (with row_id and without row_number/unmatched) or (without row_id and with row_number/unmatched) - (output_lens - 1, 0) - } + // (with row_id and without row_number/unmatched) or (without row_id and with row_number/unmatched) + (output_lens - 1, 0) }; table.cluster_gen_for_append_with_specified_len( self.ctx.clone(), From 91d044a896ffeb2094abac43e78b07928ba9b62a Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 18:58:44 +0800 Subject: [PATCH 12/32] fix bugs --- .../interpreter_merge_into_static_filter.rs | 95 ++++++++++--------- .../sql/src/planner/optimizer/optimizer.rs | 2 - 2 files changed, 50 insertions(+), 47 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index 928f5146820a..a64a1f0cb42f 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -57,10 +57,10 @@ use crate::interpreters::InterpreterFactory; use crate::sessions::QueryContext; struct MergeStyleJoin<'a> { - source_conditions: &'a [ScalarExpr], - target_conditions: &'a [ScalarExpr], - source_sexpr: &'a SExpr, - target_sexpr: &'a SExpr, + build_conditions: &'a [ScalarExpr], + probe_conditions: &'a [ScalarExpr], + build_sexpr: &'a SExpr, + probe_sexpr: &'a SExpr, } impl MergeStyleJoin<'_> { @@ -73,25 +73,27 @@ impl MergeStyleJoin<'_> { join_op.join_type == JoinType::Right || join_op.join_type == JoinType::RightAnti || join_op.join_type == JoinType::Inner + || join_op.join_type == JoinType::Left + || join_op.join_type == JoinType::LeftAnti ); - let source_conditions = &join_op.right_conditions; - let target_conditions = &join_op.left_conditions; - let source_sexpr = join.child(1).unwrap(); - let target_sexpr = join.child(0).unwrap(); + let build_conditions = &join_op.right_conditions; + let probe_conditions = &join_op.left_conditions; + let build_sexpr = join.child(1).unwrap(); + let probe_sexpr = 
join.child(0).unwrap(); MergeStyleJoin { - source_conditions, - target_conditions, - source_sexpr, - target_sexpr, + build_conditions, + probe_conditions, + build_sexpr, + probe_sexpr, } } pub fn collect_column_map(&self) -> HashMap { let mut column_map = HashMap::new(); for (t, s) in self - .target_conditions + .probe_conditions .iter() - .zip(self.source_conditions.iter()) + .zip(self.build_conditions.iter()) { if let (ScalarExpr::BoundColumnRef(t_col), ScalarExpr::BoundColumnRef(s_col)) = (t, s) { column_map.insert(t_col.column.column_name.clone(), s_col.column.clone()); @@ -119,7 +121,7 @@ impl MergeIntoInterpreter { // \ // SourcePlan let m_join = MergeStyleJoin::new(join); - if m_join.source_conditions.is_empty() { + if m_join.build_conditions.is_empty() { return Ok(Box::new(join.clone())); } let column_map = m_join.collect_column_map(); @@ -181,9 +183,9 @@ impl MergeIntoInterpreter { // 2. build filter and push down to target side ctx.set_status_info("building pushdown filters"); - let mut filters = Vec::with_capacity(m_join.target_conditions.len()); + let mut filters = Vec::with_capacity(m_join.probe_conditions.len()); - for (i, target_side_expr) in m_join.target_conditions.iter().enumerate() { + for (i, target_side_expr) in m_join.probe_conditions.iter().enumerate() { let mut filter_parts = vec![]; for block in blocks.iter() { let block = block.convert_to_full(); @@ -225,11 +227,11 @@ impl MergeIntoInterpreter { } filters.extend(Self::combine_filter_parts(&filter_parts).into_iter()); } - let mut target_plan = m_join.target_sexpr.clone(); - Self::push_down_filters(&mut target_plan, &filters)?; - let source_plan = m_join.source_sexpr; + let mut probe_plan = m_join.probe_sexpr.clone(); + Self::push_down_filters(&mut probe_plan, &filters)?; + let build_plan = m_join.build_sexpr; let new_sexpr = - join.replace_children(vec![Arc::new(target_plan), Arc::new(source_plan.clone())]); + join.replace_children(vec![Arc::new(probe_plan), Arc::new(build_plan.clone())]); ctx.set_status_info("join expression replaced"); Ok(Box::new(new_sexpr)) @@ -381,9 +383,9 @@ impl MergeIntoInterpreter { metadata: &MetadataRef, group_expr: ScalarExpr, ) -> Result { - let mut eval_scalar_items = Vec::with_capacity(m_join.source_conditions.len()); - let mut min_max_binding = Vec::with_capacity(m_join.source_conditions.len() * 2); - let mut min_max_scalar_items = Vec::with_capacity(m_join.source_conditions.len() * 2); + let mut eval_scalar_items = Vec::with_capacity(m_join.build_conditions.len()); + let mut min_max_binding = Vec::with_capacity(m_join.build_conditions.len() * 2); + let mut min_max_scalar_items = Vec::with_capacity(m_join.build_conditions.len() * 2); let mut group_items = vec![]; let index = metadata @@ -407,46 +409,46 @@ impl MergeIntoInterpreter { scalar: evaled, index, }); - for source_side_expr in m_join.source_conditions { + for build_side_expr in m_join.build_conditions { // eval source side join expr let index = metadata .write() - .add_derived_column("".to_string(), source_side_expr.data_type()?); + .add_derived_column("".to_string(), build_side_expr.data_type()?); let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column: ColumnBindingBuilder::new( "".to_string(), index, - Box::new(source_side_expr.data_type()?), + Box::new(build_side_expr.data_type()?), Visibility::Visible, ) .build(), }); eval_scalar_items.push(ScalarItem { - scalar: source_side_expr.clone(), + scalar: build_side_expr.clone(), index, }); // eval min/max of source side join expr - let min_display_name = 
format!("min({:?})", source_side_expr); - let max_display_name = format!("max({:?})", source_side_expr); + let min_display_name = format!("min({:?})", build_side_expr); + let max_display_name = format!("max({:?})", build_side_expr); let min_index = metadata .write() - .add_derived_column(min_display_name.clone(), source_side_expr.data_type()?); + .add_derived_column(min_display_name.clone(), build_side_expr.data_type()?); let max_index = metadata .write() - .add_derived_column(max_display_name.clone(), source_side_expr.data_type()?); + .add_derived_column(max_display_name.clone(), build_side_expr.data_type()?); let min_binding = ColumnBindingBuilder::new( min_display_name.clone(), min_index, - Box::new(source_side_expr.data_type()?), + Box::new(build_side_expr.data_type()?), Visibility::Visible, ) .build(); let max_binding = ColumnBindingBuilder::new( max_display_name.clone(), max_index, - Box::new(source_side_expr.data_type()?), + Box::new(build_side_expr.data_type()?), Visibility::Visible, ) .build(); @@ -458,7 +460,7 @@ impl MergeIntoInterpreter { distinct: false, params: vec![], args: vec![evaled.clone()], - return_type: Box::new(source_side_expr.data_type()?), + return_type: Box::new(build_side_expr.data_type()?), display_name: min_display_name.clone(), }), index: min_index, @@ -469,7 +471,7 @@ impl MergeIntoInterpreter { distinct: false, params: vec![], args: vec![evaled], - return_type: Box::new(source_side_expr.data_type()?), + return_type: Box::new(build_side_expr.data_type()?), display_name: max_display_name.clone(), }), index: max_index, @@ -478,21 +480,24 @@ impl MergeIntoInterpreter { min_max_scalar_items.push(max); } - let eval_source_side_join_expr_op = EvalScalar { + let eval_build_side_join_expr_op = EvalScalar { items: eval_scalar_items, }; - let source_plan = m_join.source_sexpr; - let eval_target_side_condition_sexpr = if let RelOperator::Exchange(_) = source_plan.plan() - { + let build_plan = m_join.build_sexpr; + let eval_probe_side_condition_sexpr = if let RelOperator::Exchange(_) = build_plan.plan() { // there is another row_number operator here SExpr::create_unary( - Arc::new(eval_source_side_join_expr_op.into()), - Arc::new(source_plan.child(0)?.child(0)?.clone()), + Arc::new(eval_build_side_join_expr_op.into()), + SExpr::create_unary( + // merge data here + Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), + Arc::new(build_plan.child(0)?.child(0)?.clone()), + ), ) } else { SExpr::create_unary( - Arc::new(eval_source_side_join_expr_op.into()), - Arc::new(source_plan.clone()), + Arc::new(eval_build_side_join_expr_op.into()), + Arc::new(build_plan.clone()), ) }; @@ -509,7 +514,7 @@ impl MergeIntoInterpreter { }; let agg_partial_sexpr = SExpr::create_unary( Arc::new(agg_partial_op.into()), - Arc::new(eval_target_side_condition_sexpr), + Arc::new(eval_probe_side_condition_sexpr), ); let agg_final_op = Aggregate { mode: AggregateMode::Final, diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 8bd13eff4c7e..8a518925cd84 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -248,9 +248,7 @@ pub fn optimize_query( if unsafe { ctx.get_settings().get_disable_join_reorder()? 
} { return heuristic.optimize_expression(&result, &[RuleID::EliminateEvalScalar]); } - println!("result: {:?}\n", result); let result = heuristic.optimize_expression(&result, &RESIDUAL_RULES)?; - println!("result2: {:?}\n", result); Ok(result) } From 41b6547c9b54f78bc9021c13aa8ad18446302342 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 19:04:45 +0800 Subject: [PATCH 13/32] fix static filter bug --- .../interpreters/interpreter_merge_into_static_filter.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index 928f5146820a..4d04c6335ffa 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -484,10 +484,13 @@ impl MergeIntoInterpreter { let source_plan = m_join.source_sexpr; let eval_target_side_condition_sexpr = if let RelOperator::Exchange(_) = source_plan.plan() { - // there is another row_number operator here SExpr::create_unary( Arc::new(eval_source_side_join_expr_op.into()), - Arc::new(source_plan.child(0)?.child(0)?.clone()), + SExpr::create_unary( + // there is another row_number operator here + RelOperator::Exchange(common_sql::plans::Exchange::Merge), + Arc::new(source_plan.child(0)?.child(0)?.clone()), + ), ) } else { SExpr::create_unary( From 2d8c2cac05b7dadf4082e835c421bd8a35638312 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 19:13:33 +0800 Subject: [PATCH 14/32] fix --- .../interpreters/interpreter_merge_into_static_filter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index 4d04c6335ffa..4064da7572ec 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -486,11 +486,11 @@ impl MergeIntoInterpreter { { SExpr::create_unary( Arc::new(eval_source_side_join_expr_op.into()), - SExpr::create_unary( + Arc::new(SExpr::create_unary( // there is another row_number operator here - RelOperator::Exchange(common_sql::plans::Exchange::Merge), + Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), Arc::new(source_plan.child(0)?.child(0)?.clone()), - ), + )), ) } else { SExpr::create_unary( From afe09f251e4866b6a903379c105fbe7916fd7732 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 19:29:48 +0800 Subject: [PATCH 15/32] fix test --- .../09_0036_merge_into_without_distributed_enable.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index b04df8e87474..bacff352fe1f 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -645,7 +645,7 @@ select count(*) from tt1; ## test issue #13367 statement ok -create table tt2(a bool, b variant, c map(string, string)) cluster by(a,b); +create table tt2(a bool, b variant, c map(string, string)) cluster by(a,c); statement ok insert into tt2 
values (true, '10', {'k1':'v1'}), (false, '20', {'k2':'v2'}) From b3b2d9a5e0a17597fe94f3ff81f42ce7b19107db Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 21:13:43 +0800 Subject: [PATCH 16/32] fix test --- .../interpreter_merge_into_static_filter.rs | 16 +++++++++------- ...36_merge_into_without_distributed_enable.test | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index 4064da7572ec..d9a3969f01ef 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -486,11 +486,8 @@ impl MergeIntoInterpreter { { SExpr::create_unary( Arc::new(eval_source_side_join_expr_op.into()), - Arc::new(SExpr::create_unary( - // there is another row_number operator here - Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), - Arc::new(source_plan.child(0)?.child(0)?.clone()), - )), + // there is another row_number operator here + Arc::new(source_plan.child(0)?.child(0)?.clone()), ) } else { SExpr::create_unary( @@ -522,8 +519,13 @@ impl MergeIntoInterpreter { limit: None, grouping_sets: None, }; - let agg_final_sexpr = - SExpr::create_unary(Arc::new(agg_final_op.into()), Arc::new(agg_partial_sexpr)); + let agg_final_sexpr = SExpr::create_unary( + Arc::new(agg_final_op.into()), + Arc::new(SExpr::create_unary( + Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), + Arc::new(agg_partial_sexpr), + )), + ); Ok(Plan::Query { s_expr: Box::new(agg_final_sexpr), metadata: metadata.clone(), diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index bacff352fe1f..d0b7356d94c8 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -645,7 +645,7 @@ select count(*) from tt1; ## test issue #13367 statement ok -create table tt2(a bool, b variant, c map(string, string)) cluster by(a,c); +create table tt2(a bool, b variant, c map(string, string)) cluster by(a); statement ok insert into tt2 values (true, '10', {'k1':'v1'}), (false, '20', {'k2':'v2'}) From 992daeeba291ea66b8b6f9fa685209ad39569659 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 21:43:13 +0800 Subject: [PATCH 17/32] rollback --- .../interpreter_merge_into_static_filter.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index d9a3969f01ef..4064da7572ec 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -486,8 +486,11 @@ impl MergeIntoInterpreter { { SExpr::create_unary( Arc::new(eval_source_side_join_expr_op.into()), - // there is another row_number operator here - Arc::new(source_plan.child(0)?.child(0)?.clone()), + Arc::new(SExpr::create_unary( + // there is another row_number operator here + Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), + 
Arc::new(source_plan.child(0)?.child(0)?.clone()), + )), ) } else { SExpr::create_unary( @@ -519,13 +522,8 @@ impl MergeIntoInterpreter { limit: None, grouping_sets: None, }; - let agg_final_sexpr = SExpr::create_unary( - Arc::new(agg_final_op.into()), - Arc::new(SExpr::create_unary( - Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), - Arc::new(agg_partial_sexpr), - )), - ); + let agg_final_sexpr = + SExpr::create_unary(Arc::new(agg_final_op.into()), Arc::new(agg_partial_sexpr)); Ok(Plan::Query { s_expr: Box::new(agg_final_sexpr), metadata: metadata.clone(), From cf6a3957ac9925caa7808d4e43895e62c260022c Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 22:49:51 +0800 Subject: [PATCH 18/32] remove static filter --- .../interpreters/interpreter_merge_into.rs | 6 +- .../interpreter_merge_into_static_filter.rs | 536 ------------------ 2 files changed, 1 insertion(+), 541 deletions(-) delete mode 100644 src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 1ade312ee5c6..01ca9535e739 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -188,14 +188,10 @@ impl MergeIntoInterpreter { input }; - let optimized_input = - Self::build_static_filter(&input, meta_data, self.ctx.clone(), check_table).await?; let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false); // build source for MergeInto - let join_input = builder - .build(&optimized_input, *columns_set.clone()) - .await?; + let join_input = builder.build(&input, *columns_set.clone()).await?; // find row_id column index let join_output_schema = join_input.output_schema()?; diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs deleted file mode 100644 index 4064da7572ec..000000000000 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ /dev/null @@ -1,536 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; -use std::sync::Arc; - -use common_ast::parser::parse_comma_separated_exprs; -use common_ast::parser::tokenize_sql; -use common_ast::Dialect; -use common_exception::ErrorCode; -use common_exception::Result; -use common_expression::SendableDataBlockStream; -use common_sql::bind_one_table; -use common_sql::format_scalar; -use common_sql::optimizer::SExpr; -use common_sql::plans::Aggregate; -use common_sql::plans::AggregateFunction; -use common_sql::plans::AggregateMode; -use common_sql::plans::BoundColumnRef; -use common_sql::plans::ConstantExpr; -use common_sql::plans::EvalScalar; -use common_sql::plans::FunctionCall; -use common_sql::plans::JoinType; -use common_sql::plans::Plan; -use common_sql::plans::RelOperator; -use common_sql::plans::ScalarItem; -use common_sql::plans::VisitorMut; -use common_sql::BindContext; -use common_sql::ColumnBinding; -use common_sql::ColumnBindingBuilder; -use common_sql::MetadataRef; -use common_sql::NameResolutionContext; -use common_sql::ScalarExpr; -use common_sql::TypeChecker; -use common_sql::Visibility; -use common_storages_factory::Table; -use common_storages_fuse::FuseTable; -use common_storages_fuse::TableContext; -use log::info; -use log::warn; -use tokio_stream::StreamExt; - -use super::InterpreterPtr; -use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter; -use crate::interpreters::InterpreterFactory; -use crate::sessions::QueryContext; - -struct MergeStyleJoin<'a> { - source_conditions: &'a [ScalarExpr], - target_conditions: &'a [ScalarExpr], - source_sexpr: &'a SExpr, - target_sexpr: &'a SExpr, -} - -impl MergeStyleJoin<'_> { - pub fn new(join: &SExpr) -> MergeStyleJoin { - let join_op = match join.plan() { - RelOperator::Join(j) => j, - _ => unreachable!(), - }; - assert!( - join_op.join_type == JoinType::Right - || join_op.join_type == JoinType::RightAnti - || join_op.join_type == JoinType::Inner - ); - let source_conditions = &join_op.right_conditions; - let target_conditions = &join_op.left_conditions; - let source_sexpr = join.child(1).unwrap(); - let target_sexpr = join.child(0).unwrap(); - MergeStyleJoin { - source_conditions, - target_conditions, - source_sexpr, - target_sexpr, - } - } - - pub fn collect_column_map(&self) -> HashMap { - let mut column_map = HashMap::new(); - for (t, s) in self - .target_conditions - .iter() - .zip(self.source_conditions.iter()) - { - if let (ScalarExpr::BoundColumnRef(t_col), ScalarExpr::BoundColumnRef(s_col)) = (t, s) { - column_map.insert(t_col.column.column_name.clone(), s_col.column.clone()); - } - } - column_map - } -} - -impl MergeIntoInterpreter { - pub async fn build_static_filter( - join: &SExpr, - metadata: &MetadataRef, - ctx: Arc, - table: Arc, - ) -> Result> { - // 1. 
collect statistics from the source side - // plan of source table is extended to: - // - // AggregateFinal(min(source_join_side_expr),max(source_join_side_expr)) - // \ - // AggregatePartial(min(source_join_side_expr),max(source_join_side_expr)) - // \ - // EvalScalar(source_join_side_expr) - // \ - // SourcePlan - let m_join = MergeStyleJoin::new(join); - if m_join.source_conditions.is_empty() { - return Ok(Box::new(join.clone())); - } - let column_map = m_join.collect_column_map(); - - let fuse_table = table.as_any().downcast_ref::().ok_or_else(|| { - ErrorCode::Unimplemented(format!( - "table {}, engine type {}, does not support MERGE INTO", - table.name(), - table.get_table_info().engine(), - )) - })?; - - let group_expr = match fuse_table.cluster_key_str() { - None => { - info!("no cluster key found, use default plan"); - return Ok(Box::new(join.clone())); - } - Some(cluster_key_str) => { - match Self::extract_group_by_expr(&ctx, table.clone(), cluster_key_str, &column_map) - .await? - { - None => { - warn!( - "no suitable group by expr found, use default plan. cluster_key_str is '{}'", - cluster_key_str - ); - return Ok(Box::new(join.clone())); - } - Some(expr) => expr, - } - } - }; - - ctx.set_status_info("constructing static filter plan"); - let plan = Self::build_min_max_group_by_left_most_cluster_key_expr_plan( - &m_join, metadata, group_expr, - ) - .await?; - - ctx.set_status_info("executing static filter plan"); - let interpreter: InterpreterPtr = InterpreterFactory::get(ctx.clone(), &plan).await?; - let stream: SendableDataBlockStream = interpreter.execute(ctx.clone()).await?; - let blocks = stream.collect::>>().await?; - // check if number of partitions is too much - { - let max_number_partitions = - ctx.get_settings() - .get_merge_into_static_filter_partition_threshold()? as usize; - - let number_partitions: usize = blocks.iter().map(|b| b.num_rows()).sum(); - if number_partitions > max_number_partitions { - warn!( - "number of partitions {} exceeds threshold {}", - number_partitions, max_number_partitions - ); - return Ok(Box::new(join.clone())); - } - } - - // 2. 
build filter and push down to target side - ctx.set_status_info("building pushdown filters"); - let mut filters = Vec::with_capacity(m_join.target_conditions.len()); - - for (i, target_side_expr) in m_join.target_conditions.iter().enumerate() { - let mut filter_parts = vec![]; - for block in blocks.iter() { - let block = block.convert_to_full(); - let min_column = block.get_by_offset(i * 2).value.as_column().unwrap(); - let max_column = block.get_by_offset(i * 2 + 1).value.as_column().unwrap(); - for (min_scalar, max_scalar) in min_column.iter().zip(max_column.iter()) { - let gte_min = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "gte".to_string(), - params: vec![], - arguments: vec![ - target_side_expr.clone(), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: min_scalar.to_owned(), - }), - ], - }); - let lte_max = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "lte".to_string(), - params: vec![], - arguments: vec![ - target_side_expr.clone(), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: max_scalar.to_owned(), - }), - ], - }); - let and = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "and".to_string(), - params: vec![], - arguments: vec![gte_min, lte_max], - }); - filter_parts.push(and); - } - } - filters.extend(Self::combine_filter_parts(&filter_parts).into_iter()); - } - let mut target_plan = m_join.target_sexpr.clone(); - Self::push_down_filters(&mut target_plan, &filters)?; - let source_plan = m_join.source_sexpr; - let new_sexpr = - join.replace_children(vec![Arc::new(target_plan), Arc::new(source_plan.clone())]); - - ctx.set_status_info("join expression replaced"); - Ok(Box::new(new_sexpr)) - } - - fn combine_filter_parts(filter_parts: &[ScalarExpr]) -> Option { - match filter_parts.len() { - 0 => None, - 1 => Some(filter_parts[0].clone()), - _ => { - let mid = filter_parts.len() / 2; - let left = Self::combine_filter_parts(&filter_parts[0..mid]); - let right = Self::combine_filter_parts(&filter_parts[mid..]); - if let Some(left) = left { - if let Some(right) = right { - Some(ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "or".to_string(), - params: vec![], - arguments: vec![left, right], - })) - } else { - Some(left) - } - } else { - right - } - } - } - } - - fn push_down_filters(s_expr: &mut SExpr, filters: &[ScalarExpr]) -> Result<()> { - match s_expr.plan() { - RelOperator::Scan(s) => { - let mut new_scan = s.clone(); - info!("push down {} filters:", filters.len()); - for filter in filters { - info!("{}", format_scalar(filter)); - } - if let Some(preds) = new_scan.push_down_predicates { - new_scan.push_down_predicates = - Some(preds.iter().chain(filters).cloned().collect()); - } else { - new_scan.push_down_predicates = Some(filters.to_vec()); - } - *s_expr = SExpr::create_leaf(Arc::new(RelOperator::Scan(new_scan))); - } - RelOperator::EvalScalar(_) - | RelOperator::Filter(_) - | RelOperator::Aggregate(_) - | RelOperator::Sort(_) - | RelOperator::Limit(_) => { - let mut new_child = s_expr.child(0)?.clone(); - Self::push_down_filters(&mut new_child, filters)?; - *s_expr = s_expr.replace_children(vec![Arc::new(new_child)]); - } - RelOperator::CteScan(_) => {} - RelOperator::Join(_) => {} - RelOperator::Exchange(_) => {} - RelOperator::UnionAll(_) => {} - RelOperator::DummyTableScan(_) => {} - RelOperator::Window(_) => {} - RelOperator::ProjectSet(_) => {} - RelOperator::MaterializedCte(_) => {} - RelOperator::ConstantTableScan(_) => {} - RelOperator::Pattern(_) => 
{} - RelOperator::AddRowNumber(_) => {} - RelOperator::Udf(_) => {} - } - Ok(()) - } - - // Extract group by expr from cluster key - // - // the left most cluster key expression will be returned if any, - // otherwise None will be returned. - async fn extract_group_by_expr( - ctx: &Arc, - table: Arc, - cluster_key_str: &str, - column_map: &HashMap, - ) -> Result> { - let ast_exprs = { - let sql_dialect = Dialect::MySQL; - let tokens = tokenize_sql(cluster_key_str)?; - parse_comma_separated_exprs(&tokens, sql_dialect)? - }; - - let ast_expr = if !ast_exprs.is_empty() { - &ast_exprs[0] - } else { - warn!("empty cluster key found after parsing."); - return Ok(None); - }; - - let (mut bind_context, metadata) = bind_one_table(table)?; - let name_resolution_ctx = NameResolutionContext::try_from(ctx.get_settings().as_ref())?; - let mut type_checker = { - let forbid_udf = true; - TypeChecker::try_create( - &mut bind_context, - ctx.clone(), - &name_resolution_ctx, - metadata.clone(), - &[], - forbid_udf, - )? - }; - - let (scalar_expr, _) = *type_checker.resolve(ast_expr).await?; - - let mut left_most_expr = match &scalar_expr { - ScalarExpr::FunctionCall(f) if f.func_name == "tuple" && !f.arguments.is_empty() => { - f.arguments[0].clone() - } - ScalarExpr::FunctionCall(_) => { - warn!("cluster key expr is not a (suitable) tuple expression"); - return Ok(None); - } - _ => scalar_expr, - }; - - struct ReplaceColumnVisitor<'a> { - column_map: &'a HashMap, - } - - impl<'a> VisitorMut<'a> for ReplaceColumnVisitor<'a> { - fn visit_bound_column_ref(&mut self, column: &mut BoundColumnRef) -> Result<()> { - if let Some(new_column) = self.column_map.get(&column.column.column_name) { - column.column = new_column.clone(); - Ok(()) - } else { - Err(ErrorCode::from_string_no_backtrace(String::new())) - } - } - } - - let mut visitor = ReplaceColumnVisitor { column_map }; - - if visitor.visit(&mut left_most_expr).is_ok() { - Ok(Some(left_most_expr)) - } else { - Ok(None) - } - } - - async fn build_min_max_group_by_left_most_cluster_key_expr_plan( - m_join: &MergeStyleJoin<'_>, - metadata: &MetadataRef, - group_expr: ScalarExpr, - ) -> Result { - let mut eval_scalar_items = Vec::with_capacity(m_join.source_conditions.len()); - let mut min_max_binding = Vec::with_capacity(m_join.source_conditions.len() * 2); - let mut min_max_scalar_items = Vec::with_capacity(m_join.source_conditions.len() * 2); - let mut group_items = vec![]; - - let index = metadata - .write() - .add_derived_column("".to_string(), group_expr.data_type()?); - let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "".to_string(), - index, - Box::new(group_expr.data_type()?), - Visibility::Visible, - ) - .build(), - }); - eval_scalar_items.push(ScalarItem { - scalar: group_expr, - index, - }); - group_items.push(ScalarItem { - scalar: evaled, - index, - }); - for source_side_expr in m_join.source_conditions { - // eval source side join expr - let index = metadata - .write() - .add_derived_column("".to_string(), source_side_expr.data_type()?); - let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "".to_string(), - index, - Box::new(source_side_expr.data_type()?), - Visibility::Visible, - ) - .build(), - }); - eval_scalar_items.push(ScalarItem { - scalar: source_side_expr.clone(), - index, - }); - - // eval min/max of source side join expr - let min_display_name = format!("min({:?})", source_side_expr); - let max_display_name = 
format!("max({:?})", source_side_expr); - let min_index = metadata - .write() - .add_derived_column(min_display_name.clone(), source_side_expr.data_type()?); - let max_index = metadata - .write() - .add_derived_column(max_display_name.clone(), source_side_expr.data_type()?); - let min_binding = ColumnBindingBuilder::new( - min_display_name.clone(), - min_index, - Box::new(source_side_expr.data_type()?), - Visibility::Visible, - ) - .build(); - let max_binding = ColumnBindingBuilder::new( - max_display_name.clone(), - max_index, - Box::new(source_side_expr.data_type()?), - Visibility::Visible, - ) - .build(); - min_max_binding.push(min_binding); - min_max_binding.push(max_binding); - let min = ScalarItem { - scalar: ScalarExpr::AggregateFunction(AggregateFunction { - func_name: "min".to_string(), - distinct: false, - params: vec![], - args: vec![evaled.clone()], - return_type: Box::new(source_side_expr.data_type()?), - display_name: min_display_name.clone(), - }), - index: min_index, - }; - let max = ScalarItem { - scalar: ScalarExpr::AggregateFunction(AggregateFunction { - func_name: "max".to_string(), - distinct: false, - params: vec![], - args: vec![evaled], - return_type: Box::new(source_side_expr.data_type()?), - display_name: max_display_name.clone(), - }), - index: max_index, - }; - min_max_scalar_items.push(min); - min_max_scalar_items.push(max); - } - - let eval_source_side_join_expr_op = EvalScalar { - items: eval_scalar_items, - }; - let source_plan = m_join.source_sexpr; - let eval_target_side_condition_sexpr = if let RelOperator::Exchange(_) = source_plan.plan() - { - SExpr::create_unary( - Arc::new(eval_source_side_join_expr_op.into()), - Arc::new(SExpr::create_unary( - // there is another row_number operator here - Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), - Arc::new(source_plan.child(0)?.child(0)?.clone()), - )), - ) - } else { - SExpr::create_unary( - Arc::new(eval_source_side_join_expr_op.into()), - Arc::new(source_plan.clone()), - ) - }; - - let mut bind_context = Box::new(BindContext::new()); - bind_context.columns = min_max_binding; - - let agg_partial_op = Aggregate { - mode: AggregateMode::Partial, - group_items: group_items.clone(), - aggregate_functions: min_max_scalar_items.clone(), - from_distinct: false, - limit: None, - grouping_sets: None, - }; - let agg_partial_sexpr = SExpr::create_unary( - Arc::new(agg_partial_op.into()), - Arc::new(eval_target_side_condition_sexpr), - ); - let agg_final_op = Aggregate { - mode: AggregateMode::Final, - group_items, - aggregate_functions: min_max_scalar_items, - from_distinct: false, - limit: None, - grouping_sets: None, - }; - let agg_final_sexpr = - SExpr::create_unary(Arc::new(agg_final_op.into()), Arc::new(agg_partial_sexpr)); - Ok(Plan::Query { - s_expr: Box::new(agg_final_sexpr), - metadata: metadata.clone(), - bind_context, - rewrite_kind: None, - formatted_ast: None, - ignore_result: false, - }) - } -} From e8231a6d12679bcc409369d42947b0b02b9de612 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Thu, 14 Dec 2023 22:56:11 +0800 Subject: [PATCH 19/32] fix typo --- src/query/service/src/interpreters/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index 890267e79a89..d0a73a12ed4e 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -47,7 +47,6 @@ mod interpreter_index_refresh; mod interpreter_insert; mod interpreter_kill; mod interpreter_merge_into; 
-mod interpreter_merge_into_static_filter; mod interpreter_metrics; mod interpreter_network_policies_show; mod interpreter_network_policy_alter; From 6c1e7ded2d39746164a30bd7a744a1a8c5bf76a0 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 13:16:54 +0800 Subject: [PATCH 20/32] resolve conflict --- src/query/sql/src/planner/optimizer/optimizer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 8a518925cd84..876219750edd 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -248,8 +248,7 @@ pub fn optimize_query( if unsafe { ctx.get_settings().get_disable_join_reorder()? } { return heuristic.optimize_expression(&result, &[RuleID::EliminateEvalScalar]); } - let result = heuristic.optimize_expression(&result, &RESIDUAL_RULES)?; - Ok(result) + RecursiveOptimizer::new(&RESIDUAL_RULES, &opt_ctx).run(&s_expr) } // TODO(leiysky): reuse the optimization logic with `optimize_query` From 77bb6f52f17a19e3aa7c9a1d58c6320ed6ad6b7d Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 14:20:37 +0800 Subject: [PATCH 21/32] fix conflict --- .../src/interpreters/interpreter_merge_into_static_filter.rs | 4 ++-- src/query/sql/src/planner/optimizer/optimizer.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index a64a1f0cb42f..a378e079d5fe 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -488,11 +488,11 @@ impl MergeIntoInterpreter { // there is another row_number operator here SExpr::create_unary( Arc::new(eval_build_side_join_expr_op.into()), - SExpr::create_unary( + Arc::new(SExpr::create_unary( // merge data here Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), Arc::new(build_plan.child(0)?.child(0)?.clone()), - ), + )), ) } else { SExpr::create_unary( diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index b0b7506ff261..955291692d06 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -25,9 +25,7 @@ use log::info; use super::cost::CostContext; use super::distributed::MergeSourceOptimizer; use super::format::display_memo; -use super::rule::TransformResult; use super::Memo; -use super::RuleFactory; use crate::optimizer::cascades::CascadesOptimizer; use crate::optimizer::decorrelate::decorrelate_subquery; use crate::optimizer::distributed::optimize_distributed_query; From 4dceb911461c861bf2d083d61dcf232b7b530082 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 17:46:53 +0800 Subject: [PATCH 22/32] remove static filter --- .../service/src/interpreters/interpreter_merge_into.rs | 8 +++----- .../interpreters/interpreter_merge_into_static_filter.rs | 3 +++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 0ddf127c7c90..d37b07e9d97f 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -189,14 +189,12 @@ impl 
MergeIntoInterpreter { input }; - let optimized_input = - Self::build_static_filter(&input, meta_data, self.ctx.clone(), check_table).await?; + // let optimized_input = + // Self::build_static_filter(&input, meta_data, self.ctx.clone(), check_table).await?; let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false); // build source for MergeInto - let join_input = builder - .build(&optimized_input, *columns_set.clone()) - .await?; + let join_input = builder.build(input.as_ref(), *columns_set.clone()).await?; // find row_id column index let join_output_schema = join_input.output_schema()?; diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index a378e079d5fe..086e5cbb0b58 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -56,6 +56,7 @@ use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter; use crate::interpreters::InterpreterFactory; use crate::sessions::QueryContext; +#[allow(dead_code)] struct MergeStyleJoin<'a> { build_conditions: &'a [ScalarExpr], probe_conditions: &'a [ScalarExpr], @@ -63,6 +64,7 @@ struct MergeStyleJoin<'a> { probe_sexpr: &'a SExpr, } +#[allow(dead_code)] impl MergeStyleJoin<'_> { pub fn new(join: &SExpr) -> MergeStyleJoin { let join_op = match join.plan() { @@ -103,6 +105,7 @@ impl MergeStyleJoin<'_> { } } +#[allow(dead_code)] impl MergeIntoInterpreter { pub async fn build_static_filter( join: &SExpr, From a4b2d278baca271a97a6ce1ea367b2cb9968efa9 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 19:25:33 +0800 Subject: [PATCH 23/32] fix optimizer --- src/query/sql/src/planner/optimizer/optimizer.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 955291692d06..078bd79c55e3 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -241,12 +241,15 @@ pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result { let mut state = TransformResult::new(); // we will reorder the join order according to the cardinality of target and source. rule.apply(&join_sexpr, &mut state)?; - assert_eq!(state.results().len(), 1); + assert!(state.results().len() <= 1); // we need to check whether we do swap left and right. 
- let old_left = join_sexpr.child(0)?; - let new_left = state.results()[0].child(0)?; - let change_join_order = old_left == new_left; - join_sexpr = Box::new(state.results()[0].clone()); + let change_join_order = if state.results().len() == 1 { + join_sexpr = Box::new(state.results()[0].clone()); + true + } else { + false + }; + // try to optimize distributed join if opt_ctx.enable_distributed_optimization && opt_ctx From df155ee066c038d30abee0793d2ab3d33fdc7a71 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 21:55:06 +0800 Subject: [PATCH 24/32] fix rowid split for new distributed execution --- .../interpreters/interpreter_merge_into.rs | 20 ++++++++++++---- .../pipelines/builders/builder_merge_into.rs | 24 +++++++++++++++++++ .../fuse/src/operations/merge_into/mod.rs | 2 +- .../operations/merge_into/processors/mod.rs | 2 +- ...istributed_merge_into_block_deserialize.rs | 8 +++---- ..._distributed_merge_into_block_serialize.rs | 8 +++---- .../processor_merge_into_matched_and_split.rs | 6 ++--- ...sor_merge_into_split_row_number_and_log.rs | 10 ++++---- 8 files changed, 57 insertions(+), 23 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index d37b07e9d97f..ba3609d43887 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -23,12 +23,14 @@ use common_exception::Result; use common_expression::types::UInt32Type; use common_expression::ConstantFolder; use common_expression::DataBlock; +use common_expression::DataField; use common_expression::DataSchema; use common_expression::DataSchemaRef; use common_expression::FieldIndex; use common_expression::FromData; use common_expression::RemoteExpr; use common_expression::SendableDataBlockStream; +use common_expression::ROW_ID_COL_NAME; use common_expression::ROW_NUMBER_COL_NAME; use common_functions::BUILTIN_FUNCTIONS; use common_meta_app::schema::TableInfo; @@ -227,7 +229,7 @@ impl MergeIntoInterpreter { } } - if *distributed { + if *distributed && !*change_join_order { row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?); } @@ -238,7 +240,7 @@ impl MergeIntoInterpreter { )); } - if *distributed && row_number_idx.is_none() { + if *distributed && row_number_idx.is_none() && !*change_join_order { return Err(ErrorCode::InvalidRowIdIndex( "can't get internal row_number_idx when running merge into", )); @@ -412,9 +414,17 @@ impl MergeIntoInterpreter { row_id_idx, segments: segments.clone(), distributed: true, - output_schema: DataSchemaRef::new(DataSchema::new(vec![ - join_output_schema.fields[row_number_idx.unwrap()].clone(), - ])), + output_schema: match *change_join_order { + false => DataSchemaRef::new(DataSchema::new(vec![ + join_output_schema.fields[row_number_idx.unwrap()].clone(), + ])), + true => DataSchemaRef::new(DataSchema::new(vec![DataField::new( + ROW_ID_COL_NAME, + common_expression::types::DataType::Number( + common_expression::types::NumberDataType::UInt64, + ), + )])), + }, merge_type: merge_type.clone(), change_join_order: *change_join_order, })); diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs index b3d7f793e8f5..62ae6b4849b2 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -44,6 +44,8 @@ use 
common_storages_fuse::operations::MergeIntoNotMatchedProcessor; use common_storages_fuse::operations::MergeIntoSplitProcessor; use common_storages_fuse::operations::RowNumberAndLogSplitProcessor; use common_storages_fuse::operations::TransformAddRowNumberColumnProcessor; +use common_storages_fuse::operations::TransformDistributedMergeIntoBlockDeserialize; +use common_storages_fuse::operations::TransformDistributedMergeIntoBlockSerialize; use common_storages_fuse::operations::TransformSerializeBlock; use common_storages_fuse::FuseTable; @@ -117,6 +119,17 @@ impl PipelineBuilder { self.build_pipeline(input)?; self.main_pipeline.try_resize(1)?; + // deserialize MixRowIdKindAndLog + if *change_join_order { + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + Ok(TransformDistributedMergeIntoBlockDeserialize::create( + transform_input_port, + transform_output_port, + )) + })?; + } + let tbl = self .ctx .build_table_by_table_info(catalog_info, table_info, None)?; @@ -878,6 +891,17 @@ impl PipelineBuilder { )); } + // add distributed_merge_into_block_serialize + // we will wrap rowid and log as MixRowIdKindAndLog + if *distributed && *change_join_order { + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + Ok(TransformDistributedMergeIntoBlockSerialize::create( + transform_input_port, + transform_output_port, + )) + })?; + } Ok(()) } } diff --git a/src/query/storages/fuse/src/operations/merge_into/mod.rs b/src/query/storages/fuse/src/operations/merge_into/mod.rs index 52af8dcd8fed..85510b43a9e9 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mod.rs @@ -19,7 +19,7 @@ pub use mutator::MatchedAggregator; pub use processors::MatchedSplitProcessor; pub use processors::MergeIntoNotMatchedProcessor; pub use processors::MergeIntoSplitProcessor; -pub use processors::MixRowNumberKindAndLog; +pub use processors::MixRowIdKindAndLog; pub use processors::RowNumberAndLogSplitProcessor; pub use processors::SourceFullMatched; pub use processors::TransformAddRowNumberColumnProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs index 681a1fdb1dbf..1bdf2ca5c879 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs @@ -24,7 +24,7 @@ mod transform_matched_mutation_aggregator; pub use processor_distributed_merge_into_block_deserialize::TransformDistributedMergeIntoBlockDeserialize; pub use processor_distributed_merge_into_block_serialize::TransformDistributedMergeIntoBlockSerialize; pub use processor_merge_into_matched_and_split::MatchedSplitProcessor; -pub use processor_merge_into_matched_and_split::MixRowNumberKindAndLog; +pub use processor_merge_into_matched_and_split::MixRowIdKindAndLog; pub(crate) use processor_merge_into_matched_and_split::RowIdKind; pub use processor_merge_into_matched_and_split::SourceFullMatched; pub use processor_merge_into_not_matched::MergeIntoNotMatchedProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs index 4c1a56e72e5d..407736a251a2 100644 --- 
a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs @@ -26,7 +26,7 @@ use common_pipeline_core::PipeItem; use common_pipeline_transforms::processors::Transform; use common_pipeline_transforms::processors::Transformer; -use super::processor_merge_into_matched_and_split::MixRowNumberKindAndLog; +use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog; use super::RowIdKind; // It will receive MutationLogs Or RowIds. @@ -37,7 +37,7 @@ pub struct TransformDistributedMergeIntoBlockDeserialize; /// this processor will be used in the future for merge into based on shuffle hash join. impl TransformDistributedMergeIntoBlockDeserialize { - fn create(input: Arc, output: Arc) -> ProcessorPtr { + pub fn create(input: Arc, output: Arc) -> ProcessorPtr { ProcessorPtr::create(Transformer::create( input, output, @@ -66,7 +66,7 @@ impl Transform for TransformDistributedMergeIntoBlockDeserialize { const NAME: &'static str = "TransformDistributedMergeIntoBlockDeserialize"; fn transform(&mut self, data: DataBlock) -> Result { - let mix_kind = MixRowNumberKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap(); + let mix_kind = MixRowIdKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap(); match mix_kind.kind { 0 => Ok(DataBlock::new_with_meta( data.columns().to_vec(), @@ -84,7 +84,7 @@ impl Transform for TransformDistributedMergeIntoBlockDeserialize { data.num_rows(), Some(Box::new(RowIdKind::Delete)), )), - _ => Err(ErrorCode::BadBytes("get error MixRowNumberKindAndLog kind")), + _ => Err(ErrorCode::BadBytes("get error MixRowIdKindAndLog kind")), } } } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs index 42a21d512d82..ca6c7466f230 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs @@ -30,7 +30,7 @@ use common_pipeline_core::PipeItem; use common_pipeline_transforms::processors::Transform; use common_pipeline_transforms::processors::Transformer; -use super::processor_merge_into_matched_and_split::MixRowNumberKindAndLog; +use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog; use super::RowIdKind; use crate::operations::common::MutationLogs; @@ -42,7 +42,7 @@ pub struct TransformDistributedMergeIntoBlockSerialize; /// this processor will be used in the future for merge into based on shuffle hash join. 
impl TransformDistributedMergeIntoBlockSerialize { - fn create(input: Arc, output: Arc) -> ProcessorPtr { + pub fn create(input: Arc, output: Arc) -> ProcessorPtr { ProcessorPtr::create(Transformer::create( input, output, @@ -79,7 +79,7 @@ impl Transform for TransformDistributedMergeIntoBlockSerialize { Ok(DataBlock::new_with_meta( vec![entry], 1, - Some(Box::new(MixRowNumberKindAndLog { + Some(Box::new(MixRowIdKindAndLog { log: Some(log), kind: 0, })), @@ -90,7 +90,7 @@ impl Transform for TransformDistributedMergeIntoBlockSerialize { Ok(DataBlock::new_with_meta( data.columns().to_vec(), data.num_rows(), - Some(Box::new(MixRowNumberKindAndLog { + Some(Box::new(MixRowIdKindAndLog { log: None, kind: match row_id_kind { RowIdKind::Update => 1, diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index c8b99c6a2dd4..60b75c91d2ac 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -66,7 +66,7 @@ enum MutationKind { // if we use hash shuffle join strategy, the enum // type can't be parser when transform data between nodes. #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] -pub struct MixRowNumberKindAndLog { +pub struct MixRowIdKindAndLog { pub log: Option, // kind's range is [0,1,2], 0 stands for log // 1 stands for row_id_update, 2 stands for row_id_delete, @@ -74,9 +74,9 @@ pub struct MixRowNumberKindAndLog { } #[typetag::serde(name = "mix_row_id_kind_and_log")] -impl BlockMetaInfo for MixRowNumberKindAndLog { +impl BlockMetaInfo for MixRowIdKindAndLog { fn equals(&self, info: &Box) -> bool { - MixRowNumberKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other) + MixRowIdKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other) } fn clone_self(&self) -> Box { diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs index 4ca785e0342b..5bd6c8ed26e7 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs @@ -27,6 +27,7 @@ use common_pipeline_core::Pipe; use common_pipeline_core::PipeItem; use super::processor_merge_into_matched_and_split::SourceFullMatched; +use crate::operations::merge_into::processors::RowIdKind; pub struct RowNumberAndLogSplitProcessor { input_port: Arc, @@ -133,19 +134,18 @@ impl Processor for RowNumberAndLogSplitProcessor { fn process(&mut self) -> Result<()> { if let Some(data_block) = self.input_data.take() { - // all matched or logs - // if it's rowid, the meta will be none if data_block.get_meta().is_some() { if SourceFullMatched::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { + // distributed mode: source as build side + self.output_data_row_number = Some(data_block) + } else if RowIdKind::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { + // distributed mode: target as build side self.output_data_row_number = Some(data_block) } else { // mutation logs self.output_data_log = Some(data_block); } } 
else { - // when we use source as probe side and do distributed - // execution,it could be rowid but we use output_data_row_number. - // it doesn't matter. self.output_data_row_number = Some(data_block) } } From 1265c0418e532d510576aeb11baafadd76c14110 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 22:49:43 +0800 Subject: [PATCH 25/32] fix lint --- ...sor_merge_into_split_row_number_and_log.rs | 8 +++--- .../mode/standalone/explain/merge_into.test | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs index 5bd6c8ed26e7..dded883ff844 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs @@ -135,11 +135,11 @@ impl Processor for RowNumberAndLogSplitProcessor { fn process(&mut self) -> Result<()> { if let Some(data_block) = self.input_data.take() { if data_block.get_meta().is_some() { - if SourceFullMatched::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { - // distributed mode: source as build side - self.output_data_row_number = Some(data_block) - } else if RowIdKind::downcast_ref_from(data_block.get_meta().unwrap()).is_some() { + // distributed mode: source as build side + if SourceFullMatched::downcast_ref_from(data_block.get_meta().unwrap()).is_some() // distributed mode: target as build side + || RowIdKind::downcast_ref_from(data_block.get_meta().unwrap()).is_some() + { self.output_data_row_number = Some(data_block) } else { // mutation logs diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index afac09e1830c..d5fe6eeb0e5c 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -18,6 +18,34 @@ MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2 ---- 2 2 +query T +explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2.employee_id = employees2.employee_id WHEN MATCHED AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00); +---- +MergeInto: +target_table: default.default.salaries2 +├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)] +├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)] +├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))] +└── HashJoin: LEFT OUTER + ├── equi conditions: [eq(employees2.employee_id (#0), salaries2.employee_id (#3))] + ├── non-equi conditions: [] + ├── Exchange(Merge) + │ └── EvalScalar + │ ├── scalars: [employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2)] + │ └── LogicalGet + │ ├── table: default.default.employees2 + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE + 
└── LogicalGet + ├── table: default.default.salaries2 + ├── filters: [] + ├── order by: [] + └── limit: NONE + +statement ok +INSERT INTO salaries2 VALUES(1, 50000.00),(2, 60000.00); + query T explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON salaries2.employee_id = employees2.employee_id WHEN MATCHED AND employees2.department = 'HR' THEN UPDATE SET salaries2.salary = salaries2.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries2.salary = salaries2.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees2.employee_id, 55000.00); ---- From 460cb4fdaf00658540eca46e1f915f081bd3a79b Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 23:50:59 +0800 Subject: [PATCH 26/32] fix fragment type --- .../src/schedulers/fragments/fragmenter.rs | 4 +++- .../optimizer/distributed/distributed_merge.rs | 5 ++++- .../mode/standalone/explain/merge_into.test | 15 +++++++-------- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index f1f6648a1609..c198c0195151 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -155,7 +155,9 @@ impl PhysicalPlanReplacer for Fragmenter { fn replace_merge_into(&mut self, plan: &MergeInto) -> Result { let input = self.replace(&plan.input)?; - self.state = State::SelectLeaf; + if !*plan.change_join_order { + self.state = State::SelectLeaf; + } Ok(PhysicalPlan::MergeInto(Box::new(MergeInto { input: Box::new(input), ..plan.clone() diff --git a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs index 3943687136e2..0766605ae501 100644 --- a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs +++ b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs @@ -57,7 +57,10 @@ impl MergeSourceOptimizer { // target is build side let new_join_children = if change_join_order { vec![ - Arc::new(left_exchange_input.clone()), + Arc::new(SExpr::create_unary( + Arc::new(RelOperator::Exchange(Random)), + Arc::new(left_exchange_input.clone()), + )) Arc::new(SExpr::create_unary( Arc::new(RelOperator::Exchange(Broadcast)), Arc::new(right_exchange_input.clone()), diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test index d5fe6eeb0e5c..a5e6d602d395 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test @@ -29,14 +29,13 @@ target_table: default.default.salaries2 └── HashJoin: LEFT OUTER ├── equi conditions: [eq(employees2.employee_id (#0), salaries2.employee_id (#3))] ├── non-equi conditions: [] - ├── Exchange(Merge) - │ └── EvalScalar - │ ├── scalars: [employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2)] - │ └── LogicalGet - │ ├── table: default.default.employees2 - │ ├── filters: [] - │ ├── order by: [] - │ └── limit: NONE + ├── EvalScalar + │ ├── scalars: [employees2.employee_id (#0), employees2.employee_name (#1), employees2.department (#2)] + │ └── LogicalGet + │ ├── table: default.default.employees2 + │ ├── filters: [] + │ ├── order by: [] + │ └── limit: NONE └── LogicalGet ├── table: default.default.salaries2 ├── filters: [] From 805efcda051916b99986b67718a6e591122cd98a Mon 
Sep 17 00:00:00 2001 From: JackTan25 Date: Fri, 15 Dec 2023 23:59:51 +0800 Subject: [PATCH 27/32] add rule comment --- .../service/src/schedulers/fragments/fragmenter.rs | 2 +- .../optimizer/distributed/distributed_merge.rs | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index c198c0195151..484a2d97915f 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -155,7 +155,7 @@ impl PhysicalPlanReplacer for Fragmenter { fn replace_merge_into(&mut self, plan: &MergeInto) -> Result { let input = self.replace(&plan.input)?; - if !*plan.change_join_order { + if !plan.change_join_order { self.state = State::SelectLeaf; } Ok(PhysicalPlan::MergeInto(Box::new(MergeInto { diff --git a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs index 0766605ae501..9b5317b610c6 100644 --- a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs +++ b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs @@ -19,6 +19,7 @@ use common_exception::Result; use crate::optimizer::SExpr; use crate::plans::AddRowNumber; use crate::plans::Exchange::Broadcast; +use crate::plans::Exchange::Random; use crate::plans::Join; use crate::plans::PatternPlan; use crate::plans::RelOp; @@ -60,7 +61,7 @@ impl MergeSourceOptimizer { Arc::new(SExpr::create_unary( Arc::new(RelOperator::Exchange(Random)), Arc::new(left_exchange_input.clone()), - )) + )), Arc::new(SExpr::create_unary( Arc::new(RelOperator::Exchange(Broadcast)), Arc::new(right_exchange_input.clone()), @@ -104,9 +105,11 @@ impl MergeSourceOptimizer { // Join // / \ // / \ - // * Exchange(Broadcast) - // | - // AddRowNumber + // Exchange Exchange(Broadcast) + // (Random) | + // | AddRowNumber + // | | + // * * // if target is build we will get below: // Output: // Exchange From ba19cd4f5581d1a7ac89cfce3bdaf2f6cc744357 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Sat, 16 Dec 2023 12:35:16 +0800 Subject: [PATCH 28/32] fix conflict --- src/query/service/src/interpreters/interpreter_merge_into.rs | 4 ++-- .../src/interpreters/interpreter_merge_into_static_filter.rs | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index a3844f0be0f4..5823449920b9 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -420,8 +420,8 @@ impl MergeIntoInterpreter { ])), true => DataSchemaRef::new(DataSchema::new(vec![DataField::new( ROW_ID_COL_NAME, - common_expression::types::DataType::Number( - common_expression::types::NumberDataType::UInt64, + databend_common_expression::types::DataType::Number( + databend_common_expression::types::NumberDataType::UInt64, ), )])), }, diff --git a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs index 7557f55897bf..589e7ac8e70e 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs @@ -493,7 +493,9 @@ impl MergeIntoInterpreter { Arc::new(eval_build_side_join_expr_op.into()), 
Arc::new(SExpr::create_unary( // merge data here - Arc::new(RelOperator::Exchange(common_sql::plans::Exchange::Merge)), + Arc::new(RelOperator::Exchange( + databend_common_sql::plans::Exchange::Merge, + )), Arc::new(build_plan.child(0)?.child(0)?.clone()), )), ) From 864a283794692126f252ddb4d54db5ca3509972f Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Mon, 18 Dec 2023 17:22:23 +0800 Subject: [PATCH 29/32] add more tests and fix target build bug when not match merge into optimizer --- .../interpreters/interpreter_merge_into.rs | 35 +++- .../distributed/distributed_merge.rs | 10 ++ ...merge_into_without_distributed_enable.test | 170 ++++++++++++++++++ .../09_0038_target_build_merge_into.test | 10 ++ 4 files changed, 217 insertions(+), 8 deletions(-) create mode 100644 tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test create mode 100644 tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 5823449920b9..4df77a0db1da 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -185,10 +185,12 @@ impl MergeIntoInterpreter { let table_name = table_name.clone(); let input = input.clone(); - let input = if let RelOperator::Exchange(_) = input.plan() { - Box::new(input.child(0)?.clone()) + // we need to extract join plan, but we need to give this exchage + // back at last. + let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() { + (Box::new(input.child(0)?.clone()), true) } else { - input + (input, false) }; // let optimized_input = @@ -260,11 +262,28 @@ impl MergeIntoInterpreter { // merge_into_source is used to recv join's datablocks and split them into macthed and not matched // datablocks. - let merge_into_source = PhysicalPlan::MergeIntoSource(MergeIntoSource { - input: Box::new(join_input), - row_id_idx: row_id_idx as u32, - merge_type: merge_type.clone(), - }); + let merge_into_source = if !*distributed && extract_exchange { + // if we doesn't support distributed merge into, we should give the exchange merge back. + let rollback_join_input = PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(join_input), + kind: FragmentKind::Merge, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + }); + PhysicalPlan::MergeIntoSource(MergeIntoSource { + input: Box::new(rollback_join_input), + row_id_idx: row_id_idx as u32, + merge_type: merge_type.clone(), + }) + } else { + PhysicalPlan::MergeIntoSource(MergeIntoSource { + input: Box::new(join_input), + row_id_idx: row_id_idx as u32, + merge_type: merge_type.clone(), + }) + }; // transform unmatched for insert // reference to func `build_eval_scalar` diff --git a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs index 724ac52072d7..e58c1434d988 100644 --- a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs +++ b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs @@ -88,6 +88,16 @@ impl MergeSourceOptimizer { Ok(s_expr.replace_children(vec![Arc::new(join_s_expr)])) } + // Todo!(JackTan25): some join_input S_Expr doesn't match below pattern, + // but we can also treat it as distributed mode. 
+ // for example: + // // Input: + // Exchange(Merge) + // | + // Join + // / \ + // / \ + // * Exchange(broadcast) build_side fn merge_source_pattern() -> SExpr { // Input: // Exchange(Merge) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test new file mode 100644 index 000000000000..b5253c682aa7 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test @@ -0,0 +1,170 @@ +statement ok +set enable_experimental_merge_into = 1; + +statement ok +create table target_build_table(a int,b string,c string) cluster by(a,b); + +statement ok +create table source_probe_table(a int,b string,c string); + +## target table should be small table +statement ok +insert into target_build_table values(1,'b1','c1'); + +statement ok +insert into target_build_table values(2,'b2','c2'); + +query TTT +select * from target_build_table order by a; +---- +1 b1 c1 +2 b2 c2 + +statement ok +insert into source_probe_table values(1,'b3','c3'); + +statement ok +insert into source_probe_table values(2,'b4','c4'); + +statement ok +insert into source_probe_table values(3,'b5','c5'); + +statement ok +insert into source_probe_table values(4,'b6','c6'); + +statement ok +insert into source_probe_table values(5,'b7','c7'); + +statement ok +insert into source_probe_table values(6,'b8','c8'); + +query TTT +select * from source_probe_table order by a; +---- +1 b3 c3 +2 b4 c4 +3 b5 c5 +4 b6 c6 +5 b7 c7 +6 b8 c8 + +## test matched only +query T +merge into target_build_table as t1 using source_probe_table as t2 on t1.a = t2.a when matched then update *; +---- +2 + +query TTT +select * from target_build_table order by a; +---- +1 b3 c3 +2 b4 c4 + +## test insert only +query T +merge into target_build_table as t1 using source_probe_table as t2 on t1.a = t2.a when not matched then insert *; +---- +4 + +query TTT +select * from target_build_table order by a; +---- +1 b3 c3 +2 b4 c4 +3 b5 c5 +4 b6 c6 +5 b7 c7 +6 b8 c8 + +## test full operation +statement ok +delete from target_build_table where a >= 3; + +query TTT +select * from target_build_table order by a; +---- +1 b3 c3 +2 b4 c4 + +query T +merge into target_build_table as t1 using source_probe_table as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +4 2 + +query TTT +select * from target_build_table order by a; +---- +1 b3 c3 +2 b4 c4 +3 b5 c5 +4 b6 c6 +5 b7 c7 +6 b8 c8 + +### test complex source but for target build +statement ok +DROP TABLE IF EXISTS orders_small; + +statement ok +CREATE TABLE orders_small ( + order_id INT NOT NULL, + user_id INT NOT NULL, + order_type VARCHAR NOT NULL, + asset_type VARCHAR NOT NULL, + quantity DECIMAL(18,8) NOT NULL, + price DECIMAL(18,8) NOT NULL, + status VARCHAR NOT NULL, + created_at DATE NOT NULL, + updated_at DATE NOT NULL +) row_per_block=513; + +statement ok +DROP TABLE IF EXISTS orders_large; + +statement ok +CREATE TABLE orders_large ( + order_id INT NOT NULL, + user_id INT NOT NULL, + order_type VARCHAR NOT NULL, + asset_type VARCHAR NOT NULL, + quantity DECIMAL(18,8) NOT NULL, + price DECIMAL(18,8) NOT NULL, + status VARCHAR NOT NULL, + created_at DATE NOT NULL, + updated_at DATE NOT NULL +) row_per_block=513; + + +statement ok +DROP TABLE IF EXISTS orders_random; + +statement ok +CREATE TABLE orders_random ( + order_id INT NOT NULL, + user_id INT 
NOT NULL, + order_type VARCHAR NOT NULL, + asset_type VARCHAR NOT NULL, + quantity DECIMAL(18,8) NOT NULL, + price DECIMAL(18,8) NOT NULL, + status VARCHAR NOT NULL, + created_at DATE NOT NULL, + updated_at DATE NOT NULL +) engine = random; + +statement ok +insert into orders_large (select * from orders_random limit 500000); + +statement ok +insert into orders_small (select * from orders_random limit 10); + +statement ok +MERGE INTO orders_small as orders USING ( + SELECT user_id, asset_type, SUM(quantity) AS total_quantity + FROM orders_large + GROUP BY user_id, asset_type +) AS agg_orders ON orders.user_id = agg_orders.user_id AND orders.asset_type = agg_orders.asset_type + WHEN MATCHED THEN + UPDATE SET orders.quantity = agg_orders.total_quantity; + +statement ok +set enable_experimental_merge_into = 0; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test new file mode 100644 index 000000000000..61d972de18d0 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test @@ -0,0 +1,10 @@ +statement ok +set enable_distributed_merge_into = 1; + +include ./09_0037_target_build_merge_into_without_distributed_enable.test + +statement ok +set enable_distributed_merge_into = 0; + +MERGE INTO orders_25 as orders USING ( SELECT * FROM orders WHERE status = 'pending') AS + pending_orders ON orders.order_id = pending_orders.order_id WHEN MATCHED THEN UPDATE SET orders.status = 'completed'; \ No newline at end of file From 67dffdc0aeb82c0ff7482a0570f85a6d14031176 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Mon, 18 Dec 2023 17:28:23 +0800 Subject: [PATCH 30/32] extract merge into branch --- .../sql/src/planner/optimizer/optimizer.rs | 171 +++++++++--------- 1 file changed, 85 insertions(+), 86 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 77520e7b6b38..a5bdc7e90b44 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -203,92 +203,7 @@ pub fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result { ); Ok(Plan::CopyIntoTable(plan)) } - Plan::MergeInto(plan) => { - // optimize source :fix issue #13733 - // reason: if there is subquery,windowfunc exprs etc. see - // src/planner/semantic/lowering.rs `as_raw_expr()`, we will - // get dummy index. So we need to use optimizer to solve this. - let mut right_source = optimize_query(opt_ctx.clone(), plan.input.child(1)?.clone())?; - - // if it's not distributed execution, we should reserve - // exchange to merge source data. - if opt_ctx.enable_distributed_optimization - && opt_ctx - .table_ctx - .get_settings() - .get_enable_distributed_merge_into()? - { - // we need to remove exchange of right_source, because it's - // not an end query. - if let RelOperator::Exchange(_) = right_source.plan.as_ref() { - right_source = right_source.child(0)?.clone(); - } - } - // replace right source - let mut join_sexpr = plan.input.clone(); - join_sexpr = Box::new(join_sexpr.replace_children(vec![ - Arc::new(join_sexpr.child(0)?.clone()), - Arc::new(right_source), - ])); - - // before, we think source table is always the small table. - // 1. for matched only, we use inner join - // 2. for insert only, we use right anti join - // 3. 
for full merge into, we use right outer join - // for now, let's import the statistic info to determine left join or right join - // we just do optimization for the top join (target and source),won't do recursive optimization. - let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?; - let mut state = TransformResult::new(); - // we will reorder the join order according to the cardinality of target and source. - rule.apply(&join_sexpr, &mut state)?; - assert!(state.results().len() <= 1); - // we need to check whether we do swap left and right. - let change_join_order = if state.results().len() == 1 { - join_sexpr = Box::new(state.results()[0].clone()); - true - } else { - false - }; - - // try to optimize distributed join - if opt_ctx.enable_distributed_optimization - && opt_ctx - .table_ctx - .get_settings() - .get_enable_distributed_merge_into()? - { - // input is a Join_SExpr - let merge_into_join_sexpr = - optimize_distributed_query(opt_ctx.table_ctx.clone(), &join_sexpr)?; - // after optimize source, we need to add - let merge_source_optimizer = MergeSourceOptimizer::create(); - let (optimized_distributed_merge_into_join_sexpr, distributed) = - if !merge_into_join_sexpr - .match_pattern(&merge_source_optimizer.merge_source_pattern) - { - (merge_into_join_sexpr.clone(), false) - } else { - ( - merge_source_optimizer - .optimize(&merge_into_join_sexpr, change_join_order)?, - true, - ) - }; - - Ok(Plan::MergeInto(Box::new(MergeInto { - input: Box::new(optimized_distributed_merge_into_join_sexpr), - distributed, - change_join_order, - ..*plan - }))) - } else { - Ok(Plan::MergeInto(Box::new(MergeInto { - input: join_sexpr, - change_join_order, - ..*plan - }))) - } - } + Plan::MergeInto(plan) => optimize_merge_into(opt_ctx.clone(), plan), // Passthrough statements. _ => Ok(plan), } @@ -353,3 +268,87 @@ fn get_optimized_memo( cascades.optimize(result)?; Ok((cascades.memo, cascades.best_cost_map)) } + +fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box) -> Result { + // optimize source :fix issue #13733 + // reason: if there is subquery,windowfunc exprs etc. see + // src/planner/semantic/lowering.rs `as_raw_expr()`, we will + // get dummy index. So we need to use optimizer to solve this. + let mut right_source = optimize_query(opt_ctx.clone(), plan.input.child(1)?.clone())?; + + // if it's not distributed execution, we should reserve + // exchange to merge source data. + if opt_ctx.enable_distributed_optimization + && opt_ctx + .table_ctx + .get_settings() + .get_enable_distributed_merge_into()? + { + // we need to remove exchange of right_source, because it's + // not an end query. + if let RelOperator::Exchange(_) = right_source.plan.as_ref() { + right_source = right_source.child(0)?.clone(); + } + } + // replace right source + let mut join_sexpr = plan.input.clone(); + join_sexpr = Box::new(join_sexpr.replace_children(vec![ + Arc::new(join_sexpr.child(0)?.clone()), + Arc::new(right_source), + ])); + + // before, we think source table is always the small table. + // 1. for matched only, we use inner join + // 2. for insert only, we use right anti join + // 3. for full merge into, we use right outer join + // for now, let's import the statistic info to determine left join or right join + // we just do optimization for the top join (target and source),won't do recursive optimization. 
+ let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?; + let mut state = TransformResult::new(); + // we will reorder the join order according to the cardinality of target and source. + rule.apply(&join_sexpr, &mut state)?; + assert!(state.results().len() <= 1); + // we need to check whether we do swap left and right. + let change_join_order = if state.results().len() == 1 { + join_sexpr = Box::new(state.results()[0].clone()); + true + } else { + false + }; + + // try to optimize distributed join + if opt_ctx.enable_distributed_optimization + && opt_ctx + .table_ctx + .get_settings() + .get_enable_distributed_merge_into()? + { + // input is a Join_SExpr + let merge_into_join_sexpr = + optimize_distributed_query(opt_ctx.table_ctx.clone(), &join_sexpr)?; + // after optimize source, we need to add + let merge_source_optimizer = MergeSourceOptimizer::create(); + let (optimized_distributed_merge_into_join_sexpr, distributed) = + if !merge_into_join_sexpr.match_pattern(&merge_source_optimizer.merge_source_pattern) { + (merge_into_join_sexpr.clone(), false) + } else { + ( + merge_source_optimizer.optimize(&merge_into_join_sexpr, change_join_order)?, + true, + ) + }; + + Ok(Plan::MergeInto(Box::new(MergeInto { + input: Box::new(optimized_distributed_merge_into_join_sexpr), + distributed, + change_join_order, + ..*plan + }))) + } else { + Ok(Plan::MergeInto(Box::new(MergeInto { + input: join_sexpr, + change_join_order, + ..*plan + }))) + } +} From 50783bf7a6397d46d7e3b481729a394e87f2c943 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Mon, 18 Dec 2023 17:35:09 +0800 Subject: [PATCH 31/32] fix lint --- src/query/service/src/interpreters/interpreter_merge_into.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 4df77a0db1da..16465bdabdcf 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -185,7 +185,7 @@ impl MergeIntoInterpreter { let table_name = table_name.clone(); let input = input.clone(); - // we need to extract join plan, but we need to give this exchage + // we need to extract join plan, but we need to give this exchange // back at last. let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() { (Box::new(input.child(0)?.clone()), true) From 04ed6bb97003a629f206f981daad8d2dc3960b8d Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Mon, 18 Dec 2023 19:12:06 +0800 Subject: [PATCH 32/32] remove error test --- .../base/09_fuse_engine/09_0038_target_build_merge_into.test | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test index 61d972de18d0..d6a7c97f92e1 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0038_target_build_merge_into.test @@ -5,6 +5,3 @@ include ./09_0037_target_build_merge_into_without_distributed_enable.test statement ok set enable_distributed_merge_into = 0; - -MERGE INTO orders_25 as orders USING ( SELECT * FROM orders WHERE status = 'pending') AS - pending_orders ON orders.order_id = pending_orders.order_id WHEN MATCHED THEN UPDATE SET orders.status = 'completed'; \ No newline at end of file
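
Note on the join-order decision these patches converge on: the sketch below condenses the logic that PATCH 02 introduces and PATCH 30 moves into `optimize_merge_into`. It is illustrative only, not a drop-in function: the helper name `decide_merge_into_join_order` is made up, `MetadataRef` is assumed to be the type behind `plan.meta_data`, and the other types (`SExpr`, `RuleFactory`, `RuleID`, `TransformResult`, `Result`) are the ones used in the diffs above.

use super::rule::TransformResult; // import paths as in the PATCH 02 diff of optimizer.rs
use super::RuleFactory;

// Illustrative helper; in PATCH 30 this logic lives inline in `optimize_merge_into`.
fn decide_merge_into_join_order(
    mut join_sexpr: Box<SExpr>,
    meta_data: MetadataRef, // assumed type name; the patch passes `plan.meta_data.clone()`
) -> Result<(Box<SExpr>, bool)> {
    // Only the top-level join between target and source is considered;
    // the children are not reordered recursively.
    let rule = RuleFactory::create_rule(RuleID::CommuteJoin, meta_data)?;
    let mut state = TransformResult::new();

    // CommuteJoin uses the cardinality estimates of target and source to decide
    // whether swapping the build/probe sides is worthwhile.
    rule.apply(&join_sexpr, &mut state)?;
    assert!(state.results().len() <= 1);

    // One result means the sides were swapped: the target table becomes the
    // build side and, as the explain test above shows, a full merge into is
    // then planned as a LEFT OUTER join instead of a RIGHT OUTER one.
    let change_join_order = if state.results().len() == 1 {
        join_sexpr = Box::new(state.results()[0].clone());
        true
    } else {
        false
    };
    Ok((join_sexpr, change_join_order))
}

Whether the swap fires is purely a matter of the estimated cardinalities, which is why the 09_0037 test keeps the target table deliberately small (ten rows in orders_small against 500000 in orders_large).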
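
For the distributed path, the same `change_join_order` flag drives which exchanges `MergeSourceOptimizer::optimize` wraps around the join children (PATCH 26, with the missing comma fixed in PATCH 27). The fragment below restates only the swapped-order branch, with comments; it is a sketch of code inside that method, so `left_exchange_input` / `right_exchange_input` are assumed to be the join children already stripped of their local exchanges, exactly as in the diff, and the imports are the ones the diff uses.

use std::sync::Arc;

use crate::optimizer::SExpr;
use crate::plans::Exchange::Broadcast;
use crate::plans::Exchange::Random;
use crate::plans::RelOperator;

// Inside MergeSourceOptimizer::optimize, when change_join_order == true the
// target table is the build side: the probe/source child is redistributed
// randomly across the query nodes, while the build/target child is broadcast,
// so every node builds the same hash table over the target rows.
let new_join_children = vec![
    Arc::new(SExpr::create_unary(
        Arc::new(RelOperator::Exchange(Random)),
        Arc::new(left_exchange_input.clone()),
    )),
    Arc::new(SExpr::create_unary(
        Arc::new(RelOperator::Exchange(Broadcast)),
        Arc::new(right_exchange_input.clone()),
    )),
];

This rewrite is only reached when distributed optimization and the enable_distributed_merge_into setting are both on; the 09_0038 test exercises it by setting enable_distributed_merge_into = 1 and re-running the 09_0037 suite, while the non-distributed run keeps the Exchange(Merge) that the interpreter puts back in PATCH 29.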