enhance: [cherry-pick] Add monitoring metrics for task execution time in datacoord (#35141)

issue: #35138 

master pr: #35139

Signed-off-by: Cai Zhang <[email protected]>
xiaocai2333 authored Aug 2, 2024
1 parent 3a997e8 commit 2534b30
Showing 8 changed files with 224 additions and 3 deletions.
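
In short, the commit teaches each datacoord task to remember three timestamps (when it was queued, when it started running on a node, and when it finished) and has the scheduler report the resulting pending and executing durations to a new Prometheus histogram, warning when either exceeds the configurable slow-task threshold. The standalone Go sketch below illustrates that lifecycle only; the type name, metric name, buckets, and label values are simplified stand-ins, not the actual Milvus code.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// taskTimer stands in for the queueTime/startTime/endTime fields that the
// commit adds to analyzeTask and indexBuildTask.
type taskTimer struct {
	queueTime time.Time
	startTime time.Time
	endTime   time.Time
}

// taskExecuteLatency mirrors the shape of DataCoordTaskExecuteLatency:
// a histogram labeled by task type and lifecycle stage (assumed buckets).
var taskExecuteLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "task_execute_latency_ms",
		Help:    "illustrative task latency histogram",
		Buckets: prometheus.ExponentialBuckets(100, 2, 10),
	}, []string{"task_type", "status"})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(taskExecuteLatency)

	t := &taskTimer{}

	// Enqueue: remember when the task entered the queue.
	t.queueTime = time.Now()

	// Dispatch: record the start time and observe how long the task was pending.
	time.Sleep(10 * time.Millisecond) // simulated queueing delay
	t.startTime = time.Now()
	pending := t.startTime.Sub(t.queueTime)
	taskExecuteLatency.WithLabelValues("index", "pending").Observe(float64(pending.Milliseconds()))

	// Finish: record the end time and observe how long the task executed.
	time.Sleep(20 * time.Millisecond) // simulated execution
	t.endTime = time.Now()
	executing := t.endTime.Sub(t.startTime)
	taskExecuteLatency.WithLabelValues("index", "executing").Observe(float64(executing.Milliseconds()))

	fmt.Printf("pending=%v executing=%v\n", pending, executing)
}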
33 changes: 33 additions & 0 deletions internal/datacoord/task_analyze.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
@@ -41,6 +42,10 @@ type analyzeTask struct {
nodeID int64
taskInfo *indexpb.AnalyzeResult

queueTime time.Time
startTime time.Time
endTime time.Time

req *indexpb.AnalyzeRequest
}

@@ -56,6 +61,34 @@ func (at *analyzeTask) ResetNodeID() {
at.nodeID = 0
}

func (at *analyzeTask) SetQueueTime(t time.Time) {
at.queueTime = t
}

func (at *analyzeTask) GetQueueTime() time.Time {
return at.queueTime
}

func (at *analyzeTask) SetStartTime(t time.Time) {
at.startTime = t
}

func (at *analyzeTask) GetStartTime() time.Time {
return at.startTime
}

func (at *analyzeTask) SetEndTime(t time.Time) {
at.endTime = t
}

func (at *analyzeTask) GetEndTime() time.Time {
return at.endTime
}

func (at *analyzeTask) GetTaskType() string {
return indexpb.JobType_JobTypeIndexJob.String()
}

func (at *analyzeTask) CheckTaskHealthy(mt *meta) bool {
t := mt.analyzeMeta.GetTask(at.GetTaskID())
return t != nil
33 changes: 33 additions & 0 deletions internal/datacoord/task_index.go
@@ -19,6 +19,7 @@ package datacoord
import (
"context"
"path"
"time"

"go.uber.org/zap"

@@ -41,6 +42,10 @@ type indexBuildTask struct {
nodeID int64
taskInfo *indexpb.IndexTaskInfo

queueTime time.Time
startTime time.Time
endTime time.Time

req *indexpb.CreateJobRequest
}

@@ -58,6 +63,34 @@ func (it *indexBuildTask) ResetNodeID() {
it.nodeID = 0
}

func (it *indexBuildTask) SetQueueTime(t time.Time) {
it.queueTime = t
}

func (it *indexBuildTask) GetQueueTime() time.Time {
return it.queueTime
}

func (it *indexBuildTask) SetStartTime(t time.Time) {
it.startTime = t
}

func (it *indexBuildTask) GetStartTime() time.Time {
return it.startTime
}

func (it *indexBuildTask) SetEndTime(t time.Time) {
it.endTime = t
}

func (it *indexBuildTask) GetEndTime() time.Time {
return it.endTime
}

func (it *indexBuildTask) GetTaskType() string {
return indexpb.JobType_JobTypeIndexJob.String()
}

func (it *indexBuildTask) CheckTaskHealthy(mt *meta) bool {
_, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
return exist
114 changes: 112 additions & 2 deletions internal/datacoord/task_scheduler.go
@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/lock"
)

@@ -41,11 +42,13 @@ type taskScheduler struct {
cancel context.CancelFunc
wg sync.WaitGroup

scheduleDuration time.Duration
scheduleDuration time.Duration
collectMetricsDuration time.Duration

// TODO @xiaocai2333: use priority queue
tasks map[int64]Task
notifyChan chan struct{}
taskLock *lock.KeyLock[int64]

meta *meta

@@ -71,7 +74,9 @@ func newTaskScheduler(
meta: metaTable,
tasks: make(map[int64]Task),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
collectMetricsDuration: time.Minute,
policy: defaultBuildIndexPolicy,
nodeManager: nodeManager,
chunkManager: chunkManager,
@@ -83,8 +88,9 @@
}

func (s *taskScheduler) Start() {
s.wg.Add(1)
s.wg.Add(2)
go s.schedule()
go s.collectTaskMetrics()
}

func (s *taskScheduler) Stop() {
@@ -108,6 +114,9 @@ func (s *taskScheduler) reloadFromKV() {
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
}
}
}
@@ -124,6 +133,9 @@ func (s *taskScheduler) reloadFromKV() {
State: t.State,
FailReason: t.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
}
}
}
@@ -146,6 +158,7 @@ func (s *taskScheduler) enqueue(task Task) {
if _, ok := s.tasks[taskID]; !ok {
s.tasks[taskID] = task
}
task.SetQueueTime(time.Now())
log.Info("taskScheduler enqueue task", zap.Int64("taskID", taskID))
}

@@ -194,11 +207,14 @@ func (s *taskScheduler) run() {
s.policy(taskIDs)

for _, taskID := range taskIDs {
s.taskLock.Lock(taskID)
ok := s.process(taskID)
if !ok {
s.taskLock.Unlock(taskID)
log.Ctx(s.ctx).Info("there is no idle indexing node, wait a minute...")
break
}
s.taskLock.Unlock(taskID)
}
}

@@ -262,13 +278,31 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
task.SetState(indexpb.JobState_JobStateRetry, "update meta building state failed")
return false
}
task.SetStartTime(time.Now())
queueingTime := task.GetStartTime().Sub(task.GetQueueTime())
if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task queueing time is too long", zap.Int64("taskID", taskID),
zap.Int64("queueing time(ms)", queueingTime.Milliseconds()))
}
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(task.GetTaskType(), metrics.Pending).Observe(float64(queueingTime.Milliseconds()))

log.Ctx(s.ctx).Info("update task meta state to InProgress success", zap.Int64("taskID", taskID),
zap.Int64("nodeID", nodeID))
case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
if err := task.SetJobInfo(s.meta); err != nil {
log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err))
return true
}
task.SetEndTime(time.Now())
runningTime := task.GetEndTime().Sub(task.GetStartTime())
if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task running time is too long", zap.Int64("taskID", taskID),
zap.Int64("running time(ms)", runningTime.Milliseconds()))
}
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(task.GetTaskType(), metrics.Executing).Observe(float64(runningTime.Milliseconds()))

client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
if exist {
if !task.DropTaskOnWorker(s.ctx, client) {
@@ -297,3 +331,79 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
}
return true
}

func (s *taskScheduler) collectTaskMetrics() {
defer s.wg.Done()

ticker := time.NewTicker(s.collectMetricsDuration)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
log.Warn("task scheduler context done")
return
case <-ticker.C:
s.RLock()
taskIDs := make([]UniqueID, 0, len(s.tasks))
for tID := range s.tasks {
taskIDs = append(taskIDs, tID)
}
s.RUnlock()

maxTaskQueueingTime := make(map[string]int64)
maxTaskRunningTime := make(map[string]int64)

collectMetricsFunc := func(taskID int64) {
s.taskLock.Lock(taskID)
defer s.taskLock.Unlock(taskID)

task := s.getTask(taskID)
if task == nil {
return
}

state := task.GetState()
switch state {
case indexpb.JobState_JobStateNone:
return
case indexpb.JobState_JobStateInit:
queueingTime := time.Since(task.GetQueueTime())
if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task queueing time is too long", zap.Int64("taskID", taskID),
zap.Int64("queueing time(ms)", queueingTime.Milliseconds()))
}

maxQueueingTime, ok := maxTaskQueueingTime[task.GetTaskType()]
if !ok || maxQueueingTime < queueingTime.Milliseconds() {
maxTaskQueueingTime[task.GetTaskType()] = queueingTime.Milliseconds()
}
case indexpb.JobState_JobStateInProgress:
runningTime := time.Since(task.GetStartTime())
if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task running time is too long", zap.Int64("taskID", taskID),
zap.Int64("running time(ms)", runningTime.Milliseconds()))
}

maxRunningTime, ok := maxTaskRunningTime[task.GetTaskType()]
if !ok || maxRunningTime < runningTime.Milliseconds() {
maxTaskRunningTime[task.GetTaskType()] = runningTime.Milliseconds()
}
}
}

for _, taskID := range taskIDs {
collectMetricsFunc(taskID)
}

for taskType, queueingTime := range maxTaskQueueingTime {
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(taskType, metrics.Pending).Observe(float64(queueingTime))
}

for taskType, runningTime := range maxTaskRunningTime {
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(taskType, metrics.Executing).Observe(float64(runningTime))
}
}
}
}
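
A note on the collector above: rather than observing every state change as it happens, it walks the task table once per collectMetricsDuration (one minute by default), keeps only the current maximum queueing time and maximum running time per task type, and reports those maxima to the same histogram with the metrics.Pending and metrics.Executing label values. Because it measures time.Since the recorded timestamps, a slow or stuck task shows up in the metric and in the slow-task warnings while it is still queued or running, not only after it completes.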
16 changes: 15 additions & 1 deletion internal/datacoord/task_scheduler_test.go
@@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -732,14 +733,26 @@ func (s *taskSchedulerSuite) TearDownSuite() {
func (s *taskSchedulerSuite) scheduler(handler Handler) {
ctx := context.Background()

var once sync.Once

paramtable.Get().Save(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key)
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task *indexpb.AnalyzeTask) error {
once.Do(func() {
time.Sleep(time.Second * 3)
})
return nil
})
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)

in := mocks.NewMockIndexNodeClient(s.T())
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *indexpb.QueryJobsV2Request, option ...grpc.CallOption) (*indexpb.QueryJobsV2Response, error) {
once.Do(func() {
time.Sleep(time.Second * 3)
})
switch request.GetJobType() {
case indexpb.JobType_JobTypeIndexJob:
results := make([]*indexpb.IndexTaskInfo, 0)
@@ -815,6 +828,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
mt.segments.DropSegment(segID + 9)

scheduler.scheduleDuration = time.Millisecond * 500
scheduler.collectMetricsDuration = time.Millisecond * 200
scheduler.Start()

s.Run("enqueue", func() {
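
The test changes above lower TaskSlowThreshold to 1 second and use a shared sync.Once so that the first SaveAnalyzeTask or QueryJobsV2 call sleeps for 3 seconds; together with the 200 ms collectMetricsDuration configured on the scheduler, this appears intended to push a task past the slow-task threshold so the new warning and metric paths run at least once during the suite.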
8 changes: 8 additions & 0 deletions internal/datacoord/types.go
@@ -18,6 +18,7 @@ package datacoord

import (
"context"
"time"

"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
@@ -38,4 +39,11 @@ type Task interface {
QueryResult(ctx context.Context, client types.IndexNodeClient)
DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool
SetJobInfo(meta *meta) error
SetQueueTime(time.Time)
GetQueueTime() time.Time
SetStartTime(time.Time)
GetStartTime() time.Time
SetEndTime(time.Time)
GetEndTime() time.Time
GetTaskType() string
}
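
The Task interface now requires the six timing accessors plus GetTaskType. In this change the two implementations spell the accessors out by hand, and both return indexpb.JobType_JobTypeIndexJob.String() from GetTaskType, so the task_type label carries the same value for analyze and index tasks as shown. Purely as an illustrative sketch (not part of the commit), an embeddable helper along these lines could provide the timing accessors in one place for future Task implementations:

// Hypothetical helper, not part of this commit; shown for illustration only,
// assuming it lived alongside the Task implementations in package datacoord.
package datacoord

import "time"

type taskTimes struct {
	queueTime time.Time
	startTime time.Time
	endTime   time.Time
}

func (t *taskTimes) SetQueueTime(ts time.Time) { t.queueTime = ts }
func (t *taskTimes) GetQueueTime() time.Time   { return t.queueTime }
func (t *taskTimes) SetStartTime(ts time.Time) { t.startTime = ts }
func (t *taskTimes) GetStartTime() time.Time   { return t.startTime }
func (t *taskTimes) SetEndTime(ts time.Time)   { t.endTime = ts }
func (t *taskTimes) GetEndTime() time.Time     { return t.endTime }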
13 changes: 13 additions & 0 deletions pkg/metrics/datacoord_metrics.go
@@ -309,6 +309,18 @@
Name: "import_tasks",
Help: "the import tasks grouping by type and state",
}, []string{"task_type", "import_state"})

DataCoordTaskExecuteLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "task_execute_max_latency",
Help: "latency of task execute operation",
Buckets: longTaskBuckets,
}, []string{
taskTypeLabel,
statusLabelName,
})
)

// RegisterDataCoord registers DataCoord metrics
@@ -336,6 +348,7 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(ImportTasks)
registry.MustRegister(GarbageCollectorFileScanDuration)
registry.MustRegister(GarbageCollectorRunCount)
registry.MustRegister(DataCoordTaskExecuteLatency)
}

func CleanupDataCoordSegmentMetrics(dbName string, collectionID int64, segmentID int64) {
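
With standard Prometheus naming, the new histogram is exported under a fully qualified name assembled from the Namespace, Subsystem, and Name fields (presumably milvus_datacoord_task_execute_max_latency, assuming milvusNamespace resolves to "milvus" and typeutil.DataCoordRole to "datacoord"), as the usual _bucket, _sum, and _count series, carrying the task-type and status labels with the metrics.Pending and metrics.Executing values observed by the scheduler.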