Skip to content

Commit

Permalink
feat(compaction): support merge compaction group (#18188)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 10, 2024
1 parent 952962c commit 99d6121
Show file tree
Hide file tree
Showing 18 changed files with 1,631 additions and 212 deletions.
15 changes: 15 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,19 @@ message GroupTableChange {

message GroupDestroy {}

message GroupMerge {
uint64 left_group_id = 1;
uint64 right_group_id = 2;
}

message GroupDelta {
oneof delta_type {
IntraLevelDelta intra_level = 1;
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4 [deprecated = true];
GroupTableChange group_table_change = 5 [deprecated = true];
GroupMerge group_merge = 6;
}
}

Expand Down Expand Up @@ -744,6 +750,7 @@ message PinVersionResponse {
message SplitCompactionGroupRequest {
uint64 group_id = 1;
repeated uint32 table_ids = 2;
uint32 partition_vnode_count = 3;
}

message SplitCompactionGroupResponse {
Expand Down Expand Up @@ -839,6 +846,13 @@ message GetVersionByEpochResponse {
HummockVersion version = 1;
}

message MergeCompactionGroupRequest {
uint64 left_group_id = 1;
uint64 right_group_id = 2;
}

message MergeCompactionGroupResponse {}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -880,6 +894,7 @@ service HummockManagerService {
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse);
}

message CompactionConfig {
Expand Down
15 changes: 14 additions & 1 deletion src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ pub async fn split_compaction_group(
context: &CtlContext,
group_id: CompactionGroupId,
table_ids_to_new_group: &[StateTableId],
partition_vnode_count: u32,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let new_group_id = meta_client
.split_compaction_group(group_id, table_ids_to_new_group)
.split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
.await?;
println!(
"Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
Expand Down Expand Up @@ -284,3 +285,15 @@ pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::

Ok(())
}

pub async fn merge_compaction_group(
context: &CtlContext,
left_group_id: CompactionGroupId,
right_group_id: CompactionGroupId,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
meta_client
.merge_compaction_group(left_group_id, right_group_id)
.await?;
Ok(())
}
25 changes: 23 additions & 2 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ enum HummockCommands {
compaction_group_id: u64,
#[clap(long, value_delimiter = ',')]
table_ids: Vec<u32>,
#[clap(long, default_value_t = 0)]
partition_vnode_count: u32,
},
/// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
PauseVersionCheckpoint,
Expand Down Expand Up @@ -340,6 +342,12 @@ enum HummockCommands {
#[clap(long)]
record_hybrid_fetch_threshold_ms: Option<u32>,
},
MergeCompactionGroup {
#[clap(long)]
left_group_id: u64,
#[clap(long)]
right_group_id: u64,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -711,9 +719,15 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Hummock(HummockCommands::SplitCompactionGroup {
compaction_group_id,
table_ids,
partition_vnode_count,
}) => {
cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids)
.await?;
cmd_impl::hummock::split_compaction_group(
context,
compaction_group_id,
&table_ids,
partition_vnode_count,
)
.await?;
}
Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
cmd_impl::hummock::pause_version_checkpoint(context).await?;
Expand Down Expand Up @@ -790,6 +804,13 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
)
.await?
}
Commands::Hummock(HummockCommands::MergeCompactionGroup {
left_group_id,
right_group_id,
}) => {
cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
.await?
}
Commands::Table(TableCommands::Scan {
mv_name,
data_dir,
Expand Down
13 changes: 12 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let new_group_id = self
.hummock_manager
.split_compaction_group(req.group_id, &req.table_ids)
.split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count)
.await?;
Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
}
Expand Down Expand Up @@ -716,6 +716,17 @@ impl HummockManagerService for HummockServiceImpl {
version: Some(version.to_protobuf()),
}))
}

async fn merge_compaction_group(
&self,
request: Request<MergeCompactionGroupRequest>,
) -> Result<Response<MergeCompactionGroupResponse>, Status> {
let req = request.into_inner();
self.hummock_manager
.merge_compaction_group(req.left_group_id, req.right_group_id)
.await?;
Ok(Response::new(MergeCompactionGroupResponse {}))
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ impl HummockManager {
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for group_deltas in version_delta.group_deltas.values() {
let summary = summarize_group_deltas(group_deltas);
for (group_id, group_deltas) in &version_delta.group_deltas {
let summary = summarize_group_deltas(group_deltas, *group_id);
object_sizes.extend(
summary
.insert_table_infos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ use crate::model::{
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;

impl CompactionGroupManager {
pub(super) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
let default_config = match env.opts.compaction_config.as_ref() {
None => CompactionConfigBuilder::new().build(),
Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
};
Self::new_with_config(env, default_config).await
}

pub(super) async fn new_with_config(
pub(crate) async fn new_with_config(
env: &MetaSrvEnv,
default_config: CompactionConfig,
) -> Result<CompactionGroupManager> {
Expand Down Expand Up @@ -428,24 +428,6 @@ impl HummockManager {
results
}

/// Splits a compaction group into two. The new one will contain `table_ids`.
/// Returns the newly created compaction group id.
pub async fn split_compaction_group(
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
) -> Result<CompactionGroupId> {
let result = self
.move_state_table_to_compaction_group(
parent_group_id,
table_ids,
self.env.opts.partition_vnode_count,
)
.await?;

Ok(result)
}

/// move some table to another compaction-group. Create a new compaction group if it does not
/// exist.
pub async fn move_state_table_to_compaction_group(
Expand Down Expand Up @@ -651,7 +633,7 @@ impl HummockManager {
infos
}

pub(super) async fn initial_compaction_group_config_after_load(
pub(crate) async fn initial_compaction_group_config_after_load(
&self,
versioning_guard: &Versioning,
compaction_group_manager: &mut CompactionGroupManager,
Expand All @@ -675,7 +657,7 @@ impl HummockManager {
/// 1. initialize default static compaction group.
/// 2. register new table to new compaction group.
/// 3. move existent table to new compaction group.
pub(super) struct CompactionGroupManager {
pub(crate) struct CompactionGroupManager {
compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
default_config: Arc<CompactionConfig>,
/// Tables that write limit is trigger for.
Expand Down Expand Up @@ -709,15 +691,15 @@ impl CompactionGroupManager {
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn try_get_compaction_group_config(
pub(crate) fn try_get_compaction_group_config(
&self,
compaction_group_id: CompactionGroupId,
) -> Option<CompactionGroup> {
self.compaction_groups.get(&compaction_group_id).cloned()
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
self.default_config.clone()
}
}
Expand Down Expand Up @@ -814,15 +796,15 @@ impl<'a> CompactionGroupTransaction<'a> {
}

/// Tries to get compaction group config for `compaction_group_id`.
pub(super) fn try_get_compaction_group_config(
pub(crate) fn try_get_compaction_group_config(
&self,
compaction_group_id: CompactionGroupId,
) -> Option<&CompactionGroup> {
self.get(&compaction_group_id)
}

/// Removes stale group configs.
fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
let stale_group = self
.tree_ref()
.keys()
Expand All @@ -837,7 +819,7 @@ impl<'a> CompactionGroupTransaction<'a> {
}
}

pub(super) fn update_compaction_config(
pub(crate) fn update_compaction_config(
&mut self,
compaction_group_ids: &[CompactionGroupId],
config_to_update: &[MutableConfig],
Expand Down
Loading

0 comments on commit 99d6121

Please sign in to comment.