From 4c41ac42003521e032a32be5e6758fd5b7b9bff0 Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 16 Oct 2023 20:13:58 +0800 Subject: [PATCH] enable in optimize --- .../interpreter_table_optimize.rs | 87 +++++++++++-------- .../storages/fuse/src/operations/recluster.rs | 2 +- 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 0756398b4c94f..83a4ec0fcdc52 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -33,8 +33,10 @@ use common_sql::executor::PhysicalPlan; use common_sql::plans::OptimizeTableAction; use common_sql::plans::OptimizeTablePlan; use common_storages_factory::NavigationPoint; +use common_storages_fuse::FuseTable; use storages_common_table_meta::meta::TableSnapshot; +use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; use crate::pipelines::executor::ExecutorSettings; @@ -174,7 +176,6 @@ impl OptimizeTableInterpreter { let mut build_res = PipelineBuildResult::create(); let settings = self.ctx.get_settings(); - let mut reclustered_block_count = 0; let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty(); if need_recluster { if !compact_pipeline.is_empty() { @@ -192,47 +193,65 @@ impl OptimizeTableInterpreter { table = table.as_ref().refresh(self.ctx.as_ref()).await?; } - reclustered_block_count = table - .recluster( - self.ctx.clone(), - None, - self.plan.limit, - &mut build_res.main_pipeline, - ) - .await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + if let Some(mutator) = fuse_table + .build_recluster_mutator(self.ctx.clone(), None, self.plan.limit) + .await? + { + if !mutator.tasks.is_empty() { + let reclustered_block_count = mutator.recluster_blocks_count; + let physical_plan = build_recluster_physical_plan( + mutator.tasks, + table.get_table_info().clone(), + catalog.info(), + mutator.snapshot, + mutator.remained_blocks, + mutator.removed_segment_indexes, + mutator.removed_segment_summary, + )?; + + build_res = build_query_pipeline_without_render_result_set( + &self.ctx, + &physical_plan, + false, + ) + .await?; + + let ctx = self.ctx.clone(); + let plan = self.plan.clone(); + let start = SystemTime::now(); + build_res + .main_pipeline + .set_on_finished(move |may_error| match may_error { + None => InterpreterClusteringHistory::write_log( + &ctx, + start, + &plan.database, + &plan.table, + reclustered_block_count, + ), + Some(error_code) => Err(error_code.clone()), + }); + } + } } else { build_res.main_pipeline = compact_pipeline; } let ctx = self.ctx.clone(); let plan = self.plan.clone(); - if build_res.main_pipeline.is_empty() { - if need_purge { + if need_purge { + if build_res.main_pipeline.is_empty() { purge(ctx, plan, None).await?; + } else { + build_res + .main_pipeline + .set_on_finished(move |may_error| match may_error { + None => GlobalIORuntime::instance() + .block_on(async move { purge(ctx, plan, None).await }), + Some(error_code) => Err(error_code.clone()), + }); } - } else { - let start = SystemTime::now(); - build_res - .main_pipeline - .set_on_finished(move |may_error| match may_error { - None => { - if need_recluster { - InterpreterClusteringHistory::write_log( - &ctx, - start, - &plan.database, - &plan.table, - reclustered_block_count, - )?; - } - if need_purge { - GlobalIORuntime::instance() - .block_on(async move { purge(ctx, plan, None).await })?; - } - Ok(()) - } - Some(error_code) => Err(error_code.clone()), - }); } Ok(build_res) diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index be46cb2fc6411..ffce6f471b808 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -100,7 +100,7 @@ impl FuseTable { let settings = ctx.get_settings(); let mut max_tasks = 1; let cluster = ctx.get_cluster(); - if !cluster.is_empty() && settings.get_enable_distributed_recluster()?{ + if !cluster.is_empty() && settings.get_enable_distributed_recluster()? { max_tasks = cluster.nodes.len(); }