fix: do compact first, then do sort for cluster table (#14707)
* add block count test

* fix flaky

* fix

* fix test
JackTan25 authored Feb 21, 2024
1 parent c5f2be7 · commit 926ac0a
Showing 2 changed files with 61 additions and 33 deletions.
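This commit reorders two stages of the merge-into append pipeline: block compaction (TransformCompact driving a BlockCompactor) becomes step 3 and now runs before the cluster sort, which moves to step 4. A plausible reading of the swap: merging blocks that the cluster sort has already ordered internally yields a block that is no longer ordered, so compaction must happen before the sort rather than after it; compacting first also means the sort and the writer both see fewer, larger blocks. The toy example below illustrates the ordering argument; the functions are made up for illustration and are not Databend's types.

    // Per-block "cluster sort": each block ends up ordered internally.
    fn sort_block(mut b: Vec<i32>) -> Vec<i32> {
        b.sort_unstable();
        b
    }

    // Naive "compaction": merge small blocks into one bigger block.
    fn compact(blocks: Vec<Vec<i32>>) -> Vec<Vec<i32>> {
        vec![blocks.into_iter().flatten().collect()]
    }

    fn main() {
        let blocks = vec![vec![3, 1], vec![2, 0]];

        // sort, then compact: [[1, 3, 0, 2]] -- merging breaks the ordering
        let sort_then_compact =
            compact(blocks.clone().into_iter().map(sort_block).collect());

        // compact, then sort: [[0, 1, 2, 3]] -- one block, still ordered
        let compact_then_sort: Vec<Vec<i32>> =
            compact(blocks).into_iter().map(sort_block).collect();

        println!("{sort_then_compact:?} vs {compact_then_sort:?}");
    }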
src/query/service/src/pipelines/builders/builder_merge_into.rs (56 additions, 33 deletions)
@@ -229,18 +229,9 @@ impl PipelineBuilder {
             self.main_pipeline.add_pipe(builder.finalize());
         }
 
-        // 3. cluster sort
-        let block_thresholds = table.get_block_thresholds();
-        table.cluster_gen_for_append_with_specified_len(
-            self.ctx.clone(),
-            &mut self.main_pipeline,
-            block_thresholds,
-            1,
-            1,
-        )?;
-
-        // 4. we should avoid too much little block write, because for s3 write, there are too many
+        // 3. we should avoid too much little block write, because for s3 write, there are too many
         // little blocks, it will cause high latency.
+        let block_thresholds = table.get_block_thresholds();
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 Ok(ProcessorPtr::create(TransformCompact::try_create(
@@ -254,6 +245,15 @@
         builder.add_items(vec![create_dummy_item()]);
         self.main_pipeline.add_pipe(builder.finalize());
 
+        // 4. cluster sort
+        table.cluster_gen_for_append_with_specified_len(
+            self.ctx.clone(),
+            &mut self.main_pipeline,
+            block_thresholds,
+            1,
+            1,
+        )?;
+
         // 5. serialize block
         let cluster_stats_gen =
             table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)?;
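As the comment in the hunk above says, the point of the compactor is to avoid writing many little blocks: on S3-style object storage each block is a separate request, so coalescing small blocks cuts write latency. Below is a minimal sketch of threshold-based compaction in the spirit of BlockCompactor::new(block_thresholds); the struct fields and the greedy merge strategy are assumptions for illustration, not Databend's actual implementation.

    // Greedy threshold-based compaction: pass large blocks through,
    // accumulate small ones until they reach the minimum size.
    #[derive(Clone, Copy)]
    struct BlockThresholds {
        min_rows_per_block: usize, // illustrative field, not the real struct
    }

    #[derive(Debug)]
    struct DataBlock {
        rows: usize,
    }

    fn compact(blocks: Vec<DataBlock>, t: BlockThresholds) -> Vec<DataBlock> {
        let mut out = Vec::new();
        let mut pending = 0usize;
        for b in blocks {
            if b.rows >= t.min_rows_per_block {
                out.push(b); // already large enough: pass through
            } else {
                pending += b.rows; // buffer small blocks
                if pending >= t.min_rows_per_block {
                    out.push(DataBlock { rows: pending });
                    pending = 0;
                }
            }
        }
        if pending > 0 {
            out.push(DataBlock { rows: pending }); // flush the undersized tail
        }
        out
    }

    fn main() {
        let t = BlockThresholds { min_rows_per_block: 800 };
        let tiny: Vec<DataBlock> = (0..10).map(|_| DataBlock { rows: 100 }).collect();
        // ten 100-row blocks collapse into two writes instead of ten
        println!("{:?}", compact(tiny, t));
    }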
@@ -747,6 +747,28 @@ impl PipelineBuilder {
             output_lens
         };
 
+        // we should avoid too much little block write, because for s3 write, there are too many
+        // little blocks, it will cause high latency.
+        let mut builder = self.main_pipeline.add_transform_with_specified_len(
+            |transform_input_port, transform_output_port| {
+                Ok(ProcessorPtr::create(TransformCompact::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    BlockCompactor::new(block_thresholds),
+                )?))
+            },
+            mid_len,
+        )?;
+        if need_match {
+            builder.add_items_prepend(vec![create_dummy_item()]);
+        }
+
+        // need to receive row_number, we should give a dummy item here.
+        if *distributed && need_unmatch && !*change_join_order {
+            builder.add_items(vec![create_dummy_item()]);
+        }
+        self.main_pipeline.add_pipe(builder.finalize());
+
         table.cluster_gen_for_append_with_specified_len(
             self.ctx.clone(),
             &mut self.main_pipeline,
@@ -776,6 +798,29 @@
             // arrive the same result (that's appending only one dummy item)
             (output_lens - 1, 0)
         };
+
+        // we should avoid too much little block write, because for s3 write, there are too many
+        // little blocks, it will cause high latency.
+        let mut builder = self.main_pipeline.add_transform_with_specified_len(
+            |transform_input_port, transform_output_port| {
+                Ok(ProcessorPtr::create(TransformCompact::try_create(
+                    transform_input_port,
+                    transform_output_port,
+                    BlockCompactor::new(block_thresholds),
+                )?))
+            },
+            mid_len,
+        )?;
+        if need_match {
+            builder.add_items_prepend(vec![create_dummy_item()]);
+        }
+
+        // need to receive row_number, we should give a dummy item here.
+        if *distributed && need_unmatch && !*change_join_order {
+            builder.add_items(vec![create_dummy_item()]);
+        }
+        self.main_pipeline.add_pipe(builder.finalize());
+
         table.cluster_gen_for_append_with_specified_len(
             self.ctx.clone(),
             &mut self.main_pipeline,
@@ -787,28 +832,6 @@
         };
         pipe_items.clear();
 
-        // we should avoid too much little block write, because for s3 write, there are too many
-        // little blocks, it will cause high latency.
-        let mut builder = self.main_pipeline.add_transform_with_specified_len(
-            |transform_input_port, transform_output_port| {
-                Ok(ProcessorPtr::create(TransformCompact::try_create(
-                    transform_input_port,
-                    transform_output_port,
-                    BlockCompactor::new(block_thresholds),
-                )?))
-            },
-            serialize_len,
-        )?;
-        if need_match {
-            builder.add_items_prepend(vec![create_dummy_item()]);
-        }
-
-        // need to receive row_number, we should give a dummy item here.
-        if *distributed && need_unmatch && !*change_join_order {
-            builder.add_items(vec![create_dummy_item()]);
-        }
-        self.main_pipeline.add_pipe(builder.finalize());
-
         if need_match {
             // rowid should be accumulated in main node.
             if *change_join_order && *distributed {
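Both distributed branches above install the compact transform with add_transform_with_specified_len over mid_len ports only, then pad the pipe with create_dummy_item(): one dummy is prepended when need_match holds, so the row-id port passes through untouched, and one is appended when a row_number port still has to be received, keeping input and output port counts aligned. The sketch below shows that port-alignment idea with hypothetical types; Databend's real Pipe/PipeItem API is richer than this.

    // A pipe is one item per parallel port; ports that must not be
    // transformed get a pass-through "dummy" item so counts stay aligned.
    enum Item {
        Transform(fn(Vec<u8>) -> Vec<u8>),
        Dummy, // forwards its input unchanged (row-id / row_number ports)
    }

    fn run_pipe(items: &[Item], ports: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
        assert_eq!(items.len(), ports.len(), "one item per port");
        items
            .iter()
            .zip(ports)
            .map(|(item, data)| match item {
                Item::Transform(f) => f(data),
                Item::Dummy => data,
            })
            .collect()
    }

    fn main() {
        // two data ports get a toy "compact" transform; the trailing
        // row_number port is forwarded untouched by a dummy item
        let compact: fn(Vec<u8>) -> Vec<u8> = |mut v| {
            v.dedup();
            v
        };
        let items = [Item::Transform(compact), Item::Transform(compact), Item::Dummy];
        let ports = vec![vec![1, 1, 2], vec![3, 3], vec![9, 9]];
        println!("{:?}", run_pipe(&items, ports)); // [[1, 2], [3], [9, 9]]
    }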
@@ -139,6 +139,11 @@ merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t
 ----
 2 4
 
+query T
+select count(*) from fuse_block('default','t1_separate');
+----
+1
+
 ## we will do compact
 query TTT
 select * from t1_separate;
