Skip to content

Commit

Permalink
feat(storage): distributed execution of recluster (databendlabs#13048)
Browse files Browse the repository at this point in the history
* distributed execution of recluster

* enable distribute recluster

* resolve conflict

* remove checkout prefect block

* add metrics

* fix test

* add comment

* add setting enable_distribute_recluster

* enable in optimize

* add status

* add test

* resolve conflict

---------

Co-authored-by: sundyli <[email protected]>
  • Loading branch information
2 people authored and andylokandy committed Nov 27, 2023
1 parent 74e001b commit 8244e13
Show file tree
Hide file tree
Showing 30 changed files with 843 additions and 405 deletions.
2 changes: 1 addition & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub trait TableExt: Table {
let tid = table_info.ident.table_id;
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let (ident, meta) = catalog.get_table_meta_by_id(tid).await?;
let table_info: TableInfo = TableInfo {
let table_info = TableInfo {
ident,
desc: "".to_owned(),
name,
Expand Down
99 changes: 60 additions & 39 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::time::SystemTime;
use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::Partitions;
use common_catalog::table::CompactTarget;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::CatalogInfo;
Expand All @@ -33,8 +32,10 @@ use common_sql::executor::PhysicalPlan;
use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_storages_factory::NavigationPoint;
use common_storages_fuse::FuseTable;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::pipelines::executor::ExecutorSettings;
Expand Down Expand Up @@ -122,9 +123,10 @@ impl OptimizeTableInterpreter {
target: CompactTarget,
need_purge: bool,
) -> Result<PipelineBuildResult> {
let mut table = self
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let tenant = self.ctx.get_tenant();
let mut table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;

let table_info = table.get_table_info().clone();
Expand Down Expand Up @@ -174,7 +176,6 @@ impl OptimizeTableInterpreter {

let mut build_res = PipelineBuildResult::create();
let settings = self.ctx.get_settings();
let mut reclustered_block_count = 0;
let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty();
if need_recluster {
if !compact_pipeline.is_empty() {
Expand All @@ -189,50 +190,70 @@ impl OptimizeTableInterpreter {
executor.execute()?;

// refresh table.
table = table.as_ref().refresh(self.ctx.as_ref()).await?;
table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;
}

reclustered_block_count = table
.recluster(
self.ctx.clone(),
None,
self.plan.limit,
&mut build_res.main_pipeline,
)
.await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
if let Some(mutator) = fuse_table
.build_recluster_mutator(self.ctx.clone(), None, self.plan.limit)
.await?
{
if !mutator.tasks.is_empty() {
let reclustered_block_count = mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
)?;

build_res = build_query_pipeline_without_render_result_set(
&self.ctx,
&physical_plan,
false,
)
.await?;

let ctx = self.ctx.clone();
let plan = self.plan.clone();
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => InterpreterClusteringHistory::write_log(
&ctx,
start,
&plan.database,
&plan.table,
reclustered_block_count,
),
Some(error_code) => Err(error_code.clone()),
});
}
}
} else {
build_res.main_pipeline = compact_pipeline;
}

let ctx = self.ctx.clone();
let plan = self.plan.clone();
if build_res.main_pipeline.is_empty() {
if need_purge {
if need_purge {
if build_res.main_pipeline.is_empty() {
purge(ctx, plan, None).await?;
} else {
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => GlobalIORuntime::instance()
.block_on(async move { purge(ctx, plan, None).await }),
Some(error_code) => Err(error_code.clone()),
});
}
} else {
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => {
if need_recluster {
InterpreterClusteringHistory::write_log(
&ctx,
start,
&plan.database,
&plan.table,
reclustered_block_count,
)?;
}
if need_purge {
GlobalIORuntime::instance()
.block_on(async move { purge(ctx, plan, None).await })?;
}
Ok(())
}
Some(error_code) => Err(error_code.clone()),
});
}

Ok(build_res)
Expand Down
127 changes: 100 additions & 27 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,28 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::type_check::check_function;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::CatalogInfo;
use common_meta_app::schema::TableInfo;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::PhysicalPlan;
use common_sql::executor::ReclusterSink;
use common_sql::executor::ReclusterSource;
use common_sql::executor::ReclusterTask;
use common_storages_fuse::FuseTable;
use log::error;
use log::info;
use log::warn;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::Statistics;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::Pipeline;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::executor::cast_expr_to_non_null_boolean;
Expand Down Expand Up @@ -59,17 +71,9 @@ impl Interpreter for ReclusterTableInterpreter {
let plan = &self.plan;
let ctx = self.ctx.clone();
let settings = ctx.get_settings();
let tenant = ctx.get_tenant();
let max_threads = settings.get_max_threads()?;
let max_threads = settings.get_max_threads()? as usize;
let recluster_timeout_secs = settings.get_recluster_timeout_secs()?;

// Status.
{
let status = "recluster: begin to run recluster";
ctx.set_status_info(status);
info!("{}", status);
}

// Build extras via push down scalar
let extras = if let Some(scalar) = &plan.push_downs {
// prepare the filter expression
Expand All @@ -95,6 +99,12 @@ impl Interpreter for ReclusterTableInterpreter {
None
};

let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let tenant = self.ctx.get_tenant();
let mut table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;

let mut times = 0;
let mut block_count = 0;
let start = SystemTime::now();
Expand All @@ -108,13 +118,6 @@ impl Interpreter for ReclusterTableInterpreter {
return Err(err);
}

let table = self
.ctx
.get_catalog(&plan.catalog)
.await?
.get_table(tenant.as_str(), &plan.database, &plan.table)
.await?;

// check if the table is locked.
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let reply = catalog
Expand All @@ -127,24 +130,52 @@ impl Interpreter for ReclusterTableInterpreter {
)));
}

let mut pipeline = Pipeline::create();
let reclustered_block_count = table
.recluster(ctx.clone(), extras.clone(), plan.limit, &mut pipeline)
// Status.
{
let status = "recluster: begin to run recluster";
ctx.set_status_info(status);
info!("{}", status);
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mutator = fuse_table
.build_recluster_mutator(ctx.clone(), extras.clone(), plan.limit)
.await?;
if pipeline.is_empty() {
if mutator.is_none() {
break;
};

block_count += reclustered_block_count;
let max_threads = std::cmp::min(max_threads, reclustered_block_count) as usize;
pipeline.set_max_threads(max_threads);
let mutator = mutator.unwrap();
if mutator.tasks.is_empty() {
break;
};
block_count += mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
)?;

let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
.await?;
assert!(build_res.main_pipeline.is_complete_pipeline()?);
build_res.set_max_threads(max_threads);

let query_id = ctx.get_id();
let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;

ctx.set_executor(executor.get_inner())?;
executor.execute()?;
let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;

let elapsed_time = SystemTime::now().duration_since(start).unwrap();
times += 1;
Expand All @@ -170,6 +201,11 @@ impl Interpreter for ReclusterTableInterpreter {
);
break;
}

// refresh table.
table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;
}

if block_count != 0 {
Expand All @@ -185,3 +221,40 @@ impl Interpreter for ReclusterTableInterpreter {
Ok(PipelineBuildResult::create())
}
}

pub fn build_recluster_physical_plan(
tasks: Vec<ReclusterTask>,
table_info: TableInfo,
catalog_info: CatalogInfo,
snapshot: Arc<TableSnapshot>,
remained_blocks: Vec<Arc<BlockMeta>>,
removed_segment_indexes: Vec<usize>,
removed_segment_summary: Statistics,
) -> Result<PhysicalPlan> {
let is_distributed = tasks.len() > 1;
let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
tasks,
table_info: table_info.clone(),
catalog_info: catalog_info.clone(),
}));

if is_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
kind: FragmentKind::Merge,
keys: vec![],
ignore_exchange: false,
});
}

Ok(PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
input: Box::new(root),
table_info,
catalog_info,
snapshot,
remained_blocks,
removed_segment_indexes,
removed_segment_summary,
})))
}
Loading

0 comments on commit 8244e13

Please sign in to comment.