Skip to content

Commit

Permalink
refactor: explain merge into (#15195)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 authored Apr 9, 2024
1 parent 993ba75 commit 23ea5bd
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 197 deletions.
8 changes: 8 additions & 0 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ impl Interpreter for ExplainInterpreter {
}
_ => self.explain_plan(&self.plan)?,
},
Plan::MergeInto(plan) => {
let mut res = self.explain_plan(&self.plan)?;
let input = self
.explain_query(&plan.input, &plan.meta_data, &plan.bind_context, &None)
.await?;
res.extend(input);
vec![DataBlock::concat(&res)?]
}
_ => self.explain_plan(&self.plan)?,
},

Expand Down
4 changes: 0 additions & 4 deletions src/query/sql/src/planner/format/display_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,12 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
condition_format, unmatched_format
)));
}
let s_expr = merge_into.input.as_ref();
let metadata = &*merge_into.meta_data.read();
let input_format_child = s_expr.to_format_tree(metadata, false)?;
let all_children = [
vec![distributed_format],
vec![target_build_optimization_format],
vec![can_try_update_column_only_format],
matched_children,
unmatched_children,
vec![input_format_child],
]
.concat();
let res = FormatTreeNode::with_children(target_table_format, all_children).format_pretty()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,40 @@ target_table: default.default.t1
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = a (#0)]
├── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))]
└── Join(Left)
├── build keys: []
├── probe keys: []
├── other filters: [lt(t1.a (#1), t2.a (#0))]
├── Exchange(Merge)
│ └── Scan
│ ├── table: default.t2
│ ├── filters: []
│ ├── order by: []
│ └── limit: NONE
└── Exchange(Merge)
└── Scan
├── table: default.t1
├── filters: []
├── order by: []
└── limit: NONE
└── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))]
HashJoin
├── output columns: [t2.a (#0), t1.a (#1), t1._row_id (#2)]
├── join type: LEFT OUTER
├── build keys: []
├── probe keys: []
├── filters: [t1.a (#1) < t2.a (#0)]
├── estimated rows: 1.00
├── Exchange(Build)
│ ├── output columns: [t1.a (#1), t1._row_id (#2)]
│ ├── exchange type: Merge
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [a (#1), _row_id (#2)]
│ ├── read rows: 15
│ ├── read bytes: 156
│ ├── partitions total: 3
│ ├── partitions scanned: 3
│ ├── pruning stats: [segments: <range pruning: 3 to 3>, blocks: <range pruning: 3 to 3>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 0.00
└── Exchange(Probe)
├── output columns: [t2.a (#0)]
├── exchange type: Merge
└── TableScan
├── table: default.default.t2
├── output columns: [a (#0)]
├── read rows: 1
├── read bytes: 36
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 1.00

query TT
merge into t1 using t2 on t1.a < t2.a when matched then update * when not matched then insert *;
Expand Down Expand Up @@ -141,28 +158,42 @@ target_table: default.default.t1
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = a (#0)]
├── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))]
└── Exchange(Merge)
└── Join(Left)
├── build keys: [t1.a (#1)]
├── probe keys: [t2.a (#0)]
├── other filters: []
├── Exchange(Hash)
│ ├── Exchange(Hash): keys: [t2.a (#0)]
│ └── EvalScalar
│ ├── scalars: [stage._$1 (#0) AS (#0)]
│ └── Scan
│ ├── table: system.stage
│ ├── filters: []
│ ├── order by: []
│ └── limit: NONE
└── Exchange(Hash)
├── Exchange(Hash): keys: [t1.a (#1)]
└── Scan
├── table: default.t1
├── filters: []
├── order by: []
└── limit: NONE
└── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))]
Exchange
├── output columns: [stage._$1 (#0), t1.a (#1), t1._row_id (#2)]
├── exchange type: Merge
└── HashJoin
├── output columns: [stage._$1 (#0), t1.a (#1), t1._row_id (#2)]
├── join type: LEFT OUTER
├── build keys: [CAST(t1.a (#1) AS Int64 NULL)]
├── probe keys: [CAST(t2.a (#0) AS Int64 NULL)]
├── filters: []
├── estimated rows: 0.00
├── Exchange(Build)
│ ├── output columns: [t1.a (#1), t1._row_id (#2)]
│ ├── exchange type: Hash(CAST(t1.a (#1) AS Int64 NULL))
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [a (#1), _row_id (#2)]
│ ├── read rows: 2
│ ├── read bytes: 40
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 0.00
└── Exchange(Probe)
├── output columns: [stage._$1 (#0)]
├── exchange type: Hash(CAST(t2.a (#0) AS Int64 NULL))
└── TableScan
├── table: default.system.stage
├── output columns: [_$1 (#0)]
├── read rows: 6
├── read bytes: 12
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 0.00

query TT
merge into t1 using (select $1 as a from @ss) as t2 on t1.a = t2.a when matched then update * when not matched then insert *;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,34 @@ target_table: default.default.target_build_optimization
├── target_build_optimization: true
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
└── Join(Left)
├── build keys: [t1.a (#3), t1.b (#4)]
├── probe keys: [t2.a (#0), t2.b (#1)]
├── other filters: []
├── Scan
│ ├── table: default.source_optimization
│ ├── filters: []
│ ├── order by: []
│ └── limit: NONE
└── Scan
├── table: default.target_build_optimization
├── filters: []
├── order by: []
└── limit: NONE
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
HashJoin
├── output columns: [t2.a (#0), t2.b (#1), t2.c (#2), t1.a (#3), t1.b (#4), t1.c (#5), t1._row_id (#6)]
├── join type: LEFT OUTER
├── build keys: [t1.a (#3), t1.b (#4)]
├── probe keys: [t2.a (#0), t2.b (#1)]
├── filters: []
├── estimated rows: 12.00
├── TableScan(Build)
│ ├── table: default.default.target_build_optimization
│ ├── output columns: [a (#3), b (#4), c (#5), _row_id (#6)]
│ ├── read rows: 8
│ ├── read bytes: 512
│ ├── partitions total: 0
│ ├── partitions scanned: 4
│ ├── pruning stats: [segments: <range pruning: 4 to 4>, blocks: <range pruning: 4 to 4>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 0.00
└── TableScan(Probe)
├── table: default.default.source_optimization
├── output columns: [a (#0), b (#1), c (#2)]
├── read rows: 12
├── read bytes: 770
├── partitions total: 0
├── partitions scanned: 6
├── pruning stats: [segments: <range pruning: 6 to 6>, blocks: <range pruning: 6 to 6>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 12.00

### 3. test with conjunct and without conjunct
statement ok
Expand Down Expand Up @@ -125,21 +138,34 @@ target_table: default.default.target_build_optimization
├── target_build_optimization: true
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
└── Join(Left)
├── build keys: [t1.a (#3), t1.b (#4)]
├── probe keys: [t2.a (#0), t2.b (#1)]
├── other filters: []
├── Scan
│ ├── table: default.source_optimization
│ ├── filters: []
│ ├── order by: []
│ └── limit: NONE
└── Scan
├── table: default.target_build_optimization
├── filters: []
├── order by: []
└── limit: NONE
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
HashJoin
├── output columns: [t2.a (#0), t2.b (#1), t2.c (#2), t1.a (#3), t1.b (#4), t1.c (#5), t1._row_id (#6)]
├── join type: LEFT OUTER
├── build keys: [t1.a (#3), t1.b (#4)]
├── probe keys: [t2.a (#0), t2.b (#1)]
├── filters: []
├── estimated rows: 10.00
├── TableScan(Build)
│ ├── table: default.default.target_build_optimization
│ ├── output columns: [a (#3), b (#4), c (#5), _row_id (#6)]
│ ├── read rows: 8
│ ├── read bytes: 512
│ ├── partitions total: 0
│ ├── partitions scanned: 4
│ ├── pruning stats: [segments: <range pruning: 4 to 4>, blocks: <range pruning: 4 to 4>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 0.00
└── TableScan(Probe)
├── table: default.default.source_optimization
├── output columns: [a (#0), b (#1), c (#2)]
├── read rows: 10
├── read bytes: 752
├── partitions total: 6
├── partitions scanned: 6
├── pruning stats: [segments: <range pruning: 6 to 6>, blocks: <range pruning: 6 to 6>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10.00

## test without conjunct
query TT
Expand Down Expand Up @@ -204,21 +230,34 @@ target_table: default.default.target_build_optimization
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
└── Join(Left)
├── build keys: [t1.a (#3)]
├── probe keys: [t2.a (#0)]
├── other filters: [gt(t1.b (#4), t2.b (#1))]
├── Scan
│ ├── table: default.source_optimization
│ ├── filters: []
│ ├── order by: []
│ └── limit: NONE
└── Scan
├── table: default.target_build_optimization
├── filters: []
├── order by: []
└── limit: NONE
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
HashJoin
├── output columns: [t2.a (#0), t2.b (#1), t2.c (#2), t1.a (#3), t1.b (#4), t1.c (#5), t1._row_id (#6)]
├── join type: LEFT OUTER
├── build keys: [t1.a (#3)]
├── probe keys: [t2.a (#0)]
├── filters: [t1.b (#4) > t2.b (#1)]
├── estimated rows: 10.00
├── TableScan(Build)
│ ├── table: default.default.target_build_optimization
│ ├── output columns: [a (#3), b (#4), c (#5), _row_id (#6)]
│ ├── read rows: 8
│ ├── read bytes: 512
│ ├── partitions total: 4
│ ├── partitions scanned: 4
│ ├── pruning stats: [segments: <range pruning: 4 to 4>, blocks: <range pruning: 4 to 4>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 0.00
└── TableScan(Probe)
├── table: default.default.source_optimization
├── output columns: [a (#0), b (#1), c (#2)]
├── read rows: 10
├── read bytes: 752
├── partitions total: 6
├── partitions scanned: 6
├── pruning stats: [segments: <range pruning: 6 to 6>, blocks: <range pruning: 6 to 6>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10.00

statement ok
update source_optimization set a = 2,b = 'b2' where a = 3 and b = 'b3';
Expand Down Expand Up @@ -287,21 +326,33 @@ target_table: default.default.target_build_optimization
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
└── Join(Left)
├── build keys: [t1.a (#3)]
├── probe keys: [t2.a (#0)]
├── other filters: [gt(t1.b (#4), t2.b (#1))]
├── Scan
│ ├── table: default.source_optimization
│ ├── filters: []
│ ├── order by: []
│ └── limit: NONE
└── Scan
├── table: default.target_build_optimization
├── filters: []
├── order by: []
└── limit: NONE
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
HashJoin
├── output columns: [t2.a (#0), t2.b (#1), t2.c (#2), t1.a (#3), t1.b (#4), t1.c (#5), t1._row_id (#6)]
├── join type: LEFT OUTER
├── build keys: [t1.a (#3)]
├── probe keys: [t2.a (#0)]
├── filters: [t1.b (#4) > t2.b (#1)]
├── estimated rows: 10.00
├── TableScan(Build)
│ ├── table: default.default.target_build_optimization
│ ├── output columns: [a (#3), b (#4), c (#5), _row_id (#6)]
│ ├── read rows: 0
│ ├── read bytes: 0
│ ├── partitions total: 0
│ ├── partitions scanned: 0
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 0.00
└── TableScan(Probe)
├── table: default.default.source_optimization
├── output columns: [a (#0), b (#1), c (#2)]
├── read rows: 10
├── read bytes: 752
├── partitions total: 0
├── partitions scanned: 6
├── pruning stats: [segments: <range pruning: 6 to 6>, blocks: <range pruning: 6 to 6>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 10.00

query TT
merge into target_build_optimization as t1 using source_optimization as t2 on t1.a > t2.a and t1.b = t2.b when matched then update * when not matched then insert *;
Expand Down
Loading

0 comments on commit 23ea5bd

Please sign in to comment.