Skip to content

Commit

Permalink
Add recluster block size setting
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Sep 22, 2023
1 parent 14819b3 commit 7d8a4ef
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/query/service/tests/it/storages/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ async fn run_table_tests(
actual_lines.retain(|&item| {
!(item.contains("max_threads")
|| item.contains("max_memory_usage")
|| item.contains("max_storage_io_requests"))
|| item.contains("max_storage_io_requests")
|| item.contains("recluster_block_size"))
});
}
for line in actual_lines {
Expand Down
17 changes: 16 additions & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl DefaultSettings {
Ok(Arc::clone(DEFAULT_SETTINGS.get_or_try_init(|| -> Result<Arc<DefaultSettings>> {
let num_cpus = Self::num_cpus();
let max_memory_usage = Self::max_memory_usage()?;
let recluster_block_size = Self::recluster_block_size()?;
let default_max_storage_io_requests = Self::storage_io_requests(num_cpus);

let default_settings = HashMap::from([
Expand Down Expand Up @@ -420,7 +421,13 @@ impl DefaultSettings {
desc: "Enables recording query profile",
possible_values: None,
display_in_show_settings: true,
}),
}),
("recluster_block_size", DefaultSettingValue {
value: UserSettingValue::UInt64(recluster_block_size),
desc: "Sets the maximum byte size of blocks for recluster",
possible_values: None,
display_in_show_settings: true,
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down Expand Up @@ -482,6 +489,14 @@ impl DefaultSettings {
})
}

fn recluster_block_size() -> Result<u64> {
let max_memory_usage = Self::max_memory_usage()?;
// The sort merge consumes more than twice as much memory,
// so the block size is set relatively conservatively here.
let recluster_block_size = max_memory_usage * 35 / 100;
Ok(recluster_block_size)
}

pub fn has_setting(key: &str) -> Result<bool> {
Ok(Self::instance()?.settings.contains_key(key))
}
Expand Down
9 changes: 9 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,19 @@ impl Settings {
pub fn get_recluster_timeout_secs(&self) -> Result<u64> {
self.try_get_u64("recluster_timeout_secs")
}

pub fn set_recluster_timeout_secs(&self, val: u64) -> Result<()> {
self.try_set_u64("recluster_timeout_secs", val)
}

pub fn get_recluster_block_size(&self) -> Result<u64> {
self.try_get_u64("recluster_block_size")
}

pub fn set_recluster_block_size(&self, val: u64) -> Result<()> {
self.try_set_u64("recluster_block_size", val)
}

pub fn get_enable_refresh_aggregating_index_after_write(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_refresh_aggregating_index_after_write")? != 0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ impl ReclusterMutator {
}

let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
let max_memory_usage = self.ctx.get_settings().get_max_memory_usage()? as usize;
let memory_threshold =
cmp::min(mem_info.avail as usize * 1024, max_memory_usage) * 45 / 100;
let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize;
let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 40 / 100);

let mut remained_blocks = Vec::new();
let mut selected = false;
Expand Down

0 comments on commit 7d8a4ef

Please sign in to comment.