Skip to content

Commit

Permalink
feat: add create or replace stream support (#14596)
Browse files Browse the repository at this point in the history
* feat: add create or replace stream support

* feat: add create or replace stream support

---------

Co-authored-by: ๅผ ็‚Žๆณผ <[email protected]>
  • Loading branch information
lichuang and drmingdrmer authored Feb 4, 2024
1 parent 71241d1 commit 3a43077
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 21 deletions.
18 changes: 15 additions & 3 deletions src/query/ast/src/ast/format/syntax/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,24 @@ pub(crate) fn pretty_alter_view(stmt: AlterViewStmt) -> RcDoc<'static> {
}

pub(crate) fn pretty_create_stream(stmt: CreateStreamStmt) -> RcDoc<'static> {
RcDoc::text("CREATE STREAM")
.append(if stmt.if_not_exists {
RcDoc::space().append(RcDoc::text("IF NOT EXISTS"))
RcDoc::text("CREATE")
.append(if let CreateOption::CreateOrReplace = stmt.create_option {
RcDoc::space().append(RcDoc::text("OR REPLACE"))
} else {
RcDoc::nil()
})
.append(RcDoc::space().append(RcDoc::text("STREAM")))
.append(
if let CreateOption::CreateIfNotExists(if_not_exists) = stmt.create_option {
if if_not_exists {
RcDoc::space().append(RcDoc::text("IF NOT EXISTS"))
} else {
RcDoc::nil()
}
} else {
RcDoc::nil()
},
)
.append(
RcDoc::space()
.append(if let Some(catalog) = stmt.catalog {
Expand Down
16 changes: 12 additions & 4 deletions src/query/ast/src/ast/statements/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Display;
use std::fmt::Formatter;

use databend_common_meta_app::schema::CreateOption;

use crate::ast::write_dot_separated_list;
use crate::ast::Identifier;
use crate::ast::ShowLimit;
Expand All @@ -41,7 +43,7 @@ impl Display for StreamPoint {

#[derive(Debug, Clone, PartialEq)]
pub struct CreateStreamStmt {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub catalog: Option<Identifier>,
pub database: Option<Identifier>,
pub stream: Identifier,
Expand All @@ -54,9 +56,15 @@ pub struct CreateStreamStmt {

impl Display for CreateStreamStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "CREATE STREAM ")?;
if self.if_not_exists {
write!(f, "IF NOT EXISTS ")?;
write!(f, "CREATE ")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, "OR REPLACE ")?;
}
write!(f, "STREAM ")?;
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
if if_not_exists {
write!(f, "IF NOT EXISTS ")?;
}
}
write_dot_separated_list(
f,
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,7 @@ AS
)(i)
}

fn parse_create_option(
pub fn parse_create_option(
opt_or_replace: bool,
opt_if_not_exists: bool,
) -> Result<CreateOption, nom::Err<ErrorKind>> {
Expand Down
17 changes: 11 additions & 6 deletions src/query/ast/src/parser/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use nom::combinator::map;

use super::statement::parse_create_option;
use crate::ast::CreateStreamStmt;
use crate::ast::DescribeStreamStmt;
use crate::ast::DropStreamStmt;
Expand All @@ -27,22 +28,23 @@ use crate::parser::token::TokenKind::*;
use crate::rule;
use crate::util::dot_separated_idents_1_to_2;
use crate::util::dot_separated_idents_1_to_3;
use crate::util::map_res;
use crate::util::IResult;
use crate::Input;

pub fn stream_table(i: Input) -> IResult<Statement> {
rule!(
#create_stream: "`CREATE STREAM [IF NOT EXISTS] [<database>.]<stream> ON TABLE [<database>.]<table> [<stream_point>] [COMMENT = '<string_literal>']`"
#create_stream: "`CREATE [OR REPLACE] STREAM [IF NOT EXISTS] [<database>.]<stream> ON TABLE [<database>.]<table> [<stream_point>] [COMMENT = '<string_literal>']`"
| #drop_stream: "`DROP STREAM [IF EXISTS] [<database>.]<stream>`"
| #show_streams: "`SHOW [FULL] STREAMS [FROM <database>] [<show_limit>]`"
| #describe_stream: "`DESCRIBE STREAM [<database>.]<stream>`"
)(i)
}

fn create_stream(i: Input) -> IResult<Statement> {
map(
map_res(
rule! {
CREATE ~ STREAM ~ ( IF ~ ^NOT ~ ^EXISTS )?
CREATE ~ (OR ~ REPLACE)? ~ STREAM ~ ( IF ~ ^NOT ~ ^EXISTS )?
~ #dot_separated_idents_1_to_3
~ ON ~ TABLE ~ #dot_separated_idents_1_to_2
~ ( #stream_point )?
Expand All @@ -51,6 +53,7 @@ fn create_stream(i: Input) -> IResult<Statement> {
},
|(
_,
opt_or_replace,
_,
opt_if_not_exists,
(catalog, database, stream),
Expand All @@ -61,8 +64,10 @@ fn create_stream(i: Input) -> IResult<Statement> {
opt_append_only,
opt_comment,
)| {
Statement::CreateStream(CreateStreamStmt {
if_not_exists: opt_if_not_exists.is_some(),
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
Ok(Statement::CreateStream(CreateStreamStmt {
create_option,
catalog,
database,
stream,
Expand All @@ -73,7 +78,7 @@ fn create_stream(i: Input) -> IResult<Statement> {
.map(|(_, _, append_only)| append_only)
.unwrap_or(true),
comment: opt_comment.map(|(_, _, comment)| comment),
})
}))
},
)(i)
}
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn test_statement() {
r#"alter view v1(c2) as select number % 3 as a from numbers(1000);"#,
r#"create stream test2.s1 on table test.t append_only = false;"#,
r#"create stream if not exists test2.s2 on table test.t at (stream => test1.s1) comment = 'this is a stream';"#,
r#"create or replace stream test2.s1 on table test.t append_only = false;"#,
r#"show full streams from default.test2 like 's%';"#,
r#"describe stream test2.s2;"#,
r#"drop stream if exists test2.s2;"#,
Expand Down
56 changes: 54 additions & 2 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2582,7 +2582,9 @@ CREATE STREAM test2.s1 ON TABLE test.t APPEND_ONLY = false
---------- AST ------------
CreateStream(
CreateStreamStmt {
if_not_exists: false,
create_option: CreateIfNotExists(
false,
),
catalog: None,
database: Some(
Identifier {
Expand Down Expand Up @@ -2630,7 +2632,9 @@ CREATE STREAM IF NOT EXISTS test2.s2 ON TABLE test.t AT (STREAM => test1.s1) COM
---------- AST ------------
CreateStream(
CreateStreamStmt {
if_not_exists: true,
create_option: CreateIfNotExists(
true,
),
catalog: None,
database: Some(
Identifier {
Expand Down Expand Up @@ -2692,6 +2696,54 @@ CreateStream(
)


---------- Input ----------
create or replace stream test2.s1 on table test.t append_only = false;
---------- Output ---------
CREATE OR REPLACE STREAM test2.s1 ON TABLE test.t APPEND_ONLY = false
---------- AST ------------
CreateStream(
CreateStreamStmt {
create_option: CreateOrReplace,
catalog: None,
database: Some(
Identifier {
name: "test2",
quote: None,
span: Some(
25..30,
),
},
),
stream: Identifier {
name: "s1",
quote: None,
span: Some(
31..33,
),
},
table_database: Some(
Identifier {
name: "test",
quote: None,
span: Some(
43..47,
),
},
),
table: Identifier {
name: "t",
quote: None,
span: Some(
48..49,
),
},
stream_point: None,
append_only: false,
comment: None,
},
)


---------- Input ----------
show full streams from default.test2 like 's%';
---------- Output ---------
Expand Down
3 changes: 1 addition & 2 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use databend_common_base::base::GlobalInstance;
use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::DropTableByIdReq;
Expand Down Expand Up @@ -146,7 +145,7 @@ impl StreamHandler for RealStreamHandler {
}

let req = CreateTableReq {
create_option: CreateOption::CreateIfNotExists(plan.if_not_exists),
create_option: plan.create_option,
name_ident: TableNameIdent {
tenant: plan.tenant.clone(),
db_name: plan.database.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/query/sql/src/planner/binder/ddl/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Binder {
stmt: &CreateStreamStmt,
) -> Result<Plan> {
let CreateStreamStmt {
if_not_exists,
create_option,
catalog,
database,
stream,
Expand Down Expand Up @@ -73,7 +73,7 @@ impl Binder {
});

let plan = CreateStreamPlan {
if_not_exists: *if_not_exists,
create_option: *create_option,
tenant,
catalog,
database,
Expand Down
4 changes: 3 additions & 1 deletion src/query/sql/src/planner/plans/ddl/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_app::schema::CreateOption;

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StreamNavigation {
AtStream { database: String, name: String },
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CreateStreamPlan {
pub if_not_exists: bool,
pub create_option: CreateOption,
pub tenant: String,
pub catalog: String,
pub database: String,
Expand Down
41 changes: 41 additions & 0 deletions tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test
Original file line number Diff line number Diff line change
Expand Up @@ -451,5 +451,46 @@ drop table t5 all
statement ok
drop table t6 all

statement ok
create table replace_t1(a int);

statement ok
alter table replace_t1 set options(change_tracking=true);

statement ok
create table replace_t2(a int);

statement ok
alter table replace_t2 set options(change_tracking=true);

statement ok
create stream replace_s on table replace_t1;

statement ok
insert into table replace_t1 values(1);

statement error 1005
create or replace stream if not exists replace_s on table replace_t2;

statement ok
create or replace stream replace_s on table replace_t2;

statement ok
insert into table replace_t2 values(2);

query I
select a from replace_s;
----
2

statement ok
drop table replace_t1;

statement ok
drop table replace_t2;

statement ok
drop stream replace_s;

statement ok
DROP DATABASE IF EXISTS test_stream

0 comments on commit 3a43077

Please sign in to comment.