Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix predicate_index and not matched null cast #13208

Merged
merged 9 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use std::u64::MAX;

use common_base::runtime::GlobalIORuntime;
use common_exception::ErrorCode;
Expand All @@ -33,6 +34,7 @@ use common_sql::executor::PhysicalPlan;
use common_sql::executor::PhysicalPlanBuilder;
use common_sql::plans::MergeInto as MergePlan;
use common_sql::plans::UpdatePlan;
use common_sql::IndexType;
use common_sql::ScalarExpr;
use common_sql::TypeCheck;
use common_storages_factory::Table;
Expand All @@ -51,6 +53,8 @@ use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;

// predicate_index should not be conflict with update expr's column_binding's index.
pub const PREDICATE_COLUMN_INDEX: IndexType = MAX as usize;
const DUMMY_COL_INDEX: usize = 1;
pub struct MergeIntoInterpreter {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -260,7 +264,7 @@ impl MergeIntoInterpreter {
self.ctx.clone(),
fuse_table.schema().into(),
col_indices,
Some(join_output_schema.num_fields()),
Some(PREDICATE_COLUMN_INDEX),
target_alias.is_some(),
)?;
let update_list = update_list
Expand All @@ -274,7 +278,7 @@ impl MergeIntoInterpreter {
// there will add a predicate col when we process matched clauses.
// so it's not in join_output_schema for now. But it's must be added
// to the tail, so let do it like below.
if name == &join_output_schema.num_fields().to_string() {
if *name == PREDICATE_COLUMN_INDEX.to_string() {
join_output_schema.num_fields()
} else {
join_output_schema.index_of(name).unwrap()
Expand Down
10 changes: 8 additions & 2 deletions src/query/sql/src/planner/binder/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,15 @@ impl Binder {
let mut values = Vec::with_capacity(default_schema.num_fields());
let update_columns_star = update_columns_star.unwrap();
for idx in 0..default_schema.num_fields() {
values.push(update_columns_star.get(&idx).unwrap().clone());
let scalar = update_columns_star.get(&idx).unwrap().clone();
// cast expr
values.push(wrap_cast_scalar(
&scalar,
&scalar.data_type()?,
&DataType::from(default_schema.field(idx).data_type()),
)?);
}

Ok(UnmatchedEvaluator {
source_schema: Arc::new(Arc::new(default_schema).into()),
condition,
Expand All @@ -423,7 +430,6 @@ impl Binder {
}

let mut values = Vec::with_capacity(clause.insert_operation.values.len());

// we need to get source schema, and use it for filling columns.
let source_schema = if let Some(fields) = clause.insert_operation.columns.clone() {
self.schema_project(&table_schema, &fields)?
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl UpdatePlan {

let mut right = right.ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
let right_data_type = right.data_type()?;

// cornor case: for merge into, if target_table's fields are not null, when after bind_join, it will
// change into nullable, so we need to cast this.
right = wrap_cast_scalar(&right, &right_data_type, target_type)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@ impl MatchedAggregator {
let permit = acquire_task_permit(self.io_request_semaphore.clone()).await?;
let aggregation_ctx = self.aggregation_ctx.clone();
let segment_info = segment_infos.get(&segment_idx).unwrap();
info!(
"merge into apply: segment_idx:{},blk_idx:{}",
segment_idx, block_idx
);
let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
assert!(block_idx < segment_info.blocks.len());
// the row_id is generated by block_id, not block_idx,reference to fill_internal_column_meta()
let block_meta = segment_info.blocks[block_idx].clone();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,21 @@ select * from target_test order by a;
3 f
5 f

## test not match cast and predicate index
statement ok
set enable_experimental_merge_into = 0;
drop table if exists test_order;

statement ok
drop table if exists random_source;

statement ok
create table test_order(id bigint, id1 bigint, id2 bigint, id3 bigint, id4 bigint, id5 bigint, id6 bigint, id7 bigint, s1 varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 varchar, s9 varchar, s10 varchar, s11 varchar, s12 varchar, s13 varchar, d1 DECIMAL(20, 8), d2 DECIMAL(20, 8), d3 DECIMAL(20, 8), d4 DECIMAL(20, 8), d5 DECIMAL(20, 8), d6 DECIMAL(30, 8), d7 DECIMAL(30, 8), d8 DECIMAL(30, 8), d9 DECIMAL(30, 8), d10 DECIMAL(30, 8),insert_time datetime, insert_time1 datetime, insert_time2 datetime, insert_time3 datetime,i int) CLUSTER BY(to_yyyymmdd(insert_time), id) bloom_index_columns='insert_time,id';

statement ok
create table random_source(id bigint not null, id1 bigint, id2 bigint, id3 bigint, id4 bigint, id5 bigint, id6 bigint, id7 bigint,s1 varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 varchar, s9 varchar, s10 varchar, s11 varchar, s12 varchar, s13 varchar,d1 DECIMAL(20, 8), d2 DECIMAL(20, 8), d3 DECIMAL(20, 8), d4 DECIMAL(20, 8), d5 DECIMAL(20, 8), d6 DECIMAL(30, 8), d7 DECIMAL(30, 8), d8 DECIMAL(30, 8), d9 DECIMAL(30, 8), d10 DECIMAL(30, 8),insert_time datetime not null, insert_time1 datetime, insert_time2 datetime, insert_time3 datetime,i int) Engine = Random;

statement ok
merge into test_order as t using (select id,34 as id1,238 as id2, id3, id4, id5, id6, id7,s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,insert_time,insert_time1,insert_time2,insert_time3,i from random_source limit 1) as s on t.id = s.id and t.insert_time = s.insert_time when matched then update * when not matched then insert *;

statement ok
set enable_experimental_merge_into = 0;
Loading