diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index b1e091d4bb05..e1f0c8fd29d5 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -151,7 +151,9 @@ impl OptimizeTableInterpreter { .compact_blocks(self.ctx.clone(), self.plan.limit) .await?; - let is_distributed = !self.ctx.get_cluster().is_empty(); + let is_distributed = (!self.ctx.get_cluster().is_empty()) + && self.ctx.get_settings().get_enable_distributed_compact()?; + let catalog_info = catalog.info(); let mut compact_pipeline = if let Some((parts, snapshot)) = res { let physical_plan = Self::build_physical_plan( diff --git a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt index 5b82582aa89e..068e3a102019 100644 --- a/src/query/service/tests/it/storages/testdata/settings_table.txt +++ b/src/query/service/tests/it/storages/testdata/settings_table.txt @@ -10,6 +10,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System | 'enable_aggregating_index_scan' | '1' | '1' | 'SESSION' | 'Enable scanning aggregating index data while querying.' | 'UInt64' | | 'enable_bushy_join' | '0' | '0' | 'SESSION' | 'Enables generating a bushy join plan with the optimizer.' | 'UInt64' | | 'enable_cbo' | '1' | '1' | 'SESSION' | 'Enables cost-based optimization.' | 'UInt64' | +| 'enable_distributed_compact' | '1' | '1' | 'SESSION' | 'Enable distributed execution of table compaction.' | 'UInt64' | | 'enable_distributed_copy_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.' | 'UInt64' | | 'enable_distributed_replace_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of replace into.' | 'UInt64' | | 'enable_dphyp' | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.' | 'UInt64' | diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index cdd0954024de..6bbeac48824b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -355,6 +355,12 @@ impl DefaultSettings { possible_values: None, display_in_show_settings: true, }), + ("enable_distributed_compact", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enable distributed execution of table compaction.", + possible_values: None, + display_in_show_settings: true, + }), ("enable_aggregating_index_scan", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Enable scanning aggregating index data while querying.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index d15f934eb0bb..35be5066cf2c 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -420,6 +420,10 @@ impl Settings { Ok(self.try_get_u64("enable_distributed_replace_into")? != 0) } + pub fn get_enable_distributed_compact(&self) -> Result { + Ok(self.try_get_u64("enable_distributed_compact")? != 0) + } + pub fn set_enable_distributed_replace(&self, val: bool) -> Result<()> { self.try_set_u64("enable_distributed_replace_into", u64::from(val)) } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs index c8e4d55cb095..0b1c80186673 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs @@ -65,7 +65,6 @@ impl MergeIntoNotMatchedProcessor { for (idx, item) in unmatched.iter().enumerate() { let eval_projections: HashSet = (input_schema.num_fields()..input_schema.num_fields() + item.2.len()).collect(); - println!("data_schema: {:?}", item.0.clone()); data_schemas.insert(idx, item.0.clone()); ops.push(InsertDataBlockMutation { op: BlockOperator::Map {