enhance: Optimize import scheduling and add time cost metric (milvus-io#36601)

1. Optimize import scheduling strategy (a rough sketch of the idea follows the change summary below):
a. Revise slot weights, calculating them based on the number of files
and segments for both import and pre-import tasks.
b. Ensure that the DN executes tasks in ascending order of task ID.
2. Add time cost metric and log.

issue: milvus-io#36600, milvus-io#36518

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Oct 9, 2024
1 parent 2c3a8b7 commit 0fc2a4a
Showing 20 changed files with 235 additions and 49 deletions.
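The scheduling change described in the commit message boils down to two rules: a task's slot weight is derived from the number of files and segments it covers (capped by the new maxTaskSlotNum option added to milvus.yaml below), and the DataNode admits pending tasks in ascending order of task ID while free slots remain. The following is a minimal Go sketch of that shape only; the task struct, the taskSlots formula, and pickTasks are illustrative assumptions, not the actual Milvus implementation, and the real weighting coefficients may differ.

package main

import (
	"fmt"
	"sort"
)

// task is a simplified stand-in for an import/pre-import task.
type task struct {
	TaskID     int64
	FileNum    int
	SegmentNum int
}

// taskSlots is an illustrative slot-weight function: the weight grows with the
// number of files and segments the task touches and is capped by maxTaskSlotNum
// (mirroring the new maxTaskSlotNum config). The real formula in Milvus may use
// different coefficients.
func taskSlots(t task, maxTaskSlotNum int) int {
	slots := t.FileNum + t.SegmentNum
	if slots < 1 {
		slots = 1
	}
	if slots > maxTaskSlotNum {
		slots = maxTaskSlotNum
	}
	return slots
}

// pickTasks admits tasks in ascending task-ID order while enough free slots remain,
// which is the ordering guarantee the commit adds for the DataNode.
func pickTasks(pending []task, freeSlots, maxTaskSlotNum int) []task {
	sort.Slice(pending, func(i, j int) bool {
		return pending[i].TaskID < pending[j].TaskID
	})
	var picked []task
	for _, t := range pending {
		need := taskSlots(t, maxTaskSlotNum)
		if need > freeSlots {
			break
		}
		freeSlots -= need
		picked = append(picked, t)
	}
	return picked
}

func main() {
	pending := []task{
		{TaskID: 3, FileNum: 2, SegmentNum: 4},
		{TaskID: 1, FileNum: 8, SegmentNum: 1},
		{TaskID: 2, FileNum: 1, SegmentNum: 1},
	}
	for _, t := range pickTasks(pending, 16, 16) {
		fmt.Printf("run task %d (files=%d, segments=%d)\n", t.TaskID, t.FileNum, t.SegmentNum)
	}
}

Under a weighting like this, a task spanning many files or segments consumes more of a DataNode's slot budget, so fewer heavyweight tasks run concurrently while small tasks are still admitted in ID order.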
1 change: 1 addition & 0 deletions configs/milvus.yaml
@@ -650,6 +650,7 @@ dataNode:
maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
compaction:
levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
77 changes: 50 additions & 27 deletions internal/datacoord/import_checker.go
@@ -183,7 +183,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi
}

func (c *importChecker) checkPendingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForPreImports(job)
if len(lacks) == 0 {
return
@@ -192,7 +192,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) {

newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc)
if err != nil {
logger.Warn("new preimport tasks failed", zap.Error(err))
log.Warn("new preimport tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
@@ -203,25 +203,30 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
}
log.Info("add new preimport task", WrapTaskLog(t)...)
}

err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
if err != nil {
logger.Warn("failed to update job state to PreImporting", zap.Error(err))
log.Warn("failed to update job state to PreImporting", zap.Error(err))
return
}
pendingDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("import job start to execute", zap.Duration("jobTimeCost/pending", pendingDuration))
}

func (c *importChecker) checkPreImportingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
lacks := c.getLackFilesForImports(job)
if len(lacks) == 0 {
return
}

requestSize, err := CheckDiskQuota(job, c.meta, c.imeta)
if err != nil {
logger.Warn("import failed, disk quota exceeded", zap.Error(err))
log.Warn("import failed, disk quota exceeded", zap.Error(err))
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if err != nil {
logger.Warn("failed to update job state to Failed", zap.Error(err))
log.Warn("failed to update job state to Failed", zap.Error(err))
}
return
}
@@ -230,7 +235,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
groups := RegroupImportFiles(job, lacks, allDiskIndex)
newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta)
if err != nil {
logger.Warn("new import tasks failed", zap.Error(err))
log.Warn("new import tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
@@ -241,13 +246,19 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
}
log.Info("add new import task", WrapTaskLog(t)...)
}

err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
if err != nil {
logger.Warn("failed to update job state to Importing", zap.Error(err))
log.Warn("failed to update job state to Importing", zap.Error(err))
return
}
preImportDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds()))
log.Info("import job preimport done", zap.Duration("jobTimeCost/preimport", preImportDuration))
}

func (c *importChecker) checkImportingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, t := range tasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
@@ -259,18 +270,22 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
log.Warn("failed to update job state to Stats", zap.Error(err))
return
}
log.Info("update import job state to Stats", zap.Int64("jobID", job.GetJobID()))
importDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
log.Info("import job import done", zap.Duration("jobTimeCost/import", importDuration))
}

func (c *importChecker) checkStatsJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
updateJobState := func(state internalpb.ImportJobState) {
err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(state))
if err != nil {
logger.Warn("failed to update job state", zap.Error(err))
log.Warn("failed to update job state", zap.Error(err))
return
}
logger.Info("update import job state", zap.String("state", state.String()))
statsDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageStats).Observe(float64(statsDuration.Milliseconds()))
log.Info("import job stats done", zap.Duration("jobTimeCost/stats", statsDuration))
}

// Skip stats stage if not enable stats or is l0 import.
@@ -290,18 +305,20 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs()
taskCnt += len(originSegmentIDs)
for i, originSegmentID := range originSegmentIDs {
taskLogFields := WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))
state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort)
switch state {
case indexpb.JobState_JobStateNone:
err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false)
if err != nil {
logger.Warn("submit stats task failed", zap.Error(err))
log.Warn("submit stats task failed", zap.Error(err))
continue
}
log.Info("submit stats task done", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
log.Info("submit stats task done", taskLogFields...)
case indexpb.JobState_JobStateInit, indexpb.JobState_JobStateRetry, indexpb.JobState_JobStateInProgress:
logger.Debug("waiting for stats task...", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
log.Debug("waiting for stats task...", taskLogFields...)
case indexpb.JobState_JobStateFailed:
log.Warn("import job stats failed", taskLogFields...)
updateJobState(internalpb.ImportJobState_Failed)
return
case indexpb.JobState_JobStateFinished:
@@ -317,7 +334,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
}

func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
@@ -339,36 +356,40 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
default:
}
}
logger.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
return
}

buildIndexDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))

// Here, all segment indexes have been successfully built, try unset isImporting flag for all segments.
isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
logger.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
log.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
return false
}
return segment.GetIsImporting()
})
channels, err := c.meta.GetSegmentsChannels(isImportingSegments)
if err != nil {
logger.Warn("get segments channels failed", zap.Error(err))
log.Warn("get segments channels failed", zap.Error(err))
return
}
for _, segmentID := range isImportingSegments {
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
if channelCP == nil {
logger.Warn("nil channel checkpoint")
log.Warn("nil channel checkpoint")
return
}
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
op2 := UpdateDmlPosition(segmentID, channelCP)
op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
if err != nil {
logger.Warn("update import segment failed", zap.Error(err))
log.Warn("update import segment failed", zap.Error(err))
return
}
}
@@ -377,10 +398,12 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
logger.Warn("failed to update job state to Completed", zap.Error(err))
log.Warn("failed to update job state to Completed", zap.Error(err))
return
}
logger.Info("import job completed")
totalDuration := job.GetTR().ElapseSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
}

func (c *importChecker) checkFailedJob(job ImportJob) {
@@ -462,9 +485,9 @@ func (c *importChecker) checkGC(job ImportJob) {
}
cleanupTime := tsoutil.PhysicalTime(job.GetCleanupTs())
if time.Now().After(cleanupTime) {
logger := log.With(zap.Int64("jobID", job.GetJobID()))
log := log.With(zap.Int64("jobID", job.GetJobID()))
GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
logger.Info("job has reached the GC retention",
log.Info("job has reached the GC retention",
zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention))
tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID()))
shouldRemoveJob := true
@@ -492,9 +515,9 @@ func (c *importChecker) checkGC(job ImportJob) {
}
err := c.imeta.RemoveJob(job.GetJobID())
if err != nil {
logger.Warn("remove import job failed", zap.Error(err))
log.Warn("remove import job failed", zap.Error(err))
return
}
logger.Info("import job removed")
log.Info("import job removed")
}
}
8 changes: 8 additions & 0 deletions internal/datacoord/import_checker_test.go
@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -101,6 +102,7 @@ func (s *ImportCheckerSuite) SetupTest() {
},
},
},
tr: timerecord.NewTimeRecorder("import job"),
}

catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
Expand All @@ -120,6 +122,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
TaskID: 1,
State: datapb.ImportTaskStateV2_Failed,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(pit1)
s.NoError(err)
@@ -131,6 +134,7 @@
SegmentIDs: []int64{10, 11, 12},
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("import task"),
}
err = s.imeta.AddTask(it1)
s.NoError(err)
@@ -310,6 +314,7 @@ func (s *ImportCheckerSuite) TestCheckTimeout() {
TaskID: 1,
State: datapb.ImportTaskStateV2_InProgress,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -332,6 +337,7 @@ func (s *ImportCheckerSuite) TestCheckFailure() {
SegmentIDs: []int64{2},
StatsSegmentIDs: []int64{3},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(it)
s.NoError(err)
@@ -371,6 +377,7 @@ func (s *ImportCheckerSuite) TestCheckGC() {
SegmentIDs: []int64{2},
StatsSegmentIDs: []int64{3},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -447,6 +454,7 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
TaskID: 1,
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
9 changes: 9 additions & 0 deletions internal/datacoord/import_job.go
@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -91,15 +92,23 @@ type ImportJob interface {
GetCompleteTime() string
GetFiles() []*internalpb.ImportFile
GetOptions() []*commonpb.KeyValuePair
GetTR() *timerecord.TimeRecorder
Clone() ImportJob
}

type importJob struct {
*datapb.ImportJob

tr *timerecord.TimeRecorder
}

func (j *importJob) GetTR() *timerecord.TimeRecorder {
return j.tr
}

func (j *importJob) Clone() ImportJob {
return &importJob{
ImportJob: proto.Clone(j.ImportJob).(*datapb.ImportJob),
tr: j.tr,
}
}
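The time cost instrumentation added in import_checker.go leans on the TimeRecorder attached to each job here: RecordSpan() returns the time elapsed since the previous stage transition, ElapseSpan() the time since the recorder was created, and each duration is observed into the ImportJobLatency histogram with a stage label and logged. The sketch below shows only that pattern; timeRecorder and observeStage are simplified stand-ins for timerecord.TimeRecorder and the Prometheus metric, not the real Milvus code.

package main

import (
	"fmt"
	"time"
)

// timeRecorder is a simplified stand-in for pkg/util/timerecord.TimeRecorder:
// it tracks the total elapsed time and the span since the last checkpoint.
type timeRecorder struct {
	start time.Time
	last  time.Time
}

func newTimeRecorder() *timeRecorder {
	now := time.Now()
	return &timeRecorder{start: now, last: now}
}

// RecordSpan returns the duration since the previous checkpoint and resets it.
func (tr *timeRecorder) RecordSpan() time.Duration {
	now := time.Now()
	span := now.Sub(tr.last)
	tr.last = now
	return span
}

// ElapseSpan returns the duration since the recorder was created.
func (tr *timeRecorder) ElapseSpan() time.Duration {
	return time.Since(tr.start)
}

// observeStage stands in for metrics.ImportJobLatency.WithLabelValues(stage).Observe(...);
// it just prints, since wiring a real Prometheus histogram is beyond this sketch.
func observeStage(stage string, d time.Duration) {
	fmt.Printf("import job stage=%s timeCost=%dms\n", stage, d.Milliseconds())
}

func main() {
	tr := newTimeRecorder()

	time.Sleep(20 * time.Millisecond) // pretend the job sat in Pending
	observeStage("pending", tr.RecordSpan())

	time.Sleep(30 * time.Millisecond) // pretend preimport ran
	observeStage("preimport", tr.RecordSpan())

	observeStage("total", tr.ElapseSpan())
}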
4 changes: 4 additions & 0 deletions internal/datacoord/import_meta.go
@@ -19,6 +19,7 @@ package datacoord
import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

type ImportMeta interface {
@@ -61,18 +62,21 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
for _, task := range restoredPreImportTasks {
tasks[task.GetTaskID()] = &preImportTask{
PreImportTask: task,
tr: timerecord.NewTimeRecorder("preimport task"),
}
}
for _, task := range restoredImportTasks {
tasks[task.GetTaskID()] = &importTask{
ImportTaskV2: task,
tr: timerecord.NewTimeRecorder("import task"),
}
}

jobs := make(map[int64]ImportJob)
for _, job := range restoredJobs {
jobs[job.GetJobID()] = &importJob{
ImportJob: job,
tr: timerecord.NewTimeRecorder("import job"),
}
}
