Skip to content

Commit

Permalink
enable in optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 16, 2023
1 parent b3fecde commit 4c41ac4
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 35 deletions.
87 changes: 53 additions & 34 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ use common_sql::executor::PhysicalPlan;
use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_storages_factory::NavigationPoint;
use common_storages_fuse::FuseTable;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::pipelines::executor::ExecutorSettings;
Expand Down Expand Up @@ -174,7 +176,6 @@ impl OptimizeTableInterpreter {

let mut build_res = PipelineBuildResult::create();
let settings = self.ctx.get_settings();
let mut reclustered_block_count = 0;
let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty();
if need_recluster {
if !compact_pipeline.is_empty() {
Expand All @@ -192,47 +193,65 @@ impl OptimizeTableInterpreter {
table = table.as_ref().refresh(self.ctx.as_ref()).await?;
}

reclustered_block_count = table
.recluster(
self.ctx.clone(),
None,
self.plan.limit,
&mut build_res.main_pipeline,
)
.await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
if let Some(mutator) = fuse_table
.build_recluster_mutator(self.ctx.clone(), None, self.plan.limit)
.await?
{
if !mutator.tasks.is_empty() {
let reclustered_block_count = mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
)?;

build_res = build_query_pipeline_without_render_result_set(
&self.ctx,
&physical_plan,
false,
)
.await?;

let ctx = self.ctx.clone();
let plan = self.plan.clone();
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => InterpreterClusteringHistory::write_log(
&ctx,
start,
&plan.database,
&plan.table,
reclustered_block_count,
),
Some(error_code) => Err(error_code.clone()),
});
}
}
} else {
build_res.main_pipeline = compact_pipeline;
}

let ctx = self.ctx.clone();
let plan = self.plan.clone();
if build_res.main_pipeline.is_empty() {
if need_purge {
if need_purge {
if build_res.main_pipeline.is_empty() {
purge(ctx, plan, None).await?;
} else {
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => GlobalIORuntime::instance()
.block_on(async move { purge(ctx, plan, None).await }),
Some(error_code) => Err(error_code.clone()),
});
}
} else {
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => {
if need_recluster {
InterpreterClusteringHistory::write_log(
&ctx,
start,
&plan.database,
&plan.table,
reclustered_block_count,
)?;
}
if need_purge {
GlobalIORuntime::instance()
.block_on(async move { purge(ctx, plan, None).await })?;
}
Ok(())
}
Some(error_code) => Err(error_code.clone()),
});
}

Ok(build_res)
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl FuseTable {
let settings = ctx.get_settings();
let mut max_tasks = 1;
let cluster = ctx.get_cluster();
if !cluster.is_empty() && settings.get_enable_distributed_recluster()?{
if !cluster.is_empty() && settings.get_enable_distributed_recluster()? {
max_tasks = cluster.nodes.len();
}

Expand Down

0 comments on commit 4c41ac4

Please sign in to comment.