Skip to content

Commit

Permalink
refactor: additional_columns -> additional_column (#14988)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Feb 5, 2024
1 parent 823382b commit 2349211
Show file tree
Hide file tree
Showing 20 changed files with 46 additions and 46 deletions.
2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ message ColumnDesc {

ColumnDescVersion version = 10;

AdditionalColumn additional_columns = 11;
AdditionalColumn additional_column = 11;
}

message ColumnCatalog {
Expand Down
24 changes: 12 additions & 12 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub struct ColumnDesc {
pub type_name: String,
pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
pub description: Option<String>,
pub additional_columns: AdditionalColumn,
pub additional_column: AdditionalColumn,
pub version: ColumnDescVersion,
}

Expand All @@ -117,7 +117,7 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -131,7 +131,7 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -150,7 +150,7 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_columns: additional_column_type,
additional_column: additional_column_type,
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -170,7 +170,7 @@ impl ColumnDesc {
type_name: self.type_name.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
description: self.description.clone(),
additional_columns: Some(self.additional_columns.clone()),
additional_column: Some(self.additional_column.clone()),
version: self.version as i32,
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ impl ColumnDesc {
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -221,7 +221,7 @@ impl ColumnDesc {
type_name: type_name.to_string(),
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -239,7 +239,7 @@ impl ColumnDesc {
type_name: field.type_name.clone(),
description: None,
generated_or_default_column: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -265,8 +265,8 @@ impl ColumnDesc {

impl From<PbColumnDesc> for ColumnDesc {
fn from(prost: PbColumnDesc) -> Self {
let additional_columns = prost
.get_additional_columns()
let additional_column = prost
.get_additional_column()
.unwrap_or(&AdditionalColumn { column_type: None })
.clone();
let version = prost.version();
Expand All @@ -283,7 +283,7 @@ impl From<PbColumnDesc> for ColumnDesc {
field_descs,
generated_or_default_column: prost.generated_or_default_column,
description: prost.description.clone(),
additional_columns,
additional_column,
version,
}
}
Expand All @@ -305,7 +305,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
type_name: c.type_name.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
description: c.description.clone(),
additional_columns: c.additional_columns.clone().into(),
additional_column: c.additional_column.clone().into(),
version: c.version as i32,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/catalog/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ColumnDescTestExt for ColumnDesc {
column_type: Some(data_type),
column_id,
name: name.to_string(),
additional_columns: Some(AdditionalColumn { column_type: None }),
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
..Default::default()
}
Expand All @@ -60,7 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
field_descs: fields,
generated_or_default_column: None,
description: None,
additional_columns: Some(AdditionalColumn { column_type: None }),
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn avro_field_to_column_desc(
type_name: schema_name.to_string(),
generated_or_default_column: None,
description: None,
additional_columns: Some(AdditionalColumn { column_type: None }),
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
})
}
Expand All @@ -71,7 +71,7 @@ fn avro_field_to_column_desc(
column_type: Some(data_type.to_protobuf()),
column_id: *index,
name: name.to_owned(),
additional_columns: Some(AdditionalColumn { column_type: None }),
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
..Default::default()
})
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/debezium/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ mod tests {
column_type: SourceColumnType::Normal,
is_pk: false,
is_hidden_addition_col: false,
additional_column_type: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
},
SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ mod tests {
column_type: SourceColumnType::Normal,
is_pk: true,
is_hidden_addition_col: false,
additional_column_type: AdditionalColumn {
additional_column: AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
},
};
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl SourceStreamChunkRowWriter<'_> {
mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output>,
) -> AccessResult<()> {
let mut wrapped_f = |desc: &SourceColumnDesc| {
match (&desc.column_type, &desc.additional_column_type.column_type) {
match (&desc.column_type, &desc.additional_column.column_type) {
(&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
// SourceColumnType is for CDC source only.
Ok(A::output_for(
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl ProtobufParserConfig {
type_name: m.full_name().to_string(),
generated_or_default_column: None,
description: None,
additional_columns: Some(AdditionalColumn { column_type: None }),
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
})
} else {
Expand All @@ -189,7 +189,7 @@ impl ProtobufParserConfig {
column_id: *index,
name: field_descriptor.name().to_string(),
column_type: Some(field_type.to_protobuf()),
additional_columns: Some(AdditionalColumn { column_type: None }),
additional_column: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
..Default::default()
})
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
}

fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult {
match desc.additional_column_type.column_type {
match desc.additional_column.column_type {
Some(AdditionalColumnType::Key(_)) => {
if let Some(key_as_column_name) = &self.key_column_name
&& &desc.name == key_as_column_name
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/upsert_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn build_accessor_builder(
pub fn get_key_column_name(columns: &[SourceColumnDesc]) -> Option<String> {
columns.iter().find_map(|column| {
if matches!(
column.additional_column_type.column_type,
column.additional_column.column_type,
Some(AdditionalColumnType::Key(_))
) {
Some(column.name.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl CommonSplitReader for KafkaSplitReader {
// ingest kafka message header can be expensive, do it only when required
let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
matches!(
col_desc.additional_column_type.column_type,
col_desc.additional_column.column_type,
Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
)
});
Expand Down
12 changes: 6 additions & 6 deletions src/connector/src/source/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ pub struct SourceColumnDesc {
/// `is_hidden_addition_col` is used to indicate whether the column is a hidden addition column.
pub is_hidden_addition_col: bool,

/// `additional_column_type` and `column_type` are orthogonal
/// `additional_column_type` is used to indicate the column is from which part of the message
/// `additional_column` and `column_type` are orthogonal
/// `additional_column` is used to indicate the column is from which part of the message
/// `column_type` is used to indicate the type of the column, only used in cdc scenario
pub additional_column_type: AdditionalColumn,
pub additional_column: AdditionalColumn,
}

/// `SourceColumnType` is used to indicate the type of a column emitted by the Source.
Expand Down Expand Up @@ -91,7 +91,7 @@ impl SourceColumnDesc {
column_type: SourceColumnType::Normal,
is_pk: false,
is_hidden_addition_col: false,
additional_column_type: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
}
}

Expand Down Expand Up @@ -131,7 +131,7 @@ impl From<&ColumnDesc> for SourceColumnDesc {
column_type,
is_pk: false,
is_hidden_addition_col: false,
additional_column_type: c.additional_columns.clone(),
additional_column: c.additional_column.clone(),
}
}
}
Expand All @@ -146,7 +146,7 @@ impl From<&SourceColumnDesc> for ColumnDesc {
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_columns: s.additional_column_type.clone(),
additional_column: s.additional_column.clone(),
version: ColumnDescVersion::Pr13707,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/reader/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl SourceDescBuilder {

// Check if partition/file/offset columns are included explicitly.
for col in &self.columns {
match col.column_desc.as_ref().unwrap().get_additional_columns() {
match col.column_desc.as_ref().unwrap().get_additional_column() {
Ok(AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
}) => {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result<ColumnDesc> {
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ mod tests {
type_name: ".test.Country".to_string(),
description: None,
generated_or_default_column: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
},
is_hidden: false
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ pub(crate) async fn bind_source_pk(
// return the key column names if exists
columns.iter().find_map(|catalog| {
if matches!(
catalog.column_desc.additional_columns.column_type,
catalog.column_desc.additional_column.column_type,
Some(AdditionalColumnType::Key(_))
) {
Some(catalog.name().to_string())
Expand All @@ -697,7 +697,7 @@ pub(crate) async fn bind_source_pk(
let additional_column_names = columns
.iter()
.filter_map(|col| {
if col.column_desc.additional_columns.column_type.is_some() {
if col.column_desc.additional_column.column_type.is_some() {
Some(col.name().to_string())
} else {
None
Expand Down Expand Up @@ -848,7 +848,7 @@ fn check_and_add_timestamp_column(
if is_kafka_connector(with_properties) {
if columns.iter().any(|col| {
matches!(
col.column_desc.additional_columns.column_type,
col.column_desc.additional_column.column_type,
Some(AdditionalColumnType::Timestamp(_))
)
}) {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result<Vec<ColumnCatalog>>
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn { column_type: None },
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
},
is_hidden: false,
Expand Down
8 changes: 4 additions & 4 deletions src/storage/src/row_serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ mod test {
type_name: "",
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn {
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
Expand All @@ -108,7 +108,7 @@ mod test {
type_name: "",
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn {
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
Expand Down Expand Up @@ -141,7 +141,7 @@ mod test {
type_name: "",
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn {
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
Expand All @@ -154,7 +154,7 @@ mod test {
type_name: "",
generated_or_default_column: None,
description: None,
additional_columns: AdditionalColumn {
additional_column: AdditionalColumn {
column_type: None,
},
version: Pr13707,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub fn get_split_offset_col_idx(
let mut split_idx = None;
let mut offset_idx = None;
for (idx, column) in column_descs.iter().enumerate() {
match column.additional_column_type {
match column.additional_column {
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
} => {
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/source/trad_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
// the column is from a legacy version
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_columns = Some(AdditionalColumn {
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Key(
AdditionalColumnKey {},
)),
Expand All @@ -110,7 +110,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
{
// compatible code: handle legacy column `_rw_kafka_timestamp`
// the column is auto added for all kafka source to empower batch query on source
// solution: rewrite the column `additional_columns` to Timestamp
// solution: rewrite the column `additional_column` to Timestamp

let _ = source_columns.iter_mut().map(|c| {
let _ = c.column_desc.as_mut().map(|desc| {
Expand All @@ -125,7 +125,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
// the column is from a legacy version
&& desc.version == ColumnDescVersion::Unspecified as i32
{
desc.additional_columns = Some(AdditionalColumn {
desc.additional_column = Some(AdditionalColumn {
column_type: Some(AdditionalColumnType::Timestamp(
AdditionalColumnTimestamp {},
)),
Expand Down

0 comments on commit 2349211

Please sign in to comment.