diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index a2a44e0908e2d..f095b82ec58eb 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -168,6 +168,8 @@ type taskScheduler struct { channelTasks map[replicaChannelIndex]Task processQueue *taskQueue waitQueue *taskQueue + + lastUpdateMetricTime time.Time } func NewScheduler(ctx context.Context, @@ -280,13 +282,15 @@ func (scheduler *taskScheduler) Add(task Task) error { scheduler.segmentTasks[index] = task } - scheduler.updateTaskMetrics() log.Ctx(task.Context()).Info("task added", zap.String("task", task.String())) task.RecordStartTs() return nil } func (scheduler *taskScheduler) updateTaskMetrics() { + if time.Since(scheduler.lastUpdateMetricTime) < 30*time.Second { + return + } segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0 channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0 for _, task := range scheduler.segmentTasks { @@ -319,6 +323,7 @@ func (scheduler *taskScheduler) updateTaskMetrics() { metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum)) + scheduler.lastUpdateMetricTime = time.Now() } // check whether the task is valid to add,