From 7d8a4ef52ae48e36f9ec3e82450eed966b3e8556 Mon Sep 17 00:00:00 2001 From: zhyass <34016424+zhyass@users.noreply.github.com> Date: Thu, 21 Sep 2023 17:17:25 +0800 Subject: [PATCH] Add recluster block size setting --- src/query/service/tests/it/storages/system.rs | 3 ++- src/query/settings/src/settings_default.rs | 17 ++++++++++++++++- .../settings/src/settings_getter_setter.rs | 9 +++++++++ .../operations/mutation/recluster_mutator.rs | 5 ++--- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/query/service/tests/it/storages/system.rs b/src/query/service/tests/it/storages/system.rs index dd767dc8ef897..ac6e57cf1036a 100644 --- a/src/query/service/tests/it/storages/system.rs +++ b/src/query/service/tests/it/storages/system.rs @@ -83,7 +83,8 @@ async fn run_table_tests( actual_lines.retain(|&item| { !(item.contains("max_threads") || item.contains("max_memory_usage") - || item.contains("max_storage_io_requests")) + || item.contains("max_storage_io_requests") + || item.contains("recluster_block_size")) }); } for line in actual_lines { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index cdd0954024de9..f1a8e612bd8a3 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -41,6 +41,7 @@ impl DefaultSettings { Ok(Arc::clone(DEFAULT_SETTINGS.get_or_try_init(|| -> Result> { let num_cpus = Self::num_cpus(); let max_memory_usage = Self::max_memory_usage()?; + let recluster_block_size = Self::recluster_block_size()?; let default_max_storage_io_requests = Self::storage_io_requests(num_cpus); let default_settings = HashMap::from([ @@ -420,7 +421,13 @@ impl DefaultSettings { desc: "Enables recording query profile", possible_values: None, display_in_show_settings: true, - }), + }), + ("recluster_block_size", DefaultSettingValue { + value: UserSettingValue::UInt64(recluster_block_size), + desc: "Sets the maximum byte size of blocks for recluster", + possible_values: None, + display_in_show_settings: true, + }), ]); Ok(Arc::new(DefaultSettings { @@ -482,6 +489,14 @@ impl DefaultSettings { }) } + fn recluster_block_size() -> Result { + let max_memory_usage = Self::max_memory_usage()?; + // The sort merge consumes more than twice as much memory, + // so the block size is set relatively conservatively here. + let recluster_block_size = max_memory_usage * 35 / 100; + Ok(recluster_block_size) + } + pub fn has_setting(key: &str) -> Result { Ok(Self::instance()?.settings.contains_key(key)) } diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index d15f934eb0bb8..743174207dbf6 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -483,10 +483,19 @@ impl Settings { pub fn get_recluster_timeout_secs(&self) -> Result { self.try_get_u64("recluster_timeout_secs") } + pub fn set_recluster_timeout_secs(&self, val: u64) -> Result<()> { self.try_set_u64("recluster_timeout_secs", val) } + pub fn get_recluster_block_size(&self) -> Result { + self.try_get_u64("recluster_block_size") + } + + pub fn set_recluster_block_size(&self, val: u64) -> Result<()> { + self.try_set_u64("recluster_block_size", val) + } + pub fn get_enable_refresh_aggregating_index_after_write(&self) -> Result { Ok(self.try_get_u64("enable_refresh_aggregating_index_after_write")? != 0) } diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index a77f1a50b83dc..c37e49abd69fa 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -101,9 +101,8 @@ impl ReclusterMutator { } let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; - let max_memory_usage = self.ctx.get_settings().get_max_memory_usage()? as usize; - let memory_threshold = - cmp::min(mem_info.avail as usize * 1024, max_memory_usage) * 45 / 100; + let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize; + let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100); let mut remained_blocks = Vec::new(); let mut selected = false;