Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query): add fuzz tests #17100

Merged
merged 18 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/query/ast/src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,18 @@ impl Display for SetExpr {
write!(f, "({query})")?;
}
SetExpr::SetOperation(set_operation) => {
// Check if the left or right expressions are also SetOperations
let left_needs_parentheses = matches!(set_operation.left.as_ref(), SetExpr::SetOperation(left_op) if left_op.op < set_operation.op);
let right_needs_parentheses = matches!(set_operation.right.as_ref(), SetExpr::SetOperation(right_op) if right_op.op < set_operation.op);

if left_needs_parentheses {
write!(f, "(")?;
}
write!(f, "{}", set_operation.left)?;
if left_needs_parentheses {
write!(f, ")")?;
}

match set_operation.op {
SetOperator::Union => {
write!(f, " UNION ")?;
Expand All @@ -316,7 +327,14 @@ impl Display for SetExpr {
if set_operation.all {
write!(f, "ALL ")?;
}
// Add parentheses if necessary
if right_needs_parentheses {
write!(f, "(")?;
}
write!(f, "{}", set_operation.right)?;
if right_needs_parentheses {
write!(f, ")")?;
}
}
SetExpr::Values { values, .. } => {
write!(f, "VALUES")?;
Expand All @@ -341,6 +359,18 @@ pub enum SetOperator {
Intersect,
}

impl PartialOrd for SetOperator {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
if self == other {
Some(std::cmp::Ordering::Equal)
} else if self == &SetOperator::Intersect {
Some(std::cmp::Ordering::Greater)
} else {
Some(std::cmp::Ordering::Less)
}
}
}

/// `ORDER BY` clause
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct OrderByExpr {
Expand Down
5 changes: 5 additions & 0 deletions src/query/ast/src/parser/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ impl<'a, I: Iterator<Item = WithSpan<'a, SetOperationElement>>> PrattParser<I>

fn query(&mut self, input: &Self::Input) -> Result<Affix, &'static str> {
let affix = match &input.elem {
// https://learn.microsoft.com/en-us/sql/t-sql/language-elements/set-operators-except-and-intersect-transact-sql?view=sql-server-2017
// If EXCEPT or INTERSECT is used together with other operators in an expression, it's evaluated in the context of the following precedence:
// 1. Expressions in parentheses
// 2. The INTERSECT operator
// 3. EXCEPT and UNION evaluated from left to right based on their position in the expression
SetOperationElement::SetOperation { op, .. } => match op {
SetOperator::Union | SetOperator::Except => {
Affix::Infix(Precedence(10), Associativity::Left)
Expand Down
10 changes: 10 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,17 @@ fn test_query() {
r#"select * from t1 except select * from t2"#,
r#"select * from t1 union select * from t2 union select * from t3"#,
r#"select * from t1 union select * from t2 union all select * from t3"#,
r#"select * from (
(SELECT f, g FROM union_fuzz_result1
EXCEPT
SELECT f, g FROM union_fuzz_result2)
UNION ALL
(SELECT f, g FROM union_fuzz_result2
EXCEPT
SELECT f, g FROM union_fuzz_result1)
)"#,
r#"select * from t1 union select * from t2 intersect select * from t3"#,
r#"(select * from t1 union select * from t2) intersect select * from t3"#,
r#"(select * from t1 union select * from t2) union select * from t3"#,
r#"select * from t1 union (select * from t2 union select * from t3)"#,
r#"SELECT * FROM ((SELECT *) EXCEPT (SELECT *)) foo"#,
Expand Down
621 changes: 621 additions & 0 deletions src/query/ast/tests/it/testdata/query.txt

Large diffs are not rendered by default.

68 changes: 50 additions & 18 deletions src/query/expression/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,25 @@ pub enum Column {
Geography(GeographyColumn),
}

#[derive(Clone, Debug, PartialEq)]
pub struct RandomOptions {
pub seed: Option<u64>,
pub min_string_len: usize,
pub max_string_len: usize,
pub max_array_len: usize,
}

impl Default for RandomOptions {
fn default() -> Self {
RandomOptions {
seed: None,
min_string_len: 0,
max_string_len: 5,
max_array_len: 3,
}
}
}

#[derive(Clone, EnumAsInner, Debug, PartialEq)]
pub enum ColumnVec {
Null,
Expand Down Expand Up @@ -1196,15 +1215,22 @@ impl Column {
}
}

pub fn random(ty: &DataType, len: usize, seed: Option<u64>) -> Self {
pub fn random(ty: &DataType, len: usize, options: Option<RandomOptions>) -> Self {
use rand::distributions::Alphanumeric;
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;
let mut rng = match seed {
None => SmallRng::from_entropy(),
Some(seed) => SmallRng::seed_from_u64(seed),
let mut rng = match &options {
Some(RandomOptions {
seed: Some(seed), ..
}) => SmallRng::seed_from_u64(*seed),
_ => SmallRng::from_entropy(),
};

let min_string_len = options.as_ref().map(|opt| opt.min_string_len).unwrap_or(0);
let max_string_len = options.as_ref().map(|opt| opt.max_string_len).unwrap_or(5);
let max_arr_len = options.as_ref().map(|opt| opt.max_array_len).unwrap_or(3);

match ty {
DataType::Null => Column::Null { len },
DataType::EmptyArray => Column::EmptyArray { len },
Expand All @@ -1215,13 +1241,16 @@ impl Column {
DataType::Binary => BinaryType::from_data(
(0..len)
.map(|_| {
let rng = match seed {
None => SmallRng::from_entropy(),
Some(seed) => SmallRng::seed_from_u64(seed),
let mut rng = match &options {
Some(RandomOptions {
seed: Some(seed), ..
}) => SmallRng::seed_from_u64(*seed),
_ => SmallRng::from_entropy(),
};
let str_len = rng.gen_range(min_string_len..=max_string_len);
rng.sample_iter(&Alphanumeric)
// randomly generate 5 characters.
.take(5)
.take(str_len)
.map(u8::from)
.collect::<Vec<_>>()
})
Expand All @@ -1230,13 +1259,16 @@ impl Column {
DataType::String => StringType::from_data(
(0..len)
.map(|_| {
let rng = match seed {
None => SmallRng::from_entropy(),
Some(seed) => SmallRng::seed_from_u64(seed),
let mut rng = match &options {
Some(RandomOptions {
seed: Some(seed), ..
}) => SmallRng::seed_from_u64(*seed),
_ => SmallRng::from_entropy(),
};
let str_len = rng.gen_range(min_string_len..=max_string_len);
rng.sample_iter(&Alphanumeric)
// randomly generate 5 characters.
.take(5)
.take(str_len)
.map(char::from)
.collect::<String>()
})
Expand Down Expand Up @@ -1277,19 +1309,19 @@ impl Column {
),
DataType::Interval => unimplemented!(),
DataType::Nullable(ty) => NullableColumn::new_column(
Column::random(ty, len, seed),
Column::random(ty, len, options),
Bitmap::from((0..len).map(|_| rng.gen_bool(0.5)).collect::<Vec<bool>>()),
),
DataType::Array(inner_ty) => {
let mut inner_len = 0;
let mut offsets: Vec<u64> = Vec::with_capacity(len + 1);
offsets.push(0);
for _ in 0..len {
inner_len += rng.gen_range(0..=3);
inner_len += rng.gen_range(0..=max_arr_len) as u64;
offsets.push(inner_len);
}
Column::Array(Box::new(ArrayColumn {
values: Column::random(inner_ty, inner_len as usize, seed),
values: Column::random(inner_ty, inner_len as usize, options),
offsets: offsets.into(),
}))
}
Expand All @@ -1298,11 +1330,11 @@ impl Column {
let mut offsets: Vec<u64> = Vec::with_capacity(len + 1);
offsets.push(0);
for _ in 0..len {
inner_len += rng.gen_range(0..=3);
inner_len += rng.gen_range(0..=max_arr_len) as u64;
offsets.push(inner_len);
}
Column::Map(Box::new(ArrayColumn {
values: Column::random(inner_ty, inner_len as usize, seed),
values: Column::random(inner_ty, inner_len as usize, options),
offsets: offsets.into(),
}))
}
Expand All @@ -1321,7 +1353,7 @@ impl Column {
DataType::Tuple(fields) => {
let fields = fields
.iter()
.map(|ty| Column::random(ty, len, seed))
.map(|ty| Column::random(ty, len, options.clone()))
.collect::<Vec<_>>();
Column::Tuple(fields)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ use databend_storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_MAX_ARRAY_LEN;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_MAX_STRING_LEN;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_MIN_STRING_LEN;
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_SEED;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION;
Expand Down Expand Up @@ -84,6 +87,9 @@ pub static CREATE_RANDOM_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::ne
let mut r = HashSet::new();
r.insert(OPT_KEY_ENGINE);
r.insert(OPT_KEY_RANDOM_SEED);
r.insert(OPT_KEY_RANDOM_MIN_STRING_LEN);
r.insert(OPT_KEY_RANDOM_MAX_STRING_LEN);
r.insert(OPT_KEY_RANDOM_MAX_ARRAY_LEN);
r
});

Expand Down
13 changes: 1 addition & 12 deletions src/query/service/src/schedulers/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sinks::EmptySink;
use databend_common_sql::planner::query_executor::QueryExecutor;
use databend_common_sql::Planner;
use futures_util::TryStreamExt;
Expand Down Expand Up @@ -47,17 +45,8 @@ pub async fn build_query_pipeline(
ignore_result: bool,
) -> Result<PipelineBuildResult> {
let mut build_res = build_query_pipeline_without_render_result_set(ctx, plan).await?;
if matches!(plan, PhysicalPlan::UnionAll { .. }) {
// Union doesn't need to add extra processor to project the result.
// It will be handled in union processor.
if ignore_result {
build_res
.main_pipeline
.add_sink(|input| Ok(ProcessorPtr::create(EmptySink::create(input))))?;
}
return Ok(build_res);
}
let input_schema = plan.output_schema()?;

PipelineBuilder::build_result_projection(
&ctx.get_function_context()?,
input_schema,
Expand Down
27 changes: 16 additions & 11 deletions src/query/sql/src/executor/physical_plans/physical_union_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ impl PhysicalPlanBuilder {
let metadata = self.metadata.read().clone();
let lazy_columns = metadata.lazy_columns();
required.extend(lazy_columns.clone());
let indices: Vec<_> = union_all
.left_outputs
.iter()
.enumerate()
.filter_map(|(index, v)| required.contains(&v.0).then_some(index))

let indices: Vec<usize> = (0..union_all.left_outputs.len())
.filter(|index| required.contains(&union_all.output_indexes[*index]))
.collect();

let (left_required, right_required) = if indices.is_empty() {
(
HashSet::from([union_all.left_outputs[0].0]),
Expand Down Expand Up @@ -98,13 +97,19 @@ impl PhysicalPlanBuilder {
let fields = union_all
.left_outputs
.iter()
.filter(|(index, _)| left_required.contains(index))
.map(|(index, expr)| {
if let Some(expr) = expr {
Ok(DataField::new(&index.to_string(), expr.data_type()?))
.enumerate()
.filter(|(_, (index, _))| left_required.contains(index))
.map(|(i, (index, expr))| {
let data_type = if let Some(expr) = expr {
expr.data_type()?
} else {
Ok(left_schema.field_with_name(&index.to_string())?.clone())
}
left_schema
.field_with_name(&index.to_string())?
.data_type()
.clone()
};
let output_index = union_all.output_indexes[i];
Ok(DataField::new(&output_index.to_string(), data_type))
})
.collect::<Result<Vec<_>>>()?;

Expand Down
26 changes: 16 additions & 10 deletions src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl Binder {
right_context.clone(),
coercion_types,
)?;

if let Some(cte_name) = &cte_name {
for (col, cte_col) in new_bind_context.columns.iter_mut().zip(
new_bind_context
Expand All @@ -275,11 +276,15 @@ impl Binder {
}
}

let output_indexes = new_bind_context.columns.iter().map(|x| x.index).collect();

let union_plan = UnionAll {
left_outputs,
right_outputs,
cte_scan_names,
output_indexes,
};

let mut new_expr = SExpr::create_binary(
Arc::new(union_plan.into()),
Arc::new(left_expr),
Expand Down Expand Up @@ -406,7 +411,7 @@ impl Binder {
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
fn coercion_union_type(
&self,
&mut self,
left_span: Span,
right_span: Span,
left_bind_context: BindContext,
Expand All @@ -420,6 +425,7 @@ impl Binder {
let mut left_outputs = Vec::with_capacity(left_bind_context.columns.len());
let mut right_outputs = Vec::with_capacity(right_bind_context.columns.len());
let mut new_bind_context = BindContext::new();

new_bind_context
.cte_context
.set_cte_context(right_bind_context.cte_context);
Expand Down Expand Up @@ -447,18 +453,10 @@ impl Binder {
left_col.index,
Some(ScalarExpr::CastExpr(left_coercion_expr)),
));
let column_binding = ColumnBindingBuilder::new(
left_col.column_name.clone(),
left_col.index,
Box::new(coercion_types[idx].clone()),
Visibility::Visible,
)
.build();
new_bind_context.add_column_binding(column_binding);
} else {
left_outputs.push((left_col.index, None));
new_bind_context.add_column_binding(left_col.clone());
}

if *right_col.data_type != coercion_types[idx] {
let right_coercion_expr = CastExpr {
span: right_span,
Expand All @@ -479,7 +477,15 @@ impl Binder {
} else {
right_outputs.push((right_col.index, None));
}

let column_binding = self.create_derived_column_binding(
left_col.column_name.clone(),
coercion_types[idx].clone(),
None,
);
new_bind_context.add_column_binding(column_binding);
}

Ok((new_bind_context, left_outputs, right_outputs))
}

Expand Down
Loading
Loading