diff --git a/internal/querynodev2/segments/collection.go b/internal/querynodev2/segments/collection.go index 38ea2b2438410..561507baa91e9 100644 --- a/internal/querynodev2/segments/collection.go +++ b/internal/querynodev2/segments/collection.go @@ -127,7 +127,8 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool { if collection, ok := m.collections[collectionID]; ok { if collection.Unref(count) == 0 { - log.Info("release collection due to ref count to 0", zap.Int64("collectionID", collectionID)) + log.Info("release collection due to ref count to 0", + zap.Int64("nodeID", paramtable.GetNodeID()), zap.Int64("collectionID", collectionID)) delete(m.collections, collectionID) DeleteCollection(collection) @@ -217,7 +218,8 @@ func (c *Collection) GetLoadType() querypb.LoadType { func (c *Collection) Ref(count uint32) uint32 { refCount := c.refCount.Add(count) - log.Debug("collection ref increment", + log.Info("collection ref increment", + zap.Int64("nodeID", paramtable.GetNodeID()), zap.Int64("collectionID", c.ID()), zap.Uint32("refCount", refCount), ) @@ -226,7 +228,8 @@ func (c *Collection) Ref(count uint32) uint32 { func (c *Collection) Unref(count uint32) uint32 { refCount := c.refCount.Sub(count) - log.Debug("collection ref decrement", + log.Info("collection ref decrement", + zap.Int64("nodeID", paramtable.GetNodeID()), zap.Int64("collectionID", c.ID()), zap.Uint32("refCount", refCount), ) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 317127603ea83..36dd9621fd272 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -369,10 +369,10 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) - node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) + _, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) node.tSafeManager.Remove(ctx, req.GetChannelName()) - node.manager.Collection.Unref(req.GetCollectionID(), 1) + node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed)) } log.Info("unsubscribed channel") diff --git a/tests/integration/suite.go b/tests/integration/suite.go index 4bbecf5375765..68751b40f1b39 100644 --- a/tests/integration/suite.go +++ b/tests/integration/suite.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -153,9 +154,13 @@ func (s *MiniClusterSuite) TearDownTest() { if err == nil { for idx, collectionName := range resp.GetCollectionNames() { if resp.GetInMemoryPercentages()[idx] == 100 || resp.GetQueryServiceAvailable()[idx] { - s.Cluster.Proxy.ReleaseCollection(context.Background(), &milvuspb.ReleaseCollectionRequest{ + status, err := s.Cluster.Proxy.ReleaseCollection(context.Background(), &milvuspb.ReleaseCollectionRequest{ CollectionName: collectionName, }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) + collectionID := resp.GetCollectionIds()[idx] + s.CheckCollectionCacheReleased(collectionID) } } } diff --git a/tests/integration/util_query.go b/tests/integration/util_query.go index f43b8a8789fab..d9a6b1919cb2f 100644 --- a/tests/integration/util_query.go +++ b/tests/integration/util_query.go @@ -31,6 +31,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/testutils" ) @@ -99,6 +101,36 @@ func (s *MiniClusterSuite) WaitForLoadRefresh(ctx context.Context, dbName, colle } } +// CheckCollectionCacheReleased checks if the collection cache was released from querynodes. +func (s *MiniClusterSuite) CheckCollectionCacheReleased(collectionID int64) { + for _, qn := range s.Cluster.GetAllQueryNodes() { + s.Eventually(func() bool { + state, err := qn.GetComponentStates(context.Background(), &milvuspb.GetComponentStatesRequest{}) + s.NoError(err) + if state.GetState().GetStateCode() != commonpb.StateCode_Healthy { + // skip checking stopping/stopped node + return true + } + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + s.NoError(err) + resp, err := qn.GetQueryNode().GetMetrics(context.Background(), req) + err = merr.CheckRPCCall(resp.GetStatus(), err) + s.NoError(err) + infos := metricsinfo.QueryNodeInfos{} + err = metricsinfo.UnmarshalComponentInfos(resp.Response, &infos) + s.NoError(err) + for _, id := range infos.QuotaMetrics.Effect.CollectionIDs { + if id == collectionID { + s.T().Logf("collection %d was not released in querynode %d", collectionID, qn.GetQueryNode().GetNodeID()) + return false + } + } + s.T().Logf("collection %d has been released from querynode %d", collectionID, qn.GetQueryNode().GetNodeID()) + return true + }, 3*time.Minute, 200*time.Millisecond) + } +} + func ConstructSearchRequest( dbName, collectionName string, expr string,