Skip to content

Commit

Permalink
refactor: using column name instead of column id for agg index schema (
Browse files Browse the repository at this point in the history
…databendlabs#13125)

* using column name instead of column id for agg index schema

* fix reviewer comments

* fix tests
  • Loading branch information
ariesdevil authored and andylokandy committed Nov 27, 2023
1 parent 8af5171 commit 7737165
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/query/service/src/interpreters/interpreter_index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use common_catalog::plan::Partitions;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::infer_table_schema;
use common_expression::infer_schema_type;
use common_expression::DataField;
use common_expression::DataSchemaRefExt;
use common_expression::TableField;
use common_expression::TableSchema;
use common_expression::BLOCK_NAME_COL_NAME;
use common_license::license::Feature;
use common_license::license_manager::get_license_manager;
Expand Down Expand Up @@ -324,10 +326,23 @@ impl Interpreter for RefreshIndexInterpreter {
})?;
let block_name_offset = output_schema.index_of(&block_name_col.index.to_string())?;

let fields = output_schema
.fields()
.iter()
.map(|f| {
let pos = select_columns
.iter()
.find(|col| col.index.to_string().eq_ignore_ascii_case(f.name()))
.ok_or_else(|| ErrorCode::Internal("should find the corresponding column"))?;
let field_type = infer_schema_type(f.data_type())?;
Ok(TableField::new(&pos.column_name, field_type))
})
.collect::<Result<Vec<_>>>()?;

// Build the final sink schema.
let mut sink_schema = infer_table_schema(&output_schema)?.as_ref().clone();
let mut sink_schema = TableSchema::new(fields);
if !self.plan.user_defined_block_name {
sink_schema.drop_column(&block_name_col.index.to_string())?;
sink_schema.drop_column(&block_name_col.column_name)?;
}
let sink_schema = Arc::new(sink_schema);

Expand Down

0 comments on commit 7737165

Please sign in to comment.