Skip to content

Commit

Permalink
feat: enable shared source in session variable by default, and add sy…
Browse files Browse the repository at this point in the history
…stem variable to disable

The config is similar to `streaming_use_arrangement_backfill` (session) and `stream_enable_arrangement_backfill` (system)

BTW one problem found: Currently session variables have different styles:
- with rw prefix: `rw_streaming_enable_delta_join`, `rw_batch_enable_sort_agg`, `rw_enable_share_plan`
- without rw prefix: `streaming_use_arrangement_backfill`, `batch_enable_distributed_dml`

Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Oct 11, 2024
1 parent ae0dc87 commit 25cadfa
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 28 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ user streaming_max_parallelism
user streaming_over_window_cache_policy
user streaming_parallelism
user streaming_use_arrangement_backfill
user streaming_use_shared_source
user streaming_use_snapshot_backfill
user synchronize_seqscans
user timezone
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/source_inline/kafka/shared_source.slt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
control substitution on

statement ok
SET enable_shared_source TO true;
SET streaming_use_shared_source TO true;

system ok
rpk topic create shared_source -p 4
Expand Down Expand Up @@ -86,7 +86,7 @@ internal_table.mjs --name mv_1 --type sourcebackfill

# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
statement ok
SET enable_shared_source TO false;
SET streaming_use_shared_source TO false;

statement ok
create materialized view mv_2 as select * from s0;
Expand Down
14 changes: 12 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,8 +1034,7 @@ pub struct StreamingDeveloperConfig {
/// Enable arrangement backfill
/// If false, the arrangement backfill will be disabled,
/// even if session variable set.
/// If true, it will be enabled by default, but session variable
/// can override it.
/// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true)
pub enable_arrangement_backfill: bool,

#[serde(default = "default::developer::stream_high_join_amplification_threshold")]
Expand All @@ -1055,6 +1054,13 @@ pub struct StreamingDeveloperConfig {
/// A flag to allow disabling the auto schema change handling
#[serde(default = "default::developer::stream_enable_auto_schema_change")]
pub enable_auto_schema_change: bool,

#[serde(default = "default::developer::enable_shared_source")]
/// Enable shared source
/// If false, the shared source will be disabled,
/// even if session variable set.
/// If true, it's decided by session variable `streaming_use_shared_source` (default true)
pub enable_shared_source: bool,
}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1939,6 +1945,10 @@ pub mod default {
true
}

pub fn enable_shared_source() -> bool {
true
}

pub fn stream_high_join_amplification_threshold() -> usize {
2048
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ pub struct SessionConfig {
///
/// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
/// will forward the data from the same source streaming job, and also backfill prior data from the external source.
#[parameter(default = false, alias = "rw_enable_shared_source")]
enable_shared_source: bool,
#[parameter(default = true, alias = "rw_enable_shared_source")]
streaming_use_shared_source: bool,

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ stream_high_join_amplification_threshold = 2048
stream_enable_actor_tokio_metrics = false
stream_exchange_connection_pool_size = 1
stream_enable_auto_schema_change = true
stream_enable_shared_source = true

[storage]
share_buffers_sync_parallelism = 1
Expand Down
20 changes: 10 additions & 10 deletions src/frontend/planner_test/tests/testdata/input/shared_source.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
) FORMAT PLAIN ENCODE JSON;
expected_outputs: []
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
sql: |
/* The shared source config doesn't affect table with connector. */
EXPLAIN CREATE TABLE s(x int,y int)
Expand All @@ -25,43 +25,43 @@
# We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV
#
# batch: All 4 plans should be the same.
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. streaming_use_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW
- with_config_map:
enable_shared_source: false
streaming_use_shared_source: false
before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: false
streaming_use_shared_source: false
before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
expected_outputs:
- batch_plan
- stream_plan
- with_config_map:
enable_shared_source: true
streaming_use_shared_source: true
before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
expected_outputs:
- batch_plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
└─StreamDml { columns: [x, y, _row_id] }
└─StreamSource
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
- before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -43,11 +43,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] }
with_config_map:
enable_shared_source: 'false'
streaming_use_shared_source: 'false'
- before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -59,11 +59,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] }
with_config_map:
enable_shared_source: 'false'
streaming_use_shared_source: 'false'
- before:
- create_source
sql: |
SET enable_shared_source = false;
SET streaming_use_shared_source = false;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -75,11 +75,11 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
- before:
- create_source
sql: |
SET enable_shared_source = true;
SET streaming_use_shared_source = true;
select * from s;
batch_plan: |-
BatchExchange { order: [], dist: Single }
Expand All @@ -91,4 +91,4 @@
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
with_config_map:
enable_shared_source: 'true'
streaming_use_shared_source: 'true'
7 changes: 6 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,12 @@ pub async fn handle_create_source(
let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session.config().enable_shared_source());
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema)?
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl ToStream for LogicalSource {
}
SourceNodeKind::CreateMViewOrBatch => {
// Create MV on source.
// We only check enable_shared_source is true when `CREATE SOURCE`.
// We only check streaming_use_shared_source is true when `CREATE SOURCE`.
// The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
if use_shared_source {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Configuration {

pub fn for_scale_shared_source() -> Self {
let mut conf = Self::for_scale();
conf.per_session_queries = vec!["SET ENABLE_SHARED_SOURCE = true;".into()].into();
conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
conf
}

Expand Down

0 comments on commit 25cadfa

Please sign in to comment.