Skip to content

Commit

Permalink
fix: Literal in ORDER BY window definition should not be an ordinal…
Browse files Browse the repository at this point in the history
… referring to relation column (apache#8419)

* fix: RANGE frame can be regularized to ROWS frame only if empty ORDER BY clause

* Fix flaky test

* Update test comment

* Add code comment

* Update

* fix: Literal in window definition should not refer to relation column

* Remove unused import

* Update datafusion/sql/src/expr/function.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Add code comment

* Fix format

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
viirya and alamb authored Dec 6, 2023
1 parent 107791a commit 439339a
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 22 deletions.
8 changes: 2 additions & 6 deletions datafusion/physical-expr/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::PhysicalExpr;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;

/// Represents Sort operation for a column in a RecordBatch
Expand Down Expand Up @@ -65,11 +65,7 @@ impl PhysicalSortExpr {
let value_to_sort = self.expr.evaluate(batch)?;
let array_to_sort = match value_to_sort {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => {
return exec_err!(
"Sort operation is not applicable to scalar value {scalar}"
);
}
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
};
Ok(SortColumn {
values: array_to_sort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion_common::utils::{
evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
get_record_batch_at_indices, get_row_at_idx,
};
use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
Expand Down Expand Up @@ -585,7 +585,7 @@ impl LinearSearch {
.map(|item| match item.evaluate(record_batch)? {
ColumnarValue::Array(array) => Ok(array),
ColumnarValue::Scalar(scalar) => {
plan_err!("Sort operation is not applicable to scalar value {scalar}")
scalar.to_array_of_size(record_batch.num_rows())
}
})
.collect()
Expand Down
11 changes: 8 additions & 3 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let order_by =
self.order_by_to_sort_expr(&window.order_by, schema, planner_context)?;
let order_by = self.order_by_to_sort_expr(
&window.order_by,
schema,
planner_context,
// Numeric literals in window function ORDER BY are treated as constants
false,
)?;
let window_frame = window
.window_frame
.as_ref()
Expand Down Expand Up @@ -143,7 +148,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context)?;
self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
let filter: Option<Box<Expr>> = filter
Expand Down
7 changes: 6 additions & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} = array_agg;

let order_by = if let Some(order_by) = order_by {
Some(self.order_by_to_sort_expr(&order_by, input_schema, planner_context)?)
Some(self.order_by_to_sort_expr(
&order_by,
input_schema,
planner_context,
true,
)?)
} else {
None
};
Expand Down
9 changes: 7 additions & 2 deletions datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ use datafusion_expr::Expr;
use sqlparser::ast::{Expr as SQLExpr, OrderByExpr, Value};

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// convert sql [OrderByExpr] to `Vec<Expr>`
/// Convert sql [OrderByExpr] to `Vec<Expr>`.
///
/// If `literal_to_column` is true, treat any numeric literals (e.g. `2`) as a 1 based index
/// into the SELECT list (e.g. `SELECT a, b FROM table ORDER BY 2`).
/// If false, interpret numeric literals as constant values.
pub(crate) fn order_by_to_sort_expr(
&self,
exprs: &[OrderByExpr],
schema: &DFSchema,
planner_context: &mut PlannerContext,
literal_to_column: bool,
) -> Result<Vec<Expr>> {
let mut expr_vec = vec![];
for e in exprs {
Expand All @@ -40,7 +45,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} = e;

let expr = match expr {
SQLExpr::Value(Value::Number(v, _)) => {
SQLExpr::Value(Value::Number(v, _)) if literal_to_column => {
let field_index = v
.parse::<usize>()
.map_err(|err| plan_datafusion_err!("{}", err))?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

let order_by_rex =
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context)?;
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;

if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
// In case of `DISTINCT ON` we must capture the sort expressions since during the plan
Expand Down
3 changes: 2 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut all_results = vec![];
for expr in order_exprs {
// Convert each OrderByExpr to a SortExpr:
let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?;
let expr_vec =
self.order_by_to_sort_expr(&expr, schema, planner_context, true)?;
// Verify that columns of all SortExprs exist in the schema:
for expr in expr_vec.iter() {
for column in expr.to_columns()?.iter() {
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3778,10 +3778,10 @@ query error DataFusion error: Arrow error: Invalid argument error: must either s
select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q;

# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
query I
select rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q ORDER BY rnk
query II
select a,
rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q ORDER BY a
----
1
2
1 1
2 1

0 comments on commit 439339a

Please sign in to comment.