feat: improve parallel run for merge into (#13045)
* add test first

* modify shell

* finish parallel

* fix linux check

* fix bug

---------

Co-authored-by: dantengsky <[email protected]>
JackTan25 and dantengsky authored Oct 7, 2023
1 parent def4923 commit 754bb3e
Showing 8 changed files with 122 additions and 43 deletions.
2 changes: 1 addition & 1 deletion benchmark/clickbench/benchmark_local_merge_into.sh
@@ -95,7 +95,7 @@ function run_query() {
fi
}

TRIES=3
TRIES=1
QUERY_NUM=0
while read -r query; do
echo "Running Q${QUERY_NUM}: ${query}"
2 changes: 2 additions & 0 deletions benchmark/clickbench/merge_into/queries.sql
@@ -1 +1,3 @@
set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
set enable_experimental_merge_into = 1;merge into target_table as t1 using (select * from source_table as t2) on t1.l_partkey = t2.l_partkey and t1.l_orderkey = t2.l_orderkey and t1.l_suppkey = t2.l_suppkey and t1.l_linenumber = t2.l_linenumber when matched and t1.l_partkey >= 200000 then update * when matched then delete when not matched then insert *;
2 changes: 1 addition & 1 deletion src/query/pipeline/core/src/pipeline.rs
@@ -274,7 +274,7 @@ impl Pipeline {
}
}

/// Duplite a pipe input to two outputs.
/// Duplicate a pipe input to two outputs.
///
/// If `force_finish_together` enabled, once one output is finished, the other output will be finished too.
pub fn duplicate(&mut self, force_finish_together: bool) -> Result<()> {
154 changes: 114 additions & 40 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -305,11 +305,16 @@ impl PipelineBuilder {
let MergeIntoSource { input, row_id_idx } = merge_into_source;

self.build_pipeline(input)?;
self.main_pipeline.try_resize(1)?;
let merge_into_split_processor = MergeIntoSplitProcessor::create(*row_id_idx, false)?;
// merge into's parallelism depends on the join probe number.
let mut items = Vec::with_capacity(self.main_pipeline.output_len());
let output_len = self.main_pipeline.output_len();
for _ in 0..output_len {
let merge_into_split_processor = MergeIntoSplitProcessor::create(*row_id_idx, false)?;
items.push(merge_into_split_processor.into_pipe_item());
}

self.main_pipeline
.add_pipe(merge_into_split_processor.into_pipe());
.add_pipe(Pipe::create(output_len, output_len * 2, items));

Ok(())
}
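
As a reading aid, a tiny standalone sketch of the port arithmetic behind this hunk (plain Rust, not the databend API; `probe_outputs` is a hypothetical stand-in for `self.main_pipeline.output_len()`): each join-probe output now gets its own MergeIntoSplitProcessor, so the pipe created above has N inputs and 2N outputs, a row_id port and a data port per processor.

// Illustrative only: shape of the split pipe, assuming one split processor
// (1 input, 2 outputs) per join-probe output.
fn split_pipe_shape(probe_outputs: usize) -> (usize, usize) {
    (probe_outputs, probe_outputs * 2)
}

fn main() {
    // e.g. 4 probe outputs -> a pipe with 4 inputs and 8 outputs
    assert_eq!(split_pipe_shape(4), (4, 8));
}
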
@@ -431,40 +436,27 @@ impl PipelineBuilder {
row_id_idx,
segments,
} = merge_into;

self.build_pipeline(input)?;
let tbl = self
.ctx
.build_table_by_table_info(catalog_info, table_info, None)?;
let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
unmatched.clone(),
input.output_schema()?,
self.ctx.get_function_context()?,
)?;

let matched_split_processor = MatchedSplitProcessor::create(
self.ctx.clone(),
*row_id_idx,
matched.clone(),
field_index_of_input_schema.clone(),
input.output_schema()?,
Arc::new(DataSchema::from(tbl.schema())),
)?;

let table = FuseTable::try_from_table(tbl.as_ref())?;
let block_thresholds = table.get_block_thresholds();

let cluster_stats_gen =
table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds)?;

// append data for unmatched data
let serialize_block_transform = TransformSerializeBlock::try_create(
// this TransformSerializeBlock is just used to get block_builder
let block_builder = TransformSerializeBlock::try_create(
self.ctx.clone(),
InputPort::create(),
OutputPort::create(),
table,
cluster_stats_gen,
)?;
let block_builder = serialize_block_transform.get_block_builder();
cluster_stats_gen.clone(),
)?
.get_block_builder();

let serialize_segment_transform = TransformSerializeSegment::new(
self.ctx.clone(),
@@ -480,19 +472,79 @@
}
output_len
};
let pipe_items = vec![
matched_split_processor.into_pipe_item(),
merge_into_not_matched_processor.into_pipe_item(),
];

// receive matched data and not matched data in parallel.
let mut pipe_items = Vec::with_capacity(self.main_pipeline.output_len());
for _ in (0..self.main_pipeline.output_len()).step_by(2) {
let matched_split_processor = MatchedSplitProcessor::create(
self.ctx.clone(),
*row_id_idx,
matched.clone(),
field_index_of_input_schema.clone(),
input.output_schema()?,
Arc::new(DataSchema::from(tbl.schema())),
)?;

let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
unmatched.clone(),
input.output_schema()?,
self.ctx.get_function_context()?,
)?;

pipe_items.push(matched_split_processor.into_pipe_item());
pipe_items.push(merge_into_not_matched_processor.into_pipe_item());
}
self.main_pipeline.add_pipe(Pipe::create(
self.main_pipeline.output_len(),
get_output_len(&pipe_items),
pipe_items,
pipe_items.clone(),
));

self.main_pipeline
.resize_partial_one(vec![vec![0], vec![1, 2]])?;
// row_id port0_1
// matched update data port0_2
// not matched insert data port0_3
// row_id port1_1
// matched update data port1_2
// not matched insert data port1_3
// ......
assert_eq!(self.main_pipeline.output_len() % 3, 0);

// merge rowid and serialize_blocks
let mut ranges = Vec::with_capacity(self.main_pipeline.output_len() / 3 * 2);
for idx in (0..self.main_pipeline.output_len()).step_by(3) {
ranges.push(vec![idx]);
ranges.push(vec![idx + 1, idx + 2]);
}
self.main_pipeline.resize_partial_one(ranges.clone())?;
assert_eq!(self.main_pipeline.output_len() % 2, 0);
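
For illustration, a minimal standalone sketch of the `ranges` built just above (plain Rust, hypothetical `output_len`, not the real Pipeline API): each triple of ports (row_id, matched-update data, not-matched-insert data) becomes a single-port range for the row_id plus a merged range for the two data ports.

// Illustrative only: mirrors the step_by(3) loop above; assumes output_len % 3 == 0.
fn rowid_data_ranges(output_len: usize) -> Vec<Vec<usize>> {
    let mut ranges = Vec::with_capacity(output_len / 3 * 2);
    for idx in (0..output_len).step_by(3) {
        ranges.push(vec![idx]);              // keep the row_id port on its own
        ranges.push(vec![idx + 1, idx + 2]); // merge the update/insert data ports
    }
    ranges
}

fn main() {
    // two parallel pipelines -> 6 ports -> 4 ports after resize_partial_one
    assert_eq!(rowid_data_ranges(6), vec![vec![0], vec![1, 2], vec![3], vec![4, 5]]);
}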

// shuffle outputs and resize row_id
// ----------------------------------------------------------------------
// row_id port0_1               row_id port0_1
// data port0_2                 row_id port1_1                row_id port
// row_id port1_1    ======>    ......            ======>
// data port1_2                 data port0_2                  data port0
// ......                       data port1_2                  data port1
// ......                       ......                        ......
// ----------------------------------------------------------------------
let mut rules = Vec::with_capacity(self.main_pipeline.output_len());
for idx in 0..(self.main_pipeline.output_len() / 2) {
rules.push(idx);
rules.push(idx + self.main_pipeline.output_len() / 2);
}
self.main_pipeline.reorder_inputs(rules);
// resize row_id
ranges.clear();
let mut vec = Vec::with_capacity(self.main_pipeline.output_len() / 2);
for idx in 0..(self.main_pipeline.output_len() / 2) {
vec.push(idx);
}
ranges.push(vec.clone());
for idx in 0..(self.main_pipeline.output_len() / 2) {
ranges.push(vec![idx + self.main_pipeline.output_len() / 2]);
}
self.main_pipeline.resize_partial_one(ranges.clone())?;
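
To make the shuffle concrete, a hedged standalone sketch of the two loops above (plain Rust, hypothetical `output_len`; the real reordering happens inside `reorder_inputs`/`resize_partial_one`): per the diagram in this commit, the interleaved rules move the row_id ports in front of the data ports, and the final ranges then merge the row_id ports into one while keeping each data port separate.

// Illustrative only: the rule/range construction used above (output_len is even here).
fn shuffle_rules(output_len: usize) -> Vec<usize> {
    let half = output_len / 2;
    let mut rules = Vec::with_capacity(output_len);
    for idx in 0..half {
        rules.push(idx);
        rules.push(idx + half);
    }
    rules
}

fn rowid_resize_ranges(output_len: usize) -> Vec<Vec<usize>> {
    let half = output_len / 2;
    let mut ranges = Vec::with_capacity(half + 1);
    ranges.push((0..half).collect());                     // merge all row_id ports
    ranges.extend((0..half).map(|idx| vec![idx + half])); // keep data ports separate
    ranges
}

fn main() {
    // two parallel pipelines -> 4 ports: (row_id0, data0, row_id1, data1)
    assert_eq!(shuffle_rules(4), vec![0, 2, 1, 3]);
    // after the shuffle: (row_id0, row_id1, data0, data1)
    assert_eq!(rowid_resize_ranges(4), vec![vec![0, 1], vec![2], vec![3]]);
}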

// fill default columns
let table_default_schema = &table.schema().remove_computed_fields();
let mut builder = self.main_pipeline.add_transform_with_specified_len(
@@ -505,9 +557,8 @@
tbl.clone(),
)
},
1,
self.main_pipeline.output_len() - 1,
)?;

builder.add_items_prepend(vec![create_dummy_item()]);
self.main_pipeline.add_pipe(builder.finalize());
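
A small hedged sketch of the fan-out used here (plain Rust; assumes, per the layout built earlier, that port 0 carries row_ids and the remaining ports carry data blocks): the fill-default-columns transform is added for the `output_len - 1` data ports, and a dummy pass-through item is prepended so the row_id port skips it.

// Illustrative only: which ports receive the fill-default transform.
fn fill_transform_ports(output_len: usize) -> Vec<usize> {
    // port 0 (row_id) is bridged by a dummy item; ports 1.. get the transform
    (1..output_len).collect()
}

fn main() {
    assert_eq!(fill_transform_ports(4), vec![1, 2, 3]);
}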

@@ -526,7 +577,7 @@
computed_schema.clone(),
)
},
1,
self.main_pipeline.output_len() - 1,
)?;
builder.add_items_prepend(vec![create_dummy_item()]);
self.main_pipeline.add_pipe(builder.finalize());
@@ -535,22 +586,45 @@
let max_threads = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

let pipe_items = vec![
table.matched_mutator(
pipe_items.clear();
pipe_items.push(table.rowid_aggregate_mutator(
self.ctx.clone(),
block_builder,
io_request_semaphore,
segments.clone(),
)?);

for _ in 0..self.main_pipeline.output_len() - 1 {
let serialize_block_transform = TransformSerializeBlock::try_create(
self.ctx.clone(),
block_builder,
io_request_semaphore,
segments.clone(),
)?,
serialize_block_transform.into_pipe_item(),
];
InputPort::create(),
OutputPort::create(),
table,
cluster_stats_gen.clone(),
)?;
pipe_items.push(serialize_block_transform.into_pipe_item());
}

self.main_pipeline.add_pipe(Pipe::create(
self.main_pipeline.output_len(),
get_output_len(&pipe_items),
pipe_items,
));

// resize block ports
// aggregate_mutator port aggregate_mutator port
// serialize_block port0 ======>
// serialize_block port1 serialize_block port
// .......
ranges.clear();
ranges.push(vec![0]);
vec.clear();
for idx in 0..self.main_pipeline.output_len() - 1 {
vec.push(idx + 1);
}
ranges.push(vec);
self.main_pipeline.resize_partial_one(ranges)?;
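
And a matching sketch for the block-port resize above (plain Rust, hypothetical `output_len`): the single rowid_aggregate_mutator port stays on its own while every serialize_block port is merged into one, leaving two ports for the dummy-item/serialize-segment pipe that follows.

// Illustrative only: ranges for the final block-port resize.
fn block_resize_ranges(output_len: usize) -> Vec<Vec<usize>> {
    let mut ranges = Vec::with_capacity(2);
    ranges.push(vec![0]);                   // the rowid_aggregate_mutator port
    ranges.push((1..output_len).collect()); // merge all serialize_block ports
    ranges
}

fn main() {
    // 1 mutator port + 3 serialize_block ports -> 2 ports after the resize
    assert_eq!(block_resize_ranges(4), vec![vec![0], vec![1, 2, 3]]);
}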

let pipe_items = vec![
create_dummy_item(),
serialize_segment_transform.into_pipe_item(),
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/merge.rs
@@ -28,7 +28,7 @@ use crate::FuseTable;

impl FuseTable {
// todo: (JackTan25) add pipeline picture
pub fn matched_mutator(
pub fn rowid_aggregate_mutator(
&self,
ctx: Arc<dyn TableContext>,
block_builder: BlockBuilder,
@@ -23,6 +23,7 @@ use common_expression::Expr;
use common_expression::FunctionContext;
use common_functions::BUILTIN_FUNCTIONS;
use common_sql::executor::cast_expr_to_non_null_boolean;

pub struct SplitByExprMutator {
expr: Option<Expr>,
func_ctx: FunctionContext,
@@ -32,6 +32,7 @@ use common_functions::BUILTIN_FUNCTIONS;
use common_sql::evaluator::BlockOperator;
use common_sql::executor::cast_expr_to_non_null_boolean;

#[derive(Clone)]
pub struct UpdateByExprMutator {
expr: Option<Expr>,
func_ctx: FunctionContext,
@@ -43,6 +43,7 @@ struct InsertDataBlockMutation {
}

// need to evaluate expression and

pub struct MergeIntoNotMatchedProcessor {
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
