From ffd9728d90fded0038bd556a993097c0c9809fbb Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 9 Sep 2024 17:17:12 +0800 Subject: [PATCH] chore(storage): add ut --- .../compaction/compaction_group_schedule.rs | 38 ++ .../hummock_sdk/src/compaction_group/mod.rs | 8 +- .../hummock_test/src/compactor_tests.rs | 458 +++++++++++++++++- .../hummock_test/src/sync_point_tests.rs | 2 +- 4 files changed, 492 insertions(+), 14 deletions(-) diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 4ca0d15ed43d..b73fceae9055 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -105,6 +105,44 @@ impl HummockManager { .collect_vec(); assert!(combined_member_table_ids.is_sorted()); + // check duplicated sst_id + let mut sst_id_set = HashSet::new(); + for sst in versioning.current_version.get_sst_ids() { + if !sst_id_set.insert(sst) { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} duplicated sst_id {}", + left_group_id, right_group_id, sst + ))); + } + } + + // TODO(li0k): remove this check (Since the current split_sst does not change key_range, this check can not be removed, otherwise concate will fail.) + // check branched sst on non-overlap level + { + for level in versioning + .current_version + .get_compaction_group_levels(group_1) + .levels + .iter() + .chain( + versioning + .current_version + .get_compaction_group_levels(group_2) + .levels + .iter(), + ) + { + for sst in &level.table_infos { + if sst.sst_id != sst.object_id { + return Err(Error::CompactionGroup(format!( + "invalid merge group_1 {} group_2 {} branched sst_id {}", + left_group_id, right_group_id, sst.sst_id + ))); + } + } + } + } + let mut version = HummockVersionTransaction::new( &mut versioning.current_version, &mut versioning.hummock_version_deltas, diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index ba6582c2b72c..94ef89b8046e 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -122,8 +122,12 @@ pub mod group_split { can_concat(&left_levels.levels[idx].table_infos), "{}", format!( - "left_levels.levels[{}].table_infos: {:?} level_idx {:?}", - idx, left_levels.levels[idx].table_infos, left_levels.levels[idx].level_idx + "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}", + left_levels.group_id, + right_levels.group_id, + idx, + left_levels.levels[idx].table_infos, + left_levels.levels[idx].level_idx ) ); } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 79b00d0f9b8f..ebdb5adb8cd1 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -75,7 +75,7 @@ pub(crate) mod tests { use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, FilterBuilder, - HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter, + HummockStorage as GlobalHummockStorage, HummockStorage, LocalHummockStorage, MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilder, SstableBuilderOptions, SstableIteratorReadOptions, SstableObjectIdManager, SstableWriterOptions, }; @@ -92,7 +92,7 @@ pub(crate) mod tests { hummock_meta_client: Arc, notification_client: impl NotificationClient, hummock_manager_ref: &HummockManagerRef, - table_id: TableId, + table_id: &[u32], ) -> HummockStorage { let remote_dir = "hummock_001_test".to_string(); let options = Arc::new(StorageOpts { @@ -117,7 +117,7 @@ pub(crate) mod tests { register_tables_with_id_for_test( hummock.filter_key_extractor_manager(), hummock_manager_ref, - &[table_id.table_id()], + table_id, ) .await; @@ -189,7 +189,6 @@ pub(crate) mod tests { local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); } let res = storage.seal_and_sync_epoch(epoch).await.unwrap(); - hummock_meta_client.commit_epoch(epoch, res).await.unwrap(); } } @@ -604,7 +603,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -885,7 +884,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -1090,7 +1089,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; @@ -1290,7 +1289,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; let (compact_ctx, filter_key_extractor_manager) = @@ -1505,7 +1504,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1680,7 +1679,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1798,7 +1797,7 @@ pub(crate) mod tests { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); @@ -1980,4 +1979,441 @@ pub(crate) mod tests { count += 1; } } + + #[tokio::test] + async fn test_split_and_merge() { + let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = + setup_compute_env(8080).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager_ref.clone(), + worker_node.id, + )); + + let table_id_1 = TableId::from(1); + let table_id_2 = TableId::from(2); + + let storage = get_hummock_storage( + hummock_meta_client.clone(), + get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), + &hummock_manager_ref, + &[table_id_1.table_id(), table_id_2.table_id()], + ) + .await; + + // basic cg2 -> [1, 2] + let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() + { + FilterKeyExtractorManager::RpcFilterKeyExtractorManager( + rpc_filter_key_extractor_manager, + ) => rpc_filter_key_extractor_manager, + FilterKeyExtractorManager::StaticFilterKeyExtractorManager(_) => unreachable!(), + }; + + let mut key = BytesMut::default(); + key.put_u16(1); + key.put_slice(b"key_prefix"); + let key_prefix = key.freeze(); + + rpc_filter_key_extractor_manager.update( + table_id_1.table_id(), + Arc::new(FilterKeyExtractorImpl::FixedLength( + FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()), + )), + ); + rpc_filter_key_extractor_manager.update( + table_id_2.table_id(), + Arc::new(FilterKeyExtractorImpl::FixedLength( + FixedLengthFilterKeyExtractor::new(TABLE_PREFIX_LEN + key_prefix.len()), + )), + ); + + let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( + rpc_filter_key_extractor_manager, + ); + let compact_ctx = get_compactor_context(&storage); + let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new( + hummock_meta_client.clone(), + storage + .storage_opts() + .clone() + .sstable_id_remote_fetch_number, + )); + + let base_epoch = Epoch::now(); + let mut epoch: u64 = base_epoch.0; + let millisec_interval_epoch: u64 = (1 << 16) * 100; + // let mut epoch_set = BTreeSet::new(); + + let mut local_1 = storage + .new_local(NewLocalOptions::for_test(table_id_1.clone())) + .await; + let mut local_2 = storage + .new_local(NewLocalOptions::for_test(table_id_2.clone())) + .await; + + let val = Bytes::from(b"0"[..].to_vec()); + + async fn write_data( + storage: &HummockStorage, + local_1: (&mut LocalHummockStorage, bool), + local_2: (&mut LocalHummockStorage, bool), + epoch: &mut u64, + val: Bytes, + kv_count: u64, + millisec_interval_epoch: u64, + key_prefix: Bytes, + hummock_meta_client: Arc, + is_init: &mut bool, + ) { + let table_id_set = + HashSet::from_iter(vec![local_1.0.table_id(), local_2.0.table_id()].into_iter()); + + storage.start_epoch(*epoch, table_id_set.clone()); + for i in 0..kv_count { + if i == 0 && *is_init { + local_1.0.init_for_test(*epoch).await.unwrap(); + local_2.0.init_for_test(*epoch).await.unwrap(); + + *is_init = false; + } + let next_epoch = *epoch + millisec_interval_epoch; + storage.start_epoch(next_epoch, table_id_set.clone()); + + let ramdom_key = + [key_prefix.as_ref(), &rand::thread_rng().gen::<[u8; 32]>()].concat(); + + if local_1.1 { + local_1 + .0 + .insert(TableKey(Bytes::from(ramdom_key.clone())), val.clone(), None) + .unwrap(); + } + local_1.0.flush().await.unwrap(); + local_1 + .0 + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + + if local_2.1 { + local_2 + .0 + .insert(TableKey(Bytes::from(ramdom_key.clone())), val.clone(), None) + .unwrap(); + } + local_2.0.flush().await.unwrap(); + local_2 + .0 + .seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test()); + + let res = storage.seal_and_sync_epoch(*epoch).await.unwrap(); + hummock_meta_client.commit_epoch(*epoch, res).await.unwrap(); + *epoch += millisec_interval_epoch; + } + } + + let mut is_init = true; + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, true), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + let parent_group_id = 2; + let split_table_ids = vec![table_id_2.table_id()]; + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + assert_ne!(parent_group_id, new_cg_id); + + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + { + // compact left group + let manual_compcation_option = ManualCompactionOption { + level: 0, + ..Default::default() + }; + // 2. get compact task + let mut compact_task = hummock_manager_ref + .manual_get_compact_task(parent_group_id, manual_compcation_option) + .await + .unwrap() + .unwrap(); + + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + { + // compact right group + let manual_compcation_option = ManualCompactionOption { + level: 0, + ..Default::default() + }; + // 2. get compact task + let mut compact_task = hummock_manager_ref + .manual_get_compact_task(new_cg_id, manual_compcation_option) + .await + .unwrap() + .unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // write left + write_data( + &storage, + (&mut local_1, true), + (&mut local_2, false), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + epoch += millisec_interval_epoch; + + // try merge + hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await + .unwrap(); + + // compact + { + // compact left group + let manual_compcation_option = ManualCompactionOption { + level: 0, + ..Default::default() + }; + // 2. get compact task + let mut compact_task = hummock_manager_ref + .manual_get_compact_task(parent_group_id, manual_compcation_option) + .await + .unwrap() + .unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // try split + let new_cg_id = hummock_manager_ref + .split_compaction_group(parent_group_id, &split_table_ids, 0) + .await + .unwrap(); + + // write right + write_data( + &storage, + (&mut local_1, false), + (&mut local_2, true), + &mut epoch, + val.clone(), + 16, + millisec_interval_epoch, + key_prefix.clone(), + hummock_meta_client.clone(), + &mut is_init, + ) + .await; + + let ret_err = hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await; + assert!(ret_err.is_err()); + + // try compact + { + // compact left + loop { + let compact_task = hummock_manager_ref + .get_compact_task(parent_group_id, &mut default_compaction_selector()) + .await + .unwrap(); + + if compact_task.is_none() { + break; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + + // compact right + loop { + let compact_task = hummock_manager_ref + .get_compact_task(new_cg_id, &mut default_compaction_selector()) + .await + .unwrap(); + + if compact_task.is_none() { + break; + } + + let mut compact_task = compact_task.unwrap(); + let compaction_filter_flag = + CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL; + compact_task.compaction_filter_mask = compaction_filter_flag.bits(); + compact_task.current_epoch_time = hummock_manager_ref + .get_current_version() + .await + .max_committed_epoch(); + + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let ((result_task, task_stats), _) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await; + + hummock_manager_ref + .report_compact_task( + result_task.task_id, + result_task.task_status, + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) + .await + .unwrap(); + } + } + + // try merge + let ret_err = hummock_manager_ref + .merge_compaction_group(parent_group_id, new_cg_id) + .await; + assert!(ret_err.is_err()); + } } diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index f5ee41783813..008c667ccedf 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -242,7 +242,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() { hummock_meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), &hummock_manager_ref, - TableId::from(existing_table_id), + &[existing_table_id], ) .await; let (compact_ctx, filter_key_extractor_manager) =