Skip to content

Commit

Permalink
feat(frontend): Add Create subscription in frontend (#14831)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Mar 20, 2024
1 parent 718c4a8 commit 34d31aa
Show file tree
Hide file tree
Showing 53 changed files with 1,727 additions and 121 deletions.
26 changes: 26 additions & 0 deletions e2e_test/ddl/alter_owner.slt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ WHERE
----
sink user1

statement ok
CREATE SUBSCRIPTION subscription FROM mv WITH (
retention = '1D'
);

statement ok
ALTER SUBSCRIPTION subscription OWNER TO user1;

query TT
SELECT
pg_class.relname AS rel_name,
pg_roles.rolname AS owner
FROM
pg_class
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
JOIN pg_roles ON pg_roles.oid = pg_class.relowner
WHERE
pg_namespace.nspname NOT LIKE 'pg_%'
AND pg_namespace.nspname != 'information_schema'
AND pg_class.relname = 'subscription';
----
subscription user1

statement ok
CREATE DATABASE d;

Expand Down Expand Up @@ -181,6 +204,9 @@ DROP DATABASE d;
statement ok
DROP SINK sink;

statement ok
DROP SUBSCRIPTION subscription;

statement ok
DROP SOURCE src;

Expand Down
25 changes: 25 additions & 0 deletions e2e_test/ddl/alter_parallelism.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ create view mview_parallelism as select m.name, tf.parallelism from rw_materiali
statement ok
create view sink_parallelism as select s.name, tf.parallelism from rw_sinks s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view subscription_parallelism as select s.name, tf.parallelism from rw_subscriptions s, rw_table_fragments tf where s.id = tf.table_id;

statement ok
create view fragment_parallelism as select t.name as table_name, f.fragment_id, f.parallelism from rw_fragments f, rw_tables t where f.table_id = t.id;

Expand Down Expand Up @@ -94,9 +97,28 @@ select parallelism from sink_parallelism where name = 's';
----
FIXED(4)

statement ok
create subscription subscription1 from t with (retention = '1D');

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
ADAPTIVE

statement ok
alter subscription subscription1 set parallelism = 4;

query T
select parallelism from subscription_parallelism where name = 'subscription1';
----
FIXED(4)

statement ok
drop sink s;

statement ok
drop subscription subscription1;

statement ok
drop materialized view m_join;

Expand Down Expand Up @@ -157,5 +179,8 @@ drop view mview_parallelism;
statement ok
drop view sink_parallelism;

statement ok
drop view subscription_parallelism;

statement ok
drop view fragment_parallelism;
16 changes: 16 additions & 0 deletions e2e_test/ddl/alter_rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH (
connector = 'blackhole'
);

statement ok
CREATE SUBSCRIPTION subscription FROM mv WITH (
retention = '1D'
);

statement ok
CREATE SOURCE src (v INT) WITH (
connector = 'datagen',
Expand Down Expand Up @@ -113,6 +118,14 @@ SHOW CREATE SINK sink1;
----
public.sink1 CREATE SINK sink1 AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv2 AS mv3 WITH (connector = 'blackhole')

statement ok
ALTER SUBSCRIPTION subscription RENAME TO subscription1;

query TT
SHOW CREATE SUBSCRIPTION subscription1;
----
public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv WITH (retention = '1D')

# alter mview rename with alias conflict, used by sink1
statement ok
ALTER MATERIALIZED VIEW mv2 RENAME TO mv3;
Expand Down Expand Up @@ -229,6 +242,9 @@ DROP SCHEMA schema1;
statement ok
DROP SINK sink1;

statement ok
DROP SUBSCRIPTION subscription1;

statement error Permission denied
DROP VIEW v5;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/ddl/alter_set_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ WHERE nspname = 'test_schema';
----
test_sink test_schema

statement ok
CREATE SUBSCRIPTION test_subscription FROM test_schema.test_table WITH (
retention = '1D'
);

statement ok
ALTER SUBSCRIPTION test_subscription SET SCHEMA test_schema;

query TT
SELECT name AS subscriptionname, nspname AS schemaname
FROM rw_subscriptions
JOIN pg_namespace ON pg_namespace.oid = rw_subscriptions.schema_id
WHERE nspname = 'test_schema';
----
test_subscription test_schema

statement ok
CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock');

Expand All @@ -97,6 +113,9 @@ DROP CONNECTION test_schema.test_conn;
statement ok
DROP SINK test_schema.test_sink;

statement ok
DROP SUBSCRIPTION test_schema.test_subscription;

statement ok
DROP SOURCE test_schema.test_source;

Expand Down
47 changes: 47 additions & 0 deletions e2e_test/ddl/subscription.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
create table ddl_t (v1 int);

statement ok
create materialized view ddl_mv as select v1 from ddl_t;

statement ok
create subscription ddl_subscription_table from ddl_t with(retention = '1D');

statement ok
create subscription ddl_subscription_mv from ddl_mv with(retention = '1D');

statement error
create subscription ddl_subscription_table from ddl_t with(retention = '1D');

statement error
create subscription ddl_subscription_mv from ddl_mv with(retention = '1D');

statement ok
create subscription if not exists ddl_subscription_table from ddl_t with(retention = '1D');

statement ok
create subscription if not exists ddl_subscription_mv from ddl_mv with(retention = '1D');

statement ok
drop subscription ddl_subscription_table;

statement ok
drop subscription ddl_subscription_mv;

statement error
drop subscription ddl_subscription_table;

statement error
drop subscription ddl_subscription_mv;

statement ok
drop subscription if exists ddl_subscription_table;

statement ok
drop subscription if exists ddl_subscription_mv;

statement ok
drop materialized view ddl_mv;

statement ok
drop table ddl_t;
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ message Subscription {

optional string initialized_at_cluster_version = 15;
optional string created_at_cluster_version = 16;

string subscription_from_name = 17;
}

message Connection {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/acl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock<AclModeSet> = LazyLock::new(AclM
pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_VIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_SINK_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(|| BitFlags::from(AclMode::Execute).into());
pub static ALL_AVAILABLE_CONNECTION_MODES: LazyLock<AclModeSet> =
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.table, "Sink")
}

// Subscription
NodeBody::Subscription(node) => {
// A Subscription should have a state table.
optional!(node.log_store_table, "Subscription")
}

// Now
NodeBody::Now(node) => {
always!(node.state_table, "Now");
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/dbeaver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id] }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchProject { exprs: [rw_subscriptions.id, rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id] }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id] }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id] }
Expand All @@ -121,6 +123,7 @@
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ ├─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ │ └─BatchScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id], distribution: Single }
│ │ │ │ │ │ │ │ │ │ │ │ └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
Expand All @@ -138,6 +141,7 @@
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name], distribution: Single }
│ │ │ │ │ │ │ │ ├─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name], distribution: Single }
│ │ │ │ │ │ │ │ └─BatchScan { table: rw_views, columns: [rw_views.id, rw_views.name], distribution: Single }
│ │ │ │ │ │ │ └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
Expand All @@ -151,6 +155,7 @@
│ │ │ │ │ ├─BatchScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id], distribution: Single }
│ │ │ │ │ ├─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id], distribution: Single }
│ │ │ │ │ └─BatchScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id], distribution: Single }
│ │ │ │ └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
Expand Down
Loading

0 comments on commit 34d31aa

Please sign in to comment.