Skip to content

Commit

Permalink
feat: enable shared source in session variable by default, and add cl…
Browse files Browse the repository at this point in the history
…uster-level config to disable (#18749) (#19023)

Signed-off-by: xxchan <[email protected]>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
github-actions[bot] and xxchan authored Oct 21, 2024
1 parent 0d2ded5 commit 58e12ff
Show file tree
Hide file tree
Showing 19 changed files with 111 additions and 31 deletions.
2 changes: 1 addition & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ user create_compaction_group_for_mv
user datestyle
user enable_join_ordering
user enable_share_plan
user enable_shared_source
user enable_two_phase_agg
user extra_float_digits
user force_split_distinct_agg
Expand Down Expand Up @@ -61,6 +60,7 @@ user streaming_max_parallelism
user streaming_over_window_cache_policy
user streaming_parallelism
user streaming_use_arrangement_backfill
user streaming_use_shared_source
user streaming_use_snapshot_backfill
user synchronize_seqscans
user timezone
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

system ok
rpk topic delete kafka_alter || true

Expand Down Expand Up @@ -269,3 +272,6 @@ select * from t

statement ok
drop table t;

statement ok
SET streaming_use_shared_source TO true;
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

############## Create kafka seed data

statement ok
Expand Down Expand Up @@ -127,3 +130,6 @@ drop sink kafka_sink;

statement ok
drop table kafka_seed_data;

statement ok
SET streaming_use_shared_source TO true;
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

# https://github.com/risingwavelabs/risingwave/issues/16486

# cleanup
Expand Down Expand Up @@ -66,3 +69,6 @@ ABC 1

statement ok
drop source s cascade;

statement ok
SET streaming_use_shared_source TO true;
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/glue.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

system ok
rpk topic delete 'glue-sample-my-event'

Expand Down Expand Up @@ -148,3 +151,6 @@ drop source t;

system ok
rpk topic delete 'glue-sample-my-event'

statement ok
SET streaming_use_shared_source TO true;
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

system ok
rpk topic delete pb_alter_source_test || true; \
(rpk sr subject delete 'pb_alter_source_test-value' && rpk sr subject delete 'pb_alter_source_test-value' --permanent) || true;
Expand Down Expand Up @@ -89,3 +92,6 @@ DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;

statement ok
SET streaming_use_shared_source TO true;
4 changes: 2 additions & 2 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
control substitution on

statement ok
SET enable_shared_source TO true;
SET streaming_use_shared_source TO true;

system ok
rpk topic create shared_source -p 4
Expand Down Expand Up @@ -86,7 +86,7 @@ internal_table.mjs --name mv_1 --type sourcebackfill

# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
statement ok
SET enable_shared_source TO false;
SET streaming_use_shared_source TO false;

statement ok
create materialized view mv_2 as select * from s0;
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_legacy/basic/kafka.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
SET streaming_use_shared_source TO false;

# We don't support CSV header for Kafka
statement error CSV HEADER is not supported when creating table with Kafka connector
create table s0 (v1 int, v2 varchar) with (
Expand Down Expand Up @@ -910,3 +913,6 @@ drop table debezium_ignore_key;

statement ok
drop table test_include_payload;

statement ok
SET streaming_use_shared_source TO true;
14 changes: 12 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,8 +1043,7 @@ pub struct StreamingDeveloperConfig {
/// Enable arrangement backfill
/// If false, the arrangement backfill will be disabled,
/// even if the session variable is set.
/// If true, it will be enabled by default, but session variable
/// can override it.
/// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true)
pub enable_arrangement_backfill: bool,

#[serde(default = "default::developer::stream_high_join_amplification_threshold")]
Expand All @@ -1064,6 +1063,13 @@ pub struct StreamingDeveloperConfig {
/// A flag to allow disabling the auto schema change handling
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
pub enable_auto_schema_change: bool,

#[serde(default = "default::developer::enable_shared_source")]
/// Enable shared source
/// If false, the shared source will be disabled,
/// even if the session variable is set.
/// If true, it's decided by session variable `streaming_use_shared_source` (default true)
pub enable_shared_source: bool,
}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1964,6 +1970,10 @@ pub mod default {
true
}

/// Serde default for `StreamingDeveloperConfig::enable_shared_source`.
///
/// Returns `true`: shared source is not disabled at the cluster level by default.
pub fn enable_shared_source() -> bool {
true
}

/// Serde default referenced as
/// `default::developer::stream_high_join_amplification_threshold`.
///
/// Returns `2048`.
pub fn stream_high_join_amplification_threshold() -> usize {
2048
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ pub struct SessionConfig {
///
/// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEW` from the source
/// will forward the data from the same source streaming job, and also backfill prior data from the external source.
#[parameter(default = false, alias = "rw_enable_shared_source")]
enable_shared_source: bool,
#[parameter(default = true)]
streaming_use_shared_source: bool,

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ stream_high_join_amplification_threshold = 2048
stream_enable_actor_tokio_metrics = false
stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true
stream_enable_shared_source = true

[storage]
share_buffers_sync_parallelism = 1
Expand Down
20 changes: 10 additions & 10 deletions src/frontend/planner_test/tests/testdata/input/shared_source.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
) FORMAT PLAIN ENCODE JSON;
expected_outputs: []
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
sql: |
/* The shared source config doesn't affect table with connector. */
EXPLAIN CREATE TABLE s(x int,y int)
Expand All @@ -25,43 +25,43 @@
# We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV
#
# batch: All 4 plans should be the same.
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. streaming_use_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
- with_config_map:
enable_shared_source: false
streaming_use_shared_source: false
before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: false
streaming_use_shared_source: false
before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
expected_outputs:
- batch_plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
└─StreamDml { columns: [x, y, _row_id] }
└─StreamSource
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
- before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -43,11 +43,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] }
with_config_map:
enable_shared_source: 'false'
streaming_use_shared_source: 'false'
- before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -59,11 +59,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] }
with_config_map:
enable_shared_source: 'false'
streaming_use_shared_source: 'false'
- before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -75,11 +75,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
- before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -91,4 +91,4 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
22 changes: 21 additions & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,31 @@ pub mod tests {
let session = frontend.session_ref();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

let sql = r#"create source s_shared (v1 int) with (
connector = 'kafka',
topic = 'abc',
properties.bootstrap.server = 'localhost:29092',
) FORMAT PLAIN ENCODE JSON;"#;

frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();

frontend
.run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
.await
.unwrap();
let sql = r#"create source s (v1 int) with (
connector = 'kafka',
topic = 'abc',
properties.bootstrap.server = 'localhost:29092',
) FORMAT PLAIN ENCODE JSON;"#;

frontend.run_sql(sql).await.unwrap();
frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();

let get_source = || {
let catalog_reader = session.env().catalog_reader().read_guard();
Expand All @@ -189,6 +207,8 @@ pub mod tests {
.map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
.collect();

let sql = "alter source s_shared add column v2 varchar;";
assert_eq!("Feature is not yet implemented: alter shared source\nTracking issue: https://github.com/risingwavelabs/risingwave/issues/16003", &frontend.run_sql(sql).await.unwrap_err().to_string());
let sql = "alter source s add column v2 varchar;";
frontend.run_sql(sql).await.unwrap();

Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,14 @@ pub mod tests {
let session = frontend.session_ref();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

frontend.run_sql(sql).await.unwrap();
frontend
.run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
.await
.unwrap();
frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();

let get_source = || {
let catalog_reader = session.env().catalog_reader().read_guard();
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,12 @@ pub async fn handle_create_source(
let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session.config().enable_shared_source());
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema)?
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl ToStream for LogicalSource {
}
SourceNodeKind::CreateMViewOrBatch => {
// Create MV on source.
// We only check enable_shared_source is true when `CREATE SOURCE`.
// `streaming_use_shared_source` is only checked at `CREATE SOURCE` time.
// The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
if use_shared_source {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ impl LocalFrontend {
res
}

/// Creates a new session
pub fn session_ref(&self) -> Arc<SessionImpl> {
self.session_user_ref(
DEFAULT_DATABASE_NAME.to_string(),
Expand Down
Loading

0 comments on commit 58e12ff

Please sign in to comment.