Skip to content

Commit

Permalink
feat: only ingest key-ed value in additional header column (#14628)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Jan 25, 2024
1 parent fa9e754 commit df87c2d
Show file tree
Hide file tree
Showing 27 changed files with 599 additions and 381 deletions.
28 changes: 28 additions & 0 deletions e2e_test/source/basic/inlcude_key_as.slt
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,32 @@ WITH (
topic = 'kafka_additional_columns')
FORMAT PLAIN ENCODE JSON

# header with varchar type & non-exist header key
statement ok
create table additional_columns_1 (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header 'header1' as header_col_1
include header 'header2' as header_col_2
include header 'header2' varchar as header_col_3
include header 'header3' as header_col_4
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_additional_columns')
FORMAT PLAIN ENCODE JSON

statement ok
select * from upsert_students_default_key;

statement ok
select * from additional_columns;

statement ok
select * from additional_columns_1;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 3s

Expand Down Expand Up @@ -98,8 +118,16 @@ FROM additional_columns limit 1;
----
header1 \x7631

query TTTT
select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1
----
\x7631 \x7632 v2 NULL

statement ok
drop table upsert_students_default_key

statement ok
drop table additional_columns

statement ok
drop table additional_columns_1
49 changes: 37 additions & 12 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@ message Field {
string name = 2;
}

enum AdditionalColumnType {
UNSPECIFIED = 0;
KEY = 1;
TIMESTAMP = 2;
PARTITION = 3;
OFFSET = 4;
HEADER = 5;
FILENAME = 6;
NORMAL = 7;
}

enum ColumnDescVersion {
COLUMN_DESC_VERSION_UNSPECIFIED = 0;
// Introduced in https://github.com/risingwavelabs/risingwave/pull/13707#discussion_r1429947537,
Expand Down Expand Up @@ -64,9 +53,15 @@ message ColumnDesc {

// This field is used to represent the connector-spec additional column type.
// UNSPECIFIED or unset for normal column.
AdditionalColumnType additional_column_type = 9;

// deprecated, use AdditionalColumn instead
// AdditionalColumnType additional_column_type = 9;
reserved "additional_column_type";
reserved 9;

ColumnDescVersion version = 10;

AdditionalColumn additional_columns = 11;
}

message ColumnCatalog {
Expand Down Expand Up @@ -190,3 +185,33 @@ message Cardinality {
message ExprContext {
string time_zone = 1;
}

message AdditionalColumnKey {}

message AdditionalColumnTimestamp {}

message AdditionalColumnPartition {}

message AdditionalColumnOffset {}

message AdditionalColumnFilename {}

message AdditionalColumnHeader {
string inner_field = 1;
data.DataType data_type = 2;
}

// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

message AdditionalColumn {
oneof column_type {
AdditionalColumnKey key = 1;
AdditionalColumnTimestamp timestamp = 2;
AdditionalColumnPartition partition = 3;
AdditionalColumnOffset offset = 4;
AdditionalColumnHeader header_inner = 5;
AdditionalColumnFilename filename = 6;
AdditionalColumnHeaders headers = 7;
}
}
29 changes: 16 additions & 13 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use itertools::Itertools;
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
AdditionalColumnType, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
};

use super::row_id_column_desc;
Expand Down Expand Up @@ -103,7 +103,7 @@ pub struct ColumnDesc {
pub type_name: String,
pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
pub description: Option<String>,
pub additional_column_type: AdditionalColumnType,
pub additional_columns: AdditionalColumn,
pub version: ColumnDescVersion,
}

Expand All @@ -117,7 +117,7 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_columns: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -131,7 +131,7 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_columns: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -140,7 +140,7 @@ impl ColumnDesc {
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
additional_column_type: AdditionalColumnType,
additional_column_type: AdditionalColumn,
) -> ColumnDesc {
ColumnDesc {
data_type,
Expand All @@ -150,7 +150,7 @@ impl ColumnDesc {
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column_type,
additional_columns: additional_column_type,
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -170,7 +170,7 @@ impl ColumnDesc {
type_name: self.type_name.clone(),
generated_or_default_column: self.generated_or_default_column.clone(),
description: self.description.clone(),
additional_column_type: self.additional_column_type as i32,
additional_columns: Some(self.additional_columns.clone()),
version: self.version as i32,
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ impl ColumnDesc {
type_name: "".to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_columns: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -221,7 +221,7 @@ impl ColumnDesc {
type_name: type_name.to_string(),
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal,
additional_columns: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -239,7 +239,7 @@ impl ColumnDesc {
type_name: field.type_name.clone(),
description: None,
generated_or_default_column: None,
additional_column_type: AdditionalColumnType::Normal,
additional_columns: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
}
}
Expand All @@ -265,7 +265,10 @@ impl ColumnDesc {

impl From<PbColumnDesc> for ColumnDesc {
fn from(prost: PbColumnDesc) -> Self {
let additional_column_type = prost.additional_column_type();
let additional_columns = prost
.get_additional_columns()
.unwrap_or(&AdditionalColumn { column_type: None })
.clone();
let version = prost.version();
let field_descs: Vec<ColumnDesc> = prost
.field_descs
Expand All @@ -280,7 +283,7 @@ impl From<PbColumnDesc> for ColumnDesc {
field_descs,
generated_or_default_column: prost.generated_or_default_column,
description: prost.description.clone(),
additional_column_type,
additional_columns,
version,
}
}
Expand All @@ -302,7 +305,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
type_name: c.type_name.clone(),
generated_or_default_column: c.generated_or_default_column.clone(),
description: c.description.clone(),
additional_column_type: c.additional_column_type as i32,
additional_columns: c.additional_columns.clone().into(),
version: c.version as i32,
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/catalog/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use itertools::Itertools;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType;
use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion};
use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};

pub trait ColumnDescTestExt {
/// Create a [`ColumnDesc`] with the given name and type.
Expand All @@ -35,7 +35,7 @@ impl ColumnDescTestExt for ColumnDesc {
column_type: Some(data_type),
column_id,
name: name.to_string(),
additional_column_type: AdditionalColumnType::Normal as i32,
additional_columns: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
..Default::default()
}
Expand All @@ -60,7 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
field_descs: fields,
generated_or_default_column: None,
description: None,
additional_column_type: AdditionalColumnType::Normal as i32,
additional_columns: Some(AdditionalColumn { column_type: None }),
version: ColumnDescVersion::Pr13707 as i32,
}
}
Expand Down
Loading

0 comments on commit df87c2d

Please sign in to comment.