From 754bb3ef52bbe780053a14b57b7dba3b68e1327f Mon Sep 17 00:00:00 2001
From: JackTan25 <60096118+JackTan25@users.noreply.github.com>
Date: Sat, 7 Oct 2023 12:50:02 +0800
Subject: [PATCH] feat: improve parallel run for merge into (#13045)

* add test first

* modify shell

* finish parallel

* fix linux check

* fix bug

---------

Co-authored-by: dantengsky
---
 .../clickbench/benchmark_local_merge_into.sh  |   2 +-
 benchmark/clickbench/merge_into/queries.sql   |   2 +
 src/query/pipeline/core/src/pipeline.rs       |   2 +-
 .../service/src/pipelines/pipeline_builder.rs | 154 +++++++++++++-----
 .../storages/fuse/src/operations/merge.rs     |   2 +-
 .../mutator/split_by_expr_mutator.rs          |   1 +
 .../mutator/update_by_expr_mutator.rs         |   1 +
 .../processor_merge_into_not_matched.rs       |   1 +
 8 files changed, 122 insertions(+), 43 deletions(-)

diff --git a/benchmark/clickbench/benchmark_local_merge_into.sh b/benchmark/clickbench/benchmark_local_merge_into.sh
index 9cd249e5337c..4df03333af31 100755
--- a/benchmark/clickbench/benchmark_local_merge_into.sh
+++ b/benchmark/clickbench/benchmark_local_merge_into.sh
@@ -95,7 +95,7 @@ function run_query() {
     fi
 }
 
-TRIES=3
+TRIES=1
 QUERY_NUM=0
 while read -r query; do
     echo "Running Q${QUERY_NUM}: ${query}"
diff --git a/benchmark/clickbench/merge_into/queries.sql b/benchmark/clickbench/merge_into/queries.sql
index d96d2b1ae52e..4f5af6beb3eb 100644
--- a/benchmark/clickbench/merge_into/queries.sql
+++ b/benchmark/clickbench/merge_into/queries.sql
@@ -1 +1,3 @@
 set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
+set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
+set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs
index 24847893bcaf..3768ed6e45fb 100644
--- a/src/query/pipeline/core/src/pipeline.rs
+++ b/src/query/pipeline/core/src/pipeline.rs
@@ -274,7 +274,7 @@ impl Pipeline {
         }
     }
 
-    /// Duplite a pipe input to two outputs.
+    /// Duplicate a pipe input to two outputs.
     ///
     /// If `force_finish_together` enabled, once one output is finished, the other output will be finished too.
     pub fn duplicate(&mut self, force_finish_together: bool) -> Result<()> {
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index 2f280beb16b5..14e99e5d399a 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -305,11 +305,16 @@ impl PipelineBuilder {
         let MergeIntoSource { input, row_id_idx } = merge_into_source;
 
         self.build_pipeline(input)?;
-        self.main_pipeline.try_resize(1)?;
-        let merge_into_split_processor = MergeIntoSplitProcessor::create(*row_id_idx, false)?;
+        // merge into's parallelism depends on the join probe number.
+        let mut items = Vec::with_capacity(self.main_pipeline.output_len());
+        let output_len = self.main_pipeline.output_len();
+        for _ in 0..output_len {
+            let merge_into_split_processor = MergeIntoSplitProcessor::create(*row_id_idx, false)?;
+            items.push(merge_into_split_processor.into_pipe_item());
+        }
 
         self.main_pipeline
-            .add_pipe(merge_into_split_processor.into_pipe());
+            .add_pipe(Pipe::create(output_len, output_len * 2, items));
 
         Ok(())
     }
@@ -431,24 +436,11 @@ impl PipelineBuilder {
             row_id_idx,
             segments,
         } = merge_into;
+
         self.build_pipeline(input)?;
 
         let tbl = self
             .ctx
             .build_table_by_table_info(catalog_info, table_info, None)?;
 
-        let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
-            unmatched.clone(),
-            input.output_schema()?,
-            self.ctx.get_function_context()?,
-        )?;
-
-        let matched_split_processor = MatchedSplitProcessor::create(
-            self.ctx.clone(),
-            *row_id_idx,
-            matched.clone(),
-            field_index_of_input_schema.clone(),
-            input.output_schema()?,
-            Arc::new(DataSchema::from(tbl.schema())),
-        )?;
 
         let table = FuseTable::try_from_table(tbl.as_ref())?;
         let block_thresholds = table.get_block_thresholds();
 
         let cluster_stats_gen =
             table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds)?;
 
-        // append data for unmatched data
-        let serialize_block_transform = TransformSerializeBlock::try_create(
+        // this TransformSerializeBlock is only used to get the block_builder
+        let block_builder = TransformSerializeBlock::try_create(
             self.ctx.clone(),
             InputPort::create(),
             OutputPort::create(),
             table,
-            cluster_stats_gen,
-        )?;
-        let block_builder = serialize_block_transform.get_block_builder();
+            cluster_stats_gen.clone(),
+        )?
+        .get_block_builder();
 
         let serialize_segment_transform = TransformSerializeSegment::new(
             self.ctx.clone(),
@@ -480,19 +472,79 @@ impl PipelineBuilder {
             }
             output_len
         };
-        let pipe_items = vec![
-            matched_split_processor.into_pipe_item(),
-            merge_into_not_matched_processor.into_pipe_item(),
-        ];
+        // receive matched data and not matched data in parallel.
+        let mut pipe_items = Vec::with_capacity(self.main_pipeline.output_len());
+        for _ in (0..self.main_pipeline.output_len()).step_by(2) {
+            let matched_split_processor = MatchedSplitProcessor::create(
+                self.ctx.clone(),
+                *row_id_idx,
+                matched.clone(),
+                field_index_of_input_schema.clone(),
+                input.output_schema()?,
+                Arc::new(DataSchema::from(tbl.schema())),
+            )?;
+
+            let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
+                unmatched.clone(),
+                input.output_schema()?,
+                self.ctx.get_function_context()?,
+            )?;
+
+            pipe_items.push(matched_split_processor.into_pipe_item());
+            pipe_items.push(merge_into_not_matched_processor.into_pipe_item());
+        }
         self.main_pipeline.add_pipe(Pipe::create(
             self.main_pipeline.output_len(),
             get_output_len(&pipe_items),
-            pipe_items,
+            pipe_items.clone(),
         ));
 
-        self.main_pipeline
-            .resize_partial_one(vec![vec![0], vec![1, 2]])?;
+        // row_id port0_1
+        // matched update data port0_2
+        // not matched insert data port0_3
+        // row_id port1_1
+        // matched update data port1_2
+        // not matched insert data port1_3
+        // ......
+        assert_eq!(self.main_pipeline.output_len() % 3, 0);
+
+        // merge rowid and serialize_blocks
+        let mut ranges = Vec::with_capacity(self.main_pipeline.output_len() / 3 * 2);
+        for idx in (0..self.main_pipeline.output_len()).step_by(3) {
+            ranges.push(vec![idx]);
+            ranges.push(vec![idx + 1, idx + 2]);
+        }
+        self.main_pipeline.resize_partial_one(ranges.clone())?;
+        assert_eq!(self.main_pipeline.output_len() % 2, 0);
+
+        // shuffle outputs and resize row_id
+        // ----------------------------------------------------------------------
+        // row_id port0_1              row_id port0_1
+        // data port0_2                row_id port1_1              row_id port
+        // row_id port1_1   ======>    ......           ======>
+        // data port1_2                data port0_2                data port0
+        // ......                      data port1_2                data port1
+        // ......                      .....                       .....
+        // ----------------------------------------------------------------------
+        let mut rules = Vec::with_capacity(self.main_pipeline.output_len());
+        for idx in 0..(self.main_pipeline.output_len() / 2) {
+            rules.push(idx);
+            rules.push(idx + self.main_pipeline.output_len() / 2);
+        }
+        self.main_pipeline.reorder_inputs(rules);
+        // resize row_id
+        ranges.clear();
+        let mut vec = Vec::with_capacity(self.main_pipeline.output_len() / 2);
+        for idx in 0..(self.main_pipeline.output_len() / 2) {
+            vec.push(idx);
+        }
+        ranges.push(vec.clone());
+        for idx in 0..(self.main_pipeline.output_len() / 2) {
+            ranges.push(vec![idx + self.main_pipeline.output_len() / 2]);
+        }
+        self.main_pipeline.resize_partial_one(ranges.clone())?;
+
         // fill default columns
         let table_default_schema = &table.schema().remove_computed_fields();
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
@@ -505,9 +557,8 @@ impl PipelineBuilder {
                     tbl.clone(),
                 )
             },
-            1,
+            self.main_pipeline.output_len() - 1,
         )?;
-
         builder.add_items_prepend(vec![create_dummy_item()]);
         self.main_pipeline.add_pipe(builder.finalize());
 
@@ -526,7 +577,7 @@ impl PipelineBuilder {
                     computed_schema.clone(),
                 )
             },
-            1,
+            self.main_pipeline.output_len() - 1,
         )?;
         builder.add_items_prepend(vec![create_dummy_item()]);
         self.main_pipeline.add_pipe(builder.finalize());
@@ -535,15 +586,24 @@ impl PipelineBuilder {
         let max_threads = self.ctx.get_settings().get_max_threads()?;
         let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));
 
-        let pipe_items = vec![
-            table.matched_mutator(
+        pipe_items.clear();
+        pipe_items.push(table.rowid_aggregate_mutator(
+            self.ctx.clone(),
+            block_builder,
+            io_request_semaphore,
+            segments.clone(),
+        )?);
+
+        for _ in 0..self.main_pipeline.output_len() - 1 {
+            let serialize_block_transform = TransformSerializeBlock::try_create(
                 self.ctx.clone(),
-                block_builder,
-                io_request_semaphore,
-                segments.clone(),
-            )?,
-            serialize_block_transform.into_pipe_item(),
-        ];
+                InputPort::create(),
+                OutputPort::create(),
+                table,
+                cluster_stats_gen.clone(),
+            )?;
+            pipe_items.push(serialize_block_transform.into_pipe_item());
+        }
 
         self.main_pipeline.add_pipe(Pipe::create(
             self.main_pipeline.output_len(),
@@ -551,6 +611,20 @@ impl PipelineBuilder {
             pipe_items,
         ));
 
+        // resize block ports
+        // aggregate_mutator port               aggregate_mutator port
+        // serialize_block port0     ======>
+        // serialize_block port1                serialize_block port
+        // .......
+        ranges.clear();
+        ranges.push(vec![0]);
+        vec.clear();
+        for idx in 0..self.main_pipeline.output_len() - 1 {
+            vec.push(idx + 1);
+        }
+        ranges.push(vec);
+        self.main_pipeline.resize_partial_one(ranges)?;
+
         let pipe_items = vec![
             create_dummy_item(),
             serialize_segment_transform.into_pipe_item(),
diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs
index 6e0e8a07b0c5..e0cea8b952e2 100644
--- a/src/query/storages/fuse/src/operations/merge.rs
+++ b/src/query/storages/fuse/src/operations/merge.rs
@@ -28,7 +28,7 @@ use crate::FuseTable;
 
 impl FuseTable {
     // todo: (JackTan25) add pipeline picture
-    pub fn matched_mutator(
+    pub fn rowid_aggregate_mutator(
         &self,
         ctx: Arc<dyn TableContext>,
         block_builder: BlockBuilder,
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs
index 62429991e8c4..f983c982ab7e 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs
@@ -23,6 +23,7 @@ use common_expression::Expr;
 use common_expression::FunctionContext;
 use common_functions::BUILTIN_FUNCTIONS;
 use common_sql::executor::cast_expr_to_non_null_boolean;
+
 pub struct SplitByExprMutator {
     expr: Option<Expr>,
     func_ctx: FunctionContext,
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/update_by_expr_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/update_by_expr_mutator.rs
index a10789143e81..6fc9d09241fa 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/update_by_expr_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/update_by_expr_mutator.rs
@@ -32,6 +32,7 @@ use common_functions::BUILTIN_FUNCTIONS;
 use common_sql::evaluator::BlockOperator;
 use common_sql::executor::cast_expr_to_non_null_boolean;
 
+#[derive(Clone)]
 pub struct UpdateByExprMutator {
     expr: Option<Expr>,
     func_ctx: FunctionContext,
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs
index af1d2f455e9e..7eab78e67883 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs
@@ -43,6 +43,7 @@ struct InsertDataBlockMutation {
 }
 
 // need to evaluate expression and
+
 pub struct MergeIntoNotMatchedProcessor {
     input_port: Arc<InputPort>,
     output_port: Arc<OutputPort>,
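
For readers following the port arithmetic in the pipeline_builder.rs hunks above, here is a minimal standalone sketch (not part of the patch) of the three index computations the patch performs. It assumes reorder_inputs(rules) routes input i to position rules[i], as the ASCII diagrams in the patch suggest; the function names below are hypothetical helpers written for illustration only.

/// Ranges for the first resize: each parallel branch exposes
/// [row_id, matched-data, unmatched-data]; keep each row_id port alone and
/// merge the two data ports, leaving [row_id, data] per branch.
fn first_merge_ranges(output_len: usize) -> Vec<Vec<usize>> {
    let mut ranges = Vec::with_capacity(output_len / 3 * 2);
    for idx in (0..output_len).step_by(3) {
        ranges.push(vec![idx]);
        ranges.push(vec![idx + 1, idx + 2]);
    }
    ranges
}

/// Shuffle rules that turn the interleaved layout
/// [row_id0, data0, row_id1, data1, ...] into
/// [row_id0, row_id1, ..., data0, data1, ...].
fn shuffle_rules(output_len: usize) -> Vec<usize> {
    let half = output_len / 2;
    let mut rules = Vec::with_capacity(output_len);
    for idx in 0..half {
        rules.push(idx);        // row_id port `idx` keeps a front slot
        rules.push(idx + half); // its data port moves to the back half
    }
    rules
}

/// Ranges that merge all row_id ports into one output (feeding the row-id
/// aggregate mutator) while keeping every data port separate.
fn row_id_merge_ranges(output_len: usize) -> Vec<Vec<usize>> {
    let half = output_len / 2;
    let mut ranges = vec![(0..half).collect::<Vec<usize>>()];
    for idx in 0..half {
        ranges.push(vec![idx + half]);
    }
    ranges
}

fn main() {
    // Three parallel branches: nine ports after the matched/not-matched pipe.
    assert_eq!(
        first_merge_ranges(9),
        vec![vec![0], vec![1, 2], vec![3], vec![4, 5], vec![6], vec![7, 8]]
    );

    // Six ports remain: [row_id0, data0, row_id1, data1, row_id2, data2].
    assert_eq!(shuffle_rules(6), vec![0, 3, 1, 4, 2, 5]);

    // After the shuffle, ports 0..3 are row_ids and ports 3..6 are data,
    // so the row_ids collapse into a single port for the aggregate mutator.
    assert_eq!(
        row_id_merge_ranges(6),
        vec![vec![0, 1, 2], vec![3], vec![4], vec![5]]
    );
}

Together these mirror the pipeline evolution in the patch: 3N ports after the split, 2N after the first resize, and finally one row-id port plus N data ports feeding rowid_aggregate_mutator and the serialize-block transforms.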