diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index fe75da1639141..b08bebe382657 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -253,7 +254,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) { // Verify completion of index building for imported segments. unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs) - if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 { + if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 && !importutilv2.IsL0Import(job.GetOptions()) { log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed)) return } diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index d8296129dcda4..0a47101ca6d20 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -304,13 +304,13 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { if resp.GetState() == datapb.ImportTaskStateV2_Completed { for _, info := range resp.GetImportSegmentsInfo() { // try to parse path and fill logID - err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetStatslogs()) + err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs()) if err != nil { - log.Warn("fail to CompressFieldBinlogs for import binlogs", + log.Warn("fail to CompressBinLogs for import binlogs", WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) 
return } - op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil) + op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs()) op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) err = s.meta.UpdateSegmentsInfo(op1, op2) if err != nil { diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 65a44424b6965..95abf147f3c86 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -96,7 +97,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, FileStats: group, }, } - segments, err := AssignSegments(task, manager) + segments, err := AssignSegments(job, task, manager) if err != nil { return nil, err } @@ -106,7 +107,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, return tasks, nil } -func AssignSegments(task ImportTask, manager Manager) ([]int64, error) { +func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, error) { // merge hashed sizes hashedDataSize := make(map[string]map[int64]int64) // vchannel->(partitionID->size) for _, fileStats := range task.GetFileStats() { @@ -120,7 +121,16 @@ func AssignSegments(task ImportTask, manager Manager) ([]int64, error) { } } + isL0Import := importutilv2.IsL0Import(job.GetOptions()) + segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 + if isL0Import { + segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() + } + segmentLevel := datapb.SegmentLevel_L1 + if isL0Import { + segmentLevel = datapb.SegmentLevel_L0 + } // alloc new segments segments := make([]int64, 0) @@ -128,7 +138,7 @@ func AssignSegments(task ImportTask, manager Manager) ([]int64, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for size > 0 { - segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel) + segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel) if err != nil { return err } @@ -215,7 +225,12 @@ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats) [][]*dat return nil } + isL0Import := importutilv2.IsL0Import(job.GetOptions()) segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024 + if isL0Import { + segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt() + } + threshold := paramtable.Get().DataCoordCfg.MaxSizeInMBPerImportTask.GetAsInt() * 1024 * 1024 maxSizePerFileGroup := segmentMaxSize * len(job.GetPartitionIDs()) * len(job.GetVchannels()) if maxSizePerFileGroup > threshold { diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 415880ba3d266..18bcbaadedaa3 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -96,8 +96,8 @@ func TestImportUtil_NewImportTasks(t *testing.T) { return id, id + n, nil }) manager := NewMockManager(t) - 
manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - RunAndReturn(func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, vchannel string) (*SegmentInfo, error) { + manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, vchannel string, level datapb.SegmentLevel) (*SegmentInfo, error) { return &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ ID: rand.Int63(), @@ -105,6 +105,7 @@ func TestImportUtil_NewImportTasks(t *testing.T) { PartitionID: partitionID, InsertChannel: vchannel, IsImporting: true, + Level: level, }, }, nil }) diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index 5e03a770e63d7..da2930b2660d6 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -5,6 +5,7 @@ package datacoord import ( context "context" + datapb "github.com/milvus-io/milvus/internal/proto/datapb" mock "github.com/stretchr/testify/mock" ) @@ -21,25 +22,25 @@ func (_m *MockManager) EXPECT() *MockManager_Expecter { return &MockManager_Expecter{mock: &_m.Mock} } -// AllocImportSegment provides a mock function with given fields: ctx, taskID, collectionID, partitionID, channelName -func (_m *MockManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID int64, partitionID int64, channelName string) (*SegmentInfo, error) { - ret := _m.Called(ctx, taskID, collectionID, partitionID, channelName) +// AllocImportSegment provides a mock function with given fields: ctx, taskID, collectionID, partitionID, channelName, level +func (_m *MockManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID int64, partitionID int64, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error) { + ret := _m.Called(ctx, taskID, collectionID, partitionID, channelName, level) var r0 *SegmentInfo var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string) (*SegmentInfo, error)); ok { - return rf(ctx, taskID, collectionID, partitionID, channelName) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) (*SegmentInfo, error)); ok { + return rf(ctx, taskID, collectionID, partitionID, channelName, level) } - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string) *SegmentInfo); ok { - r0 = rf(ctx, taskID, collectionID, partitionID, channelName) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) *SegmentInfo); ok { + r0 = rf(ctx, taskID, collectionID, partitionID, channelName, level) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*SegmentInfo) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, int64, int64, string) error); ok { - r1 = rf(ctx, taskID, collectionID, partitionID, channelName) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) error); ok { + r1 = rf(ctx, taskID, collectionID, partitionID, channelName, level) } else { r1 = ret.Error(1) } @@ -53,18 +54,19 @@ type MockManager_AllocImportSegment_Call struct { } // AllocImportSegment is a helper method to define mock.On call -// - ctx context.Context -// - taskID int64 -// - collectionID int64 -// - partitionID int64 -// - channelName string -func (_e *MockManager_Expecter) AllocImportSegment(ctx 
interface{}, taskID interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}) *MockManager_AllocImportSegment_Call { - return &MockManager_AllocImportSegment_Call{Call: _e.mock.On("AllocImportSegment", ctx, taskID, collectionID, partitionID, channelName)} +// - ctx context.Context +// - taskID int64 +// - collectionID int64 +// - partitionID int64 +// - channelName string +// - level datapb.SegmentLevel +func (_e *MockManager_Expecter) AllocImportSegment(ctx interface{}, taskID interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, level interface{}) *MockManager_AllocImportSegment_Call { + return &MockManager_AllocImportSegment_Call{Call: _e.mock.On("AllocImportSegment", ctx, taskID, collectionID, partitionID, channelName, level)} } -func (_c *MockManager_AllocImportSegment_Call) Run(run func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, channelName string)) *MockManager_AllocImportSegment_Call { +func (_c *MockManager_AllocImportSegment_Call) Run(run func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, channelName string, level datapb.SegmentLevel)) *MockManager_AllocImportSegment_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(string)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(string), args[5].(datapb.SegmentLevel)) }) return _c } @@ -74,7 +76,7 @@ func (_c *MockManager_AllocImportSegment_Call) Return(_a0 *SegmentInfo, _a1 erro return _c } -func (_c *MockManager_AllocImportSegment_Call) RunAndReturn(run func(context.Context, int64, int64, int64, string) (*SegmentInfo, error)) *MockManager_AllocImportSegment_Call { +func (_c *MockManager_AllocImportSegment_Call) RunAndReturn(run func(context.Context, int64, int64, int64, string, datapb.SegmentLevel) (*SegmentInfo, error)) *MockManager_AllocImportSegment_Call { _c.Call.Return(run) return _c } @@ -111,11 +113,11 @@ type MockManager_AllocSegment_Call struct { } // AllocSegment is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 -// - channelName string -// - requestRows int64 +// - ctx context.Context +// - collectionID int64 +// - partitionID int64 +// - channelName string +// - requestRows int64 func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call { return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)} } @@ -148,8 +150,8 @@ type MockManager_DropSegment_Call struct { } // DropSegment is a helper method to define mock.On call -// - ctx context.Context -// - segmentID int64 +// - ctx context.Context +// - segmentID int64 func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call { return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)} } @@ -182,8 +184,8 @@ type MockManager_DropSegmentsOfChannel_Call struct { } // DropSegmentsOfChannel is a helper method to define mock.On call -// - ctx context.Context -// - channel string +// - ctx context.Context +// - channel string func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call { return 
&MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)} } @@ -225,8 +227,8 @@ type MockManager_ExpireAllocations_Call struct { } // ExpireAllocations is a helper method to define mock.On call -// - channel string -// - ts uint64 +// - channel string +// - ts uint64 func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call { return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)} } @@ -268,9 +270,9 @@ type MockManager_FlushImportSegments_Call struct { } // FlushImportSegments is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - segmentIDs []int64 +// - ctx context.Context +// - collectionID int64 +// - segmentIDs []int64 func (_e *MockManager_Expecter) FlushImportSegments(ctx interface{}, collectionID interface{}, segmentIDs interface{}) *MockManager_FlushImportSegments_Call { return &MockManager_FlushImportSegments_Call{Call: _e.mock.On("FlushImportSegments", ctx, collectionID, segmentIDs)} } @@ -324,9 +326,9 @@ type MockManager_GetFlushableSegments_Call struct { } // GetFlushableSegments is a helper method to define mock.On call -// - ctx context.Context -// - channel string -// - ts uint64 +// - ctx context.Context +// - channel string +// - ts uint64 func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockManager_GetFlushableSegments_Call { return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)} } @@ -380,9 +382,9 @@ type MockManager_SealAllSegments_Call struct { } // SealAllSegments is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - segIDs []int64 +// - ctx context.Context +// - collectionID int64 +// - segIDs []int64 func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call { return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)} } @@ -404,65 +406,6 @@ func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Contex return _c } -// allocSegmentForImport provides a mock function with given fields: ctx, collectionID, partitionID, channelName, requestRows, taskID -func (_m *MockManager) allocSegmentForImport(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64, taskID int64) (*Allocation, error) { - ret := _m.Called(ctx, collectionID, partitionID, channelName, requestRows, taskID) - - var r0 *Allocation - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, string, int64, int64) (*Allocation, error)); ok { - return rf(ctx, collectionID, partitionID, channelName, requestRows, taskID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64, int64, string, int64, int64) *Allocation); ok { - r0 = rf(ctx, collectionID, partitionID, channelName, requestRows, taskID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*Allocation) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64, int64, string, int64, int64) error); ok { - r1 = rf(ctx, collectionID, partitionID, channelName, requestRows, taskID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockManager_allocSegmentForImport_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocSegmentForImport' -type 
MockManager_allocSegmentForImport_Call struct { - *mock.Call -} - -// allocSegmentForImport is a helper method to define mock.On call -// - ctx context.Context -// - collectionID int64 -// - partitionID int64 -// - channelName string -// - requestRows int64 -// - taskID int64 -func (_e *MockManager_Expecter) allocSegmentForImport(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}, taskID interface{}) *MockManager_allocSegmentForImport_Call { - return &MockManager_allocSegmentForImport_Call{Call: _e.mock.On("allocSegmentForImport", ctx, collectionID, partitionID, channelName, requestRows, taskID)} -} - -func (_c *MockManager_allocSegmentForImport_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64, taskID int64)) *MockManager_allocSegmentForImport_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(string), args[4].(int64), args[5].(int64)) - }) - return _c -} - -func (_c *MockManager_allocSegmentForImport_Call) Return(_a0 *Allocation, _a1 error) *MockManager_allocSegmentForImport_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockManager_allocSegmentForImport_Call) RunAndReturn(run func(context.Context, int64, int64, string, int64, int64) (*Allocation, error)) *MockManager_allocSegmentForImport_Call { - _c.Call.Return(run) - return _c -} - // NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockManager(t interface { diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index a113552e073a7..6c7cfe0ef52a7 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -76,7 +76,7 @@ type Manager interface { // AllocSegment allocates rows and record the allocation. AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) - AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) + AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error) // DropSegment drops the segment from manager. DropSegment(ctx context.Context, segmentID UniqueID) // FlushImportSegments set importing segment state to Flushed. 
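For reference, a minimal sketch of how a datacoord caller might drive the new AllocImportSegment signature, mirroring the level selection that AssignSegments adds for L0 imports (the helper name below is illustrative and not part of this patch):

package datacoord

import (
	"context"

	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/util/importutilv2"
)

// allocImportSegmentForJob (illustrative helper, not in the patch) picks the
// segment level the same way AssignSegments now does: L0 import jobs allocate
// SegmentLevel_L0 segments, all other imports keep SegmentLevel_L1.
func allocImportSegmentForJob(ctx context.Context, manager Manager, job ImportJob, task ImportTask,
	partitionID UniqueID, vchannel string,
) (*SegmentInfo, error) {
	level := datapb.SegmentLevel_L1
	if importutilv2.IsL0Import(job.GetOptions()) {
		level = datapb.SegmentLevel_L0
	}
	return manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, level)
}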
@@ -350,7 +350,7 @@ func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) { } func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, - partitionID UniqueID, channelName string, + partitionID UniqueID, channelName string, level datapb.SegmentLevel, ) (*SegmentInfo, error) { log := log.Ctx(ctx) ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment") @@ -378,7 +378,7 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c NumOfRows: 0, State: commonpb.SegmentState_Importing, MaxRowNum: 0, - Level: datapb.SegmentLevel_L1, + Level: level, LastExpireTime: math.MaxUint64, StartPosition: position, DmlPosition: position, @@ -394,9 +394,10 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c s.segments = append(s.segments, id) log.Info("add import segment done", zap.Int64("taskID", taskID), - zap.Int64("CollectionID", segmentInfo.CollectionID), - zap.Int64("SegmentID", segmentInfo.ID), - zap.String("Channel", segmentInfo.InsertChannel)) + zap.Int64("collectionID", segmentInfo.CollectionID), + zap.Int64("segmentID", segmentInfo.ID), + zap.String("channel", segmentInfo.InsertChannel), + zap.String("level", level.String())) return segment, nil } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index f869c719ea6f4..42a0e9efc6ef1 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -255,7 +255,7 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { sm, err := newSegmentManager(meta, alloc) assert.NoError(t, err) - segment, err := sm.AllocImportSegment(ctx, 0, 1, 1, "ch1") + segment, err := sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1) assert.NoError(t, err) segment2 := meta.GetSegment(segment.GetID()) assert.NotNil(t, segment2) @@ -269,7 +269,7 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { assert.NoError(t, err) sm, err := newSegmentManager(meta, alloc) assert.NoError(t, err) - _, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1") + _, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1) assert.Error(t, err) }) @@ -281,7 +281,7 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { assert.NoError(t, err) sm, err := newSegmentManager(meta, alloc) assert.NoError(t, err) - _, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1") + _, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1) assert.Error(t, err) }) @@ -295,7 +295,7 @@ func TestSegmentManager_AllocImportSegment(t *testing.T) { catalog := mocks.NewDataCoordCatalog(t) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(mockErr) meta.catalog = catalog - _, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1") + _, err = sm.AllocImportSegment(ctx, 0, 1, 1, "ch1", datapb.SegmentLevel_L1) assert.Error(t, err) }) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index e8cbe8e8d1612..50ae78a9691d3 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -835,7 +835,7 @@ func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectio return nil, nil } -func (s *spySegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) { +func (s *spySegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, 
partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error) { return nil, nil } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 5ae29788fa0c9..f6a1996ebe4ae 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -713,6 +713,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())), zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())), zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())), + zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())), ) flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) } @@ -837,6 +838,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())), zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())), zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())), + zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())), ) flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) } diff --git a/internal/datanode/importv2/hash.go b/internal/datanode/importv2/hash.go index 8fa72a2168eed..b7070527d8104 100644 --- a/internal/datanode/importv2/hash.go +++ b/internal/datanode/importv2/hash.go @@ -55,6 +55,9 @@ func HashData(task Task, rows *storage.InsertData) (HashedData, error) { } partKeyField, _ := typeutil.GetPartitionKeyFieldSchema(schema) + id1 := pkField.GetFieldID() + id2 := partKeyField.GetFieldID() + f1 := hashByVChannel(int64(channelNum), pkField) f2 := hashByPartition(int64(partitionNum), partKeyField) @@ -65,7 +68,7 @@ func HashData(task Task, rows *storage.InsertData) (HashedData, error) { for i := 0; i < rows.GetRowNum(); i++ { row := rows.GetRow(i) - p1, p2 := f1(row), f2(row) + p1, p2 := f1(row[id1]), f2(row[id2]) err = res[p1][p2].Append(row) if err != nil { return nil, err @@ -74,6 +77,33 @@ func HashData(task Task, rows *storage.InsertData) (HashedData, error) { return res, nil } +func HashDeleteData(task Task, delData *storage.DeleteData) ([]*storage.DeleteData, error) { + var ( + schema = typeutil.AppendSystemFields(task.GetSchema()) + channelNum = len(task.GetVchannels()) + ) + + pkField, err := typeutil.GetPrimaryFieldSchema(schema) + if err != nil { + return nil, err + } + + f1 := hashByVChannel(int64(channelNum), pkField) + + res := make([]*storage.DeleteData, channelNum) + for i := 0; i < channelNum; i++ { + res[i] = storage.NewDeleteData(nil, nil) + } + + for i := 0; i < int(delData.RowCount); i++ { + pk := delData.Pks[i] + ts := delData.Tss[i] + p := f1(pk.GetValue()) + res[p].Append(pk, ts) + } + return res, nil +} + func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.PartitionImportStats, error) { var ( schema = task.GetSchema() @@ -87,6 +117,9 @@ func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.Parti } partKeyField, _ := typeutil.GetPartitionKeyFieldSchema(schema) + id1 := pkField.GetFieldID() + id2 := partKeyField.GetFieldID() + hashRowsCount := make([][]int, channelNum) hashDataSize := make([][]int, channelNum) for i := 0; i < channelNum; i++ { @@ -104,7 +137,7 @@ func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.Parti return fieldID != pkField.GetFieldID() }) for i := 0; i < rowNum; i++ { - p1, p2 := fn1(id, num), fn2(rows.GetRow(i)) + p1, p2 := fn1(id, num), 
fn2(rows.GetRow(i)[id2]) hashRowsCount[p1][p2]++ hashDataSize[p1][p2] += rows.GetRowSize(i) id++ @@ -114,7 +147,7 @@ func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.Parti f2 := hashByPartition(int64(partitionNum), partKeyField) for i := 0; i < rowNum; i++ { row := rows.GetRow(i) - p1, p2 := f1(row), f2(row) + p1, p2 := f1(row[id1]), f2(row[id2]) hashRowsCount[p1][p2]++ hashDataSize[p1][p2] += rows.GetRowSize(i) } @@ -138,22 +171,62 @@ func GetRowsStats(task Task, rows *storage.InsertData) (map[string]*datapb.Parti return res, nil } -func hashByVChannel(channelNum int64, pkField *schemapb.FieldSchema) func(row map[int64]interface{}) int64 { +func GetDeleteStats(task Task, delData *storage.DeleteData) (map[string]*datapb.PartitionImportStats, error) { + var ( + schema = typeutil.AppendSystemFields(task.GetSchema()) + channelNum = len(task.GetVchannels()) + ) + + pkField, err := typeutil.GetPrimaryFieldSchema(schema) + if err != nil { + return nil, err + } + + f1 := hashByVChannel(int64(channelNum), pkField) + + hashRowsCount := make([][]int, channelNum) + hashDataSize := make([][]int, channelNum) + for i := 0; i < channelNum; i++ { + hashRowsCount[i] = make([]int, 1) + hashDataSize[i] = make([]int, 1) + } + + for i := 0; i < int(delData.RowCount); i++ { + pk := delData.Pks[i] + p := f1(pk.GetValue()) + hashRowsCount[p][0]++ + hashDataSize[p][0] += int(pk.Size()) + 8 // pk + ts + } + + res := make(map[string]*datapb.PartitionImportStats) + for i := range hashRowsCount { + channel := task.GetVchannels()[i] + partition := task.GetPartitionIDs()[0] + res[channel] = &datapb.PartitionImportStats{ + PartitionRows: make(map[int64]int64), + PartitionDataSize: make(map[int64]int64), + } + res[channel].PartitionRows[partition] = int64(hashRowsCount[i][0]) + res[channel].PartitionDataSize[partition] = int64(hashDataSize[i][0]) + } + + return res, nil +} + +func hashByVChannel(channelNum int64, pkField *schemapb.FieldSchema) func(pk any) int64 { if channelNum == 1 || pkField == nil { - return func(_ map[int64]interface{}) int64 { + return func(_ any) int64 { return 0 } } switch pkField.GetDataType() { case schemapb.DataType_Int64: - return func(row map[int64]interface{}) int64 { - pk := row[pkField.GetFieldID()] + return func(pk any) int64 { hash, _ := typeutil.Hash32Int64(pk.(int64)) return int64(hash) % channelNum } case schemapb.DataType_VarChar: - return func(row map[int64]interface{}) int64 { - pk := row[pkField.GetFieldID()] + return func(pk any) int64 { hash := typeutil.HashString2Uint32(pk.(string)) return int64(hash) % channelNum } @@ -162,23 +235,21 @@ func hashByVChannel(channelNum int64, pkField *schemapb.FieldSchema) func(row ma } } -func hashByPartition(partitionNum int64, partField *schemapb.FieldSchema) func(row map[int64]interface{}) int64 { +func hashByPartition(partitionNum int64, partField *schemapb.FieldSchema) func(key any) int64 { if partitionNum == 1 { - return func(_ map[int64]interface{}) int64 { + return func(_ any) int64 { return 0 } } switch partField.GetDataType() { case schemapb.DataType_Int64: - return func(row map[int64]interface{}) int64 { - data := row[partField.GetFieldID()] - hash, _ := typeutil.Hash32Int64(data.(int64)) + return func(key any) int64 { + hash, _ := typeutil.Hash32Int64(key.(int64)) return int64(hash) % partitionNum } case schemapb.DataType_VarChar: - return func(row map[int64]interface{}) int64 { - data := row[partField.GetFieldID()] - hash := typeutil.HashString2Uint32(data.(string)) + return func(key any) int64 { + hash := 
typeutil.HashString2Uint32(key.(string)) return int64(hash) % partitionNum } default: diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index abb5c6d4a24e8..112d537f3a4aa 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -18,11 +18,8 @@ package importv2 import ( "context" - rand2 "crypto/rand" "encoding/json" - "fmt" "io" - "math/rand" "strconv" "strings" "sync" @@ -30,7 +27,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -42,10 +38,10 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" + "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type sampleRow struct { @@ -119,113 +115,6 @@ func (s *SchedulerSuite) SetupTest() { s.scheduler = NewScheduler(s.manager).(*scheduler) } -func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { - insertData, err := storage.NewInsertData(schema) - assert.NoError(t, err) - for _, field := range schema.GetFields() { - if field.GetAutoID() && field.GetIsPrimaryKey() { - continue - } - switch field.GetDataType() { - case schemapb.DataType_Bool: - boolData := make([]bool, 0) - for i := 0; i < rowCount; i++ { - boolData = append(boolData, i%3 != 0) - } - insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} - case schemapb.DataType_Float: - floatData := make([]float32, 0) - for i := 0; i < rowCount; i++ { - floatData = append(floatData, float32(i/2)) - } - insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - for i := 0; i < rowCount; i++ { - doubleData = append(doubleData, float64(i/5)) - } - insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} - case schemapb.DataType_Int8: - int8Data := make([]int8, 0) - for i := 0; i < rowCount; i++ { - int8Data = append(int8Data, int8(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - for i := 0; i < rowCount; i++ { - int16Data = append(int16Data, int16(i%65536)) - } - insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - for i := 0; i < rowCount; i++ { - int32Data = append(int32Data, int32(i%1000)) - } - insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - for i := 0; i < rowCount; i++ { - int64Data = append(int64Data, int64(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} - case schemapb.DataType_BinaryVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - binVecData := make([]byte, 0) - total := rowCount * int(dim) / 8 - for i := 0; i < total; i++ { - binVecData = append(binVecData, byte(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} - case schemapb.DataType_FloatVector: - dim, err := 
typeutil.GetDim(field) - assert.NoError(t, err) - floatVecData := make([]float32, 0) - total := rowCount * int(dim) - for i := 0; i < total; i++ { - floatVecData = append(floatVecData, rand.Float32()) - } - insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} - case schemapb.DataType_Float16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - float16VecData := make([]byte, total) - _, err = rand2.Read(float16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} - case schemapb.DataType_String, schemapb.DataType_VarChar: - varcharData := make([]string, 0) - for i := 0; i < rowCount; i++ { - varcharData = append(varcharData, strconv.Itoa(i)) - } - insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - for i := 0; i < rowCount; i++ { - jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i))) - } - insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} - case schemapb.DataType_Array: - arrayData := make([]*schemapb.ScalarField, 0) - for i := 0; i < rowCount; i++ { - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, - }, - }, - }) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - default: - panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) - } - } - return insertData -} - func (s *SchedulerSuite) TestScheduler_Slots() { preimportReq := &datapb.PreImportRequest{ JobID: 1, @@ -465,7 +354,8 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() { } var once sync.Once - data := createInsertData(s.T(), s.schema, s.numRows) + data, err := testutil.CreateInsertData(s.schema, s.numRows) + s.NoError(err) s.reader = importutilv2.NewMockReader(s.T()) s.reader.EXPECT().Size().Return(1024, nil) s.reader.EXPECT().Read().RunAndReturn(func() (*storage.InsertData, error) { @@ -489,7 +379,7 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() { } preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm) s.manager.Add(preimportTask) - err := preimportTask.(*PreImportTask).readFileStat(s.reader, preimportTask, 0) + err = preimportTask.(*PreImportTask).readFileStat(s.reader, preimportTask, 0) s.NoError(err) } @@ -501,7 +391,8 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() { return future }) var once sync.Once - data := createInsertData(s.T(), s.schema, s.numRows) + data, err := testutil.CreateInsertData(s.schema, s.numRows) + s.NoError(err) s.reader = importutilv2.NewMockReader(s.T()) s.reader.EXPECT().Read().RunAndReturn(func() (*storage.InsertData, error) { var res *storage.InsertData @@ -540,7 +431,7 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() { } importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm) s.manager.Add(importTask) - err := importTask.(*ImportTask).importFile(s.reader, importTask) + err = importTask.(*ImportTask).importFile(s.reader, importTask) s.NoError(err) } diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index d349bf833bb03..023c23c0078d4 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -28,13 +28,17 @@ import ( type TaskType int const ( - PreImportTaskType TaskType = 0 - 
ImportTaskType TaskType = 1 + PreImportTaskType TaskType = 0 + ImportTaskType TaskType = 1 + L0PreImportTaskType TaskType = 2 + L0ImportTaskType TaskType = 3 ) var ImportTaskTypeName = map[TaskType]string{ 0: "PreImportTask", 1: "ImportTask", + 2: "L0PreImportTaskType", + 3: "L0ImportTaskType", } func (t TaskType) String() string { @@ -69,6 +73,10 @@ func UpdateState(state datapb.ImportTaskStateV2) UpdateAction { t.(*PreImportTask).PreImportTask.State = state case ImportTaskType: t.(*ImportTask).ImportTaskV2.State = state + case L0PreImportTaskType: + t.(*L0PreImportTask).PreImportTask.State = state + case L0ImportTaskType: + t.(*L0ImportTask).ImportTaskV2.State = state } } } @@ -80,17 +88,28 @@ func UpdateReason(reason string) UpdateAction { t.(*PreImportTask).PreImportTask.Reason = reason case ImportTaskType: t.(*ImportTask).ImportTaskV2.Reason = reason + case L0PreImportTaskType: + t.(*L0PreImportTask).PreImportTask.Reason = reason + case L0ImportTaskType: + t.(*L0ImportTask).ImportTaskV2.Reason = reason } } } func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction { return func(task Task) { - if it, ok := task.(*PreImportTask); ok { - it.PreImportTask.FileStats[idx].FileSize = fileStat.GetFileSize() - it.PreImportTask.FileStats[idx].TotalRows = fileStat.GetTotalRows() - it.PreImportTask.FileStats[idx].TotalMemorySize = fileStat.GetTotalMemorySize() - it.PreImportTask.FileStats[idx].HashedStats = fileStat.GetHashedStats() + var t *datapb.PreImportTask + switch it := task.(type) { + case *PreImportTask: + t = it.PreImportTask + case *L0PreImportTask: + t = it.PreImportTask + } + if t != nil { + t.FileStats[idx].FileSize = fileStat.GetFileSize() + t.FileStats[idx].TotalRows = fileStat.GetTotalRows() + t.FileStats[idx].TotalMemorySize = fileStat.GetTotalMemorySize() + t.FileStats[idx].HashedStats = fileStat.GetHashedStats() } } } @@ -110,15 +129,23 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction { return current } return func(task Task) { - if it, ok := task.(*ImportTask); ok { + var segmentsInfo map[int64]*datapb.ImportSegmentInfo + switch it := task.(type) { + case *ImportTask: + segmentsInfo = it.segmentsInfo + case *L0ImportTask: + segmentsInfo = it.segmentsInfo + } + if segmentsInfo != nil { segment := info.GetSegmentID() - if _, ok = it.segmentsInfo[segment]; ok { - it.segmentsInfo[segment].ImportedRows = info.GetImportedRows() - it.segmentsInfo[segment].Binlogs = mergeFn(it.segmentsInfo[segment].Binlogs, info.GetBinlogs()) - it.segmentsInfo[segment].Statslogs = mergeFn(it.segmentsInfo[segment].Statslogs, info.GetStatslogs()) + if _, ok := segmentsInfo[segment]; ok { + segmentsInfo[segment].ImportedRows = info.GetImportedRows() + segmentsInfo[segment].Binlogs = mergeFn(segmentsInfo[segment].Binlogs, info.GetBinlogs()) + segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Statslogs, info.GetStatslogs()) + segmentsInfo[segment].Deltalogs = mergeFn(segmentsInfo[segment].Deltalogs, info.GetDeltalogs()) return } - it.segmentsInfo[segment] = info + segmentsInfo[segment] = info } } } diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index 4e6405a1ff813..b5094bbd650cf 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -36,7 +36,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type 
ImportTask struct { @@ -78,29 +77,10 @@ func NewImportTask(req *datapb.ImportRequest, syncMgr: syncMgr, cm: cm, } - task.initMetaCaches(req) + task.metaCaches = NewMetaCache(req) return task } -func (t *ImportTask) initMetaCaches(req *datapb.ImportRequest) { - metaCaches := make(map[string]metacache.MetaCache) - schema := typeutil.AppendSystemFields(req.GetSchema()) - for _, channel := range req.GetVchannels() { - info := &datapb.ChannelWatchInfo{ - Vchan: &datapb.VchannelInfo{ - CollectionID: req.GetCollectionID(), - ChannelName: channel, - }, - Schema: schema, - } - metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet { - return metacache.NewBloomFilterSet() - }) - metaCaches[channel] = metaCache - } - t.metaCaches = metaCaches -} - func (t *ImportTask) GetType() TaskType { return ImportTaskType } @@ -210,7 +190,7 @@ func (t *ImportTask) importFile(reader importutilv2.Reader, task Task) error { return err } for _, syncTask := range syncTasks { - segmentInfo, err := NewImportSegmentInfo(syncTask, iTask) + segmentInfo, err := NewImportSegmentInfo(syncTask, iTask.metaCaches) if err != nil { return err } @@ -231,8 +211,9 @@ func (t *ImportTask) sync(task *ImportTask, hashedData HashedData) ([]*conc.Futu continue } partitionID := task.GetPartitionIDs()[partitionIdx] - segmentID := PickSegment(task, channel, partitionID) - syncTask, err := NewSyncTask(task.ctx, task, segmentID, partitionID, channel, data) + segmentID := PickSegment(task.req.GetRequestSegments(), channel, partitionID) + syncTask, err := NewSyncTask(task.ctx, task.metaCaches, task.req.GetTs(), + segmentID, partitionID, task.GetCollectionID(), channel, data, nil) if err != nil { return nil, nil, err } diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go new file mode 100644 index 0000000000000..48d35d2f830fc --- /dev/null +++ b/internal/datanode/importv2/task_l0_import.go @@ -0,0 +1,222 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importv2 + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cockroachdb/errors" + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/binlog" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type L0ImportTask struct { + *datapb.ImportTaskV2 + ctx context.Context + cancel context.CancelFunc + segmentsInfo map[int64]*datapb.ImportSegmentInfo + req *datapb.ImportRequest + + manager TaskManager + syncMgr syncmgr.SyncManager + cm storage.ChunkManager + metaCaches map[string]metacache.MetaCache +} + +func NewL0ImportTask(req *datapb.ImportRequest, + manager TaskManager, + syncMgr syncmgr.SyncManager, + cm storage.ChunkManager, +) Task { + ctx, cancel := context.WithCancel(context.Background()) + task := &L0ImportTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + JobID: req.GetJobID(), + TaskID: req.GetTaskID(), + CollectionID: req.GetCollectionID(), + State: datapb.ImportTaskStateV2_Pending, + }, + ctx: ctx, + cancel: cancel, + segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo), + req: req, + manager: manager, + syncMgr: syncMgr, + cm: cm, + } + task.metaCaches = NewMetaCache(req) + return task +} + +func (t *L0ImportTask) GetType() TaskType { + return L0ImportTaskType +} + +func (t *L0ImportTask) GetPartitionIDs() []int64 { + return t.req.GetPartitionIDs() +} + +func (t *L0ImportTask) GetVchannels() []string { + return t.req.GetVchannels() +} + +func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema { + return t.req.GetSchema() +} + +func (t *L0ImportTask) Cancel() { + t.cancel() +} + +func (t *L0ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo { + return lo.Values(t.segmentsInfo) +} + +func (t *L0ImportTask) Clone() Task { + ctx, cancel := context.WithCancel(t.ctx) + return &L0ImportTask{ + ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2), + ctx: ctx, + cancel: cancel, + segmentsInfo: t.segmentsInfo, + req: t.req, + metaCaches: t.metaCaches, + } +} + +func (t *L0ImportTask) Execute() []*conc.Future[any] { + bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024 + log.Info("start to import l0", WrapLogFields(t, + zap.Int("bufferSize", bufferSize), + zap.Any("schema", t.GetSchema()))...) + t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + + fn := func() (err error) { + defer func() { + if err != nil { + log.Warn("l0 import task execute failed", WrapLogFields(t, zap.Error(err))...) 
+ t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) + } + }() + + if len(t.req.GetFiles()) != 1 { + err = merr.WrapErrImportFailed( + fmt.Sprintf("there should be one prefix for l0 import, but got %v", t.req.GetFiles())) + return + } + pkField, err := typeutil.GetPrimaryFieldSchema(t.GetSchema()) + if err != nil { + return + } + reader, err := binlog.NewL0Reader(t.ctx, t.cm, pkField, t.req.GetFiles()[0], bufferSize) + if err != nil { + return + } + start := time.Now() + err = t.importL0(reader, t) + if err != nil { + return + } + log.Info("l0 import done", WrapLogFields(t, + zap.Strings("l0 prefix", t.req.GetFiles()[0].GetPaths()), + zap.Duration("dur", time.Since(start)))...) + return nil + } + + f := GetExecPool().Submit(func() (any, error) { + err := fn() + return err, err + }) + return []*conc.Future[any]{f} +} + +func (t *L0ImportTask) importL0(reader binlog.L0Reader, task Task) error { + iTask := task.(*L0ImportTask) + syncFutures := make([]*conc.Future[struct{}], 0) + syncTasks := make([]syncmgr.Task, 0) + for { + data, err := reader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + delData, err := HashDeleteData(iTask, data) + if err != nil { + return err + } + fs, sts, err := t.syncDelete(iTask, delData) + if err != nil { + return err + } + syncFutures = append(syncFutures, fs...) + syncTasks = append(syncTasks, sts...) + } + err := conc.AwaitAll(syncFutures...) + if err != nil { + return err + } + for _, syncTask := range syncTasks { + segmentInfo, err := NewImportSegmentInfo(syncTask, iTask.metaCaches) + if err != nil { + return err + } + t.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo)) + log.Info("sync l0 data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...) + } + return nil +} + +func (t *L0ImportTask) syncDelete(task *L0ImportTask, delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) { + log.Info("start to sync l0 delete data", WrapLogFields(task)...) + futures := make([]*conc.Future[struct{}], 0) + syncTasks := make([]syncmgr.Task, 0) + for channelIdx, data := range delData { + channel := task.GetVchannels()[channelIdx] + if data.RowCount == 0 { + continue + } + partitionID := task.GetPartitionIDs()[0] + segmentID := PickSegment(task.req.GetRequestSegments(), channel, partitionID) + syncTask, err := NewSyncTask(task.ctx, task.metaCaches, task.req.GetTs(), + segmentID, partitionID, task.GetCollectionID(), channel, nil, data) + if err != nil { + return nil, nil, err + } + future := t.syncMgr.SyncData(task.ctx, syncTask) + futures = append(futures, future) + syncTasks = append(syncTasks, syncTask) + } + return futures, syncTasks, nil +} diff --git a/internal/datanode/importv2/task_l0_import_test.go b/internal/datanode/importv2/task_l0_import_test.go new file mode 100644 index 0000000000000..9dc1740cd2f90 --- /dev/null +++ b/internal/datanode/importv2/task_l0_import_test.go @@ -0,0 +1,195 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importv2 + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type L0ImportSuite struct { + suite.Suite + + collectionID int64 + partitionID int64 + segmentID int64 + channel string + + delCnt int + deleteData *storage.DeleteData + schema *schemapb.CollectionSchema + + cm storage.ChunkManager + reader *importutilv2.MockReader + syncMgr *syncmgr.MockSyncManager + manager TaskManager +} + +func (s *L0ImportSuite) SetupSuite() { + paramtable.Init() +} + +func (s *L0ImportSuite) SetupTest() { + s.collectionID = 1 + s.partitionID = 2 + s.segmentID = 3 + s.channel = "ch-0" + s.delCnt = 100 + + s.schema = &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.MaxLengthKey, Value: "128"}, + }, + }, + }, + } + + s.manager = NewTaskManager() + s.syncMgr = syncmgr.NewMockSyncManager(s.T()) + + deleteData := storage.NewDeleteData(nil, nil) + for i := 0; i < s.delCnt; i++ { + deleteData.Append(storage.NewVarCharPrimaryKey(fmt.Sprintf("No.%d", i)), uint64(i+1)) + } + s.deleteData = deleteData + deleteCodec := storage.NewDeleteCodec() + blob, err := deleteCodec.Serialize(s.collectionID, s.partitionID, s.segmentID, deleteData) + s.NoError(err) + + cm := mocks.NewChunkManager(s.T()) + cm.EXPECT().Read(mock.Anything, mock.Anything).Return(blob.Value, nil) + cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, s string, b bool, walkFunc storage.ChunkObjectWalkFunc) error { + for _, file := range []string{"a/b/c/"} { + walkFunc(&storage.ChunkObjectInfo{FilePath: file}) + } + return nil + }) + s.cm = cm +} + +func (s *L0ImportSuite) TestL0PreImport() { + req := &datapb.PreImportRequest{ + JobID: 1, + TaskID: 2, + CollectionID: s.collectionID, + PartitionIDs: []int64{s.partitionID}, + Vchannels: []string{s.channel}, + Schema: s.schema, + ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy-prefix"}}}, + } + task := NewL0PreImportTask(req, s.manager, s.cm) + s.manager.Add(task) + fu := task.Execute() + err := conc.AwaitAll(fu...) 
+ s.NoError(err) + l0Task := s.manager.Get(task.GetTaskID()).(*L0PreImportTask) + s.Equal(1, len(l0Task.GetFileStats())) + fileStats := l0Task.GetFileStats()[0] + s.Equal(int64(s.delCnt), fileStats.GetTotalRows()) + s.Equal(s.deleteData.Size(), fileStats.GetTotalMemorySize()) + partitionStats := fileStats.GetHashedStats()[s.channel] + s.Equal(int64(s.delCnt), partitionStats.GetPartitionRows()[s.partitionID]) + s.Equal(s.deleteData.Size(), partitionStats.GetPartitionDataSize()[s.partitionID]) +} + +func (s *L0ImportSuite) TestL0Import() { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { + alloc := allocator.NewMockAllocator(s.T()) + alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil) + task.(*syncmgr.SyncTask).WithAllocator(alloc) + + s.cm.(*mocks.ChunkManager).EXPECT().RootPath().Return("mock-rootpath") + s.cm.(*mocks.ChunkManager).EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil) + task.(*syncmgr.SyncTask).WithChunkManager(s.cm) + + err := task.Run() + s.NoError(err) + + future := conc.Go(func() (struct{}, error) { + return struct{}{}, nil + }) + return future + }) + + req := &datapb.ImportRequest{ + JobID: 1, + TaskID: 2, + CollectionID: s.collectionID, + PartitionIDs: []int64{s.partitionID}, + Vchannels: []string{s.channel}, + Schema: s.schema, + Files: []*internalpb.ImportFile{{Paths: []string{"dummy-prefix"}}}, + RequestSegments: []*datapb.ImportRequestSegment{ + { + SegmentID: s.segmentID, + PartitionID: s.partitionID, + Vchannel: s.channel, + }, + }, + } + task := NewL0ImportTask(req, s.manager, s.syncMgr, s.cm) + s.manager.Add(task) + fu := task.Execute() + err := conc.AwaitAll(fu...) + s.NoError(err) + + l0Task := s.manager.Get(task.GetTaskID()).(*L0ImportTask) + s.Equal(1, len(l0Task.GetSegmentsInfo())) + + segmentInfo := l0Task.GetSegmentsInfo()[0] + s.Equal(s.segmentID, segmentInfo.GetSegmentID()) + s.Equal(int64(0), segmentInfo.GetImportedRows()) + s.Equal(0, len(segmentInfo.GetBinlogs())) + s.Equal(0, len(segmentInfo.GetStatslogs())) + s.Equal(1, len(segmentInfo.GetDeltalogs())) + + actual := segmentInfo.GetDeltalogs()[0] + s.Equal(1, len(actual.GetBinlogs())) + + deltaLog := actual.GetBinlogs()[0] + s.Equal(int64(s.delCnt), deltaLog.GetEntriesNum()) + s.Equal(s.deleteData.Size(), deltaLog.GetMemorySize()) +} + +func TestL0Import(t *testing.T) { + suite.Run(t, new(L0ImportSuite)) +} diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go new file mode 100644 index 0000000000000..89f16c876a5e5 --- /dev/null +++ b/internal/datanode/importv2/task_l0_preimport.go @@ -0,0 +1,194 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importv2 + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cockroachdb/errors" + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/binlog" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type L0PreImportTask struct { + *datapb.PreImportTask + ctx context.Context + cancel context.CancelFunc + partitionIDs []int64 + vchannels []string + schema *schemapb.CollectionSchema + + manager TaskManager + cm storage.ChunkManager +} + +func NewL0PreImportTask(req *datapb.PreImportRequest, + manager TaskManager, + cm storage.ChunkManager, +) Task { + fileStats := lo.Map(req.GetImportFiles(), func(file *internalpb.ImportFile, _ int) *datapb.ImportFileStats { + return &datapb.ImportFileStats{ + ImportFile: file, + } + }) + ctx, cancel := context.WithCancel(context.Background()) + return &L0PreImportTask{ + PreImportTask: &datapb.PreImportTask{ + JobID: req.GetJobID(), + TaskID: req.GetTaskID(), + CollectionID: req.GetCollectionID(), + State: datapb.ImportTaskStateV2_Pending, + FileStats: fileStats, + }, + ctx: ctx, + cancel: cancel, + partitionIDs: req.GetPartitionIDs(), + vchannels: req.GetVchannels(), + schema: req.GetSchema(), + manager: manager, + cm: cm, + } +} + +func (t *L0PreImportTask) GetPartitionIDs() []int64 { + return t.partitionIDs +} + +func (t *L0PreImportTask) GetVchannels() []string { + return t.vchannels +} + +func (t *L0PreImportTask) GetType() TaskType { + return L0PreImportTaskType +} + +func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema { + return t.schema +} + +func (t *L0PreImportTask) Cancel() { + t.cancel() +} + +func (t *L0PreImportTask) Clone() Task { + ctx, cancel := context.WithCancel(t.ctx) + return &L0PreImportTask{ + PreImportTask: proto.Clone(t.PreImportTask).(*datapb.PreImportTask), + ctx: ctx, + cancel: cancel, + partitionIDs: t.GetPartitionIDs(), + vchannels: t.GetVchannels(), + schema: t.GetSchema(), + } +} + +func (t *L0PreImportTask) Execute() []*conc.Future[any] { + bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024 + log.Info("start to preimport l0", WrapLogFields(t, + zap.Int("bufferSize", bufferSize), + zap.Any("schema", t.GetSchema()))...) + t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) + + fn := func() (err error) { + defer func() { + if err != nil { + log.Warn("l0 preimport task execute failed", WrapLogFields(t, zap.Error(err))...)
+ t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error())) + } + }() + + files := lo.Map(t.GetFileStats(), func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile { + return fileStat.GetImportFile() + }) + if len(files) != 1 { + err = merr.WrapErrImportFailed( + fmt.Sprintf("there should be one prefix for l0 import, but got %v", files)) + return + } + pkField, err := typeutil.GetPrimaryFieldSchema(t.GetSchema()) + if err != nil { + return + } + reader, err := binlog.NewL0Reader(t.ctx, t.cm, pkField, files[0], bufferSize) + if err != nil { + return + } + start := time.Now() + err = t.readL0Stat(reader, t) + if err != nil { + return + } + log.Info("l0 preimport done", WrapLogFields(t, + zap.Strings("l0 prefix", files[0].GetPaths()), + zap.Duration("dur", time.Since(start)))...) + return nil + } + + f := GetExecPool().Submit(func() (any, error) { + err := fn() + return err, err + }) + return []*conc.Future[any]{f} +} + +func (t *L0PreImportTask) readL0Stat(reader binlog.L0Reader, task Task) error { + totalRows := 0 + totalSize := 0 + hashedStats := make(map[string]*datapb.PartitionImportStats) + for { + data, err := reader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + stats, err := GetDeleteStats(task, data) + if err != nil { + return err + } + MergeHashedStats(stats, hashedStats) + rows := int(data.RowCount) + size := int(data.Size()) + totalRows += rows + totalSize += size + log.Info("reading l0 stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...) + } + + stat := &datapb.ImportFileStats{ + TotalRows: int64(totalRows), + TotalMemorySize: int64(totalSize), + HashedStats: hashedStats, + } + t.manager.Update(task.GetTaskID(), UpdateFileStat(0, stat)) + return nil +} diff --git a/internal/datanode/importv2/task_manager_test.go b/internal/datanode/importv2/task_manager_test.go index 2a54feb3bfc37..d22163da491e1 100644 --- a/internal/datanode/importv2/task_manager_test.go +++ b/internal/datanode/importv2/task_manager_test.go @@ -76,3 +76,73 @@ func TestImportManager(t *testing.T) { tasks = manager.GetBy() assert.Equal(t, 1, len(tasks)) } + +func TestImportManager_L0(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + t.Run("l0 preimport", func(t *testing.T) { + manager := NewTaskManager() + task := &L0PreImportTask{ + PreImportTask: &datapb.PreImportTask{ + JobID: 1, + TaskID: 2, + CollectionID: 3, + NodeID: 7, + State: datapb.ImportTaskStateV2_Pending, + FileStats: []*datapb.ImportFileStats{{ + TotalRows: 50, + }}, + }, + ctx: ctx, + cancel: cancel, + } + manager.Add(task) + res := manager.Get(task.GetTaskID()) + assert.Equal(t, task, res) + + reason := "mock reason" + manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), + UpdateReason(reason), UpdateFileStat(0, &datapb.ImportFileStats{ + TotalRows: 100, + })) + + res = manager.Get(task.GetTaskID()) + assert.Equal(t, datapb.ImportTaskStateV2_Failed, res.GetState()) + assert.Equal(t, reason, res.GetReason()) + assert.Equal(t, int64(100), res.(*L0PreImportTask).GetFileStats()[0].GetTotalRows()) + }) + + t.Run("l0 import", func(t *testing.T) { + manager := NewTaskManager() + task := &L0ImportTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + JobID: 1, + TaskID: 2, + CollectionID: 3, + SegmentIDs: []int64{5, 6}, + NodeID: 7, + State: datapb.ImportTaskStateV2_Pending, + }, + segmentsInfo: map[int64]*datapb.ImportSegmentInfo{ + 10: {ImportedRows: 50}, + }, + ctx: ctx, + 
cancel: cancel, + } + manager.Add(task) + res := manager.Get(task.GetTaskID()) + assert.Equal(t, task, res) + + reason := "mock reason" + manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), + UpdateReason(reason), UpdateSegmentInfo(&datapb.ImportSegmentInfo{ + SegmentID: 10, + ImportedRows: 100, + })) + + res = manager.Get(task.GetTaskID()) + assert.Equal(t, datapb.ImportTaskStateV2_Failed, res.GetState()) + assert.Equal(t, reason, res.GetReason()) + assert.Equal(t, int64(100), res.(*L0ImportTask).GetSegmentsInfo()[0].GetImportedRows()) + }) +} diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 648fc24d90bda..1098b3e9edf9c 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -39,21 +39,27 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -func WrapNoTaskError(taskID int64, taskType TaskType) error { - return merr.WrapErrImportFailed(fmt.Sprintf("cannot find %s with id %d", taskType.String(), taskID)) +func WrapTaskNotFoundError(taskID int64) error { + return merr.WrapErrImportFailed(fmt.Sprintf("cannot find import task with id %d", taskID)) } -func NewSyncTask(ctx context.Context, task *ImportTask, segmentID, partitionID int64, vchannel string, insertData *storage.InsertData) (syncmgr.Task, error) { +func NewSyncTask(ctx context.Context, + metaCaches map[string]metacache.MetaCache, + ts uint64, + segmentID, partitionID, collectionID int64, vchannel string, + insertData *storage.InsertData, + deleteData *storage.DeleteData, +) (syncmgr.Task, error) { if params.Params.CommonCfg.EnableStorageV2.GetAsBool() { return nil, merr.WrapErrImportFailed("storage v2 is not supported") // TODO: dyh, resolve storage v2 } - metaCache := task.metaCaches[vchannel] + metaCache := metaCaches[vchannel] if _, ok := metaCache.GetSegmentByID(segmentID); !ok { metaCache.AddSegment(&datapb.SegmentInfo{ ID: segmentID, State: commonpb.SegmentState_Importing, - CollectionID: task.GetCollectionID(), + CollectionID: collectionID, PartitionID: partitionID, InsertChannel: vchannel, }, func(info *datapb.SegmentInfo) *metacache.BloomFilterSet { @@ -74,34 +80,40 @@ func NewSyncTask(ctx context.Context, task *ImportTask, segmentID, partitionID i syncPack := &syncmgr.SyncPack{} syncPack.WithInsertData(insertData). - WithCollectionID(task.GetCollectionID()). + WithDeleteData(deleteData). + WithCollectionID(collectionID). WithPartitionID(partitionID). WithChannelName(vchannel). WithSegmentID(segmentID). - WithTimeRange(task.req.GetTs(), task.req.GetTs()). + WithTimeRange(ts, ts). 
WithBatchSize(int64(insertData.GetRowNum())) return serializer.EncodeBuffer(ctx, syncPack) } -func NewImportSegmentInfo(syncTask syncmgr.Task, task *ImportTask) (*datapb.ImportSegmentInfo, error) { +func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) { segmentID := syncTask.SegmentID() - insertBinlogs, statsBinlog, _ := syncTask.(*syncmgr.SyncTask).Binlogs() - metaCache := task.metaCaches[syncTask.ChannelName()] + insertBinlogs, statsBinlog, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs() + metaCache := metaCaches[syncTask.ChannelName()] segment, ok := metaCache.GetSegmentByID(segmentID) if !ok { return nil, merr.WrapErrSegmentNotFound(segmentID, "import failed") } + var deltaLogs []*datapb.FieldBinlog + if len(deltaLog.GetBinlogs()) > 0 { + deltaLogs = []*datapb.FieldBinlog{deltaLog} + } return &datapb.ImportSegmentInfo{ SegmentID: segmentID, ImportedRows: segment.FlushedRows(), Binlogs: lo.Values(insertBinlogs), Statslogs: lo.Values(statsBinlog), + Deltalogs: deltaLogs, }, nil } -func PickSegment(task *ImportTask, vchannel string, partitionID int64) int64 { - candidates := lo.Filter(task.req.GetRequestSegments(), func(info *datapb.ImportRequestSegment, _ int) bool { +func PickSegment(segments []*datapb.ImportRequestSegment, vchannel string, partitionID int64) int64 { + candidates := lo.Filter(segments, func(info *datapb.ImportRequestSegment, _ int) bool { return info.GetVchannel() == vchannel && info.GetPartitionID() == partitionID }) @@ -162,13 +174,17 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error { data.Data[pkField.GetFieldID()] = &storage.StringFieldData{Data: strIDs} } } - data.Data[common.RowIDField] = &storage.Int64FieldData{Data: ids} - tss := make([]int64, rowNum) - ts := int64(task.req.GetTs()) - for i := 0; i < rowNum; i++ { - tss[i] = ts + if _, ok := data.Data[common.RowIDField]; !ok { // for binlog import, keep original rowID and ts + data.Data[common.RowIDField] = &storage.Int64FieldData{Data: ids} + } + if _, ok := data.Data[common.TimeStampField]; !ok { + tss := make([]int64, rowNum) + ts := int64(task.req.GetTs()) + for i := 0; i < rowNum; i++ { + tss[i] = ts + } + data.Data[common.TimeStampField] = &storage.Int64FieldData{Data: tss} } - data.Data[common.TimeStampField] = &storage.Int64FieldData{Data: tss} return nil } @@ -212,3 +228,22 @@ func UnsetAutoID(schema *schemapb.CollectionSchema) { } } } + +func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache { + metaCaches := make(map[string]metacache.MetaCache) + schema := typeutil.AppendSystemFields(req.GetSchema()) + for _, channel := range req.GetVchannels() { + info := &datapb.ChannelWatchInfo{ + Vchan: &datapb.VchannelInfo{ + CollectionID: req.GetCollectionID(), + ChannelName: channel, + }, + Schema: schema, + } + metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + metaCaches[channel] = metaCache + } + return metaCaches +} diff --git a/internal/datanode/importv2/util_test.go b/internal/datanode/importv2/util_test.go index a7c7d27a86670..15c1c5cf8144a 100644 --- a/internal/datanode/importv2/util_test.go +++ b/internal/datanode/importv2/util_test.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/util/testutil" 
"github.com/milvus-io/milvus/pkg/common" ) @@ -67,11 +68,12 @@ func Test_AppendSystemFieldsData(t *testing.T) { pkField.DataType = schemapb.DataType_Int64 schema.Fields = []*schemapb.FieldSchema{pkField, vecField, int64Field} - insertData := createInsertData(t, schema, count) + insertData, err := testutil.CreateInsertData(schema, count) + assert.NoError(t, err) assert.Equal(t, 0, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Nil(t, insertData.Data[common.RowIDField]) assert.Nil(t, insertData.Data[common.TimeStampField]) - err := AppendSystemFieldsData(task, insertData) + err = AppendSystemFieldsData(task, insertData) assert.NoError(t, err) assert.Equal(t, count, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Equal(t, count, insertData.Data[common.RowIDField].RowNum()) @@ -79,7 +81,8 @@ func Test_AppendSystemFieldsData(t *testing.T) { pkField.DataType = schemapb.DataType_VarChar schema.Fields = []*schemapb.FieldSchema{pkField, vecField, int64Field} - insertData = createInsertData(t, schema, count) + insertData, err = testutil.CreateInsertData(schema, count) + assert.NoError(t, err) assert.Equal(t, 0, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Nil(t, insertData.Data[common.RowIDField]) assert.Nil(t, insertData.Data[common.TimeStampField]) @@ -152,7 +155,7 @@ func Test_PickSegment(t *testing.T) { batchSize := 16 * 1024 * 1024 for totalSize > 0 { - picked := PickSegment(task, vchannel, partitionID) + picked := PickSegment(task.req.GetRequestSegments(), vchannel, partitionID) importedSize[picked] += batchSize totalSize -= batchSize } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 5a5e7a297688d..bab9519794c9a 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -433,7 +434,12 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques return merr.Status(err), nil } - task := importv2.NewPreImportTask(req, node.importTaskMgr, node.chunkManager) + var task importv2.Task + if importutilv2.IsL0Import(req.GetOptions()) { + task = importv2.NewL0PreImportTask(req, node.importTaskMgr, node.chunkManager) + } else { + task = importv2.NewPreImportTask(req, node.importTaskMgr, node.chunkManager) + } node.importTaskMgr.Add(task) log.Info("datanode added preimport task") @@ -452,7 +458,12 @@ func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) ( if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return merr.Status(err), nil } - task := importv2.NewImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager) + var task importv2.Task + if importutilv2.IsL0Import(req.GetOptions()) { + task = importv2.NewL0ImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager) + } else { + task = importv2.NewImportTask(req, node.importTaskMgr, node.syncMgr, node.chunkManager) + } node.importTaskMgr.Add(task) log.Info("datanode added import task") @@ -468,17 +479,19 @@ func (node *DataNode) QueryPreImport(ctx context.Context, req *datapb.QueryPreIm } status := merr.Success() task := node.importTaskMgr.Get(req.GetTaskID()) - if task == nil || task.GetType() != importv2.PreImportTaskType { - status = 
merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.PreImportTaskType)) + if task == nil { + status = merr.Status(importv2.WrapTaskNotFoundError(req.GetTaskID())) } log.RatedInfo(10, "datanode query preimport", zap.String("state", task.GetState().String()), zap.String("reason", task.GetReason())) return &datapb.QueryPreImportResponse{ - Status: status, - TaskID: task.GetTaskID(), - State: task.GetState(), - Reason: task.GetReason(), - FileStats: task.(*importv2.PreImportTask).GetFileStats(), + Status: status, + TaskID: task.GetTaskID(), + State: task.GetState(), + Reason: task.GetReason(), + FileStats: task.(interface { + GetFileStats() []*datapb.ImportFileStats + }).GetFileStats(), }, nil } @@ -502,17 +515,19 @@ func (node *DataNode) QueryImport(ctx context.Context, req *datapb.QueryImportRe // query import task := node.importTaskMgr.Get(req.GetTaskID()) - if task == nil || task.GetType() != importv2.ImportTaskType { - status = merr.Status(importv2.WrapNoTaskError(req.GetTaskID(), importv2.ImportTaskType)) + if task == nil { + status = merr.Status(importv2.WrapTaskNotFoundError(req.GetTaskID())) } log.RatedInfo(10, "datanode query import", zap.String("state", task.GetState().String()), zap.String("reason", task.GetReason())) return &datapb.QueryImportResponse{ - Status: status, - TaskID: task.GetTaskID(), - State: task.GetState(), - Reason: task.GetReason(), - ImportSegmentsInfo: task.(*importv2.ImportTask).GetSegmentsInfo(), + Status: status, + TaskID: task.GetTaskID(), + State: task.GetState(), + Reason: task.GetReason(), + ImportSegmentsInfo: task.(interface { + GetSegmentsInfo() []*datapb.ImportSegmentInfo + }).GetSegmentsInfo(), }, nil } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index e4a533becda74..80c709090b390 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -775,6 +775,7 @@ message ImportSegmentInfo { int64 imported_rows = 2; repeated FieldBinlog binlogs = 3; repeated FieldBinlog statslogs = 4; + repeated FieldBinlog deltalogs = 5; } message QueryImportResponse { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 32e54df425364..a33098967f2e7 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6103,6 +6103,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) } isBackup := importutilv2.IsBackup(req.GetOptions()) + isL0Import := importutilv2.IsL0Import(req.GetOptions()) hasPartitionKey := typeutil.HasPartitionKey(schema.CollectionSchema) var partitionIDs []int64 @@ -6118,6 +6119,31 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) return resp, nil } partitionIDs = []UniqueID{partitionID} + } else if isL0Import { + if req.GetPartitionName() == "" { + partitionIDs = []UniqueID{common.AllPartitionsID} + } else { + partitionID, err := globalMetaCache.GetPartitionID(ctx, req.GetDbName(), req.GetCollectionName(), req.PartitionName) + if err != nil { + resp.Status = merr.Status(err) + return resp, nil + } + partitionIDs = []UniqueID{partitionID} + } + // Currently, querynodes first load L0 segments and then load L1 segments. + // Therefore, to ensure the deletes from L0 import take effect, + // the collection needs to be in an unloaded state, + // and then all L0 and L1 segments should be loaded at once. + // We will remove this restriction after querynode supported to load L0 segments dynamically. 
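+ // For illustration only (the path below is a placeholder, not taken from this change):
+ // a caller reaches this branch by issuing the regular ImportV2 request with its Files
+ // entry pointing at a delta-log prefix and the "l0_import" option set, e.g.
+ //   Files:   []*internalpb.ImportFile{{Paths: []string{"<delta_log prefix>"}}}
+ //   Options: []*commonpb.KeyValuePair{{Key: "l0_import", Value: "true"}}
+ // The check below then rejects such requests while the collection is still loaded.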
+ loaded, err := isCollectionLoaded(ctx, node.queryCoord, collectionID) + if err != nil { + resp.Status = merr.Status(err) + return resp, nil + } + if loaded { + resp.Status = merr.Status(merr.WrapErrImportFailed("for l0 import, collection cannot be loaded, please release it first")) + return resp, nil + } } else { if hasPartitionKey { if req.GetPartitionName() != "" { @@ -6159,7 +6185,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) Params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(req.Files)))) return resp, nil } - if !isBackup { + if !isBackup && !isL0Import { // check file type for _, file := range req.GetFiles() { _, err = importutilv2.GetFileType(file) diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 9e91b6bb8760a..791b373458e12 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -72,6 +72,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou zap.Int64("segmentID", p.Segment.GetID()), zap.Int64("replica", p.Replica.GetID()), zap.String("channel", p.Segment.GetInsertChannel()), + zap.String("level", p.Segment.GetLevel().String()), zap.Int64("from", p.From), zap.Int64("to", p.To)) if task.GetTaskType(t) == task.TaskTypeMove { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index d1e5b8a5d4edb..f7807886ce128 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -408,6 +408,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen zap.Int64("partitionID", segment.GetPartitionID()), zap.String("shard", segment.GetInsertChannel()), zap.Int64("segmentID", segment.GetSegmentID()), + zap.String("level", segment.GetLevel().String()), zap.Int64("currentNodeID", node.GetNodeID()), ) diff --git a/internal/util/importutilv2/binlog/l0_reader.go b/internal/util/importutilv2/binlog/l0_reader.go new file mode 100644 index 0000000000000..06d3207361b82 --- /dev/null +++ b/internal/util/importutilv2/binlog/l0_reader.go @@ -0,0 +1,110 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
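+// The reader below lists every delta log under a single import-file prefix and
+// replays the contained delete records: each Read() call returns a batch of roughly
+// bufferSize bytes of storage.DeleteData, and io.EOF is returned once all delta
+// logs have been consumed.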
+ +package binlog + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus-storage/go/common/log" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +type L0Reader interface { + Read() (*storage.DeleteData, error) +} + +type l0Reader struct { + ctx context.Context + cm storage.ChunkManager + pkField *schemapb.FieldSchema + + bufferSize int + deltaLogs []string + readIdx int +} + +func NewL0Reader(ctx context.Context, + cm storage.ChunkManager, + pkField *schemapb.FieldSchema, + importFile *internalpb.ImportFile, + bufferSize int, +) (*l0Reader, error) { + r := &l0Reader{ + ctx: ctx, + cm: cm, + pkField: pkField, + bufferSize: bufferSize, + } + if len(importFile.GetPaths()) != 1 { + return nil, merr.WrapErrImportFailed( + fmt.Sprintf("there should be one prefix, but got %s", importFile.GetPaths())) + } + path := importFile.GetPaths()[0] + deltaLogs, _, err := storage.ListAllChunkWithPrefix(context.Background(), r.cm, path, true) + if err != nil { + return nil, err + } + if len(deltaLogs) == 0 { + log.Info("no delta logs for l0 segments", zap.String("prefix", path)) + } + r.deltaLogs = deltaLogs + return r, nil +} + +func (r *l0Reader) Read() (*storage.DeleteData, error) { + deleteData := storage.NewDeleteData(nil, nil) + for { + if r.readIdx == len(r.deltaLogs) { + if deleteData.RowCount != 0 { + return deleteData, nil + } + return nil, io.EOF + } + path := r.deltaLogs[r.readIdx] + br, err := newBinlogReader(r.ctx, r.cm, path) + if err != nil { + return nil, err + } + rowsSet, err := readData(br, storage.DeleteEventType) + if err != nil { + return nil, err + } + for _, rows := range rowsSet { + for _, row := range rows.([]string) { + dl := &storage.DeleteLog{} + err = json.Unmarshal([]byte(row), dl) + if err != nil { + return nil, err + } + deleteData.Append(dl.Pk, dl.Ts) + } + } + r.readIdx++ + if deleteData.Size() >= int64(r.bufferSize) { + break + } + } + return deleteData, nil +} diff --git a/internal/util/importutilv2/binlog/l0_reader_test.go b/internal/util/importutilv2/binlog/l0_reader_test.go new file mode 100644 index 0000000000000..dbbd63dd4fa34 --- /dev/null +++ b/internal/util/importutilv2/binlog/l0_reader_test.go @@ -0,0 +1,95 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
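+// A minimal usage sketch of the reader exercised by these tests (error handling
+// elided; cm, pkField, file and bufferSize are supplied by the caller):
+//
+//	r, _ := NewL0Reader(ctx, cm, pkField, file, bufferSize)
+//	for {
+//		deleteData, err := r.Read()
+//		if errors.Is(err, io.EOF) {
+//			break
+//		}
+//		// consume deleteData: deleteData.RowCount rows, deleteData.Size() bytes
+//	}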
+ +package binlog + +import ( + "context" + "fmt" + "io" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" +) + +func TestL0Reader_NewL0Reader(t *testing.T) { + ctx := context.Background() + + t.Run("normal", func(t *testing.T) { + cm := mocks.NewChunkManager(t) + cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100) + assert.NoError(t, err) + assert.NotNil(t, r) + }) + + t.Run("invalid path", func(t *testing.T) { + r, err := NewL0Reader(ctx, nil, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix", "mock-prefix2"}}, 100) + assert.Error(t, err) + assert.Nil(t, r) + }) + + t.Run("list failed", func(t *testing.T) { + cm := mocks.NewChunkManager(t) + cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error")) + r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100) + assert.Error(t, err) + assert.Nil(t, r) + }) +} + +func TestL0Reader_Read(t *testing.T) { + ctx := context.Background() + const ( + delCnt = 100 + ) + + deleteData := storage.NewDeleteData(nil, nil) + for i := 0; i < delCnt; i++ { + deleteData.Append(storage.NewVarCharPrimaryKey(fmt.Sprintf("No.%d", i)), uint64(i+1)) + } + deleteCodec := storage.NewDeleteCodec() + blob, err := deleteCodec.Serialize(1, 2, 3, deleteData) + assert.NoError(t, err) + + cm := mocks.NewChunkManager(t) + cm.EXPECT().WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, s string, b bool, walkFunc storage.ChunkObjectWalkFunc) error { + for _, file := range []string{"a/b/c/"} { + walkFunc(&storage.ChunkObjectInfo{FilePath: file}) + } + return nil + }) + cm.EXPECT().Read(mock.Anything, mock.Anything).Return(blob.Value, nil) + + r, err := NewL0Reader(ctx, cm, nil, &internalpb.ImportFile{Paths: []string{"mock-prefix"}}, 100) + assert.NoError(t, err) + + res, err := r.Read() + assert.NoError(t, err) + assert.Equal(t, int64(delCnt), res.RowCount) + assert.Equal(t, deleteData.Size(), res.Size()) + + _, err = r.Read() + assert.Error(t, err) + assert.ErrorIs(t, err, io.EOF) +} diff --git a/internal/util/importutilv2/option.go b/internal/util/importutilv2/option.go index b779e3e444be7..652caeda2a14c 100644 --- a/internal/util/importutilv2/option.go +++ b/internal/util/importutilv2/option.go @@ -34,6 +34,7 @@ const ( EndTs = "end_ts" EndTs2 = "endTs" BackupFlag = "backup" + L0Import = "l0_import" ) type Options []*commonpb.KeyValuePair @@ -76,3 +77,11 @@ func IsBackup(options Options) bool { } return true } + +func IsL0Import(options Options) bool { + isL0Import, err := funcutil.GetAttrByKeyFromRepeatedKV(L0Import, options) + if err != nil || strings.ToLower(isL0Import) != "true" { + return false + } + return true +} diff --git a/tests/integration/import/binlog_test.go b/tests/integration/import/binlog_test.go index 70a23323412a2..d1368e110d83d 100644 --- a/tests/integration/import/binlog_test.go +++ b/tests/integration/import/binlog_test.go @@ -19,6 +19,7 @@ package importv2 import ( "context" "fmt" + "time" "github.com/golang/protobuf/proto" "github.com/samber/lo" @@ -27,6 +28,7 @@ import ( 
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -37,17 +39,11 @@ import ( "github.com/milvus-io/milvus/tests/integration" ) -func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { - ctx, cancel := context.WithCancel(context.Background()) +func (s *BulkInsertSuite) PrepareCollectionA(dim, rowNum, delNum, delBatch int) (int64, int64, *schemapb.IDs) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() c := s.Cluster - const ( - dim = 128 - dbName = "" - rowNum = 50000 - ) - collectionName := "TestBinlogImport_A_" + funcutil.GenRandomStr() schema := integration.ConstructSchema(collectionName, dim, true) @@ -55,10 +51,10 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { s.NoError(err) createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ - DbName: dbName, - CollectionName: collectionName, - Schema: marshaledSchema, - ShardsNum: common.DefaultShardsNum, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + ConsistencyLevel: commonpb.ConsistencyLevel_Strong, }) s.NoError(merr.CheckRPCCall(createCollectionStatus, err)) @@ -72,10 +68,27 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { s.NoError(merr.CheckRPCCall(showPartitionsResp, err)) log.Info("ShowPartitions result", zap.Any("showPartitionsResp", showPartitionsResp)) + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2), + }) + s.NoError(merr.CheckRPCCall(createIndexStatus, err)) + + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + CollectionName: collectionName, + }) + s.NoError(merr.CheckRPCCall(loadStatus, err)) + s.WaitForLoad(ctx, collectionName) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) hashKeys := integration.GenerateHashKeys(rowNum) insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ - DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, HashKeys: hashKeys, @@ -86,7 +99,6 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { // flush flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ - DbName: dbName, CollectionNames: []string{collectionName}, }) s.NoError(merr.CheckRPCCall(flushResp, err)) @@ -103,26 +115,38 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } - s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) - - // create index - createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ - CollectionName: collectionName, - FieldName: integration.FloatVecField, - IndexName: "_default", - ExtraParams: integration.ConstructIndexParam(dim, 
integration.IndexFaissIvfFlat, metric.L2), - }) - s.NoError(merr.CheckRPCCall(createIndexStatus, err)) - - s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + s.WaitForFlush(ctx, ids, flushTs, "", collectionName) + + // delete + beginIndex := 0 + for i := 0; i < delBatch; i++ { + delCnt := delNum / delBatch + idBegin := insertedIDs.GetIntId().GetData()[beginIndex] + idEnd := insertedIDs.GetIntId().GetData()[beginIndex+delCnt] + deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: fmt.Sprintf("%d <= %s < %d", idBegin, integration.Int64Field, idEnd), + }) + s.NoError(merr.CheckRPCCall(deleteResult, err)) + beginIndex += delCnt + + flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + CollectionNames: []string{collectionName}, + }) + s.NoError(merr.CheckRPCCall(flushResp, err)) + flushTs, has = flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, nil, flushTs, "", collectionName) + } - // load - loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ - DbName: dbName, - CollectionName: collectionName, + // check l0 segments + segments, err = c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + l0Segments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetLevel() == datapb.SegmentLevel_L0 }) - s.NoError(merr.CheckRPCCall(loadStatus, err)) - s.WaitForLoad(ctx, collectionName) + s.Equal(delBatch, len(l0Segments)) // search expr := fmt.Sprintf("%s > 0", integration.Int64Field) @@ -140,37 +164,69 @@ func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64, *schemapb.IDs) { s.NoError(err) s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) + // query + expr = fmt.Sprintf("%s >= 0", integration.Int64Field) + queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + CollectionName: collectionName, + Expr: expr, + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + count := int(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + s.Equal(rowNum-delNum, count) + + // query 2 + expr = fmt.Sprintf("%s < %d", integration.Int64Field, insertedIDs.GetIntId().GetData()[10]) + queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + CollectionName: collectionName, + Expr: expr, + OutputFields: []string{}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + count = len(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()) + s.Equal(0, count) + // get collectionID and partitionID collectionID := showCollectionsResp.GetCollectionIds()[0] partitionID := showPartitionsResp.GetPartitionIDs()[0] + return collectionID, partitionID, insertedIDs } func (s *BulkInsertSuite) TestBinlogImport() { const ( - startTs = "0" - endTs = "548373346338803234" + dim = 128 + rowNum = 50000 + delNum = 30000 + delBatch = 10 ) - collectionID, partitionID, insertedIDs := s.PrepareCollectionA() + collectionID, partitionID, insertedIDs := s.PrepareCollectionA(dim, rowNum, delNum, delBatch) c := s.Cluster ctx := c.GetContext() - collectionName := "TestBulkInsert_B_" + funcutil.GenRandomStr() + collectionName := "TestBinlogImport_B_" + funcutil.GenRandomStr() schema := integration.ConstructSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ - DbName: "", CollectionName: collectionName, 
Schema: marshaledSchema, ShardsNum: common.DefaultShardsNum, }) s.NoError(merr.CheckRPCCall(createCollectionStatus, err)) + describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + CollectionName: collectionName, + }) + s.NoError(merr.CheckRPCCall(describeCollectionResp, err)) + newCollectionID := describeCollectionResp.GetCollectionID() + // create index createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, @@ -182,18 +238,11 @@ func (s *BulkInsertSuite) TestBinlogImport() { s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) - // load - loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ - CollectionName: collectionName, - }) - s.NoError(merr.CheckRPCCall(loadStatus, err)) - s.WaitForLoad(ctx, collectionName) - + // binlog import files := []*internalpb.ImportFile{ { Paths: []string{ fmt.Sprintf("/tmp/%s/insert_log/%d/%d/", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, partitionID), - fmt.Sprintf("/tmp/%s/delta_log/%d/%d/", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, partitionID), }, }, } @@ -202,8 +251,6 @@ func (s *BulkInsertSuite) TestBinlogImport() { PartitionName: paramtable.Get().CommonCfg.DefaultPartitionName.GetValue(), Files: files, Options: []*commonpb.KeyValuePair{ - {Key: "startTs", Value: startTs}, - {Key: "endTs", Value: endTs}, {Key: "backup", Value: "true"}, }, }) @@ -217,15 +264,67 @@ func (s *BulkInsertSuite) TestBinlogImport() { segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) + segments = lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetCollectionID() == newCollectionID + }) log.Info("Show segments", zap.Any("segments", segments)) + s.Equal(1, len(segments)) + segment := segments[0] + s.Equal(commonpb.SegmentState_Flushed, segment.GetState()) + s.True(len(segment.GetBinlogs()) > 0) + s.NoError(CheckLogID(segment.GetBinlogs())) + s.True(len(segment.GetDeltalogs()) == 0) + s.NoError(CheckLogID(segment.GetDeltalogs())) + s.True(len(segment.GetStatslogs()) > 0) + s.NoError(CheckLogID(segment.GetStatslogs())) + + // l0 import + files = []*internalpb.ImportFile{ + { + Paths: []string{ + fmt.Sprintf("/tmp/%s/delta_log/%d/%d/", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, common.AllPartitionsID), + }, + }, + } + importResp, err = c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{ + CollectionName: collectionName, + Files: files, + Options: []*commonpb.KeyValuePair{ + {Key: "l0_import", Value: "true"}, + }, + }) + s.NoError(merr.CheckRPCCall(importResp, err)) + log.Info("Import result", zap.Any("importResp", importResp)) + + jobID = importResp.GetJobID() + err = WaitForImportDone(ctx, c, jobID) + s.NoError(err) + + segments, err = c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + segments = lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetCollectionID() == newCollectionID + }) + log.Info("Show segments", zap.Any("segments", segments)) + l0Segments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetCollectionID() == newCollectionID && segment.GetLevel() == datapb.SegmentLevel_L0 + }) + s.Equal(1, len(l0Segments)) + segment = l0Segments[0] + s.Equal(commonpb.SegmentState_Flushed, segment.GetState()) + s.Equal(common.AllPartitionsID, segment.GetPartitionID()) + s.True(len(segment.GetBinlogs()) == 0) + 
s.True(len(segment.GetDeltalogs()) > 0) + s.NoError(CheckLogID(segment.GetDeltalogs())) + s.True(len(segment.GetStatslogs()) == 0) - // load refresh - loadStatus, err = c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ CollectionName: collectionName, - Refresh: true, }) s.NoError(merr.CheckRPCCall(loadStatus, err)) - s.WaitForLoadRefresh(ctx, "", collectionName) + s.WaitForLoad(ctx, collectionName) // search expr := fmt.Sprintf("%s > 0", integration.Int64Field) @@ -251,4 +350,28 @@ func (s *BulkInsertSuite) TestBinlogImport() { _, ok := insertedIDsMap[id] s.True(ok) } + + // query + expr = fmt.Sprintf("%s >= 0", integration.Int64Field) + queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + CollectionName: collectionName, + Expr: expr, + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + count := int(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + s.Equal(rowNum-delNum, count) + + // query 2 + expr = fmt.Sprintf("%s < %d", integration.Int64Field, insertedIDs.GetIntId().GetData()[10]) + queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + CollectionName: collectionName, + Expr: expr, + OutputFields: []string{}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + count = len(queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()) + s.Equal(0, count) } diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go index bb60ddc4198cf..f86cade2be36f 100644 --- a/tests/integration/import/import_test.go +++ b/tests/integration/import/import_test.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" @@ -159,20 +158,12 @@ func (s *BulkInsertSuite) run() { segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) - checkLogID := func(fieldBinlogs []*datapb.FieldBinlog) { - for _, fieldBinlog := range fieldBinlogs { - for _, l := range fieldBinlog.GetBinlogs() { - s.NotEqual(int64(0), l.GetLogID()) - } - } - } for _, segment := range segments { s.True(len(segment.GetBinlogs()) > 0) - checkLogID(segment.GetBinlogs()) + s.NoError(CheckLogID(segment.GetBinlogs())) s.True(len(segment.GetDeltalogs()) == 0) - checkLogID(segment.GetDeltalogs()) s.True(len(segment.GetStatslogs()) > 0) - checkLogID(segment.GetStatslogs()) + s.NoError(CheckLogID(segment.GetStatslogs())) } // create index diff --git a/tests/integration/import/util_test.go b/tests/integration/import/util_test.go index 6987ffc355253..946cecd4ab642 100644 --- a/tests/integration/import/util_test.go +++ b/tests/integration/import/util_test.go @@ -33,6 +33,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" pq "github.com/milvus-io/milvus/internal/util/importutilv2/parquet" @@ -44,6 +45,17 @@ import ( const dim = 128 +func CheckLogID(fieldBinlogs []*datapb.FieldBinlog) error { + for _, fieldBinlog := range fieldBinlogs { + for _, l := 
range fieldBinlog.GetBinlogs() { + if l.GetLogID() == 0 { + return fmt.Errorf("unexpected log id 0") + } + } + } + return nil +} + func GenerateParquetFile(filePath string, schema *schemapb.CollectionSchema, numRows int) error { _, err := GenerateParquetFileAndReturnInsertData(filePath, schema, numRows) return err