Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): improve merge into #16736

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//! - Intermediate data generated by query could be stored by temporary operator.

#![allow(clippy::uninlined_format_args)]
#![feature(let_chains)]

mod config;
pub use config::ShareTableConfig;
Expand Down
197 changes: 197 additions & 0 deletions src/common/storage/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use databend_common_base::base::OrderedFloat;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::number::NumberScalar;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::Scalar;

pub type F64 = OrderedFloat<f64>;
Expand Down Expand Up @@ -55,6 +57,123 @@ impl Datum {
}
}

pub fn to_scalar(&self, data_type: &DataType) -> Result<Option<Scalar>> {
let scalar = match self {
Datum::Bool(v) => Some(Scalar::Boolean(*v)),
Datum::Int(v) => match data_type {
DataType::Number(NumberDataType::Int8) => {
Some(Scalar::Number(NumberScalar::Int8(*v as i8)))
}
DataType::Number(NumberDataType::Int16) => {
Some(Scalar::Number(NumberScalar::Int16(*v as i16)))
}
DataType::Number(NumberDataType::Int32) => {
Some(Scalar::Number(NumberScalar::Int32(*v as i32)))
}
DataType::Number(NumberDataType::Int64) => {
Some(Scalar::Number(NumberScalar::Int64(*v)))
}
DataType::Number(NumberDataType::UInt8) => {
if *v > 0 {
Some(Scalar::Number(NumberScalar::UInt8(*v as u8)))
} else {
None
}
}
DataType::Number(NumberDataType::UInt16) => {
if *v > 0 {
Some(Scalar::Number(NumberScalar::UInt16(*v as u16)))
} else {
None
}
}
DataType::Number(NumberDataType::UInt32) => {
if *v > 0 {
Some(Scalar::Number(NumberScalar::UInt32(*v as u32)))
} else {
None
}
}
DataType::Number(NumberDataType::UInt64) => {
if *v > 0 {
Some(Scalar::Number(NumberScalar::UInt64(*v as u64)))
} else {
None
}
}
_ => None,
},
Datum::UInt(v) => match data_type {
DataType::Number(NumberDataType::Int8) => {
if *v <= i8::MAX as u64 {
Some(Scalar::Number(NumberScalar::Int8(*v as i8)))
} else {
None
}
}
DataType::Number(NumberDataType::Int16) => {
if *v <= i16::MAX as u64 {
Some(Scalar::Number(NumberScalar::Int16(*v as i16)))
} else {
None
}
}
DataType::Number(NumberDataType::Int32) => {
if *v <= i32::MAX as u64 {
Some(Scalar::Number(NumberScalar::Int32(*v as i32)))
} else {
None
}
}
DataType::Number(NumberDataType::Int64) => {
if *v <= i64::MAX as u64 {
Some(Scalar::Number(NumberScalar::Int64(*v as i64)))
} else {
None
}
}
DataType::Number(NumberDataType::UInt8) => {
Some(Scalar::Number(NumberScalar::UInt8(*v as u8)))
}
DataType::Number(NumberDataType::UInt16) => {
Some(Scalar::Number(NumberScalar::UInt16(*v as u16)))
}
DataType::Number(NumberDataType::UInt32) => {
Some(Scalar::Number(NumberScalar::UInt32(*v as u32)))
}
DataType::Number(NumberDataType::UInt64) => {
Some(Scalar::Number(NumberScalar::UInt64(*v)))
}
_ => None,
},
Datum::Float(v) => match data_type {
DataType::Number(NumberDataType::Float32) => {
if v.into_inner() <= f32::MAX as f64 {
Some(Scalar::Number(NumberScalar::Float32(OrderedFloat::from(
v.into_inner() as f32,
))))
} else {
None
}
}
DataType::Number(NumberDataType::Float64) => {
Some(Scalar::Number(NumberScalar::Float64(*v)))
}
_ => None,
},
Datum::Bytes(v) => match data_type {
DataType::String => {
let s = String::from_utf8(v.clone())?;
Some(Scalar::String(s))
}
DataType::Binary => Some(Scalar::Binary(v.clone())),
_ => None,
},
};

Ok(scalar)
}

pub fn is_bytes(&self) -> bool {
matches!(self, Datum::Bytes(_))
}
Expand Down Expand Up @@ -109,6 +228,84 @@ impl Datum {
_ => None,
}
}

/// Computes `x - y` for two datums of the same numeric kind.
///
/// Returns `None` when the kinds differ or are not numeric, and also when an
/// integer subtraction would overflow (for example `UInt` with `x < y`), rather
/// than panicking or wrapping.
pub fn sub(x: &Datum, y: &Datum) -> Option<Datum> {
    match (x, y) {
        (Datum::Int(x), Datum::Int(y)) => x.checked_sub(*y).map(Datum::Int),
        (Datum::UInt(x), Datum::UInt(y)) => x.checked_sub(*y).map(Datum::UInt),
        (Datum::Float(x), Datum::Float(y)) => {
            Some(Datum::Float(F64::from(x.into_inner() - y.into_inner())))
        }
        _ => None,
    }
}

/// Computes `self + other` for two datums of the same numeric kind.
///
/// Returns `None` when the kinds differ or are not numeric, and also when an
/// integer addition would overflow, rather than panicking or wrapping.
pub fn add(&self, other: &Datum) -> Option<Datum> {
    match (self, other) {
        (Datum::Int(x), Datum::Int(y)) => x.checked_add(*y).map(Datum::Int),
        (Datum::UInt(x), Datum::UInt(y)) => x.checked_add(*y).map(Datum::UInt),
        (Datum::Float(x), Datum::Float(y)) => {
            Some(Datum::Float(F64::from(x.into_inner() + y.into_inner())))
        }
        _ => None,
    }
}

/// Computes `x / y` for two datums of the same numeric kind.
///
/// Integer division truncates. Returns `None` when the kinds differ or are not
/// numeric, when the divisor is zero, or when a signed division would overflow
/// (`i64::MIN / -1`).
pub fn div(x: &Datum, y: &Datum) -> Option<Datum> {
    match (x, y) {
        // `checked_div` covers both the zero divisor and the signed-overflow case.
        (Datum::Int(x), Datum::Int(y)) => x.checked_div(*y).map(Datum::Int),
        (Datum::UInt(x), Datum::UInt(y)) => x.checked_div(*y).map(Datum::UInt),
        (Datum::Float(x), Datum::Float(y)) => {
            if y.into_inner() == 0.0 {
                return None;
            }
            Some(Datum::Float(F64::from(x.into_inner() / y.into_inner())))
        }
        _ => None,
    }
}

/// Splits `[min, max]` into `num_segments` consecutive `(start, end)` ranges of
/// equal step and converts every bound to a [`Scalar`] of `data_type`.
///
/// The step is `(max - min) / num_segments`, computed with [`Datum::sub`] and
/// [`Datum::div`]; each range starts where the previous one ended.
///
/// Returns:
/// * `Ok(None)` when `min` is not an `Int`, `UInt` or `Float` datum, when a bound
///   cannot be converted to `data_type`, or when advancing a bound fails.
/// * `Ok(Some(vec![]))` when the step itself cannot be computed (for example
///   `num_segments == 0` or `min` / `max` of different kinds).
/// * `Ok(Some(ranges))` otherwise.
///
/// # Errors
///
/// Propagates any error returned by [`Datum::to_scalar`].
// NOTE(review): for integer datums the step is truncated, so the end of the last
// range can be smaller than `max` — confirm callers do not rely on full coverage.
pub fn build_range_info(
    min: Datum,
    max: Datum,
    num_segments: usize,
    data_type: &DataType,
) -> Result<Option<Vec<(Scalar, Scalar)>>> {
    // The segment count must be the same datum kind as the bounds so that
    // `div` can pair it with the range.
    let num_segments_datum = match min {
        Datum::Int(_) => Datum::Int(num_segments as i64),
        Datum::UInt(_) => Datum::UInt(num_segments as u64),
        Datum::Float(_) => Datum::Float(OrderedFloat::from(num_segments as f64)),
        _ => return Ok(None),
    };

    let mut result = Vec::with_capacity(num_segments);
    let step = Self::sub(&max, &min).and_then(|range| Self::div(&range, &num_segments_datum));
    if let Some(step) = step {
        let mut start = min;
        for _ in 0..num_segments {
            // Report "no range info" instead of panicking if the bound cannot be advanced.
            let Some(end) = Self::add(&start, &step) else {
                return Ok(None);
            };
            let Some(lower) = start.to_scalar(data_type)? else {
                return Ok(None);
            };
            let Some(upper) = end.to_scalar(data_type)? else {
                return Ok(None);
            };
            result.push((lower, upper));
            start = end;
        }
    }
    Ok(Some(result))
}
}

impl Display for Datum {
Expand Down
71 changes: 51 additions & 20 deletions src/query/catalog/src/runtime_filter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,85 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use databend_common_expression::Expr;
use xorf::BinaryFuse16;

/// Collection of runtime filters, kept in three separate lists: min-max
/// expressions, in-list expressions and bloom filters.
// NOTE(review): this span comes from a diff view whose +/- markers were stripped,
// so the removed field declarations and their replacements both appear below.
// The `String` paired with each filter is presumably a column/filter identifier —
// confirm against the producers of these filters.
#[derive(Clone, Debug, Default)]
pub struct RuntimeFilterInfo {
inlist: Vec<Expr<String>>,
min_max: Vec<Expr<String>>,
bloom: Vec<(String, BinaryFuse16)>,
min_max: Vec<(String, Expr<String>)>,
inlist: Vec<(String, Expr<String>)>,
bloom: Vec<(String, Arc<BinaryFuse16>)>,
}

// Accessors for `RuntimeFilterInfo`: `add_*` push one filter onto the matching
// list, `get_*` borrow a list, and `min_maxs` / `inlists` / `blooms` consume the
// value and return one list by value.
// NOTE(review): removed and added diff lines are interleaved in this span, so
// several methods show two signatures (old, then new) back to back.
impl RuntimeFilterInfo {
pub fn add_inlist(&mut self, expr: Expr<String>) {
self.inlist.push(expr);
pub fn add_min_max(&mut self, filter: (String, Expr<String>)) {
self.min_max.push(filter);
}

pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) {
self.bloom.push(bloom);
pub fn add_inlist(&mut self, filter: (String, Expr<String>)) {
self.inlist.push(filter);
}

pub fn add_min_max(&mut self, expr: Expr<String>) {
self.min_max.push(expr);
pub fn add_bloom(&mut self, filter: (String, Arc<BinaryFuse16>)) {
self.bloom.push(filter);
}

pub fn get_inlist(&self) -> &Vec<Expr<String>> {
&self.inlist
pub fn get_min_max(&self) -> &Vec<(String, Expr<String>)> {
&self.min_max
}

pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> {
&self.bloom
pub fn get_inlist(&self) -> &Vec<(String, Expr<String>)> {
&self.inlist
}

pub fn get_min_max(&self) -> &Vec<Expr<String>> {
&self.min_max
pub fn get_bloom(&self) -> &Vec<(String, Arc<BinaryFuse16>)> {
&self.bloom
}

pub fn blooms(self) -> Vec<(String, BinaryFuse16)> {
self.bloom
pub fn min_maxs(self) -> Vec<(String, Expr<String>)> {
self.min_max
}

pub fn inlists(self) -> Vec<Expr<String>> {
pub fn inlists(self) -> Vec<(String, Expr<String>)> {
self.inlist
}

pub fn min_maxs(self) -> Vec<Expr<String>> {
self.min_max
pub fn blooms(self) -> Vec<(String, Arc<BinaryFuse16>)> {
self.bloom
}

/// Returns `true` only when the in-list, bloom and min-max lists are all empty.
pub fn is_empty(&self) -> bool {
self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty()
}
}

/// Probe-side counters of a hash join, used to decide whether a runtime filter
/// is worth applying in the table scan.
///
/// Both counters are atomics, so they can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct HashJoinProbeStatistics {
    /// Number of valid rows seen on the probe side.
    pub num_rows: AtomicU64,
    /// Number of probe keys that matched by hash.
    pub num_hash_matched_rows: AtomicU64,
}

impl HashJoinProbeStatistics {
    /// Adds `num_rows` to the probe-side row counter.
    pub fn increment_num_rows(&self, num_rows: u64) {
        self.num_rows.fetch_add(num_rows, Ordering::AcqRel);
    }

    /// Adds `num_hash_matched_rows` to the hash-matched row counter.
    pub fn increment_num_hash_matched_rows(&self, num_hash_matched_rows: u64) {
        self.num_hash_matched_rows
            .fetch_add(num_hash_matched_rows, Ordering::AcqRel);
    }

    /// Checks whether to use the runtime filter in table scan.
    ///
    /// The runtime filter is preferred when fewer than half of the probe-side
    /// rows matched by hash, i.e. `num_hash_matched_rows * 2 < num_rows`.
    pub fn prefer_runtime_filter(&self) -> bool {
        let matched = self.num_hash_matched_rows.load(Ordering::Acquire);
        let total = self.num_rows.load(Ordering::Acquire);
        // Saturate instead of overflowing: once `matched * 2` exceeds `u64::MAX`
        // it cannot be smaller than `total` anyway.
        matched.saturating_mul(2) < total
    }
}
19 changes: 16 additions & 3 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use crate::plan::DataSourcePlan;
use crate::plan::PartInfoPtr;
use crate::plan::Partitions;
use crate::query_kind::QueryKind;
use crate::runtime_filter_info::HashJoinProbeStatistics;
use crate::runtime_filter_info::RuntimeFilterInfo;
use crate::statistics::data_cache_statistics::DataCacheMetrics;
use crate::table::Table;
Expand Down Expand Up @@ -322,17 +323,29 @@ pub trait TableContext: Send + Sync {

fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));

fn set_runtime_filter_columns(&self, table_index: usize, columns: Vec<(usize, String)>);

fn get_runtime_filter_columns(&self, table_index: usize) -> Vec<(usize, String)>;

fn set_hash_join_probe_statistics(
&self,
join_id: usize,
statistics: Arc<HashJoinProbeStatistics>,
);

fn get_hash_join_probe_statistics(&self, join_id: usize) -> Arc<HashJoinProbeStatistics>;

fn clear_runtime_filter(&self);

fn set_merge_into_join(&self, join: MergeIntoJoin);

fn get_merge_into_join(&self) -> MergeIntoJoin;

fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;
fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, Arc<BinaryFuse16>)>;

fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<(String, Expr<String>)>;

fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<(String, Expr<String>)>;

fn has_bloom_runtime_filters(&self, id: usize) -> bool;
fn txn_mgr(&self) -> TxnManagerRef;
Expand Down
5 changes: 5 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,11 @@ impl DataBlock {
})
}

#[inline]
pub fn into_columns(self) -> Vec<BlockEntry> {
self.columns
}

#[inline]
pub fn add_meta(self, meta: Option<BlockMetaInfoPtr>) -> Result<Self> {
if self.meta.is_some() {
Expand Down
Loading
Loading