Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable shared source in session variable by default, and add cluster-level config to disable #18749

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ user create_compaction_group_for_mv
user datestyle
user enable_join_ordering
user enable_share_plan
user enable_shared_source
user enable_two_phase_agg
user extra_float_digits
user force_split_distinct_agg
Expand Down Expand Up @@ -61,6 +60,7 @@ user streaming_max_parallelism
user streaming_over_window_cache_policy
user streaming_parallelism
user streaming_use_arrangement_backfill
user streaming_use_shared_source
user streaming_use_snapshot_backfill
user synchronize_seqscans
user timezone
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

system ok
rpk topic delete kafka_alter || true

Expand Down Expand Up @@ -269,3 +272,6 @@ select * from t

statement ok
drop table t;

statement ok
SET streaming_use_shared_source TO true;
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

############## Create kafka seed data

statement ok
Expand Down Expand Up @@ -127,3 +130,6 @@ drop sink kafka_sink;

statement ok
drop table kafka_seed_data;

statement ok
SET streaming_use_shared_source TO true;
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

# https://github.com/risingwavelabs/risingwave/issues/16486

# cleanup
Expand Down Expand Up @@ -66,3 +69,6 @@ ABC 1

statement ok
drop source s cascade;

statement ok
SET streaming_use_shared_source TO true;
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/glue.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

system ok
rpk topic delete 'glue-sample-my-event'

Expand Down Expand Up @@ -148,3 +151,6 @@ drop source t;

system ok
rpk topic delete 'glue-sample-my-event'

statement ok
SET streaming_use_shared_source TO true;
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

system ok
rpk topic delete pb_alter_source_test || true; \
(rpk sr subject delete 'pb_alter_source_test-value' && rpk sr subject delete 'pb_alter_source_test-value' --permanent) || true;
Expand Down Expand Up @@ -89,3 +92,6 @@ DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;

statement ok
SET streaming_use_shared_source TO true;
4 changes: 2 additions & 2 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
control substitution on

statement ok
SET enable_shared_source TO true;
SET streaming_use_shared_source TO true;

system ok
rpk topic create shared_source -p 4
Expand Down Expand Up @@ -86,7 +86,7 @@ internal_table.mjs --name mv_1 --type sourcebackfill

# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
statement ok
SET enable_shared_source TO false;
SET streaming_use_shared_source TO false;

statement ok
create materialized view mv_2 as select * from s0;
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_legacy/basic/kafka.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
SET streaming_use_shared_source TO false;

# We don't support CSV header for Kafka
statement error CSV HEADER is not supported when creating table with Kafka connector
create table s0 (v1 int, v2 varchar) with (
Expand Down Expand Up @@ -923,3 +926,6 @@ drop table test_include_payload_only;

statement ok
drop table test_include_payload;

statement ok
SET streaming_use_shared_source TO true;
14 changes: 12 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,8 +1043,7 @@ pub struct StreamingDeveloperConfig {
/// Enable arrangement backfill
/// If false, the arrangement backfill will be disabled,
/// even if the session variable is set.
/// If true, it will be enabled by default, but session variable
/// can override it.
/// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true)
pub enable_arrangement_backfill: bool,

#[serde(default = "default::developer::stream_high_join_amplification_threshold")]
Expand All @@ -1064,6 +1063,13 @@ pub struct StreamingDeveloperConfig {
/// A flag to allow disabling the auto schema change handling
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
pub enable_auto_schema_change: bool,

#[serde(default = "default::developer::enable_shared_source")]
/// Enable shared source
/// If false, the shared source will be disabled,
/// even if the session variable is set.
/// If true, it's decided by session variable `streaming_use_shared_source` (default true)
pub enable_shared_source: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't follow the standard pattern of a session/global variable; in particular, the "cluster-level config" should be a SystemParams entry rather than an RwConfig one. However, since the configuration is not user-facing and is only for us, it's acceptable to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

standard pattern of a session/global variable

FYI, a session variable can actually also be changed globally via ALTER SYSTEM.

I just added this extra gate in case it's needed.

}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1964,6 +1970,10 @@ pub mod default {
true
}

/// Serde default for `StreamingDeveloperConfig::enable_shared_source`.
///
/// Returns `true`, so the cluster-level gate is open unless the config file
/// overrides it; the session variable `streaming_use_shared_source` then
/// decides whether `CREATE SOURCE` creates a shared source.
pub fn enable_shared_source() -> bool {
    true
}

/// Serde default for the high join amplification threshold in
/// `StreamingDeveloperConfig`.
///
/// Returns `2048`, matching `stream_high_join_amplification_threshold = 2048`
/// in the example config.
pub fn stream_high_join_amplification_threshold() -> usize {
    2048
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ pub struct SessionConfig {
///
/// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEW` from the source
/// will forward the data from the same source streaming job, and also backfill prior data from the external source.
#[parameter(default = false, alias = "rw_enable_shared_source")]
enable_shared_source: bool,
#[parameter(default = true)]
streaming_use_shared_source: bool,

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ stream_high_join_amplification_threshold = 2048
stream_enable_actor_tokio_metrics = false
stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true
stream_enable_shared_source = true

[storage]
share_buffers_sync_parallelism = 1
Expand Down
20 changes: 10 additions & 10 deletions src/frontend/planner_test/tests/testdata/input/shared_source.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
) FORMAT PLAIN ENCODE JSON;
expected_outputs: []
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
sql: |
/* The shared source config doesn't affect table with connector. */
EXPLAIN CREATE TABLE s(x int,y int)
Expand All @@ -25,43 +25,43 @@
# We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV
#
# batch: All 4 plans should be the same.
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. streaming_use_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
- with_config_map:
enable_shared_source: false
streaming_use_shared_source: false
before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: false
streaming_use_shared_source: false
before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
expected_outputs:
- batch_plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
└─StreamDml { columns: [x, y, _row_id] }
└─StreamSource
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
- before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -43,11 +43,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] }
with_config_map:
enable_shared_source: 'false'
streaming_use_shared_source: 'false'
- before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -59,11 +59,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] }
with_config_map:
enable_shared_source: 'false'
streaming_use_shared_source: 'false'
- before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -75,11 +75,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
- before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -91,4 +91,4 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
22 changes: 21 additions & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,31 @@ pub mod tests {
let session = frontend.session_ref();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

let sql = r#"create source s_shared (v1 int) with (
connector = 'kafka',
topic = 'abc',
properties.bootstrap.server = 'localhost:29092',
) FORMAT PLAIN ENCODE JSON;"#;

frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();

frontend
.run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
.await
.unwrap();
let sql = r#"create source s (v1 int) with (
connector = 'kafka',
topic = 'abc',
properties.bootstrap.server = 'localhost:29092',
) FORMAT PLAIN ENCODE JSON;"#;

frontend.run_sql(sql).await.unwrap();
frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();

let get_source = || {
let catalog_reader = session.env().catalog_reader().read_guard();
Expand All @@ -189,6 +207,8 @@ pub mod tests {
.map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
.collect();

let sql = "alter source s_shared add column v2 varchar;";
assert_eq!("Feature is not yet implemented: alter shared source\nTracking issue: https://github.com/risingwavelabs/risingwave/issues/16003", &frontend.run_sql(sql).await.unwrap_err().to_string());
let sql = "alter source s add column v2 varchar;";
frontend.run_sql(sql).await.unwrap();

Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,14 @@ pub mod tests {
let session = frontend.session_ref();
let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

frontend.run_sql(sql).await.unwrap();
frontend
.run_sql_with_session(session.clone(), "SET streaming_use_shared_source TO false;")
.await
.unwrap();
frontend
.run_sql_with_session(session.clone(), sql)
.await
.unwrap();

let get_source = || {
let catalog_reader = session.env().catalog_reader().read_guard();
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,12 @@ pub async fn handle_create_source(
let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session.config().enable_shared_source());
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema)?
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl ToStream for LogicalSource {
}
SourceNodeKind::CreateMViewOrBatch => {
// Create MV on source.
// We only check enable_shared_source is true when `CREATE SOURCE`.
// We only check streaming_use_shared_source is true when `CREATE SOURCE`.
// The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
if use_shared_source {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ impl LocalFrontend {
res
}

/// Creates a new session
pub fn session_ref(&self) -> Arc<SessionImpl> {
self.session_user_ref(
DEFAULT_DATABASE_NAME.to_string(),
Expand Down
Loading
Loading