Skip to content

Commit

Permalink
Refactor SortedStream trait, output block along with order column.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Dec 15, 2023
1 parent e2649c9 commit 0fc2a5d
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 130 deletions.
7 changes: 7 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,13 @@ impl DataBlock {
}
DataBlock::new_with_meta(columns, self.num_rows, self.meta)
}

/// Returns a reference to the column held by the last entry of this block.
///
/// # Panics
///
/// Panics if the block has no columns, or if `value.as_column()` yields
/// `None` for the last entry. Debug builds check both conditions up front
/// with `debug_assert!`.
#[inline]
pub fn get_last_column(&self) -> &Column {
    debug_assert!(!self.columns.is_empty());
    debug_assert!(self.columns.last().unwrap().value.as_column().is_some());
    // A missing entry and a non-column value both end up as `None` here,
    // so a single `unwrap` covers both invariant violations.
    self.columns
        .last()
        .and_then(|entry| entry.value.as_column())
        .unwrap()
}
}

impl TryFrom<DataBlock> for ArrowChunk<ArrayRef> {
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#![feature(core_intrinsics)]
#![feature(int_roundings)]
#![feature(binary_heap_as_slice)]
#![feature(let_chains)]

pub mod processors;
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ use std::collections::VecDeque;
use std::sync::Arc;

use common_exception::Result;
use common_expression::Column;
use common_expression::DataBlock;
use common_expression::DataSchemaRef;
use common_expression::SortColumnDescription;

use super::utils::find_bigger_child_of_root;
use super::Cursor;
use super::Rows;
use crate::processors::sort::utils::get_ordered_rows;

#[async_trait::async_trait]
pub trait SortedStream {
/// Returns the next block and if it is pending.
/// Returns the next block with the order column and if it is pending.
///
/// If the block is [None] and it's not pending, it means the stream is finished.
/// If the block is [None] but it's pending, it means the stream is not finished yet.
fn next(&mut self) -> Result<(Option<DataBlock>, bool)> {
fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> {
Ok((None, false))
}

/// The async version of `next`.
async fn async_next(&mut self) -> Result<(Option<DataBlock>, bool)> {
async fn async_next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> {
self.next()
}
}
Expand All @@ -56,7 +56,6 @@ where
buffer: Vec<DataBlock>,
pending_stream: VecDeque<usize>,
batch_size: usize,
output_order_col: bool,

temp_sorted_num_rows: usize,
temp_output_indices: Vec<(usize, usize, usize)>,
Expand All @@ -73,12 +72,10 @@ where
streams: Vec<S>,
sort_desc: Arc<Vec<SortColumnDescription>>,
batch_size: usize,
output_order_col: bool,
) -> Self {
// We only create a merger when there are at least two streams.
debug_assert!(streams.len() > 1);
debug_assert!(schema.num_fields() > 0);
debug_assert_eq!(schema.fields.last().unwrap().name(), "_order_col");
debug_assert!(streams.len() > 1, "streams.len() = {}", streams.len());

let heap = BinaryHeap::with_capacity(streams.len());
let buffer = vec![DataBlock::empty_with_schema(schema.clone()); streams.len()];
let pending_stream = (0..streams.len()).collect();
Expand All @@ -89,7 +86,6 @@ where
heap,
buffer,
batch_size,
output_order_col,
sort_desc,
pending_stream,
temp_sorted_num_rows: 0,
Expand All @@ -103,16 +99,13 @@ where
let mut continue_pendings = Vec::new();
while let Some(i) = self.pending_stream.pop_front() {
debug_assert!(self.buffer[i].is_empty());
let (block, pending) = self.unsorted_streams[i].async_next().await?;
let (input, pending) = self.unsorted_streams[i].async_next().await?;
if pending {
continue_pendings.push(i);
continue;
}
if let Some(mut block) = block {
let rows = get_ordered_rows(&block, &self.sort_desc)?;
if !self.output_order_col {
block.pop_columns(1);
}
if let Some((block, col)) = input {
let rows = R::from_column(&col, &self.sort_desc)?;
let cursor = Cursor::new(i, rows);
self.heap.push(Reverse(cursor));
self.buffer[i] = block;
Expand All @@ -127,16 +120,13 @@ where
let mut continue_pendings = Vec::new();
while let Some(i) = self.pending_stream.pop_front() {
debug_assert!(self.buffer[i].is_empty());
let (block, pending) = self.unsorted_streams[i].next()?;
let (input, pending) = self.unsorted_streams[i].next()?;
if pending {
continue_pendings.push(i);
continue;
}
if let Some(mut block) = block {
let rows = get_ordered_rows(&block, &self.sort_desc)?;
if !self.output_order_col {
block.pop_columns(1);
}
if let Some((block, col)) = input {
let rows = R::from_column(&col, &self.sort_desc)?;
let cursor = Cursor::new(i, rows);
self.heap.push(Reverse(cursor));
self.buffer[i] = block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Rows for StringColumn {
Column::String(self.clone())
}

fn from_column(col: Column, _: &[SortColumnDescription]) -> Option<Self> {
fn try_from_column(col: &Column, _: &[SortColumnDescription]) -> Option<Self> {
col.as_string().cloned()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod common;
mod simple;

pub use common::*;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::BlockEntry;
Expand Down Expand Up @@ -45,7 +46,17 @@ where Self: Sized + Clone
fn len(&self) -> usize;
fn row(&self, index: usize) -> Self::Item<'_>;
fn to_column(&self) -> Column;
fn from_column(col: Column, desc: &[SortColumnDescription]) -> Option<Self>;

/// Builds `Self` from `col`, turning a failed conversion into an error.
///
/// This is a thin wrapper around [`Self::try_from_column`]: `Some(rows)`
/// becomes `Ok(rows)`, `None` becomes an error.
///
/// # Errors
///
/// Returns `ErrorCode::BadDataValueType` when `try_from_column` yields
/// `None`. The message names the expected type (`Self::data_type()`) and
/// the type actually found (`col.data_type()`).
fn from_column(col: &Column, desc: &[SortColumnDescription]) -> Result<Self> {
    Self::try_from_column(col, desc).ok_or_else(|| {
        // Built lazily so the message is only formatted on the failure path.
        ErrorCode::BadDataValueType(format!(
            "Order column type mismatched. Expected {} but got {}",
            Self::data_type(),
            col.data_type()
        ))
    })
}
fn try_from_column(col: &Column, desc: &[SortColumnDescription]) -> Option<Self>;

fn data_type() -> DataType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ where
T::upcast_column(self.inner.clone())
}

fn from_column(col: Column, desc: &[SortColumnDescription]) -> Option<Self> {
let inner = T::try_downcast_column(&col)?;
fn try_from_column(col: &Column, desc: &[SortColumnDescription]) -> Option<Self> {
let inner = T::try_downcast_column(col)?;
Some(Self {
inner,
desc: !desc[0].asc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

use std::collections::BinaryHeap;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::types::DataType;
use common_expression::DataField;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_expression::DataSchemaRefExt;
use common_expression::SortColumnDescription;

use super::Rows;
/// Name of the order column field that `add_order_field` appends to a schema.
pub const ORDER_COL_NAME: &str = "_order_col";

/// Find the bigger child of the root of the heap.
#[inline(always)]
Expand All @@ -34,13 +36,30 @@ pub fn find_bigger_child_of_root<T: Ord>(heap: &BinaryHeap<T>) -> &T {
}

#[inline(always)]
pub fn get_ordered_rows<R: Rows>(block: &DataBlock, desc: &[SortColumnDescription]) -> Result<R> {
let order_col = block.columns().last().unwrap().value.as_column().unwrap();
R::from_column(order_col.clone(), desc).ok_or_else(|| {
let expected_ty = R::data_type();
let ty = order_col.data_type();
ErrorCode::BadDataValueType(format!(
"Order column type mismatched. Expecetd {expected_ty} but got {ty}"
))
})
/// Chooses the data type of the order column for the given sort description.
///
/// When `desc` holds exactly one sort column and the schema field at its
/// `offset` is a `Number`, `Date`, `Timestamp` or `String`, that field's own
/// type is returned. In every other case the result is `DataType::String`.
///
/// `desc` is expected to be non-empty (checked with `debug_assert!`).
fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) -> DataType {
    debug_assert!(!desc.is_empty());

    // Anything other than exactly one sort column falls back to `String`.
    let [single] = desc else {
        return DataType::String;
    };

    let field_type = schema.field(single.offset).data_type();
    let keeps_own_type = matches!(
        field_type,
        DataType::Number(_) | DataType::Date | DataType::Timestamp | DataType::String
    );
    if keeps_own_type {
        field_type.clone()
    } else {
        DataType::String
    }
}

/// Returns a schema whose last field is the order column.
///
/// If the last field of `schema` is already named [`ORDER_COL_NAME`], the
/// schema is handed back untouched. Otherwise a new schema is created from a
/// copy of the existing fields plus one extra field named `ORDER_COL_NAME`,
/// typed by `order_field_type(&schema, desc)`.
#[inline(always)]
pub fn add_order_field(schema: DataSchemaRef, desc: &[SortColumnDescription]) -> DataSchemaRef {
    // Guard clause: nothing to do when the order field is already in place.
    let has_order_field = matches!(
        schema.fields.last(),
        Some(f) if f.name() == ORDER_COL_NAME
    );
    if has_order_field {
        return schema;
    }

    let order_field = DataField::new(ORDER_COL_NAME, order_field_type(&schema, desc));
    let mut extended = schema.fields().clone();
    extended.push(order_field);
    DataSchemaRefExt::create(extended)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use common_pipeline_core::Pipeline;
use common_profile::SharedProcessorProfiles;

use super::sort::utils::find_bigger_child_of_root;
use super::sort::utils::get_ordered_rows;
use super::sort::Cursor;
use super::sort::Rows;
use super::sort::SimpleRows;
Expand Down Expand Up @@ -513,7 +512,7 @@ where R: Rows + Send + 'static
continue;
}
let mut block = block.convert_to_full();
let rows = get_ordered_rows(&block, &self.sort_desc)?;
let rows = R::from_column(block.get_last_column(), &self.sort_desc)?;
// Remove the order column
if self.remove_order_col {
block.pop_columns(1);
Expand Down
Loading

0 comments on commit 0fc2a5d

Please sign in to comment.