fix: Fix import segments leak in segment manager (milvus-io#36602)
Add import segments directly via the meta, eliminating the dependency
on the segment manager.

issue: milvus-io#34648
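
For context, a minimal before/after sketch of the allocation path (both
call signatures are taken from the diff below; the surrounding variables
are illustrative):

// Before: allocation went through the segment manager, which retained
// its own reference to each import segment (the source of the leak).
segmentInfo, err := manager.AllocImportSegment(ctx,
	task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)

// After: the segment is built and registered directly in the meta, so
// the segment manager never tracks import segments.
segmentInfo, err := AllocImportSegment(ctx, alloc, meta,
	task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)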

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Oct 8, 2024
1 parent ca9842e commit 1f47d55
Showing 9 changed files with 82 additions and 304 deletions.
5 changes: 1 addition & 4 deletions internal/datacoord/import_checker.go
@@ -46,7 +46,6 @@ type importChecker struct {
broker broker.Broker
cluster Cluster
alloc allocator.Allocator
sm Manager
imeta ImportMeta
sjm StatsJobManager

@@ -58,7 +57,6 @@ func NewImportChecker(meta *meta,
broker broker.Broker,
cluster Cluster,
alloc allocator.Allocator,
sm Manager,
imeta ImportMeta,
sjm StatsJobManager,
) ImportChecker {
@@ -67,7 +65,6 @@ func NewImportChecker(meta *meta,
broker: broker,
cluster: cluster,
alloc: alloc,
sm: sm,
imeta: imeta,
sjm: sjm,
closeChan: make(chan struct{}),
@@ -231,7 +228,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {

allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
groups := RegroupImportFiles(job, lacks, allDiskIndex)
newTasks, err := NewImportTasks(groups, job, c.sm, c.alloc)
newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta)
if err != nil {
logger.Warn("new import tasks failed", zap.Error(err))
return
3 changes: 1 addition & 2 deletions internal/datacoord/import_checker_test.go
@@ -71,11 +71,10 @@ func (s *ImportCheckerSuite) SetupTest() {
s.NoError(err)

broker := broker2.NewMockBroker(s.T())
sm := NewMockManager(s.T())

sjm := NewMockStatsJobManager(s.T())

checker := NewImportChecker(meta, broker, cluster, s.alloc, sm, imeta, sjm).(*importChecker)
checker := NewImportChecker(meta, broker, cluster, s.alloc, imeta, sjm).(*importChecker)
s.checker = checker

job := &importJob{
67 changes: 61 additions & 6 deletions internal/datacoord/import_util.go
@@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"math"
"path"
"sort"
"time"
@@ -27,6 +28,8 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -79,9 +82,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
}

func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
job ImportJob,
manager Manager,
alloc allocator.Allocator,
job ImportJob, alloc allocator.Allocator, meta *meta,
) ([]ImportTask, error) {
idBegin, _, err := alloc.AllocN(int64(len(fileGroups)))
if err != nil {
@@ -99,7 +100,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
FileStats: group,
},
}
segments, err := AssignSegments(job, task, manager)
segments, err := AssignSegments(job, task, alloc, meta)
if err != nil {
return nil, err
}
@@ -117,7 +118,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
return tasks, nil
}

func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, error) {
func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, meta *meta) ([]int64, error) {
// merge hashed sizes
hashedDataSize := make(map[string]map[int64]int64) // vchannel->(partitionID->size)
for _, fileStats := range task.GetFileStats() {
@@ -148,7 +149,8 @@ func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, e
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for size > 0 {
segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
segmentInfo, err := AllocImportSegment(ctx, alloc, meta,
task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
if err != nil {
return err
}
@@ -169,6 +171,59 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
return segments, nil
}

func AllocImportSegment(ctx context.Context,
alloc allocator.Allocator,
meta *meta,
taskID int64, collectionID UniqueID,
partitionID UniqueID,
channelName string,
level datapb.SegmentLevel,
) (*SegmentInfo, error) {
log := log.Ctx(ctx)
id, err := alloc.AllocID(ctx)
if err != nil {
log.Error("failed to alloc id for import segment", zap.Error(err))
return nil, err
}
ts, err := alloc.AllocTimestamp(ctx)
if err != nil {
return nil, err
}
position := &msgpb.MsgPosition{
ChannelName: channelName,
MsgID: nil,
Timestamp: ts,
}

segmentInfo := &datapb.SegmentInfo{
ID: id,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumOfRows: 0,
State: commonpb.SegmentState_Importing,
MaxRowNum: 0,
Level: level,
LastExpireTime: math.MaxUint64,
StartPosition: position,
DmlPosition: position,
}
segmentInfo.IsImporting = true
segment := NewSegmentInfo(segmentInfo)
if err = meta.AddSegment(ctx, segment); err != nil {
log.Error("failed to add import segment", zap.Error(err))
return nil, err
}
log.Info("add import segment done",
zap.Int64("taskID", taskID),
zap.Int64("collectionID", segmentInfo.CollectionID),
zap.Int64("segmentID", segmentInfo.ID),
zap.String("channel", segmentInfo.InsertChannel),
zap.String("level", level.String()))

return segment, nil
}

func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportRequest {
importFiles := lo.Map(task.(*preImportTask).GetFileStats(),
func(fileStats *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
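A minimal usage sketch of the new AllocImportSegment helper (alloc, meta,
and the ID/channel arguments are assumed to be supplied by the caller, as
in AssignSegments above; datapb.SegmentLevel_L1 is an illustrative level,
and error handling is abbreviated):

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Allocates a segment ID and timestamp, builds an Importing-state segment
// with LastExpireTime set to math.MaxUint64, and registers it in the meta.
segment, err := AllocImportSegment(ctx, alloc, meta,
	taskID, collectionID, partitionID, vchannel, datapb.SegmentLevel_L1)
if err != nil {
	return nil, err
}
segmentIDs = append(segmentIDs, segment.GetID())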
33 changes: 18 additions & 15 deletions internal/datacoord/import_util_test.go
@@ -98,21 +98,24 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
id := rand.Int63()
return id, id + n, nil
})
manager := NewMockManager(t)
manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, vchannel string, level datapb.SegmentLevel) (*SegmentInfo, error) {
return &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: rand.Int63(),
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: vchannel,
IsImporting: true,
Level: level,
},
}, nil
})
tasks, err := NewImportTasks(fileGroups, job, manager, alloc)
alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil)
alloc.EXPECT().AllocTimestamp(mock.Anything).Return(rand.Uint64(), nil)

catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)

meta, err := newMeta(context.TODO(), catalog, nil)
assert.NoError(t, err)

tasks, err := NewImportTasks(fileGroups, job, alloc, meta)
assert.NoError(t, err)
assert.Equal(t, 2, len(tasks))
for _, task := range tasks {
104 changes: 0 additions & 104 deletions internal/datacoord/mock_segment_manager.go

(Generated mock file; diff not rendered.)
