Skip to content

Commit

Permalink
chore: rename update_star_columns (#14806)
Browse files Browse the repository at this point in the history
rename update_star_columns
  • Loading branch information
JackTan25 authored Mar 1, 2024
1 parent 7c4d690 commit 07b3fb9
Showing 1 changed file with 52 additions and 51 deletions.
103 changes: 52 additions & 51 deletions src/query/sql/src/planner/binder/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,61 +248,62 @@ impl Binder {
// todo: (JackTan25) do column prune after finish "split expr for target and source"
let mut columns_set = HashSet::<IndexType>::new();

let update_columns_star = if self.has_star_clause(&matched_clauses, &unmatched_clauses) {
// when there are "update *"/"insert *", we need to get the index of correlated columns in source.
let default_target_table_schema = table.schema().remove_computed_fields();
let mut update_columns = HashMap::with_capacity(
default_target_table_schema
.remove_computed_fields()
.num_fields(),
);
let source_output_columns = &source_context.columns;
// we use Vec as the value, because there could be duplicate names
let mut name_map = HashMap::<String, Vec<ColumnBinding>>::new();
for column in source_output_columns {
name_map
.entry(column.column_name.clone())
.or_default()
.push(column.clone());
}
let update_or_insert_columns_star =
if self.has_star_clause(&matched_clauses, &unmatched_clauses) {
// when there are "update *"/"insert *", we need to get the index of correlated columns in source.
let default_target_table_schema = table.schema().remove_computed_fields();
let mut update_columns = HashMap::with_capacity(
default_target_table_schema
.remove_computed_fields()
.num_fields(),
);
let source_output_columns = &source_context.columns;
// we use Vec as the value, because there could be duplicate names
let mut name_map = HashMap::<String, Vec<ColumnBinding>>::new();
for column in source_output_columns {
name_map
.entry(column.column_name.clone())
.or_default()
.push(column.clone());
}

for (field_idx, field) in default_target_table_schema.fields.iter().enumerate() {
let column = match name_map.get(field.name()) {
None => {
return Err(ErrorCode::SemanticError(
format!("can't find {} in source output", field.name).to_string(),
));
}
Some(indices) => {
if indices.len() != 1 {
for (field_idx, field) in default_target_table_schema.fields.iter().enumerate() {
let column = match name_map.get(field.name()) {
None => {
return Err(ErrorCode::SemanticError(
format!("can't find {} in source output", field.name).to_string(),
));
}
Some(indices) => {
if indices.len() != 1 {
return Err(ErrorCode::SemanticError(
format!(
"there should be only one {} in source output,but we get {}",
field.name,
indices.len()
)
.to_string(),
));
} else {
indices[0].clone()
} else {
indices[0].clone()
}
}
}
};
let column = ColumnBindingBuilder::new(
field.name.to_string(),
column.index,
column.data_type.clone(),
Visibility::Visible,
)
.build();
let col = ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column });
};
let column = ColumnBindingBuilder::new(
field.name.to_string(),
column.index,
column.data_type.clone(),
Visibility::Visible,
)
.build();
let col = ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column });

update_columns.insert(field_idx, col);
}
Some(update_columns)
} else {
None
};
update_columns.insert(field_idx, col);
}
Some(update_columns)
} else {
None
};

// Todo: (JackTan25) Maybe we can remove bind target_table
// when the target table has been binded in bind_merge_into_source
Expand Down Expand Up @@ -410,7 +411,7 @@ impl Binder {
clause,
&mut columns_set,
table_schema.clone(),
update_columns_star.clone(),
update_or_insert_columns_star.clone(),
target_name.as_ref(),
)
.await?,
Expand All @@ -425,7 +426,7 @@ impl Binder {
clause,
&mut columns_set,
table_schema.clone(),
update_columns_star.clone(),
update_or_insert_columns_star.clone(),
)
.await?,
);
Expand Down Expand Up @@ -472,7 +473,7 @@ impl Binder {
clause: &MatchedClause,
columns: &mut HashSet<IndexType>,
schema: TableSchemaRef,
update_columns_star: Option<HashMap<FieldIndex, ScalarExpr>>,
update_or_insert_columns_star: Option<HashMap<FieldIndex, ScalarExpr>>,
target_name: &str,
) -> Result<MatchedEvaluator> {
let condition = if let Some(expr) = &clause.selection {
Expand Down Expand Up @@ -501,7 +502,7 @@ impl Binder {
if *is_star {
Ok(MatchedEvaluator {
condition,
update: update_columns_star,
update: update_or_insert_columns_star,
})
} else {
let mut update_columns = HashMap::with_capacity(update_list.len());
Expand Down Expand Up @@ -567,7 +568,7 @@ impl Binder {
clause: &UnmatchedClause,
columns: &mut HashSet<IndexType>,
table_schema: TableSchemaRef,
update_columns_star: Option<HashMap<FieldIndex, ScalarExpr>>,
update_or_insert_columns_star: Option<HashMap<FieldIndex, ScalarExpr>>,
) -> Result<UnmatchedEvaluator> {
let condition = if let Some(expr) = &clause.selection {
let (scalar_expr, _) = scalar_binder.bind(expr).await?;
Expand All @@ -589,9 +590,9 @@ impl Binder {
if clause.insert_operation.is_star {
let default_schema = table_schema.remove_computed_fields();
let mut values = Vec::with_capacity(default_schema.num_fields());
let update_columns_star = update_columns_star.unwrap();
let update_or_insert_columns_star = update_or_insert_columns_star.unwrap();
for idx in 0..default_schema.num_fields() {
let scalar = update_columns_star.get(&idx).unwrap().clone();
let scalar = update_or_insert_columns_star.get(&idx).unwrap().clone();
// cast expr
values.push(wrap_cast(
&scalar,
Expand Down

0 comments on commit 07b3fb9

Please sign in to comment.