From 1a8b79d7852dce100203d90fb920f4fd15c96b37 Mon Sep 17 00:00:00 2001
From: JackTan25 <60096118+JackTan25@users.noreply.github.com>
Date: Thu, 12 Oct 2023 14:41:43 +0800
Subject: [PATCH] fix: fix predicate_index and not matched null cast (#13208)

* fix predicate_index and not matched null cast

* use u32 instead of usize

* fix test

* add log

* use u64
---
 .../src/interpreters/interpreter_merge_into.rs     |  8 ++++++--
 src/query/sql/src/planner/binder/merge_into.rs     | 10 ++++++++--
 src/query/sql/src/planner/plans/update.rs          |  1 +
 .../merge_into/mutator/matched_mutator.rs          |  5 +++++
 .../base/09_fuse_engine/09_0026_merge_into         | 17 ++++++++++++++++-
 5 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs
index b046407d98fb..ea7d5ba70e90 100644
--- a/src/query/service/src/interpreters/interpreter_merge_into.rs
+++ b/src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;
+use std::u64::MAX;
 
 use common_base::runtime::GlobalIORuntime;
 use common_exception::ErrorCode;
@@ -33,6 +34,7 @@ use common_sql::executor::PhysicalPlan;
 use common_sql::executor::PhysicalPlanBuilder;
 use common_sql::plans::MergeInto as MergePlan;
 use common_sql::plans::UpdatePlan;
+use common_sql::IndexType;
 use common_sql::ScalarExpr;
 use common_sql::TypeCheck;
 use common_storages_factory::Table;
@@ -51,6 +53,8 @@ use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
 
+// The predicate_index must not conflict with any update expr's column_binding index.
+pub const PREDICATE_COLUMN_INDEX: IndexType = MAX as usize;
 const DUMMY_COL_INDEX: usize = 1;
 pub struct MergeIntoInterpreter {
     ctx: Arc<QueryContext>,
@@ -260,7 +264,7 @@ impl MergeIntoInterpreter {
                     self.ctx.clone(),
                     fuse_table.schema().into(),
                     col_indices,
-                    Some(join_output_schema.num_fields()),
+                    Some(PREDICATE_COLUMN_INDEX),
                     target_alias.is_some(),
                 )?;
                 let update_list = update_list
@@ -274,7 +278,7 @@
                         // A predicate column is added when we process the matched clauses,
                        // so it is not in join_output_schema yet. It is always appended to
                        // the tail, so we resolve its index as below.
-                        if name == &join_output_schema.num_fields().to_string() {
+                        if *name == PREDICATE_COLUMN_INDEX.to_string() {
                             join_output_schema.num_fields()
                         } else {
                             join_output_schema.index_of(name).unwrap()
diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs
index b198a9b8648c..ac7c1e914417 100644
--- a/src/query/sql/src/planner/binder/merge_into.rs
+++ b/src/query/sql/src/planner/binder/merge_into.rs
@@ -408,8 +408,15 @@ impl Binder {
         let mut values = Vec::with_capacity(default_schema.num_fields());
         let update_columns_star = update_columns_star.unwrap();
         for idx in 0..default_schema.num_fields() {
-            values.push(update_columns_star.get(&idx).unwrap().clone());
+            let scalar = update_columns_star.get(&idx).unwrap().clone();
+            // cast the scalar to the target field's data type
+            values.push(wrap_cast_scalar(
+                &scalar,
+                &scalar.data_type()?,
+                &DataType::from(default_schema.field(idx).data_type()),
+            )?);
         }
+
         Ok(UnmatchedEvaluator {
             source_schema: Arc::new(Arc::new(default_schema).into()),
             condition,
@@ -423,7 +430,6 @@
         }
 
         let mut values = Vec::with_capacity(clause.insert_operation.values.len());
-
         // we need to get source schema, and use it for filling columns.
         let source_schema = if let Some(fields) = clause.insert_operation.columns.clone() {
             self.schema_project(&table_schema, &fields)?
diff --git a/src/query/sql/src/planner/plans/update.rs b/src/query/sql/src/planner/plans/update.rs
index 591e26571421..a9e475005f0c 100644
--- a/src/query/sql/src/planner/plans/update.rs
+++ b/src/query/sql/src/planner/plans/update.rs
@@ -114,6 +114,7 @@ impl UpdatePlan {
         let mut right = right.ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
         let right_data_type = right.data_type()?;
+        // corner case: for merge into, if the target_table's fields are not null, then after bind_join they will
         // change into nullable, so we need to cast this.
         right = wrap_cast_scalar(&right, &right_data_type, target_type)?;
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
index 05704c975bff..bbdb8fbec0b3 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
@@ -211,7 +211,12 @@ impl MatchedAggregator {
             let permit = acquire_task_permit(self.io_request_semaphore.clone()).await?;
             let aggregation_ctx = self.aggregation_ctx.clone();
             let segment_info = segment_infos.get(&segment_idx).unwrap();
+            info!(
+                "merge into apply: segment_idx:{},blk_idx:{}",
+                segment_idx, block_idx
+            );
             let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
+            assert!(block_idx < segment_info.blocks.len());
             // the row_id is generated by block_id, not block_idx, reference to fill_internal_column_meta()
             let block_meta = segment_info.blocks[block_idx].clone();
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into
index 5dd2d4acf453..81b24a93b422 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into
@@ -545,6 +545,21 @@ select * from target_test order by a;
 3 f
 5 f
 
+## test not matched cast and predicate index
 statement ok
-set enable_experimental_merge_into = 0;
+drop table if exists test_order;
+
+statement ok
+drop table if exists random_source;
+statement ok
+create table test_order(id bigint, id1 bigint, id2 bigint, id3 bigint, id4 bigint, id5 bigint, id6 bigint, id7 bigint, s1 varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 varchar, s9 varchar, s10 varchar, s11 varchar, s12 varchar, s13 varchar, d1 DECIMAL(20, 8), d2 DECIMAL(20, 8), d3 DECIMAL(20, 8), d4 DECIMAL(20, 8), d5 DECIMAL(20, 8), d6 DECIMAL(30, 8), d7 DECIMAL(30, 8), d8 DECIMAL(30, 8), d9 DECIMAL(30, 8), d10 DECIMAL(30, 8),insert_time datetime, insert_time1 datetime, insert_time2 datetime, insert_time3 datetime,i int) CLUSTER BY(to_yyyymmdd(insert_time), id) bloom_index_columns='insert_time,id';
+
+statement ok
+create table random_source(id bigint not null, id1 bigint, id2 bigint, id3 bigint, id4 bigint, id5 bigint, id6 bigint, id7 bigint,s1 varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 varchar, s9 varchar, s10 varchar, s11 varchar, s12 varchar, s13 varchar,d1 DECIMAL(20, 8), d2 DECIMAL(20, 8), d3 DECIMAL(20, 8), d4 DECIMAL(20, 8), d5 DECIMAL(20, 8), d6 DECIMAL(30, 8), d7 DECIMAL(30, 8), d8 DECIMAL(30, 8), d9 DECIMAL(30, 8), d10 DECIMAL(30, 8),insert_time datetime not null, insert_time1 datetime, insert_time2 datetime, insert_time3 datetime,i int) Engine = Random;
+
+statement ok
+merge into test_order as t using (select id,34 as id1,238 as id2, id3, id4, id5, id6, id7,s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,insert_time,insert_time1,insert_time2,insert_time3,i from random_source limit 1) as s on t.id = s.id and t.insert_time = s.insert_time when matched then update * when not matched then insert *;
+
+statement ok
+set enable_experimental_merge_into = 0;
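
For reviewers, a minimal, self-contained sketch of the sentinel-index idea this patch switches to. It is only an illustration under stated assumptions: resolve_field_index, the stringified field names, and the main driver are hypothetical and not Databend's actual API, and a 64-bit target is assumed so that u64::MAX fits the usize-based IndexType.

// Sentinel index for the predicate column appended by the matched clauses.
// u64::MAX cannot equal any real column_binding index (assumes a 64-bit
// target, mirroring the patch's `IndexType = MAX as usize`).
const PREDICATE_COLUMN_INDEX: usize = u64::MAX as usize;

// Hypothetical resolver: column "names" here are stringified binding indexes.
// A regular name is looked up in the join output schema; the sentinel maps to
// the tail slot reserved for the predicate column, which is not in the schema yet.
fn resolve_field_index(name: &str, field_names: &[&str]) -> Option<usize> {
    if name == PREDICATE_COLUMN_INDEX.to_string() {
        // The predicate column is always appended after all join output fields.
        Some(field_names.len())
    } else {
        field_names.iter().position(|f| *f == name)
    }
}

fn main() {
    let fields = ["0", "1", "2"];
    // An ordinary binding resolves to its position in the join output schema.
    assert_eq!(resolve_field_index("1", &fields), Some(1));
    // The sentinel resolves to the reserved tail index; unlike the old
    // `num_fields()`-based value, u64::MAX can never equal a real binding index.
    assert_eq!(
        resolve_field_index(&PREDICATE_COLUMN_INDEX.to_string(), &fields),
        Some(fields.len())
    );
    println!("predicate column resolves to tail index {}", fields.len());
}

The not-matched half of the fix is analogous: each value produced for insert * is wrapped with wrap_cast_scalar so it is cast to the target field's data type, covering the nullability that target columns acquire after bind_join.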