From c00121f300732fb4ed49c976a881d8bc2e8defd0 Mon Sep 17 00:00:00 2001
From: zhyass <34016424+zhyass@users.noreply.github.com>
Date: Wed, 11 Oct 2023 16:13:22 +0800
Subject: [PATCH] rename func name

---
 src/query/catalog/src/table.rs                |  18 --
 src/query/storages/fuse/src/fuse_table.rs     |  11 -
 .../storages/fuse/src/operations/recluster.rs | 275 +-----------------
 3 files changed, 8 insertions(+), 296 deletions(-)

diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs
index 5681e078decd0..aba8e8b38ba43 100644
--- a/src/query/catalog/src/table.rs
+++ b/src/query/catalog/src/table.rs
@@ -331,24 +331,6 @@ pub trait Table: Sync + Send {
         )))
     }
 
-    // return the selected block num.
-    #[async_backtrace::framed]
-    async fn recluster(
-        &self,
-        ctx: Arc<dyn TableContext>,
-        push_downs: Option<PushDownInfo>,
-        limit: Option<usize>,
-        pipeline: &mut Pipeline,
-    ) -> Result<u64> {
-        let (_, _, _, _) = (ctx, push_downs, limit, pipeline);
-
-        Err(ErrorCode::Unimplemented(format!(
-            "table {}, of engine type {}, does not support recluster",
-            self.name(),
-            self.get_table_info().engine(),
-        )))
-    }
-
     #[async_backtrace::framed]
     async fn revert_to(
         &self,
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index 63c6b1dbbb84f..88103ed0d5db6 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -697,17 +697,6 @@ impl Table for FuseTable {
         self.do_compact_blocks(ctx, limit).await
     }
 
-    #[async_backtrace::framed]
-    async fn recluster(
-        &self,
-        ctx: Arc<dyn TableContext>,
-        push_downs: Option<PushDownInfo>,
-        limit: Option<usize>,
-        pipeline: &mut Pipeline,
-    ) -> Result<u64> {
-        todo!()
-    }
-
     #[async_backtrace::framed]
     async fn revert_to(
         &self,
diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs
index f885b79761c88..f16c0cff0ca48 100644
--- a/src/query/storages/fuse/src/operations/recluster.rs
+++ b/src/query/storages/fuse/src/operations/recluster.rs
@@ -56,25 +56,30 @@ use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 
 impl FuseTable {
     #[async_backtrace::framed]
-    pub(crate) async fn do_reclusters(
+    pub(crate) async fn build_recluster_mutator(
         &self,
         ctx: Arc<dyn TableContext>,
         push_downs: Option<PushDownInfo>,
         limit: Option<usize>,
-        max_tasks: usize,
    ) -> Result<Option<ReclusterMutator>> {
         if self.cluster_key_meta.is_none() {
             return Ok(None);
         }
 
         let snapshot_opt = self.read_table_snapshot().await?;
         let snapshot = if let Some(val) = snapshot_opt {
             val
         } else {
             // no snapshot, no recluster.
             return Ok(None);
         };
 
+        let mut nodes_num = 1;
+        let cluster = ctx.get_cluster();
+        if !cluster.is_empty() {
+            nodes_num = cluster.nodes.len();
+        }
+
         let schema = self.schema();
         let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0;
         let block_thresholds = self.get_block_thresholds();
@@ -94,7 +99,7 @@ impl FuseTable {
             threshold,
             block_thresholds,
             default_cluster_key_id,
-            max_tasks,
+            nodes_num,
         )?;
 
         let segment_locations = snapshot.segments.clone();
@@ -290,270 +295,6 @@ impl FuseTable {
         })
     }
 
-    /// The flow of Pipeline is as follows:
-    // ┌──────────┐     ┌───────────────┐     ┌─────────┐
-    // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┐
-    // └──────────┘     └───────────────┘     └─────────┘    │
-    // ┌──────────┐     ┌───────────────┐     ┌─────────┐    │     ┌──────────────┐     ┌─────────┐
-    // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┼────►│MultiSortMerge├────►│Resize(N)├───┐
-    // └──────────┘     └───────────────┘     └─────────┘    │     └──────────────┘     └─────────┘   │
-    // ┌──────────┐     ┌───────────────┐     ┌─────────┐    │                                        │
-    // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┘                                        │
-    // └──────────┘     └───────────────┘     └─────────┘                                             │
-    // ┌──────────────────────────────────────────────────────────────────────────────────────────────┘
-    // │          ┌──────────────┐
-    // │     ┌───►│SerializeBlock├───┐
-    // │     │    └──────────────┘   │
-    // │     │    ┌──────────────┐   │    ┌─────────┐    ┌────────────────┐     ┌─────────────────┐     ┌──────────┐
-    // └─────┼───►│SerializeBlock├───┼───►│Resize(1)├───►│SerializeSegment├────►│TableMutationAggr├────►│CommitSink│
-    //       │    └──────────────┘   │    └─────────┘    └────────────────┘     └─────────────────┘     └──────────┘
-    //       │    ┌──────────────┐   │
-    //       └───►│SerializeBlock├───┘
-    //            └──────────────┘
-    // #[async_backtrace::framed]
-    // pub(crate) async fn do_recluster(
-    //     &self,
-    //     ctx: Arc<dyn TableContext>,
-    //     push_downs: Option<PushDownInfo>,
-    //     limit: Option<usize>,
-    //     pipeline: &mut Pipeline,
-    // ) -> Result<u64> {
-    //     if self.cluster_key_meta.is_none() {
-    //         return Ok(0);
-    //     }
-    //
-    //     let snapshot_opt = self.read_table_snapshot().await?;
-    //     let snapshot = if let Some(val) = snapshot_opt {
-    //         val
-    //     } else {
-    //         // no snapshot, no recluster.
-    //         return Ok(0);
-    //     };
-    //
-    //     let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0;
-    //     let block_thresholds = self.get_block_thresholds();
-    //     let block_per_seg =
-    //         self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
-    //     let avg_depth_threshold = self.get_option(
-    //         FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD,
-    //         DEFAULT_AVG_DEPTH_THRESHOLD,
-    //     );
-    //     let block_count = snapshot.summary.block_count;
-    //     let threshold = (block_count as f64 * avg_depth_threshold)
-    //         .max(1.0)
-    //         .min(64.0);
-    //     let mut mutator = ReclusterMutator::try_create(
-    //         ctx.clone(),
-    //         threshold,
-    //         block_thresholds,
-    //         default_cluster_key_id,
-    //     )?;
-    //
-    //     let segment_locations = snapshot.segments.clone();
-    //     let segment_locations = create_segment_location_vector(segment_locations, None);
-    //
-    //     let max_threads = ctx.get_settings().get_max_threads()? as usize;
-    //     let limit = limit.unwrap_or(1000);
-    //
-    //     let mut need_recluster = false;
-    //     for chunk in segment_locations.chunks(limit) {
-    //         // read segments.
-    //         let compact_segments = Self::segment_pruning(
-    //             &ctx,
-    //             self.schema(),
-    //             self.get_operator(),
-    //             &push_downs,
-    //             chunk.to_vec(),
-    //         )
-    //         .await?;
-    //         if compact_segments.is_empty() {
-    //             continue;
-    //         }
-    //
-    //         // select the segments with the highest depth.
-    //         let selected_segs = ReclusterMutator::select_segments(
-    //             &compact_segments,
-    //             block_per_seg,
-    //             max_threads * 2,
-    //             default_cluster_key_id,
-    //         )?;
-    //         // select the blocks with the highest depth.
-    //         if selected_segs.is_empty() {
-    //             for compact_segment in compact_segments.into_iter() {
-    //                 if !ReclusterMutator::segment_can_recluster(
-    //                     &compact_segment.1.summary,
-    //                     block_per_seg,
-    //                     default_cluster_key_id,
-    //                 ) {
-    //                     continue;
-    //                 }
-    //
-    //                 if mutator.target_select(vec![compact_segment]).await? {
-    //                     need_recluster = true;
-    //                     break;
-    //                 }
-    //             }
-    //         } else {
-    //             let mut selected_segments = Vec::with_capacity(selected_segs.len());
-    //             selected_segs.into_iter().for_each(|i| {
-    //                 selected_segments.push(compact_segments[i].clone());
-    //             });
-    //             need_recluster = mutator.target_select(selected_segments).await?;
-    //         }
-    //
-    //         if need_recluster {
-    //             break;
-    //         }
-    //     }
-    //
-    //     let block_metas: Vec<_> = mutator
-    //         .take_blocks()
-    //         .iter()
-    //         .map(|meta| (None, meta.clone()))
-    //         .collect();
-    //     let block_count = block_metas.len();
-    //     if block_count < 2 {
-    //         return Ok(0);
-    //     }
-    //
-    //     // Status.
-    //     {
-    //         let status = format!(
-    //             "recluster: select block files: {}, total bytes: {}, total rows: {}",
-    //             block_count, mutator.total_bytes, mutator.total_rows,
-    //         );
-    //         ctx.set_status_info(&status);
-    //         info!("{}", status);
-    //     }
-    //
-    //     let (statistics, parts) = self.read_partitions_with_metas(
-    //         ctx.clone(),
-    //         self.table_info.schema(),
-    //         None,
-    //         &block_metas,
-    //         block_count,
-    //         PruningStatistics::default(),
-    //     )?;
-    //     let table_info = self.get_table_info();
-    //     let catalog_info = ctx.get_catalog(table_info.catalog()).await?.info();
-    //     let description = statistics.get_description(&table_info.desc);
-    //     let plan = DataSourcePlan {
-    //         catalog_info,
-    //         source_info: DataSourceInfo::TableSource(table_info.clone()),
-    //         output_schema: table_info.schema(),
-    //         parts,
-    //         statistics,
-    //         description,
-    //         tbl_args: self.table_args(),
-    //         push_downs: None,
-    //         query_internal_columns: false,
-    //         data_mask_policy: None,
-    //     };
-    //
-    //     ctx.set_partitions(plan.parts.clone())?;
-    //
-    //     // ReadDataKind to avoid OOM.
-    //     self.do_read_data(ctx.clone(), &plan, pipeline)?;
-    //
-    //     let cluster_stats_gen =
-    //         self.get_cluster_stats_gen(ctx.clone(), mutator.level + 1, block_thresholds)?;
-    //     let operators = cluster_stats_gen.operators.clone();
-    //     if !operators.is_empty() {
-    //         let func_ctx2 = cluster_stats_gen.func_ctx.clone();
-    //         pipeline.add_transform(move |input, output| {
-    //             Ok(ProcessorPtr::create(CompoundBlockOperator::create(
-    //                 input,
-    //                 output,
-    //                 func_ctx2.clone(),
-    //                 operators.clone(),
-    //             )))
-    //         })?;
-    //     }
-    //
-    //     // merge sort
-    //     let block_num = mutator
-    //         .total_bytes
-    //         .div_ceil(block_thresholds.max_bytes_per_block);
-    //     let final_block_size = std::cmp::min(
-    //         // estimate block_size based on max_bytes_per_block.
-    //         mutator.total_rows / block_num,
-    //         block_thresholds.max_rows_per_block,
-    //     );
-    //     let partial_block_size = if pipeline.output_len() > 1 {
-    //         std::cmp::min(
-    //             final_block_size,
-    //             ctx.get_settings().get_max_block_size()? as usize,
-    //         )
-    //     } else {
-    //         final_block_size
-    //     };
-    //
-    //     // construct output fields
-    //     let output_fields: Vec<DataField> = cluster_stats_gen.out_fields.clone();
-    //     let schema = DataSchemaRefExt::create(output_fields);
-    //     let sort_descs: Vec<SortColumnDescription> = cluster_stats_gen
-    //         .cluster_key_index
-    //         .iter()
-    //         .map(|offset| SortColumnDescription {
-    //             offset: *offset,
-    //             asc: true,
-    //             nulls_first: false,
-    //             is_nullable: false, // This information is not needed here.
-    //         })
-    //         .collect();
-    //
-    //     build_merge_sort_pipeline(
-    //         pipeline,
-    //         schema,
-    //         sort_descs,
-    //         None,
-    //         partial_block_size,
-    //         final_block_size,
-    //         None,
-    //     )?;
-    //
-    //     let output_block_num = mutator.total_rows.div_ceil(final_block_size);
-    //     let max_threads = std::cmp::min(max_threads, output_block_num);
-    //     pipeline.try_resize(max_threads)?;
-    //     pipeline.add_transform(|transform_input_port, transform_output_port| {
-    //         let proc = TransformSerializeBlock::try_create(
-    //             ctx.clone(),
-    //             transform_input_port,
-    //             transform_output_port,
-    //             self,
-    //             cluster_stats_gen.clone(),
-    //         )?;
-    //         proc.into_processor()
-    //     })?;
-    //
-    //     pipeline.try_resize(1)?;
-    //     pipeline.add_transform(|input, output| {
-    //         let aggregator = ReclusterAggregator::new(
-    //             &mutator,
-    //             self.get_operator(),
-    //             self.meta_location_generator().clone(),
-    //             block_per_seg,
-    //         );
-    //         Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create(
-    //             input, output, aggregator,
-    //         )))
-    //     })?;
-    //
-    //     let snapshot_gen = MutationGenerator::new(snapshot);
-    //     pipeline.add_sink(|input| {
-    //         CommitSink::try_create(
-    //             self,
-    //             ctx.clone(),
-    //             None,
-    //             snapshot_gen.clone(),
-    //             input,
-    //             None,
-    //             true,
-    //             None,
-    //         )
-    //     })?;
-    //
-    //     Ok(block_count as u64)
-    // }
-
     pub async fn segment_pruning(
         ctx: &Arc<dyn TableContext>,
         schema: TableSchemaRef,