Skip to content

Commit

Permalink
Replace the merger in TransformSortMerge with HeapMerger.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Dec 14, 2023
1 parent ab84ac2 commit 749b8bf
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::cmp::Ordering;

use common_expression::Column;

use super::rows::Rows;

/// A cursor point to a certain row in a data block.
Expand Down Expand Up @@ -64,6 +66,11 @@ impl<R: Rows> Cursor<R> {
pub fn num_rows(&self) -> usize {
self.num_rows
}

#[inline]
pub fn to_column(&self) -> Column {
self.rows.to_column()
}
}

impl<R: Rows> Ord for Cursor<R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::DataSchemaRef;
Expand All @@ -26,6 +25,7 @@ use common_expression::SortColumnDescription;
use super::utils::find_bigger_child_of_root;
use super::Cursor;
use super::Rows;
use crate::processors::sort::utils::get_ordered_rows;

#[async_trait::async_trait]
pub trait SortedStream {
Expand Down Expand Up @@ -56,6 +56,7 @@ where
buffer: Vec<DataBlock>,
pending_stream: VecDeque<usize>,
batch_size: usize,
output_order_col: bool,

temp_sorted_num_rows: usize,
temp_output_indices: Vec<(usize, usize, usize)>,
Expand All @@ -72,9 +73,12 @@ where
streams: Vec<S>,
sort_desc: Arc<Vec<SortColumnDescription>>,
batch_size: usize,
output_order_col: bool,
) -> Self {
// We only create a merger when there are at least two streams.
debug_assert!(streams.len() > 1);
debug_assert!(schema.num_fields() > 0);
debug_assert_eq!(schema.fields.last().unwrap().name(), "_order_col");
let heap = BinaryHeap::with_capacity(streams.len());
let buffer = vec![DataBlock::empty_with_schema(schema.clone()); streams.len()];
let pending_stream = (0..streams.len()).collect();
Expand All @@ -85,6 +89,7 @@ where
heap,
buffer,
batch_size,
output_order_col,
sort_desc,
pending_stream,
temp_sorted_num_rows: 0,
Expand All @@ -103,10 +108,11 @@ where
continue_pendings.push(i);
continue;
}
if let Some(block) = block {
let order_col = block.columns().last().unwrap().value.as_column().unwrap();
let rows = R::from_column(order_col.clone(), &self.sort_desc)
.ok_or_else(|| ErrorCode::BadDataValueType("Order column type mismatched."))?;
if let Some(mut block) = block {
let rows = get_ordered_rows(&block, &self.sort_desc)?;
if !self.output_order_col {
block.pop_columns(1);
}
let cursor = Cursor::new(i, rows);
self.heap.push(Reverse(cursor));
self.buffer[i] = block;
Expand All @@ -126,10 +132,11 @@ where
continue_pendings.push(i);
continue;
}
if let Some(block) = block {
let order_col = block.columns().last().unwrap().value.as_column().unwrap();
let rows = R::from_column(order_col.clone(), &self.sort_desc)
.ok_or_else(|| ErrorCode::BadDataValueType("Order column type mismatched."))?;
if let Some(mut block) = block {
let rows = get_ordered_rows(&block, &self.sort_desc)?;
if !self.output_order_col {
block.pop_columns(1);
}
let cursor = Cursor::new(i, rows);
self.heap.push(Reverse(cursor));
self.buffer[i] = block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ impl Rows for StringColumn {
fn from_column(col: Column, _: &[SortColumnDescription]) -> Option<Self> {
col.as_string().cloned()
}

fn data_type() -> DataType {
DataType::String
}
}

impl RowConverter<StringColumn> for CommonRowConverter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod simple;

pub use common::*;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::BlockEntry;
use common_expression::Column;
use common_expression::DataSchemaRef;
Expand Down Expand Up @@ -46,6 +47,8 @@ where Self: Sized + Clone
fn to_column(&self) -> Column;
fn from_column(col: Column, desc: &[SortColumnDescription]) -> Option<Self>;

fn data_type() -> DataType;

fn is_empty(&self) -> bool {
self.len() == 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::marker::PhantomData;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::ArgType;
use common_expression::types::DataType;
use common_expression::types::DateType;
use common_expression::types::StringType;
use common_expression::types::TimestampType;
Expand Down Expand Up @@ -93,7 +94,7 @@ where

impl<T> Rows for SimpleRows<T>
where
T: ValueType,
T: ArgType,
T::Scalar: Ord,
{
type Item<'a> = SimpleRow<T>;
Expand Down Expand Up @@ -121,6 +122,10 @@ where
desc: !desc[0].asc,
})
}

fn data_type() -> DataType {
T::data_type()
}
}

pub type DateConverter = SimpleRowConverter<DateType>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

use std::collections::BinaryHeap;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::SortColumnDescription;

use super::Rows;

/// Find the bigger child of the root of the heap.
#[inline(always)]
pub fn find_bigger_child_of_root<T: Ord>(heap: &BinaryHeap<T>) -> &T {
Expand All @@ -25,3 +32,15 @@ pub fn find_bigger_child_of_root<T: Ord>(heap: &BinaryHeap<T>) -> &T {
(&slice[1]).max(&slice[2])
}
}

#[inline(always)]
pub fn get_ordered_rows<R: Rows>(block: &DataBlock, desc: &[SortColumnDescription]) -> Result<R> {
let order_col = block.columns().last().unwrap().value.as_column().unwrap();
R::from_column(order_col.clone(), desc).ok_or_else(|| {
let expected_ty = R::data_type();
let ty = order_col.data_type();
ErrorCode::BadDataValueType(format!(
"Order column type mismatched. Expecetd {expected_ty} but got {ty}"
))
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use common_pipeline_core::Pipeline;
use common_profile::SharedProcessorProfiles;

use super::sort::utils::find_bigger_child_of_root;
use super::sort::utils::get_ordered_rows;
use super::sort::Cursor;
use super::sort::Rows;
use super::sort::SimpleRows;
Expand Down Expand Up @@ -512,17 +513,7 @@ where R: Rows + Send + 'static
continue;
}
let mut block = block.convert_to_full();
let order_col = block
.columns()
.last()
.unwrap()
.value
.as_column()
.unwrap()
.clone();
let rows = R::from_column(order_col, &self.sort_desc).ok_or_else(|| {
ErrorCode::BadDataValueType("Order column type mismatched.")
})?;
let rows = get_ordered_rows(&block, &self.sort_desc)?;
// Remove the order column
if self.remove_order_col {
block.pop_columns(1);
Expand Down
Loading

0 comments on commit 749b8bf

Please sign in to comment.