From 534002da6b331da7fda32092aa3fe1b5b180e5c1 Mon Sep 17 00:00:00 2001
From: RinChanNOW
Date: Sun, 8 Oct 2023 10:08:46 +0800
Subject: [PATCH] chore: release useless columns at once. (#13105)

Co-authored-by: Yang Xiufeng
---
 .../read_policy/predicate_and_topk.rs       | 40 ++++++++---
 .../parquet_reader/read_policy/topk_only.rs | 66 ++++++++++++-------
 2 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/predicate_and_topk.rs b/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/predicate_and_topk.rs
index f3a9e31854cb..f232f1ae243f 100644
--- a/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/predicate_and_topk.rs
+++ b/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/predicate_and_topk.rs
@@ -54,6 +54,10 @@ pub struct PredicateAndTopkPolicyBuilder {
     src_schema: DataSchemaRef,
     dst_schema: DataSchemaRef,
+
+    /// Record which prefetched columns are needed to be output.
+    /// Other prefetched columns can be released immediately.
+    output_prefetched_field_indices: Vec<usize>,
 }
 
 impl PredicateAndTopkPolicyBuilder {
@@ -96,15 +100,23 @@ impl PredicateAndTopkPolicyBuilder {
             remain_schema.fields().clone()
         };
 
-        let (prefetch_fields, topk) = if let Some(topk) = topk {
-            let fields = vec![topk.field.clone()]
-                .into_iter()
-                .chain(predicate.schema().fields().clone().into_iter())
-                .collect::<Vec<_>>();
-            (fields, Some(topk.topk.clone()))
-        } else {
-            (predicate.schema().fields().clone(), None)
-        };
+        let mut output_prefetched_field = vec![];
+        let mut output_prefetched_field_indices = vec![];
+        let offset = topk.is_some() as usize;
+        if let Some(topk) = topk {
+            if output_schema.has_field(&topk.field.name) {
+                output_prefetched_field.push(topk.field.clone());
+                output_prefetched_field_indices.push(0);
+            }
+        }
+        let topk = topk.map(|t| t.topk.clone());
+        for (index, field) in predicate.schema().fields().iter().enumerate() {
+            if !output_schema.has_field(&field.name) {
+                continue;
+            }
+            output_prefetched_field.push(field.clone());
+            output_prefetched_field_indices.push(offset + index);
+        }
 
         let remain_field_levels =
             parquet_to_arrow_field_levels(schema_desc, remain_projection.clone(), None)?;
@@ -117,7 +129,7 @@ impl PredicateAndTopkPolicyBuilder {
         )?);
 
         let mut src_schema = remain_schema;
-        src_schema.fields.extend(prefetch_fields);
+        src_schema.fields.extend(output_prefetched_field);
         let src_schema = Arc::new(DataSchema::from(&src_schema));
         let dst_schema = Arc::new(DataSchema::from(output_schema));
 
@@ -130,6 +142,7 @@ impl PredicateAndTopkPolicyBuilder {
             remain_field_paths,
            src_schema,
            dst_schema,
+            output_prefetched_field_indices,
         }))
     }
 }
@@ -214,6 +227,13 @@ impl ReadPolicyBuilder for PredicateAndTopkPolicyBuilder {
             }
         }
 
+        // Only retain the columns that are needed to be output. Release other columns.
+        let mut needed_columns = Vec::with_capacity(self.output_prefetched_field_indices.len());
+        for index in self.output_prefetched_field_indices.iter() {
+            needed_columns.push(prefetched.columns()[*index].clone());
+        }
+        let prefetched = DataBlock::new(needed_columns, prefetched.num_rows());
+
         // Slice the prefetched block by `batch_size`.
         let mut prefetched_blocks = VecDeque::with_capacity(num_rows.div_ceil(batch_size));
         if num_rows > batch_size {
diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/topk_only.rs b/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/topk_only.rs
index 00f55d5861bb..b724b3e64f33 100644
--- a/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/topk_only.rs
+++ b/src/query/storages/parquet/src/parquet_rs/parquet_reader/read_policy/topk_only.rs
@@ -50,6 +50,9 @@ pub struct TopkOnlyPolicyBuilder {
     src_schema: DataSchemaRef,
     dst_schema: DataSchemaRef,
+
+    /// If the topk column is in the output columns.
+    topk_in_output: bool,
 }
 
 impl TopkOnlyPolicyBuilder {
@@ -73,12 +76,16 @@ impl TopkOnlyPolicyBuilder {
             .filter(|i| i != leaf_id)
             .collect::<Vec<_>>();
         let remain_projection = ProjectionMask::leaves(schema_desc, remain_leaves);
-        let remain_fields = output_schema
-            .fields()
-            .iter()
-            .cloned()
-            .filter(|f| f.name() != topk_field.name())
-            .collect::<Vec<_>>();
+
+        let mut remain_fields = Vec::with_capacity(output_schema.num_fields());
+        let mut topk_in_output = false;
+        for f in output_schema.fields() {
+            if f.name() != topk_field.name() {
+                remain_fields.push(f.clone());
+            } else {
+                topk_in_output = true;
+            }
+        }
         let remain_schema = TableSchema::new(remain_fields);
         let remain_field_levels =
             parquet_to_arrow_field_levels(schema_desc, remain_projection.clone(), None)?;
@@ -90,7 +97,9 @@ impl TopkOnlyPolicyBuilder {
         )?);
 
         let mut src_schema = remain_schema;
-        src_schema.fields.push(topk_field.clone());
+        if topk_in_output {
+            src_schema.fields.push(topk_field.clone());
+        }
         let src_schema = Arc::new(DataSchema::from(&src_schema));
         let dst_schema = Arc::new(DataSchema::from(output_schema));
 
@@ -101,6 +110,7 @@ impl TopkOnlyPolicyBuilder {
             remain_field_paths,
            src_schema,
            dst_schema,
+            topk_in_output,
         }))
     }
 }
@@ -140,18 +150,24 @@ impl ReadPolicyBuilder for TopkOnlyPolicyBuilder {
             return Ok(None);
         };
 
-        // Slice the prefetched block by `batch_size`.
-        num_rows = prefetched.num_rows();
-        let mut prefetched_cols = VecDeque::with_capacity(num_rows.div_ceil(batch_size));
-        if num_rows > batch_size {
-            for i in (0..num_rows).step_by(batch_size) {
-                let end = std::cmp::min(i + batch_size, num_rows);
-                let block = prefetched.slice(i..end);
-                prefetched_cols.push_back(block.columns()[0].clone());
+        // Only store the topk column when we need to output it.
+        let prefetched_cols = if self.topk_in_output {
+            // Slice the prefetched block by `batch_size`.
+            num_rows = prefetched.num_rows();
+            let mut prefetched_cols = VecDeque::with_capacity(num_rows.div_ceil(batch_size));
+            if num_rows > batch_size {
+                for i in (0..num_rows).step_by(batch_size) {
+                    let end = std::cmp::min(i + batch_size, num_rows);
+                    let block = prefetched.slice(i..end);
+                    prefetched_cols.push_back(block.columns()[0].clone());
+                }
+            } else {
+                prefetched_cols.push_back(prefetched.columns()[0].clone());
             }
+            Some(prefetched_cols)
         } else {
-            prefetched_cols.push_back(prefetched.columns()[0].clone());
-        }
+            None
+        };
 
         // Fetch remain columns.
         row_group
@@ -177,7 +193,7 @@ impl ReadPolicyBuilder for TopkOnlyPolicyBuilder {
 /// We will prefetch the topk column (must be 1 column) and update the topk heap ([`TopKSorter`]),
 /// and then read other columns.
 pub struct TopkOnlyPolicy {
-    prefetched: VecDeque<BlockEntry>,
+    prefetched: Option<VecDeque<BlockEntry>>,
     reader: ParquetRecordBatchReader,
 
     /// See the comments of `field_paths` in [`super::NoPrefetchPolicy`].
@@ -192,14 +208,20 @@ impl ReadPolicy for TopkOnlyPolicy {
     fn read_block(&mut self) -> Result<Option<DataBlock>> {
         let batch = self.reader.next().transpose()?;
         if let Some(batch) = batch {
-            debug_assert!(!self.prefetched.is_empty());
-            let prefetched = self.prefetched.pop_front().unwrap();
+            debug_assert!(
+                self.prefetched.is_none() || !self.prefetched.as_ref().unwrap().is_empty()
+            );
             let mut block = transform_record_batch(&batch, &self.remain_field_paths)?;
-            block.add_column(prefetched);
+            if let Some(q) = self.prefetched.as_mut() {
+                let prefetched = q.pop_front().unwrap();
+                block.add_column(prefetched);
+            }
             let block = block.resort(&self.src_schema, &self.dst_schema)?;
             Ok(Some(block))
         } else {
-            debug_assert!(self.prefetched.is_empty());
+            debug_assert!(
+                self.prefetched.is_none() || self.prefetched.as_ref().unwrap().is_empty()
+            );
             Ok(None)
         }
     }
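
The predicate_and_topk.rs change boils down to: record, at build time, which prefetched (topk + predicate) columns also appear in the output schema, then rebuild the prefetched block from only those columns so that everything else is dropped before the remaining columns of the row group are read. The sketch below is a minimal, self-contained illustration of that idea; `Block`, `Column`, `output_prefetched_indices`, and `retain_output_columns` are simplified stand-ins chosen for this sketch, not Databend's actual `DataBlock`/`BlockEntry` API.

use std::collections::VecDeque;

/// Stand-in for one prefetched column (a `BlockEntry` in the real reader).
#[derive(Clone, Debug)]
struct Column {
    name: String,
    values: Vec<i64>,
}

/// Stand-in for a `DataBlock`: a set of equally long columns.
#[derive(Debug)]
struct Block {
    columns: Vec<Column>,
}

impl Block {
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.values.len())
    }
}

/// Decide once, at build time, which prefetched columns must survive until
/// output: those whose names also appear in the output schema.
fn output_prefetched_indices(prefetched: &Block, output_names: &[&str]) -> Vec<usize> {
    prefetched
        .columns
        .iter()
        .enumerate()
        .filter(|(_, col)| output_names.contains(&col.name.as_str()))
        .map(|(i, _)| i)
        .collect()
}

/// Keep only the columns needed for output; the rest of the prefetched block
/// is dropped here, releasing its memory before the remaining columns are read.
fn retain_output_columns(prefetched: Block, indices: &[usize]) -> Block {
    Block {
        columns: indices
            .iter()
            .map(|&i| prefetched.columns[i].clone())
            .collect(),
    }
}

fn main() {
    // Prefetched: topk column `a` plus predicate columns `b` and `c`.
    let prefetched = Block {
        columns: ["a", "b", "c"]
            .iter()
            .map(|n| Column {
                name: n.to_string(),
                values: vec![0; 4],
            })
            .collect(),
    };

    // Only `a` and `c` are in the output schema, so `b` is released at once.
    let indices = output_prefetched_indices(&prefetched, &["a", "c", "d"]);
    assert_eq!(indices, vec![0, 2]);
    let kept = retain_output_columns(prefetched, &indices);
    assert_eq!(kept.columns.len(), 2);
    assert_eq!(kept.num_rows(), 4);

    // The slimmed-down block is then sliced into batch-sized pieces,
    // mirroring the `prefetched_blocks` queue in the patch.
    let batches: VecDeque<Block> = VecDeque::from([kept]);
    println!("kept {} prefetched column(s)", batches[0].columns.len());
}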
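
The topk_only.rs change is the same idea specialized to a single column: buffer the prefetched topk slices only when the topk column is part of the output, otherwise keep `prefetched` as `None` and skip the per-batch `add_column`. A short sketch under the same simplifying assumptions (the `TopkOnly` and `TopkSlice` names are illustrative, not Databend's types):

use std::collections::VecDeque;

/// Stand-in for one prefetched slice of the topk column
/// (a `BlockEntry` in the real reader).
type TopkSlice = Vec<i64>;

struct TopkOnly {
    /// `None` when the topk column is only used for pruning, not for output.
    prefetched: Option<VecDeque<TopkSlice>>,
}

impl TopkOnly {
    /// Pop the next prefetched topk slice, if the column must be output.
    fn next_topk_slice(&mut self) -> Option<TopkSlice> {
        self.prefetched.as_mut().map(|q| {
            // Invariant mirrored from the patch: when the queue exists,
            // it stays in sync with the batches yielded by the reader.
            q.pop_front().expect("prefetched queue out of sync with reader")
        })
    }
}

fn main() {
    // Topk column not in the output: nothing is buffered, nothing is popped.
    let mut policy = TopkOnly { prefetched: None };
    assert!(policy.next_topk_slice().is_none());

    // Topk column in the output: slices are popped batch by batch.
    let mut policy = TopkOnly {
        prefetched: Some(VecDeque::from([vec![1, 2], vec![3]])),
    };
    assert_eq!(policy.next_topk_slice(), Some(vec![1, 2]));
    assert_eq!(policy.next_topk_slice(), Some(vec![3]));
}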