diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 278436a884c7..73e84f371c35 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -33,7 +33,6 @@ user create_compaction_group_for_mv user datestyle user enable_join_ordering user enable_share_plan -user enable_shared_source user enable_two_phase_agg user extra_float_digits user force_split_distinct_agg @@ -61,6 +60,7 @@ user streaming_max_parallelism user streaming_over_window_cache_policy user streaming_parallelism user streaming_use_arrangement_backfill +user streaming_use_shared_source user streaming_use_snapshot_backfill user synchronize_seqscans user timezone diff --git a/e2e_test/source_inline/kafka/alter/add_column.slt b/e2e_test/source_inline/kafka/alter/add_column.slt index cde818f6baf9..9d0bbe671079 100644 --- a/e2e_test/source_inline/kafka/alter/add_column.slt +++ b/e2e_test/source_inline/kafka/alter/add_column.slt @@ -1,5 +1,8 @@ control substitution on +statement ok +SET streaming_use_shared_source TO false; + system ok rpk topic delete kafka_alter || true @@ -269,3 +272,6 @@ select * from t statement ok drop table t; + +statement ok +SET streaming_use_shared_source TO true; diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt index 9bfa7238e347..96fd016c5812 100644 --- a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt +++ b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt @@ -1,5 +1,8 @@ control substitution on +statement ok +SET streaming_use_shared_source TO false; + ############## Create kafka seed data statement ok @@ -127,3 +130,6 @@ drop sink kafka_sink; statement ok drop table kafka_seed_data; + +statement ok +SET streaming_use_shared_source TO true; diff --git a/e2e_test/source_inline/kafka/avro/alter_source.slt b/e2e_test/source_inline/kafka/avro/alter_source.slt index 57677af57cd9..5a73941dc789 100644 --- a/e2e_test/source_inline/kafka/avro/alter_source.slt +++ b/e2e_test/source_inline/kafka/avro/alter_source.slt @@ -1,5 +1,8 @@ control substitution on +statement ok +SET streaming_use_shared_source TO false; + # https://github.com/risingwavelabs/risingwave/issues/16486 # cleanup @@ -66,3 +69,6 @@ ABC 1 statement ok drop source s cascade; + +statement ok +SET streaming_use_shared_source TO true; diff --git a/e2e_test/source_inline/kafka/avro/glue.slt b/e2e_test/source_inline/kafka/avro/glue.slt index 70d9c1d80024..a4359e9a7c12 100644 --- a/e2e_test/source_inline/kafka/avro/glue.slt +++ b/e2e_test/source_inline/kafka/avro/glue.slt @@ -1,5 +1,8 @@ control substitution on +statement ok +SET streaming_use_shared_source TO false; + system ok rpk topic delete 'glue-sample-my-event' @@ -148,3 +151,6 @@ drop source t; system ok rpk topic delete 'glue-sample-my-event' + +statement ok +SET streaming_use_shared_source TO true; diff --git a/e2e_test/source_inline/kafka/protobuf/alter_source.slt b/e2e_test/source_inline/kafka/protobuf/alter_source.slt index 81085f7da3fe..7f1b431a06ba 100644 --- a/e2e_test/source_inline/kafka/protobuf/alter_source.slt +++ b/e2e_test/source_inline/kafka/protobuf/alter_source.slt @@ -1,5 +1,8 @@ control substitution on +statement ok +SET streaming_use_shared_source TO false; + system ok rpk topic delete pb_alter_source_test || true; \ (rpk sr subject delete 'pb_alter_source_test-value' && rpk sr subject delete 'pb_alter_source_test-value' --permanent) || true; @@ -89,3 +92,6 @@ DROP MATERIALIZED VIEW mv_user; statement ok DROP SOURCE src_user; + +statement ok +SET streaming_use_shared_source TO true; diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial index ca429781604e..b66bf84fbd80 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt.serial +++ b/e2e_test/source_inline/kafka/shared_source.slt.serial @@ -1,7 +1,7 @@ control substitution on statement ok -SET enable_shared_source TO true; +SET streaming_use_shared_source TO true; system ok rpk topic create shared_source -p 4 @@ -86,7 +86,7 @@ internal_table.mjs --name mv_1 --type sourcebackfill # This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. statement ok -SET enable_shared_source TO false; +SET streaming_use_shared_source TO false; statement ok create materialized view mv_2 as select * from s0; diff --git a/e2e_test/source_legacy/basic/kafka.slt b/e2e_test/source_legacy/basic/kafka.slt index 227c0aa46bac..52cf82ad373d 100644 --- a/e2e_test/source_legacy/basic/kafka.slt +++ b/e2e_test/source_legacy/basic/kafka.slt @@ -1,3 +1,6 @@ +statement ok +SET streaming_use_shared_source TO false; + # We don't support CSV header for Kafka statement error CSV HEADER is not supported when creating table with Kafka connector create table s0 (v1 int, v2 varchar) with ( @@ -910,3 +913,6 @@ drop table debezium_ignore_key; statement ok drop table test_include_payload; + +statement ok +SET streaming_use_shared_source TO true; diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 325bc0a6a7e7..541d4896a6de 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1043,8 +1043,7 @@ pub struct StreamingDeveloperConfig { /// Enable arrangement backfill /// If false, the arrangement backfill will be disabled, /// even if session variable set. - /// If true, it will be enabled by default, but session variable - /// can override it. + /// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true) pub enable_arrangement_backfill: bool, #[serde(default = "default::developer::stream_high_join_amplification_threshold")] @@ -1064,6 +1063,13 @@ pub struct StreamingDeveloperConfig { /// A flag to allow disabling the auto schema change handling #[serde(default = "default::developer::stream_enable_auto_schema_change")] pub enable_auto_schema_change: bool, + + #[serde(default = "default::developer::enable_shared_source")] + /// Enable shared source + /// If false, the shared source will be disabled, + /// even if session variable set. + /// If true, it's decided by session variable `streaming_use_shared_source` (default true) + pub enable_shared_source: bool, } /// The subsections `[batch.developer]`. @@ -1964,6 +1970,10 @@ pub mod default { true } + pub fn enable_shared_source() -> bool { + true + } + pub fn stream_high_join_amplification_threshold() -> usize { 2048 } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 43ac67363dd9..4a16fad1bbe9 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -287,8 +287,8 @@ pub struct SessionConfig { /// /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source /// will forward the data from the same source streaming job, and also backfill prior data from the external source. - #[parameter(default = false, alias = "rw_enable_shared_source")] - enable_shared_source: bool, + #[parameter(default = true)] + streaming_use_shared_source: bool, /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time. #[parameter(default = SERVER_ENCODING)] diff --git a/src/config/example.toml b/src/config/example.toml index 6f42344659de..f285720dc6b2 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -130,6 +130,7 @@ stream_high_join_amplification_threshold = 2048 stream_enable_actor_tokio_metrics = false stream_exchange_connection_pool_size = 1 stream_enable_auto_schema_change = true +stream_enable_shared_source = true [storage] share_buffers_sync_parallelism = 1 diff --git a/src/frontend/planner_test/tests/testdata/input/shared_source.yml b/src/frontend/planner_test/tests/testdata/input/shared_source.yml index 71c87ab2e3ce..4d684422d9fd 100644 --- a/src/frontend/planner_test/tests/testdata/input/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/input/shared_source.yml @@ -9,7 +9,7 @@ ) FORMAT PLAIN ENCODE JSON; expected_outputs: [] - with_config_map: - enable_shared_source: true + streaming_use_shared_source: true sql: | /* The shared source config doesn't affect table with connector. */ EXPLAIN CREATE TABLE s(x int,y int) @@ -25,43 +25,43 @@ # We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV # # batch: All 4 plans should be the same. -# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW +# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. streaming_use_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW - with_config_map: - enable_shared_source: false + streaming_use_shared_source: false before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; expected_outputs: - batch_plan - stream_plan - with_config_map: - enable_shared_source: false + streaming_use_shared_source: false before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; expected_outputs: - batch_plan - stream_plan - with_config_map: - enable_shared_source: true + streaming_use_shared_source: true before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; expected_outputs: - batch_plan - stream_plan - with_config_map: - enable_shared_source: true + streaming_use_shared_source: true before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; expected_outputs: - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/shared_source.yml b/src/frontend/planner_test/tests/testdata/output/shared_source.yml index 5083c23952f3..83fde26bfc7d 100644 --- a/src/frontend/planner_test/tests/testdata/output/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/output/shared_source.yml @@ -27,11 +27,11 @@ └─StreamDml { columns: [x, y, _row_id] } └─StreamSource with_config_map: - enable_shared_source: 'true' + streaming_use_shared_source: 'true' - before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -43,11 +43,11 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] } with_config_map: - enable_shared_source: 'false' + streaming_use_shared_source: 'false' - before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -59,11 +59,11 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] } with_config_map: - enable_shared_source: 'false' + streaming_use_shared_source: 'false' - before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -75,11 +75,11 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } with_config_map: - enable_shared_source: 'true' + streaming_use_shared_source: 'true' - before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -91,4 +91,4 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } with_config_map: - enable_shared_source: 'true' + streaming_use_shared_source: 'true' diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 013b27ef8a21..2d2e2c669828 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -165,13 +165,31 @@ pub mod tests { let session = frontend.session_ref(); let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME); + let sql = r#"create source s_shared (v1 int) with ( + connector = 'kafka', + topic = 'abc', + properties.bootstrap.server = 'localhost:29092', + ) FORMAT PLAIN ENCODE JSON;"#; + + frontend + .run_sql_with_session(session.clone(), sql) + .await + .unwrap(); + + frontend + .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;") + .await + .unwrap(); let sql = r#"create source s (v1 int) with ( connector = 'kafka', topic = 'abc', properties.bootstrap.server = 'localhost:29092', ) FORMAT PLAIN ENCODE JSON;"#; - frontend.run_sql(sql).await.unwrap(); + frontend + .run_sql_with_session(session.clone(), sql) + .await + .unwrap(); let get_source = || { let catalog_reader = session.env().catalog_reader().read_guard(); @@ -189,6 +207,8 @@ pub mod tests { .map(|col| (col.name(), (col.data_type().clone(), col.column_id()))) .collect(); + let sql = "alter source s_shared add column v2 varchar;"; + assert_eq!("Feature is not yet implemented: alter shared source\nTracking issue: https://github.com/risingwavelabs/risingwave/issues/16003", &frontend.run_sql(sql).await.unwrap_err().to_string()); let sql = "alter source s add column v2 varchar;"; frontend.run_sql(sql).await.unwrap(); diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index 17722625d534..bf8cf991d1a4 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -348,7 +348,14 @@ pub mod tests { let session = frontend.session_ref(); let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME); - frontend.run_sql(sql).await.unwrap(); + frontend + .run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;") + .await + .unwrap(); + frontend + .run_sql_with_session(session.clone(), sql) + .await + .unwrap(); let get_source = || { let catalog_reader = session.env().catalog_reader().read_guard(); diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index d6338b89456a..f2720d111cc4 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1662,7 +1662,12 @@ pub async fn handle_create_source( let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); let is_shared = create_cdc_source_job || (with_properties.is_shareable_non_cdc_connector() - && session.config().enable_shared_source()); + && session + .env() + .streaming_config() + .developer + .enable_shared_source + && session.config().streaming_use_shared_source()); let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 5bbf1a021641..81dc1128c2a4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -354,7 +354,7 @@ impl ToStream for LogicalSource { } SourceNodeKind::CreateMViewOrBatch => { // Create MV on source. - // We only check enable_shared_source is true when `CREATE SOURCE`. + // We only check streaming_use_shared_source is true when `CREATE SOURCE`. // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here. let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared()); if use_shared_source { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ff1c92227716..ac8784fde4dd 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -183,6 +183,7 @@ impl LocalFrontend { res } + /// Creates a new session pub fn session_ref(&self) -> Arc { self.session_user_ref( DEFAULT_DATABASE_NAME.to_string(), diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 3b41c82afab0..2a6118970ccd 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -158,7 +158,7 @@ impl Configuration { pub fn for_scale_shared_source() -> Self { let mut conf = Self::for_scale(); - conf.per_session_queries = vec!["SET ENABLE_SHARED_SOURCE = true;".into()].into(); + conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into(); conf }