rename func name
zhyass committed Oct 11, 2023
1 parent a3cc89d commit c00121f
Showing 3 changed files with 8 additions and 296 deletions.
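At the API level this commit is a rename plus a dropped parameter. For orientation, the two signatures from the recluster.rs hunk are restated below (argument and return types copied verbatim from the diff):

    // Before: the caller supplied the parallelism explicitly.
    pub(crate) async fn do_reclusters(
        &self,
        ctx: Arc<dyn TableContext>,
        push_downs: Option<PushDownInfo>,
        limit: Option<usize>,
        max_tasks: usize,
    ) -> Result<Option<ReclusterMutator>>

    // After: renamed, with `max_tasks` removed; the method now derives the
    // node count from `ctx.get_cluster()` internally.
    pub(crate) async fn build_recluster_mutator(
        &self,
        ctx: Arc<dyn TableContext>,
        push_downs: Option<PushDownInfo>,
        limit: Option<usize>,
    ) -> Result<Option<ReclusterMutator>>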
18 changes: 0 additions & 18 deletions src/query/catalog/src/table.rs
@@ -331,24 +331,6 @@ pub trait Table: Sync + Send {
)))
}

// return the selected block num.
#[async_backtrace::framed]
async fn recluster(
&self,
ctx: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
limit: Option<usize>,
pipeline: &mut Pipeline,
) -> Result<u64> {
let (_, _, _, _) = (ctx, push_downs, limit, pipeline);

Err(ErrorCode::Unimplemented(format!(
"table {}, of engine type {}, does not support recluster",
self.name(),
self.get_table_info().engine(),
)))
}

#[async_backtrace::framed]
async fn revert_to(
&self,
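The method deleted above was a default implementation on the `Table` trait, so every engine that never overrode it inherited the `Unimplemented` error for free. A self-contained sketch of that fallback pattern, with the error type simplified to `String` and the context/pipeline plumbing omitted:

    // Minimal sketch of the trait-default fallback pattern (simplified types).
    trait Table: Sync + Send {
        fn name(&self) -> String;
        fn engine(&self) -> String;

        // Engines that support reclustering override this; all others inherit
        // the error instead of writing their own stub.
        fn recluster(&self) -> Result<u64, String> {
            Err(format!(
                "table {}, of engine type {}, does not support recluster",
                self.name(),
                self.engine(),
            ))
        }
    }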
11 changes: 0 additions & 11 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -697,17 +697,6 @@ impl Table for FuseTable {
self.do_compact_blocks(ctx, limit).await
}

#[async_backtrace::framed]
async fn recluster(
&self,
ctx: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
limit: Option<usize>,
pipeline: &mut Pipeline,
) -> Result<u64> {
todo!()
}

#[async_backtrace::framed]
async fn revert_to(
&self,
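FuseTable's override consisted of a single `todo!()`, which panics at runtime if ever reached; with both the trait method and this stub gone, reclustering is reached only through the explicitly named builder (renamed in the next file). A hypothetical call-site fragment, where `table`, `ctx`, `push_downs`, and `limit` stand in for values a real caller would already hold:

    // Sketch only: the match arms reflect the Option return type visible in
    // the diff, not code from this commit.
    match table.build_recluster_mutator(ctx, push_downs, limit).await? {
        Some(mutator) => {
            // A cluster key and snapshot exist: hand the mutator to the
            // recluster pipeline assembled elsewhere.
        }
        None => {
            // Early-return cases (no cluster key, or no snapshot): nothing
            // to recluster.
        }
    }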
275 changes: 8 additions & 267 deletions src/query/storages/fuse/src/operations/recluster.rs
@@ -56,12 +56,11 @@ use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;

impl FuseTable {
#[async_backtrace::framed]
-pub(crate) async fn do_reclusters(
+pub(crate) async fn build_recluster_mutator(
&self,
ctx: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
limit: Option<usize>,
-max_tasks: usize,
) -> Result<Option<ReclusterMutator>> {
if self.cluster_key_meta.is_none() {
return Ok(None);
@@ -75,6 +74,12 @@ impl FuseTable {
return Ok(None);
};

+let mut nodes_num = 1;
+let cluster = ctx.get_cluster();
+if !cluster.is_empty() {
+nodes_num = cluster.nodes.len();
+}

let schema = self.schema();
let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0;
let block_thresholds = self.get_block_thresholds();
@@ -94,7 +99,7 @@ impl FuseTable {
threshold,
block_thresholds,
default_cluster_key_id,
-max_tasks,
+nodes_num,
)?;

let segment_locations = snapshot.segments.clone();
@@ -290,270 +295,6 @@ impl FuseTable {
})
}

/// The flow of Pipeline is as follows:
// ┌──────────┐ ┌───────────────┐ ┌─────────┐
// │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┐
// └──────────┘ └───────────────┘ └─────────┘ │
// ┌──────────┐ ┌───────────────┐ ┌─────────┐ │ ┌──────────────┐ ┌─────────┐
// │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┤────►│MultiSortMerge├────►│Resize(N)├───┐
// └──────────┘ └───────────────┘ └─────────┘ │ └──────────────┘ └─────────┘ │
// ┌──────────┐ ┌───────────────┐ ┌─────────┐ │ │
// │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┘ │
// └──────────┘ └───────────────┘ └─────────┘ │
// ┌──────────────────────────────────────────────────────────────────────────────────────────────┘
// │ ┌──────────────┐
// │ ┌───►│SerializeBlock├───┐
// │ │ └──────────────┘ │
// │ │ ┌──────────────┐ │ ┌─────────┐ ┌────────────────┐ ┌─────────────────┐ ┌──────────┐
// └───►│───►│SerializeBlock├───┤───►│Resize(1)├───►│SerializeSegment├────►│TableMutationAggr├────►│CommitSink│
// │ └──────────────┘ │ └─────────┘ └────────────────┘ └─────────────────┘ └──────────┘
// │ ┌──────────────┐ │
// └───►│SerializeBlock├───┘
// └──────────────┘
// #[async_backtrace::framed]
// pub(crate) async fn do_recluster(
// &self,
// ctx: Arc<dyn TableContext>,
// push_downs: Option<PushDownInfo>,
// limit: Option<usize>,
// pipeline: &mut Pipeline,
// ) -> Result<u64> {
// if self.cluster_key_meta.is_none() {
// return Ok(0);
// }
//
// let snapshot_opt = self.read_table_snapshot().await?;
// let snapshot = if let Some(val) = snapshot_opt {
// val
// } else {
// no snapshot, no recluster.
// return Ok(0);
// };
//
// let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0;
// let block_thresholds = self.get_block_thresholds();
// let block_per_seg =
// self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
// let avg_depth_threshold = self.get_option(
// FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD,
// DEFAULT_AVG_DEPTH_THRESHOLD,
// );
// let block_count = snapshot.summary.block_count;
// let threshold = (block_count as f64 * avg_depth_threshold)
// .max(1.0)
// .min(64.0);
// let mut mutator = ReclusterMutator::try_create(
// ctx.clone(),
// threshold,
// block_thresholds,
// default_cluster_key_id,
// )?;
//
// let segment_locations = snapshot.segments.clone();
// let segment_locations = create_segment_location_vector(segment_locations, None);
//
// let max_threads = ctx.get_settings().get_max_threads()? as usize;
// let limit = limit.unwrap_or(1000);
//
// let mut need_recluster = false;
// for chunk in segment_locations.chunks(limit) {
// read segments.
// let compact_segments = Self::segment_pruning(
// &ctx,
// self.schema(),
// self.get_operator(),
// &push_downs,
// chunk.to_vec(),
// )
// .await?;
// if compact_segments.is_empty() {
// continue;
// }
//
// select the segments with the highest depth.
// let selected_segs = ReclusterMutator::select_segments(
// &compact_segments,
// block_per_seg,
// max_threads * 2,
// default_cluster_key_id,
// )?;
// select the blocks with the highest depth.
// if selected_segs.is_empty() {
// for compact_segment in compact_segments.into_iter() {
// if !ReclusterMutator::segment_can_recluster(
// &compact_segment.1.summary,
// block_per_seg,
// default_cluster_key_id,
// ) {
// continue;
// }
//
// if mutator.target_select(vec![compact_segment]).await? {
// need_recluster = true;
// break;
// }
// }
// } else {
// let mut selected_segments = Vec::with_capacity(selected_segs.len());
// selected_segs.into_iter().for_each(|i| {
// selected_segments.push(compact_segments[i].clone());
// });
// need_recluster = mutator.target_select(selected_segments).await?;
// }
//
// if need_recluster {
// break;
// }
// }
//
// let block_metas: Vec<_> = mutator
// .take_blocks()
// .iter()
// .map(|meta| (None, meta.clone()))
// .collect();
// let block_count = block_metas.len();
// if block_count < 2 {
// return Ok(0);
// }
//
// Status.
// {
// let status = format!(
// "recluster: select block files: {}, total bytes: {}, total rows: {}",
// block_count, mutator.total_bytes, mutator.total_rows,
// );
// ctx.set_status_info(&status);
// info!("{}", status);
// }
//
// let (statistics, parts) = self.read_partitions_with_metas(
// ctx.clone(),
// self.table_info.schema(),
// None,
// &block_metas,
// block_count,
// PruningStatistics::default(),
// )?;
// let table_info = self.get_table_info();
// let catalog_info = ctx.get_catalog(table_info.catalog()).await?.info();
// let description = statistics.get_description(&table_info.desc);
// let plan = DataSourcePlan {
// catalog_info,
// source_info: DataSourceInfo::TableSource(table_info.clone()),
// output_schema: table_info.schema(),
// parts,
// statistics,
// description,
// tbl_args: self.table_args(),
// push_downs: None,
// query_internal_columns: false,
// data_mask_policy: None,
// };
//
// ctx.set_partitions(plan.parts.clone())?;
//
// ReadDataKind to avoid OOM.
// self.do_read_data(ctx.clone(), &plan, pipeline)?;
//
// let cluster_stats_gen =
// self.get_cluster_stats_gen(ctx.clone(), mutator.level + 1, block_thresholds)?;
// let operators = cluster_stats_gen.operators.clone();
// if !operators.is_empty() {
// let func_ctx2 = cluster_stats_gen.func_ctx.clone();
// pipeline.add_transform(move |input, output| {
// Ok(ProcessorPtr::create(CompoundBlockOperator::create(
// input,
// output,
// func_ctx2.clone(),
// operators.clone(),
// )))
// })?;
// }
//
// merge sort
// let block_num = mutator
// .total_bytes
// .div_ceil(block_thresholds.max_bytes_per_block);
// let final_block_size = std::cmp::min(
// estimate block_size based on max_bytes_per_block.
// mutator.total_rows / block_num,
// block_thresholds.max_rows_per_block,
// );
// let partial_block_size = if pipeline.output_len() > 1 {
// std::cmp::min(
// final_block_size,
// ctx.get_settings().get_max_block_size()? as usize,
// )
// } else {
// final_block_size
// };
// construct output fields
// let output_fields: Vec<DataField> = cluster_stats_gen.out_fields.clone();
// let schema = DataSchemaRefExt::create(output_fields);
// let sort_descs: Vec<SortColumnDescription> = cluster_stats_gen
// .cluster_key_index
// .iter()
// .map(|offset| SortColumnDescription {
// offset: *offset,
// asc: true,
// nulls_first: false,
// is_nullable: false, // This information is not needed here.
// })
// .collect();
//
// build_merge_sort_pipeline(
// pipeline,
// schema,
// sort_descs,
// None,
// partial_block_size,
// final_block_size,
// None,
// )?;
//
// let output_block_num = mutator.total_rows.div_ceil(final_block_size);
// let max_threads = std::cmp::min(max_threads, output_block_num);
// pipeline.try_resize(max_threads)?;
// pipeline.add_transform(|transform_input_port, transform_output_port| {
// let proc = TransformSerializeBlock::try_create(
// ctx.clone(),
// transform_input_port,
// transform_output_port,
// self,
// cluster_stats_gen.clone(),
// )?;
// proc.into_processor()
// })?;
//
// pipeline.try_resize(1)?;
// pipeline.add_transform(|input, output| {
// let aggregator = ReclusterAggregator::new(
// &mutator,
// self.get_operator(),
// self.meta_location_generator().clone(),
// block_per_seg,
// );
// Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create(
// input, output, aggregator,
// )))
// })?;
//
// let snapshot_gen = MutationGenerator::new(snapshot);
// pipeline.add_sink(|input| {
// CommitSink::try_create(
// self,
// ctx.clone(),
// None,
// snapshot_gen.clone(),
// input,
// None,
// true,
// None,
// )
// })?;
// Ok(block_count as u64)
// }

pub async fn segment_pruning(
ctx: &Arc<dyn TableContext>,
schema: TableSchemaRef,
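The only behavioral addition in this file is the node-count fallback that replaces the old `max_tasks` argument to `ReclusterMutator::try_create`. Reduced to a self-contained sketch, with a stub `Cluster` type standing in for whatever `ctx.get_cluster()` actually returns (the `is_empty` semantics here are an assumption):

    // Stand-in cluster type; the real node entries are richer than String.
    struct Cluster {
        nodes: Vec<String>,
    }

    impl Cluster {
        // Assumption: mirrors the emptiness check used in the diff above.
        fn is_empty(&self) -> bool {
            self.nodes.is_empty()
        }
    }

    // Default to a single node so a standalone deployment still yields a
    // usable task count.
    fn effective_nodes(cluster: &Cluster) -> usize {
        if cluster.is_empty() { 1 } else { cluster.nodes.len() }
    }

Either way, the parallelism hint is now computed inside the builder rather than threaded through by the caller, which is what made the `max_tasks` parameter removable.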
