From 3007d6809161e24aa5018aa64b41cecf9cd57eb4 Mon Sep 17 00:00:00 2001 From: chasingegg Date: Fri, 21 Jun 2024 15:09:32 +0800 Subject: [PATCH] Fix centroids file not removed when data skew in major compaction Signed-off-by: chasingegg --- .../core/src/clustering/KmeansClustering.cpp | 2 + .../core/unittest/test_kmeans_clustering.cpp | 52 +++++++++++++------ 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/internal/core/src/clustering/KmeansClustering.cpp b/internal/core/src/clustering/KmeansClustering.cpp index 4c88a058d1378..39f43fd64701b 100644 --- a/internal/core/src/clustering/KmeansClustering.cpp +++ b/internal/core/src/clustering/KmeansClustering.cpp @@ -326,6 +326,8 @@ KmeansClustering::StreamingAssignandUpload( if (IsDataSkew(config, dim, num_vectors_each_centroid)) { LOG_INFO(msg_header_ + "data skew! skip clustering"); // remove uploaded files + remote_paths_to_size[cluster_result_.centroid_path] = + cluster_result_.centroid_file_size; RemoveClusteringResultFiles(file_manager_->GetChunkManager().get(), remote_paths_to_size); // skip clustering, nothing takes affect diff --git a/internal/core/unittest/test_kmeans_clustering.cpp b/internal/core/unittest/test_kmeans_clustering.cpp index 5cf35ef65dd6e..45a5eb29501d6 100644 --- a/internal/core/unittest/test_kmeans_clustering.cpp +++ b/internal/core/unittest/test_kmeans_clustering.cpp @@ -78,10 +78,35 @@ transforConfigToPB(const Config& config) { return analyze_info; } +// when we skip clustering, nothing uploaded +template +void +CheckResultEmpty(const milvus::clustering::KmeansClusteringPtr& clusteringJob, + const milvus::storage::ChunkManagerPtr cm, + int64_t segment_id, + int64_t segment_id2) { + std::string centroids_path_prefix = + clusteringJob->GetRemoteCentroidsObjectPrefix(); + std::string centroid_path = + centroids_path_prefix + "/" + std::string(CENTROIDS_NAME); + ASSERT_FALSE(cm->Exist(centroid_path)); + std::string offset_mapping_name = std::string(OFFSET_MAPPING_NAME); + std::string centroid_id_mapping_path = + clusteringJob->GetRemoteCentroidIdMappingObjectPrefix(segment_id) + + "/" + offset_mapping_name; + milvus::proto::clustering::ClusteringCentroidIdMappingStats mapping_stats; + std::string centroid_id_mapping_path2 = + clusteringJob->GetRemoteCentroidIdMappingObjectPrefix(segment_id2) + + "/" + offset_mapping_name; + ASSERT_FALSE(cm->Exist(centroid_id_mapping_path)); + ASSERT_FALSE(cm->Exist(centroid_id_mapping_path2)); +} + template void CheckResultCorrectness( const milvus::clustering::KmeansClusteringPtr& clusteringJob, + const milvus::storage::ChunkManagerPtr cm, int64_t segment_id, int64_t segment_id2, int64_t dim, @@ -137,6 +162,10 @@ CheckResultCorrectness( ASSERT_EQ(mapping_stats2.num_in_centroid(i), num_in_centroid[i]); } } + // remove files + cm->Remove(centroid_path); + cm->Remove(centroid_id_mapping_path); + cm->Remove(centroid_id_mapping_path2); } template @@ -196,7 +225,7 @@ test_run() { Config config; config["max_cluster_ratio"] = 10.0; config["max_cluster_size"] = 5L * 1024 * 1024 * 1024; - + auto clusteringJob = std::make_unique(ctx); // no need to sample train data { config["min_cluster_ratio"] = 0.01; @@ -205,10 +234,9 @@ test_run() { config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB config["dim"] = dim; config["num_rows"] = num_rows; - auto clusteringJob = - std::make_unique(ctx); clusteringJob->Run(transforConfigToPB(config)); CheckResultCorrectness(clusteringJob, + cm, segment_id, segment_id2, dim, @@ -223,10 +251,9 @@ test_run() { config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB config["dim"] = dim; config["num_rows"] = num_rows; - auto clusteringJob = - std::make_unique(ctx); clusteringJob->Run(transforConfigToPB(config)); CheckResultCorrectness(clusteringJob, + cm, segment_id, segment_id2, dim, @@ -244,11 +271,11 @@ test_run() { config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB config["dim"] = dim; config["num_rows"] = num_rows; - auto clusteringJob = - std::make_unique(ctx); clusteringJob->Run(transforConfigToPB(config)); } catch (SegcoreError& e) { ASSERT_EQ(e.get_error_code(), ErrorCode::ClusterSkip); + CheckResultEmpty(clusteringJob, cm, segment_id, segment_id2); + throw e; }, SegcoreError); @@ -264,11 +291,10 @@ test_run() { config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB config["dim"] = dim; config["num_rows"] = num_rows; - auto clusteringJob = - std::make_unique(ctx); clusteringJob->Run(transforConfigToPB(config)); } catch (SegcoreError& e) { ASSERT_EQ(e.get_error_code(), ErrorCode::ClusterSkip); + CheckResultEmpty(clusteringJob, cm, segment_id, segment_id2); throw e; }, SegcoreError); @@ -282,11 +308,9 @@ test_run() { config["train_size"] = 1536L * 1024; // 1.5MB config["dim"] = dim; config["num_rows"] = num_rows; - auto clusteringJob = - std::make_unique(ctx); - clusteringJob->Run(transforConfigToPB(config)); CheckResultCorrectness(clusteringJob, + cm, segment_id, segment_id2, dim, @@ -302,11 +326,9 @@ test_run() { config["train_size"] = 6L * 1024 * 1024; // 6MB config["dim"] = dim; config["num_rows"] = num_rows; - auto clusteringJob = - std::make_unique(ctx); - clusteringJob->Run(transforConfigToPB(config)); CheckResultCorrectness(clusteringJob, + cm, segment_id, segment_id2, dim,