
enhance: Support L0 import (milvus-io#33514)
issue: milvus-io#33157

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Jun 7, 2024
1 parent 60baaed commit 3540eee
Showing 30 changed files with 1,425 additions and 389 deletions.
3 changes: 2 additions & 1 deletion internal/datacoord/import_checker.go
@@ -28,6 +28,7 @@ import (
     "github.com/milvus-io/milvus/internal/datacoord/broker"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
+    "github.com/milvus-io/milvus/internal/util/importutilv2"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/metrics"
     "github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -253,7 +254,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) {

     // Verify completion of index building for imported segments.
     unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs)
-    if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 {
+    if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 && !importutilv2.IsL0Import(job.GetOptions()) {
         log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
         return
     }
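
Note: the new !importutilv2.IsL0Import(job.GetOptions()) guard means L0 import jobs complete without waiting for index builds — L0 segments carry delete records rather than indexed vector data, so there is nothing to index. A minimal sketch of what such an option check could look like; the "l0_import" key, the Options alias, and the package layout are assumptions inferred from this diff, not confirmed by it:

// Sketch only; the actual importutilv2 implementation may differ.
package importutilv2

import (
    "strconv"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)

// Options mirrors how import options are passed around as key-value pairs.
type Options = []*commonpb.KeyValuePair

// IsL0Import reports whether the job was submitted as an L0 (delete-data) import.
func IsL0Import(options Options) bool {
    for _, kv := range options {
        if kv.GetKey() == "l0_import" { // hypothetical option key
            ok, err := strconv.ParseBool(kv.GetValue())
            return err == nil && ok
        }
    }
    return false
}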
6 changes: 3 additions & 3 deletions internal/datacoord/import_scheduler.go
@@ -304,13 +304,13 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
     if resp.GetState() == datapb.ImportTaskStateV2_Completed {
         for _, info := range resp.GetImportSegmentsInfo() {
             // try to parse path and fill logID
-            err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetStatslogs())
+            err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs())
             if err != nil {
-                log.Warn("fail to CompressFieldBinlogs for import binlogs",
+                log.Warn("fail to CompressBinLogs for import binlogs",
                     WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
                 return
             }
-            op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil)
+            op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs())
             op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
             err = s.meta.UpdateSegmentsInfo(op1, op2)
             if err != nil {
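
Note: because L0 segments persist deletions as deltalogs, the scheduler now compresses and records info.GetDeltalogs() alongside binlogs and statslogs. The call shape suggests CompressBinLogs is variadic over log groups; a hedged sketch of that shape (parseLogIDFromPath is a hypothetical helper, and the real binlog.CompressBinLogs in Milvus may differ):

// Sketch: each datapb.Binlog entry's LogPath encodes a logID; parse it out,
// keep the numeric ID, and drop the path, which can be rebuilt from IDs later.
func CompressBinLogs(logGroups ...[]*datapb.FieldBinlog) error {
    for _, group := range logGroups {
        for _, fieldBinlog := range group {
            for _, lg := range fieldBinlog.GetBinlogs() {
                logID, err := parseLogIDFromPath(lg.GetLogPath()) // hypothetical helper
                if err != nil {
                    return err
                }
                lg.LogID = logID
                lg.LogPath = "" // rebuildable from segment/field/log IDs
            }
        }
    }
    return nil
}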
21 changes: 18 additions & 3 deletions internal/datacoord/import_util.go
@@ -30,6 +30,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
     "github.com/milvus-io/milvus/internal/storage"
+    "github.com/milvus-io/milvus/internal/util/importutilv2"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -96,7 +97,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
                 FileStats: group,
             },
         }
-        segments, err := AssignSegments(task, manager)
+        segments, err := AssignSegments(job, task, manager)
         if err != nil {
             return nil, err
         }
@@ -106,7 +107,7 @@
     return tasks, nil
 }

-func AssignSegments(task ImportTask, manager Manager) ([]int64, error) {
+func AssignSegments(job ImportJob, task ImportTask, manager Manager) ([]int64, error) {
     // merge hashed sizes
     hashedDataSize := make(map[string]map[int64]int64) // vchannel->(partitionID->size)
     for _, fileStats := range task.GetFileStats() {
@@ -120,15 +121,24 @@ func AssignSegments(task ImportTask, manager Manager) ([]int64, error) {
         }
     }

+    isL0Import := importutilv2.IsL0Import(job.GetOptions())
+
     segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
+    if isL0Import {
+        segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
+    }
+    segmentLevel := datapb.SegmentLevel_L1
+    if isL0Import {
+        segmentLevel = datapb.SegmentLevel_L0
+    }

     // alloc new segments
     segments := make([]int64, 0)
     addSegment := func(vchannel string, partitionID int64, size int64) error {
         ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
         defer cancel()
         for size > 0 {
-            segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel)
+            segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(), task.GetCollectionID(), partitionID, vchannel, segmentLevel)
             if err != nil {
                 return err
             }
@@ -215,7 +225,12 @@ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats) [][]*datapb.ImportFileStats {
         return nil
     }

+    isL0Import := importutilv2.IsL0Import(job.GetOptions())
     segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024
+    if isL0Import {
+        segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt()
+    }
+
     threshold := paramtable.Get().DataCoordCfg.MaxSizeInMBPerImportTask.GetAsInt() * 1024 * 1024
     maxSizePerFileGroup := segmentMaxSize * len(job.GetPartitionIDs()) * len(job.GetVchannels())
     if maxSizePerFileGroup > threshold {
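
Note: AssignSegments and RegroupImportFiles now pick the size budget the same way — regular imports are capped by DataCoordCfg.SegmentMaxSize (configured in MB, hence the * 1024 * 1024), while L0 imports are capped by the DataNode's delete-buffer flush threshold, which is already in bytes. A sketch consolidating the repeated branch (the helper name is hypothetical; the two paramtable reads mirror the diff):

// importSegmentMaxBytes returns the per-segment byte budget for an import.
func importSegmentMaxBytes(isL0Import bool) int64 {
    if isL0Import {
        // L0 segments hold delete records, so size them against the
        // DataNode's delete-buffer flush threshold rather than the
        // regular sealed-segment cap.
        return paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
    }
    return paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
}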
5 changes: 3 additions & 2 deletions internal/datacoord/import_util_test.go
@@ -96,15 +96,16 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
         return id, id + n, nil
     })
     manager := NewMockManager(t)
-    manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
-        RunAndReturn(func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, vchannel string) (*SegmentInfo, error) {
+    manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+        RunAndReturn(func(ctx context.Context, taskID int64, collectionID int64, partitionID int64, vchannel string, level datapb.SegmentLevel) (*SegmentInfo, error) {
             return &SegmentInfo{
                 SegmentInfo: &datapb.SegmentInfo{
                     ID:            rand.Int63(),
                     CollectionID:  collectionID,
                     PartitionID:   partitionID,
                     InsertChannel: vchannel,
                     IsImporting:   true,
+                    Level:         level,
                 },
             }, nil
         })
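
Note: the mock now threads the level argument straight into the returned SegmentInfo. A hypothetical variant for an L0-specific test case could also assert the level DataCoord requests (assuming testify's assert is available in this test file, and that the job under test carries the L0 import option):

manager.EXPECT().AllocImportSegment(mock.Anything, mock.Anything, mock.Anything,
    mock.Anything, mock.Anything, mock.Anything).
    RunAndReturn(func(ctx context.Context, taskID, collectionID, partitionID int64,
        vchannel string, level datapb.SegmentLevel) (*SegmentInfo, error) {
        // For an L0-import job, DataCoord should request L0 segments.
        assert.Equal(t, datapb.SegmentLevel_L0, level)
        return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{Level: level}}, nil
    })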
139 changes: 41 additions & 98 deletions internal/datacoord/mock_segment_manager.go

Some generated files are not rendered by default.

13 changes: 7 additions & 6 deletions internal/datacoord/segment_manager.go
@@ -76,7 +76,7 @@ type Manager interface {

     // AllocSegment allocates rows and record the allocation.
     AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
-    AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error)
+    AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error)
     // DropSegment drops the segment from manager.
     DropSegment(ctx context.Context, segmentID UniqueID)
     // FlushImportSegments set importing segment state to Flushed.
@@ -350,7 +350,7 @@ func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
 }

 func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID,
-    partitionID UniqueID, channelName string,
+    partitionID UniqueID, channelName string, level datapb.SegmentLevel,
 ) (*SegmentInfo, error) {
     log := log.Ctx(ctx)
     ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment")
@@ -378,7 +378,7 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID,
         NumOfRows:      0,
         State:          commonpb.SegmentState_Importing,
         MaxRowNum:      0,
-        Level:          datapb.SegmentLevel_L1,
+        Level:          level,
         LastExpireTime: math.MaxUint64,
         StartPosition:  position,
         DmlPosition:    position,
@@ -394,9 +394,10 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID,
     s.segments = append(s.segments, id)
     log.Info("add import segment done",
         zap.Int64("taskID", taskID),
-        zap.Int64("CollectionID", segmentInfo.CollectionID),
-        zap.Int64("SegmentID", segmentInfo.ID),
-        zap.String("Channel", segmentInfo.InsertChannel))
+        zap.Int64("collectionID", segmentInfo.CollectionID),
+        zap.Int64("segmentID", segmentInfo.ID),
+        zap.String("channel", segmentInfo.InsertChannel),
+        zap.String("level", level.String()))

     return segment, nil
 }
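
Note: with the Manager interface carrying the level, every allocation site states explicitly whether it is creating a delete-only L0 segment or a regular L1 segment. A sketch of the call from the AssignSegments side shown earlier in this commit (error handling elided; ctx, job, task, manager, partitionID, and vchannel as in that hunk):

level := datapb.SegmentLevel_L1
if importutilv2.IsL0Import(job.GetOptions()) {
    level = datapb.SegmentLevel_L0
}
// The allocated segment starts in Importing state and is flushed once the
// DataNode reports completion, same as the L1 path above.
segmentInfo, err := manager.AllocImportSegment(ctx, task.GetTaskID(),
    task.GetCollectionID(), partitionID, vchannel, level)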