Skip to content

Commit

Permalink
Fix centroids file not removed when data skew in major compaction
Browse files Browse the repository at this point in the history
Signed-off-by: chasingegg <[email protected]>
  • Loading branch information
chasingegg committed Jun 21, 2024
1 parent dac20d4 commit 3007d68
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
2 changes: 2 additions & 0 deletions internal/core/src/clustering/KmeansClustering.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ KmeansClustering::StreamingAssignandUpload(
if (IsDataSkew<T>(config, dim, num_vectors_each_centroid)) {
LOG_INFO(msg_header_ + "data skew! skip clustering");
// remove uploaded files
remote_paths_to_size[cluster_result_.centroid_path] =
cluster_result_.centroid_file_size;
RemoveClusteringResultFiles(file_manager_->GetChunkManager().get(),
remote_paths_to_size);
// skip clustering, nothing takes affect
Expand Down
52 changes: 37 additions & 15 deletions internal/core/unittest/test_kmeans_clustering.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,35 @@ transforConfigToPB(const Config& config) {
return analyze_info;
}

// when we skip clustering, nothing uploaded
template <typename T>
void
CheckResultEmpty(const milvus::clustering::KmeansClusteringPtr& clusteringJob,
const milvus::storage::ChunkManagerPtr cm,
int64_t segment_id,
int64_t segment_id2) {
std::string centroids_path_prefix =
clusteringJob->GetRemoteCentroidsObjectPrefix();
std::string centroid_path =
centroids_path_prefix + "/" + std::string(CENTROIDS_NAME);
ASSERT_FALSE(cm->Exist(centroid_path));
std::string offset_mapping_name = std::string(OFFSET_MAPPING_NAME);
std::string centroid_id_mapping_path =
clusteringJob->GetRemoteCentroidIdMappingObjectPrefix(segment_id) +
"/" + offset_mapping_name;
milvus::proto::clustering::ClusteringCentroidIdMappingStats mapping_stats;
std::string centroid_id_mapping_path2 =
clusteringJob->GetRemoteCentroidIdMappingObjectPrefix(segment_id2) +
"/" + offset_mapping_name;
ASSERT_FALSE(cm->Exist(centroid_id_mapping_path));
ASSERT_FALSE(cm->Exist(centroid_id_mapping_path2));
}

template <typename T>
void
CheckResultCorrectness(
const milvus::clustering::KmeansClusteringPtr& clusteringJob,
const milvus::storage::ChunkManagerPtr cm,
int64_t segment_id,
int64_t segment_id2,
int64_t dim,
Expand Down Expand Up @@ -137,6 +162,10 @@ CheckResultCorrectness(
ASSERT_EQ(mapping_stats2.num_in_centroid(i), num_in_centroid[i]);
}
}
// remove files
cm->Remove(centroid_path);
cm->Remove(centroid_id_mapping_path);
cm->Remove(centroid_id_mapping_path2);
}

template <typename T, DataType dtype>
Expand Down Expand Up @@ -196,7 +225,7 @@ test_run() {
Config config;
config["max_cluster_ratio"] = 10.0;
config["max_cluster_size"] = 5L * 1024 * 1024 * 1024;

auto clusteringJob = std::make_unique<clustering::KmeansClustering>(ctx);
// no need to sample train data
{
config["min_cluster_ratio"] = 0.01;
Expand All @@ -205,10 +234,9 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
Expand All @@ -223,10 +251,9 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
Expand All @@ -244,11 +271,11 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
} catch (SegcoreError& e) {
ASSERT_EQ(e.get_error_code(), ErrorCode::ClusterSkip);
CheckResultEmpty<T>(clusteringJob, cm, segment_id, segment_id2);

throw e;
},
SegcoreError);
Expand All @@ -264,11 +291,10 @@ test_run() {
config["train_size"] = 25L * 1024 * 1024 * 1024; // 25GB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);
clusteringJob->Run<T>(transforConfigToPB(config));
} catch (SegcoreError& e) {
ASSERT_EQ(e.get_error_code(), ErrorCode::ClusterSkip);
CheckResultEmpty<T>(clusteringJob, cm, segment_id, segment_id2);
throw e;
},
SegcoreError);
Expand All @@ -282,11 +308,9 @@ test_run() {
config["train_size"] = 1536L * 1024; // 1.5MB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);

clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
Expand All @@ -302,11 +326,9 @@ test_run() {
config["train_size"] = 6L * 1024 * 1024; // 6MB
config["dim"] = dim;
config["num_rows"] = num_rows;
auto clusteringJob =
std::make_unique<clustering::KmeansClustering>(ctx);

clusteringJob->Run<T>(transforConfigToPB(config));
CheckResultCorrectness<T>(clusteringJob,
cm,
segment_id,
segment_id2,
dim,
Expand Down

0 comments on commit 3007d68

Please sign in to comment.