diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index 252f92c404015..f8d8ae7f23c4e 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -476,16 +476,18 @@ impl CompactorManager { #[cfg(test)] mod tests { + use std::sync::Arc; use std::time::Duration; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_pb::hummock::CompactTaskProgress; + use risingwave_rpc_client::HummockMetaClient; use crate::hummock::compaction::selector::default_compaction_selector; use crate::hummock::test_utils::{ add_ssts, register_table_ids_to_compaction_group, setup_compute_env, }; - use crate::hummock::CompactorManager; + use crate::hummock::{CompactorManager, MockHummockMetaClient}; #[tokio::test] async fn test_compactor_manager() { @@ -493,6 +495,9 @@ mod tests { let (env, context_id) = { let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new( + MockHummockMetaClient::new(hummock_manager.clone(), worker_node.id), + ); let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); register_table_ids_to_compaction_group( hummock_manager.as_ref(), @@ -500,7 +505,8 @@ mod tests { StaticCompactionGroupId::StateDefault.into(), ) .await; - let _sst_infos = add_ssts(1, hummock_manager.as_ref(), context_id).await; + let _sst_infos = + add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await; let _receiver = compactor_manager.add_compactor(context_id); hummock_manager .get_compact_task( diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e92e91c8503d0..8c021509dcbb2 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -70,37 +70,6 @@ pub struct CommitEpochInfo { } impl HummockManager { - #[cfg(any(test, feature = "test"))] - pub async fn commit_epoch_for_test( - &self, - epoch: u64, - sstables: Vec>, - sst_to_context: HashMap, - ) -> Result<()> { - let tables = self - .versioning - .read() - .await - .current_version - .state_table_info - .info() - .keys() - .cloned() - .collect(); - let info = CommitEpochInfo { - sstables: sstables.into_iter().map(Into::into).collect(), - new_table_watermarks: HashMap::new(), - sst_to_context, - new_table_fragment_info: NewTableFragmentInfo::None, - change_log_delta: HashMap::new(), - committed_epoch: epoch, - tables_to_commit: tables, - is_visible_table_committed_epoch: true, - }; - self.commit_epoch(info).await?; - Ok(()) - } - /// Caller should ensure `epoch` > `max_committed_epoch` pub async fn commit_epoch( &self, diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 97a99945bcf41..596c36857907f 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -331,16 +331,24 @@ mod tests { use std::time::Duration; use itertools::Itertools; + use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::HummockSstableObjectId; + use risingwave_rpc_client::HummockMetaClient; use super::ResponseEvent; use crate::hummock::test_utils::{add_test_tables, setup_compute_env}; + use crate::hummock::MockHummockMetaClient; use crate::MetaOpts; #[tokio::test] async fn test_full_gc() { let (mut env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(80).await; let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); // Use smaller spin interval to accelerate test. env.opts = Arc::new(MetaOpts { @@ -426,7 +434,12 @@ mod tests { ); // All committed SST ids should be excluded from GC. - let sst_infos = add_test_tables(hummock_manager.as_ref(), context_id).await; + let sst_infos = add_test_tables( + hummock_manager.as_ref(), + hummock_meta_client.clone(), + compaction_group_id, + ) + .await; let committed_object_ids = sst_infos .into_iter() .flatten() diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index d0183d84d23c5..09d43bf5fc72c 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -13,38 +13,41 @@ // limitations under the License. #![cfg(test)] - -use std::borrow::Borrow; use std::cmp::Ordering; use std::collections::HashMap; +use std::sync::Arc; use itertools::Itertools; use prometheus::Registry; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH}; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ssts; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::key::{gen_key_from_str, FullKey}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_stats::{to_prost_table_stats_map, TableStats, TableStatsMap}; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, - LocalSstableInfo, FIRST_VERSION_ID, + LocalSstableInfo, SyncResult, FIRST_VERSION_ID, }; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot}; use risingwave_pb::meta::add_worker_node_request::Property; +use risingwave_rpc_client::HummockMetaClient; +use thiserror_ext::AsReport; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::selector::{default_compaction_selector, ManualCompactionOption}; use crate::hummock::error::Error; use crate::hummock::test_utils::*; -use crate::hummock::{HummockManager, HummockManagerRef}; -use crate::manager::{MetaSrvEnv, MetaStoreImpl, WorkerId}; +use crate::hummock::{HummockManagerRef, MockHummockMetaClient}; +use crate::manager::{MetaSrvEnv, MetaStoreImpl}; use crate::model::MetadataModel; use crate::rpc::metrics::MetaMetrics; @@ -59,12 +62,23 @@ fn pin_snapshots_epoch(pin_snapshots: &[HummockPinnedSnapshot]) -> Vec { .collect_vec() } -fn gen_sstable_info(sst_id: u64, idx: usize, table_ids: Vec) -> SstableInfo { +fn gen_sstable_info(sst_id: u64, table_ids: Vec, epoch: u64) -> SstableInfo { + let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1"); + let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1"); + let full_key_l = FullKey::for_test( + TableId::new(*table_ids.first().unwrap()), + table_key_l, + epoch, + ) + .encode(); + let full_key_r = + FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch).encode(); + SstableInfo { sst_id, key_range: KeyRange { - left: iterator_test_key_of_epoch(1, idx, 1).into(), - right: iterator_test_key_of_epoch(1, idx, 1).into(), + left: full_key_l.into(), + right: full_key_r.into(), right_exclusive: false, }, table_ids, @@ -77,9 +91,9 @@ fn gen_sstable_info(sst_id: u64, idx: usize, table_ids: Vec) -> SstableInfo } } -fn gen_local_sstable_info(sst_id: u64, idx: usize, table_ids: Vec) -> LocalSstableInfo { +fn gen_local_sstable_info(sst_id: u64, table_ids: Vec, epoch: u64) -> LocalSstableInfo { LocalSstableInfo { - sst_info: gen_sstable_info(sst_id, idx, table_ids), + sst_info: gen_sstable_info(sst_id, table_ids, epoch), table_stats: Default::default(), } } @@ -181,8 +195,12 @@ async fn test_unpin_snapshot_before() { #[tokio::test] async fn test_hummock_compaction_task() { - let (_, hummock_manager, _, _worker_node) = setup_compute_env(80).await; + let (_, hummock_manager, _, worker_node) = setup_compute_env(80).await; let sst_num = 2; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); // No compaction task available. assert!(hummock_manager @@ -196,9 +214,10 @@ async fn test_hummock_compaction_task() { // Add some sstables and commit. let epoch = test_epoch(1); + let table_id = 1; let original_tables = generate_test_sstables_with_table_id( epoch, - 1, + table_id, get_sst_ids(&hummock_manager, sst_num).await, ); register_sstable_infos_to_compaction_group( @@ -207,20 +226,23 @@ async fn test_hummock_compaction_task() { StaticCompactionGroupId::StateDefault.into(), ) .await; - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&original_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&original_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); // Get a compaction task. + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), table_id).await; let compact_task = hummock_manager - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap() .unwrap(); @@ -235,10 +257,7 @@ async fn test_hummock_compaction_task() { // Get a compaction task. let compact_task = hummock_manager - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap() .unwrap(); @@ -253,7 +272,11 @@ async fn test_hummock_compaction_task() { #[tokio::test] async fn test_hummock_table() { - let (_env, hummock_manager, _cluster_manager, _worker_node) = setup_compute_env(80).await; + let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let epoch = test_epoch(1); let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); @@ -263,17 +286,21 @@ async fn test_hummock_table() { StaticCompactionGroupId::StateDefault.into(), ) .await; - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&original_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&original_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); let pinned_version = hummock_manager.get_current_version().await; - let levels = - pinned_version.get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()); + let levels = pinned_version.get_compaction_group_levels(compaction_group_id); assert_eq!( Ordering::Equal, levels @@ -290,14 +317,18 @@ async fn test_hummock_table() { // Confirm tables got are equal to original tables assert_eq!( get_sorted_object_ids(&original_tables), - get_sorted_committed_object_ids(&pinned_version) + get_sorted_committed_object_ids(&pinned_version, compaction_group_id) ); } #[tokio::test] async fn test_hummock_transaction() { - let (_env, hummock_manager, _cluster_manager, _worker_node) = setup_compute_env(80).await; + let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; let mut committed_tables = vec![]; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); // Add and commit tables in epoch1. // BEFORE: committed_epochs = [] @@ -318,24 +349,30 @@ async fn test_hummock_transaction() { current_version.visible_table_committed_epoch(), INVALID_EPOCH ); - assert!(get_sorted_committed_object_ids(¤t_version).is_empty()); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + assert!(get_sorted_committed_object_ids(¤t_version, compaction_group_id).is_empty()); // Commit epoch1 - commit_from_meta_node( - hummock_manager.borrow(), - epoch1, - to_local_sstable_info(&tables_in_epoch1), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch1, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&tables_in_epoch1), + ..Default::default() + }, + false, + ) + .await + .unwrap(); committed_tables.extend(tables_in_epoch1.clone()); // Get tables after committing epoch1. All tables committed in epoch1 should be returned let current_version = hummock_manager.get_current_version().await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); assert_eq!(current_version.visible_table_committed_epoch(), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), - get_sorted_committed_object_ids(¤t_version) + get_sorted_committed_object_ids(¤t_version, compaction_group_id) ); } @@ -355,29 +392,35 @@ async fn test_hummock_transaction() { // Get tables before committing epoch2. tables_in_epoch1 should be returned and // tables_in_epoch2 should be invisible. let current_version = hummock_manager.get_current_version().await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); assert_eq!(current_version.visible_table_committed_epoch(), epoch1); assert_eq!( get_sorted_object_ids(&committed_tables), - get_sorted_committed_object_ids(¤t_version) + get_sorted_committed_object_ids(¤t_version, compaction_group_id) ); // Commit epoch2 - commit_from_meta_node( - hummock_manager.borrow(), - epoch2, - to_local_sstable_info(&tables_in_epoch2), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch2, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&tables_in_epoch2), + ..Default::default() + }, + false, + ) + .await + .unwrap(); committed_tables.extend(tables_in_epoch2); // Get tables after committing epoch2. tables_in_epoch1 and tables_in_epoch2 should be // returned let current_version = hummock_manager.get_current_version().await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); assert_eq!(current_version.visible_table_committed_epoch(), epoch2); assert_eq!( get_sorted_object_ids(&committed_tables), - get_sorted_committed_object_ids(¤t_version) + get_sorted_committed_object_ids(¤t_version, compaction_group_id) ); } } @@ -469,6 +512,10 @@ async fn test_context_id_validation() { async fn test_hummock_manager_basic() { let (_env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(1).await; let context_id_1 = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let fake_host_address_2 = HostAddress { host: "127.0.0.1".to_string(), @@ -501,7 +548,9 @@ async fn test_hummock_manager_basic() { let mut epoch = test_epoch(1); let mut register_log_count = 0; let mut commit_log_count = 0; - let commit_one = |epoch: HummockEpoch, hummock_manager: HummockManagerRef| async move { + let commit_one = |epoch: HummockEpoch, + hummock_manager: HummockManagerRef, + hummock_meta_client: Arc| async move { let original_tables = generate_test_tables(test_epoch(epoch), get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( @@ -510,16 +559,21 @@ async fn test_hummock_manager_basic() { StaticCompactionGroupId::StateDefault.into(), ) .await; - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&original_tables), - ) - .await - .unwrap(); + + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&original_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); }; - commit_one(epoch, hummock_manager.clone()).await; + commit_one(epoch, hummock_manager.clone(), hummock_meta_client.clone()).await; register_log_count += 1; commit_log_count += 1; epoch.inc_epoch(); @@ -559,7 +613,7 @@ async fn test_hummock_manager_basic() { ); } - commit_one(epoch, hummock_manager.clone()).await; + commit_one(epoch, hummock_manager.clone(), hummock_meta_client.clone()).await; commit_log_count += 1; register_log_count += 1; @@ -618,6 +672,10 @@ async fn test_hummock_manager_basic() { #[tokio::test] async fn test_pin_snapshot_response_lost() { let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let context_id = worker_node.id; let mut epoch = test_epoch(1); @@ -629,13 +687,17 @@ async fn test_pin_snapshot_response_lost() { ) .await; // [ ] -> [ e0 ] - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&test_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&test_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); epoch.inc_epoch(); // Pin a snapshot with smallest last_pin @@ -652,13 +714,17 @@ async fn test_pin_snapshot_response_lost() { ) .await; // [ e0:pinned ] -> [ e0:pinned, e1 ] - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&test_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&test_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); epoch.inc_epoch(); // Assume the response of the previous rpc is lost. @@ -683,13 +749,17 @@ async fn test_pin_snapshot_response_lost() { ) .await; // [ e0, e1:pinned ] -> [ e0, e1:pinned, e2 ] - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&test_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&test_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); epoch.inc_epoch(); // Use correct snapshot id. @@ -708,13 +778,17 @@ async fn test_pin_snapshot_response_lost() { ) .await; // [ e0, e1:pinned, e2:pinned ] -> [ e0, e1:pinned, e2:pinned, e3 ] - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&test_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&test_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); epoch.inc_epoch(); // Use u64::MAX as epoch to pin greatest snapshot @@ -728,31 +802,37 @@ async fn test_pin_snapshot_response_lost() { #[tokio::test] async fn test_print_compact_task() { - let (_, hummock_manager, _cluster_manager, _) = setup_compute_env(80).await; + let (_, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); // Add some sstables and commit. let epoch = test_epoch(1); let original_tables = generate_test_sstables_with_table_id(epoch, 1, get_sst_ids(&hummock_manager, 2).await); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); register_sstable_infos_to_compaction_group( &hummock_manager, &original_tables, - StaticCompactionGroupId::StateDefault.into(), + compaction_group_id, ) .await; - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&original_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&original_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); // Get a compaction task. let compact_task = hummock_manager - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap() .unwrap(); @@ -765,33 +845,45 @@ async fn test_print_compact_task() { #[tokio::test] async fn test_invalid_sst_id() { let (_, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let epoch = test_epoch(1); let ssts = generate_test_tables(epoch, vec![1]); - register_sstable_infos_to_compaction_group( - &hummock_manager, - &ssts, - StaticCompactionGroupId::StateDefault.into(), - ) - .await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + register_sstable_infos_to_compaction_group(&hummock_manager, &ssts, compaction_group_id).await; let ssts = to_local_sstable_info(&ssts); // reject due to invalid context id - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, WorkerId::MAX)) - .collect(); - let error = hummock_manager - .commit_epoch_for_test(epoch, ssts.clone(), sst_to_worker) - .await - .unwrap_err(); - assert!(matches!(error, Error::InvalidSst(1))); + { + let hummock_meta_client: Arc = + Arc::new(MockHummockMetaClient::new(hummock_manager.clone(), 23333)); + let error = hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: ssts.clone(), + ..Default::default() + }, + false, + ) + .await + .unwrap_err(); + assert_eq!( + error.as_report().to_string(), + "mock error: SST 1 is invalid" + ); + } - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, context_id)) - .collect(); - hummock_manager - .commit_epoch_for_test(epoch, ssts, sst_to_worker) + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: ssts.clone(), + ..Default::default() + }, + false, + ) .await .unwrap(); } @@ -799,6 +891,10 @@ async fn test_invalid_sst_id() { #[tokio::test] async fn test_trigger_manual_compaction() { let (_, hummock_manager, _, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let context_id = worker_node.id; { @@ -831,7 +927,13 @@ async fn test_trigger_manual_compaction() { } // Generate data for compaction task - let _ = add_test_tables(&hummock_manager, context_id).await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let _ = add_test_tables( + &hummock_manager, + hummock_meta_client.clone(), + compaction_group_id, + ) + .await; { // to check compactor send task fail drop(receiver); @@ -879,6 +981,10 @@ async fn test_hummock_compaction_task_heartbeat() { use crate::hummock::HummockManager; let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let context_id = worker_node.id; let sst_num = 2; @@ -910,13 +1016,17 @@ async fn test_hummock_compaction_task_heartbeat() { StaticCompactionGroupId::StateDefault.into(), ) .await; - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&original_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&original_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); // Get a compaction task. let compact_task = hummock_manager @@ -992,6 +1102,10 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() { use crate::hummock::HummockManager; let (_env, hummock_manager, cluster_manager, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let context_id = worker_node.id; let sst_num = 2; @@ -1023,13 +1137,17 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() { StaticCompactionGroupId::StateDefault.into(), ) .await; - commit_from_meta_node( - hummock_manager.borrow(), - epoch, - to_local_sstable_info(&original_tables), - ) - .await - .unwrap(); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: to_local_sstable_info(&original_tables), + ..Default::default() + }, + false, + ) + .await + .unwrap(); // Get a compaction task. let compact_task = hummock_manager @@ -1071,8 +1189,18 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() { async fn test_extend_objects_to_delete() { let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let _pinned_version1 = hummock_manager.pin_version(context_id).await.unwrap(); - let sst_infos = add_test_tables(hummock_manager.as_ref(), context_id).await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let sst_infos = add_test_tables( + hummock_manager.as_ref(), + hummock_meta_client.clone(), + compaction_group_id, + ) + .await; let max_committed_object_id = sst_infos .iter() .map(|ssts| { @@ -1150,11 +1278,14 @@ async fn test_extend_objects_to_delete() { let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let new_epoch = pinned_version2.visible_table_committed_epoch().next_epoch(); - hummock_manager - .commit_epoch_for_test( + hummock_meta_client + .commit_epoch( new_epoch, - Vec::::new(), - Default::default(), + SyncResult { + uncommitted_ssts: vec![], + ..Default::default() + }, + false, ) .await .unwrap(); @@ -1179,6 +1310,11 @@ async fn test_extend_objects_to_delete() { #[tokio::test] async fn test_version_stats() { let (_env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); + let init_stats = hummock_manager.get_version_stats().await; assert!(init_stats.table_stats.is_empty()); @@ -1222,12 +1358,15 @@ async fn test_version_stats() { .collect(), }) .collect_vec(); - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, worker_node.id)) - .collect(); - hummock_manager - .commit_epoch_for_test(epoch, ssts, sst_to_worker) + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: ssts, + ..Default::default() + }, + false, + ) .await .unwrap(); @@ -1246,11 +1385,6 @@ async fn test_version_stats() { assert_eq!(table3_stats.total_value_size, 100); assert_eq!(table3_stats.total_key_size, 1000); - // Report compaction - hummock_manager - .compactor_manager_ref_for_test() - .add_compactor(worker_node.id); - let compact_task = hummock_manager .get_compact_task( StaticCompactionGroupId::StateDefault.into(), @@ -1305,13 +1439,12 @@ async fn test_version_stats() { #[tokio::test] async fn test_split_compaction_group_on_commit() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 3)]) + .register_table_ids_for_test(&[(100, 2), (101, 3)]) .await .unwrap(); let sst_1 = LocalSstableInfo { @@ -1343,8 +1476,15 @@ async fn test_split_compaction_group_on_commit() { ), ]), }; - hummock_manager - .commit_epoch_for_test(30, vec![sst_1], HashMap::from([(10, context_id)])) + hummock_meta_client + .commit_epoch( + test_epoch(30), + SyncResult { + uncommitted_ssts: vec![sst_1], + ..Default::default() + }, + false, + ) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; @@ -1380,7 +1520,10 @@ async fn test_split_compaction_group_on_commit() { #[tokio::test] async fn test_split_compaction_group_on_demand_basic() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let original_groups = hummock_manager .get_current_version() .await @@ -1407,11 +1550,7 @@ async fn test_split_compaction_group_on_demand_basic() { ); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) + .register_table_ids_for_test(&[(100, 2), (101, 2)]) .await .unwrap(); let sst_1 = LocalSstableInfo { @@ -1450,17 +1589,21 @@ async fn test_split_compaction_group_on_demand_basic() { }, table_stats: Default::default(), }; - hummock_manager - .commit_epoch_for_test( - 30, - vec![sst_1, sst_2], - HashMap::from([(10, context_id), (11, context_id)]), + hummock_meta_client + .commit_epoch( + test_epoch(30), + SyncResult { + uncommitted_ssts: vec![sst_1, sst_2], + ..Default::default() + }, + false, ) .await .unwrap(); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); let err = hummock_manager - .split_compaction_group(2, &[100, 101], 0) + .split_compaction_group(compaction_group_id, &[100, 101], 0) .await .unwrap_err(); assert_eq!( @@ -1476,25 +1619,29 @@ async fn test_split_compaction_group_on_demand_basic() { .unwrap(); hummock_manager - .split_compaction_group(2, &[100, 101], 0) + .split_compaction_group(compaction_group_id, &[100, 101], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.levels.len(), 3); - let new_group_id = current_version.levels.keys().max().cloned().unwrap(); - assert!(new_group_id > StaticCompactionGroupId::End as u64); + let new_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; + assert!(new_compaction_group_id > StaticCompactionGroupId::End as u64); + + let old_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 102).await; assert_eq!( - get_compaction_group_object_ids(¤t_version, 2), + get_compaction_group_object_ids(¤t_version, old_compaction_group_id), Vec::::new() ); assert_eq!( - get_compaction_group_object_ids(¤t_version, new_group_id), + get_compaction_group_object_ids(¤t_version, new_compaction_group_id), vec![10, 11] ); assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(2) + .compaction_group_member_table_ids(old_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .collect_vec(), @@ -1503,7 +1650,7 @@ async fn test_split_compaction_group_on_demand_basic() { assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(new_group_id) + .compaction_group_member_table_ids(new_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .sorted() @@ -1515,7 +1662,10 @@ async fn test_split_compaction_group_on_demand_basic() { #[tokio::test] async fn test_split_compaction_group_on_demand_non_trivial() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let sst_1 = LocalSstableInfo { sst_info: SstableInfo { object_id: 10, @@ -1531,39 +1681,46 @@ async fn test_split_compaction_group_on_demand_non_trivial() { table_stats: Default::default(), }; hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) + .register_table_ids_for_test(&[(100, 2), (101, 2)]) .await .unwrap(); - hummock_manager - .commit_epoch_for_test(30, vec![sst_1], HashMap::from([(10, context_id)])) + hummock_meta_client + .commit_epoch( + 30, + SyncResult { + uncommitted_ssts: vec![sst_1], + ..Default::default() + }, + false, + ) .await .unwrap(); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); hummock_manager - .split_compaction_group(2, &[100], 0) + .split_compaction_group(compaction_group_id, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.levels.len(), 3); - let new_group_id = current_version.levels.keys().max().cloned().unwrap(); - assert!(new_group_id > StaticCompactionGroupId::End as u64); + let new_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; + assert!(new_compaction_group_id > StaticCompactionGroupId::End as u64); + let old_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 101).await; assert_eq!( - get_compaction_group_object_ids(¤t_version, 2), + get_compaction_group_object_ids(¤t_version, old_compaction_group_id), vec![10] ); assert_eq!( - get_compaction_group_object_ids(¤t_version, new_group_id), + get_compaction_group_object_ids(¤t_version, new_compaction_group_id), vec![10] ); assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(2) + .compaction_group_member_table_ids(old_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .collect_vec(), @@ -1572,7 +1729,7 @@ async fn test_split_compaction_group_on_demand_non_trivial() { assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(new_group_id) + .compaction_group_member_table_ids(new_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .collect_vec(), @@ -1583,7 +1740,10 @@ async fn test_split_compaction_group_on_demand_non_trivial() { #[tokio::test] async fn test_split_compaction_group_trivial_expired() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let original_groups = hummock_manager .get_current_version() .await @@ -1593,14 +1753,9 @@ async fn test_split_compaction_group_trivial_expired() { .sorted() .collect_vec(); assert_eq!(original_groups, vec![2, 3]); - hummock_manager.compactor_manager.add_compactor(context_id); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) + .register_table_ids_for_test(&[(100, 2), (101, 2)]) .await .unwrap(); let sst_1 = LocalSstableInfo { @@ -1645,16 +1800,14 @@ async fn test_split_compaction_group_trivial_expired() { sst_3.sst_info.object_id = 8; sst_4.sst_info.sst_id = 9; sst_4.sst_info.object_id = 9; - hummock_manager - .commit_epoch_for_test( + hummock_meta_client + .commit_epoch( 30, - vec![sst_1, sst_2, sst_3, sst_4], - HashMap::from([ - (10, context_id), - (11, context_id), - (9, context_id), - (8, context_id), - ]), + SyncResult { + uncommitted_ssts: vec![sst_1, sst_2, sst_3, sst_4], + ..Default::default() + }, + false, ) .await .unwrap(); @@ -1671,19 +1824,23 @@ async fn test_split_compaction_group_trivial_expired() { .unwrap() .unwrap(); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); hummock_manager - .split_compaction_group(2, &[100], 0) + .split_compaction_group(compaction_group_id, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; - let new_group_id = current_version.levels.keys().max().cloned().unwrap(); + let new_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; + let old_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 101).await; assert_eq!(current_version.levels.len(), 3); - assert!(new_group_id > StaticCompactionGroupId::End as u64); + assert!(new_compaction_group_id > StaticCompactionGroupId::End as u64); assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(2) + .compaction_group_member_table_ids(old_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .sorted() @@ -1693,7 +1850,7 @@ async fn test_split_compaction_group_trivial_expired() { assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(new_group_id) + .compaction_group_member_table_ids(new_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .collect_vec(), @@ -1701,7 +1858,7 @@ async fn test_split_compaction_group_trivial_expired() { ); let task2 = hummock_manager - .get_compact_task(new_group_id, &mut default_compaction_selector()) + .get_compact_task(new_compaction_group_id, &mut default_compaction_selector()) .await .unwrap() .unwrap(); @@ -1735,18 +1892,17 @@ async fn test_split_compaction_group_trivial_expired() { } async fn get_manual_compact_task( - hummock_manager: &HummockManager, - context_id: HummockContextId, + hummock_manager_ref: HummockManagerRef, + compaction_group_id: u64, + level: usize, ) -> CompactTask { - hummock_manager.compactor_manager.add_compactor(context_id); - hummock_manager - .manual_get_compact_task( - 2, - ManualCompactionOption { - level: 0, - ..Default::default() - }, - ) + let manual_compcation_option = ManualCompactionOption { + level, + ..Default::default() + }; + + hummock_manager_ref + .manual_get_compact_task(compaction_group_id, manual_compcation_option) .await .unwrap() .unwrap() @@ -1755,14 +1911,13 @@ async fn get_manual_compact_task( #[tokio::test] async fn test_split_compaction_group_on_demand_bottom_levels() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) + .register_table_ids_for_test(&[(100, 2), (101, 2)]) .await .unwrap(); @@ -1784,12 +1939,22 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { }, table_stats: Default::default(), }; - hummock_manager - .commit_epoch_for_test(30, vec![sst_1.clone()], HashMap::from([(10, context_id)])) + hummock_meta_client + .commit_epoch( + 30, + SyncResult { + uncommitted_ssts: vec![sst_1.clone()], + ..Default::default() + }, + false, + ) .await .unwrap(); + // Construct data via manual compaction - let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let compaction_task = + get_manual_compact_task(hummock_manager.clone(), compaction_group_id, 0).await; let base_level: usize = 6; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); assert_eq!(compaction_task.target_level, base_level as u32); @@ -1832,43 +1997,56 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .unwrap()); let current_version = hummock_manager.get_current_version().await; assert!(current_version - .get_compaction_group_levels(2) + .get_compaction_group_levels(compaction_group_id) .l0 .sub_levels .is_empty()); assert_eq!( - current_version.get_compaction_group_levels(2).levels[base_level - 1] + current_version + .get_compaction_group_levels(compaction_group_id) + .levels[base_level - 1] .table_infos .len(), 2 ); hummock_manager - .split_compaction_group(2, &[100], 0) + .split_compaction_group(compaction_group_id, &[100], 0) .await .unwrap(); let current_version = hummock_manager.get_current_version().await; - let new_group_id = current_version.levels.keys().max().cloned().unwrap(); + let new_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; + let old_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 101).await; assert_eq!( - current_version.get_compaction_group_levels(2).levels[base_level - 1] + current_version + .get_compaction_group_levels(old_compaction_group_id) + .levels[base_level - 1] .table_infos .len(), 1 ); assert_eq!( - current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] + current_version + .get_compaction_group_levels(old_compaction_group_id) + .levels[base_level - 1] + .table_infos[0] .object_id, sst_1.sst_info.object_id + 1, ); assert_eq!( - current_version.get_compaction_group_levels(2).levels[base_level - 1].table_infos[0] + current_version + .get_compaction_group_levels(old_compaction_group_id) + .levels[base_level - 1] + .table_infos[0] .table_ids, vec![101] ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) + .get_compaction_group_levels(new_compaction_group_id) .levels[base_level - 1] .table_infos .len(), @@ -1876,7 +2054,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) + .get_compaction_group_levels(new_compaction_group_id) .levels[base_level - 1] .table_infos[0] .table_ids, @@ -1884,7 +2062,7 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) + .get_compaction_group_levels(new_compaction_group_id) .levels[base_level - 1] .table_infos[1] .table_ids, @@ -1895,14 +2073,14 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { #[tokio::test] async fn test_compaction_task_expiration_due_to_split_group() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) + .register_table_ids_for_test(&[(100, compaction_group_id), (101, compaction_group_id)]) .await .unwrap(); let sst_1 = LocalSstableInfo { @@ -1941,24 +2119,29 @@ async fn test_compaction_task_expiration_due_to_split_group() { }, table_stats: Default::default(), }; - hummock_manager - .commit_epoch_for_test( + + hummock_meta_client + .commit_epoch( 30, - vec![sst_1, sst_2], - HashMap::from([(10, context_id), (11, context_id)]), + SyncResult { + uncommitted_ssts: vec![sst_1, sst_2], + ..Default::default() + }, + false, ) .await .unwrap(); - let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let compaction_task = + get_manual_compact_task(hummock_manager.clone(), compaction_group_id, 0).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager - .split_compaction_group(2, &[100], 0) + .split_compaction_group(compaction_group_id, &[100], 0) .await .unwrap(); let version_1 = hummock_manager.get_current_version().await; - // compaction_task.task_status = TaskStatus::Success.into(); assert!(!hummock_manager .report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None) .await @@ -1969,7 +2152,8 @@ async fn test_compaction_task_expiration_due_to_split_group() { "version should not change because compaction task has been cancelled" ); - let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_task = + get_manual_compact_task(hummock_manager.clone(), compaction_group_id, 0).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager .report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None) @@ -1986,75 +2170,73 @@ async fn test_compaction_task_expiration_due_to_split_group() { #[tokio::test] async fn test_move_tables_between_compaction_group() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) + .register_table_ids_for_test(&[(100, 2), (101, 2), (102, 2)]) .await .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(102, 2)]) - .await - .unwrap(); - let sst_1 = gen_local_sstable_info(10, 1, vec![100, 101, 102]); - hummock_manager - .commit_epoch_for_test(30, vec![sst_1.clone()], HashMap::from([(10, context_id)])) + let sst_1 = gen_local_sstable_info(10, vec![100, 101, 102], test_epoch(1)); + + hummock_meta_client + .commit_epoch( + 30, + SyncResult { + uncommitted_ssts: vec![sst_1.clone()], + ..Default::default() + }, + false, + ) .await .unwrap(); - // Construct data via manual compaction - let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; - let base_level: usize = 6; - assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); - assert_eq!(compaction_task.target_level, base_level as u32); - assert!(hummock_manager - .report_compact_task( - compaction_task.task_id, - TaskStatus::Success, - vec![ - gen_sstable_info(11, 1, vec![100]), - gen_sstable_info(12, 2, vec![100, 101]), - gen_sstable_info(13, 3, vec![101, 102]), - ], - None, + + let sst_2 = gen_local_sstable_info(14, vec![101, 102], test_epoch(2)); + + hummock_meta_client + .commit_epoch( + 31, + SyncResult { + uncommitted_ssts: vec![sst_2.clone()], + ..Default::default() + }, + false, ) .await - .unwrap()); - let sst_2 = gen_local_sstable_info(14, 1, vec![101, 102]); - hummock_manager - .commit_epoch_for_test(31, vec![sst_2.clone()], HashMap::from([(14, context_id)])) - .await .unwrap(); + let current_version = hummock_manager.get_current_version().await; - assert_eq!( - current_version.get_compaction_group_levels(2).levels[base_level - 1] - .table_infos - .len(), - 3 - ); + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; + let sst_ids = current_version + .get_sst_ids_by_group_id(compaction_group_id) + .collect_vec(); + assert_eq!(2, sst_ids.len()); + assert!(sst_ids.contains(&10)); + assert!(sst_ids.contains(&14)); hummock_manager .split_compaction_group(2, &[100], 0) .await .unwrap(); + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 101).await; let current_version = hummock_manager.get_current_version().await; - let new_group_id = current_version.levels.keys().max().cloned().unwrap(); - assert_eq!( - current_version.get_compaction_group_levels(2).levels[base_level - 1] - .table_infos - .len(), - 2 - ); + let sst_ids = current_version + .get_sst_ids_by_group_id(compaction_group_id) + .collect_vec(); + assert_eq!(2, sst_ids.len()); + assert!(!sst_ids.contains(&10)); - let level = ¤t_version - .get_compaction_group_levels(new_group_id) - .levels[base_level - 1]; - assert_eq!(level.table_infos[0].table_ids, vec![100]); - assert_eq!(level.table_infos[1].table_ids, vec![100]); - assert_eq!(level.table_infos.len(), 2); + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; + let sst_ids = current_version + .get_sst_ids_by_group_id(compaction_group_id) + .collect_vec(); + assert_eq!(1, sst_ids.len()); + assert!(!sst_ids.contains(&10)); } #[tokio::test] @@ -2068,6 +2250,10 @@ async fn test_gc_stats() { let registry = Registry::new(); let (_env, hummock_manager, _, worker_node) = setup_compute_env_with_metric(80, config, Some(MetaMetrics::for_test(®istry))).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let context_id = worker_node.id; let assert_eq_gc_stats = |stale_object_size, stale_object_count, @@ -2106,8 +2292,14 @@ async fn test_gc_stats() { 0 ); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); hummock_manager.pin_version(context_id).await.unwrap(); - let _ = add_test_tables(&hummock_manager, context_id).await; + let _ = add_test_tables( + &hummock_manager, + hummock_meta_client.clone(), + compaction_group_id, + ) + .await; assert_eq_gc_stats(0, 0, 0, 0, 0, 0); assert_ne!( hummock_manager.create_version_checkpoint(0).await.unwrap(), @@ -2125,7 +2317,6 @@ async fn test_gc_stats() { hummock_manager.create_version_checkpoint(0).await.unwrap(), 0 ); - assert_eq_gc_stats(6, 3, 0, 0, 2, 4); } #[tokio::test] @@ -2139,79 +2330,63 @@ async fn test_partition_level() { let (env, hummock_manager, _, worker_node) = setup_compute_env_with_metric(80, config.clone(), Some(MetaMetrics::for_test(®istry))) .await; - let context_id = worker_node.id; - - hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) - .await - .unwrap(); - let sst_1 = gen_local_sstable_info(10, 1, vec![100, 101]); + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); hummock_manager - .commit_epoch_for_test(30, vec![sst_1.clone()], HashMap::from([(10, context_id)])) + .register_table_ids_for_test(&[(100, 2), (101, 2)]) .await .unwrap(); - // Construct data via manual compaction - let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; - let base_level: usize = 6; - assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); - assert_eq!(compaction_task.target_level, base_level as u32); - assert!(hummock_manager - .report_compact_task( - compaction_task.task_id, - TaskStatus::Success, - vec![ - gen_sstable_info(11, 1, vec![100]), - gen_sstable_info(12, 2, vec![101]), - ], - None, + let sst_1 = gen_local_sstable_info(10, vec![100, 101], test_epoch(1)); + + hummock_meta_client + .commit_epoch( + 30, + SyncResult { + uncommitted_ssts: vec![sst_1], + ..Default::default() + }, + false, ) .await - .unwrap()); + .unwrap(); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); hummock_manager - .split_compaction_group(2, &[100], env.opts.partition_vnode_count) + .split_compaction_group(compaction_group_id, &[100], env.opts.partition_vnode_count) .await .unwrap(); - let current_version = hummock_manager.get_current_version().await; - - let new_group_id = current_version.levels.keys().max().cloned().unwrap(); - assert_eq!( - current_version - .get_compaction_group_levels(new_group_id) - .levels[base_level - 1] - .table_infos - .len(), - 1 - ); - + let new_compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager.clone(), 100).await; let mut global_sst_id = 13; const MB: u64 = 1024 * 1024; let mut selector = default_compaction_selector(); for epoch in 31..100 { - let mut sst = gen_local_sstable_info(global_sst_id, 10, vec![100]); + let mut sst = gen_local_sstable_info(global_sst_id, vec![100], test_epoch(epoch)); sst.sst_info.file_size = 10 * MB; sst.sst_info.sst_size = 10 * MB; sst.sst_info.uncompressed_file_size = 10 * MB; - hummock_manager - .commit_epoch_for_test( + hummock_meta_client + .commit_epoch( epoch, - vec![sst], - HashMap::from([(global_sst_id, context_id)]), + SyncResult { + uncommitted_ssts: vec![sst], + ..Default::default() + }, + false, ) .await .unwrap(); + global_sst_id += 1; if let Some(task) = hummock_manager - .get_compact_task(new_group_id, &mut selector) + .get_compact_task(new_compaction_group_id, &mut selector) .await .unwrap() { - let mut sst = gen_sstable_info(global_sst_id, 10, vec![100]); + let mut sst = gen_sstable_info(global_sst_id, vec![100], test_epoch(epoch)); sst.file_size = task .input_ssts .iter() @@ -2233,7 +2408,7 @@ async fn test_partition_level() { } } let current_version = hummock_manager.get_current_version().await; - let group = current_version.get_compaction_group_levels(new_group_id); + let group = current_version.get_compaction_group_levels(new_compaction_group_id); for sub_level in &group.l0.sub_levels { if sub_level.total_file_size > config.sub_level_max_compaction_bytes { assert!(sub_level.vnode_partition_count > 0); @@ -2244,7 +2419,10 @@ async fn test_partition_level() { #[tokio::test] async fn test_unregister_moved_table() { let (_env, hummock_manager, _, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; + let hummock_meta_client = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let original_groups = hummock_manager .get_current_version() .await @@ -2262,11 +2440,7 @@ async fn test_unregister_moved_table() { ); hummock_manager - .register_table_ids_for_test(&[(100, 2)]) - .await - .unwrap(); - hummock_manager - .register_table_ids_for_test(&[(101, 2)]) + .register_table_ids_for_test(&[(100, 2), (101, 2)]) .await .unwrap(); let sst_1 = LocalSstableInfo { @@ -2305,25 +2479,30 @@ async fn test_unregister_moved_table() { }, table_stats: Default::default(), }; - hummock_manager - .commit_epoch_for_test( + + hummock_meta_client + .commit_epoch( 30, - vec![sst_1, sst_2], - HashMap::from([(10, context_id), (11, context_id)]), + SyncResult { + uncommitted_ssts: vec![sst_1, sst_2], + ..Default::default() + }, + false, ) .await .unwrap(); - let new_group_id = hummock_manager - .split_compaction_group(2, &[100], 0) + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let new_compaction_group_id = hummock_manager + .split_compaction_group(compaction_group_id, &[100], 0) .await .unwrap(); - assert_ne!(new_group_id, 2); - assert!(new_group_id > StaticCompactionGroupId::End as u64); + assert_ne!(new_compaction_group_id, 2); + assert!(new_compaction_group_id > StaticCompactionGroupId::End as u64); let current_version = hummock_manager.get_current_version().await; assert_eq!( - new_group_id, + new_compaction_group_id, current_version.levels.keys().max().cloned().unwrap() ); assert_eq!(current_version.levels.len(), 3); @@ -2332,7 +2511,7 @@ async fn test_unregister_moved_table() { vec![11] ); assert_eq!( - get_compaction_group_object_ids(¤t_version, new_group_id), + get_compaction_group_object_ids(¤t_version, new_compaction_group_id), vec![10, 11] ); assert_eq!( @@ -2347,7 +2526,7 @@ async fn test_unregister_moved_table() { assert_eq!( current_version .state_table_info - .compaction_group_member_table_ids(new_group_id) + .compaction_group_member_table_ids(new_compaction_group_id) .iter() .map(|table_id| table_id.table_id) .collect_vec(), @@ -2360,7 +2539,9 @@ async fn test_unregister_moved_table() { .unwrap(); let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.levels.len(), 2); - assert!(!current_version.levels.contains_key(&new_group_id)); + assert!(!current_version + .levels + .contains_key(&new_compaction_group_id)); assert_eq!( get_compaction_group_object_ids(¤t_version, 2), vec![11] diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 499d9df0958c4..c926e2145e886 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeSet; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::SystemTime; @@ -22,6 +23,7 @@ use fail::fail_point; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use itertools::Itertools; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::build_table_change_log_delta; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -162,14 +164,63 @@ impl HummockMetaClient for MockHummockMetaClient { }) } - async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()> { + async fn commit_epoch( + &self, + epoch: HummockEpoch, + sync_result: SyncResult, + is_log_store: bool, + ) -> Result<()> { let version: HummockVersion = self.hummock_manager.get_current_version().await; - let sst_to_worker = sync_result + let table_ids = version + .state_table_info + .info() + .keys() + .map(|table_id| table_id.table_id) + .collect::>(); + + let old_value_ssts_vec = if is_log_store { + sync_result.old_value_ssts.clone() + } else { + vec![] + }; + let commit_table_ids = sync_result + .uncommitted_ssts + .iter() + .flat_map(|sstable| sstable.sst_info.table_ids.clone()) + .chain({ + old_value_ssts_vec + .iter() + .flat_map(|sstable| sstable.sst_info.table_ids.clone()) + }) + .collect::>(); + + let new_table_fragment_info = if commit_table_ids + .iter() + .all(|table_id| table_ids.contains(table_id)) + { + NewTableFragmentInfo::None + } else { + NewTableFragmentInfo::Normal { + mv_table_id: None, + internal_table_ids: commit_table_ids + .iter() + .cloned() + .map(TableId::from) + .collect_vec(), + } + }; + + let sst_to_context = sync_result .uncommitted_ssts .iter() .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id)) .collect(); let new_table_watermark = sync_result.table_watermarks; + let table_change_log_table_ids = if is_log_store { + commit_table_ids.clone() + } else { + BTreeSet::new() + }; let table_change_log = build_table_change_log_delta( sync_result .old_value_ssts @@ -177,22 +228,24 @@ impl HummockMetaClient for MockHummockMetaClient { .map(|sst| sst.sst_info), sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info), &vec![epoch], - version - .state_table_info - .info() - .keys() - .map(|table_id| (table_id.table_id, 0)), + table_change_log_table_ids + .into_iter() + .map(|table_id| (table_id, 0)), ); self.hummock_manager .commit_epoch(CommitEpochInfo { sstables: sync_result.uncommitted_ssts, new_table_watermarks: new_table_watermark, - sst_to_context: sst_to_worker, - new_table_fragment_info: NewTableFragmentInfo::None, + sst_to_context, + new_table_fragment_info, change_log_delta: table_change_log, committed_epoch: epoch, - tables_to_commit: version.state_table_info.info().keys().cloned().collect(), + tables_to_commit: commit_table_ids + .iter() + .cloned() + .map(TableId::from) + .collect(), is_visible_table_committed_epoch: true, }) .await diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 2188d9b539325..00cb52b34a0a4 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -22,7 +22,6 @@ use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::test_epoch; -use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::key_with_epoch; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::level::Levels; @@ -30,12 +29,13 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, + CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult, }; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::CompactionConfig; use risingwave_pb::meta::add_worker_node_request::Property; +use risingwave_rpc_client::HummockMetaClient; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; use crate::hummock::compaction::selector::{default_compaction_selector, LocalSelectorStatistic}; @@ -44,9 +44,7 @@ use crate::hummock::level_handler::LevelHandler; pub use crate::hummock::manager::CommitEpochInfo; use crate::hummock::model::CompactionGroup; use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef}; -use crate::manager::{ - ClusterManager, ClusterManagerRef, FragmentManager, MetaSrvEnv, META_NODE_ID, -}; +use crate::manager::{ClusterManager, ClusterManagerRef, FragmentManager, MetaSrvEnv}; use crate::rpc::metrics::MetaMetrics; pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec { @@ -55,9 +53,15 @@ pub fn to_local_sstable_info(ssts: &[SstableInfo]) -> Vec { .collect_vec() } +// This function has 3 phases: +// 1. add 3 ssts to +// 2. trigger a compaction and replace the input from phase 1 with the 1 new sst +// 3. add 1 new sst +// Please make sure the function do what you want before using it. pub async fn add_test_tables( hummock_manager: &HummockManager, - context_id: HummockContextId, + hummock_meta_client: Arc, + compaction_group_id: CompactionGroupId, ) -> Vec> { // Increase version by 2. @@ -66,43 +70,31 @@ pub async fn add_test_tables( let mut epoch = test_epoch(1); let sstable_ids = get_sst_ids(hummock_manager, 3).await; let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids); - register_sstable_infos_to_compaction_group( - hummock_manager, - &test_tables, - StaticCompactionGroupId::StateDefault.into(), - ) - .await; - let ssts = to_local_sstable_info(&test_tables); - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, context_id)) - .collect(); - hummock_manager - .commit_epoch_for_test(epoch, ssts, sst_to_worker) + register_sstable_infos_to_compaction_group(hummock_manager, &test_tables, compaction_group_id) + .await; + let test_local_tables = to_local_sstable_info(&test_tables); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: test_local_tables, + ..Default::default() + }, + false, + ) .await .unwrap(); + // Simulate a compaction and increase version by 1. - let mut temp_compactor = false; - if hummock_manager - .compactor_manager_ref_for_test() - .compactor_num() - == 0 - { - hummock_manager - .compactor_manager_ref_for_test() - .add_compactor(context_id); - temp_compactor = true; - } let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await); register_sstable_infos_to_compaction_group( hummock_manager, &test_tables_2, - StaticCompactionGroupId::StateDefault.into(), + compaction_group_id, ) .await; - let mut selector = default_compaction_selector(); let mut compact_task = hummock_manager - .get_compact_task(StaticCompactionGroupId::StateDefault.into(), &mut selector) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap() .unwrap(); @@ -114,15 +106,8 @@ pub async fn add_test_tables( .sum::(), 3 ); - compact_task.target_level = 6; - if temp_compactor { - let compactor = hummock_manager - .compactor_manager_ref_for_test() - .next_compactor() - .unwrap(); - assert_eq!(compactor.context_id(), context_id); - } + compact_task.target_level = 6; hummock_manager .report_compact_task_for_test( compact_task.task_id, @@ -133,27 +118,25 @@ pub async fn add_test_tables( ) .await .unwrap(); - if temp_compactor { - hummock_manager - .compactor_manager_ref_for_test() - .remove_compactor(context_id); - } // Increase version by 1. epoch.inc_epoch(); let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await); register_sstable_infos_to_compaction_group( hummock_manager, &test_tables_3, - StaticCompactionGroupId::StateDefault.into(), + compaction_group_id, ) .await; - let ssts = to_local_sstable_info(&test_tables_3); - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, context_id)) - .collect(); - hummock_manager - .commit_epoch_for_test(epoch, ssts, sst_to_worker) + let test_local_tables_3 = to_local_sstable_info(&test_tables_3); + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: test_local_tables_3, + ..Default::default() + }, + false, + ) .await .unwrap(); vec![test_tables, test_tables_2, test_tables_3] @@ -290,11 +273,9 @@ pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec Vec { - let levels = match hummock_version - .levels - .get(&StaticCompactionGroupId::StateDefault.into()) - { + let levels = match hummock_version.levels.get(&compaction_group_id) { Some(levels) => levels, None => return vec![], }; @@ -385,34 +366,23 @@ pub async fn get_sst_ids( (range.start_id..range.end_id).collect_vec() } -pub async fn commit_from_meta_node( - hummock_manager_ref: &HummockManager, - epoch: HummockEpoch, - ssts: Vec, -) -> crate::hummock::error::Result<()> { - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, META_NODE_ID)) - .collect(); - hummock_manager_ref - .commit_epoch_for_test(epoch, ssts, sst_to_worker) - .await -} - pub async fn add_ssts( epoch: HummockEpoch, hummock_manager: &HummockManager, - context_id: HummockContextId, + hummock_meta_client: Arc, ) -> Vec { let table_ids = get_sst_ids(hummock_manager, 3).await; let test_tables = generate_test_sstables_with_table_id(test_epoch(epoch), 1, table_ids); let ssts = to_local_sstable_info(&test_tables); - let sst_to_worker = ssts - .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, context_id)) - .collect(); - hummock_manager - .commit_epoch_for_test(epoch, ssts, sst_to_worker) + hummock_meta_client + .commit_epoch( + epoch, + SyncResult { + uncommitted_ssts: ssts, + ..Default::default() + }, + false, + ) .await .unwrap(); test_tables @@ -441,3 +411,12 @@ pub fn compaction_selector_context<'a>( state_table_info, } } + +pub async fn get_compaction_group_id_by_table_id( + hummock_manager_ref: HummockManagerRef, + table_id: u32, +) -> u64 { + let version = hummock_manager_ref.get_current_version().await; + let mapping = version.state_table_info.build_table_compaction_group_id(); + *mapping.get(&(table_id.into())).unwrap() +} diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index d747651b86d43..10e2c08abd6e5 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -228,17 +228,23 @@ mod tests { use std::sync::Arc; use itertools::Itertools; + use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::VacuumTask; + use risingwave_rpc_client::HummockMetaClient; use crate::backup_restore::BackupManager; use crate::hummock::test_utils::{add_test_tables, setup_compute_env}; - use crate::hummock::VacuumManager; + use crate::hummock::{MockHummockMetaClient, VacuumManager}; #[tokio::test] async fn test_vacuum() { let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; let context_id = worker_node.id; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + worker_node.id, + )); let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); let backup_manager = Arc::new(BackupManager::for_test(env.clone(), hummock_manager.clone()).await); @@ -251,7 +257,13 @@ mod tests { assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0); hummock_manager.pin_version(context_id).await.unwrap(); - let sst_infos = add_test_tables(hummock_manager.as_ref(), context_id).await; + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let sst_infos = add_test_tables( + hummock_manager.as_ref(), + hummock_meta_client.clone(), + compaction_group_id, + ) + .await; assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); hummock_manager.create_version_checkpoint(1).await.unwrap(); assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 6); diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index bb62875b3fae1..db99036a34754 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -38,7 +38,12 @@ pub trait HummockMetaClient: Send + Sync + 'static { async fn get_snapshot(&self) -> Result; async fn get_new_sst_ids(&self, number: u32) -> Result; // We keep `commit_epoch` only for test/benchmark. - async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()>; + async fn commit_epoch( + &self, + epoch: HummockEpoch, + sync_result: SyncResult, + is_log_store: bool, + ) -> Result<()>; async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()>; async fn trigger_manual_compaction( &self, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index c7b7204bff7c8..67ea55269b2bd 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1531,7 +1531,12 @@ impl HummockMetaClient for MetaClient { Ok(SstObjectIdRange::new(resp.start_id, resp.end_id)) } - async fn commit_epoch(&self, _epoch: HummockEpoch, _sync_result: SyncResult) -> Result<()> { + async fn commit_epoch( + &self, + _epoch: HummockEpoch, + _sync_result: SyncResult, + _is_log_store: bool, + ) -> Result<()> { panic!("Only meta service can commit_epoch in production.") } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 92856fb5022c6..9be54ec045840 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -49,7 +49,8 @@ pub(crate) mod tests { default_compaction_selector, ManualCompactionOption, }; use risingwave_meta::hummock::test_utils::{ - register_table_ids_to_compaction_group, setup_compute_env, setup_compute_env_with_config, + get_compaction_group_id_by_table_id, register_table_ids_to_compaction_group, + setup_compute_env, setup_compute_env_with_config, unregister_table_ids_from_compaction_group, }; use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; @@ -162,6 +163,8 @@ pub(crate) mod tests { let mut local = storage .new_local(NewLocalOptions::for_test(TableId::default())) .await; + let table_id = local.table_id(); + let table_id_set = HashSet::from_iter([table_id]); // 1. add sstables let val = b"0"[..].repeat(value_size); local.init_for_test(epochs[0]).await.unwrap(); @@ -188,8 +191,14 @@ pub(crate) mod tests { } else { local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } - let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + let res = storage + .seal_and_sync_epoch(epoch, table_id_set.clone()) + .await + .unwrap(); + hummock_meta_client + .commit_epoch(epoch, res, false) + .await + .unwrap(); } } @@ -228,6 +237,7 @@ pub(crate) mod tests { )); // 1. add sstables + let table_id = 0; let mut key = BytesMut::default(); key.put_u16(0); key.put_slice(b"same_key"); @@ -235,7 +245,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - &[0], + &[table_id], ) .await; let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() @@ -283,12 +293,12 @@ pub(crate) mod tests { .collect_vec(), ) .await; + + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), table_id).await; // 2. get compact task while let Some(mut compact_task) = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap() { @@ -333,8 +343,7 @@ pub(crate) mod tests { // 4. get the latest version and check let version = hummock_manager_ref.get_current_version().await; - let group = - version.get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()); + let group = version.get_compaction_group_levels(compaction_group_id); // base level let output_tables = group @@ -401,11 +410,12 @@ pub(crate) mod tests { worker_node.id, )); + let table_id = 0; let storage = get_hummock_storage( hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - &[0], + &[table_id], ) .await; @@ -447,12 +457,10 @@ pub(crate) mod tests { // 2. get compact task - // 3. compact + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), table_id).await; while let Some(compact_task) = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap() { @@ -481,7 +489,7 @@ pub(crate) mod tests { // 4. get the latest version and check let version = hummock_manager_ref.get_current_version().await; let output_tables = version - .get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()) + .get_compaction_group_levels(compaction_group_id) .levels .iter() .flat_map(|level| level.table_infos.clone()) @@ -523,9 +531,16 @@ pub(crate) mod tests { hummock_meta_client: &Arc, storage: &HummockStorage, epoch: u64, + table_id: TableId, ) { - let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + let res = storage + .seal_and_sync_epoch(epoch, HashSet::from([table_id])) + .await + .unwrap(); + hummock_meta_client + .commit_epoch(epoch, res, false) + .await + .unwrap(); } async fn prepare_data( @@ -538,8 +553,9 @@ pub(crate) mod tests { let kv_count: u16 = 128; let mut epoch = test_epoch(1); let mut local = storage.new_local(NewLocalOptions::for_test(table_id)).await; + let table_id_set = HashSet::from_iter([table_id]); - storage.start_epoch(epoch, HashSet::from_iter([table_id])); + storage.start_epoch(epoch, table_id_set); // 1. add sstables let val = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value @@ -561,7 +577,7 @@ pub(crate) mod tests { storage.start_epoch(next_epoch, HashSet::from_iter([table_id])); local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - flush_and_commit(&hummock_meta_client, storage, epoch).await; + flush_and_commit(&hummock_meta_client, storage, epoch, table_id).await; epoch.inc_epoch(); } } @@ -615,6 +631,10 @@ pub(crate) mod tests { ) .await; + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), existing_table_id) + .await; + // Mimic dropping table unregister_table_ids_from_compaction_group(&hummock_manager_ref, &[existing_table_id]) .await; @@ -625,34 +645,19 @@ pub(crate) mod tests { }; // 2. get compact task and there should be none let compact_task = hummock_manager_ref - .manual_get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - manual_compcation_option, - ) + .manual_get_compact_task(compaction_group_id, manual_compcation_option) .await .unwrap(); assert!(compact_task.is_none()); - // 3. get the latest version and check - let version = hummock_manager_ref.get_current_version().await; - let output_level_info = version - .get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()) - .levels - .last() - .unwrap(); - assert_eq!(0, output_level_info.total_file_size); - - // 5. get compact task - let compact_task = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) - .await - .unwrap(); + let current_version = hummock_manager_ref.get_current_version().await; + assert!(current_version + .get_sst_ids_by_group_id(compaction_group_id) + .collect_vec() + .is_empty()); - assert!(compact_task.is_none()); + // assert_eq!(0, current_version.num_levels(compaction_group_id)); } #[tokio::test] @@ -678,6 +683,10 @@ pub(crate) mod tests { .new_local(NewLocalOptions::for_test(TableId::from(2))) .await; + let table_id_1 = storage_1.table_id(); + let table_id_2 = storage_2.table_id(); + let table_id_set = HashSet::from_iter([table_id_1, table_id_2]); + let rpc_filter_key_extractor_manager = match global_storage.filter_key_extractor_manager().clone() { FilterKeyExtractorManager::RpcFilterKeyExtractorManager( @@ -687,12 +696,12 @@ pub(crate) mod tests { }; rpc_filter_key_extractor_manager.update( - 1, + table_id_1.table_id(), Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), ); rpc_filter_key_extractor_manager.update( - 2, + table_id_2.table_id(), Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), ); let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( @@ -712,13 +721,13 @@ pub(crate) mod tests { // 1. add sstables let val = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value - let drop_table_id = 1; - let existing_table_ids = 2; + let drop_table_id = table_id_1.table_id(); + let existing_table_id = table_id_2.table_id(); let kv_count: usize = 128; let mut epoch = test_epoch(1); register_table_ids_to_compaction_group( &hummock_manager_ref, - &[drop_table_id, existing_table_ids], + &[drop_table_id, existing_table_id], StaticCompactionGroupId::StateDefault.into(), ) .await; @@ -728,10 +737,10 @@ pub(crate) mod tests { .await; let vnode = VirtualNode::from_index(1); - global_storage.start_epoch(epoch, HashSet::from_iter([1.into(), 2.into()])); + global_storage.start_epoch(epoch, table_id_set.clone()); for index in 0..kv_count { let next_epoch = epoch.next_epoch(); - global_storage.start_epoch(next_epoch, HashSet::from_iter([1.into(), 2.into()])); + global_storage.start_epoch(next_epoch, table_id_set.clone()); if index == 0 { storage_1.init_for_test(epoch).await.unwrap(); storage_2.init_for_test(epoch).await.unwrap(); @@ -755,8 +764,14 @@ pub(crate) mod tests { storage.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); other.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let res = global_storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + let res = global_storage + .seal_and_sync_epoch(epoch, table_id_set.clone()) + .await + .unwrap(); + hummock_meta_client + .commit_epoch(epoch, res, false) + .await + .unwrap(); epoch.inc_epoch(); } @@ -767,12 +782,12 @@ pub(crate) mod tests { level: 0, ..Default::default() }; + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), existing_table_id) + .await; // 2. get compact task let mut compact_task = hummock_manager_ref - .manual_get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - manual_compcation_option, - ) + .manual_get_compact_task(compaction_group_id, manual_compcation_option) .await .unwrap() .unwrap(); @@ -813,7 +828,7 @@ pub(crate) mod tests { // 5. get the latest version and check let version: HummockVersion = hummock_manager_ref.get_current_version().await; let mut tables_from_version = vec![]; - version.level_iter(StaticCompactionGroupId::StateDefault.into(), |level| { + version.level_iter(compaction_group_id, |level| { tables_from_version.extend(level.table_infos.iter().cloned()); true }); @@ -832,10 +847,7 @@ pub(crate) mod tests { // 6. get compact task and there should be none let compact_task = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap(); assert!(compact_task.is_none()); @@ -853,7 +865,7 @@ pub(crate) mod tests { epoch, None, ReadOptions { - table_id: TableId::from(existing_table_ids), + table_id: TableId::from(existing_table_id), prefetch_options: PrefetchOptions::default(), cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() @@ -864,7 +876,7 @@ pub(crate) mod tests { let mut scan_count = 0; for (k, _) in scan_result { let table_id = k.user_key.table_id.table_id(); - assert_eq!(table_id, existing_table_ids); + assert_eq!(table_id, existing_table_id); scan_count += 1; } assert_eq!(key_count, scan_count); @@ -905,7 +917,7 @@ pub(crate) mod tests { .sstable_id_remote_fetch_number, )); rpc_filter_key_extractor_manager.update( - 2, + existing_table_id, Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), ); let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( @@ -922,14 +934,15 @@ pub(crate) mod tests { let vnode = VirtualNode::from_index(1); let mut epoch_set = BTreeSet::new(); - storage.start_epoch(epoch, HashSet::from_iter([existing_table_id.into()])); + let table_id_set = HashSet::from_iter([existing_table_id.into()]); + storage.start_epoch(epoch, table_id_set.clone()); let mut local = storage .new_local(NewLocalOptions::for_test(existing_table_id.into())) .await; for i in 0..kv_count { let next_epoch = epoch + millisec_interval_epoch; - storage.start_epoch(next_epoch, HashSet::from_iter([existing_table_id.into()])); + storage.start_epoch(next_epoch, table_id_set.clone()); if i == 0 { local.init_for_test(epoch).await.unwrap(); } @@ -945,8 +958,14 @@ pub(crate) mod tests { local.flush().await.unwrap(); local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + let res = storage + .seal_and_sync_epoch(epoch, table_id_set.clone()) + .await + .unwrap(); + hummock_meta_client + .commit_epoch(epoch, res, false) + .await + .unwrap(); epoch += millisec_interval_epoch; } @@ -954,12 +973,12 @@ pub(crate) mod tests { level: 0, ..Default::default() }; + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), existing_table_id) + .await; // 2. get compact task let mut compact_task = hummock_manager_ref - .manual_get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - manual_compcation_option, - ) + .manual_get_compact_task(compaction_group_id, manual_compcation_option) .await .unwrap() .unwrap(); @@ -1012,7 +1031,7 @@ pub(crate) mod tests { // 4. get the latest version and check let version: HummockVersion = hummock_manager_ref.get_current_version().await; let mut tables_from_version = vec![]; - version.level_iter(StaticCompactionGroupId::StateDefault.into(), |level| { + version.level_iter(compaction_group_id, |level| { tables_from_version.extend(level.table_infos.iter().cloned()); true }); @@ -1032,10 +1051,7 @@ pub(crate) mod tests { // 5. get compact task and there should be none let compact_task = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap(); assert!(compact_task.is_none()); @@ -1130,13 +1146,14 @@ pub(crate) mod tests { let mut local = storage .new_local(NewLocalOptions::for_test(existing_table_id.into())) .await; - storage.start_epoch(epoch, HashSet::from_iter([existing_table_id.into()])); + let table_id_set = HashSet::from_iter([existing_table_id.into()]); + storage.start_epoch(epoch, table_id_set.clone()); for i in 0..kv_count { if i == 0 { local.init_for_test(epoch).await.unwrap(); } let next_epoch = epoch + millisec_interval_epoch; - storage.start_epoch(next_epoch, HashSet::from_iter([existing_table_id.into()])); + storage.start_epoch(next_epoch, table_id_set.clone()); epoch_set.insert(epoch); let ramdom_key = [key_prefix.as_ref(), &rand::thread_rng().gen::<[u8; 32]>()].concat(); @@ -1145,8 +1162,14 @@ pub(crate) mod tests { .unwrap(); local.flush().await.unwrap(); local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); + let res = storage + .seal_and_sync_epoch(epoch, table_id_set.clone()) + .await + .unwrap(); + hummock_meta_client + .commit_epoch(epoch, res, false) + .await + .unwrap(); epoch += millisec_interval_epoch; } @@ -1154,12 +1177,12 @@ pub(crate) mod tests { level: 0, ..Default::default() }; + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), existing_table_id) + .await; // 2. get compact task let mut compact_task = hummock_manager_ref - .manual_get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - manual_compcation_option, - ) + .manual_get_compact_task(compaction_group_id, manual_compcation_option) .await .unwrap() .unwrap(); @@ -1204,7 +1227,7 @@ pub(crate) mod tests { // 4. get the latest version and check let version: HummockVersion = hummock_manager_ref.get_current_version().await; let tables_from_version: Vec<_> = version - .get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()) + .get_compaction_group_levels(compaction_group_id) .levels .iter() .flat_map(|level| level.table_infos.iter()) @@ -1225,10 +1248,7 @@ pub(crate) mod tests { // 5. get compact task and there should be none let compact_task = hummock_manager_ref - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_compaction_selector(), - ) + .get_compact_task(compaction_group_id, &mut default_compaction_selector()) .await .unwrap(); assert!(compact_task.is_none()); @@ -1323,18 +1343,24 @@ pub(crate) mod tests { // .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); - flush_and_commit(&hummock_meta_client, &storage, epoch).await; + flush_and_commit( + &hummock_meta_client, + &storage, + epoch, + existing_table_id.into(), + ) + .await; let manual_compcation_option = ManualCompactionOption { level: 0, ..Default::default() }; // 2. get compact task + let compaction_group_id = + get_compaction_group_id_by_table_id(hummock_manager_ref.clone(), existing_table_id) + .await; let mut compact_task = hummock_manager_ref - .manual_get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - manual_compcation_option, - ) + .manual_get_compact_task(compaction_group_id, manual_compcation_option) .await .unwrap() .unwrap(); @@ -1375,7 +1401,7 @@ pub(crate) mod tests { // 4. get the latest version and check let version = hummock_manager_ref.get_current_version().await; let output_level_info = version - .get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()) + .get_compaction_group_levels(compaction_group_id) .levels .last() .unwrap(); @@ -2103,8 +2129,14 @@ pub(crate) mod tests { .0 .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); - let res = storage.seal_and_sync_epoch(*epoch).await.unwrap(); - hummock_meta_client.commit_epoch(*epoch, res).await.unwrap(); + let res = storage + .seal_and_sync_epoch(*epoch, table_id_set.clone()) + .await + .unwrap(); + hummock_meta_client + .commit_epoch(*epoch, res, false) + .await + .unwrap(); *epoch += millisec_interval_epoch; } } diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 240c07cd82c4b..27072abba08f2 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::ops::Bound; use std::sync::Arc; @@ -140,8 +141,12 @@ async fn test_failpoints_state_store_read_upload() { ); // sync epoch1 test the read_error - let res = hummock_storage.seal_and_sync_epoch(1).await.unwrap(); - meta_client.commit_epoch(1, res).await.unwrap(); + let table_id_set = HashSet::from_iter([local.table_id()]); + let res = hummock_storage + .seal_and_sync_epoch(1, table_id_set.clone()) + .await + .unwrap(); + meta_client.commit_epoch(1, res, false).await.unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(1)) .await @@ -208,12 +213,17 @@ async fn test_failpoints_state_store_read_upload() { // test the upload_error fail::cfg(mem_upload_err, "return").unwrap(); - let result = hummock_storage.seal_and_sync_epoch(3).await; + let result = hummock_storage + .seal_and_sync_epoch(3, table_id_set.clone()) + .await; assert!(result.is_err()); fail::remove(mem_upload_err); - let res = hummock_storage.seal_and_sync_epoch(3).await.unwrap(); - meta_client.commit_epoch(3, res).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(3, table_id_set) + .await + .unwrap(); + meta_client.commit_epoch(3, res, false).await.unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(3)) .await diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index fc0fd6ae97b4f..18bad67a62570 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -461,6 +461,7 @@ async fn test_storage_basic() { #[tokio::test] async fn test_state_store_sync() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let table_id_set = HashSet::from_iter([TEST_TABLE_ID]); let test_env = prepare_hummock_test_env().await; test_env.register_table_id(TEST_TABLE_ID).await; let mut hummock_storage = test_env @@ -557,10 +558,14 @@ async fn test_state_store_sync() { .start_epoch(epoch3, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); - let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); + let res = test_env + .storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch1, res) + .commit_epoch(epoch1, res, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch1).await; @@ -599,10 +604,14 @@ async fn test_state_store_sync() { } } - let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let res = test_env + .storage + .seal_and_sync_epoch(epoch2, table_id_set.clone()) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch2, res) + .commit_epoch(epoch2, res, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -819,6 +828,7 @@ async fn test_state_store_sync() { #[tokio::test] async fn test_delete_get() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let table_id_set = HashSet::from_iter([TEST_TABLE_ID]); let test_env = prepare_hummock_test_env().await; test_env.register_table_id(TEST_TABLE_ID).await; let mut hummock_storage = test_env @@ -864,10 +874,14 @@ async fn test_delete_get() { .storage .start_epoch(epoch2, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); - let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); + let res = test_env + .storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch1, res) + .commit_epoch(epoch1, res, false) .await .unwrap(); @@ -886,10 +900,14 @@ async fn test_delete_get() { .await .unwrap(); hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); - let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let res = test_env + .storage + .seal_and_sync_epoch(epoch2, table_id_set) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch2, res) + .commit_epoch(epoch2, res, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -912,6 +930,7 @@ async fn test_delete_get() { #[tokio::test] async fn test_multiple_epoch_sync() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let table_id_set = HashSet::from_iter([TEST_TABLE_ID]); let test_env = prepare_hummock_test_env().await; test_env.register_table_id(TEST_TABLE_ID).await; let mut hummock_storage = test_env @@ -1054,19 +1073,27 @@ async fn test_multiple_epoch_sync() { .storage .start_epoch(epoch4, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test()); - let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); - let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap(); + let sync_result2 = test_env + .storage + .seal_and_sync_epoch(epoch2, table_id_set.clone()) + .await + .unwrap(); + let sync_result3 = test_env + .storage + .seal_and_sync_epoch(epoch3, table_id_set) + .await + .unwrap(); test_get().await; test_env .meta_client - .commit_epoch(epoch2, sync_result2) + .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); test_env .meta_client - .commit_epoch(epoch3, sync_result3) + .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch3).await; @@ -1076,6 +1103,7 @@ async fn test_multiple_epoch_sync() { #[tokio::test] async fn test_iter_with_min_epoch() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let table_id_set = HashSet::from_iter([TEST_TABLE_ID]); let test_env = prepare_hummock_test_env().await; test_env.register_table_id(TEST_TABLE_ID).await; let mut hummock_storage = test_env @@ -1222,16 +1250,24 @@ async fn test_iter_with_min_epoch() { { // test after sync - let sync_result1 = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); - let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let sync_result1 = test_env + .storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); + let sync_result2 = test_env + .storage + .seal_and_sync_epoch(epoch2, table_id_set) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch1, sync_result1) + .commit_epoch(epoch1, sync_result1, false) .await .unwrap(); test_env .meta_client - .commit_epoch(epoch2, sync_result2) + .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; @@ -1320,6 +1356,7 @@ async fn test_iter_with_min_epoch() { #[tokio::test] async fn test_hummock_version_reader() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let table_id_set = HashSet::from_iter([TEST_TABLE_ID]); let test_env = prepare_hummock_test_env().await; test_env.register_table_id(TEST_TABLE_ID).await; let mut hummock_storage = test_env @@ -1514,26 +1551,38 @@ async fn test_hummock_version_reader() { } { - let sync_result1 = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); + let sync_result1 = test_env + .storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch1, sync_result1) + .commit_epoch(epoch1, sync_result1, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch1).await; - let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let sync_result2 = test_env + .storage + .seal_and_sync_epoch(epoch2, table_id_set.clone()) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch2, sync_result2) + .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch2).await; - let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap(); + let sync_result3 = test_env + .storage + .seal_and_sync_epoch(epoch3, table_id_set) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch3, sync_result3) + .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); test_env.storage.try_wait_epoch_for_test(epoch3).await; @@ -1764,6 +1813,7 @@ async fn test_hummock_version_reader() { #[tokio::test] async fn test_get_with_min_epoch() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let table_id_set = HashSet::from_iter([TEST_TABLE_ID]); let test_env = prepare_hummock_test_env().await; test_env.register_table_id(TEST_TABLE_ID).await; let mut hummock_storage = test_env @@ -1908,16 +1958,24 @@ async fn test_get_with_min_epoch() { // test after sync - let sync_result1 = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); - let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let sync_result1 = test_env + .storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); + let sync_result2 = test_env + .storage + .seal_and_sync_epoch(epoch2, table_id_set) + .await + .unwrap(); test_env .meta_client - .commit_epoch(epoch1, sync_result1) + .commit_epoch(epoch1, sync_result1, false) .await .unwrap(); test_env .meta_client - .commit_epoch(epoch2, sync_result2) + .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index bde3c046ed6ca..b15e8a3fa372c 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -139,10 +139,13 @@ async fn test_snapshot_inner( hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); if enable_sync { - let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(epoch1, HashSet::from_iter([local.table_id()])) + .await + .unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch1, res) + .commit_epoch(epoch1, res, false) .await .unwrap(); hummock_storage @@ -180,10 +183,13 @@ async fn test_snapshot_inner( hummock_storage.start_epoch(epoch3, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); if enable_sync { - let res = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(epoch2, HashSet::from_iter([local.table_id()])) + .await + .unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch2, res) + .commit_epoch(epoch2, res, false) .await .unwrap(); hummock_storage @@ -220,10 +226,13 @@ async fn test_snapshot_inner( .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); if enable_sync { - let res = hummock_storage.seal_and_sync_epoch(epoch3).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(epoch3, HashSet::from_iter([local.table_id()])) + .await + .unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch3, res) + .commit_epoch(epoch3, res, false) .await .unwrap(); hummock_storage @@ -279,10 +288,13 @@ async fn test_snapshot_range_scan_inner( .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); if enable_sync { - let res = hummock_storage.seal_and_sync_epoch(epoch).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(epoch, HashSet::from_iter([local.table_id()])) + .await + .unwrap(); if enable_commit { mock_hummock_meta_client - .commit_epoch(epoch, res) + .commit_epoch(epoch, res, false) .await .unwrap(); hummock_storage diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 67da2150735af..ab1e84aca2a66 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -375,8 +375,15 @@ async fn test_basic_v2() { .unwrap(); let len = count_stream(iter).await; assert_eq!(len, 4); - let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); - meta_client.commit_epoch(epoch1, res).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(epoch1, HashSet::from_iter([local.table_id()])) + .await + .unwrap(); + let is_log_store = false; + meta_client + .commit_epoch(epoch1, res, is_log_store) + .await + .unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(epoch1)) .await @@ -516,11 +523,15 @@ async fn test_state_store_sync_v2() { local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); // trigger a sync + let table_id_set = HashSet::from_iter([local.table_id()]); + hummock_storage + .seal_and_sync_epoch(epoch.prev_epoch(), table_id_set.clone()) + .await + .unwrap(); hummock_storage - .seal_and_sync_epoch(epoch.prev_epoch()) + .seal_and_sync_epoch(epoch, table_id_set) .await .unwrap(); - hummock_storage.seal_and_sync_epoch(epoch).await.unwrap(); // TODO: Uncomment the following lines after flushed sstable can be accessed. // FYI: https://github.com/risingwavelabs/risingwave/pull/1928#discussion_r852698719 @@ -1056,8 +1067,16 @@ async fn test_delete_get_v2() { hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); - let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); - meta_client.commit_epoch(epoch1, res).await.unwrap(); + let table_id_set = HashSet::from_iter([local.table_id()]); + let res = hummock_storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); + let is_log_store = false; + meta_client + .commit_epoch(epoch1, res, is_log_store) + .await + .unwrap(); let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), @@ -1074,8 +1093,14 @@ async fn test_delete_get_v2() { .await .unwrap(); local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); - let res = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); - meta_client.commit_epoch(epoch2, res).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(epoch2, table_id_set) + .await + .unwrap(); + meta_client + .commit_epoch(epoch2, res, is_log_store) + .await + .unwrap(); hummock_storage .try_wait_epoch(HummockReadEpoch::Committed(epoch2)) .await @@ -1114,6 +1139,7 @@ async fn test_multiple_epoch_sync_v2() { let mut local = hummock_storage .new_local(NewLocalOptions::for_test(TableId::default())) .await; + let table_id_set = HashSet::from_iter([local.table_id()]); hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); local.init_for_test(epoch1).await.unwrap(); local @@ -1217,17 +1243,23 @@ async fn test_multiple_epoch_sync_v2() { } }; test_get().await; - let sync_result2 = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); - let sync_result3 = hummock_storage.seal_and_sync_epoch(epoch3).await.unwrap(); + let sync_result2 = hummock_storage + .seal_and_sync_epoch(epoch2, table_id_set.clone()) + .await + .unwrap(); + let sync_result3 = hummock_storage + .seal_and_sync_epoch(epoch3, table_id_set) + .await + .unwrap(); test_get().await; meta_client - .commit_epoch(epoch2, sync_result2) + .commit_epoch(epoch2, sync_result2, false) .await .unwrap(); meta_client - .commit_epoch(epoch3, sync_result3) + .commit_epoch(epoch3, sync_result3, false) .await .unwrap(); hummock_storage @@ -1251,6 +1283,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { let mut local_hummock_storage = hummock_storage .new_local(NewLocalOptions::for_test(Default::default())) .await; + let table_id_set = HashSet::from_iter([local_hummock_storage.table_id()]); let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); let epoch1 = initial_epoch.next_epoch(); @@ -1305,7 +1338,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .unwrap() }; local_hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); - let sync_result1 = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); + let sync_result1 = hummock_storage + .seal_and_sync_epoch(epoch1, table_id_set.clone()) + .await + .unwrap(); let min_object_id_epoch1 = min_object_id(&sync_result1); assert_eq!( hummock_storage @@ -1313,7 +1349,10 @@ async fn test_gc_watermark_and_clear_shared_buffer() { .global_watermark_object_id(), min_object_id_epoch1, ); - let sync_result2 = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let sync_result2 = hummock_storage + .seal_and_sync_epoch(epoch2, table_id_set) + .await + .unwrap(); let min_object_id_epoch2 = min_object_id(&sync_result2); assert_eq!( hummock_storage @@ -1322,7 +1361,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { min_object_id_epoch1, ); meta_client - .commit_epoch(epoch1, sync_result1) + .commit_epoch(epoch1, sync_result1, false) .await .unwrap(); hummock_storage @@ -1555,12 +1594,13 @@ async fn test_iter_log() { hummock_storage.start_epoch(MAX_EPOCH, HashSet::from_iter([table_id])); let in_memory_state_store = MemoryStateStore::new(); + let is_log_store = true; let mut in_memory_local = in_memory_state_store .new_local(NewLocalOptions { table_id, op_consistency_level: OpConsistencyLevel::ConsistentOldValue { check_old_value: CHECK_BYTES_EQUAL.clone(), - is_log_store: true, + is_log_store, }, table_option: Default::default(), is_replicated: false, @@ -1575,7 +1615,7 @@ async fn test_iter_log() { table_id, op_consistency_level: OpConsistencyLevel::ConsistentOldValue { check_old_value: CHECK_BYTES_EQUAL.clone(), - is_log_store: true, + is_log_store, }, table_option: Default::default(), is_replicated: false, @@ -1585,13 +1625,17 @@ async fn test_iter_log() { // flush for about 10 times per epoch apply_test_log_data(test_log_data.clone(), &mut hummock_local, 0.001).await; + let table_id_set = HashSet::from_iter([table_id]); for (epoch, _) in &test_log_data { - let res = hummock_storage.seal_and_sync_epoch(*epoch).await.unwrap(); + let res = hummock_storage + .seal_and_sync_epoch(*epoch, table_id_set.clone()) + .await + .unwrap(); if *epoch != test_log_data[0].0 { assert!(!res.old_value_ssts.is_empty()); } assert!(!res.uncommitted_ssts.is_empty()); - meta_client.commit_epoch(*epoch, res).await.unwrap(); + meta_client.commit_epoch(*epoch, res, true).await.unwrap(); } hummock_storage diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 008c667ccedf5..84cdf5513cdeb 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -302,7 +302,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { test_epoch(101), risingwave_storage::store::SealCurrentEpochOptions::for_test(), ); - flush_and_commit(&hummock_meta_client, &storage, test_epoch(100)).await; + flush_and_commit( + &hummock_meta_client, + &storage, + test_epoch(100), + local.table_id(), + ) + .await; compact_once( hummock_manager_ref.clone(), compact_ctx.clone(), @@ -337,7 +343,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { test_epoch(102), risingwave_storage::store::SealCurrentEpochOptions::for_test(), ); - flush_and_commit(&hummock_meta_client, &storage, test_epoch(101)).await; + flush_and_commit( + &hummock_meta_client, + &storage, + test_epoch(101), + local.table_id(), + ) + .await; compact_once( hummock_manager_ref.clone(), compact_ctx.clone(), @@ -372,7 +384,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { test_epoch(103), risingwave_storage::store::SealCurrentEpochOptions::for_test(), ); - flush_and_commit(&hummock_meta_client, &storage, test_epoch(102)).await; + flush_and_commit( + &hummock_meta_client, + &storage, + test_epoch(102), + local.table_id(), + ) + .await; // move this two file to the same level. compact_once( hummock_manager_ref.clone(), @@ -401,7 +419,13 @@ async fn test_syncpoints_get_in_delete_range_boundary() { u64::MAX, risingwave_storage::store::SealCurrentEpochOptions::for_test(), ); - flush_and_commit(&hummock_meta_client, &storage, test_epoch(103)).await; + flush_and_commit( + &hummock_meta_client, + &storage, + test_epoch(103), + local.table_id(), + ) + .await; // move this two file to the same level. compact_once( hummock_manager_ref.clone(), diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index bf5c4a8dd8d8c..da861ff92810c 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -243,8 +243,24 @@ impl HummockTestEnv { // Seal, sync and commit a epoch. // On completion of this function call, the provided epoch should be committed and visible. pub async fn commit_epoch(&self, epoch: u64) { - let res = self.storage.seal_and_sync_epoch(epoch).await.unwrap(); - self.meta_client.commit_epoch(epoch, res).await.unwrap(); + let table_ids = self + .manager + .get_current_version() + .await + .state_table_info + .info() + .keys() + .cloned() + .collect(); + let res = self + .storage + .seal_and_sync_epoch(epoch, table_ids) + .await + .unwrap(); + self.meta_client + .commit_epoch(epoch, res, false) + .await + .unwrap(); self.storage.try_wait_epoch_for_test(epoch).await; } diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index d123558acc50b..038856a3ba2f3 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -80,7 +80,12 @@ impl HummockMetaClient for MonitoredHummockMetaClient { res } - async fn commit_epoch(&self, _epoch: HummockEpoch, _sync_result: SyncResult) -> Result<()> { + async fn commit_epoch( + &self, + _epoch: HummockEpoch, + _sync_result: SyncResult, + _is_log_store: bool, + ) -> Result<()> { panic!("Only meta service can commit_epoch in production.") } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index b4924a5dca60f..888de0db1af1c 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -661,17 +661,8 @@ impl HummockStorage { pub async fn seal_and_sync_epoch( &self, epoch: u64, + table_ids: HashSet, ) -> StorageResult { - let table_ids = self - .recent_versions - .load() - .latest_version() - .version() - .state_table_info - .info() - .keys() - .cloned() - .collect(); self.sync(epoch, table_ids).await } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index f4e62e429effa..440c7188d2fa1 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -526,7 +526,11 @@ mod tests { let epoch3 = epoch2.next_epoch(); writer.flush_current_epoch(epoch3, true).await.unwrap(); - let sync_result = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); + let sync_result = test_env + .storage + .seal_and_sync_epoch(epoch2, HashSet::from_iter([table.id.into()])) + .await + .unwrap(); assert!(!sync_result.uncommitted_ssts.is_empty()); reader.init().await.unwrap(); diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 13df85bf25d97..4fd246208b69a 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::future::Future; use std::ops::{Bound, RangeBounds}; use std::pin::{pin, Pin}; @@ -317,6 +318,7 @@ async fn run_compare_result( let mut normal = NormalState::new(hummock, 1, init_epoch).await; let mut delete_range = DeleteRangeState::new(hummock, 2, init_epoch).await; + let table_id_set = HashSet::from_iter([1.into(), 2.into()]); const RANGE_BASE: u64 = 4000; let range_mod = test_range / RANGE_BASE; @@ -381,9 +383,12 @@ async fn run_compare_result( normal.commit(next_epoch).await?; delete_range.commit(next_epoch).await?; // let checkpoint = epoch % 10 == 0; - let ret = hummock.seal_and_sync_epoch(epoch).await.unwrap(); + let ret = hummock + .seal_and_sync_epoch(epoch, table_id_set.clone()) + .await + .unwrap(); meta_client - .commit_epoch(epoch, ret) + .commit_epoch(epoch, ret, false) .await .map_err(|e| format!("{:?}", e))?; if (epoch / test_epoch(1)) % 200 == 0 {