From 1f47d5510b27e8a0a611525efa7ae432150cb2ba Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Tue, 8 Oct 2024 10:11:22 +0800
Subject: [PATCH] fix: Fix import segments leak in segment manager (#36602)

Directly add import segments to the meta, eliminating the dependency
on the segment manager.

issue: https://github.com/milvus-io/milvus/issues/34648

---------

Signed-off-by: bigsheeper
---
 internal/datacoord/import_checker.go       |   5 +-
 internal/datacoord/import_checker_test.go  |   3 +-
 internal/datacoord/import_util.go          |  67 +++++++++++--
 internal/datacoord/import_util_test.go     |  33 ++++---
 internal/datacoord/mock_segment_manager.go | 104 --------------------
 internal/datacoord/segment_manager.go      | 109 ---------------------
 internal/datacoord/segment_manager_test.go |  59 -----------
 internal/datacoord/server.go               |   2 +-
 internal/datacoord/server_test.go          |   4 -
 9 files changed, 82 insertions(+), 304 deletions(-)

diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index 99bfd4e75ac8a..73dd166616657 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -46,7 +46,6 @@ type importChecker struct {
 	broker  broker.Broker
 	cluster Cluster
 	alloc   allocator.Allocator
-	sm      Manager
 	imeta   ImportMeta
 	sjm     StatsJobManager
 
@@ -58,7 +57,6 @@ func NewImportChecker(meta *meta,
 	broker broker.Broker,
 	cluster Cluster,
 	alloc allocator.Allocator,
-	sm Manager,
 	imeta ImportMeta,
 	sjm StatsJobManager,
 ) ImportChecker {
@@ -67,7 +65,6 @@ func NewImportChecker(meta *meta,
 		broker:    broker,
 		cluster:   cluster,
 		alloc:     alloc,
-		sm:        sm,
 		imeta:     imeta,
 		sjm:       sjm,
 		closeChan: make(chan struct{}),
@@ -231,7 +228,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
 
 	allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
 	groups := RegroupImportFiles(job, lacks, allDiskIndex)
-	newTasks, err := NewImportTasks(groups, job, c.sm, c.alloc)
+	newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta)
 	if err != nil {
 		logger.Warn("new import tasks failed", zap.Error(err))
 		return
diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go
index 714c6a2dc5bb1..d7978821af127 100644
--- a/internal/datacoord/import_checker_test.go
+++ b/internal/datacoord/import_checker_test.go
@@ -71,11 +71,10 @@ func (s *ImportCheckerSuite) SetupTest() {
 	s.NoError(err)
 
 	broker := broker2.NewMockBroker(s.T())
-	sm := NewMockManager(s.T())
 
 	sjm := NewMockStatsJobManager(s.T())
 
-	checker := NewImportChecker(meta, broker, cluster, s.alloc, sm, imeta, sjm).(*importChecker)
+	checker := NewImportChecker(meta, broker, cluster, s.alloc, imeta, sjm).(*importChecker)
 	s.checker = checker
 
 	job := &importJob{
diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index 3408744fe4bcb..1506ad8498475 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -19,6 +19,7 @@ package datacoord
 import (
 	"context"
 	"fmt"
+	"math"
 	"path"
 	"sort"
 	"time"
@@ -27,6 +28,8 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -79,9 +82,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
 }
 
 func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
-	job ImportJob,
-	manager Manager,
-	alloc allocator.Allocator,
+	job ImportJob, alloc allocator.Allocator, meta *meta,
 ) ([]ImportTask, error) {
 	idBegin, _, err := alloc.AllocN(int64(len(fileGroups)))
 	if err != nil {
@@ -99,7 +100,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
 				FileStats: group,
 			},
 		}
-		segments, err := AssignSegments(job, task, manager)
+		segments, err := AssignSegments(job, task, alloc, meta)
 		if err != nil {
 			return nil, err
 		}
@@ -117,7 +118,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
 	return tasks, nil
 }
 
-func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, error) {
+func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, meta *meta) ([]int64, error) {
 	// merge hashed sizes
 	hashedDataSize := make(map[string]map[int64]int64) // vchannel->(partitionID->size)
 	for _, fileStats := range task.GetFileStats() {
@@ -148,7 +149,8 @@ func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, e
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
 	for size > 0 {
-		segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
+		segmentInfo, err := AllocImportSegment(ctx, alloc, meta,
+			task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
 		if err != nil {
 			return err
 		}
@@ -169,6 +171,59 @@ func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, e
 	return segments, nil
 }
 
+func AllocImportSegment(ctx context.Context,
+	alloc allocator.Allocator,
+	meta *meta,
+	taskID int64, collectionID UniqueID,
+	partitionID UniqueID,
+	channelName string,
+	level datapb.SegmentLevel,
+) (*SegmentInfo, error) {
+	log := log.Ctx(ctx)
+	id, err := alloc.AllocID(ctx)
+	if err != nil {
+		log.Error("failed to alloc id for import segment", zap.Error(err))
+		return nil, err
+	}
+	ts, err := alloc.AllocTimestamp(ctx)
+	if err != nil {
+		return nil, err
+	}
+	position := &msgpb.MsgPosition{
+		ChannelName: channelName,
+		MsgID:       nil,
+		Timestamp:   ts,
+	}
+
+	segmentInfo := &datapb.SegmentInfo{
+		ID:             id,
+		CollectionID:   collectionID,
+		PartitionID:    partitionID,
+		InsertChannel:  channelName,
+		NumOfRows:      0,
+		State:          commonpb.SegmentState_Importing,
+		MaxRowNum:      0,
+		Level:          level,
+		LastExpireTime: math.MaxUint64,
+		StartPosition:  position,
+		DmlPosition:    position,
+	}
+	segmentInfo.IsImporting = true
+	segment := NewSegmentInfo(segmentInfo)
+	if err = meta.AddSegment(ctx, segment); err != nil {
+		log.Error("failed to add import segment", zap.Error(err))
+		return nil, err
+	}
+	log.Info("add import segment done",
+		zap.Int64("taskID", taskID),
+		zap.Int64("collectionID", segmentInfo.CollectionID),
+		zap.Int64("segmentID", segmentInfo.ID),
+		zap.String("channel", segmentInfo.InsertChannel),
+		zap.String("level", level.String()))
+
+	return segment, nil
+}
+
 func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportRequest {
 	importFiles := lo.Map(task.(*preImportTask).GetFileStats(),
 		func(fileStats *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go
index 59e27e8f50847..ecd6a53d4b12d 100644
--- a/internal/datacoord/import_util_test.go
+++ b/internal/datacoord/import_util_test.go
@@ -98,21 +98,24 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
 			id := rand.Int63()
 			return id, id + n, nil
 		})
-	manager := NewMockManager(t)
-	manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
-		RunAndReturn(func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, vchannel string, level datapb.SegmentLevel) (*SegmentInfo, error) {
-			return &SegmentInfo{
-				SegmentInfo: &datapb.SegmentInfo{
-					ID:            rand.Int63(),
-					CollectionID:  collectionID,
-					PartitionID:   partitionID,
-					InsertChannel: vchannel,
-					IsImporting:   true,
-					Level:         level,
-				},
-			}, nil
-		})
-	tasks, err := NewImportTasks(fileGroups, job, manager, alloc)
+	alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil)
+	alloc.EXPECT().AllocTimestamp(mock.Anything).Return(rand.Uint64(), nil)
+
+	catalog := mocks.NewDataCoordCatalog(t)
+	catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
+	catalog.EXPECT().ListAnalyzeTasks(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListCompactionTask(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
+	catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)
+
+	meta, err := newMeta(context.TODO(), catalog, nil)
+	assert.NoError(t, err)
+
+	tasks, err := NewImportTasks(fileGroups, job, alloc, meta)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, len(tasks))
 	for _, task := range tasks {
diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go
index eef98768f2d94..c2df11ab171de 100644
--- a/internal/datacoord/mock_segment_manager.go
+++ b/internal/datacoord/mock_segment_manager.go
@@ -5,7 +5,6 @@ package datacoord
 import (
 	context "context"
 
-	datapb "github.com/milvus-io/milvus/internal/proto/datapb"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -22,65 +21,6 @@ func (_m *MockManager) EXPECT() *MockManager_Expecter {
 	return &MockManager_Expecter{mock: &_m.Mock}
 }
 
-// AllocImportSegment provides a mock function with given fields: ctx, taskID, collectionID, partitionID, channelName, level
-func (_m *MockManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID int64, partitionID int64, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error) {
-	ret := _m.Called(ctx, taskID, collectionID, partitionID, channelName, level)
-
-	var r0 *SegmentInfo
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) (*SegmentInfo, error)); ok {
-		return rf(ctx, taskID, collectionID, partitionID, channelName, level)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) *SegmentInfo); ok {
-		r0 = rf(ctx, taskID, collectionID, partitionID, channelName, level)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*SegmentInfo)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) error); ok {
-		r1 = rf(ctx, taskID, collectionID, partitionID, channelName, level)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockManager_AllocImportSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocImportSegment'
-type MockManager_AllocImportSegment_Call struct {
-	*mock.Call
-}
-
-// AllocImportSegment is a helper method to define mock.On call
-//   - ctx context.Context
-//   - taskID int64
-//   - collectionID int64
-//   - partitionID int64
-//   - channelName string
-//   - level datapb.SegmentLevel
-func (_e *MockManager_Expecter) AllocImportSegment(ctx interface{}, taskID interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, level interface{}) *MockManager_AllocImportSegment_Call {
-	return &MockManager_AllocImportSegment_Call{Call: _e.mock.On("AllocImportSegment", ctx, taskID, collectionID, partitionID, channelName, level)}
-}
-
-func (_c *MockManager_AllocImportSegment_Call) Run(run func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, channelName string, level datapb.SegmentLevel)) *MockManager_AllocImportSegment_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(string), args[5].(datapb.SegmentLevel))
-	})
-	return _c
-}
-
-func (_c *MockManager_AllocImportSegment_Call) Return(_a0 *SegmentInfo, _a1 error) *MockManager_AllocImportSegment_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockManager_AllocImportSegment_Call) RunAndReturn(run func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) (*SegmentInfo, error)) *MockManager_AllocImportSegment_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // AllocNewGrowingSegment provides a mock function with given fields: ctx, collectionID, partitionID, segmentID, channelName
 func (_m *MockManager) AllocNewGrowingSegment(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string) (*SegmentInfo, error) {
 	ret := _m.Called(ctx, collectionID, partitionID, segmentID, channelName)
@@ -308,50 +248,6 @@ func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint
 	return _c
 }
 
-// FlushImportSegments provides a mock function with given fields: ctx, collectionID, segmentIDs
-func (_m *MockManager) FlushImportSegments(ctx context.Context, collectionID int64, segmentIDs []int64) error {
-	ret := _m.Called(ctx, collectionID, segmentIDs)
-
-	var r0 error
-	if rf, ok := ret.Get(0).(func(context.Context, int64, []int64) error); ok {
-		r0 = rf(ctx, collectionID, segmentIDs)
-	} else {
-		r0 = ret.Error(0)
-	}
-
-	return r0
-}
-
-// MockManager_FlushImportSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushImportSegments'
-type MockManager_FlushImportSegments_Call struct {
-	*mock.Call
-}
-
-// FlushImportSegments is a helper method to define mock.On call
-//   - ctx context.Context
-//   - collectionID int64
-//   - segmentIDs []int64
-func (_e *MockManager_Expecter) FlushImportSegments(ctx interface{}, collectionID interface{}, segmentIDs interface{}) *MockManager_FlushImportSegments_Call {
-	return &MockManager_FlushImportSegments_Call{Call: _e.mock.On("FlushImportSegments", ctx, collectionID, segmentIDs)}
-}
-
-func (_c *MockManager_FlushImportSegments_Call) Run(run func(ctx context.Context, collectionID int64, segmentIDs []int64)) *MockManager_FlushImportSegments_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(int64), args[2].([]int64))
-	})
-	return _c
-}
-
-func (_c *MockManager_FlushImportSegments_Call) Return(_a0 error) *MockManager_FlushImportSegments_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockManager_FlushImportSegments_Call) RunAndReturn(run func(context.Context, int64, []int64) error) *MockManager_FlushImportSegments_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // GetFlushableSegments provides a mock function with given fields: ctx, channel, ts
 func (_m *MockManager) GetFlushableSegments(ctx context.Context, channel string, ts uint64) ([]int64, error) {
 	ret := _m.Called(ctx, channel, ts)
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index 7fc3ed8353860..2b00abbb005c7 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -19,7 +19,6 @@ package datacoord
 import (
 	"context"
 	"fmt"
-	"math"
 	"sync"
 	"time"
 
@@ -29,7 +28,6 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -77,15 +75,12 @@ type Manager interface {
 
 	// Deprecated: AllocSegment allocates rows and record the allocation, will be deprecated after enabling streamingnode.
 	AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
-	AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error)
 
 	// AllocNewGrowingSegment allocates segment for streaming node.
 	AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error)
 
 	// DropSegment drops the segment from manager.
 	DropSegment(ctx context.Context, segmentID UniqueID)
-	// FlushImportSegments set importing segment state to Flushed.
-	FlushImportSegments(ctx context.Context, collectionID UniqueID, segmentIDs []UniqueID) error
 	// SealAllSegments seals all segments of collection with collectionID and return sealed segments.
 	// If segIDs is not empty, also seals segments in segIDs.
 	SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error)
@@ -369,59 +364,6 @@ func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
 	return expireTs, nil
 }
 
-func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID,
-	partitionID UniqueID, channelName string, level datapb.SegmentLevel,
-) (*SegmentInfo, error) {
-	log := log.Ctx(ctx)
-	ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment")
-	defer sp.End()
-	id, err := s.allocator.AllocID(ctx)
-	if err != nil {
-		log.Error("failed to open new segment while AllocID", zap.Error(err))
-		return nil, err
-	}
-	ts, err := s.allocator.AllocTimestamp(ctx)
-	if err != nil {
-		return nil, err
-	}
-	position := &msgpb.MsgPosition{
-		ChannelName: channelName,
-		MsgID:       nil,
-		Timestamp:   ts,
-	}
-
-	segmentInfo := &datapb.SegmentInfo{
-		ID:             id,
-		CollectionID:   collectionID,
-		PartitionID:    partitionID,
-		InsertChannel:  channelName,
-		NumOfRows:      0,
-		State:          commonpb.SegmentState_Importing,
-		MaxRowNum:      0,
-		Level:          level,
-		LastExpireTime: math.MaxUint64,
-		StartPosition:  position,
-		DmlPosition:    position,
-	}
-	segmentInfo.IsImporting = true
-	segment := NewSegmentInfo(segmentInfo)
-	if err := s.meta.AddSegment(ctx, segment); err != nil {
-		log.Error("failed to add import segment", zap.Error(err))
-		return nil, err
-	}
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.segments = append(s.segments, id)
-	log.Info("add import segment done",
-		zap.Int64("taskID", taskID),
-		zap.Int64("collectionID", segmentInfo.CollectionID),
-		zap.Int64("segmentID", segmentInfo.ID),
-		zap.String("channel", segmentInfo.InsertChannel),
-		zap.String("level", level.String()))
-
-	return segment, nil
-}
-
 // AllocNewGrowingSegment allocates segment for streaming node.
 func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) {
 	return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, segmentID, channelName)
@@ -504,50 +446,6 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
 	}
 }
 
-// FlushImportSegments set importing segment state to Flushed.
-func (s *SegmentManager) FlushImportSegments(ctx context.Context, collectionID UniqueID, segmentIDs []UniqueID) error {
-	_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Flush-Import-Segments")
-	defer sp.End()
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	candidates := lo.Filter(segmentIDs, func(segmentID UniqueID, _ int) bool {
-		info := s.meta.GetHealthySegment(segmentID)
-		if info == nil {
-			log.Warn("failed to get seg info from meta", zap.Int64("segmentID", segmentID))
-			return false
-		}
-		if info.CollectionID != collectionID {
-			return false
-		}
-		return info.State == commonpb.SegmentState_Importing
-	})
-
-	// We set the importing segment state directly to 'Flushed' rather than
-	// 'Sealed' because all data has been imported, and there is no data
-	// in the datanode flowgraph that needs to be synced.
-	candidatesMap := make(map[UniqueID]struct{})
-	for _, id := range candidates {
-		if err := s.meta.SetState(id, commonpb.SegmentState_Flushed); err != nil {
-			return err
-		}
-		candidatesMap[id] = struct{}{}
-	}
-
-	validSegments := make(map[UniqueID]struct{})
-	for _, id := range s.segments {
-		if _, ok := candidatesMap[id]; !ok {
-			validSegments[id] = struct{}{}
-		}
-	}
-
-	// it is necessary for v2.4.x, import segments were no longer assigned by the segmentManager.
-	s.segments = lo.Keys(validSegments)
-
-	return nil
-}
-
 // SealAllSegments seals all segments of collection with collectionID and return sealed segments
 func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
 	_, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Seal-Segments")
@@ -643,13 +541,6 @@ func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
 			continue
 		}
 
-		// clean up importing segment since the task failed.
-		if segment.GetState() == commonpb.SegmentState_Importing && segment.GetLastExpireTime() < ts {
-			log.Info("cleanup staled importing segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
-			s.meta.SetState(id, commonpb.SegmentState_Dropped)
-			continue
-		}
-
 		valids = append(valids, id)
 	}
 	s.segments = valids
diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go
index 5fc1a8267c205..4624e7f77351d 100644
--- a/internal/datacoord/segment_manager_test.go
+++ b/internal/datacoord/segment_manager_test.go
@@ -33,7 +33,6 @@ import (
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
 	"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
-	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/util/etcd"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -262,64 +261,6 @@ func TestLastExpireReset(t *testing.T) {
 	assert.Equal(t, segmentID3, newAlloc[0].SegmentID) // segment3 still can be used to allocate
 }
 
-func TestSegmentManager_AllocImportSegment(t *testing.T) {
-	ctx := context.Background()
-	mockErr := errors.New("mock error")
-
-	t.Run("normal case", func(t *testing.T) {
-		alloc := allocator.NewMockAllocator(t)
-		alloc.EXPECT().AllocID(mock.Anything).Return(0, nil)
-		alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil)
-		meta, err := newMemoryMeta()
-		assert.NoError(t, err)
-		sm, err := newSegmentManager(meta, alloc)
-		assert.NoError(t, err)
-
-		segment, err := sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1)
-		assert.NoError(t, err)
-		segment2 := meta.GetSegment(segment.GetID())
-		assert.NotNil(t, segment2)
-		assert.Equal(t, true, segment2.GetIsImporting())
-	})
-
-	t.Run("alloc id failed", func(t *testing.T) {
-		alloc := allocator.NewMockAllocator(t)
-		alloc.EXPECT().AllocID(mock.Anything).Return(0, mockErr)
-		meta, err := newMemoryMeta()
-		assert.NoError(t, err)
-		sm, err := newSegmentManager(meta, alloc)
-		assert.NoError(t, err)
-		_, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1)
-		assert.Error(t, err)
-	})
-
-	t.Run("alloc ts failed", func(t *testing.T) {
-		alloc := allocator.NewMockAllocator(t)
-		alloc.EXPECT().AllocID(mock.Anything).Return(0, nil)
-		alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, mockErr)
-		meta, err := newMemoryMeta()
-		assert.NoError(t, err)
-		sm, err := newSegmentManager(meta, alloc)
-		assert.NoError(t, err)
-		_, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1)
-		assert.Error(t, err)
-	})
-
-	t.Run("add segment failed", func(t *testing.T) {
-		alloc := allocator.NewMockAllocator(t)
-		alloc.EXPECT().AllocID(mock.Anything).Return(0, nil)
-		alloc.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil)
-		meta, err := newMemoryMeta()
-		assert.NoError(t, err)
-		sm, _ := newSegmentManager(meta, alloc)
-		catalog := mocks.NewDataCoordCatalog(t)
-		catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(mockErr)
-		meta.catalog = catalog
-		_, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1)
-		assert.Error(t, err)
-	})
-}
-
 func TestLoadSegmentsFromMeta(t *testing.T) {
 	ctx := context.Background()
 	paramtable.Init()
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 0c10169de9e8d..ae14db35d9edc 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -396,7 +396,7 @@ func (s *Server) initDataCoord() error {
 		return err
 	}
 	s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta)
-	s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta, s.jobManager)
+	s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.importMeta, s.jobManager)
 
 	s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager)
 
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index f292911a49887..f9c4d55d23919 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -840,10 +840,6 @@ func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectio
 	return nil, nil
 }
 
-func (s *spySegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error) {
-	return nil, nil
-}
-
 // DropSegment drops the segment from manager.
 func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
 }
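
Reviewer's sketch (not part of the patch): a minimal, self-contained Go model of the allocation path this change introduces. Meta, Allocator, SegmentInfo, and counterAlloc below are simplified, hypothetical stand-ins for datacoord's meta, allocator.Allocator, and SegmentInfo types. It illustrates the property the commit message relies on: the import segment is written straight into the meta, so the segment manager's in-memory s.segments bookkeeping (which the removed AllocImportSegment appended to, and which only FlushImportSegments or the expire-based cleanup ever pruned) is no longer involved and can no longer accumulate import segment IDs when a task fails or is abandoned.

package main

import (
	"context"
	"fmt"
	"math"
)

// SegmentInfo is a pared-down stand-in for datacoord's segment descriptor.
type SegmentInfo struct {
	ID             int64
	CollectionID   int64
	PartitionID    int64
	InsertChannel  string
	IsImporting    bool
	LastExpireTime uint64
}

// Allocator hands out unique IDs and timestamps (cf. allocator.Allocator).
type Allocator interface {
	AllocID(ctx context.Context) (int64, error)
	AllocTimestamp(ctx context.Context) (uint64, error)
}

// Meta models the persistent segment catalog (cf. datacoord's *meta).
type Meta struct {
	segments map[int64]*SegmentInfo
}

// AddSegment records the segment in the catalog.
func (m *Meta) AddSegment(ctx context.Context, s *SegmentInfo) error {
	m.segments[s.ID] = s
	return nil
}

// AllocImportSegment mirrors the free function added to import_util.go above:
// allocate an ID and a timestamp, build the segment, and write it straight
// into the meta. The segment manager is never touched, so a failed or
// abandoned import task cannot leave a dangling entry in a manager-side list.
func AllocImportSegment(ctx context.Context, alloc Allocator, meta *Meta,
	collectionID, partitionID int64, channel string,
) (*SegmentInfo, error) {
	id, err := alloc.AllocID(ctx)
	if err != nil {
		return nil, err
	}
	// In the real patch the timestamp seeds the segment's start/dml positions;
	// here only the failure handling matters.
	if _, err := alloc.AllocTimestamp(ctx); err != nil {
		return nil, err
	}
	seg := &SegmentInfo{
		ID:             id,
		CollectionID:   collectionID,
		PartitionID:    partitionID,
		InsertChannel:  channel,
		IsImporting:    true,
		LastExpireTime: math.MaxUint64, // import segments never expire by timestamp
	}
	if err := meta.AddSegment(ctx, seg); err != nil {
		return nil, err
	}
	return seg, nil
}

// counterAlloc is a toy allocator for the demonstration below.
type counterAlloc struct{ next int64 }

func (a *counterAlloc) AllocID(ctx context.Context) (int64, error) {
	a.next++
	return a.next, nil
}

func (a *counterAlloc) AllocTimestamp(ctx context.Context) (uint64, error) {
	return uint64(a.next) + 1, nil
}

func main() {
	meta := &Meta{segments: map[int64]*SegmentInfo{}}
	seg, err := AllocImportSegment(context.Background(), &counterAlloc{}, meta, 1, 2, "ch-0")
	if err != nil {
		panic(err)
	}
	// The segment lives only in the meta; there is no manager-side state to leak.
	fmt.Printf("import segment %d registered: importing=%v\n",
		seg.ID, meta.segments[seg.ID].IsImporting)
}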