diff --git a/configs/milvus.yaml b/configs/milvus.yaml index b7a2e6f55fd35..421e8c1976c04 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -650,6 +650,7 @@ dataNode: maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode. maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files. readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import. + maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task. compaction: levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1. diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 73dd166616657..de278f9c2c8d5 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -183,7 +183,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi } func (c *importChecker) checkPendingJob(job ImportJob) { - logger := log.With(zap.Int64("jobID", job.GetJobID())) + log := log.With(zap.Int64("jobID", job.GetJobID())) lacks := c.getLackFilesForPreImports(job) if len(lacks) == 0 { return @@ -192,7 +192,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) { newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc) if err != nil { - logger.Warn("new preimport tasks failed", zap.Error(err)) + log.Warn("new preimport tasks failed", zap.Error(err)) return } for _, t := range newTasks { @@ -203,14 +203,19 @@ func (c *importChecker) checkPendingJob(job ImportJob) { } log.Info("add new preimport task", WrapTaskLog(t)...) } + err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) if err != nil { - logger.Warn("failed to update job state to PreImporting", zap.Error(err)) + log.Warn("failed to update job state to PreImporting", zap.Error(err)) + return } + pendingDuration := job.GetTR().RecordSpan() + metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds())) + log.Info("import job start to execute", zap.Duration("jobTimeCost/pending", pendingDuration)) } func (c *importChecker) checkPreImportingJob(job ImportJob) { - logger := log.With(zap.Int64("jobID", job.GetJobID())) + log := log.With(zap.Int64("jobID", job.GetJobID())) lacks := c.getLackFilesForImports(job) if len(lacks) == 0 { return @@ -218,10 +223,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { requestSize, err := CheckDiskQuota(job, c.meta, c.imeta) if err != nil { - logger.Warn("import failed, disk quota exceeded", zap.Error(err)) + log.Warn("import failed, disk quota exceeded", zap.Error(err)) err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) if err != nil { - logger.Warn("failed to update job state to Failed", zap.Error(err)) + log.Warn("failed to update job state to Failed", zap.Error(err)) } return } @@ -230,7 +235,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { groups := RegroupImportFiles(job, lacks, allDiskIndex) newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta) if err != nil { - logger.Warn("new import tasks failed", zap.Error(err)) + log.Warn("new import tasks failed", zap.Error(err)) return } for _, t := range newTasks { @@ -241,13 +246,19 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { } log.Info("add new import task", WrapTaskLog(t)...) } + err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize)) if err != nil { - logger.Warn("failed to update job state to Importing", zap.Error(err)) + log.Warn("failed to update job state to Importing", zap.Error(err)) + return } + preImportDuration := job.GetTR().RecordSpan() + metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds())) + log.Info("import job preimport done", zap.Duration("jobTimeCost/preimport", preImportDuration)) } func (c *importChecker) checkImportingJob(job ImportJob) { + log := log.With(zap.Int64("jobID", job.GetJobID())) tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID())) for _, t := range tasks { if t.GetState() != datapb.ImportTaskStateV2_Completed { @@ -259,18 +270,22 @@ func (c *importChecker) checkImportingJob(job ImportJob) { log.Warn("failed to update job state to Stats", zap.Error(err)) return } - log.Info("update import job state to Stats", zap.Int64("jobID", job.GetJobID())) + importDuration := job.GetTR().RecordSpan() + metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds())) + log.Info("import job import done", zap.Duration("jobTimeCost/import", importDuration)) } func (c *importChecker) checkStatsJob(job ImportJob) { - logger := log.With(zap.Int64("jobID", job.GetJobID())) + log := log.With(zap.Int64("jobID", job.GetJobID())) updateJobState := func(state internalpb.ImportJobState) { err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(state)) if err != nil { - logger.Warn("failed to update job state", zap.Error(err)) + log.Warn("failed to update job state", zap.Error(err)) return } - logger.Info("update import job state", zap.String("state", state.String())) + statsDuration := job.GetTR().RecordSpan() + metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageStats).Observe(float64(statsDuration.Milliseconds())) + log.Info("import job stats done", zap.Duration("jobTimeCost/stats", statsDuration)) } // Skip stats stage if not enable stats or is l0 import. @@ -290,18 +305,20 @@ func (c *importChecker) checkStatsJob(job ImportJob) { statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs() taskCnt += len(originSegmentIDs) for i, originSegmentID := range originSegmentIDs { + taskLogFields := WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i])) state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort) switch state { case indexpb.JobState_JobStateNone: err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false) if err != nil { - logger.Warn("submit stats task failed", zap.Error(err)) + log.Warn("submit stats task failed", zap.Error(err)) continue } - log.Info("submit stats task done", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...) + log.Info("submit stats task done", taskLogFields...) case indexpb.JobState_JobStateInit, indexpb.JobState_JobStateRetry, indexpb.JobState_JobStateInProgress: - logger.Debug("waiting for stats task...", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...) + log.Debug("waiting for stats task...", taskLogFields...) case indexpb.JobState_JobStateFailed: + log.Warn("import job stats failed", taskLogFields...) updateJobState(internalpb.ImportJobState_Failed) return case indexpb.JobState_JobStateFinished: @@ -317,7 +334,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) { } func (c *importChecker) checkIndexBuildingJob(job ImportJob) { - logger := log.With(zap.Int64("jobID", job.GetJobID())) + log := log.With(zap.Int64("jobID", job.GetJobID())) tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID())) originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { return t.(*importTask).GetSegmentIDs() @@ -339,28 +356,32 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { default: } } - logger.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed)) + log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed)) return } + buildIndexDuration := job.GetTR().RecordSpan() + metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds())) + log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration)) + // Here, all segment indexes have been successfully built, try unset isImporting flag for all segments. isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool { segment := c.meta.GetSegment(segmentID) if segment == nil { - logger.Warn("cannot find segment", zap.Int64("segmentID", segmentID)) + log.Warn("cannot find segment", zap.Int64("segmentID", segmentID)) return false } return segment.GetIsImporting() }) channels, err := c.meta.GetSegmentsChannels(isImportingSegments) if err != nil { - logger.Warn("get segments channels failed", zap.Error(err)) + log.Warn("get segments channels failed", zap.Error(err)) return } for _, segmentID := range isImportingSegments { channelCP := c.meta.GetChannelCheckpoint(channels[segmentID]) if channelCP == nil { - logger.Warn("nil channel checkpoint") + log.Warn("nil channel checkpoint") return } op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}}) @@ -368,7 +389,7 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { op3 := UpdateIsImporting(segmentID, false) err = c.meta.UpdateSegmentsInfo(op1, op2, op3) if err != nil { - logger.Warn("update import segment failed", zap.Error(err)) + log.Warn("update import segment failed", zap.Error(err)) return } } @@ -377,10 +398,12 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) if err != nil { - logger.Warn("failed to update job state to Completed", zap.Error(err)) + log.Warn("failed to update job state to Completed", zap.Error(err)) return } - logger.Info("import job completed") + totalDuration := job.GetTR().ElapseSpan() + metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds())) + log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration)) } func (c *importChecker) checkFailedJob(job ImportJob) { @@ -462,9 +485,9 @@ func (c *importChecker) checkGC(job ImportJob) { } cleanupTime := tsoutil.PhysicalTime(job.GetCleanupTs()) if time.Now().After(cleanupTime) { - logger := log.With(zap.Int64("jobID", job.GetJobID())) + log := log.With(zap.Int64("jobID", job.GetJobID())) GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second) - logger.Info("job has reached the GC retention", + log.Info("job has reached the GC retention", zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention)) tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID())) shouldRemoveJob := true @@ -492,9 +515,9 @@ func (c *importChecker) checkGC(job ImportJob) { } err := c.imeta.RemoveJob(job.GetJobID()) if err != nil { - logger.Warn("remove import job failed", zap.Error(err)) + log.Warn("remove import job failed", zap.Error(err)) return } - logger.Info("import job removed") + log.Info("import job removed") } } diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index d7978821af127..4f181bbb5adc3 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -101,6 +102,7 @@ func (s *ImportCheckerSuite) SetupTest() { }, }, }, + tr: timerecord.NewTimeRecorder("import job"), } catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil) @@ -120,6 +122,7 @@ func (s *ImportCheckerSuite) TestLogStats() { TaskID: 1, State: datapb.ImportTaskStateV2_Failed, }, + tr: timerecord.NewTimeRecorder("preimport task"), } err := s.imeta.AddTask(pit1) s.NoError(err) @@ -131,6 +134,7 @@ func (s *ImportCheckerSuite) TestLogStats() { SegmentIDs: []int64{10, 11, 12}, State: datapb.ImportTaskStateV2_Pending, }, + tr: timerecord.NewTimeRecorder("import task"), } err = s.imeta.AddTask(it1) s.NoError(err) @@ -310,6 +314,7 @@ func (s *ImportCheckerSuite) TestCheckTimeout() { TaskID: 1, State: datapb.ImportTaskStateV2_InProgress, }, + tr: timerecord.NewTimeRecorder("preimport task"), } err := s.imeta.AddTask(task) s.NoError(err) @@ -332,6 +337,7 @@ func (s *ImportCheckerSuite) TestCheckFailure() { SegmentIDs: []int64{2}, StatsSegmentIDs: []int64{3}, }, + tr: timerecord.NewTimeRecorder("import task"), } err := s.imeta.AddTask(it) s.NoError(err) @@ -371,6 +377,7 @@ func (s *ImportCheckerSuite) TestCheckGC() { SegmentIDs: []int64{2}, StatsSegmentIDs: []int64{3}, }, + tr: timerecord.NewTimeRecorder("import task"), } err := s.imeta.AddTask(task) s.NoError(err) @@ -447,6 +454,7 @@ func (s *ImportCheckerSuite) TestCheckCollection() { TaskID: 1, State: datapb.ImportTaskStateV2_Pending, }, + tr: timerecord.NewTimeRecorder("preimport task"), } err := s.imeta.AddTask(task) s.NoError(err) diff --git a/internal/datacoord/import_job.go b/internal/datacoord/import_job.go index 44048105736dd..39645ec5154be 100644 --- a/internal/datacoord/import_job.go +++ b/internal/datacoord/import_job.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -91,15 +92,23 @@ type ImportJob interface { GetCompleteTime() string GetFiles() []*internalpb.ImportFile GetOptions() []*commonpb.KeyValuePair + GetTR() *timerecord.TimeRecorder Clone() ImportJob } type importJob struct { *datapb.ImportJob + + tr *timerecord.TimeRecorder +} + +func (j *importJob) GetTR() *timerecord.TimeRecorder { + return j.tr } func (j *importJob) Clone() ImportJob { return &importJob{ ImportJob: proto.Clone(j.ImportJob).(*datapb.ImportJob), + tr: j.tr, } } diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go index debf5509e8dcd..0a9b272dc9684 100644 --- a/internal/datacoord/import_meta.go +++ b/internal/datacoord/import_meta.go @@ -19,6 +19,7 @@ package datacoord import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/pkg/util/lock" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) type ImportMeta interface { @@ -61,11 +62,13 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) { for _, task := range restoredPreImportTasks { tasks[task.GetTaskID()] = &preImportTask{ PreImportTask: task, + tr: timerecord.NewTimeRecorder("preimport task"), } } for _, task := range restoredImportTasks { tasks[task.GetTaskID()] = &importTask{ ImportTaskV2: task, + tr: timerecord.NewTimeRecorder("import task"), } } @@ -73,6 +76,7 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) { for _, job := range restoredJobs { jobs[job.GetJobID()] = &importJob{ ImportJob: job, + tr: timerecord.NewTimeRecorder("import job"), } } diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index d0f91aa578c38..070bec70c202f 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -91,23 +91,6 @@ func (s *importScheduler) Close() { } func (s *importScheduler) process() { - getNodeID := func(nodeSlots map[int64]int64) int64 { - var ( - nodeID int64 = NullNodeID - maxSlots int64 = -1 - ) - for id, slots := range nodeSlots { - if slots > 0 && slots > maxSlots { - nodeID = id - maxSlots = slots - } - } - if nodeID != NullNodeID { - nodeSlots[nodeID]-- - } - return nodeID - } - jobs := s.imeta.GetJobBy() sort.Slice(jobs, func(i, j int) bool { return jobs[i].GetJobID() < jobs[j].GetJobID() @@ -118,7 +101,7 @@ func (s *importScheduler) process() { for _, task := range tasks { switch task.GetState() { case datapb.ImportTaskStateV2_Pending: - nodeID := getNodeID(nodeSlots) + nodeID := s.getNodeID(task, nodeSlots) switch task.GetType() { case PreImportTaskType: s.processPendingPreImport(task, nodeID) @@ -167,6 +150,25 @@ func (s *importScheduler) peekSlots() map[int64]int64 { return nodeSlots } +func (s *importScheduler) getNodeID(task ImportTask, nodeSlots map[int64]int64) int64 { + var ( + nodeID int64 = NullNodeID + maxSlots int64 = -1 + ) + require := task.GetSlots() + for id, slots := range nodeSlots { + // find the most idle datanode + if slots > 0 && slots >= require && slots > maxSlots { + nodeID = id + maxSlots = slots + } + } + if nodeID != NullNodeID { + nodeSlots[nodeID] -= require + } + return nodeID +} + func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) { if nodeID == NullNodeID { return @@ -186,7 +188,9 @@ func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...) return } - log.Info("process pending preimport task done", WrapTaskLog(task)...) + pendingDuration := task.GetTR().RecordSpan() + metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds())) + log.Info("preimport task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...) } func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) { @@ -212,7 +216,9 @@ func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) { log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...) return } - log.Info("processing pending import task done", WrapTaskLog(task)...) + pendingDuration := task.GetTR().RecordSpan() + metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds())) + log.Info("import task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...) } func (s *importScheduler) processInProgressPreImport(task ImportTask) { @@ -249,6 +255,11 @@ func (s *importScheduler) processInProgressPreImport(task ImportTask) { } log.Info("query preimport", WrapTaskLog(task, zap.String("state", resp.GetState().String()), zap.Any("fileStats", resp.GetFileStats()))...) + if resp.GetState() == datapb.ImportTaskStateV2_Completed { + preimportDuration := task.GetTR().RecordSpan() + metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preimportDuration.Milliseconds())) + log.Info("preimport done", WrapTaskLog(task, zap.Duration("taskTimeCost/preimport", preimportDuration))...) + } } func (s *importScheduler) processInProgressImport(task ImportTask) { @@ -322,6 +333,9 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...) return } + importDuration := task.GetTR().RecordSpan() + metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds())) + log.Info("import done", WrapTaskLog(task, zap.Duration("taskTimeCost/import", importDuration))...) } log.Info("query import", WrapTaskLog(task, zap.String("state", resp.GetState().String()), zap.String("reason", resp.GetReason()))...) diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index 753056d6888ff..d853f4de4f2dd 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) type ImportSchedulerSuite struct { @@ -87,6 +88,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() { CollectionID: s.collectionID, State: datapb.ImportTaskStateV2_Pending, }, + tr: timerecord.NewTimeRecorder("preimport task"), } err := s.imeta.AddTask(task) s.NoError(err) @@ -97,6 +99,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() { TimeoutTs: math.MaxUint64, Schema: &schemapb.CollectionSchema{}, }, + tr: timerecord.NewTimeRecorder("import job"), } err = s.imeta.AddJob(job) s.NoError(err) @@ -157,6 +160,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() { }, }, }, + tr: timerecord.NewTimeRecorder("import task"), } err := s.imeta.AddTask(task) s.NoError(err) @@ -169,6 +173,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() { Schema: &schemapb.CollectionSchema{}, TimeoutTs: math.MaxUint64, }, + tr: timerecord.NewTimeRecorder("import job"), } err = s.imeta.AddJob(job) s.NoError(err) @@ -223,6 +228,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() { StatsSegmentIDs: []int64{4, 5}, State: datapb.ImportTaskStateV2_Failed, }, + tr: timerecord.NewTimeRecorder("import task"), } err := s.imeta.AddTask(task) s.NoError(err) @@ -235,6 +241,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() { Schema: &schemapb.CollectionSchema{}, TimeoutTs: math.MaxUint64, }, + tr: timerecord.NewTimeRecorder("import job"), } err = s.imeta.AddJob(job) s.NoError(err) diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go index b4bb782eb2f30..8c4213658dd1f 100644 --- a/internal/datacoord/import_task.go +++ b/internal/datacoord/import_task.go @@ -17,9 +17,12 @@ package datacoord import ( + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) type TaskType int @@ -139,33 +142,62 @@ type ImportTask interface { GetState() datapb.ImportTaskStateV2 GetReason() string GetFileStats() []*datapb.ImportFileStats + GetTR() *timerecord.TimeRecorder + GetSlots() int64 Clone() ImportTask } type preImportTask struct { *datapb.PreImportTask + tr *timerecord.TimeRecorder } func (p *preImportTask) GetType() TaskType { return PreImportTaskType } +func (p *preImportTask) GetTR() *timerecord.TimeRecorder { + return p.tr +} + +func (p *preImportTask) GetSlots() int64 { + return int64(funcutil.Min(len(p.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt())) +} + func (p *preImportTask) Clone() ImportTask { return &preImportTask{ PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask), + tr: p.tr, } } type importTask struct { *datapb.ImportTaskV2 + tr *timerecord.TimeRecorder } func (t *importTask) GetType() TaskType { return ImportTaskType } +func (t *importTask) GetTR() *timerecord.TimeRecorder { + return t.tr +} + +func (t *importTask) GetSlots() int64 { + // Consider the following two scenarios: + // 1. Importing a large number of small files results in + // a small total data size, making file count unsuitable as a slot number. + // 2. Importing a file with many shards number results in many segments and a small total data size, + // making segment count unsuitable as a slot number. + // Taking these factors into account, we've decided to use the + // minimum value between segment count and file count as the slot number. + return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt())) +} + func (t *importTask) Clone() ImportTask { return &importTask{ ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2), + tr: t.tr, } } diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 1506ad8498475..1c6a1c83d5a3c 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" ) func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field { @@ -47,6 +48,7 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field { zap.Int64("jobID", task.GetJobID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String()), + zap.Int64("nodeID", task.GetNodeID()), } res = append(res, fields...) return res @@ -75,6 +77,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, State: datapb.ImportTaskStateV2_Pending, FileStats: fileStats, }, + tr: timerecord.NewTimeRecorder("preimport task"), } tasks = append(tasks, task) } @@ -99,6 +102,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, State: datapb.ImportTaskStateV2_Pending, FileStats: group, }, + tr: timerecord.NewTimeRecorder("import task"), } segments, err := AssignSegments(job, task, alloc, meta) if err != nil { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b8552b3d423a5..6d8a092c0c761 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -47,6 +47,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1723,6 +1724,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter return importFile }) + startTime := time.Now() job := &importJob{ ImportJob: &datapb.ImportJob{ JobID: idStart, @@ -1736,8 +1738,9 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter State: internalpb.ImportJobState_Pending, Files: files, Options: in.GetOptions(), - StartTime: time.Now().Format("2006-01-02T15:04:05Z07:00"), + StartTime: startTime.Format("2006-01-02T15:04:05Z07:00"), }, + tr: timerecord.NewTimeRecorder("import job"), } err = s.importMeta.AddJob(job) if err != nil { diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go index d1d58e8df0655..e94b2671c4667 100644 --- a/internal/datanode/importv2/scheduler.go +++ b/internal/datanode/importv2/scheduler.go @@ -17,6 +17,7 @@ package importv2 import ( + "sort" "sync" "time" @@ -64,6 +65,9 @@ func (s *scheduler) Start() { return case <-exeTicker.C: tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].GetTaskID() < tasks[j].GetTaskID() + }) futures := make(map[int64][]*conc.Future[any]) for _, task := range tasks { fs := task.Execute() @@ -86,7 +90,15 @@ func (s *scheduler) Start() { func (s *scheduler) Slots() int64 { tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) - return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks)) + used := lo.SumBy(tasks, func(t Task) int64 { + return t.GetSlots() + }) + total := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() + free := total - used + if free >= 0 { + return free + } + return 0 } func (s *scheduler) Close() { diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index 023c23c0078d4..0d7c46e6cc474 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -161,6 +161,7 @@ type Task interface { GetState() datapb.ImportTaskStateV2 GetReason() string GetSchema() *schemapb.CollectionSchema + GetSlots() int64 Cancel() Clone() Task } diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index d524f3e514af2..34a255a6a36c3 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -103,6 +104,17 @@ func (t *ImportTask) GetSchema() *schemapb.CollectionSchema { return t.req.GetSchema() } +func (t *ImportTask) GetSlots() int64 { + // Consider the following two scenarios: + // 1. Importing a large number of small files results in + // a small total data size, making file count unsuitable as a slot number. + // 2. Importing a file with many shards number results in many segments and a small total data size, + // making segment count unsuitable as a slot number. + // Taking these factors into account, we've decided to use the + // minimum value between segment count and file count as the slot number. + return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt())) +} + func (t *ImportTask) Cancel() { t.cancel() } diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index 71f08a6f608a4..b410782a24f87 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -99,6 +99,10 @@ func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema { return t.req.GetSchema() } +func (t *L0ImportTask) GetSlots() int64 { + return 1 +} + func (t *L0ImportTask) Cancel() { t.cancel() } diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go index 65851da007751..93777c8682e6f 100644 --- a/internal/datanode/importv2/task_l0_preimport.go +++ b/internal/datanode/importv2/task_l0_preimport.go @@ -94,6 +94,10 @@ func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema { return t.schema } +func (t *L0PreImportTask) GetSlots() int64 { + return 1 +} + func (t *L0PreImportTask) Cancel() { t.cancel() } diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go index be5b03afb6166..c507f69ff076f 100644 --- a/internal/datanode/importv2/task_preimport.go +++ b/internal/datanode/importv2/task_preimport.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -101,6 +102,10 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema { return t.schema } +func (t *PreImportTask) GetSlots() int64 { + return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt())) +} + func (t *PreImportTask) Cancel() { t.cancel() } diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index f260bd5f02142..493c958f18818 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -213,6 +213,28 @@ var ( stageLabelName, }) + ImportJobLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "import_job_latency", + Help: "latency of import job", + Buckets: longTaskBuckets, + }, []string{ + importStageLabelName, + }) + + ImportTaskLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "import_task_latency", + Help: "latency of import task", + Buckets: longTaskBuckets, + }, []string{ + importStageLabelName, + }) + FlushedSegmentFileNum = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, @@ -352,6 +374,8 @@ func RegisterDataCoord(registry *prometheus.Registry) { registry.MustRegister(DataCoordCompactedSegmentSize) registry.MustRegister(DataCoordCompactionTaskNum) registry.MustRegister(DataCoordCompactionLatency) + registry.MustRegister(ImportJobLatency) + registry.MustRegister(ImportTaskLatency) registry.MustRegister(DataCoordSizeStoredL0Segment) registry.MustRegister(DataCoordL0DeleteEntriesNum) registry.MustRegister(FlushedSegmentFileNum) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 99e932e4d77ff..daceb8c550a8b 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -77,6 +77,12 @@ const ( Executing = "executing" Done = "done" + ImportStagePending = "pending" + ImportStagePreImport = "preimport" + ImportStageImport = "import" + ImportStageStats = "stats" + ImportStageBuildIndex = "build_index" + compactionTypeLabelName = "compaction_type" isVectorFieldLabelName = "is_vector_field" segmentPruneLabelName = "segment_prune_label" @@ -105,6 +111,7 @@ const ( cacheStateLabelName = "cache_state" indexCountLabelName = "indexed_field_count" dataSourceLabelName = "data_source" + importStageLabelName = "import_stage" requestScope = "scope" fullMethodLabelName = "full_method" reduceLevelName = "reduce_level" diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index b30bba4c97e17..cc20a841c0e47 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4172,6 +4172,7 @@ type dataNodeConfig struct { MaxConcurrentImportTaskNum ParamItem `refreshable:"true"` MaxImportFileSizeInGB ParamItem `refreshable:"true"` ReadBufferSizeInMB ParamItem `refreshable:"true"` + MaxTaskSlotNum ParamItem `refreshable:"true"` // Compaction L0BatchMemoryRatio ParamItem `refreshable:"true"` @@ -4479,6 +4480,16 @@ if this parameter <= 0, will set it as 10`, } p.ReadBufferSizeInMB.Init(base.mgr) + p.MaxTaskSlotNum = ParamItem{ + Key: "dataNode.import.maxTaskSlotNum", + Version: "2.4.13", + Doc: "The maximum number of slots occupied by each import/pre-import task.", + DefaultValue: "16", + PanicIfEmpty: false, + Export: true, + } + p.MaxTaskSlotNum.Init(base.mgr) + p.L0BatchMemoryRatio = ParamItem{ Key: "dataNode.compaction.levelZeroBatchMemoryRatio", Version: "2.4.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index f81de8d8595fe..995ffda7acab1 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -561,6 +561,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 16, maxConcurrentImportTaskNum) assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64()) assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt()) + assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt()) params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 16, Params.SlotCap.GetAsInt())