Skip to content

Commit

Permalink
feat: standard stream (#14272)
Browse files Browse the repository at this point in the history
* add row_version stream column

* support standard stream

* fix test

* fix wrong row_num after delete

* fix test

* update

* add test

* fix test

* update
  • Loading branch information
zhyass authored Jan 20, 2024
1 parent 4fb7dcf commit e56422d
Show file tree
Hide file tree
Showing 47 changed files with 635 additions and 409 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/query/ast/src/ast/format/syntax/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ pub(crate) fn pretty_create_stream(stmt: CreateStreamStmt) -> RcDoc<'static> {
RcDoc::nil()
},
)
.append(if !stmt.append_only {
RcDoc::space().append(RcDoc::text("APPEND_ONLY = false"))
} else {
RcDoc::nil()
})
.append(if let Some(comment) = stmt.comment {
RcDoc::space().append(RcDoc::text(format!("COMMENT = '{comment}'")))
} else {
Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/statements/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct CreateStreamStmt {
pub table_database: Option<Identifier>,
pub table: Identifier,
pub stream_point: Option<StreamPoint>,
pub append_only: bool,
pub comment: Option<String>,
}

Expand All @@ -69,6 +70,9 @@ impl Display for CreateStreamStmt {
if let Some(stream_point) = &self.stream_point {
write!(f, "{}", stream_point)?;
}
if !self.append_only {
write!(f, " APPEND_ONLY = false")?;
}
if let Some(comment) = &self.comment {
write!(f, " COMMENT = '{}'", comment)?;
}
Expand Down
6 changes: 6 additions & 0 deletions src/query/ast/src/parser/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::ast::DropStreamStmt;
use crate::ast::ShowStreamsStmt;
use crate::ast::Statement;
use crate::ast::StreamPoint;
use crate::parser::expr::literal_bool;
use crate::parser::expr::literal_string;
use crate::parser::statement::show_limit;
use crate::parser::token::TokenKind::*;
Expand All @@ -45,6 +46,7 @@ fn create_stream(i: Input) -> IResult<Statement> {
~ #dot_separated_idents_1_to_3
~ ON ~ TABLE ~ #dot_separated_idents_1_to_2
~ ( #stream_point )?
~ ( APPEND_ONLY ~ "=" ~ #literal_bool )?
~ ( COMMENT ~ "=" ~ #literal_string )?
},
|(
Expand All @@ -56,6 +58,7 @@ fn create_stream(i: Input) -> IResult<Statement> {
_,
(table_database, table),
stream_point,
opt_append_only,
opt_comment,
)| {
Statement::CreateStream(CreateStreamStmt {
Expand All @@ -66,6 +69,9 @@ fn create_stream(i: Input) -> IResult<Statement> {
table_database,
table,
stream_point,
append_only: opt_append_only
.map(|(_, _, append_only)| append_only)
.unwrap_or(true),
comment: opt_comment.map(|(_, _, comment)| comment),
})
},
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ pub enum TokenKind {
AGGREGATING,
#[token("ANY", ignore(ascii_case))]
ANY,
#[token("APPEND_ONLY", ignore(ascii_case))]
APPEND_ONLY,
#[token("ARGS", ignore(ascii_case))]
ARGS,
#[token("AUTO", ignore(ascii_case))]
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ fn test_statement() {
r#"drop view v;"#,
r#"create view v1(c1) as select number % 3 as a from numbers(1000);"#,
r#"alter view v1(c2) as select number % 3 as a from numbers(1000);"#,
r#"create stream test2.s1 on table test.t append_only = false;"#,
r#"create stream if not exists test2.s2 on table test.t at (stream => test1.s1) comment = 'this is a stream';"#,
r#"show full streams from default.test2 like 's%';"#,
r#"describe stream test2.s2;"#,
Expand Down
49 changes: 49 additions & 0 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,54 @@ AlterView(
)


---------- Input ----------
create stream test2.s1 on table test.t append_only = false;
---------- Output ---------
CREATE STREAM test2.s1 ON TABLE test.t APPEND_ONLY = false
---------- AST ------------
CreateStream(
CreateStreamStmt {
if_not_exists: false,
catalog: None,
database: Some(
Identifier {
name: "test2",
quote: None,
span: Some(
14..19,
),
},
),
stream: Identifier {
name: "s1",
quote: None,
span: Some(
20..22,
),
},
table_database: Some(
Identifier {
name: "test",
quote: None,
span: Some(
32..36,
),
},
),
table: Identifier {
name: "t",
quote: None,
span: Some(
37..38,
),
},
stream_point: None,
append_only: false,
comment: None,
},
)


---------- Input ----------
create stream if not exists test2.s2 on table test.t at (stream => test1.s1) comment = 'this is a stream';
---------- Output ---------
Expand Down Expand Up @@ -2488,6 +2536,7 @@ CreateStream(
},
},
),
append_only: true,
comment: Some(
"this is a stream",
),
Expand Down
30 changes: 0 additions & 30 deletions src/query/catalog/src/plan/internal_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ use databend_common_expression::Value;
use databend_common_expression::BASE_BLOCK_IDS_COLUMN_ID;
use databend_common_expression::BASE_ROW_ID_COLUMN_ID;
use databend_common_expression::BLOCK_NAME_COLUMN_ID;
use databend_common_expression::CHANGE_ACTION_COLUMN_ID;
use databend_common_expression::CHANGE_IS_UPDATE_COLUMN_ID;
use databend_common_expression::CHANGE_ROW_ID_COLUMN_ID;
use databend_common_expression::ROW_ID_COLUMN_ID;
use databend_common_expression::SEGMENT_NAME_COLUMN_ID;
use databend_common_expression::SNAPSHOT_NAME_COLUMN_ID;
Expand Down Expand Up @@ -132,9 +129,6 @@ pub enum InternalColumnType {
// stream columns
BaseRowId,
BaseBlockIds,
ChangeAction,
ChangeIsUpdate,
ChangeRowId,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -168,9 +162,6 @@ impl InternalColumn {
scale: 0,
})),
)),
InternalColumnType::ChangeAction => TableDataType::String,
InternalColumnType::ChangeIsUpdate => TableDataType::Boolean,
InternalColumnType::ChangeRowId => TableDataType::String,
}
}

Expand All @@ -191,19 +182,6 @@ impl InternalColumn {
InternalColumnType::SnapshotName => SNAPSHOT_NAME_COLUMN_ID,
InternalColumnType::BaseRowId => BASE_ROW_ID_COLUMN_ID,
InternalColumnType::BaseBlockIds => BASE_BLOCK_IDS_COLUMN_ID,
InternalColumnType::ChangeAction => CHANGE_ACTION_COLUMN_ID,
InternalColumnType::ChangeIsUpdate => CHANGE_IS_UPDATE_COLUMN_ID,
InternalColumnType::ChangeRowId => CHANGE_ROW_ID_COLUMN_ID,
}
}

pub fn virtual_computed_expr(&self) -> Option<String> {
match &self.column_type {
InternalColumnType::ChangeRowId => Some(
"if(is_not_null(_origin_block_id), concat(to_uuid(_origin_block_id), lpad(hex(_origin_block_row_num), 6, '0')), _base_row_id)"
.to_string(),
),
_ => None,
}
}

Expand Down Expand Up @@ -302,14 +280,6 @@ impl InternalColumn {
Value::Scalar(meta.base_block_ids.clone().unwrap()),
)
}
InternalColumnType::ChangeAction => BlockEntry::new(
DataType::String,
Value::Scalar(Scalar::String("INSERT".as_bytes().to_vec())),
),
InternalColumnType::ChangeIsUpdate => {
BlockEntry::new(DataType::Boolean, Value::Scalar(Scalar::Boolean(false)))
}
InternalColumnType::ChangeRowId => unreachable!(),
}
}
}
2 changes: 2 additions & 0 deletions src/query/catalog/src/plan/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::table::ChangeAction;

use super::AggIndexInfo;
use crate::plan::Projection;
Expand Down Expand Up @@ -93,6 +94,7 @@ pub struct PushDownInfo {
pub lazy_materialization: bool,
/// Aggregating index information.
pub agg_index: Option<AggIndexInfo>,
pub change_action: Option<ChangeAction>,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
Expand Down
29 changes: 16 additions & 13 deletions src/query/catalog/src/plan/stream_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use databend_common_expression::Value;
use databend_common_expression::ORIGIN_BLOCK_ID_COLUMN_ID;
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
use databend_common_expression::ORIGIN_VERSION_COLUMN_ID;
use databend_common_expression::ROW_VERSION_COLUMN_ID;

use crate::plan::PartInfo;
use crate::plan::PartInfoPtr;
Expand Down Expand Up @@ -125,25 +126,26 @@ impl StreamColumnMeta {
},
)))
}
}

pub fn build_origin_block_row_num(&self, num_rows: usize) -> Value<AnyType> {
let mut row_ids = Vec::with_capacity(num_rows);
for i in 0..num_rows {
row_ids.push(i as u64);
}
let column = UInt64Type::from_data(row_ids);
Value::Column(Column::Nullable(Box::new(NullableColumn {
column,
validity: Bitmap::new_constant(true, num_rows),
})))
/// Builds the per-row values used for the `OriginRowNum` stream column.
///
/// Row `i` of the block (for `i` in `0..num_rows`) is assigned the value `i`
/// as a `u64`. The values are wrapped in a nullable column whose validity
/// bitmap marks every row as valid, so no entry is `NULL`.
pub fn build_origin_block_row_num(num_rows: usize) -> Value<AnyType> {
    // Sequential offsets 0, 1, ..., num_rows - 1.
    let offsets: Vec<u64> = (0..num_rows).map(|idx| idx as u64).collect();
    let nullable = NullableColumn {
        column: UInt64Type::from_data(offsets),
        // Every generated row number is present.
        validity: Bitmap::new_constant(true, num_rows),
    };
    Value::Column(Column::Nullable(Box::new(nullable)))
}

/// Identifies which stream column a `StreamColumn` represents.
///
/// The variant selects the column id and, for the variants that
/// `StreamColumn::generate_column_values` handles, how values are produced.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum StreamColumnType {
/// Mapped to `ORIGIN_VERSION_COLUMN_ID`. `generate_column_values` treats
/// this variant as unreachable, so its values are not produced there.
OriginVersion,
/// Mapped to `ORIGIN_BLOCK_ID_COLUMN_ID`. Generated as a nullable
/// decimal entry.
OriginBlockId,
/// Mapped to `ORIGIN_BLOCK_ROW_NUM_COLUMN_ID`. A nullable `UInt64` whose
/// values come from `build_origin_block_row_num`.
OriginRowNum,
/// Mapped to `ROW_VERSION_COLUMN_ID`. A non-nullable `UInt64`; like
/// `OriginVersion`, `generate_column_values` treats it as unreachable.
RowVersion,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
Expand All @@ -162,7 +164,6 @@ impl StreamColumn {

pub fn table_field(&self) -> TableField {
TableField::new_from_column_id(&self.column_name, self.table_data_type(), self.column_id())
.with_default_expr(Some("Null".to_string()))
}

pub fn column_type(&self) -> &StreamColumnType {
Expand All @@ -183,6 +184,7 @@ impl StreamColumn {
StreamColumnType::OriginRowNum => {
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64)))
}
StreamColumnType::RowVersion => TableDataType::Number(NumberDataType::UInt64),
}
}

Expand All @@ -200,12 +202,13 @@ impl StreamColumn {
StreamColumnType::OriginVersion => ORIGIN_VERSION_COLUMN_ID,
StreamColumnType::OriginBlockId => ORIGIN_BLOCK_ID_COLUMN_ID,
StreamColumnType::OriginRowNum => ORIGIN_BLOCK_ROW_NUM_COLUMN_ID,
StreamColumnType::RowVersion => ROW_VERSION_COLUMN_ID,
}
}

pub fn generate_column_values(&self, meta: &StreamColumnMeta, num_rows: usize) -> BlockEntry {
match &self.column_type {
StreamColumnType::OriginVersion => unreachable!(),
StreamColumnType::OriginVersion | StreamColumnType::RowVersion => unreachable!(),
StreamColumnType::OriginBlockId => BlockEntry::new(
DataType::Nullable(Box::new(DataType::Decimal(DecimalDataType::Decimal128(
DecimalSize {
Expand All @@ -217,7 +220,7 @@ impl StreamColumn {
),
StreamColumnType::OriginRowNum => BlockEntry::new(
DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
meta.build_origin_block_row_num(num_rows),
build_origin_block_row_num(num_rows),
),
}
}
Expand Down
26 changes: 19 additions & 7 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ use databend_common_sql::plans::StreamNavigation;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_common_storages_stream::stream_table::StreamTable;
use databend_common_storages_stream::stream_table::MODE_APPEND_ONLY;
use databend_common_storages_stream::stream_table::OPT_KEY_DATABASE_NAME;
use databend_common_storages_stream::stream_table::OPT_KEY_MODE;
use databend_common_storages_stream::stream_table::OPT_KEY_TABLE_ID;
use databend_common_storages_stream::stream_table::OPT_KEY_TABLE_NAME;
use databend_common_storages_stream::stream_table::OPT_KEY_TABLE_VER;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_enterprise_stream_handler::StreamHandler;
use databend_enterprise_stream_handler::StreamHandlerWrapper;
use databend_storages_common_table_meta::table::MODE_APPEND_ONLY;
use databend_storages_common_table_meta::table::MODE_STANDARD;
use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_MODE;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_NAME;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_VER;

pub struct RealStreamHandler {}

Expand Down Expand Up @@ -108,16 +109,27 @@ impl StreamHandler for RealStreamHandler {
)));
}
options = stream.get_table_info().options().clone();
let stream_mode = if plan.append_only {
MODE_APPEND_ONLY
} else {
MODE_STANDARD
};
options.insert(OPT_KEY_MODE.to_string(), stream_mode.to_string());
}
None => {
let stream_mode = if plan.append_only {
MODE_APPEND_ONLY
} else {
MODE_STANDARD
};
options.insert(OPT_KEY_MODE.to_string(), stream_mode.to_string());
options.insert(OPT_KEY_TABLE_NAME.to_string(), plan.table_name.clone());
options.insert(
OPT_KEY_DATABASE_NAME.to_string(),
plan.table_database.clone(),
);
options.insert(OPT_KEY_TABLE_ID.to_string(), table_id.to_string());
options.insert(OPT_KEY_TABLE_VER.to_string(), table_version.to_string());
options.insert(OPT_KEY_MODE.to_string(), MODE_APPEND_ONLY.to_string());
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
if let Some(snapshot_loc) = fuse_table.snapshot_loc().await? {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
Expand Down
Loading

0 comments on commit e56422d

Please sign in to comment.