diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs
index 4f00d13a664f1..dca6e32bb601d 100644
--- a/src/query/catalog/src/table.rs
+++ b/src/query/catalog/src/table.rs
@@ -386,7 +386,7 @@ pub trait TableExt: Table {
         let tid = table_info.ident.table_id;
         let catalog = ctx.get_catalog(table_info.catalog()).await?;
         let (ident, meta) = catalog.get_table_meta_by_id(tid).await?;
-        let table_info: TableInfo = TableInfo {
+        let table_info = TableInfo {
             ident,
             desc: "".to_owned(),
             name,
diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs
index f32dd35013729..eeaf612e18b0f 100644
--- a/src/query/service/src/interpreters/interpreter_table_optimize.rs
+++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -18,7 +18,6 @@ use std::time::SystemTime;
 use common_base::runtime::GlobalIORuntime;
 use common_catalog::plan::Partitions;
 use common_catalog::table::CompactTarget;
-use common_catalog::table::TableExt;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_meta_app::schema::CatalogInfo;
@@ -33,8 +32,10 @@ use common_sql::executor::PhysicalPlan;
 use common_sql::plans::OptimizeTableAction;
 use common_sql::plans::OptimizeTablePlan;
 use common_storages_factory::NavigationPoint;
+use common_storages_fuse::FuseTable;
 use storages_common_table_meta::meta::TableSnapshot;
 
+use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
 use crate::pipelines::executor::ExecutorSettings;
@@ -122,9 +123,10 @@ impl OptimizeTableInterpreter {
         target: CompactTarget,
         need_purge: bool,
     ) -> Result<PipelineBuildResult> {
-        let mut table = self
-            .ctx
-            .get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
+        let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
+        let tenant = self.ctx.get_tenant();
+        let mut table = catalog
+            .get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
             .await?;
 
         let table_info = table.get_table_info().clone();
@@ -174,7 +176,6 @@ impl OptimizeTableInterpreter {
         let mut build_res = PipelineBuildResult::create();
         let settings = self.ctx.get_settings();
-        let mut reclustered_block_count = 0;
         let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty();
         if need_recluster {
             if !compact_pipeline.is_empty() {
@@ -189,50 +190,70 @@ impl OptimizeTableInterpreter {
                 executor.execute()?;
 
                 // refresh table.
-                table = table.as_ref().refresh(self.ctx.as_ref()).await?;
+                table = catalog
+                    .get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
+                    .await?;
             }
 
-            reclustered_block_count = table
-                .recluster(
-                    self.ctx.clone(),
-                    None,
-                    self.plan.limit,
-                    &mut build_res.main_pipeline,
-                )
-                .await?;
+            let fuse_table = FuseTable::try_from_table(table.as_ref())?;
+            if let Some(mutator) = fuse_table
+                .build_recluster_mutator(self.ctx.clone(), None, self.plan.limit)
+                .await?
+            {
+                if !mutator.tasks.is_empty() {
+                    let reclustered_block_count = mutator.recluster_blocks_count;
+                    let physical_plan = build_recluster_physical_plan(
+                        mutator.tasks,
+                        table.get_table_info().clone(),
+                        catalog.info(),
+                        mutator.snapshot,
+                        mutator.remained_blocks,
+                        mutator.removed_segment_indexes,
+                        mutator.removed_segment_summary,
+                    )?;
+
+                    build_res = build_query_pipeline_without_render_result_set(
+                        &self.ctx,
+                        &physical_plan,
+                        false,
+                    )
+                    .await?;
+
+                    let ctx = self.ctx.clone();
+                    let plan = self.plan.clone();
+                    let start = SystemTime::now();
+                    build_res
+                        .main_pipeline
+                        .set_on_finished(move |may_error| match may_error {
+                            None => InterpreterClusteringHistory::write_log(
+                                &ctx,
+                                start,
+                                &plan.database,
+                                &plan.table,
+                                reclustered_block_count,
+                            ),
+                            Some(error_code) => Err(error_code.clone()),
+                        });
+                }
+            }
         } else {
             build_res.main_pipeline = compact_pipeline;
         }
 
         let ctx = self.ctx.clone();
         let plan = self.plan.clone();
-        if build_res.main_pipeline.is_empty() {
-            if need_purge {
+        if need_purge {
+            if build_res.main_pipeline.is_empty() {
                 purge(ctx, plan, None).await?;
+            } else {
+                build_res
+                    .main_pipeline
+                    .set_on_finished(move |may_error| match may_error {
+                        None => GlobalIORuntime::instance()
+                            .block_on(async move { purge(ctx, plan, None).await }),
+                        Some(error_code) => Err(error_code.clone()),
+                    });
             }
-        } else {
-            let start = SystemTime::now();
-            build_res
-                .main_pipeline
-                .set_on_finished(move |may_error| match may_error {
-                    None => {
-                        if need_recluster {
-                            InterpreterClusteringHistory::write_log(
-                                &ctx,
-                                start,
-                                &plan.database,
-                                &plan.table,
-                                reclustered_block_count,
-                            )?;
-                        }
-                        if need_purge {
-                            GlobalIORuntime::instance()
-                                .block_on(async move { purge(ctx, plan, None).await })?;
-                        }
-                        Ok(())
-                    }
-                    Some(error_code) => Err(error_code.clone()),
-                });
         }
 
         Ok(build_res)
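The optimize path above either runs the purge inline (when no pipeline work was scheduled) or chains it onto pipeline completion via set_on_finished. Below is a minimal, self-contained sketch of that "run now or defer to completion" pattern; Pipeline, purge, and the error type here are simplified stand-ins, not Databend APIs.

type PipeResult = Result<(), String>;

struct Pipeline {
    steps: Vec<String>,
    on_finished: Option<Box<dyn FnOnce(Option<&String>) -> PipeResult>>,
}

impl Pipeline {
    fn is_empty(&self) -> bool {
        self.steps.is_empty()
    }
    fn set_on_finished<F>(&mut self, f: F)
    where F: FnOnce(Option<&String>) -> PipeResult + 'static {
        self.on_finished = Some(Box::new(f));
    }
    // The executor would call this once, passing the first error if any.
    fn finish(mut self, err: Option<&String>) -> PipeResult {
        match self.on_finished.take() {
            Some(cb) => cb(err),
            None => err.map_or(Ok(()), |e| Err(e.clone())),
        }
    }
}

fn purge() -> PipeResult { Ok(()) } // placeholder for the async purge

fn schedule_purge(pipeline: &mut Pipeline, need_purge: bool) -> PipeResult {
    if need_purge {
        if pipeline.is_empty() {
            // No work scheduled: purge immediately.
            purge()?;
        } else {
            // Work scheduled: purge only after the pipeline succeeds,
            // and propagate the pipeline's own error otherwise.
            pipeline.set_on_finished(|may_error| match may_error {
                None => purge(),
                Some(e) => Err(e.clone()),
            });
        }
    }
    Ok(())
}

fn main() -> PipeResult {
    let mut p = Pipeline { steps: vec!["sort".into()], on_finished: None };
    schedule_purge(&mut p, true)?;
    p.finish(None) // pipeline succeeded, so the deferred purge runs here
}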
diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs
index 98ff1891c2455..d5d6800463c10 100644
--- a/src/query/service/src/interpreters/interpreter_table_recluster.rs
+++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -22,16 +22,28 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::type_check::check_function;
 use common_functions::BUILTIN_FUNCTIONS;
+use common_meta_app::schema::CatalogInfo;
+use common_meta_app::schema::TableInfo;
+use common_sql::executor::Exchange;
+use common_sql::executor::FragmentKind;
+use common_sql::executor::PhysicalPlan;
+use common_sql::executor::ReclusterSink;
+use common_sql::executor::ReclusterSource;
+use common_sql::executor::ReclusterTask;
+use common_storages_fuse::FuseTable;
 use log::error;
 use log::info;
 use log::warn;
+use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::Statistics;
+use storages_common_table_meta::meta::TableSnapshot;
 
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
-use crate::pipelines::Pipeline;
 use crate::pipelines::PipelineBuildResult;
+use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
 use crate::sessions::TableContext;
 use crate::sql::executor::cast_expr_to_non_null_boolean;
@@ -59,17 +71,9 @@ impl Interpreter for ReclusterTableInterpreter {
         let plan = &self.plan;
         let ctx = self.ctx.clone();
         let settings = ctx.get_settings();
-        let tenant = ctx.get_tenant();
-        let max_threads = settings.get_max_threads()?;
+        let max_threads = settings.get_max_threads()? as usize;
         let recluster_timeout_secs = settings.get_recluster_timeout_secs()?;
 
-        // Status.
-        {
-            let status = "recluster: begin to run recluster";
-            ctx.set_status_info(status);
-            info!("{}", status);
-        }
-
         // Build extras via push down scalar
         let extras = if let Some(scalar) = &plan.push_downs {
             // prepare the filter expression
@@ -95,6 +99,12 @@ impl Interpreter for ReclusterTableInterpreter {
             None
         };
 
+        let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
+        let tenant = self.ctx.get_tenant();
+        let mut table = catalog
+            .get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
+            .await?;
+
         let mut times = 0;
         let mut block_count = 0;
         let start = SystemTime::now();
@@ -108,13 +118,6 @@ impl Interpreter for ReclusterTableInterpreter {
                 return Err(err);
             }
 
-            let table = self
-                .ctx
-                .get_catalog(&plan.catalog)
-                .await?
-                .get_table(tenant.as_str(), &plan.database, &plan.table)
-                .await?;
-
             // check if the table is locked.
             let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
             let reply = catalog
@@ -127,24 +130,52 @@ impl Interpreter for ReclusterTableInterpreter {
                 )));
             }
 
-            let mut pipeline = Pipeline::create();
-            let reclustered_block_count = table
-                .recluster(ctx.clone(), extras.clone(), plan.limit, &mut pipeline)
+            // Status.
+            {
+                let status = "recluster: begin to run recluster";
+                ctx.set_status_info(status);
+                info!("{}", status);
+            }
+
+            let fuse_table = FuseTable::try_from_table(table.as_ref())?;
+            let mutator = fuse_table
+                .build_recluster_mutator(ctx.clone(), extras.clone(), plan.limit)
                 .await?;
-            if pipeline.is_empty() {
+            if mutator.is_none() {
                 break;
             };
 
-            block_count += reclustered_block_count;
-            let max_threads = std::cmp::min(max_threads, reclustered_block_count) as usize;
-            pipeline.set_max_threads(max_threads);
+            let mutator = mutator.unwrap();
+            if mutator.tasks.is_empty() {
+                break;
+            };
+            block_count += mutator.recluster_blocks_count;
+            let physical_plan = build_recluster_physical_plan(
+                mutator.tasks,
+                table.get_table_info().clone(),
+                catalog.info(),
+                mutator.snapshot,
+                mutator.remained_blocks,
+                mutator.removed_segment_indexes,
+                mutator.removed_segment_summary,
+            )?;
+
+            let mut build_res =
+                build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
+                    .await?;
+            assert!(build_res.main_pipeline.is_complete_pipeline()?);
+            build_res.set_max_threads(max_threads);
 
             let query_id = ctx.get_id();
             let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
-            let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
-            ctx.set_executor(executor.get_inner())?;
-            executor.execute()?;
+
+            let mut pipelines = build_res.sources_pipelines;
+            pipelines.push(build_res.main_pipeline);
+
+            let complete_executor =
+                PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
+            ctx.set_executor(complete_executor.get_inner())?;
+            complete_executor.execute()?;
 
             let elapsed_time = SystemTime::now().duration_since(start).unwrap();
             times += 1;
@@ -170,6 +201,11 @@ impl Interpreter for ReclusterTableInterpreter {
                 );
                 break;
             }
+
+            // refresh table.
+            table = catalog
+                .get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
+                .await?;
         }
 
         if block_count != 0 {
@@ -185,3 +221,40 @@ impl Interpreter for ReclusterTableInterpreter {
         Ok(PipelineBuildResult::create())
     }
 }
+
+pub fn build_recluster_physical_plan(
+    tasks: Vec<ReclusterTask>,
+    table_info: TableInfo,
+    catalog_info: CatalogInfo,
+    snapshot: Arc<TableSnapshot>,
+    remained_blocks: Vec<Arc<BlockMeta>>,
+    removed_segment_indexes: Vec<usize>,
+    removed_segment_summary: Statistics,
+) -> Result<PhysicalPlan> {
+    let is_distributed = tasks.len() > 1;
+    let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
+        tasks,
+        table_info: table_info.clone(),
+        catalog_info: catalog_info.clone(),
+    }));
+
+    if is_distributed {
+        root = PhysicalPlan::Exchange(Exchange {
+            plan_id: 0,
+            input: Box::new(root),
+            kind: FragmentKind::Merge,
+            keys: vec![],
+            ignore_exchange: false,
+        });
+    }
+
+    Ok(PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
+        input: Box::new(root),
+        table_info,
+        catalog_info,
+        snapshot,
+        remained_blocks,
+        removed_segment_indexes,
+        removed_segment_summary,
+    })))
+}
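build_recluster_physical_plan above always wraps the task source in a ReclusterSink and inserts a Merge exchange only when more than one task has to be gathered from peer nodes. A toy mirror of that plan shape (the enum and names are simplified stand-ins, not the real PhysicalPlan type):

#[derive(Debug)]
enum Plan {
    Source { tasks: usize },
    ExchangeMerge { input: Box<Plan> },
    Sink { input: Box<Plan> },
}

fn build(tasks: usize) -> Plan {
    let mut root = Plan::Source { tasks };
    // Distributed iff there is more than one task: each task runs on one
    // node, and a Merge exchange funnels their results to the coordinator.
    if tasks > 1 {
        root = Plan::ExchangeMerge { input: Box::new(root) };
    }
    Plan::Sink { input: Box::new(root) }
}

fn main() {
    println!("{:?}", build(1)); // Sink(Source): purely local
    println!("{:?}", build(4)); // Sink(ExchangeMerge(Source)): distributed
}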
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index 04493697e4057..3df696193acc7 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -100,6 +100,8 @@ use common_sql::executor::PhysicalPlan;
 use common_sql::executor::Project;
 use common_sql::executor::ProjectSet;
 use common_sql::executor::RangeJoin;
+use common_sql::executor::ReclusterSink;
+use common_sql::executor::ReclusterSource;
 use common_sql::executor::ReplaceInto;
 use common_sql::executor::RowFetch;
 use common_sql::executor::RuntimeFilterSource;
@@ -288,6 +290,12 @@ impl PipelineBuilder {
             PhysicalPlan::MergeIntoSource(merge_into_source) => {
                 self.build_merge_into_source(merge_into_source)
             }
+            PhysicalPlan::ReclusterSource(recluster_source) => {
+                self.build_recluster_source(recluster_source)
+            }
+            PhysicalPlan::ReclusterSink(recluster_sink) => {
+                self.build_recluster_sink(recluster_sink)
+            }
         }
     }
 
@@ -901,6 +909,43 @@ impl PipelineBuilder {
         Ok(())
     }
 
+    fn build_recluster_source(&mut self, recluster_source: &ReclusterSource) -> Result<()> {
+        match recluster_source.tasks.len() {
+            0 => self.main_pipeline.add_source(EmptySource::create, 1),
+            1 => {
+                let table = self.ctx.build_table_by_table_info(
+                    &recluster_source.catalog_info,
+                    &recluster_source.table_info,
+                    None,
+                )?;
+                let table = FuseTable::try_from_table(table.as_ref())?;
+
+                table.build_recluster_source(
+                    self.ctx.clone(),
+                    recluster_source.tasks[0].clone(),
+                    recluster_source.catalog_info.clone(),
+                    &mut self.main_pipeline,
+                )
+            }
+            _ => Err(ErrorCode::Internal(
+                "A node can only execute one recluster task".to_string(),
+            )),
+        }
+    }
+
+    fn build_recluster_sink(&mut self, recluster_sink: &ReclusterSink) -> Result<()> {
+        self.build_pipeline(&recluster_sink.input)?;
+
+        let table = self.ctx.build_table_by_table_info(
+            &recluster_sink.catalog_info,
+            &recluster_sink.table_info,
+            None,
+        )?;
+        let table = FuseTable::try_from_table(table.as_ref())?;
+
+        table.build_recluster_sink(self.ctx.clone(), recluster_sink, &mut self.main_pipeline)
+    }
+
     fn build_compact_source(&mut self, compact_block: &CompactSource) -> Result<()> {
         let table = self.ctx.build_table_by_table_info(
             &compact_block.catalog_info,
@@ -1029,8 +1074,7 @@ impl PipelineBuilder {
             plan.snapshot.clone(),
             plan.mutation_kind,
             plan.merge_meta,
-        )?;
-        Ok(())
+        )
     }
 
     fn build_range_join(&mut self, range_join: &RangeJoin) -> Result<()> {
diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs
index 06c2b0fd4d9be..c1e5e9a091732 100644
--- a/src/query/service/src/schedulers/fragments/fragmenter.rs
+++ b/src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -58,6 +58,7 @@ enum State {
     DeleteLeaf,
     ReplaceInto,
     Compact,
+    Recluster,
     Other,
 }
 
@@ -185,6 +186,15 @@ impl PhysicalPlanReplacer for Fragmenter {
         }
     }
 
+    fn replace_recluster_source(
+        &mut self,
+        plan: &common_sql::executor::ReclusterSource,
+    ) -> Result<PhysicalPlan> {
+        self.state = State::Recluster;
+
+        Ok(PhysicalPlan::ReclusterSource(Box::new(plan.clone())))
+    }
+
     fn replace_compact_source(
         &mut self,
         plan: &common_sql::executor::CompactSource,
@@ -265,6 +275,7 @@ impl PhysicalPlanReplacer for Fragmenter {
             State::Other => FragmentType::Intermediate,
             State::ReplaceInto => FragmentType::ReplaceInto,
             State::Compact => FragmentType::Compact,
+            State::Recluster => FragmentType::Recluster,
         };
         self.state = State::Other;
         let exchange = Self::get_exchange(
diff --git a/src/query/service/src/schedulers/fragments/plan_fragment.rs b/src/query/service/src/schedulers/fragments/plan_fragment.rs
index 5210c8eedae37..686392a2e958b 100644
--- a/src/query/service/src/schedulers/fragments/plan_fragment.rs
+++ b/src/query/service/src/schedulers/fragments/plan_fragment.rs
@@ -26,6 +26,8 @@ use common_sql::executor::CopyIntoTableSource;
 use common_sql::executor::Deduplicate;
 use common_sql::executor::DeleteSource;
 use common_sql::executor::QuerySource;
+use common_sql::executor::ReclusterSource;
+use common_sql::executor::ReclusterTask;
 use common_sql::executor::ReplaceInto;
 use common_storages_fuse::TableContext;
 use storages_common_table_meta::meta::BlockSlotDescription;
@@ -60,6 +62,7 @@ pub enum FragmentType {
     /// Intermediate fragment of a replace into plan, which contains a `ReplaceInto` operator.
     ReplaceInto,
     Compact,
+    Recluster,
 }
 
 #[derive(Clone)]
@@ -93,10 +96,6 @@ impl PlanFragment {
                     self.plan.clone(),
                 );
                 fragment_actions.add_action(action);
-                if let Some(ref exchange) = self.exchange {
-                    fragment_actions.set_exchange(exchange.clone());
-                }
-                actions.add_fragment_actions(fragment_actions)?;
             }
             FragmentType::Intermediate => {
                 if self
@@ -118,48 +117,38 @@ impl PlanFragment {
                         fragment_actions.add_action(action);
                     }
                 }
-                if let Some(ref exchange) = self.exchange {
-                    fragment_actions.set_exchange(exchange.clone());
-                }
-                actions.add_fragment_actions(fragment_actions)?;
             }
             FragmentType::Source => {
                 // Redistribute partitions
-                let mut fragment_actions = self.redistribute_source_fragment(ctx)?;
-                if let Some(ref exchange) = self.exchange {
-                    fragment_actions.set_exchange(exchange.clone());
-                }
-                actions.add_fragment_actions(fragment_actions)?;
+                self.redistribute_source_fragment(ctx, &mut fragment_actions)?;
             }
             FragmentType::DeleteLeaf => {
-                let mut fragment_actions = self.redistribute_delete_leaf(ctx)?;
-                if let Some(ref exchange) = self.exchange {
-                    fragment_actions.set_exchange(exchange.clone());
-                }
-                actions.add_fragment_actions(fragment_actions)?;
+                self.redistribute_delete_leaf(ctx, &mut fragment_actions)?;
             }
             FragmentType::ReplaceInto => {
                 // Redistribute partitions
-                let mut fragment_actions = self.redistribute_replace_into(ctx)?;
-                if let Some(ref exchange) = self.exchange {
-                    fragment_actions.set_exchange(exchange.clone());
-                }
-                actions.add_fragment_actions(fragment_actions)?;
+                self.redistribute_replace_into(ctx, &mut fragment_actions)?;
             }
             FragmentType::Compact => {
-                let mut fragment_actions = self.redistribute_compact(ctx)?;
-                if let Some(ref exchange) = self.exchange {
-                    fragment_actions.set_exchange(exchange.clone());
-                }
-                actions.add_fragment_actions(fragment_actions)?;
+                self.redistribute_compact(ctx, &mut fragment_actions)?;
+            }
+            FragmentType::Recluster => {
+                self.redistribute_recluster(ctx, &mut fragment_actions)?;
             }
         }
 
-        Ok(())
+        if let Some(ref exchange) = self.exchange {
+            fragment_actions.set_exchange(exchange.clone());
+        }
+        actions.add_fragment_actions(fragment_actions)
     }
 
     /// Redistribute partitions of current source fragment to executors.
-    fn redistribute_source_fragment(&self, ctx: Arc<QueryContext>) -> Result<QueryFragmentActions> {
+    fn redistribute_source_fragment(
+        &self,
+        ctx: Arc<QueryContext>,
+        fragment_actions: &mut QueryFragmentActions,
+    ) -> Result<()> {
         if self.fragment_type != FragmentType::Source {
             return Err(ErrorCode::Internal(
                 "Cannot redistribute a non-source fragment".to_string(),
@@ -170,8 +159,6 @@ impl PlanFragment {
         let executors = Fragmenter::get_executors(ctx);
         // Redistribute partitions of ReadDataSourcePlan.
-        let mut fragment_actions = QueryFragmentActions::create(self.fragment_id);
-
         let partitions = &read_source.parts;
         let partition_reshuffle = partitions.reshuffle(executors)?;
 
@@ -190,10 +177,14 @@ impl PlanFragment {
                 .add_action(QueryFragmentAction::create(executor.clone(), plan.clone()));
         }
 
-        Ok(fragment_actions)
+        Ok(())
     }
 
-    fn redistribute_delete_leaf(&self, ctx: Arc<QueryContext>) -> Result<QueryFragmentActions> {
+    fn redistribute_delete_leaf(
+        &self,
+        ctx: Arc<QueryContext>,
+        fragment_actions: &mut QueryFragmentActions,
+    ) -> Result<()> {
         let plan = match &self.plan {
             PhysicalPlan::ExchangeSink(plan) => plan,
             _ => unreachable!("logic error"),
@@ -205,7 +196,6 @@ impl PlanFragment {
         let partitions: &Partitions = &plan.parts;
         let executors = Fragmenter::get_executors(ctx);
-        let mut fragment_actions = QueryFragmentActions::create(self.fragment_id);
 
         let partition_reshuffle = partitions.reshuffle(executors)?;
 
@@ -218,10 +208,14 @@ impl PlanFragment {
             fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
         }
 
-        Ok(fragment_actions)
+        Ok(())
     }
 
-    fn redistribute_replace_into(&self, ctx: Arc<QueryContext>) -> Result<QueryFragmentActions> {
+    fn redistribute_replace_into(
+        &self,
+        ctx: Arc<QueryContext>,
+        fragment_actions: &mut QueryFragmentActions,
+    ) -> Result<()> {
         let plan = match &self.plan {
             PhysicalPlan::ExchangeSink(plan) => plan,
             _ => unreachable!("logic error"),
@@ -232,7 +226,6 @@ impl PlanFragment {
         };
         let partitions = &plan.segments;
         let executors = Fragmenter::get_executors(ctx.clone());
-        let mut fragment_actions = QueryFragmentActions::create(self.fragment_id);
         let local_id = ctx.get_cluster().local_id.clone();
         match ctx.get_settings().get_replace_into_shuffle_strategy()? {
             ReplaceIntoShuffleStrategy::SegmentLevelShuffling => {
@@ -272,10 +265,14 @@ impl PlanFragment {
                 }
             }
         }
-        Ok(fragment_actions)
+        Ok(())
     }
 
-    fn redistribute_compact(&self, ctx: Arc<QueryContext>) -> Result<QueryFragmentActions> {
+    fn redistribute_compact(
+        &self,
+        ctx: Arc<QueryContext>,
+        fragment_actions: &mut QueryFragmentActions,
+    ) -> Result<()> {
         let exchange_sink = match &self.plan {
             PhysicalPlan::ExchangeSink(plan) => plan,
             _ => unreachable!("logic error"),
@@ -287,7 +284,6 @@ impl PlanFragment {
         let partitions: &Partitions = &compact_block.parts;
         let executors = Fragmenter::get_executors(ctx);
-        let mut fragment_actions = QueryFragmentActions::create(self.fragment_id);
 
         let partition_reshuffle = partitions.reshuffle(executors)?;
 
@@ -300,7 +296,42 @@ impl PlanFragment {
             fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
         }
 
-        Ok(fragment_actions)
+        Ok(())
+    }
+
+    fn redistribute_recluster(
+        &self,
+        ctx: Arc<QueryContext>,
+        fragment_actions: &mut QueryFragmentActions,
+    ) -> Result<()> {
+        let exchange_sink = match &self.plan {
+            PhysicalPlan::ExchangeSink(plan) => plan,
+            _ => unreachable!("logic error"),
+        };
+        let recluster = match exchange_sink.input.as_ref() {
+            PhysicalPlan::ReclusterSource(plan) => plan,
+            _ => unreachable!("logic error"),
+        };
+
+        let tasks = recluster.tasks.clone();
+        let executors = Fragmenter::get_executors(ctx);
+        if tasks.len() > executors.len() {
+            return Err(ErrorCode::Internal(format!(
+                "Cannot recluster {} tasks to {} executors",
+                tasks.len(),
+                executors.len()
+            )));
+        }
+
+        let task_reshuffle = Self::reshuffle(executors, tasks)?;
+        for (executor, tasks) in task_reshuffle.into_iter() {
+            let mut plan = self.plan.clone();
+            let mut replace_recluster = ReplaceReclusterSource { tasks };
+            plan = replace_recluster.replace(&plan)?;
+            fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
+        }
+
+        Ok(())
     }
 
     fn reshuffle(
@@ -422,6 +453,19 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
     }
 }
 
+struct ReplaceReclusterSource {
+    pub tasks: Vec<ReclusterTask>,
+}
+
+impl PhysicalPlanReplacer for ReplaceReclusterSource {
+    fn replace_recluster_source(&mut self, plan: &ReclusterSource) -> Result<PhysicalPlan> {
+        Ok(PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
+            tasks: self.tasks.clone(),
+            ..plan.clone()
+        })))
+    }
+}
+
 struct ReplaceCompactBlock {
     pub partitions: Partitions,
 }
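redistribute_recluster above rejects plans with more tasks than executors, then hands each executor its own task list via Self::reshuffle and rewrites the fragment with ReplaceReclusterSource. A standalone sketch of that round-robin assignment, with String executor ids and a generic task type standing in for the real types:

use std::collections::HashMap;

fn reshuffle<T>(executors: Vec<String>, tasks: Vec<T>) -> Result<HashMap<String, Vec<T>>, String> {
    if tasks.len() > executors.len() {
        return Err(format!(
            "cannot assign {} tasks to {} executors",
            tasks.len(),
            executors.len()
        ));
    }
    // Seed every executor with an empty list so idle nodes still get a plan.
    let mut assigned: HashMap<String, Vec<T>> =
        executors.iter().cloned().map(|e| (e, Vec::new())).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        // With tasks <= executors this places at most one task per node.
        assigned.get_mut(&executors[i % executors.len()]).unwrap().push(task);
    }
    Ok(assigned)
}

fn main() {
    let nodes = vec!["node-1".to_string(), "node-2".to_string()];
    let plan = reshuffle(nodes, vec!["task-a", "task-b"]).unwrap();
    assert_eq!(plan["node-1"], vec!["task-a"]);
    assert_eq!(plan["node-2"], vec!["task-b"]);
}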
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
index a81a3527d96dc..a1b69278157ff 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 
 use chrono::Utc;
 use common_base::base::tokio;
-use common_base::runtime::execute_futures_in_parallel;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::BlockThresholds;
@@ -26,26 +25,24 @@ use common_expression::DataBlock;
 use common_expression::Scalar;
 use common_expression::TableSchema;
 use common_expression::TableSchemaRef;
-use common_storages_fuse::io::MetaWriter;
 use common_storages_fuse::io::SegmentWriter;
 use common_storages_fuse::io::TableMetaLocationGenerator;
 use common_storages_fuse::operations::ReclusterMutator;
 use common_storages_fuse::pruning::create_segment_location_vector;
 use common_storages_fuse::statistics::reducers::merge_statistics_mut;
 use common_storages_fuse::statistics::reducers::reduce_block_metas;
-use common_storages_fuse::statistics::sort_by_cluster_stats;
+use common_storages_fuse::FusePartInfo;
 use common_storages_fuse::FuseTable;
 use databend_query::sessions::TableContext;
 use databend_query::test_kits::table_test_fixture::TestFixture;
-use itertools::Itertools;
 use rand::thread_rng;
 use rand::Rng;
 use storages_common_table_meta::meta;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::ClusterStatistics;
-use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::Statistics;
+use storages_common_table_meta::meta::TableSnapshot;
 use storages_common_table_meta::meta::Versioned;
 use uuid::Uuid;
 
@@ -112,6 +109,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
     test_segment_locations.push(segment_location);
     test_block_locations.push(block_location);
 
+    let schema = TableSchemaRef::new(TableSchema::empty());
     let (segment_location, block_location) = gen_test_seg(Some(ClusterStatistics::new(
         cluster_key_id,
         vec![Scalar::from(4i64)],
         vec![Scalar::from(4i64)],
         0,
         None,
     )))
     .await?;
     test_segment_locations.push(segment_location);
     test_block_locations.push(block_location);
+    // unused snapshot.
+    let snapshot = TableSnapshot::new_empty_snapshot(schema.as_ref().clone());
 
-    let schema = TableSchemaRef::new(TableSchema::empty());
     let ctx: Arc<dyn TableContext> = ctx.clone();
     let segment_locations = create_segment_location_vector(test_segment_locations, None);
     let compact_segments = FuseTable::segment_pruning(
         &ctx,
-        schema,
+        schema.clone(),
         data_accessor.clone(),
         &None,
         segment_locations,
     )
     .await?;
 
-    let mut mutator =
-        ReclusterMutator::try_create(ctx, 1.0, BlockThresholds::default(), cluster_key_id)?;
+    let mut mutator = ReclusterMutator::try_create(
+        ctx,
+        Arc::new(snapshot),
+        schema,
+        1.0,
+        BlockThresholds::default(),
+        cluster_key_id,
+        1,
+    )?;
     let need_recluster = mutator.target_select(compact_segments).await?;
     assert!(need_recluster);
-    assert_eq!(mutator.take_blocks().len(), 3);
+    assert_eq!(mutator.tasks.len(), 1);
+    let total_block_nums = mutator.tasks.iter().map(|t| t.parts.len()).sum::<usize>();
+    assert_eq!(total_block_nums, 3);
 
     Ok(())
 }
@@ -150,6 +158,10 @@ async fn test_safety_for_recluster() -> Result<()> {
     let ctx = fixture.ctx();
     let operator = ctx.get_data_operator()?.operator();
 
+    let recluster_block_size = 300;
+    ctx.get_settings()
+        .set_recluster_block_size(recluster_block_size as u64)?;
+
     let cluster_key_id = 0;
     let block_per_seg = 5;
     let threshold = BlockThresholds {
@@ -166,6 +178,7 @@ async fn test_safety_for_recluster() -> Result<()> {
     for r in 1..10 {
         eprintln!("round {}", r);
         let number_of_segments: usize = rand.gen_range(1..10);
+        let max_tasks: usize = rand.gen_range(1..5);
 
         let mut block_number_of_segments = Vec::with_capacity(number_of_segments);
         let mut rows_per_blocks = Vec::with_capacity(number_of_segments);
@@ -202,6 +215,18 @@ async fn test_safety_for_recluster() -> Result<()> {
             merge_statistics_mut(&mut summary, &seg.summary, Some(cluster_key_id));
         }
 
+        let id = Uuid::new_v4();
+        let snapshot = Arc::new(TableSnapshot::new(
+            id,
+            &None,
+            None,
+            schema.as_ref().clone(),
+            summary,
+            locations.clone(),
+            None,
+            None,
+        ));
+
         let mut block_ids = HashSet::new();
         for seg in &segment_infos {
             for b in &seg.blocks {
@@ -221,8 +246,15 @@ async fn test_safety_for_recluster() -> Result<()> {
             .await?;
 
         let mut need_recluster = false;
-        let mut mutator =
-            ReclusterMutator::try_create(ctx.clone(), 1.0, threshold, cluster_key_id)?;
+        let mut mutator = ReclusterMutator::try_create(
+            ctx.clone(),
+            snapshot,
+            schema.clone(),
+            1.0,
+            threshold,
+            cluster_key_id,
+            max_tasks,
+        )?;
         let selected_segs =
             ReclusterMutator::select_segments(&compact_segments, block_per_seg, 8, cluster_key_id)?;
         if selected_segs.is_empty() {
@@ -250,7 +282,19 @@ async fn test_safety_for_recluster() -> Result<()> {
         eprintln!("need_recluster: {}", need_recluster);
         if need_recluster {
-            let mut blocks = mutator.take_blocks();
+            let tasks = mutator.tasks;
+            assert!(tasks.len() <= max_tasks && !tasks.is_empty());
+            eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
+            let mut blocks = Vec::new();
+            for task in tasks.into_iter() {
+                let parts = task.parts.partitions;
+                assert!(task.total_bytes <= recluster_block_size);
+                for part in parts.into_iter() {
+                    let fuse_part = FusePartInfo::from_part(&part)?;
+                    blocks.push(fuse_part.location.clone());
+                }
+            }
+
             let remained_blocks = std::mem::take(&mut mutator.remained_blocks);
             eprintln!(
                 "selected segments number {}, selected blocks number {}, remained blocks number {}",
                 selected_segs.len(),
                 blocks.len(),
                 remained_blocks.len()
             );
-            blocks.extend(remained_blocks);
-
-            let mut block_ids_after_target = HashSet::new();
-            for b in &blocks {
-                block_ids_after_target.insert(b.location.clone());
+            for remain in remained_blocks {
+                blocks.push(remain.location.0.clone());
             }
+            let block_ids_after_target = HashSet::from_iter(blocks.into_iter());
+
             let mut origin_blocks_ids = HashSet::new();
             for idx in &mutator.removed_segment_indexes {
                 for b in &segment_infos[*idx].blocks {
-                    origin_blocks_ids.insert(b.location.clone());
+                    origin_blocks_ids.insert(b.location.0.clone());
                 }
             }
             assert_eq!(block_ids_after_target, origin_blocks_ids);
-
-            // test recluster aggregator.
-            let mut new_segments =
-                generage_segments(ctx, blocks, threshold, cluster_key_id, block_per_seg).await?;
-            let new_segments_len = new_segments.len();
-            let removed_segments_len = mutator.removed_segment_indexes.len();
-            let replaced_segments_len = new_segments_len.min(removed_segments_len);
-            let mut merged_statistics = Statistics::default();
-            let mut appended_segments = Vec::new();
-            let mut replaced_segments = HashMap::with_capacity(replaced_segments_len);
-
-            if new_segments_len > removed_segments_len {
-                let appended = new_segments.split_off(removed_segments_len);
-                for (location, stats) in appended.into_iter().rev() {
-                    appended_segments.push(location);
-                    merge_statistics_mut(&mut merged_statistics, &stats, Some(cluster_key_id));
-                }
-            }
-
-            let mut indices = Vec::with_capacity(removed_segments_len);
-            for (i, (location, stats)) in new_segments.into_iter().enumerate() {
-                let idx = mutator.removed_segment_indexes[i];
-                indices.push(idx);
-                replaced_segments.insert(idx, location);
-                merge_statistics_mut(&mut merged_statistics, &stats, Some(cluster_key_id));
-            }
-
-            let removed_segment_indexes =
-                mutator.removed_segment_indexes[replaced_segments_len..].to_vec();
-            eprintln!(
-                "append segments number {}, replaced segments number {}, removed segments number {}",
-                appended_segments.len(),
-                replaced_segments.len(),
-                removed_segment_indexes.len()
-            );
-            indices.extend(removed_segment_indexes);
-            assert_eq!(indices, mutator.removed_segment_indexes);
-            assert_eq!(merged_statistics, mutator.removed_segment_summary);
-            assert_eq!(
-                new_segments_len,
-                appended_segments.len() + replaced_segments.len()
-            );
         }
     }
 
@@ -352,49 +353,3 @@ fn test_check_point() {
     let end = vec![0, 1];
     assert!(!ReclusterMutator::check_point(&start, &end));
 }
-
-async fn generage_segments(
-    ctx: Arc<dyn TableContext>,
-    blocks: Vec<Arc<BlockMeta>>,
-    block_thresholds: BlockThresholds,
-    default_cluster_key: u32,
-    block_per_seg: usize,
-) -> Result<Vec<(Location, Statistics)>> {
-    let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
-    let data_accessor = ctx.get_data_operator()?.operator();
-
-    let mut merged_blocks = blocks;
-    // sort ascending.
-    merged_blocks.sort_by(|a, b| {
-        sort_by_cluster_stats(&a.cluster_stats, &b.cluster_stats, default_cluster_key)
-    });
-
-    let mut tasks = Vec::new();
-    let segments_num = (merged_blocks.len() / block_per_seg).max(1);
-    let chunk_size = merged_blocks.len().div_ceil(segments_num);
-    for chunk in &merged_blocks.into_iter().chunks(chunk_size) {
-        let new_blocks = chunk.collect::<Vec<_>>();
-        let location_gen = location_gen.clone();
-        let data_accessor = data_accessor.clone();
-        tasks.push(async move {
-            let new_summary =
-                reduce_block_metas(&new_blocks, block_thresholds, Some(default_cluster_key));
-            let segment_info = SegmentInfo::new(new_blocks, new_summary.clone());
-
-            let path = location_gen.gen_segment_info_location();
-            segment_info.write_meta(&data_accessor, &path).await?;
-            Ok::<_, ErrorCode>(((path, SegmentInfo::VERSION), new_summary))
-        });
-    }
-
-    let threads_nums = ctx.get_settings().get_max_threads()? as usize;
-
-    execute_futures_in_parallel(
-        tasks,
-        threads_nums,
-        threads_nums * 2,
-        "fuse-write-segments-worker".to_owned(),
-    )
-    .await?
-    .into_iter()
-    .collect::<Result<Vec<_>>>()
-}
diff --git a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt
index 5cd4e1fc9aa19..6a73b1376dd19 100644
--- a/src/query/service/tests/it/storages/testdata/settings_table.txt
+++ b/src/query/service/tests/it/storages/testdata/settings_table.txt
@@ -12,6 +12,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System
 | 'enable_cbo'                      | '1' | '1' | 'SESSION' | 'Enables cost-based optimization.'                  | 'UInt64' |
 | 'enable_distributed_compact'      | '0' | '0' | 'SESSION' | 'Enable distributed execution of table compaction.' | 'UInt64' |
 | 'enable_distributed_copy_into'    | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.'        | 'UInt64' |
+| 'enable_distributed_recluster'    | '0' | '0' | 'SESSION' | 'Enable distributed execution of table recluster.'  | 'UInt64' |
 | 'enable_distributed_replace_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of replace into.'     | 'UInt64' |
 | 'enable_dphyp'                    | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.'               | 'UInt64' |
 | 'enable_experimental_merge_into'  | '0' | '0' | 'SESSION' | 'Enable unstable merge into.'                       | 'UInt64' |
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 393a556127353..1a75ea77de504 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -428,6 +428,12 @@ impl DefaultSettings {
                 possible_values: None,
                 display_in_show_settings: true,
             }),
+            ("enable_distributed_recluster", DefaultSettingValue {
+                value: UserSettingValue::UInt64(0),
+                desc: "Enable distributed execution of table recluster.",
+                possible_values: None,
+                display_in_show_settings: true,
+            }),
             ("enable_parquet_page_index", DefaultSettingValue {
                 value: UserSettingValue::UInt64(1),
                 desc: "Enables parquet page index",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 9e99f7b8ccd26..9634eefbfc872 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -365,10 +365,18 @@ impl Settings {
         self.try_get_u64("recluster_timeout_secs")
     }
 
+    pub fn set_recluster_block_size(&self, val: u64) -> Result<()> {
+        self.try_set_u64("recluster_block_size", val)
+    }
+
     pub fn get_recluster_block_size(&self) -> Result<u64> {
         self.try_get_u64("recluster_block_size")
     }
 
+    pub fn get_enable_distributed_recluster(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_distributed_recluster")? != 0)
+    }
+
     pub fn get_enable_refresh_aggregating_index_after_write(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_refresh_aggregating_index_after_write")? != 0)
     }
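Like the other enable_* switches, enable_distributed_recluster is stored as a UInt64 and exposed through a bool getter. A compact, dependency-free illustration of that convention, with a plain map standing in for the real settings store:

use std::collections::HashMap;

struct Settings {
    values: HashMap<&'static str, u64>,
}

impl Settings {
    fn try_get_u64(&self, key: &str) -> Result<u64, String> {
        self.values.get(key).copied().ok_or_else(|| format!("unknown setting: {}", key))
    }

    // 0 is "off", any non-zero value is "on", mirroring the getter added above.
    fn get_enable_distributed_recluster(&self) -> Result<bool, String> {
        Ok(self.try_get_u64("enable_distributed_recluster")? != 0)
    }
}

fn main() -> Result<(), String> {
    let settings = Settings {
        values: HashMap::from([("enable_distributed_recluster", 0u64)]),
    };
    assert!(!settings.get_enable_distributed_recluster()?); // off by default
    Ok(())
}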
diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs
index e2def9fd756db..03419cdebf756 100644
--- a/src/query/sql/src/executor/format.rs
+++ b/src/query/sql/src/executor/format.rs
@@ -30,7 +30,6 @@ use crate::executor::physical_plans::physical_commit_sink::CommitSink;
 use crate::executor::physical_plans::physical_constant_table_scan::ConstantTableScan;
 use crate::executor::physical_plans::physical_copy_into::CopyIntoTablePhysicalPlan;
 use crate::executor::physical_plans::physical_cte_scan::CteScan;
-use crate::executor::physical_plans::physical_delete_source::DeleteSource;
 use crate::executor::physical_plans::physical_distributed_insert_select::DistributedInsertSelect;
 use crate::executor::physical_plans::physical_eval_scalar::EvalScalar;
 use crate::executor::physical_plans::physical_exchange::Exchange;
@@ -45,6 +44,7 @@ use crate::executor::physical_plans::physical_project::Project;
 use crate::executor::physical_plans::physical_project_set::ProjectSet;
 use crate::executor::physical_plans::physical_range_join::RangeJoin;
 use crate::executor::physical_plans::physical_range_join::RangeJoinType;
+use crate::executor::physical_plans::physical_recluster_sink::ReclusterSink;
 use crate::executor::physical_plans::physical_row_fetch::RowFetch;
 use crate::executor::physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
 use crate::executor::physical_plans::physical_sort::Sort;
@@ -195,9 +195,9 @@ fn to_format_tree(
         PhysicalPlan::DistributedInsertSelect(plan) => {
             distributed_insert_to_format_tree(plan.as_ref(), metadata, profs)
         }
-        PhysicalPlan::DeleteSource(plan) => {
-            delete_source_to_format_tree(plan.as_ref(), metadata, profs)
-        }
+        PhysicalPlan::DeleteSource(_) => Ok(FormatTreeNode::new("DeleteSource".to_string())),
+        PhysicalPlan::ReclusterSource(_) => Ok(FormatTreeNode::new("ReclusterSource".to_string())),
+        PhysicalPlan::ReclusterSink(plan) => recluster_sink_to_format_tree(plan, metadata, profs),
         PhysicalPlan::CompactSource(_) => Ok(FormatTreeNode::new("CompactSource".to_string())),
         PhysicalPlan::CommitSink(plan) => commit_sink_to_format_tree(plan, metadata, profs),
         PhysicalPlan::ProjectSet(plan) => project_set_to_format_tree(plan, metadata, profs),
@@ -1057,12 +1057,16 @@ fn distributed_insert_to_format_tree(
     ))
 }
 
-fn delete_source_to_format_tree(
-    _plan: &DeleteSource,
-    _metadata: &Metadata,
-    _prof_span_set: &SharedProcessorProfiles,
+fn recluster_sink_to_format_tree(
+    plan: &ReclusterSink,
+    metadata: &Metadata,
+    prof_span_set: &SharedProcessorProfiles,
 ) -> Result<FormatTreeNode<String>> {
-    Ok(FormatTreeNode::new("DeleteSource".to_string()))
+    let children = vec![to_format_tree(&plan.input, metadata, prof_span_set)?];
+    Ok(FormatTreeNode::with_children(
+        "ReclusterSink".to_string(),
+        children,
+    ))
 }
 
 fn commit_sink_to_format_tree(
diff --git a/src/query/sql/src/executor/mod.rs b/src/query/sql/src/executor/mod.rs
index 91520a43a15df..4c94a316cf682 100644
--- a/src/query/sql/src/executor/mod.rs
+++ b/src/query/sql/src/executor/mod.rs
@@ -63,6 +63,9 @@ pub use physical_plans::physical_project_set::ProjectSet;
 pub use physical_plans::physical_range_join::RangeJoin;
 pub use physical_plans::physical_range_join::RangeJoinCondition;
 pub use physical_plans::physical_range_join::RangeJoinType;
+pub use physical_plans::physical_recluster_sink::ReclusterSink;
+pub use physical_plans::physical_recluster_source::ReclusterSource;
+pub use physical_plans::physical_recluster_source::ReclusterTask;
 pub use physical_plans::physical_replace_into::ReplaceInto;
 pub use physical_plans::physical_row_fetch::RowFetch;
 pub use physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs
index 1bff9e6ac436e..91a52827060ec 100644
--- a/src/query/sql/src/executor/physical_plan.rs
+++ b/src/query/sql/src/executor/physical_plan.rs
@@ -43,6 +43,8 @@ use crate::executor::physical_plans::physical_merge_into::MergeIntoSource;
 use crate::executor::physical_plans::physical_project::Project;
 use crate::executor::physical_plans::physical_project_set::ProjectSet;
 use crate::executor::physical_plans::physical_range_join::RangeJoin;
+use crate::executor::physical_plans::physical_recluster_sink::ReclusterSink;
+use crate::executor::physical_plans::physical_recluster_source::ReclusterSource;
 use crate::executor::physical_plans::physical_replace_into::ReplaceInto;
 use crate::executor::physical_plans::physical_row_fetch::RowFetch;
 use crate::executor::physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
@@ -101,6 +103,10 @@ pub enum PhysicalPlan {
     /// Compact
     CompactSource(Box<CompactSource>),
     CommitSink(Box<CommitSink>),
+
+    /// Recluster
+    ReclusterSource(Box<ReclusterSource>),
+    ReclusterSink(Box<ReclusterSink>),
 }
 
 impl PhysicalPlan {
@@ -139,7 +145,9 @@ impl PhysicalPlan {
             | PhysicalPlan::AsyncSourcer(_)
             | PhysicalPlan::Deduplicate(_)
             | PhysicalPlan::ReplaceInto(_)
-            | PhysicalPlan::CompactSource(_) => {
+            | PhysicalPlan::CompactSource(_)
+            | PhysicalPlan::ReclusterSource(_)
+            | PhysicalPlan::ReclusterSink(_) => {
                 unreachable!()
             }
         }
@@ -164,11 +172,8 @@ impl PhysicalPlan {
             PhysicalPlan::ExchangeSource(plan) => plan.output_schema(),
             PhysicalPlan::ExchangeSink(plan) => plan.output_schema(),
             PhysicalPlan::UnionAll(plan) => plan.output_schema(),
-            PhysicalPlan::DistributedInsertSelect(plan) => plan.output_schema(),
             PhysicalPlan::ProjectSet(plan) => plan.output_schema(),
             PhysicalPlan::RuntimeFilterSource(plan) => plan.output_schema(),
-            PhysicalPlan::DeleteSource(plan) => plan.output_schema(),
-            PhysicalPlan::CommitSink(plan) => plan.output_schema(),
             PhysicalPlan::RangeJoin(plan) => plan.output_schema(),
             PhysicalPlan::CopyIntoTable(plan) => plan.output_schema(),
             PhysicalPlan::CteScan(plan) => plan.output_schema(),
@@ -179,7 +184,12 @@ impl PhysicalPlan {
             | PhysicalPlan::MergeInto(_)
             | PhysicalPlan::Deduplicate(_)
             | PhysicalPlan::ReplaceInto(_)
-            | PhysicalPlan::CompactSource(_) => Ok(DataSchemaRef::default()),
+            | PhysicalPlan::CompactSource(_)
+            | PhysicalPlan::CommitSink(_)
+            | PhysicalPlan::DistributedInsertSelect(_)
+            | PhysicalPlan::DeleteSource(_)
+            | PhysicalPlan::ReclusterSource(_)
+            | PhysicalPlan::ReclusterSink(_) => Ok(DataSchemaRef::default()),
         }
     }
 
@@ -218,6 +228,8 @@ impl PhysicalPlan {
             PhysicalPlan::CteScan(_) => "PhysicalCteScan".to_string(),
             PhysicalPlan::MaterializedCte(_) => "PhysicalMaterializedCte".to_string(),
             PhysicalPlan::ConstantTableScan(_) => "PhysicalConstantTableScan".to_string(),
+            PhysicalPlan::ReclusterSource(_) => "ReclusterSource".to_string(),
+            PhysicalPlan::ReclusterSink(_) => "ReclusterSink".to_string(),
         }
     }
 
@@ -225,7 +237,13 @@ impl PhysicalPlan {
         match self {
             PhysicalPlan::TableScan(_)
             | PhysicalPlan::CteScan(_)
-            | PhysicalPlan::ConstantTableScan(_) => Box::new(std::iter::empty()),
+            | PhysicalPlan::ConstantTableScan(_)
+            | PhysicalPlan::ExchangeSource(_)
+            | PhysicalPlan::CompactSource(_)
+            | PhysicalPlan::DeleteSource(_)
+            | PhysicalPlan::CopyIntoTable(_)
+            | PhysicalPlan::AsyncSourcer(_)
+            | PhysicalPlan::ReclusterSource(_) => Box::new(std::iter::empty()),
             PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::Project(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())),
                 std::iter::once(plan.probe.as_ref()).chain(std::iter::once(plan.build.as_ref())),
             ),
             PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())),
-            PhysicalPlan::ExchangeSource(_) => Box::new(std::iter::empty()),
             PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::UnionAll(plan) => Box::new(
                 std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
             ),
             PhysicalPlan::DistributedInsertSelect(plan) => {
                 Box::new(std::iter::once(plan.input.as_ref()))
             }
-            PhysicalPlan::CompactSource(_) => Box::new(std::iter::empty()),
-            PhysicalPlan::DeleteSource(_plan) => Box::new(std::iter::empty()),
             PhysicalPlan::CommitSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::ProjectSet(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::RuntimeFilterSource(plan) => Box::new(
             PhysicalPlan::RangeJoin(plan) => Box::new(
                 std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
             ),
-            PhysicalPlan::CopyIntoTable(_) => Box::new(std::iter::empty()),
-            PhysicalPlan::AsyncSourcer(_) => Box::new(std::iter::empty()),
             PhysicalPlan::Deduplicate(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::ReplaceInto(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::MergeInto(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::MergeIntoSource(plan) => Box::new(std::iter::once(plan.input.as_ref())),
             PhysicalPlan::MaterializedCte(plan) => Box::new(
                 std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
             ),
+            PhysicalPlan::ReclusterSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
         }
     }
 
@@ -307,7 +321,9 @@ impl PhysicalPlan {
             | PhysicalPlan::MergeInto(_)
             | PhysicalPlan::MergeIntoSource(_)
             | PhysicalPlan::ConstantTableScan(_)
-            | PhysicalPlan::CteScan(_) => None,
+            | PhysicalPlan::CteScan(_)
+            | PhysicalPlan::ReclusterSource(_)
+            | PhysicalPlan::ReclusterSink(_) => None,
         }
     }
 
diff --git a/src/query/sql/src/executor/physical_plan_display.rs b/src/query/sql/src/executor/physical_plan_display.rs
index 14c5fcb76a0a8..33675f2621ca9 100644
--- a/src/query/sql/src/executor/physical_plan_display.rs
+++ b/src/query/sql/src/executor/physical_plan_display.rs
@@ -44,6 +44,8 @@ use crate::executor::physical_plans::physical_merge_into::MergeIntoSource;
 use crate::executor::physical_plans::physical_project::Project;
 use crate::executor::physical_plans::physical_project_set::ProjectSet;
 use crate::executor::physical_plans::physical_range_join::RangeJoin;
+use crate::executor::physical_plans::physical_recluster_sink::ReclusterSink;
+use crate::executor::physical_plans::physical_recluster_source::ReclusterSource;
 use crate::executor::physical_plans::physical_replace_into::ReplaceInto;
 use crate::executor::physical_plans::physical_row_fetch::RowFetch;
 use crate::executor::physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
@@ -103,6 +105,8 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
             PhysicalPlan::CteScan(cte_scan) => write!(f, "{}", cte_scan)?,
             PhysicalPlan::MaterializedCte(plan) => write!(f, "{}", plan)?,
             PhysicalPlan::ConstantTableScan(scan) => write!(f, "{}", scan)?,
+            PhysicalPlan::ReclusterSource(plan) => write!(f, "{}", plan)?,
+            PhysicalPlan::ReclusterSink(plan) => write!(f, "{}", plan)?,
         }
 
         for node in self.node.children() {
@@ -494,3 +498,15 @@ impl Display for Lambda {
         write!(f, "Lambda functions: {}", scalars.join(", "))
     }
 }
+
+impl Display for ReclusterSource {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ReclusterSource")
+    }
+}
+
+impl Display for ReclusterSink {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ReclusterSink")
+    }
+}
diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs
index 0dbab7a395205..2e55583b573b6 100644
--- a/src/query/sql/src/executor/physical_plan_visitor.rs
+++ b/src/query/sql/src/executor/physical_plan_visitor.rs
@@ -42,6 +42,8 @@ use crate::executor::physical_plans::physical_merge_into::MergeIntoSource;
 use crate::executor::physical_plans::physical_project::Project;
 use crate::executor::physical_plans::physical_project_set::ProjectSet;
 use crate::executor::physical_plans::physical_range_join::RangeJoin;
+use crate::executor::physical_plans::physical_recluster_sink::ReclusterSink;
+use crate::executor::physical_plans::physical_recluster_source::ReclusterSource;
 use crate::executor::physical_plans::physical_replace_into::ReplaceInto;
 use crate::executor::physical_plans::physical_row_fetch::RowFetch;
 use crate::executor::physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
@@ -87,9 +89,23 @@ pub trait PhysicalPlanReplacer {
             PhysicalPlan::MergeIntoSource(plan) => self.replace_merge_into_source(plan),
             PhysicalPlan::MaterializedCte(plan) => self.replace_materialized_cte(plan),
             PhysicalPlan::ConstantTableScan(plan) => self.replace_constant_table_scan(plan),
+            PhysicalPlan::ReclusterSource(plan) => self.replace_recluster_source(plan),
+            PhysicalPlan::ReclusterSink(plan) => self.replace_recluster_sink(plan),
         }
     }
 
+    fn replace_recluster_source(&mut self, plan: &ReclusterSource) -> Result<PhysicalPlan> {
+        Ok(PhysicalPlan::ReclusterSource(Box::new(plan.clone())))
+    }
+
+    fn replace_recluster_sink(&mut self, plan: &ReclusterSink) -> Result<PhysicalPlan> {
+        let input = self.replace(&plan.input)?;
+        Ok(PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
+            input: Box::new(input),
+            ..plan.clone()
+        })))
+    }
+
     fn replace_table_scan(&mut self, plan: &TableScan) -> Result<PhysicalPlan> {
         Ok(PhysicalPlan::TableScan(plan.clone()))
     }
@@ -472,7 +488,11 @@ impl PhysicalPlan {
             PhysicalPlan::TableScan(_)
             | PhysicalPlan::AsyncSourcer(_)
             | PhysicalPlan::CteScan(_)
-            | PhysicalPlan::ConstantTableScan(_) => {}
+            | PhysicalPlan::ConstantTableScan(_)
+            | PhysicalPlan::ReclusterSource(_)
+            | PhysicalPlan::ExchangeSource(_)
+            | PhysicalPlan::CompactSource(_)
+            | PhysicalPlan::DeleteSource(_) => {}
             PhysicalPlan::Filter(plan) => {
                 Self::traverse(&plan.input, pre_visit, visit, post_visit);
             }
@@ -510,7 +530,6 @@ impl PhysicalPlan {
             PhysicalPlan::Exchange(plan) => {
                 Self::traverse(&plan.input, pre_visit, visit, post_visit);
             }
-            PhysicalPlan::ExchangeSource(_) => {}
             PhysicalPlan::ExchangeSink(plan) => {
                 Self::traverse(&plan.input, pre_visit, visit, post_visit);
             }
@@ -541,8 +560,9 @@ impl PhysicalPlan {
                 Self::traverse(&plan.left, pre_visit, visit, post_visit);
                 Self::traverse(&plan.right, pre_visit, visit, post_visit);
             }
-            PhysicalPlan::CompactSource(_) => {}
-            PhysicalPlan::DeleteSource(_) => {}
+            PhysicalPlan::ReclusterSink(plan) => {
+                Self::traverse(&plan.input, pre_visit, visit, post_visit);
+            }
             PhysicalPlan::CommitSink(plan) => {
                 Self::traverse(&plan.input, pre_visit, visit, post_visit);
             }
diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs
index 7ccd857105cb4..5b0aaeb4e7a48 100644
--- a/src/query/sql/src/executor/physical_plans/mod.rs
+++ b/src/query/sql/src/executor/physical_plans/mod.rs
@@ -39,6 +39,8 @@ pub mod physical_merge_into;
 pub mod physical_project;
 pub mod physical_project_set;
 pub mod physical_range_join;
+pub mod physical_recluster_sink;
+pub mod physical_recluster_source;
 pub mod physical_refresh_index;
 pub mod physical_replace_into;
 pub mod physical_row_fetch;
diff --git a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs
index ec64b41873d0e..4d8f2a2b4e3c1 100644
--- a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs
@@ -14,8 +14,6 @@
 
 use std::sync::Arc;
 
-use common_exception::Result;
-use common_expression::DataSchemaRef;
 use common_meta_app::schema::CatalogInfo;
 use common_meta_app::schema::TableInfo;
 use storages_common_table_meta::meta::TableSnapshot;
@@ -33,9 +31,3 @@ pub struct CommitSink {
     pub mutation_kind: MutationKind,
     pub merge_meta: bool,
 }
-
-impl CommitSink {
-    pub fn output_schema(&self) -> Result<DataSchemaRef> {
-        Ok(DataSchemaRef::default())
-    }
-}
diff --git a/src/query/sql/src/executor/physical_plans/physical_delete_source.rs b/src/query/sql/src/executor/physical_plans/physical_delete_source.rs
index a14e49783cb11..f5248f481a34a 100644
--- a/src/query/sql/src/executor/physical_plans/physical_delete_source.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_delete_source.rs
@@ -16,8 +16,6 @@ use std::sync::Arc;
 
 use common_catalog::plan::Filters;
 use common_catalog::plan::Partitions;
-use common_exception::Result;
-use common_expression::DataSchemaRef;
 use common_meta_app::schema::CatalogInfo;
 use common_meta_app::schema::TableInfo;
 use storages_common_table_meta::meta::TableSnapshot;
@@ -32,9 +30,3 @@ pub struct DeleteSource {
     pub query_row_id_col: bool,
     pub snapshot: Arc<TableSnapshot>,
 }
-
-impl DeleteSource {
-    pub fn output_schema(&self) -> Result<DataSchemaRef> {
-        Ok(DataSchemaRef::default())
-    }
-}
diff --git a/src/query/sql/src/executor/physical_plans/physical_distributed_insert_select.rs b/src/query/sql/src/executor/physical_plans/physical_distributed_insert_select.rs
index d0b894ba759b8..d7156d3754940 100644
--- a/src/query/sql/src/executor/physical_plans/physical_distributed_insert_select.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_distributed_insert_select.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_exception::Result;
 use common_expression::DataSchemaRef;
 use common_meta_app::schema::CatalogInfo;
 use common_meta_app::schema::TableInfo;
@@ -33,9 +32,3 @@ pub struct DistributedInsertSelect {
     pub select_column_bindings: Vec<ColumnBinding>,
     pub cast_needed: bool,
 }
-
-impl DistributedInsertSelect {
-    pub fn output_schema(&self) -> Result<DataSchemaRef> {
-        Ok(DataSchemaRef::default())
-    }
-}
diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs b/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs
new file mode 100644
index 0000000000000..4d9da230bffd6
--- /dev/null
+++ b/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs
@@ -0,0 +1,36 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_meta_app::schema::CatalogInfo;
+use common_meta_app::schema::TableInfo;
+use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::Statistics;
+use storages_common_table_meta::meta::TableSnapshot;
+
+use crate::executor::PhysicalPlan;
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct ReclusterSink {
+    pub input: Box<PhysicalPlan>,
+
+    pub table_info: TableInfo,
+    pub catalog_info: CatalogInfo,
+
+    pub snapshot: Arc<TableSnapshot>,
+    pub remained_blocks: Vec<Arc<BlockMeta>>,
+    pub removed_segment_indexes: Vec<usize>,
+    pub removed_segment_summary: Statistics,
+}
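Both new plan nodes derive the serde traits because distributed fragments are serialized and shipped to executor nodes. A minimal round-trip sketch of that idea (assumes the serde and serde_json crates; the struct is a cut-down stand-in for ReclusterTask, not the real type):

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct Task {
    total_rows: usize,
    total_bytes: usize,
    level: i32,
}

fn main() -> Result<(), serde_json::Error> {
    let task = Task { total_rows: 1000, total_bytes: 4096, level: 0 };
    // The coordinator would encode the fragment...
    let wire = serde_json::to_string(&task)?;
    // ...and the executor decodes an identical copy.
    let back: Task = serde_json::from_str(&wire)?;
    assert_eq!(task, back);
    Ok(())
}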
diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster_source.rs b/src/query/sql/src/executor/physical_plans/physical_recluster_source.rs
new file mode 100644
index 0000000000000..6c85a042bbdc7
--- /dev/null
+++ b/src/query/sql/src/executor/physical_plans/physical_recluster_source.rs
@@ -0,0 +1,34 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_catalog::plan::PartStatistics;
+use common_catalog::plan::Partitions;
+use common_meta_app::schema::CatalogInfo;
+use common_meta_app::schema::TableInfo;
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct ReclusterTask {
+    pub parts: Partitions,
+    pub stats: PartStatistics,
+    pub total_rows: usize,
+    pub total_bytes: usize,
+    pub level: i32,
+}
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct ReclusterSource {
+    pub tasks: Vec<ReclusterTask>,
+    pub table_info: TableInfo,
+    pub catalog_info: CatalogInfo,
+}
diff --git a/src/query/sql/src/executor/profile.rs b/src/query/sql/src/executor/profile.rs
index e00eb9f7aedf2..f58d3e90460ed 100644
--- a/src/query/sql/src/executor/profile.rs
+++ b/src/query/sql/src/executor/profile.rs
@@ -513,7 +513,9 @@ fn flatten_plan_node_profile(
         | PhysicalPlan::MergeIntoSource(_)
         | PhysicalPlan::Deduplicate(_)
         | PhysicalPlan::ReplaceInto(_)
-        | PhysicalPlan::CompactSource(_) => unreachable!(),
+        | PhysicalPlan::CompactSource(_)
+        | PhysicalPlan::ReclusterSource(_)
+        | PhysicalPlan::ReclusterSink(_) => unreachable!(),
     }
 
     Ok(())
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index b4debd50ba80d..5ba2644bcce3e 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -699,17 +699,6 @@ impl Table for FuseTable {
         self.do_compact_blocks(ctx, limit).await
     }
 
-    #[async_backtrace::framed]
-    async fn recluster(
-        &self,
-        ctx: Arc<dyn TableContext>,
-        push_downs: Option<PushDownInfo>,
-        limit: Option<usize>,
-        pipeline: &mut Pipeline,
-    ) -> Result<u64> {
-        self.do_recluster(ctx, push_downs, limit, pipeline).await
-    }
-
     #[async_backtrace::framed]
     async fn revert_to(
         &self,
diff --git a/src/query/storages/fuse/src/metrics/fuse_metrics.rs b/src/query/storages/fuse/src/metrics/fuse_metrics.rs
index 2ff3c95112319..eb7feae32be69 100644
--- a/src/query/storages/fuse/src/metrics/fuse_metrics.rs
+++ b/src/query/storages/fuse/src/metrics/fuse_metrics.rs
@@ -63,6 +63,14 @@ lazy_static! {
         register_histogram_in_milliseconds(key!("compact_block_read_milliseconds"));
     static ref COMPACT_BLOCK_BUILD_TASK_MILLISECONDS: Histogram =
         register_histogram_in_milliseconds(key!("compact_block_build_task_milliseconds"));
+    static ref RECLUSTER_BLOCK_NUMS_TO_READ: Counter =
+        register_counter(key!("recluster_block_nums_to_read"));
+    static ref RECLUSTER_BLOCK_BYTES_TO_READ: Counter =
+        register_counter(key!("recluster_block_bytes_to_read"));
+    static ref RECLUSTER_ROW_NUMS_TO_READ: Counter =
+        register_counter(key!("recluster_row_nums_to_read"));
+    static ref RECLUSTER_WRITE_BLOCK_NUMS: Counter =
+        register_counter(key!("recluster_write_block_nums"));
     static ref SEGMENTS_RANGE_PRUNING_BEFORE: Counter =
         register_counter(key!("segments_range_pruning_before"));
     static ref SEGMENTS_RANGE_PRUNING_AFTER: Counter =
@@ -413,3 +421,19 @@ pub fn metrics_inc_replace_deleted_blocks_rows(c: u64) {
 pub fn metrics_inc_replace_append_blocks_rows(c: u64) {
     REPLACE_INTO_APPEND_BLOCKS_ROWS.inc_by(c);
 }
+
+pub fn metrics_inc_recluster_block_nums_to_read(c: u64) {
+    RECLUSTER_BLOCK_NUMS_TO_READ.inc_by(c);
+}
+
+pub fn metrics_inc_recluster_block_bytes_to_read(c: u64) {
+    RECLUSTER_BLOCK_BYTES_TO_READ.inc_by(c);
+}
+
+pub fn metrics_inc_recluster_row_nums_to_read(c: u64) {
+    RECLUSTER_ROW_NUMS_TO_READ.inc_by(c);
+}
+
+pub fn metrics_inc_recluster_write_block_nums() {
+    RECLUSTER_WRITE_BLOCK_NUMS.inc();
+}
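The four new recluster counters follow the same pattern as the existing compact metrics: process-wide monotonic counters bumped from the hot path. A dependency-free sketch of that shape, using std atomics in place of the metrics registry:

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-ins for the registered Counter statics above.
static RECLUSTER_BLOCK_NUMS_TO_READ: AtomicU64 = AtomicU64::new(0);
static RECLUSTER_WRITE_BLOCK_NUMS: AtomicU64 = AtomicU64::new(0);

fn metrics_inc_recluster_block_nums_to_read(c: u64) {
    RECLUSTER_BLOCK_NUMS_TO_READ.fetch_add(c, Ordering::Relaxed);
}

fn metrics_inc_recluster_write_block_nums() {
    RECLUSTER_WRITE_BLOCK_NUMS.fetch_add(1, Ordering::Relaxed);
}

fn main() {
    // e.g. a recluster task that reads 8 blocks and writes 3 merged ones.
    metrics_inc_recluster_block_nums_to_read(8);
    for _ in 0..3 {
        metrics_inc_recluster_write_block_nums();
    }
    assert_eq!(RECLUSTER_BLOCK_NUMS_TO_READ.load(Ordering::Relaxed), 8);
    assert_eq!(RECLUSTER_WRITE_BLOCK_NUMS.load(Ordering::Relaxed), 3);
}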
common_base::runtime::execute_futures_in_parallel;
+use common_catalog::table::Table;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -35,14 +36,17 @@ use storages_common_table_meta::meta::Versioned;
 use crate::io::SegmentsIO;
 use crate::io::SerializedSegment;
 use crate::io::TableMetaLocationGenerator;
+use crate::metrics::metrics_inc_recluster_write_block_nums;
 use crate::operations::common::AbortOperation;
 use crate::operations::common::CommitMeta;
 use crate::operations::common::ConflictResolveContext;
 use crate::operations::common::SnapshotChanges;
-use crate::operations::ReclusterMutator;
 use crate::statistics::reduce_block_metas;
 use crate::statistics::reducers::merge_statistics_mut;
 use crate::statistics::sort_by_cluster_stats;
+use crate::FuseTable;
+use crate::DEFAULT_BLOCK_PER_SEGMENT;
+use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 
 pub struct ReclusterAggregator {
     ctx: Arc<dyn TableContext>,
@@ -73,6 +77,7 @@ impl AsyncAccumulatingTransform for ReclusterAggregator {
         self.merged_blocks.push(Arc::new(meta));
         // Refresh status
         {
+            metrics_inc_recluster_write_block_nums();
             let status = format!(
                 "recluster: generate new blocks:{}, cost:{} sec",
                 self.abort_operation.blocks.len(),
@@ -138,21 +143,25 @@ impl AsyncAccumulatingTransform for ReclusterAggregator {
 
 impl ReclusterAggregator {
     pub fn new(
-        mutator: &ReclusterMutator,
-        dal: Operator,
-        location_gen: TableMetaLocationGenerator,
-        block_per_seg: usize,
+        table: &FuseTable,
+        ctx: Arc<dyn TableContext>,
+        merged_blocks: Vec<Arc<BlockMeta>>,
+        removed_segment_indexes: Vec<usize>,
+        removed_statistics: Statistics,
     ) -> Self {
+        let block_per_seg =
+            table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
+        let default_cluster_key = table.cluster_key_meta.clone().unwrap().0;
         ReclusterAggregator {
-            ctx: mutator.ctx.clone(),
-            dal,
-            location_gen,
-            default_cluster_key: mutator.cluster_key_id,
-            block_thresholds: mutator.block_thresholds,
+            ctx,
+            dal: table.get_operator(),
+            location_gen: table.meta_location_generator().clone(),
+            default_cluster_key,
+            block_thresholds: table.get_block_thresholds(),
             block_per_seg,
-            merged_blocks: mutator.remained_blocks.clone(),
-            removed_segment_indexes: mutator.removed_segment_indexes.clone(),
-            removed_statistics: mutator.removed_segment_summary.clone(),
+            merged_blocks,
+            removed_segment_indexes,
+            removed_statistics,
             start_time: Instant::now(),
             abort_operation: AbortOperation::default(),
         }
diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs
index a6335d49f9e5f..543f5f264b2d8 100644
--- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 use std::cmp;
-use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::collections::HashSet;
@@ -25,17 +24,23 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::BlockThresholds;
 use common_expression::Scalar;
+use common_expression::TableSchemaRef;
+use common_sql::executor::ReclusterTask;
+use common_storage::ColumnNodes;
 use indexmap::IndexSet;
 use itertools::Itertools;
 use log::error;
 use minitrace::future::FutureExt;
 use minitrace::Span;
+use storages_common_pruner::BlockMetaIndex;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::CompactSegmentInfo;
 use storages_common_table_meta::meta::Statistics;
+use storages_common_table_meta::meta::TableSnapshot;
 
 use crate::statistics::reducers::merge_statistics_mut;
 use crate::table_functions::cmp_with_null;
+use crate::FuseTable;
 use crate::SegmentLocation;
 
 #[derive(Clone)]
@@ -44,12 +49,12 @@ pub struct ReclusterMutator {
     pub(crate) depth_threshold: f64,
     pub(crate) block_thresholds: BlockThresholds,
     pub(crate) cluster_key_id: u32,
+    pub(crate) schema: TableSchemaRef,
+    pub(crate) max_tasks: usize,
 
-    pub(crate) total_rows: usize,
-    pub(crate) total_bytes: usize,
-    pub(crate) level: i32,
-
-    pub selected_blocks: Vec<Arc<BlockMeta>>,
+    pub snapshot: Arc<TableSnapshot>,
+    pub tasks: Vec<ReclusterTask>,
+    pub recluster_blocks_count: u64,
     pub remained_blocks: Vec<Arc<BlockMeta>>,
     pub removed_segment_indexes: Vec<usize>,
     pub removed_segment_summary: Statistics,
@@ -58,29 +63,29 @@ impl ReclusterMutator {
     pub fn try_create(
         ctx: Arc<dyn TableContext>,
+        snapshot: Arc<TableSnapshot>,
+        schema: TableSchemaRef,
         depth_threshold: f64,
         block_thresholds: BlockThresholds,
         cluster_key_id: u32,
+        max_tasks: usize,
     ) -> Result<Self> {
         Ok(Self {
             ctx,
+            schema,
             depth_threshold,
             block_thresholds,
             cluster_key_id,
-            selected_blocks: Vec::new(),
+            max_tasks,
+            snapshot,
+            tasks: Vec::new(),
             remained_blocks: Vec::new(),
+            recluster_blocks_count: 0,
             removed_segment_indexes: Vec::new(),
             removed_segment_summary: Statistics::default(),
-            total_rows: 0,
-            total_bytes: 0,
-            level: 0,
         })
     }
 
-    pub fn take_blocks(&mut self) -> Vec<Arc<BlockMeta>> {
-        std::mem::take(&mut self.selected_blocks)
-    }
-
     #[async_backtrace::framed]
     pub async fn target_select(
         &mut self,
@@ -104,6 +109,15 @@ impl ReclusterMutator {
         let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize;
         let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100);
+        let max_threads = self.ctx.get_settings().get_max_threads()?
as usize; + let max_blocks_num = std::cmp::max( + memory_threshold / self.block_thresholds.max_bytes_per_block, + max_threads, + ) * self.max_tasks; + + let arrow_schema = self.schema.to_arrow(); + let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema)); + let mut remained_blocks = Vec::new(); let mut selected = false; for (level, block_metas) in blocks_map.into_iter() { @@ -136,15 +150,21 @@ impl ReclusterMutator { .block_thresholds .check_for_recluster(total_rows as usize, total_bytes as usize) { - self.selected_blocks = block_metas; - self.total_rows = total_rows as usize; - self.total_bytes = total_bytes as usize; - self.level = level; + let block_metas: Vec<_> = + block_metas.into_iter().map(|meta| (None, meta)).collect(); + self.generate_task( + &block_metas, + &column_nodes, + total_rows as usize, + total_bytes as usize, + level, + ); selected = true; continue; } - let selected_idx = Self::fetch_max_depth(points_map, self.depth_threshold)?; + let selected_idx = + Self::fetch_max_depth(points_map, self.depth_threshold, max_blocks_num)?; if selected_idx.is_empty() { remained_blocks.extend(block_metas.into_iter()); continue; @@ -156,6 +176,9 @@ impl ReclusterMutator { .for_each(|v| remained_blocks.push(block_metas[*v].clone())); let mut over_memory = false; + let mut task_bytes = 0; + let mut task_rows = 0; + let mut selected_blocks = Vec::new(); for idx in selected_idx { let block_meta = block_metas[idx].clone(); if over_memory { @@ -163,19 +186,46 @@ impl ReclusterMutator { continue; } - let memory_usage = self.total_bytes + block_meta.block_size as usize; - if memory_usage > memory_threshold { - remained_blocks.push(block_meta); - over_memory = true; - continue; + let block_size = block_meta.block_size as usize; + let row_count = block_meta.row_count as usize; + if task_bytes + block_size > memory_threshold { + self.generate_task( + &selected_blocks, + &column_nodes, + task_rows, + task_bytes, + level, + ); + + task_rows = 0; + task_bytes = 0; + selected_blocks.clear(); + + if self.tasks.len() >= self.max_tasks { + remained_blocks.push(block_meta); + over_memory = true; + continue; + } } - self.total_rows += block_meta.row_count as usize; - self.total_bytes = memory_usage; - self.selected_blocks.push(block_meta); + task_rows += row_count; + task_bytes += block_size; + selected_blocks.push((None, block_meta)); + } + + // the remains. 
+            match selected_blocks.len() {
+                0 => (),
+                1 => remained_blocks.push(selected_blocks[0].1.clone()),
+                _ => self.generate_task(
+                    &selected_blocks,
+                    &column_nodes,
+                    task_rows,
+                    task_bytes,
+                    level,
+                ),
             }
 
-            self.level = level;
             selected = true;
         }
@@ -193,6 +243,27 @@ impl ReclusterMutator {
         Ok(selected)
     }
 
+    fn generate_task(
+        &mut self,
+        block_metas: &[(Option<BlockMetaIndex>, Arc<BlockMeta>)],
+        column_nodes: &ColumnNodes,
+        total_rows: usize,
+        total_bytes: usize,
+        level: i32,
+    ) {
+        let (stats, parts) =
+            FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None);
+        let task = ReclusterTask {
+            parts,
+            stats,
+            total_rows,
+            total_bytes,
+            level,
+        };
+        self.tasks.push(task);
+        self.recluster_blocks_count += block_metas.len() as u64;
+    }
+
     pub fn select_segments(
         compact_segments: &[(SegmentLocation, Arc<CompactSegmentInfo>)],
         block_per_seg: usize,
@@ -229,9 +300,7 @@ impl ReclusterMutator {
             return Ok(indices);
         }
 
-        let mut selected_segs = ReclusterMutator::fetch_max_depth(points_map, 1.0)?;
-        selected_segs.truncate(max_len);
-        Ok(selected_segs)
+        ReclusterMutator::fetch_max_depth(points_map, 1.0, max_len)
     }
 
     pub fn segment_can_recluster(
@@ -293,9 +362,10 @@ impl ReclusterMutator {
     fn fetch_max_depth(
         points_map: HashMap<Vec<Scalar>, (Vec<usize>, Vec<usize>)>,
         depth_threshold: f64,
+        max_len: usize,
     ) -> Result<IndexSet<usize>> {
         let mut max_depth = 0;
-        let mut max_points = Vec::new();
+        let mut max_point = 0;
         let mut block_depths = Vec::new();
         let mut point_overlaps: Vec<Vec<usize>> = Vec::new();
         let mut unfinished_parts: HashMap<usize, usize> = HashMap::new();
@@ -310,13 +380,9 @@ impl ReclusterMutator {
                 unfinished_parts.len() + start.len()
             };
 
-            match point_depth.cmp(&max_depth) {
-                Ordering::Greater => {
-                    max_depth = point_depth;
-                    max_points = vec![i];
-                }
-                Ordering::Equal => max_points.push(i),
-                Ordering::Less => (),
+            if point_depth > max_depth {
+                max_depth = point_depth;
+                max_point = i;
             }
 
             unfinished_parts
@@ -335,7 +401,6 @@ impl ReclusterMutator {
             }
         });
     }
-    assert!(!max_points.is_empty());
     if !unfinished_parts.is_empty() {
         error!("Recluster: unfinished_parts is not empty after calculate the blocks overlaps");
         return Err(ErrorCode::Internal(
@@ -351,21 +416,48 @@ impl ReclusterMutator {
         // find the max point, gather the indices.
let mut selected_idx = IndexSet::new(); if average_depth > depth_threshold { - let mut point = max_points[0]; - point_overlaps[point].iter().for_each(|idx| { + point_overlaps[max_point].iter().for_each(|idx| { selected_idx.insert(*idx); }); - for next in max_points.into_iter().skip(1) { - point += 1; - if next != point { + + let mut left = max_point; + let mut right = max_point; + while selected_idx.len() < max_len { + let left_depth = if left > 0 { + point_overlaps[left - 1].len() as f64 + } else { + 0.0 + }; + + let right_depth = if right < point_overlaps.len() - 1 { + point_overlaps[right + 1].len() as f64 + } else { + 0.0 + }; + + let max_depth = left_depth.max(right_depth); + if max_depth <= depth_threshold { break; } - point_overlaps[next].iter().for_each(|idx| { - selected_idx.insert(*idx); - }); + if left_depth >= right_depth { + left -= 1; + let mut merged_idx = IndexSet::new(); + point_overlaps[left].iter().for_each(|idx| { + merged_idx.insert(*idx); + }); + merged_idx.extend(selected_idx); + selected_idx = merged_idx; + } else { + right += 1; + point_overlaps[right].iter().for_each(|idx| { + selected_idx.insert(*idx); + }); + } } } + + selected_idx.truncate(max_len); Ok(selected_idx) } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 1442c8772b3ea..907eac48a8096 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use common_base::runtime::TrySpawn; use common_catalog::plan::DataSourceInfo; use common_catalog::plan::DataSourcePlan; -use common_catalog::plan::PruningStatistics; use common_catalog::plan::PushDownInfo; use common_catalog::table::Table; use common_catalog::table_context::TableContext; @@ -27,16 +26,21 @@ use common_expression::DataField; use common_expression::DataSchemaRefExt; use common_expression::SortColumnDescription; use common_expression::TableSchemaRef; +use common_meta_app::schema::CatalogInfo; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_transforms::processors::transforms::build_merge_sort_pipeline; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use common_sql::evaluator::CompoundBlockOperator; +use common_sql::executor::ReclusterSink; +use common_sql::executor::ReclusterTask; use common_sql::BloomIndexColumns; -use log::info; use log::warn; use opendal::Operator; use storages_common_table_meta::meta::CompactSegmentInfo; +use crate::metrics::metrics_inc_recluster_block_bytes_to_read; +use crate::metrics::metrics_inc_recluster_block_nums_to_read; +use crate::metrics::metrics_inc_recluster_row_nums_to_read; use crate::operations::common::CommitSink; use crate::operations::common::MutationGenerator; use crate::operations::common::TransformSerializeBlock; @@ -68,22 +72,21 @@ impl FuseTable { // │ ┌──────────────┐ // │ ┌───►│SerializeBlock├───┐ // │ │ └──────────────┘ │ - // │ │ ┌──────────────┐ │ ┌─────────┐ ┌────────────────┐ ┌─────────────────┐ ┌──────────┐ - // └───►│───►│SerializeBlock├───┤───►│Resize(1)├───►│SerializeSegment├────►│TableMutationAggr├────►│CommitSink│ - // │ └──────────────┘ │ └─────────┘ └────────────────┘ └─────────────────┘ └──────────┘ + // │ │ ┌──────────────┐ │ ┌─────────┐ ┌────────────────┐ ┌─────────────┐ ┌──────────┐ + // └───►│───►│SerializeBlock├───┤───►│Resize(1)├───►│SerializeSegment├────►│ReclusterAggr├────►│CommitSink│ + // │ └──────────────┘ │ └─────────┘ 
└────────────────┘ └─────────────┘ └──────────┘
     // │    ┌──────────────┐   │
     // └───►│SerializeBlock├───┘
     //      └──────────────┘
     #[async_backtrace::framed]
-    pub(crate) async fn do_recluster(
+    pub async fn build_recluster_mutator(
         &self,
         ctx: Arc<dyn TableContext>,
         push_downs: Option<PushDownInfo>,
         limit: Option<usize>,
-        pipeline: &mut Pipeline,
-    ) -> Result<u64> {
+    ) -> Result<Option<ReclusterMutator>> {
         if self.cluster_key_meta.is_none() {
-            return Ok(0);
+            return Ok(None);
         }
 
         let snapshot_opt = self.read_table_snapshot().await?;
@@ -91,9 +94,17 @@ impl FuseTable {
             val
         } else {
             // no snapshot, no recluster.
-            return Ok(0);
+            return Ok(None);
         };
 
+        let settings = ctx.get_settings();
+        let mut max_tasks = 1;
+        let cluster = ctx.get_cluster();
+        if !cluster.is_empty() && settings.get_enable_distributed_recluster()? {
+            max_tasks = cluster.nodes.len();
+        }
+
+        let schema = self.schema();
         let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0;
         let block_thresholds = self.get_block_thresholds();
         let block_per_seg =
@@ -108,19 +119,21 @@ impl FuseTable {
             .min(64.0);
         let mut mutator = ReclusterMutator::try_create(
             ctx.clone(),
+            snapshot.clone(),
+            schema,
             threshold,
             block_thresholds,
             default_cluster_key_id,
+            max_tasks,
         )?;
 
         let segment_locations = snapshot.segments.clone();
         let segment_locations = create_segment_location_vector(segment_locations, None);
 
-        let max_threads = ctx.get_settings().get_max_threads()? as usize;
+        let max_threads = settings.get_max_threads()? as usize;
         let limit = limit.unwrap_or(1000);
-        let mut need_recluster = false;
-        for chunk in segment_locations.chunks(limit) {
+        'F: for chunk in segment_locations.chunks(limit) {
             // read segments.
             let compact_segments = Self::segment_pruning(
                 &ctx,
@@ -153,8 +166,7 @@ impl FuseTable {
                 }
 
                 if mutator.target_select(vec![compact_segment]).await? {
-                    need_recluster = true;
-                    break;
+                    break 'F;
                 }
             }
         } else {
@@ -162,51 +174,32 @@ impl FuseTable {
                 selected_segs.into_iter().for_each(|i| {
                     selected_segments.push(compact_segments[i].clone());
                 });
-                need_recluster = mutator.target_select(selected_segments).await?;
-            }
-
-            if need_recluster {
-                break;
+                if mutator.target_select(selected_segments).await? {
+                    break;
+                }
             }
         }
 
-        let block_metas: Vec<_> = mutator
-            .take_blocks()
-            .iter()
-            .map(|meta| (None, meta.clone()))
-            .collect();
-        let block_count = block_metas.len();
-        if block_count < 2 {
-            return Ok(0);
-        }
-
-        // Status.
-        {
-            let status = format!(
-                "recluster: select block files: {}, total bytes: {}, total rows: {}",
-                block_count, mutator.total_bytes, mutator.total_rows,
-            );
-            ctx.set_status_info(&status);
-            info!("{}", status);
-        }
+        Ok(Some(mutator))
+    }
 
-        let (statistics, parts) = self.read_partitions_with_metas(
-            ctx.clone(),
-            self.table_info.schema(),
-            None,
-            &block_metas,
-            block_count,
-            PruningStatistics::default(),
-        )?;
+    pub fn build_recluster_source(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        task: ReclusterTask,
+        catalog_info: CatalogInfo,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
+        let recluster_block_nums = task.parts.len();
+        let block_thresholds = self.get_block_thresholds();
         let table_info = self.get_table_info();
-        let catalog_info = ctx.get_catalog(table_info.catalog()).await?.info();
-        let description = statistics.get_description(&table_info.desc);
+        let description = task.stats.get_description(&table_info.desc);
         let plan = DataSourcePlan {
             catalog_info,
             source_info: DataSourceInfo::TableSource(table_info.clone()),
             output_schema: table_info.schema(),
-            parts,
-            statistics,
+            parts: task.parts,
+            statistics: task.stats,
             description,
             tbl_args: self.table_args(),
             push_downs: None,
@@ -219,8 +212,14 @@ impl FuseTable {
         // ReadDataKind to avoid OOM.
         self.do_read_data(ctx.clone(), &plan, pipeline, false)?;
 
+        {
+            metrics_inc_recluster_block_nums_to_read(recluster_block_nums as u64);
+            metrics_inc_recluster_block_bytes_to_read(task.total_bytes as u64);
+            metrics_inc_recluster_row_nums_to_read(task.total_rows as u64);
+        }
+
         let cluster_stats_gen =
-            self.get_cluster_stats_gen(ctx.clone(), mutator.level + 1, block_thresholds, None)?;
+            self.get_cluster_stats_gen(ctx.clone(), task.level + 1, block_thresholds, None)?;
         let operators = cluster_stats_gen.operators.clone();
         if !operators.is_empty() {
             let num_input_columns = self.table_info.schema().fields().len();
@@ -238,12 +237,12 @@ impl FuseTable {
 
         // merge sort
         let block_num = std::cmp::max(
-            mutator.total_bytes * 80 / (block_thresholds.max_bytes_per_block * 100),
+            task.total_bytes * 80 / (block_thresholds.max_bytes_per_block * 100),
             1,
         );
         let final_block_size = std::cmp::min(
             // estimate block_size based on max_bytes_per_block.
-            mutator.total_rows / block_num,
+            task.total_rows / block_num,
             block_thresholds.max_rows_per_block,
         );
         let partial_block_size = if pipeline.output_len() > 1 {
@@ -278,8 +277,11 @@ impl FuseTable {
             None,
         )?;
 
-        let output_block_num = mutator.total_rows.div_ceil(final_block_size);
-        let max_threads = std::cmp::min(max_threads, output_block_num);
+        let output_block_num = task.total_rows.div_ceil(final_block_size);
+        let max_threads = std::cmp::min(
+            ctx.get_settings().get_max_threads()?
as usize,
+            output_block_num,
+        );
         pipeline.try_resize(max_threads)?;
 
         pipeline.add_transform(|transform_input_port, transform_output_port| {
             let proc = TransformSerializeBlock::try_create(
@@ -290,22 +292,30 @@ impl FuseTable {
                 cluster_stats_gen.clone(),
             )?;
             proc.into_processor()
-        })?;
+        })
+    }
 
+    pub fn build_recluster_sink(
+        &self,
+        ctx: Arc<dyn TableContext>,
+        plan: &ReclusterSink,
+        pipeline: &mut Pipeline,
+    ) -> Result<()> {
         pipeline.try_resize(1)?;
         pipeline.add_transform(|input, output| {
             let aggregator = ReclusterAggregator::new(
-                &mutator,
-                self.get_operator(),
-                self.meta_location_generator().clone(),
-                block_per_seg,
+                self,
+                ctx.clone(),
+                plan.remained_blocks.clone(),
+                plan.removed_segment_indexes.clone(),
+                plan.removed_segment_summary.clone(),
             );
             Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create(
                 input, output, aggregator,
             )))
         })?;
 
-        let snapshot_gen = MutationGenerator::new(snapshot);
+        let snapshot_gen = MutationGenerator::new(plan.snapshot.clone());
         pipeline.add_sink(|input| {
             CommitSink::try_create(
                 self,
@@ -317,8 +327,7 @@ impl FuseTable {
                 true,
                 None,
             )
-        })?;
-        Ok(block_count as u64)
+        })
     }
 
     pub async fn segment_pruning(
diff --git a/tests/sqllogictests/suites/mode/cluster/distributed_recluster.sql b/tests/sqllogictests/suites/mode/cluster/distributed_recluster.sql
new file mode 100644
index 0000000000000..508dfbeb42515
--- /dev/null
+++ b/tests/sqllogictests/suites/mode/cluster/distributed_recluster.sql
@@ -0,0 +1,34 @@
+statement ok
+drop table if exists t_recluster
+
+statement ok
+create table t_recluster (a int not null) cluster by(a) row_per_block=3
+
+statement ok
+set recluster_block_size = 30
+
+statement ok
+insert into t_recluster select 10-number from numbers(20)
+
+statement ok
+insert into t_recluster select 10-number from numbers(20)
+
+statement ok
+insert into t_recluster select 10-number from numbers(20)
+
+statement ok
+set enable_distributed_recluster = 1
+
+statement ok
+alter table t_recluster recluster
+
+# query I
+# select count()>1 from system.metrics where metric='fuse_recluster_block_nums_to_read_total'
+# ----
+# 1
+
+statement ok
+set enable_distributed_recluster = 0
+
+statement ok
+drop table if exists t_recluster
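
Reviewer note on the `fetch_max_depth` rewrite in `recluster_mutator.rs` above: instead of collecting every point that ties for the maximum depth, the new code picks a single deepest point and grows the selection outward, always absorbing the deeper neighbour, until the neighbouring depth drops to `depth_threshold` or `max_len` entries are covered. A minimal self-contained sketch of that expansion policy, assuming a plain slice of depths in place of the real `point_overlaps` index sets (all names here are illustrative, not the PR's API):

```rust
// Sketch (not the PR's code) of the outward-expansion selection used by
// the reworked fetch_max_depth: start at the single deepest point and
// repeatedly step toward the deeper neighbour, stopping when neither
// neighbour exceeds the threshold or max_len points are selected.
fn expand_from_deepest(depths: &[f64], depth_threshold: f64, max_len: usize) -> (usize, usize) {
    // First maximum wins, mirroring the PR's `point_depth > max_depth` test.
    let mut max_point = 0;
    let mut max_depth = f64::MIN;
    for (i, &d) in depths.iter().enumerate() {
        if d > max_depth {
            max_depth = d;
            max_point = i;
        }
    }

    let (mut left, mut right) = (max_point, max_point);
    while right - left + 1 < max_len {
        let left_depth = if left > 0 { depths[left - 1] } else { 0.0 };
        let right_depth = if right + 1 < depths.len() {
            depths[right + 1]
        } else {
            0.0
        };
        // Neither neighbour is deep enough to be worth merging: stop.
        if left_depth.max(right_depth) <= depth_threshold {
            break;
        }
        // Prefer the deeper side; ties go left, as in the PR.
        if left_depth >= right_depth {
            left -= 1;
        } else {
            right += 1;
        }
    }
    (left, right)
}

fn main() {
    // Depth profile peaking at index 3. With max_len = 4 the window grows
    // right first (4.0 beats 3.0), then left twice (3.0 beats 2.0, and the
    // final 2.0-vs-2.0 tie also goes left), ending at indices 1..=4.
    let depths = [1.0, 2.0, 3.0, 5.0, 4.0, 2.0, 1.0];
    assert_eq!(expand_from_deepest(&depths, 1.0, 4), (1, 4));
}
```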
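Similarly, the reworked `target_select` no longer accumulates one global `selected_blocks` set: it packs blocks into byte-budgeted `ReclusterTask`s, flushing a task whenever the next block would exceed the memory threshold, and stops creating tasks once `max_tasks` is reached (the cluster node count when `enable_distributed_recluster` is set). A simplified standalone sketch of that batching policy, under stated assumptions: blocks are `(rows, bytes)` pairs rather than `Arc<BlockMeta>` entries, and the guard against flushing an empty batch is an addition for this sketch:

```rust
// Sketch of the task-batching loop in target_select: blocks are packed
// into tasks by a byte budget; once max_tasks tasks exist, the remaining
// candidates are returned to the "remained" set for a later round.
#[derive(Default)]
struct Task {
    rows: usize,
    bytes: usize,
    blocks: Vec<usize>, // block indexes, standing in for Arc<BlockMeta>
}

fn batch_blocks(
    blocks: &[(usize, usize)], // (rows, bytes) per selected block
    memory_threshold: usize,
    max_tasks: usize,
) -> (Vec<Task>, Vec<usize>) {
    let mut tasks = Vec::new();
    let mut remained = Vec::new();
    let mut current = Task::default();
    let mut over = false;
    for (idx, &(rows, bytes)) in blocks.iter().enumerate() {
        if over {
            remained.push(idx);
            continue;
        }
        // Flush the current batch before it would overflow the budget.
        if current.bytes + bytes > memory_threshold && !current.blocks.is_empty() {
            tasks.push(std::mem::take(&mut current));
            if tasks.len() >= max_tasks {
                remained.push(idx);
                over = true;
                continue;
            }
        }
        current.rows += rows;
        current.bytes += bytes;
        current.blocks.push(idx);
    }
    // The remains: a lone trailing block is not worth reclustering.
    match current.blocks.len() {
        0 => {}
        1 => remained.push(current.blocks[0]),
        _ => tasks.push(current),
    }
    (tasks, remained)
}

fn main() {
    let blocks = [(10, 40), (10, 40), (10, 40), (10, 40), (10, 40)];
    let (tasks, remained) = batch_blocks(&blocks, 100, 2);
    assert_eq!(tasks.len(), 2); // two tasks of two blocks each
    assert_eq!(remained, vec![4]); // last block deferred once max_tasks is hit
}
```

One task per node is the natural cap here: each `ReclusterTask` becomes one `ReclusterSource` partition in the distributed plan, so producing more tasks than executors would only queue work without adding parallelism.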