Skip to content

Commit

Permalink
chore: release useless columns at once. (#13105)
Browse files Browse the repository at this point in the history
Co-authored-by: Yang Xiufeng <[email protected]>
  • Loading branch information
RinChanNOWWW and youngsofun authored Oct 8, 2023
1 parent 83f7001 commit 534002d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ pub struct PredicateAndTopkPolicyBuilder {

src_schema: DataSchemaRef,
dst_schema: DataSchemaRef,

/// Records which prefetched columns need to be carried through to the output.
/// The remaining prefetched columns can be released immediately.
output_prefetched_field_indices: Vec<usize>,
}

impl PredicateAndTopkPolicyBuilder {
Expand Down Expand Up @@ -96,15 +100,23 @@ impl PredicateAndTopkPolicyBuilder {
remain_schema.fields().clone()
};

let (prefetch_fields, topk) = if let Some(topk) = topk {
let fields = vec![topk.field.clone()]
.into_iter()
.chain(predicate.schema().fields().clone().into_iter())
.collect::<Vec<_>>();
(fields, Some(topk.topk.clone()))
} else {
(predicate.schema().fields().clone(), None)
};
let mut output_prefetched_field = vec![];
let mut output_prefetched_field_indices = vec![];
let offset = topk.is_some() as usize;
if let Some(topk) = topk {
if output_schema.has_field(&topk.field.name) {
output_prefetched_field.push(topk.field.clone());
output_prefetched_field_indices.push(0);
}
}
let topk = topk.map(|t| t.topk.clone());
for (index, field) in predicate.schema().fields().iter().enumerate() {
if !output_schema.has_field(&field.name) {
continue;
}
output_prefetched_field.push(field.clone());
output_prefetched_field_indices.push(offset + index);
}

let remain_field_levels =
parquet_to_arrow_field_levels(schema_desc, remain_projection.clone(), None)?;
Expand All @@ -117,7 +129,7 @@ impl PredicateAndTopkPolicyBuilder {
)?);

let mut src_schema = remain_schema;
src_schema.fields.extend(prefetch_fields);
src_schema.fields.extend(output_prefetched_field);

let src_schema = Arc::new(DataSchema::from(&src_schema));
let dst_schema = Arc::new(DataSchema::from(output_schema));
Expand All @@ -130,6 +142,7 @@ impl PredicateAndTopkPolicyBuilder {
remain_field_paths,
src_schema,
dst_schema,
output_prefetched_field_indices,
}))
}
}
Expand Down Expand Up @@ -214,6 +227,13 @@ impl ReadPolicyBuilder for PredicateAndTopkPolicyBuilder {
}
}

// Only retain the columns that need to be output; release the other columns.
let mut needed_columns = Vec::with_capacity(self.output_prefetched_field_indices.len());
for index in self.output_prefetched_field_indices.iter() {
needed_columns.push(prefetched.columns()[*index].clone());
}
let prefetched = DataBlock::new(needed_columns, prefetched.num_rows());

// Slice the prefetched block by `batch_size`.
let mut prefetched_blocks = VecDeque::with_capacity(num_rows.div_ceil(batch_size));
if num_rows > batch_size {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub struct TopkOnlyPolicyBuilder {

src_schema: DataSchemaRef,
dst_schema: DataSchemaRef,

/// Whether the topk column is among the output columns.
}

impl TopkOnlyPolicyBuilder {
Expand All @@ -73,12 +76,16 @@ impl TopkOnlyPolicyBuilder {
.filter(|i| i != leaf_id)
.collect::<Vec<_>>();
let remain_projection = ProjectionMask::leaves(schema_desc, remain_leaves);
let remain_fields = output_schema
.fields()
.iter()
.cloned()
.filter(|f| f.name() != topk_field.name())
.collect::<Vec<_>>();

let mut remain_fields = Vec::with_capacity(output_schema.num_fields());
let mut topk_in_output = false;
for f in output_schema.fields() {
if f.name() != topk_field.name() {
remain_fields.push(f.clone());
} else {
topk_in_output = true;
}
}
let remain_schema = TableSchema::new(remain_fields);
let remain_field_levels =
parquet_to_arrow_field_levels(schema_desc, remain_projection.clone(), None)?;
Expand All @@ -90,7 +97,9 @@ impl TopkOnlyPolicyBuilder {
)?);

let mut src_schema = remain_schema;
src_schema.fields.push(topk_field.clone());
if topk_in_output {
src_schema.fields.push(topk_field.clone());
}
let src_schema = Arc::new(DataSchema::from(&src_schema));
let dst_schema = Arc::new(DataSchema::from(output_schema));

Expand All @@ -101,6 +110,7 @@ impl TopkOnlyPolicyBuilder {
remain_field_paths,
src_schema,
dst_schema,
topk_in_output,
}))
}
}
Expand Down Expand Up @@ -140,18 +150,24 @@ impl ReadPolicyBuilder for TopkOnlyPolicyBuilder {
return Ok(None);
};

// Slice the prefetched block by `batch_size`.
num_rows = prefetched.num_rows();
let mut prefetched_cols = VecDeque::with_capacity(num_rows.div_ceil(batch_size));
if num_rows > batch_size {
for i in (0..num_rows).step_by(batch_size) {
let end = std::cmp::min(i + batch_size, num_rows);
let block = prefetched.slice(i..end);
prefetched_cols.push_back(block.columns()[0].clone());
// Only store the topk column when we need to output it.
let prefetched_cols = if self.topk_in_output {
// Slice the prefetched block by `batch_size`.
num_rows = prefetched.num_rows();
let mut prefetched_cols = VecDeque::with_capacity(num_rows.div_ceil(batch_size));
if num_rows > batch_size {
for i in (0..num_rows).step_by(batch_size) {
let end = std::cmp::min(i + batch_size, num_rows);
let block = prefetched.slice(i..end);
prefetched_cols.push_back(block.columns()[0].clone());
}
} else {
prefetched_cols.push_back(prefetched.columns()[0].clone());
}
Some(prefetched_cols)
} else {
prefetched_cols.push_back(prefetched.columns()[0].clone());
}
None
};

// Fetch remain columns.
row_group
Expand All @@ -177,7 +193,7 @@ impl ReadPolicyBuilder for TopkOnlyPolicyBuilder {
/// We will prefetch the topk column (must be 1 column) and update the topk heap ([`TopKSorter`]),
/// and then read other columns.
pub struct TopkOnlyPolicy {
prefetched: VecDeque<BlockEntry>,
prefetched: Option<VecDeque<BlockEntry>>,
reader: ParquetRecordBatchReader,

/// See the comments of `field_paths` in [`super::NoPrefetchPolicy`].
Expand All @@ -192,14 +208,20 @@ impl ReadPolicy for TopkOnlyPolicy {
fn read_block(&mut self) -> Result<Option<DataBlock>> {
let batch = self.reader.next().transpose()?;
if let Some(batch) = batch {
debug_assert!(!self.prefetched.is_empty());
let prefetched = self.prefetched.pop_front().unwrap();
debug_assert!(
self.prefetched.is_none() || !self.prefetched.as_ref().unwrap().is_empty()
);
let mut block = transform_record_batch(&batch, &self.remain_field_paths)?;
block.add_column(prefetched);
if let Some(q) = self.prefetched.as_mut() {
let prefetched = q.pop_front().unwrap();
block.add_column(prefetched);
}
let block = block.resort(&self.src_schema, &self.dst_schema)?;
Ok(Some(block))
} else {
debug_assert!(self.prefetched.is_empty());
debug_assert!(
self.prefetched.is_none() || self.prefetched.as_ref().unwrap().is_empty()
);
Ok(None)
}
}
Expand Down

1 comment on commit 534002d

@vercel
Copy link

@vercel vercel bot commented on 534002d Oct 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.