
Commit 759a411

feat: add auto_recluster, compact, support cluster table (databendlabs#13150)

* add auto_recluster, compact, support cluster table

* fix typo

* Update tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into

Co-authored-by: dantengsky <[email protected]>

* add comments

---------

Co-authored-by: dantengsky <[email protected]>
2 people authored and andylokandy committed Nov 27, 2023
1 parent e5f7659 commit 759a411
Showing 4 changed files with 120 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use common_base::runtime::GlobalIORuntime;
use common_exception::ErrorCode;
@@ -43,6 +44,9 @@ use table_lock::TableLockHandlerWrapper;

use super::Interpreter;
use super::InterpreterPtr;
use crate::interpreters::common::hook_compact;
use crate::interpreters::common::CompactHookTraceCtx;
use crate::interpreters::common::CompactTargetTableDescription;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
@@ -67,6 +71,7 @@ impl Interpreter for MergeIntoInterpreter {

#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
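// Record when the merge starts; the elapsed time is reported by the
// compaction hook below under the "merge_into" operation name.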
let start = Instant::now();
let (physical_plan, table_info) = self.build_physical_plan().await?;
let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
@@ -76,6 +81,26 @@ impl Interpreter for MergeIntoInterpreter {
let handler = TableLockHandlerWrapper::instance(self.ctx.clone());
let mut heartbeat = handler.try_lock(self.ctx.clone(), table_info).await?;

// Hook auto-compaction of the target table onto the pipeline: after the
// merge-into completes, compaction (and re-clustering) may be triggered.
let compact_target = CompactTargetTableDescription {
catalog: self.plan.catalog.clone(),
database: self.plan.database.clone(),
table: self.plan.table.clone(),
};

let compact_hook_trace_ctx = CompactHookTraceCtx {
start,
operation_name: "merge_into".to_owned(),
};

hook_compact(
self.ctx.clone(),
&mut build_res.main_pipeline,
compact_target,
compact_hook_trace_ctx,
)
.await;

if build_res.main_pipeline.is_empty() {
heartbeat.shutdown().await?;
} else {
9 changes: 9 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -585,6 +585,15 @@ impl PipelineBuilder {
let max_threads = self.settings.get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

// After filling default columns, add the cluster block sort if this is a cluster table.
let output_lens = self.main_pipeline.output_len();
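// Apply the cluster transforms to every output pipe except the first,
// which is passed through unchanged.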
table.cluster_gen_for_append_with_specified_last_len(
self.ctx.clone(),
&mut self.main_pipeline,
block_thresholds,
output_lens - 1,
)?;

pipe_items.clear();
pipe_items.push(table.rowid_aggregate_mutator(
self.ctx.clone(),
60 changes: 60 additions & 0 deletions src/query/storages/fuse/src/operations/append.rs
@@ -26,6 +26,7 @@ use common_expression::SortColumnDescription;
use common_functions::BUILTIN_FUNCTIONS;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_pipeline_transforms::processors::transforms::create_dummy_items;
use common_pipeline_transforms::processors::transforms::transform_block_compact_for_copy::BlockCompactorForCopy;
use common_pipeline_transforms::processors::transforms::BlockCompactor;
use common_pipeline_transforms::processors::transforms::TransformCompact;
@@ -85,6 +86,64 @@ impl FuseTable {
Ok(())
}

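/// Builds the `ClusterStatsGenerator` for this table and, if the table defines
/// cluster keys, appends the cluster-key evaluation and partial-sort transforms
/// to only the last `specified_last_len` output pipes; the remaining pipes are
/// passed through via dummy items.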
pub fn cluster_gen_for_append_with_specified_last_len(
&self,
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
block_thresholds: BlockThresholds,
specified_last_len: usize,
) -> Result<ClusterStatsGenerator> {
let cluster_stats_gen = self.get_cluster_stats_gen(ctx.clone(), 0, block_thresholds)?;
let output_lens = pipeline.output_len();
let items1 = create_dummy_items(output_lens - specified_last_len, output_lens);
let items2 = create_dummy_items(output_lens - specified_last_len, output_lens);
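// Dummy pass-through items keep the first `output_lens - specified_last_len`
// pipes untouched; only the trailing `specified_last_len` pipes receive the
// cluster transforms added below.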
let operators = cluster_stats_gen.operators.clone();
if !operators.is_empty() {
let func_ctx2 = cluster_stats_gen.func_ctx.clone();
let mut builder = pipeline.add_transform_with_specified_len(
move |input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
func_ctx2.clone(),
operators.clone(),
)))
},
specified_last_len,
)?;
builder.add_items_prepend(items1);
pipeline.add_pipe(builder.finalize());
}

let cluster_keys = &cluster_stats_gen.cluster_key_index;
if !cluster_keys.is_empty() {
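// Partially sort the rows within each block by the cluster key columns
// (ascending, nulls last) so blocks conform to the table's cluster ordering.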
let sort_descs: Vec<SortColumnDescription> = cluster_keys
.iter()
.map(|index| SortColumnDescription {
offset: *index,
asc: true,
nulls_first: false,
is_nullable: false, // This information is not needed here.
})
.collect();

let mut builder = pipeline.add_transform_with_specified_len(
|transform_input_port, transform_output_port| {
Ok(ProcessorPtr::create(TransformSortPartial::try_create(
transform_input_port,
transform_output_port,
None,
sort_descs.clone(),
)?))
},
specified_last_len,
)?;
builder.add_items_prepend(items2);
pipeline.add_pipe(builder.finalize());
}
Ok(cluster_stats_gen)
}

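/// Like `cluster_gen_for_append_with_specified_last_len`, but applies the
/// cluster transforms to every output pipe of the pipeline.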
pub fn cluster_gen_for_append(
&self,
ctx: Arc<dyn TableContext>,
@@ -96,6 +155,7 @@ impl FuseTable {
let operators = cluster_stats_gen.operators.clone();
if !operators.is_empty() {
let func_ctx2 = cluster_stats_gen.func_ctx.clone();

pipeline.add_transform(move |input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
26 changes: 26 additions & 0 deletions tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into
@@ -481,5 +481,31 @@ select * from t1_target order by a;
1
2

## cluster table test
statement ok
create table cluster_target(a int,b string,c int) cluster by(a,b);

statement ok
create table cluster_source(a int,b string,c int);

statement ok
insert into cluster_source values(12,'b',1),(1,'a',2),(2,'b',3),(2,'a',4),(3,'a',3);

statement ok
merge into cluster_target as t1 using (select * from cluster_source) as t2 on t1.a = t2.a when not matched then insert *;

# With the default settings, all rows merged from `cluster_source` end up in a single block of `cluster_target`.
# Since table `cluster_target` is clustered by `(a, b)`, the rows inside that block are sorted by `(a, b)`.
# Consequently, the following query should return rows ordered by `(a, b)` without an explicit `order by` clause.
query TTT
select * from cluster_target;
----
1 a 2
2 a 4
2 b 3
3 a 3
12 b 1

statement ok
set enable_experimental_merge_into = 0;
