Skip to content

Commit

Permalink
chore: fix typo and refine the merge into status
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Dec 19, 2023
1 parent 5263744 commit 8636181
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ impl MergeIntoInterpreter {
plans::INSERT_NAME => {
columns.push(UInt32Type::from_data(vec![status.insert_rows as u32]))
}
plans::UPDTAE_NAME => {
plans::UPDATE_NAME => {
columns.push(UInt32Type::from_data(vec![status.update_rows as u32]))
}
plans::DELETE_NAME => {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ pub use plans::insert::InsertInputSource;
pub use plans::ScalarExpr;
pub use plans::DELETE_NAME;
pub use plans::INSERT_NAME;
pub use plans::UPDTAE_NAME;
pub use plans::UPDATE_NAME;
pub use semantic::*;
pub use stream_column::*;
63 changes: 34 additions & 29 deletions src/query/sql/src/planner/plans/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ impl std::fmt::Debug for MergeInto {
.field("table_id", &self.table_id)
.field("join", &self.input)
.field("matched", &self.matched_evaluators)
.field("unmateched", &self.unmatched_evaluators)
.field("unmatched", &self.unmatched_evaluators)
.field("distributed", &self.distributed)
.finish()
}
}

pub const INSERT_NAME: &str = "number of rows insertd";
pub const UPDTAE_NAME: &str = "number of rows updated";
pub const INSERT_NAME: &str = "number of rows inserted";
pub const UPDATE_NAME: &str = "number of rows updated";
pub const DELETE_NAME: &str = "number of rows deleted";

impl MergeInto {
Expand All @@ -107,33 +107,38 @@ impl MergeInto {
}

fn merge_into_table_schema(&self) -> Result<DataSchemaRef> {
let field_insertd = DataField::new(INSERT_NAME, DataType::Number(NumberDataType::Int32));
let field_updated = DataField::new(UPDTAE_NAME, DataType::Number(NumberDataType::Int32));
let field_deleted = DataField::new(DELETE_NAME, DataType::Number(NumberDataType::Int32));
match self.merge_into_mutations() {
(true, true, true) => Ok(DataSchemaRefExt::create(vec![
field_insertd.clone(),
field_updated.clone(),
field_deleted.clone(),
])),
(true, true, false) => Ok(DataSchemaRefExt::create(vec![
field_insertd.clone(),
field_updated.clone(),
])),
(true, false, true) => Ok(DataSchemaRefExt::create(vec![
field_insertd.clone(),
field_deleted.clone(),
])),
(true, false, false) => Ok(DataSchemaRefExt::create(vec![field_insertd.clone()])),
(false, true, true) => Ok(DataSchemaRefExt::create(vec![
field_updated.clone(),
field_deleted.clone(),
])),
(false, true, false) => Ok(DataSchemaRefExt::create(vec![field_updated.clone()])),
(false, false, true) => Ok(DataSchemaRefExt::create(vec![field_deleted.clone()])),
_ => Err(ErrorCode::BadArguments(
let (insert, update, delete) = self.merge_into_mutations();

let fields = [
(
DataField::new(INSERT_NAME, DataType::Number(NumberDataType::Int32)),
insert,
),
(
DataField::new(UPDATE_NAME, DataType::Number(NumberDataType::Int32)),
update,
),
(
DataField::new(DELETE_NAME, DataType::Number(NumberDataType::Int32)),
delete,
),
];

let schema_fields: Vec<DataField> = fields
.iter()
.filter_map(
|(field, include)| {
if *include { Some(field.clone()) } else { None }
},
)
.collect();

if schema_fields.is_empty() {
Err(ErrorCode::BadArguments(
"at least one matched or unmatched clause for merge into",
)),
))
} else {
Ok(DataSchemaRefExt::create(schema_fields))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub use merge_into::MergeInto;
pub use merge_into::UnmatchedEvaluator;
pub use merge_into::DELETE_NAME;
pub use merge_into::INSERT_NAME;
pub use merge_into::UPDTAE_NAME;
pub use merge_into::UPDATE_NAME;
pub use operator::*;
pub use pattern::PatternPlan;
pub use plan::*;
Expand Down
12 changes: 6 additions & 6 deletions src/query/storages/system/src/virtual_columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ impl AsyncSystemTable for VirtualColumnsTable {
let mut database_names = Vec::with_capacity(virtual_column_metas.len());
let mut table_names = Vec::with_capacity(virtual_column_metas.len());
let mut virtual_columns = Vec::with_capacity(virtual_column_metas.len());
let mut created_ons = Vec::with_capacity(virtual_column_metas.len());
let mut updated_ons = Vec::with_capacity(virtual_column_metas.len());
let mut created_on_columns = Vec::with_capacity(virtual_column_metas.len());
let mut updated_on_columns = Vec::with_capacity(virtual_column_metas.len());
if !virtual_column_metas.is_empty() {
let mut virtual_column_meta_map: HashMap<MetaId, VirtualColumnMeta> =
virtual_column_metas
Expand All @@ -90,8 +90,8 @@ impl AsyncSystemTable for VirtualColumnsTable {
.as_bytes()
.to_vec(),
);
created_ons.push(virtual_column_meta.created_on.timestamp_micros());
updated_ons
created_on_columns.push(virtual_column_meta.created_on.timestamp_micros());
updated_on_columns
.push(virtual_column_meta.updated_on.map(|u| u.timestamp_micros()));
}
}
Expand All @@ -102,8 +102,8 @@ impl AsyncSystemTable for VirtualColumnsTable {
StringType::from_data(database_names),
StringType::from_data(table_names),
StringType::from_data(virtual_columns),
TimestampType::from_data(created_ons),
TimestampType::from_opt_data(updated_ons),
TimestampType::from_data(created_on_columns),
TimestampType::from_opt_data(updated_on_columns),
]))
}
}
Expand Down

0 comments on commit 8636181

Please sign in to comment.