Skip to content

Commit

Permalink
feat(sql): WINDOW without ORDER BY
Browse files Browse the repository at this point in the history
Rules:
- ALLOWED for ROWS-type WINDOW
- NOT ALLOWED for RANGE-type WINDOW with offset PRECEDING/FOLLOWING
- NOT ALLOWED for WINDOW with attribute EXCLUDE CURRENT_TIME

Without ORDER BY, rows are processed in an unspecified order.
  • Loading branch information
aceforeverd committed Oct 17, 2023
1 parent 024a0d0 commit b30b42d
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 43 deletions.
32 changes: 30 additions & 2 deletions cases/function/window/error_window.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ debugs: []
version: 0.5.0
cases:
- id: 0
desc: no order by
desc: RANGE-type WINDOW with offset PRECEDING/FOLLOWING requires ORDER BY
inputs:
- columns: [ "id int","c1 string","c3 int","c4 bigint","c5 float","c6 double","c7 timestamp","c8 date" ]
indexs: [ "index1:c8:c4" ]
rows:
- [1,"aa",20,30,1.1,2.1,1590738990000,"2020-05-01"]
sql: |
SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0} WINDOW w1 AS (PARTITION BY {0}.c8 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0}
WINDOW w1 AS (PARTITION BY {0}.c8 ROWS_RANGE BETWEEN 2 PRECEDING AND CURRENT ROW);
expect:
msg: RANGE/ROWS_RANGE-type FRAME with offset PRECEDING/FOLLOWING requires exactly one ORDER BY column
success: false
- id: 1
desc: no partition by
Expand Down Expand Up @@ -301,3 +303,29 @@ cases:
SELECT id, c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM {0} WINDOW w1 AS (PARTITION BY {0}.c33 ORDER BY {0}.c7 ROWS_RANGE BETWEEN 2s PRECEDING AND CURRENT ROW);
expect:
success: false
- id: 17
desc: ROWS WINDOW + EXCLUDE CURRENT_TIME requires order by
inputs:
- columns: [ "id int","c1 string","c3 int","c4 bigint","c5 float","c6 double","c7 timestamp","c8 date" ]
indexs: [ "index1:c8:c4" ]
rows:
- [1,"aa",20,30,1.1,2.1,1590738990000,"2020-05-01"]
sql: |
SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0}
WINDOW w1 AS (PARTITION BY {0}.c8 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_TIME);
expect:
msg: WINDOW with EXCLUDE CURRENT_TIME requires exactly one ORDER BY column
success: false
- id: 18
desc: RANGE WINDOW + EXCLUDE CURRENT_TIME requires order by
inputs:
- columns: [ "id int","c1 string","c3 int","c4 bigint","c5 float","c6 double","c7 timestamp","c8 date" ]
indexs: [ "index1:c8:c4" ]
rows:
- [1,"aa",20,30,1.1,2.1,1590738990000,"2020-05-01"]
sql: |
SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0}
WINDOW w1 AS (PARTITION BY {0}.c8 ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_TIME);
expect:
msg: WINDOW with EXCLUDE CURRENT_TIME requires exactly one ORDER BY column
success: false
231 changes: 231 additions & 0 deletions cases/query/window_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -901,3 +901,234 @@ cases:
200, 1, 1
300, 0, 0
400, 2, 2
# ======================================================================
# WINDOW without ORDER BY
# ======================================================================
- id: 24
desc: ROWS WINDOW WITHOUT ORDER BY
mode: batch-unsupport
inputs:
- name: t1
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10000
5, 400, 15000
6, 400, 40000
sql: |
select id, count(ts) over w as agg
from t1
window w as (
partition by gp
rows between 2 open preceding and current row
)
request_plan: |
PROJECT(type=Aggregation)
REQUEST_UNION(partition_keys=(), orders=, rows=(, 2 OPEN PRECEDING, 0 CURRENT), index_keys=(gp))
DATA_PROVIDER(request=t1)
DATA_PROVIDER(type=Partition, table=t1, index=idx)
cluster_request_plan: |
SIMPLE_PROJECT(sources=(id, agg))
REQUEST_JOIN(type=kJoinTypeConcat)
SIMPLE_PROJECT(sources=(id))
DATA_PROVIDER(request=t1)
PROJECT(type=Aggregation)
REQUEST_UNION(partition_keys=(), orders=, rows=(, 2 OPEN PRECEDING, 0 CURRENT), index_keys=(gp))
DATA_PROVIDER(request=t1)
DATA_PROVIDER(type=Partition, table=t1, index=idx)
expect:
columns: ["id int", "agg int64"]
order: id
data: |
1, 1
2, 2
3, 1
4, 2
5, 2
6, 2
- id: 25
desc: RANGE WINDOW WITHOUT ORDER BY
mode: batch-unsupport
inputs:
- name: t1
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10
5, 400, 15000
sql: |
select id, count(ts) over w as agg
from t1
window w as (
partition by gp
rows_range between unbounded preceding and current row
)
request_plan: |
PROJECT(type=Aggregation)
REQUEST_UNION(partition_keys=(), orders=, range=(, 0 PRECEDING UNBOUND, 0 CURRENT), index_keys=(gp))
DATA_PROVIDER(request=t1)
DATA_PROVIDER(type=Partition, table=t1, index=idx)
cluster_request_plan: |
SIMPLE_PROJECT(sources=(id, agg))
REQUEST_JOIN(type=kJoinTypeConcat)
SIMPLE_PROJECT(sources=(id))
DATA_PROVIDER(request=t1)
PROJECT(type=Aggregation)
REQUEST_UNION(partition_keys=(), orders=, range=(, 0 PRECEDING UNBOUND, 0 CURRENT), index_keys=(gp))
DATA_PROVIDER(request=t1)
DATA_PROVIDER(type=Partition, table=t1, index=idx)
expect:
columns: ["id int", "agg int64"]
order: id
data: |
1, 1
2, 2
3, 1
4, 2
5, 3
- id: 26
desc: RANGE-type WINDOW WITHOUT ORDER BY + WINDOW attributes
mode: batch-unsupport
inputs:
- name: t1
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10000
5, 400, 15000
- name: t2
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10000
5, 400, 15000
sql: |
select id,
count(ts) over w1 as agg1,
count(ts) over w2 as agg2,
count(ts) over w3 as agg3,
count(ts) over w4 as agg4,
count(ts) over w5 as agg5,
count(ts) over w6 as agg6,
count(ts) over w7 as agg7,
from t1
window w1 as (
PARTITION by gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
w2 as (partition by gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_ROW),
w3 as (PARTITION BY gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW MAXSIZE 1),
w4 as (
UNION (select * from t2)
PARTITION BY gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW),
w5 as (
UNION (select * from t2)
PARTITION BY gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW EXCLUDE CURRENT_ROW),
w6 as (
UNION (select * from t2)
PARTITION BY gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW MAXSIZE 2 INSTANCE_NOT_IN_WINDOW EXCLUDE CURRENT_ROW),
w7 as (
UNION (select * from t2)
PARTITION BY gp
ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_ROW)
expect:
columns: ["id int", "agg1 int64", "agg2 int64", "agg3 int64", "agg4 int64", "agg5 int64", "agg6 int64", "agg7 int64"]
order: id
data: |
1, 1, 0, 1, 3, 2, 2, 2
2, 2, 1, 1, 3, 2, 2, 3
3, 1, 0, 1, 4, 3, 2, 3
4, 2, 1, 1, 4, 3, 2, 4
5, 3, 2, 1, 4, 3, 2, 5
- id: 27
desc: ROWS-type WINDOW WITHOUT ORDER BY + WINDOW attributes
mode: batch-unsupport
inputs:
- name: t1
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10000
5, 400, 15000
- name: t2
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10000
5, 400, 15000
sql: |
select id,
count(ts) over w1 as agg1,
count(ts) over w2 as agg2,
count(ts) over w3 as agg3,
count(ts) over w4 as agg4,
from t1
window w1 as (
PARTITION by gp
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW),
w2 as (partition by gp
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT_ROW),
w3 as (
UNION (select * from t2)
PARTITION BY gp
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW),
w4 as (
UNION (select * from t2)
PARTITION BY gp
ROWS BETWEEN 3 PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW EXCLUDE CURRENT_ROW)
expect:
columns: ["id int", "agg1 int64", "agg2 int64", "agg3 int64", "agg4 int64"]
order: id
data: |
1, 1, 0, 3, 2
2, 2, 1, 3, 2
3, 1, 0, 3, 3
4, 2, 1, 3, 3
5, 3, 2, 3, 3
3 changes: 3 additions & 0 deletions hybridse/include/node/sql_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,9 @@ class FrameBound : public SqlNode {
int64_t GetOffset() const { return offset_; }
void SetOffset(int64_t v) { offset_ = v; }

// is offset [OPEN] PRECEDING/FOLLOWING
bool is_offset_bound() const;


/// \brief get the inclusive frame bound offset value that has signed symbol
///
Expand Down
12 changes: 5 additions & 7 deletions hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ class Range : public FnComponent {
const bool Valid() const { return nullptr != range_key_; }
const std::string ToString() const {
std::ostringstream oss;
if (nullptr != range_key_ && nullptr != frame_) {
if (nullptr != frame_) {
if (nullptr != frame_->frame_range()) {
oss << "range=(" << range_key_->GetExprString() << ", "
oss << "range=(" << node::ExprString(range_key_) << ", "

Check warning on line 205 in hybridse/include/vm/physical_op.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/vm/physical_op.h#L205

Added line #L205 was not covered by tests
<< frame_->frame_range()->start()->GetExprString() << ", "
<< frame_->frame_range()->end()->GetExprString();

Expand All @@ -216,7 +216,7 @@ class Range : public FnComponent {
if (nullptr != frame_->frame_range()) {
oss << ", ";
}
oss << "rows=(" << range_key_->GetExprString() << ", "
oss << "rows=(" << node::ExprString(range_key_) << ", "
<< frame_->frame_rows()->start()->GetExprString() << ", "
<< frame_->frame_rows()->end()->GetExprString() << ")";
}
Expand Down Expand Up @@ -578,7 +578,7 @@ class PhysicalRequestProviderNode : public PhysicalDataProviderNode {
PhysicalOpNode **out) override;

virtual ~PhysicalRequestProviderNode() {}
virtual void Print(std::ostream &output, const std::string &tab) const;
void Print(std::ostream &output, const std::string &tab) const override;
};

class PhysicalRequestProviderNodeWithCommonColumn
Expand Down Expand Up @@ -846,9 +846,7 @@ class WindowOp {
std::ostringstream oss;
oss << "partition_" << partition_.ToString();
oss << ", " << sort_.ToString();
if (range_.Valid()) {
oss << ", " << range_.ToString();
}
oss << ", " << range_.ToString();
return oss.str();
}
const std::string FnDetail() const {
Expand Down
5 changes: 5 additions & 0 deletions hybridse/src/node/sql_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2099,6 +2099,11 @@ void FrameBound::Print(std::ostream &output, const std::string &org_tab) const {
}
}

bool FrameBound::is_offset_bound() const {
return bound_type_ == kPreceding || bound_type_ == kOpenPreceding || bound_type_ == kFollowing ||
bound_type_ == kOpenFollowing;
}

int FrameBound::Compare(const FrameBound *bound1, const FrameBound *bound2) {
if (SqlEquals(bound1, bound2)) {
return 0;
Expand Down
1 change: 0 additions & 1 deletion hybridse/src/plan/planner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <algorithm>
#include <map>
#include <random>
#include <set>
#include <string>
#include <utility>
Expand Down
Loading

0 comments on commit b30b42d

Please sign in to comment.