From 7f46f4c628164654e7239016ba8559f5d111e124 Mon Sep 17 00:00:00 2001 From: wayblink Date: Sun, 19 Nov 2023 18:28:21 +0800 Subject: [PATCH] fix: failed to release collection with more than 128 partitions (#28446) #28343 Signed-off-by: wayblink --- .../metastore/kv/querycoord/kv_catalog.go | 33 ++++++++++++++----- .../kv/querycoord/kv_catalog_test.go | 17 ++++++++++ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index eb1dce1e74349..372d4cccd9356 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -21,6 +21,8 @@ const ( CollectionMetaPrefixV1 = "queryCoord-collectionMeta" ReplicaMetaPrefixV1 = "queryCoord-ReplicaMeta" ResourceGroupPrefix = "queryCoord-ResourceGroup" + + MetaOpsBatchSize = 128 ) type Catalog struct { @@ -195,27 +197,42 @@ func (s Catalog) ReleaseCollection(collection int64) error { if err != nil { return err } - partitions := make([]*querypb.PartitionLoadInfo, 0) + partitionIDs := make([]int64, 0) for _, v := range values { info := querypb.PartitionLoadInfo{} if err = proto.Unmarshal([]byte(v), &info); err != nil { return err } - partitions = append(partitions, &info) + partitionIDs = append(partitionIDs, (&info).GetPartitionID()) } // remove collection and obtained partitions - keys := lo.Map(partitions, func(partition *querypb.PartitionLoadInfo, _ int) string { - return EncodePartitionLoadInfoKey(collection, partition.GetPartitionID()) - }) - k := EncodeCollectionLoadInfoKey(collection) - keys = append(keys, k) - return s.cli.MultiRemove(keys) + collectionKey := EncodeCollectionLoadInfoKey(collection) + err = s.cli.Remove(collectionKey) + if err != nil { + return err + } + return s.ReleasePartition(collection, partitionIDs...) } func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error { keys := lo.Map(partitions, func(partition int64, _ int) string { return EncodePartitionLoadInfoKey(collection, partition) }) + if len(partitions) >= MetaOpsBatchSize { + index := 0 + for index < len(partitions) { + endIndex := index + MetaOpsBatchSize + if endIndex > len(partitions) { + endIndex = len(partitions) + } + err := s.cli.MultiRemove(keys[index:endIndex]) + if err != nil { + return err + } + index = endIndex + } + return nil + } return s.cli.MultiRemove(keys) } diff --git a/internal/metastore/kv/querycoord/kv_catalog_test.go b/internal/metastore/kv/querycoord/kv_catalog_test.go index 862350ca1b7e5..4a377603212a0 100644 --- a/internal/metastore/kv/querycoord/kv_catalog_test.go +++ b/internal/metastore/kv/querycoord/kv_catalog_test.go @@ -121,6 +121,23 @@ func (suite *CatalogTestSuite) TestPartition() { suite.Len(partitions, 1) } +func (suite *CatalogTestSuite) TestReleaseManyPartitions() { + partitionIds := make([]int64, 0) + for i := 1; i <= 150; i++ { + suite.catalog.SavePartition(&querypb.PartitionLoadInfo{ + CollectionID: 1, + PartitionID: int64(i), + }) + partitionIds = append(partitionIds, int64(i)) + } + + err := suite.catalog.ReleasePartition(1, partitionIds...) + suite.NoError(err) + partitions, err := suite.catalog.GetPartitions() + suite.NoError(err) + suite.Len(partitions, 0) +} + func (suite *CatalogTestSuite) TestReplica() { suite.catalog.SaveReplica(&querypb.Replica{ CollectionID: 1,