diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index aae4a1b114a99..87d9ea9c02191 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -477,7 +477,7 @@ impl MergeIntoInterpreter { plans::INSERT_NAME => { columns.push(UInt32Type::from_data(vec![status.insert_rows as u32])) } - plans::UPDTAE_NAME => { + plans::UPDATE_NAME => { columns.push(UInt32Type::from_data(vec![status.update_rows as u32])) } plans::DELETE_NAME => { diff --git a/src/query/sql/src/planner/mod.rs b/src/query/sql/src/planner/mod.rs index 13f5162b67d03..1e6ff878462f7 100644 --- a/src/query/sql/src/planner/mod.rs +++ b/src/query/sql/src/planner/mod.rs @@ -45,6 +45,6 @@ pub use plans::insert::InsertInputSource; pub use plans::ScalarExpr; pub use plans::DELETE_NAME; pub use plans::INSERT_NAME; -pub use plans::UPDTAE_NAME; +pub use plans::UPDATE_NAME; pub use semantic::*; pub use stream_column::*; diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index a2251eda62689..b52ae8ef032e3 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -78,14 +78,14 @@ impl std::fmt::Debug for MergeInto { .field("table_id", &self.table_id) .field("join", &self.input) .field("matched", &self.matched_evaluators) - .field("unmateched", &self.unmatched_evaluators) + .field("unmatched", &self.unmatched_evaluators) .field("distributed", &self.distributed) .finish() } } -pub const INSERT_NAME: &str = "number of rows insertd"; -pub const UPDTAE_NAME: &str = "number of rows updated"; +pub const INSERT_NAME: &str = "number of rows inserted"; +pub const UPDATE_NAME: &str = "number of rows updated"; pub const DELETE_NAME: &str = "number of rows deleted"; impl MergeInto { @@ -107,33 +107,38 @@ impl MergeInto { } fn merge_into_table_schema(&self) -> Result { - let field_insertd = DataField::new(INSERT_NAME, DataType::Number(NumberDataType::Int32)); - let field_updated = DataField::new(UPDTAE_NAME, DataType::Number(NumberDataType::Int32)); - let field_deleted = DataField::new(DELETE_NAME, DataType::Number(NumberDataType::Int32)); - match self.merge_into_mutations() { - (true, true, true) => Ok(DataSchemaRefExt::create(vec![ - field_insertd.clone(), - field_updated.clone(), - field_deleted.clone(), - ])), - (true, true, false) => Ok(DataSchemaRefExt::create(vec![ - field_insertd.clone(), - field_updated.clone(), - ])), - (true, false, true) => Ok(DataSchemaRefExt::create(vec![ - field_insertd.clone(), - field_deleted.clone(), - ])), - (true, false, false) => Ok(DataSchemaRefExt::create(vec![field_insertd.clone()])), - (false, true, true) => Ok(DataSchemaRefExt::create(vec![ - field_updated.clone(), - field_deleted.clone(), - ])), - (false, true, false) => Ok(DataSchemaRefExt::create(vec![field_updated.clone()])), - (false, false, true) => Ok(DataSchemaRefExt::create(vec![field_deleted.clone()])), - _ => Err(ErrorCode::BadArguments( + let (insert, update, delete) = self.merge_into_mutations(); + + let fields = [ + ( + DataField::new(INSERT_NAME, DataType::Number(NumberDataType::Int32)), + insert, + ), + ( + DataField::new(UPDATE_NAME, DataType::Number(NumberDataType::Int32)), + update, + ), + ( + DataField::new(DELETE_NAME, DataType::Number(NumberDataType::Int32)), + delete, + ), + ]; + + let schema_fields: Vec = fields + .iter() + .filter_map( + |(field, include)| { + if *include { Some(field.clone()) } else { None } + }, + ) + .collect(); + + if schema_fields.is_empty() { + Err(ErrorCode::BadArguments( "at least one matched or unmatched clause for merge into", - )), + )) + } else { + Ok(DataSchemaRefExt::create(schema_fields)) } } diff --git a/src/query/sql/src/planner/plans/mod.rs b/src/query/sql/src/planner/plans/mod.rs index b971477e0cd4f..04e5dff0da5f1 100644 --- a/src/query/sql/src/planner/plans/mod.rs +++ b/src/query/sql/src/planner/plans/mod.rs @@ -77,7 +77,7 @@ pub use merge_into::MergeInto; pub use merge_into::UnmatchedEvaluator; pub use merge_into::DELETE_NAME; pub use merge_into::INSERT_NAME; -pub use merge_into::UPDTAE_NAME; +pub use merge_into::UPDATE_NAME; pub use operator::*; pub use pattern::PatternPlan; pub use plan::*; diff --git a/src/query/storages/system/src/virtual_columns_table.rs b/src/query/storages/system/src/virtual_columns_table.rs index 1dbf511d5d31f..670226f8722ad 100644 --- a/src/query/storages/system/src/virtual_columns_table.rs +++ b/src/query/storages/system/src/virtual_columns_table.rs @@ -67,8 +67,8 @@ impl AsyncSystemTable for VirtualColumnsTable { let mut database_names = Vec::with_capacity(virtual_column_metas.len()); let mut table_names = Vec::with_capacity(virtual_column_metas.len()); let mut virtual_columns = Vec::with_capacity(virtual_column_metas.len()); - let mut created_ons = Vec::with_capacity(virtual_column_metas.len()); - let mut updated_ons = Vec::with_capacity(virtual_column_metas.len()); + let mut created_on_columns = Vec::with_capacity(virtual_column_metas.len()); + let mut updated_on_columns = Vec::with_capacity(virtual_column_metas.len()); if !virtual_column_metas.is_empty() { let mut virtual_column_meta_map: HashMap = virtual_column_metas @@ -90,8 +90,8 @@ impl AsyncSystemTable for VirtualColumnsTable { .as_bytes() .to_vec(), ); - created_ons.push(virtual_column_meta.created_on.timestamp_micros()); - updated_ons + created_on_columns.push(virtual_column_meta.created_on.timestamp_micros()); + updated_on_columns .push(virtual_column_meta.updated_on.map(|u| u.timestamp_micros())); } } @@ -102,8 +102,8 @@ impl AsyncSystemTable for VirtualColumnsTable { StringType::from_data(database_names), StringType::from_data(table_names), StringType::from_data(virtual_columns), - TimestampType::from_data(created_ons), - TimestampType::from_opt_data(updated_ons), + TimestampType::from_data(created_on_columns), + TimestampType::from_opt_data(updated_on_columns), ])) } }