Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/datafuselabs/databend into …
Browse files Browse the repository at this point in the history
…import_upper_optimizer_for_merge_into
  • Loading branch information
JackTan25 committed Dec 20, 2023
2 parents 0f37154 + c76b37e commit 3adb07b
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 39 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ Databend thrives on community contributions! Whether it's through ideas, code, o

Here are some resources to help you get started:

- [Building Databend From Source](https://docs.databend.com/doc/contributing/building-from-source)
- [The First Good Pull Request](https://docs.databend.com/doc/contributing/good-pr)
- [Building Databend From Source](https://docs.databend.com/doc/overview/community/contributor/building-from-source)
- [The First Good Pull Request](https://docs.databend.com/doc/overview/community/contributor/good-pr)


## 👥 Community
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl MergeIntoInterpreter {
plans::INSERT_NAME => {
columns.push(UInt32Type::from_data(vec![status.insert_rows as u32]))
}
plans::UPDTAE_NAME => {
plans::UPDATE_NAME => {
columns.push(UInt32Type::from_data(vec![status.update_rows as u32]))
}
plans::DELETE_NAME => {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ pub use plans::insert::InsertInputSource;
pub use plans::ScalarExpr;
pub use plans::DELETE_NAME;
pub use plans::INSERT_NAME;
pub use plans::UPDTAE_NAME;
pub use plans::UPDATE_NAME;
pub use semantic::*;
pub use stream_column::*;
64 changes: 36 additions & 28 deletions src/query/sql/src/planner/plans/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ impl std::fmt::Debug for MergeInto {
.field("table_id", &self.table_id)
.field("join", &self.input)
.field("matched", &self.matched_evaluators)
.field("unmateched", &self.unmatched_evaluators)
.field("unmatched", &self.unmatched_evaluators)
.field("distributed", &self.distributed)
.finish()
}
}

pub const INSERT_NAME: &str = "number of rows inserted";
pub const UPDTAE_NAME: &str = "number of rows updated";
pub const UPDATE_NAME: &str = "number of rows updated";
pub const DELETE_NAME: &str = "number of rows deleted";

impl MergeInto {
Expand All @@ -108,33 +108,41 @@ impl MergeInto {
}

fn merge_into_table_schema(&self) -> Result<DataSchemaRef> {
let field_insertd = DataField::new(INSERT_NAME, DataType::Number(NumberDataType::Int32));
let field_updated = DataField::new(UPDTAE_NAME, DataType::Number(NumberDataType::Int32));
let field_deleted = DataField::new(DELETE_NAME, DataType::Number(NumberDataType::Int32));
match self.merge_into_mutations() {
(true, true, true) => Ok(DataSchemaRefExt::create(vec![
field_insertd.clone(),
field_updated.clone(),
field_deleted.clone(),
])),
(true, true, false) => Ok(DataSchemaRefExt::create(vec![
field_insertd.clone(),
field_updated.clone(),
])),
(true, false, true) => Ok(DataSchemaRefExt::create(vec![
field_insertd.clone(),
field_deleted.clone(),
])),
(true, false, false) => Ok(DataSchemaRefExt::create(vec![field_insertd.clone()])),
(false, true, true) => Ok(DataSchemaRefExt::create(vec![
field_updated.clone(),
field_deleted.clone(),
])),
(false, true, false) => Ok(DataSchemaRefExt::create(vec![field_updated.clone()])),
(false, false, true) => Ok(DataSchemaRefExt::create(vec![field_deleted.clone()])),
_ => Err(ErrorCode::BadArguments(
let (insert, update, delete) = self.merge_into_mutations();

let fields = [
(
DataField::new(INSERT_NAME, DataType::Number(NumberDataType::Int32)),
insert,
),
(
DataField::new(UPDATE_NAME, DataType::Number(NumberDataType::Int32)),
update,
),
(
DataField::new(DELETE_NAME, DataType::Number(NumberDataType::Int32)),
delete,
),
];

// Filter and collect the fields to include in the schema.
// Only fields with a corresponding true value in the mutation states are included.
let schema_fields: Vec<DataField> = fields
.iter()
.filter_map(
|(field, include)| {
if *include { Some(field.clone()) } else { None }
},
)
.collect();

// Check if any fields are included. If none, return an error. Otherwise, return the schema.
if schema_fields.is_empty() {
Err(ErrorCode::BadArguments(
"at least one matched or unmatched clause for merge into",
)),
))
} else {
Ok(DataSchemaRefExt::create(schema_fields))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub use merge_into::MergeInto;
pub use merge_into::UnmatchedEvaluator;
pub use merge_into::DELETE_NAME;
pub use merge_into::INSERT_NAME;
pub use merge_into::UPDTAE_NAME;
pub use merge_into::UPDATE_NAME;
pub use operator::*;
pub use pattern::PatternPlan;
pub use plan::*;
Expand Down
12 changes: 6 additions & 6 deletions src/query/storages/system/src/virtual_columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ impl AsyncSystemTable for VirtualColumnsTable {
let mut database_names = Vec::with_capacity(virtual_column_metas.len());
let mut table_names = Vec::with_capacity(virtual_column_metas.len());
let mut virtual_columns = Vec::with_capacity(virtual_column_metas.len());
let mut created_ons = Vec::with_capacity(virtual_column_metas.len());
let mut updated_ons = Vec::with_capacity(virtual_column_metas.len());
let mut created_on_columns = Vec::with_capacity(virtual_column_metas.len());
let mut updated_on_columns = Vec::with_capacity(virtual_column_metas.len());
if !virtual_column_metas.is_empty() {
let mut virtual_column_meta_map: HashMap<MetaId, VirtualColumnMeta> =
virtual_column_metas
Expand All @@ -90,8 +90,8 @@ impl AsyncSystemTable for VirtualColumnsTable {
.as_bytes()
.to_vec(),
);
created_ons.push(virtual_column_meta.created_on.timestamp_micros());
updated_ons
created_on_columns.push(virtual_column_meta.created_on.timestamp_micros());
updated_on_columns
.push(virtual_column_meta.updated_on.map(|u| u.timestamp_micros()));
}
}
Expand All @@ -102,8 +102,8 @@ impl AsyncSystemTable for VirtualColumnsTable {
StringType::from_data(database_names),
StringType::from_data(table_names),
StringType::from_data(virtual_columns),
TimestampType::from_data(created_ons),
TimestampType::from_opt_data(updated_ons),
TimestampType::from_data(created_on_columns),
TimestampType::from_opt_data(updated_on_columns),
]))
}
}
Expand Down

0 comments on commit 3adb07b

Please sign in to comment.