Skip to content

Commit

Permalink
fix: Fix collection leak in querynode (#37061)
Browse files Browse the repository at this point in the history
Unref the removed L0 segment count.

issue: #36918

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Oct 25, 2024
1 parent 139f4e5 commit ed37c27
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
9 changes: 6 additions & 3 deletions internal/querynodev2/segments/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool {

if collection, ok := m.collections[collectionID]; ok {
if collection.Unref(count) == 0 {
log.Info("release collection due to ref count to 0", zap.Int64("collectionID", collectionID))
log.Info("release collection due to ref count to 0",
zap.Int64("nodeID", paramtable.GetNodeID()), zap.Int64("collectionID", collectionID))
delete(m.collections, collectionID)
DeleteCollection(collection)

Expand Down Expand Up @@ -217,7 +218,8 @@ func (c *Collection) GetLoadType() querypb.LoadType {

func (c *Collection) Ref(count uint32) uint32 {
refCount := c.refCount.Add(count)
log.Debug("collection ref increment",
log.Info("collection ref increment",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collectionID", c.ID()),
zap.Uint32("refCount", refCount),
)
Expand All @@ -226,7 +228,8 @@ func (c *Collection) Ref(count uint32) uint32 {

func (c *Collection) Unref(count uint32) uint32 {
refCount := c.refCount.Sub(count)
log.Debug("collection ref decrement",
log.Info("collection ref decrement",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collectionID", c.ID()),
zap.Uint32("refCount", refCount),
)
Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,10 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC

node.pipelineManager.Remove(req.GetChannelName())
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
_, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
node.tSafeManager.Remove(ctx, req.GetChannelName())

node.manager.Collection.Unref(req.GetCollectionID(), 1)
node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed))
}
log.Info("unsubscribed channel")

Expand Down
7 changes: 6 additions & 1 deletion tests/integration/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -153,9 +154,13 @@ func (s *MiniClusterSuite) TearDownTest() {
if err == nil {
for idx, collectionName := range resp.GetCollectionNames() {
if resp.GetInMemoryPercentages()[idx] == 100 || resp.GetQueryServiceAvailable()[idx] {
s.Cluster.Proxy.ReleaseCollection(context.Background(), &milvuspb.ReleaseCollectionRequest{
status, err := s.Cluster.Proxy.ReleaseCollection(context.Background(), &milvuspb.ReleaseCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
collectionID := resp.GetCollectionIds()[idx]
s.CheckCollectionCacheReleased(collectionID)
}
}
}
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/util_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/testutils"
)

Expand Down Expand Up @@ -99,6 +101,36 @@ func (s *MiniClusterSuite) WaitForLoadRefresh(ctx context.Context, dbName, colle
}
}

// CheckCollectionCacheReleased checks if the collection cache was released from querynodes.
func (s *MiniClusterSuite) CheckCollectionCacheReleased(collectionID int64) {
for _, qn := range s.Cluster.GetAllQueryNodes() {
s.Eventually(func() bool {
state, err := qn.GetComponentStates(context.Background(), &milvuspb.GetComponentStatesRequest{})
s.NoError(err)
if state.GetState().GetStateCode() != commonpb.StateCode_Healthy {
// skip checking stopping/stopped node
return true
}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
s.NoError(err)
resp, err := qn.GetQueryNode().GetMetrics(context.Background(), req)
err = merr.CheckRPCCall(resp.GetStatus(), err)
s.NoError(err)
infos := metricsinfo.QueryNodeInfos{}
err = metricsinfo.UnmarshalComponentInfos(resp.Response, &infos)
s.NoError(err)
for _, id := range infos.QuotaMetrics.Effect.CollectionIDs {
if id == collectionID {
s.T().Logf("collection %d was not released in querynode %d", collectionID, qn.GetQueryNode().GetNodeID())
return false
}
}
s.T().Logf("collection %d has been released from querynode %d", collectionID, qn.GetQueryNode().GetNodeID())
return true
}, 3*time.Minute, 200*time.Millisecond)
}
}

func ConstructSearchRequest(
dbName, collectionName string,
expr string,
Expand Down

0 comments on commit ed37c27

Please sign in to comment.