Skip to content

Commit

Permalink
chore(storage): add enable_trivial_move and `enable_check_task_over…
Browse files Browse the repository at this point in the history
…lap` (#14801)

Co-authored-by: Patrick Huang <[email protected]>
  • Loading branch information
Li0k and hzxa21 authored Jan 29, 2024
1 parent 55aaa92 commit 7e937c2
Show file tree
Hide file tree
Showing 23 changed files with 342 additions and 60 deletions.
14 changes: 14 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,12 @@ pub struct MetaDeveloperConfig {
/// in the meta node.
#[serde(default = "default::developer::meta_cached_traces_memory_limit_bytes")]
pub cached_traces_memory_limit_bytes: usize,

/// Compaction picker config
#[serde(default = "default::developer::enable_trivial_move")]
pub enable_trivial_move: bool,
#[serde(default = "default::developer::enable_check_task_level_overlap")]
pub enable_check_task_level_overlap: bool,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1422,6 +1428,14 @@ pub mod default {
pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
64 << 20 // 64MB
}

pub fn enable_trivial_move() -> bool {
true
}

pub fn enable_check_task_level_overlap() -> bool {
false
}
}

pub use crate::system_param::default as system;
Expand Down
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ enable_emergency_picker = true
[meta.developer]
meta_cached_traces_num = 256
meta_cached_traces_memory_limit_bytes = 134217728
meta_enable_trivial_move = true
meta_enable_check_task_level_overlap = false

[batch]
enable_barrier_read = false
Expand Down
5 changes: 5 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.developer
.cached_traces_memory_limit_bytes,
enable_trivial_move: config.meta.developer.enable_trivial_move,
enable_check_task_level_overlap: config
.meta
.developer
.enable_check_task_level_overlap,
},
config.system.into_init_system_params(),
)
Expand Down
40 changes: 30 additions & 10 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use risingwave_pb::hummock::compact_task::{self, TaskType};

mod picker;
pub mod selector;

use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
Expand All @@ -38,7 +37,9 @@ use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlap
use crate::hummock::compaction::picker::CompactionInput;
use crate::hummock::level_handler::LevelHandler;
use crate::hummock::model::CompactionGroup;
use crate::MetaOpts;

#[derive(Clone)]
pub struct CompactStatus {
pub compaction_group_id: CompactionGroupId,
pub level_handlers: Vec<LevelHandler>,
Expand All @@ -60,15 +61,6 @@ impl PartialEq for CompactStatus {
}
}

impl Clone for CompactStatus {
fn clone(&self) -> Self {
Self {
compaction_group_id: self.compaction_group_id,
level_handlers: self.level_handlers.clone(),
}
}
}

pub struct CompactionTask {
pub input: CompactionInput,
pub base_level: usize,
Expand Down Expand Up @@ -104,6 +96,7 @@ impl CompactStatus {
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn CompactionSelector>,
table_id_to_options: HashMap<u32, TableOption>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
Expand All @@ -115,6 +108,7 @@ impl CompactStatus {
&mut self.level_handlers,
stats,
table_id_to_options,
developer_config,
)
}

Expand Down Expand Up @@ -216,3 +210,29 @@ pub fn get_compression_algorithm(
compaction_config.compression_algorithm[idx].clone()
}
}

pub struct CompactionDeveloperConfig {
/// l0 picker whether to select trivial move task
pub enable_trivial_move: bool,

/// l0 multi level picker whether to check the overlap accuracy between sub levels
pub enable_check_task_level_overlap: bool,
}

impl CompactionDeveloperConfig {
pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
Self {
enable_trivial_move: opts.enable_trivial_move,
enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
}
}
}

impl Default for CompactionDeveloperConfig {
fn default() -> Self {
Self {
enable_trivial_move: true,
enable_check_task_level_overlap: true,
}
}
}
5 changes: 3 additions & 2 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.

use std::cmp;
use std::fmt::Debug;
use std::ops::Range;

use itertools::Itertools;
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_pb::hummock::{KeyRange, SstableInfo};

pub trait OverlapInfo {
pub trait OverlapInfo: Debug {
fn check_overlap(&self, a: &SstableInfo) -> bool;
fn check_multiple_overlap(&self, others: &[SstableInfo]) -> Range<usize>;
fn check_multiple_include(&self, others: &[SstableInfo]) -> Range<usize>;
Expand Down Expand Up @@ -67,7 +68,7 @@ pub trait OverlapStrategy: Send + Sync {
fn create_overlap_info(&self) -> Box<dyn OverlapInfo>;
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct RangeOverlapInfo {
target_range: Option<KeyRange>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
ValidationRuleType,
};
use crate::hummock::compaction::create_overlap_strategy;
use crate::hummock::compaction::picker::TrivialMovePicker;
use crate::hummock::compaction::{create_overlap_strategy, CompactionDeveloperConfig};
use crate::hummock::level_handler::LevelHandler;

pub struct LevelCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
developer_config: Arc<CompactionDeveloperConfig>,
}

impl CompactionPicker for LevelCompactionPicker {
Expand Down Expand Up @@ -87,23 +88,30 @@ impl CompactionPicker for LevelCompactionPicker {

impl LevelCompactionPicker {
#[cfg(test)]
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> LevelCompactionPicker {
pub fn new(
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
config,
developer_config,
}
}

pub fn new_with_validator(
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
config,
compaction_task_validator,
developer_config,
}
}

Expand All @@ -114,6 +122,10 @@ impl LevelCompactionPicker {
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if !self.developer_config.enable_trivial_move {
return None;
}

let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
let trivial_move_picker =
TrivialMovePicker::new(0, self.target_level, overlap_strategy.clone());
Expand Down Expand Up @@ -148,6 +160,7 @@ impl LevelCompactionPicker {
// The maximum number of sub_level compact level per task
self.config.level0_max_compact_file_number,
overlap_strategy.clone(),
self.developer_config.enable_check_task_level_overlap,
);

let mut max_vnode_partition_idx = 0;
Expand Down Expand Up @@ -264,7 +277,9 @@ pub mod tests {
use super::*;
use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::selector::tests::*;
use crate::hummock::compaction::{CompactionMode, TierCompactionPicker};
use crate::hummock::compaction::{
CompactionDeveloperConfig, CompactionMode, TierCompactionPicker,
};

fn create_compaction_picker_for_test() -> LevelCompactionPicker {
let config = Arc::new(
Expand All @@ -273,7 +288,7 @@ pub mod tests {
.level0_sub_level_compact_level_count(1)
.build(),
);
LevelCompactionPicker::new(1, config)
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()))
}

#[test]
Expand Down Expand Up @@ -373,7 +388,8 @@ pub mod tests {
.level0_sub_level_compact_level_count(1)
.build(),
);
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));

let levels = vec![Level {
level_idx: 1,
Expand Down Expand Up @@ -497,7 +513,11 @@ pub mod tests {
.max_bytes_for_level_multiplier(1)
.level0_sub_level_compact_level_count(2)
.build();
let mut picker = LevelCompactionPicker::new(1, Arc::new(config));
let mut picker = LevelCompactionPicker::new(
1,
Arc::new(config),
Arc::new(CompactionDeveloperConfig::default()),
);

let mut levels = Levels {
levels: vec![Level {
Expand Down Expand Up @@ -571,7 +591,8 @@ pub mod tests {
);
// Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
// So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION.
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand All @@ -597,7 +618,8 @@ pub mod tests {
.level0_sub_level_compact_level_count(1)
.build(),
);
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand Down Expand Up @@ -663,7 +685,11 @@ pub mod tests {

// Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
// But stopped by pending sub-level when trying to include more sub-levels.
let mut picker = LevelCompactionPicker::new(1, config.clone());
let mut picker = LevelCompactionPicker::new(
1,
config.clone(),
Arc::new(CompactionDeveloperConfig::default()),
);
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
assert!(ret.is_none());

Expand All @@ -673,7 +699,8 @@ pub mod tests {
}

// No more pending sub-level so we can get a task now.
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand Down Expand Up @@ -706,7 +733,8 @@ pub mod tests {
.build(),
);

let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@ use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;

pub struct EmergencyCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
}

impl EmergencyCompactionPicker {
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> Self {
pub fn new(
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Self {
Self {
target_level,
config,
developer_config,
}
}

Expand All @@ -48,6 +55,7 @@ impl EmergencyCompactionPicker {
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
Expand Down
Loading

0 comments on commit 7e937c2

Please sign in to comment.