Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support compaction group split and merge #17898

Closed
wants to merge 52 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
5ce1f4a
feat(storage): replace file_size with estimated_sstable_size:
Li0k Jul 11, 2024
537a555
feat(storage): basic split interface
Li0k Jul 25, 2024
a126f6f
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
5e578a3
feat(storage): support split_compaction_group_v2
Li0k Jul 31, 2024
fc0cee2
feat(storage): support group split and merge
Li0k Aug 1, 2024
63667b8
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 1, 2024
0da9a83
feat(storage): add unit for split
Li0k Aug 2, 2024
69dc63a
fix(storage): fix compile
Li0k Aug 2, 2024
8e1bf19
fix(storage): fix merge and add ut
Li0k Aug 2, 2024
3b57f46
fix(storage): bug fix and support new move_state_tables
Li0k Aug 2, 2024
4725519
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 4, 2024
4aaf0c5
feat(storage): refactor and support auto merge group
Li0k Aug 5, 2024
e10453d
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 5, 2024
3358600
fix(storage): fix sub_level partition count and split/merge condition
Li0k Aug 8, 2024
368c87c
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 8, 2024
1dcc4d5
feat(storage): add metrics for split/merge
Li0k Aug 8, 2024
033c8f3
fix(storage): reorg code
Li0k Aug 8, 2024
7f92561
refactor(storage): reorg code
Li0k Aug 8, 2024
06f08bb
refactor(storgae): remove the unused code
Li0k Aug 8, 2024
32ded2a
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 8, 2024
d4efbf3
fix(compaction): fix sub level insert hint and cover ut
Li0k Aug 9, 2024
f024d47
fix(storage): compactor iterator support key_range filter
Li0k Aug 10, 2024
ad05c11
fix(storage): split overlapping level
Li0k Aug 12, 2024
6b71753
chore(storage): add some debug log
Li0k Aug 13, 2024
d302e49
chore(storage): add log for split and merge
Li0k Aug 13, 2024
0a2e36e
chore(storage): add assert
Li0k Aug 13, 2024
2bae033
typo
Li0k Aug 13, 2024
4ab861f
fix(storage): fix commit_epoch split sst
Li0k Aug 13, 2024
29c9edf
typo
Li0k Aug 13, 2024
0f6fa7c
refactor(storage): refactor correct sst and group schedule interval
Li0k Aug 14, 2024
0ccb849
refactor(compaction): upgrade split and merge condition
Li0k Aug 19, 2024
556f9f7
rename
Li0k Aug 29, 2024
ad756a8
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Aug 29, 2024
ea57149
rebase
Li0k Sep 3, 2024
b8c66ea
fix(compaction): rewrite sub level id when merge
Li0k Sep 4, 2024
2c81d3d
typo
Li0k Sep 4, 2024
6246a14
fix(storage): fix split and merge condiction
Li0k Sep 4, 2024
32bafa7
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Sep 4, 2024
7da40e6
update grafana
Li0k Sep 4, 2024
ce1d7cc
fix(storage): fix commit_epoch correct_sst
Li0k Sep 5, 2024
a66a51b
rebase
Li0k Sep 5, 2024
0a3a91e
fix(storage): fix correct commit ssts
Li0k Sep 5, 2024
a0247fa
typo
Li0k Sep 5, 2024
5176eb3
fix merge
Li0k Sep 5, 2024
2a91d2d
fix(storage): introduce split_sst_for_commit_epoch
Li0k Sep 6, 2024
be5ecb6
fix(storage): fix merge
Li0k Sep 6, 2024
2a05daa
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Sep 11, 2024
2ebc0ef
fix(storage): fix ut
Li0k Sep 12, 2024
3e8bd6f
refactor
Li0k Sep 12, 2024
ded1709
add doc for proto
Li0k Sep 12, 2024
a6d06a5
typo
Li0k Sep 12, 2024
6719620
address comments
Li0k Sep 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3254,12 +3254,17 @@ def section_hummock_manager(outer_panels):
],
),
panels.timeseries_count(
"Move State Table Count",
"The times of move_state_table occurs",
"Compaction Group Schedule",
"The times of schedule occurs",
[
panels.target(
f"sum({table_metric('storage_move_state_table_count')}) by (group)",
"move table cg{{group}}",
f"sum({table_metric('storage_split_compaction_group_count')}) by (group)",
"split_compaction_group_count cg{{group}}",
),

panels.target(
f"sum({table_metric('storage_merge_compaction_group_count')}) by (group)",
"merge_compaction_group_count cg{{group}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ message GroupConstruct {
uint64 group_id = 4;
uint64 new_sst_start_id = 5;
CompatibilityVersion version = 6;

// The split_key is the key that the group is split by.
// When GroupConstruct with commit_epoch, split_key will be empty
// When split_key is not None, GroupConstruct tells to use split_key to check each level and split the sstable_info in the level into two groups (bounded by split_key).
// For the left sstable_info, split_key (right_exclusive=true) will be used as key_range_right.
// In the current implementation split_key always contains a table_id, vnode = 0, epoch = 0
optional bytes split_key = 7;
Li0k marked this conversation as resolved.
Show resolved Hide resolved
}

message GroupMetaChange {
Expand Down Expand Up @@ -729,6 +736,7 @@ message RiseCtlUpdateCompactionConfigRequest {
CompressionAlgorithm compression_algorithm = 17;
uint32 max_l0_compact_level_count = 18;
uint64 sst_allowed_trivial_move_min_size = 19;
uint32 split_weight_by_vnode = 20;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ pub mod default {
}

pub fn move_table_size_limit() -> u64 {
10 * 1024 * 1024 * 1024 // 10GB
4 * 1024 * 1024 * 1024 // 4GB
}

pub fn split_group_size_limit() -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ This page is automatically generated by `./risedev generate-example-config`
| min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 |
| min_sst_retention_time_sec | Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even they are dangling. | 86400 |
| min_table_split_write_throughput | If the size of one table is smaller than `min_table_split_write_throughput`, we would not split it to an single group. | 4194304 |
| move_table_size_limit | | 10737418240 |
| move_table_size_limit | | 4294967296 |
| node_num_monitor_interval_sec | | 10 |
| parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 |
| parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 |
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ periodic_space_reclaim_compaction_interval_sec = 3600
periodic_ttl_reclaim_compaction_interval_sec = 1800
periodic_tombstone_reclaim_compaction_interval_sec = 600
periodic_split_compact_group_interval_sec = 10
move_table_size_limit = 10737418240
move_table_size_limit = 4294967296
split_group_size_limit = 68719476736
cut_table_size_limit = 1073741824
do_not_config_object_storage_lifecycle = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
for cg in version.levels.into_values() {
for level in cg.levels.into_iter().chain(cg.l0.sub_levels) {
for sst in level.table_infos {
let key_range = sst.key_range;
let key_range = &sst.key_range;
sstables.push(RwHummockSstable {
sstable_id: sst.sst_id as _,
object_id: sst.object_id as _,
Expand Down
117 changes: 64 additions & 53 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::split_sst;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_for_commit_epoch;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map, PbTableStatsMap,
Expand Down Expand Up @@ -160,65 +161,78 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();

// Add new table
let (new_table_ids, new_compaction_group, compaction_group_manager_txn) =
let mut new_tables = None;
let mut compaction_group_manager_txn = None;
let mut compaction_group_config = None;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved inside if !matches!(new_table_fragment_info, NewTableFragmentInfo::None) and there is no need to be a Option.


if !matches!(new_table_fragment_info, NewTableFragmentInfo::None) {
let mut new_table_ids = Vec::default();
if compaction_group_manager_txn.is_none() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is always true. We can either remove the check or let init the compaction_group_manager_txn here without defining a mut variable outside.

let compaction_group_manager_guard = self.compaction_group_manager.write().await;
let config = compaction_group_manager_guard
.default_compaction_config()
.deref()
.clone();
compaction_group_config = Some(config);
compaction_group_manager_txn =
Some(CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
));
}

let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
compaction_group_manager_txn.as_mut().unwrap().insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: Arc::new(compaction_group_config.as_ref().unwrap().clone()),
},
);

match new_table_fragment_info {
NewTableFragmentInfo::Normal {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that after this PR, any newly created streaming job will be put in a separate compaction group by default?

mv_table_id,
internal_table_ids,
} => {
let mut new_table_ids = HashMap::new();
on_handle_add_new_table(
state_table_info,
&internal_table_ids,
StaticCompactionGroupId::StateDefault as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
if !internal_table_ids.is_empty() {
on_handle_add_new_table(
state_table_info,
&internal_table_ids,
new_compaction_group_id,
&mut table_compaction_group_mapping,
)?;

new_table_ids.extend(internal_table_ids);
}

on_handle_add_new_table(
state_table_info,
&mv_table_id,
StaticCompactionGroupId::MaterializedView as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
(new_table_ids, None, None)
if let Some(mv_table_id) = mv_table_id {
on_handle_add_new_table(
state_table_info,
&[mv_table_id],
new_compaction_group_id,
&mut table_compaction_group_mapping,
)?;
new_table_ids.push(mv_table_id);
}
}
NewTableFragmentInfo::NewCompactionGroup { table_ids } => {
let compaction_group_manager_guard =
self.compaction_group_manager.write().await;
let compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
let mut compaction_group_manager =
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
);
let mut new_table_ids = HashMap::new();
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config.clone(),
},
);

on_handle_add_new_table(
state_table_info,
&table_ids,
new_compaction_group_id,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
(
new_table_ids,
Some((new_compaction_group_id, (*compaction_group_config).clone())),
Some(compaction_group_manager),
)
new_table_ids.extend(table_ids.iter().cloned());
}
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};
NewTableFragmentInfo::None => unreachable!(),
}

new_tables = Some((
new_table_ids,
new_compaction_group_id,
compaction_group_config.as_ref().unwrap().clone(),
));
}

let commit_sstables = self
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
Expand All @@ -230,9 +244,8 @@ impl HummockManager {
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
new_compaction_group,
commit_sstables,
&new_table_ids,
new_tables,
new_table_watermarks,
change_log_delta,
);
Expand Down Expand Up @@ -375,7 +388,7 @@ impl HummockManager {
}
}

async fn correct_commit_ssts(
pub(crate) async fn correct_commit_ssts(
&self,
sstables: Vec<LocalSstableInfo>,
table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
Expand Down Expand Up @@ -424,7 +437,6 @@ impl HummockManager {
.push(sst.sst_info);
break;
}

let origin_sst_size = sst.sst_info.sst_size;
let new_sst_size = match_ids
.iter()
Expand All @@ -434,7 +446,8 @@ impl HummockManager {
})
.sum();

let branch_sst = split_sst(
// let split_key = build_split_key_with_table_id(match_ids.last().unwrap() + 1);
let branch_sst = split_sst_for_commit_epoch(
&mut sst.sst_info,
&mut new_sst_id,
origin_sst_size - new_sst_size,
Expand All @@ -458,7 +471,6 @@ fn on_handle_add_new_table(
table_ids: impl IntoIterator<Item = &TableId>,
compaction_group_id: CompactionGroupId,
table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
) -> Result<()> {
for table_id in table_ids {
if let Some(info) = state_table_info.info().get(table_id) {
Expand All @@ -468,7 +480,6 @@ fn on_handle_add_new_table(
)));
}
table_compaction_group_mapping.insert(*table_id, compaction_group_id);
new_table_ids.insert(*table_id, compaction_group_id);
}

Ok(())
Expand Down
Loading