feat: improve table function fuse_encoding (#13708)
* accept only one parameter, and add new columns: table_name, column_name

* add new column: column_type

* add filter pushdown

* add test case

* resolve conflicts

* fix loop logic error
guojidan authored Nov 21, 2023
1 parent e0dcf43 commit 2d917ff
Showing 4 changed files with 267 additions and 140 deletions.
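
After this change, `fuse_encoding` scans every supplied table instead of a single named column: `get_blocks` walks each non-Parquet table, each non-nested field, and every native page, and `to_block` flattens the collected statistics into one output row per page, tagged with the new `table_name`, `column_name`, and `column_type` columns. Below is a minimal sketch of that flattening, using plain std types in place of Databend's column builders; `PageStat`, `collect_rows`, and the sample values are illustrative, not part of this commit.

// Illustrative sketch: mirrors the table -> column -> page flattening that the
// reworked get_blocks/to_block perform, without Databend's builders or IO.
struct PageStat {
    compressed_size: u32,
    uncompressed_size: u32,
}

// One output row per page, tagged with table/column/type, like the new
// table_name, column_name, and column_type columns.
fn collect_rows(
    info: &[(&str, Vec<(String, String, Vec<PageStat>)>)],
) -> Vec<(String, String, String, u32, u32)> {
    let mut rows = Vec::new();
    for (table, columns) in info {
        for (column, type_str, pages) in columns {
            for p in pages {
                rows.push((
                    table.to_string(),
                    column.clone(),
                    type_str.clone(),
                    p.compressed_size,
                    p.uncompressed_size,
                ));
            }
        }
    }
    rows
}

fn main() {
    let info = vec![(
        "t1",
        vec![
            ("c1".to_string(), "INT".to_string(), vec![
                PageStat { compressed_size: 10, uncompressed_size: 40 },
                PageStat { compressed_size: 12, uncompressed_size: 40 },
            ]),
            ("c2".to_string(), "STRING".to_string(), vec![PageStat {
                compressed_size: 90,
                uncompressed_size: 200,
            }]),
        ],
    )];
    for row in collect_rows(&info) {
        println!("{row:?}");
    }
}

The real implementation avoids materializing row tuples: it builds each tag column with `StringColumnBuilder::repeat` and appends it once per (table, column) pair, so the per-page loop only pushes the page statistics themselves.
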
@@ -17,151 +17,211 @@
use std::sync::Arc;
use common_arrow::arrow::datatypes::Field;
use common_arrow::native::read::reader::NativeReader;
use common_arrow::native::stat::stat_simple;
use common_arrow::native::stat::ColumnInfo;
use common_arrow::native::stat::PageBody;
use common_arrow::native::stat::PageInfo;
use common_catalog::plan::Filters;
use common_catalog::table::Table;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::nullable::NullableColumnBuilder;
use common_expression::types::string::StringColumnBuilder;
use common_expression::types::BooleanType;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
use common_expression::types::StringType;
use common_expression::types::UInt32Type;
use common_expression::BlockEntry;
use common_expression::Column;
use common_expression::DataBlock;
use common_expression::Evaluator;
use common_expression::Expr;
use common_expression::FromData;
use common_expression::FunctionContext;
use common_expression::FunctionRegistry;
use common_expression::RemoteExpr;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchema;
use common_expression::TableSchemaRefExt;
use common_expression::Value;
use common_functions::BUILTIN_FUNCTIONS;
use storages_common_table_meta::meta::SegmentInfo;

use crate::io::BlockReader;
use crate::io::ReadSettings;
use crate::io::SegmentsIO;
use crate::sessions::TableContext;
use crate::FuseStorageFormat;
use crate::FuseTable;

pub struct FuseEncoding<'a> {
    pub ctx: Arc<dyn TableContext>,
    pub tables: Vec<&'a FuseTable>,
    pub limit: Option<usize>,
    pub filters: Option<Filters>,
}

impl<'a> FuseEncoding<'a> {
    pub fn new(
        ctx: Arc<dyn TableContext>,
        tables: Vec<&'a FuseTable>,
        limit: Option<usize>,
        filters: Option<Filters>,
    ) -> Self {
        Self {
            ctx,
            tables,
            limit,
            filters,
        }
    }

    #[async_backtrace::framed]
    pub async fn get_blocks(&self) -> Result<DataBlock> {
        let mut info = Vec::new();
        for table in self.tables.clone() {
            // Only the native storage format carries per-page encoding stats.
            if matches!(table.storage_format, FuseStorageFormat::Parquet) {
                continue;
            }
            let mut columns_info = vec![];
            let snapshot = table.read_table_snapshot().await?;
            if snapshot.is_none() {
                continue;
            }
            let snapshot = snapshot.unwrap();

            let segments_io =
                SegmentsIO::create(self.ctx.clone(), table.operator.clone(), table.schema());

            let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4;

            let schema = table.schema();
            let fields = schema.fields();
            for chunk in snapshot.segments.chunks(chunk_size) {
                let segments = segments_io
                    .read_segments::<SegmentInfo>(chunk, false)
                    .await?;
                for segment in segments {
                    let segment = segment?;
                    for block in segment.blocks.iter() {
                        for field in fields {
                            if field.is_nested() {
                                continue;
                            }
                            let column_id = field.column_id;
                            let arrow_field: Field = field.into();
                            let column_meta = block.col_metas.get(&column_id).unwrap();
                            let (offset, len) = column_meta.offset_length();
                            let ranges = vec![(column_id, offset..(offset + len))];
                            let read_settings = ReadSettings::from_ctx(&self.ctx)?;
                            let merge_io_read_res = BlockReader::merge_io_read(
                                &read_settings,
                                table.operator.clone(),
                                &block.location.0,
                                ranges,
                                true,
                            )
                            .await?;
                            let column_chunks = merge_io_read_res.columns_chunks()?;
                            let pages = column_chunks
                                .get(&column_id)
                                .unwrap()
                                .as_raw_data()
                                .unwrap();
                            let pages = std::io::Cursor::new(pages);
                            let page_metas = column_meta.as_native().unwrap().pages.clone();
                            let reader = NativeReader::new(pages, page_metas, vec![]);
                            let this_column_info = stat_simple(reader, arrow_field.clone())?;
                            columns_info.push((field.data_type.sql_name(), this_column_info));
                        }
                    }
                }
            }
            info.push((table.name(), columns_info));
        }
        let data_block = self.to_block(&info).await?;
        // Apply the pushed-down filter, if any, as a boolean mask over the block.
        let result = if let Some(filter) = self.filters.as_ref().map(|f| &f.filter) {
            let func_ctx = FunctionContext::default();
            let evaluator = Evaluator::new(&data_block, &func_ctx, &BUILTIN_FUNCTIONS);
            let filter = evaluator
                .run(&as_expr(
                    filter,
                    &BUILTIN_FUNCTIONS,
                    &FuseEncoding::schema(),
                ))?
                .try_downcast::<BooleanType>()
                .unwrap();
            data_block.filter_boolean_value(&filter)?
        } else {
            data_block
        };

        Ok(result)
    }

    #[async_backtrace::framed]
    async fn to_block(&self, info: &Vec<(&str, Vec<(String, ColumnInfo)>)>) -> Result<DataBlock> {
        let mut validity_size = Vec::new();
        let mut compressed_size = Vec::new();
        let mut uncompressed_size = Vec::new();
        let mut l1 = StringColumnBuilder::with_capacity(0, 0);
        let mut l2 = NullableColumnBuilder::<StringType>::with_capacity(0, &[]);
        let mut table_name = StringColumnBuilder::with_capacity(0, 0);
        let mut column_name = StringColumnBuilder::with_capacity(0, 0);
        let mut column_type = StringColumnBuilder::with_capacity(0, 0);
        let mut all_num_rows = 0;
        for (table, columns_info) in info {
            for (type_str, column_info) in columns_info {
                let pages_info = &column_info.pages;
                let num_row = pages_info.len();
                all_num_rows += num_row;
                validity_size.reserve(num_row);
                compressed_size.reserve(num_row);
                uncompressed_size.reserve(num_row);
                // One output row per page; the tag columns repeat for every page.
                let tmp_table_name = StringColumnBuilder::repeat(table.as_bytes(), num_row);
                let tmp_column_name =
                    StringColumnBuilder::repeat(column_info.field.name.as_bytes(), num_row);
                let tmp_column_type = StringColumnBuilder::repeat(type_str.as_bytes(), num_row);
                for p in pages_info {
                    validity_size.push(p.validity_size);
                    compressed_size.push(p.compressed_size);
                    uncompressed_size.push(p.uncompressed_size);
                    l1.put_slice(encoding_to_string(&p.body).as_bytes());
                    l1.commit_row();
                    // A level-2 encoding exists only for dictionary and frequency pages.
                    let l2_encoding = match &p.body {
                        PageBody::Dict(dict) => Some(encoding_to_string(&dict.indices.body)),
                        PageBody::Freq(freq) => freq
                            .exceptions
                            .as_ref()
                            .map(|e| encoding_to_string(&e.body)),
                        _ => None,
                    };
                    if let Some(l2_encoding) = l2_encoding {
                        l2.push(l2_encoding.as_bytes());
                    } else {
                        l2.push_null();
                    }
                }

                table_name.append_column(&tmp_table_name.build());
                column_name.append_column(&tmp_column_name.build());
                column_type.append_column(&tmp_column_type.build());
            }
        }

        Ok(DataBlock::new(
            vec![
                BlockEntry::new(
                    DataType::String,
                    Value::Column(Column::String(table_name.build())),
                ),
                BlockEntry::new(
                    DataType::String,
                    Value::Column(Column::String(column_name.build())),
                ),
                BlockEntry::new(
                    DataType::String,
                    Value::Column(Column::String(column_type.build())),
                ),
                BlockEntry::new(
                    DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt32))),
                    Value::Column(UInt32Type::from_opt_data(validity_size)),
@@ -180,12 +240,15 @@ impl<'a> FuseEncoding<'a> {
                    Value::Column(Column::Nullable(Box::new(l2.build().upcast()))),
                ),
            ],
            all_num_rows,
        ))
    }

    pub fn schema() -> Arc<TableSchema> {
        TableSchemaRefExt::create(vec![
            TableField::new("table_name", TableDataType::String),
            TableField::new("column_name", TableDataType::String),
            TableField::new("column_type", TableDataType::String),
            TableField::new(
                "validity_size",
                TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt32))),
@@ -219,3 +282,66 @@ fn encoding_to_string(page_body: &PageBody) -> String {
        PageBody::Common(c) => format!("Common({:?})", c),
    }
}

/// Convert a serialized `RemoteExpr<String>` back into an evaluable `Expr`,
/// resolving column names to their indices in the given schema.
pub fn as_expr(
    remote_expr: &RemoteExpr<String>,
    fn_registry: &FunctionRegistry,
    schema: &Arc<TableSchema>,
) -> Expr {
    match remote_expr {
        RemoteExpr::Constant {
            span,
            scalar,
            data_type,
        } => Expr::Constant {
            span: *span,
            scalar: scalar.clone(),
            data_type: data_type.clone(),
        },
        RemoteExpr::ColumnRef {
            span,
            id,
            data_type,
            display_name,
        } => {
            let id = schema.index_of(id).unwrap();
            Expr::ColumnRef {
                span: *span,
                id,
                data_type: data_type.clone(),
                display_name: display_name.clone(),
            }
        }
        RemoteExpr::Cast {
            span,
            is_try,
            expr,
            dest_type,
        } => Expr::Cast {
            span: *span,
            is_try: *is_try,
            expr: Box::new(as_expr(expr, fn_registry, schema)),
            dest_type: dest_type.clone(),
        },
        RemoteExpr::FunctionCall {
            span,
            id,
            generics,
            args,
            return_type,
        } => {
            let function = fn_registry.get(id).expect("function id not found");
            Expr::FunctionCall {
                span: *span,
                id: id.clone(),
                function,
                generics: generics.clone(),
                args: args
                    .iter()
                    .map(|arg| as_expr(arg, fn_registry, schema))
                    .collect(),
                return_type: return_type.clone(),
            }
        }
    }
}
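
A filter pushed down to `fuse_encoding` arrives as a `RemoteExpr<String>` whose column references are names; `as_expr` rebinds them to indices in `FuseEncoding::schema()` so the `Evaluator` can run the predicate over the assembled block and `filter_boolean_value` can keep only the matching rows. Below is a self-contained sketch of the same resolve-then-filter pattern, with a toy predicate type standing in for Databend's expression machinery; every name in it is illustrative.

// Illustrative sketch: resolve a name-based predicate against a schema once
// (as as_expr does via schema.index_of), then apply it row by row as a
// boolean mask (as filter_boolean_value does with the evaluated column).

// A predicate over one named column, standing in for a pushed-down filter.
enum Predicate {
    ColumnEquals { name: String, value: String },
}

// Name-resolved form: the column is now an index, like Expr::ColumnRef { id, .. }.
enum Resolved {
    ColumnEquals { index: usize, value: String },
}

fn resolve(pred: &Predicate, schema: &[&str]) -> Option<Resolved> {
    match pred {
        Predicate::ColumnEquals { name, value } => {
            let index = schema.iter().position(|f| *f == name.as_str())?;
            Some(Resolved::ColumnEquals { index, value: value.clone() })
        }
    }
}

fn filter_rows(rows: &[Vec<String>], pred: &Resolved) -> Vec<Vec<String>> {
    rows.iter()
        .filter(|row| match pred {
            Resolved::ColumnEquals { index, value } => &row[*index] == value,
        })
        .cloned()
        .collect()
}

fn main() {
    let schema = ["table_name", "column_name", "column_type"];
    let rows = vec![
        vec!["t1".into(), "c1".into(), "INT".into()],
        vec!["t2".into(), "c2".into(), "STRING".into()],
    ];
    let pred = Predicate::ColumnEquals {
        name: "table_name".into(),
        value: "t1".into(),
    };
    let resolved = resolve(&pred, &schema).expect("filter column not in schema");
    for row in filter_rows(&rows, &resolved) {
        println!("{row:?}"); // only the t1 row survives
    }
}

Resolving names to indices once, before evaluation, is what keeps the pushdown cheap: the per-row work is a plain index lookup rather than a name lookup.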