Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): Add struct scan and simply optimise struct filter. #19425

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ message ScanRange {
Bound lower_bound = 2;
// The upper bound of the next PK column subsequent to those in `eq_conds`.
Bound upper_bound = 3;

bool is_real_unbounded = 4;
}

message SourceNode {
Expand Down
80 changes: 50 additions & 30 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ pub struct RowSeqScanExecutor<S: StateStore> {
}

/// Range for batch scan.
#[derive(Debug)]
pub struct ScanRange {
/// The prefix of the primary key.
pub pk_prefix: OwnedRow,

/// The range bounds of the next column.
pub next_col_bounds: (Bound<Datum>, Bound<Datum>),

pub is_real_unbounded: bool,
}
xxhZs marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -144,6 +147,7 @@ impl ScanRange {
Ok(Self {
pk_prefix,
next_col_bounds,
is_real_unbounded: scan_range.is_real_unbounded,
})
}

Expand All @@ -152,6 +156,7 @@ impl ScanRange {
Self {
pk_prefix: OwnedRow::default(),
next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
is_real_unbounded: false,
}
}
}
Expand Down Expand Up @@ -420,11 +425,16 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
histogram: Option<impl Deref<Target = Histogram>>,
) {
let ScanRange {
pk_prefix,
mut pk_prefix,
next_col_bounds,
is_real_unbounded,
} = scan_range;

let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
let order_type = if !is_real_unbounded {
table.pk_serializer().get_order_types()[pk_prefix.len()]
} else {
table.pk_serializer().get_order_types()[0]
};
let (start_bound, end_bound) = if order_type.is_ascending() {
(next_col_bounds.0, next_col_bounds.1)
} else {
Expand All @@ -434,40 +444,50 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);

let build_bound = |other_bound_is_bounded: bool, bound| {
match bound {
Bound::Unbounded => {
if other_bound_is_bounded && order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => {
if is_real_unbounded {
let mut rows = pk_prefix.clone().into_inner().to_vec();
rows.push(x);
Bound::Included(OwnedRow::new(rows))
} else {
Bound::Included(OwnedRow::new(vec![x]))
}
}
Bound::Excluded(x) => {
if is_real_unbounded {
let mut rows = pk_prefix.clone().into_inner().to_vec();
rows.push(x);
Bound::Excluded(OwnedRow::new(rows))
} else {
Bound::Excluded(OwnedRow::new(vec![x]))
}
}
}
};
let start_bound = build_bound(end_bound_is_bounded, start_bound);
let end_bound = build_bound(start_bound_is_bounded, end_bound);
if is_real_unbounded {
pk_prefix = OwnedRow::empty();
}

// Range Scan.
assert!(pk_prefix.len() < table.pk_indices().len());
let iter = table
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
(
match start_bound {
Bound::Unbounded => {
if end_bound_is_bounded && order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
},
match end_bound {
Bound::Unbounded => {
if start_bound_is_bounded && order_type.nulls_are_last() {
// `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
},
),
(start_bound, end_bound),
ordered,
chunk_size,
PrefetchOptions::new(limit.is_none(), true),
Expand Down
12 changes: 12 additions & 0 deletions src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::util::value_encoding::serialize_datum_into;
pub struct ScanRange {
pub eq_conds: Vec<Datum>,
pub range: (Bound<ScalarImpl>, Bound<ScalarImpl>),
pub is_real_unbounded: bool,
}

fn bound_to_proto(bound: &Bound<ScalarImpl>) -> Option<PbBound> {
Expand Down Expand Up @@ -59,6 +60,7 @@ impl ScanRange {
.collect(),
lower_bound: bound_to_proto(&self.range.0),
upper_bound: bound_to_proto(&self.range.1),
is_real_unbounded: self.is_real_unbounded,
}
}

Expand All @@ -80,6 +82,15 @@ impl ScanRange {
Self {
eq_conds: vec![],
range: full_range(),
is_real_unbounded: false,
}
}

pub const fn full_table_scan_real_unbounded() -> Self {
Self {
eq_conds: vec![],
range: full_range(),
is_real_unbounded: true,
}
}

Expand Down Expand Up @@ -128,6 +139,7 @@ macro_rules! impl_split_small_range {
.map(|i| ScanRange {
eq_conds: vec![Some(ScalarImpl::$type_name(i))],
range: full_range(),
is_real_unbounded: false,
})
.collect(),
);
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ impl Literal {
&self.data
}

pub fn get_data_type(&self) -> &Option<DataType> {
&self.data_type
}

pub fn is_untyped(&self) -> bool {
self.data_type.is_none()
}
Expand Down
70 changes: 6 additions & 64 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::RowSeqScanNode;
use risingwave_sqlparser::ast::AsOf;

use super::batch::prelude::*;
use super::utils::{childless_record, to_pb_time_travel_as_of, Distill};
use super::utils::{childless_record, scan_ranges_as_strs, to_pb_time_travel_as_of, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch};
use crate::catalog::ColumnId;
use crate::error::Result;
Expand Down Expand Up @@ -135,71 +131,13 @@ impl BatchSeqScan {
&self.scan_ranges
}

fn scan_ranges_as_strs(&self, verbose: bool) -> Vec<String> {
let order_names = match verbose {
true => self.core.order_names_with_table_prefix(),
false => self.core.order_names(),
};
let mut range_strs = vec![];

let explain_max_range = 20;
for scan_range in self.scan_ranges.iter().take(explain_max_range) {
#[expect(clippy::disallowed_methods)]
let mut range_str = scan_range
.eq_conds
.iter()
.zip(order_names.iter())
.map(|(v, name)| match v {
Some(v) => format!("{} = {:?}", name, v),
None => format!("{} IS NULL", name),
})
.collect_vec();
if !is_full_range(&scan_range.range) {
let i = scan_range.eq_conds.len();
range_str.push(range_to_string(&order_names[i], &scan_range.range))
}
range_strs.push(range_str.join(" AND "));
}
if self.scan_ranges.len() > explain_max_range {
range_strs.push("...".to_string());
}
range_strs
}

pub fn limit(&self) -> &Option<u64> {
&self.limit
}
}

impl_plan_tree_node_for_leaf! { BatchSeqScan }

fn lb_to_string(name: &str, lb: &Bound<ScalarImpl>) -> String {
let (op, v) = match lb {
Bound::Included(v) => (">=", v),
Bound::Excluded(v) => (">", v),
Bound::Unbounded => unreachable!(),
};
format!("{} {} {:?}", name, op, v)
}
fn ub_to_string(name: &str, ub: &Bound<ScalarImpl>) -> String {
let (op, v) = match ub {
Bound::Included(v) => ("<=", v),
Bound::Excluded(v) => ("<", v),
Bound::Unbounded => unreachable!(),
};
format!("{} {} {:?}", name, op, v)
}
fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -> String {
match (&range.0, &range.1) {
(Bound::Unbounded, Bound::Unbounded) => unreachable!(),
(Bound::Unbounded, ub) => ub_to_string(name, ub),
(lb, Bound::Unbounded) => lb_to_string(name, lb),
(lb, ub) => {
format!("{} AND {}", lb_to_string(name, lb), ub_to_string(name, ub))
}
}
}

impl Distill for BatchSeqScan {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx().is_explain_verbose();
Expand All @@ -208,7 +146,11 @@ impl Distill for BatchSeqScan {
vec.push(("columns", self.core.columns_pretty(verbose)));

if !self.scan_ranges.is_empty() {
let range_strs = self.scan_ranges_as_strs(verbose);
let order_names = match verbose {
true => self.core.order_names_with_table_prefix(),
false => self.core.order_names(),
};
let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
vec.push((
"scan_ranges",
Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
Expand Down
70 changes: 6 additions & 64 deletions src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SysRowSeqScanNode;
use risingwave_pb::plan_common::PbColumnDesc;

use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
use super::utils::{childless_record, scan_ranges_as_strs, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
Expand Down Expand Up @@ -91,68 +87,10 @@ impl BatchSysSeqScan {
pub fn scan_ranges(&self) -> &[ScanRange] {
&self.scan_ranges
}

fn scan_ranges_as_strs(&self, verbose: bool) -> Vec<String> {
let order_names = match verbose {
true => self.core.order_names_with_table_prefix(),
false => self.core.order_names(),
};
let mut range_strs = vec![];

let explain_max_range = 20;
for scan_range in self.scan_ranges.iter().take(explain_max_range) {
#[expect(clippy::disallowed_methods)]
let mut range_str = scan_range
.eq_conds
.iter()
.zip(order_names.iter())
.map(|(v, name)| match v {
Some(v) => format!("{} = {:?}", name, v),
None => format!("{} IS NULL", name),
})
.collect_vec();
if !is_full_range(&scan_range.range) {
let i = scan_range.eq_conds.len();
range_str.push(range_to_string(&order_names[i], &scan_range.range))
}
range_strs.push(range_str.join(" AND "));
}
if self.scan_ranges.len() > explain_max_range {
range_strs.push("...".to_string());
}
range_strs
}
}

impl_plan_tree_node_for_leaf! { BatchSysSeqScan }

fn lb_to_string(name: &str, lb: &Bound<ScalarImpl>) -> String {
let (op, v) = match lb {
Bound::Included(v) => (">=", v),
Bound::Excluded(v) => (">", v),
Bound::Unbounded => unreachable!(),
};
format!("{} {} {:?}", name, op, v)
}
fn ub_to_string(name: &str, ub: &Bound<ScalarImpl>) -> String {
let (op, v) = match ub {
Bound::Included(v) => ("<=", v),
Bound::Excluded(v) => ("<", v),
Bound::Unbounded => unreachable!(),
};
format!("{} {} {:?}", name, op, v)
}
fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -> String {
match (&range.0, &range.1) {
(Bound::Unbounded, Bound::Unbounded) => unreachable!(),
(Bound::Unbounded, ub) => ub_to_string(name, ub),
(lb, Bound::Unbounded) => lb_to_string(name, lb),
(lb, ub) => {
format!("{} AND {}", lb_to_string(name, lb), ub_to_string(name, ub))
}
}
}

impl Distill for BatchSysSeqScan {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx().is_explain_verbose();
Expand All @@ -161,7 +99,11 @@ impl Distill for BatchSysSeqScan {
vec.push(("columns", self.core.columns_pretty(verbose)));

if !self.scan_ranges.is_empty() {
let range_strs = self.scan_ranges_as_strs(verbose);
let order_names = match verbose {
true => self.core.order_names_with_table_prefix(),
false => self.core.order_names(),
};
let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
vec.push((
"scan_ranges",
Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
Expand Down
Loading
Loading