diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 71f740e9fb20..644f0a06f818 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -291,7 +291,7 @@ impl ReplaceInterpreter { table_info: table_info.clone(), catalog_info: catalog.info(), select_ctx, - table_schema: plan.schema.clone(), + target_schema: plan.schema.clone(), table_level_range_index, need_insert: true, delete_when, diff --git a/src/query/service/src/pipelines/builders/builder_append_table.rs b/src/query/service/src/pipelines/builders/builder_append_table.rs index cc07ff1f5620..af3082ae08cf 100644 --- a/src/query/service/src/pipelines/builders/builder_append_table.rs +++ b/src/query/service/src/pipelines/builders/builder_append_table.rs @@ -39,12 +39,7 @@ impl PipelineBuilder { append_mode: AppendMode, deduplicated_label: Option, ) -> Result<()> { - Self::build_fill_missing_columns_pipeline( - ctx.clone(), - main_pipeline, - table.clone(), - source_schema, - )?; + Self::fill_and_reorder_columns(ctx.clone(), main_pipeline, table.clone(), source_schema)?; table.append_data(ctx.clone(), main_pipeline, append_mode)?; @@ -68,12 +63,7 @@ impl PipelineBuilder { source_schema: DataSchemaRef, append_mode: AppendMode, ) -> Result<()> { - Self::build_fill_missing_columns_pipeline( - ctx.clone(), - main_pipeline, - table.clone(), - source_schema, - )?; + Self::fill_and_reorder_columns(ctx.clone(), main_pipeline, table.clone(), source_schema)?; table.append_data(ctx, main_pipeline, append_mode)?; diff --git a/src/query/service/src/pipelines/builders/builder_distributed_insert_select.rs b/src/query/service/src/pipelines/builders/builder_distributed_insert_select.rs index 0935c21d828e..5c52fa99c6e2 100644 --- a/src/query/service/src/pipelines/builders/builder_distributed_insert_select.rs +++ b/src/query/service/src/pipelines/builders/builder_distributed_insert_select.rs @@ -58,7 +58,7 @@ impl PipelineBuilder { )?; let source_schema = insert_schema; - Self::build_fill_missing_columns_pipeline( + Self::fill_and_reorder_columns( self.ctx.clone(), &mut self.main_pipeline, table.clone(), diff --git a/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs b/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs index 58dff429f1c1..118eac0421ab 100644 --- a/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs +++ b/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs @@ -26,7 +26,9 @@ use crate::sessions::QueryContext; /// This file implements append to table pipeline builder. impl PipelineBuilder { - pub fn build_fill_missing_columns_pipeline( + // Fill missing columns with default or compute expr + // ** Also reorder the block into table's schema order ** + pub fn fill_and_reorder_columns( ctx: Arc, pipeline: &mut Pipeline, table: Arc, diff --git a/src/query/service/src/pipelines/builders/builder_replace_into.rs b/src/query/service/src/pipelines/builders/builder_replace_into.rs index 9d75d6db565d..b6f0c9028be0 100644 --- a/src/query/service/src/pipelines/builders/builder_replace_into.rs +++ b/src/query/service/src/pipelines/builders/builder_replace_into.rs @@ -259,7 +259,7 @@ impl PipelineBuilder { catalog_info, select_ctx, table_level_range_index, - table_schema, + target_schema, need_insert, delete_when, } = deduplicate; @@ -284,7 +284,7 @@ impl PipelineBuilder { false, )?; - let mut target_schema: DataSchema = table_schema.clone().into(); + let mut target_schema: DataSchema = target_schema.clone().into(); if let Some((_, delete_column)) = delete_when { delete_column_idx = select_schema.index_of(delete_column.as_str())?; let delete_column = select_schema.field(delete_column_idx).clone(); @@ -314,11 +314,11 @@ impl PipelineBuilder { } } - Self::build_fill_missing_columns_pipeline( + Self::fill_and_reorder_columns( self.ctx.clone(), &mut self.main_pipeline, tbl.clone(), - Arc::new(table_schema.clone().into()), + Arc::new(target_schema.clone().into()), )?; let _ = table.cluster_gen_for_append( @@ -359,7 +359,7 @@ impl PipelineBuilder { on_conflicts.clone(), cluster_keys, bloom_filter_column_indexes.clone(), - table_schema.as_ref(), + &table.schema(), *table_is_empty, table_level_range_index.clone(), delete_when.map(|(expr, _)| (expr, delete_column_idx)), @@ -372,7 +372,7 @@ impl PipelineBuilder { on_conflicts.clone(), cluster_keys, bloom_filter_column_indexes.clone(), - table_schema.as_ref(), + &table.schema(), *table_is_empty, table_level_range_index.clone(), delete_when.map(|_| delete_column_idx), diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs index 7a915a736c9b..6a0a04aac565 100644 --- a/src/query/service/src/test_kits/fixture.rs +++ b/src/query/service/src/test_kits/fixture.rs @@ -687,7 +687,7 @@ impl TestFixture { )?; let data_schema: DataSchemaRef = Arc::new(source_schema.into()); - PipelineBuilder::build_fill_missing_columns_pipeline( + PipelineBuilder::fill_and_reorder_columns( ctx.clone(), &mut build_res.main_pipeline, table.clone(), diff --git a/src/query/sql/src/executor/physical_plans/physical_replace_deduplicate.rs b/src/query/sql/src/executor/physical_plans/physical_replace_deduplicate.rs index 4a986b45f1f5..eb22d1d6a486 100644 --- a/src/query/sql/src/executor/physical_plans/physical_replace_deduplicate.rs +++ b/src/query/sql/src/executor/physical_plans/physical_replace_deduplicate.rs @@ -35,7 +35,7 @@ pub struct ReplaceDeduplicate { pub table_is_empty: bool, pub table_info: TableInfo, pub catalog_info: CatalogInfo, - pub table_schema: TableSchemaRef, + pub target_schema: TableSchemaRef, pub select_ctx: Option, pub table_level_range_index: HashMap, pub need_insert: bool, diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_14593.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_14593.test new file mode 100644 index 000000000000..2504944de5d8 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0024_replace_into_issue_14593.test @@ -0,0 +1,41 @@ +statement ok +DROP DATABASE IF EXISTS issue_14593 + +statement ok +CREATE DATABASE issue_14593 + +statement ok +USE issue_14593 + +# https://github.com/datafuselabs/databend/issues/14593 + +statement ok +create table t (a string, b string, c string, id int, d string) cluster by (id); + +statement ok +replace into t (b, id, a) on(id) values('b', 1, 'a'); + +query ITT +select id, a, b from t; +---- +1 a b + +statement ok +alter table t drop column c; + +statement ok +alter table t drop column d; + +statement ok +replace into t (b, id, a) on(id) values('bb', 1, 'aa'); + +query ITT +select id, a, b from t; +---- +1 aa bb + +statement ok +drop table t; + +statement ok +DROP DATABASE issue_14593