From c3d446925919fa6a41c503756a10ff86c0cd7b57 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Thu, 19 Dec 2024 11:46:13 +0800
Subject: [PATCH] enhance: Print observe time (#38575)

Print observe, dist handling and schedule time.

issue: https://github.com/milvus-io/milvus/issues/37630
pr: https://github.com/milvus-io/milvus/pull/38566

Signed-off-by: bigsheeper
---
 internal/querycoordv2/dist/dist_handler.go | 10 +++++++++-
 internal/querycoordv2/meta/target.go       |  9 +++++----
 .../observers/collection_observer.go       | 16 +++++++++-------
 internal/querycoordv2/task/scheduler.go    |  9 +++++++++
 4 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index 0e78f00dd40a5..3c1121c81fff7 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -37,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
 }
 
 func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
+	tr := timerecord.NewTimeRecorder("")
 	resp, err := dh.getDistribution(ctx)
+	d1 := tr.RecordSpan()
 	if err != nil {
 		node := dh.nodeManager.Get(dh.nodeID)
 		*failures = *failures + 1
@@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
 			fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
 		}
 		fields = append(fields, zap.Error(err))
-		log.RatedWarn(30.0, "failed to get data distribution", fields...)
+		log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
+			RatedWarn(30.0, "failed to get data distribution", fields...)
 	} else {
 		*failures = 0
 		dh.handleDistResp(resp, dispatchTask)
 	}
+	log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
+		RatedInfo(120.0, "pull and handle distribution done",
+			zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
 }
 
 func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index 62ae15e842448..4c492ff4b0520 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -90,10 +90,11 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 	}
 
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitions...),
-		version:    target.GetVersion(),
+		segments:           segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitions...),
+		version:            target.GetVersion(),
 	}
 }
 
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index 5ccd06d05bf7f..8317c04143bb2 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {
 func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
 	loading := false
 
+	observeTaskNum := 0
+	observeStart := time.Now()
 	ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
 		loading = true
+		observeTaskNum++
+		start := time.Now()
 		collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
 		if collection == nil {
 			return true
 		}
@@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
 			ob.loadTasks.Remove(traceID)
 		}
+		log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start)))
 		return true
 	})
 
+	log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart)))
+
 	// trigger check logic when loading collections/partitions
 	if loading {
 		ob.checkerController.Check()
 	}
@@ -352,13 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 			group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
 			loadedCount += len(group)
 		}
-		if loadedCount > 0 {
-			log.Ctx(ctx).Info("partition load progress",
-				zap.Int64("collectionID", partition.GetCollectionID()),
-				zap.Int64("partitionID", partition.GetPartitionID()),
-				zap.Int("subChannelCount", subChannelCount),
-				zap.Int("loadSegmentCount", loadedCount-subChannelCount))
-		}
 
 		loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum)))
 		if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
@@ -386,6 +386,8 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 			zap.Int64("collectionID", partition.GetCollectionID()),
 			zap.Int64("partitionID", partition.GetPartitionID()),
 			zap.Int32("partitionLoadPercentage", loadPercentage),
+			zap.Int("subChannelCount", subChannelCount),
+			zap.Int("loadSegmentCount", loadedCount-subChannelCount),
 		)
 		eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
 		return true
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 518cd9cd72af7..3cc4758431585 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -37,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	. "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -576,11 +577,13 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		return
 	}
 
+	tr := timerecord.NewTimeRecorder("")
 	log := log.With(
 		zap.Int64("nodeID", node),
 	)
 
 	scheduler.tryPromoteAll()
+	promoteDur := tr.RecordSpan()
 
 	log.Debug("process tasks related to node",
 		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
@@ -602,6 +605,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		return true
 	})
+	preprocessDur := tr.RecordSpan()
 
 	// The scheduler doesn't limit the number of tasks,
 	// to commit tasks to executors as soon as possible, to reach higher merge possibility
@@ -612,6 +616,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		}
 		return nil
 	}, "process")
+	processDur := tr.RecordSpan()
 
 	for _, task := range toRemove {
 		scheduler.remove(task)
@@ -623,6 +628,10 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		zap.Int("toProcessNum", len(toProcess)),
 		zap.Int32("committedNum", commmittedNum.Load()),
 		zap.Int("toRemoveNum", len(toRemove)),
+		zap.Duration("promoteDur", promoteDur),
+		zap.Duration("preprocessDur", preprocessDur),
+		zap.Duration("processDur", processDur),
+		zap.Duration("totalDur", tr.ElapseSpan()),
 	)
 
 	log.Info("process tasks related to node done",
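
All three touched call sites apply the same instrumentation pattern: create a timerecord.TimeRecorder at the start of the operation, call RecordSpan() after each phase to capture that phase's duration, and emit the per-phase durations plus ElapseSpan() (the total) as zap.Duration fields in a single log line. Below is a minimal stand-alone sketch of that pattern, assuming the RecordSpan/ElapseSpan semantics implied by the hunks above (span since the previous checkpoint, total since construction) and that pkg/util/timerecord is importable as a dependency; it logs through zap directly instead of the Milvus log wrapper, and the two Sleep calls are hypothetical stand-ins for the real work (getDistribution / handleDistResp).

package main

import (
	"time"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func main() {
	logger := zap.NewExample()

	// One recorder per operation, created before the first phase.
	tr := timerecord.NewTimeRecorder("")

	time.Sleep(10 * time.Millisecond) // stand-in for the pull phase (dh.getDistribution)
	pullDur := tr.RecordSpan()        // duration of the pull phase

	time.Sleep(20 * time.Millisecond) // stand-in for the handle phase (dh.handleDistResp)
	handleDur := tr.RecordSpan()      // duration of the handle phase

	// One log line carrying every phase duration plus the total,
	// mirroring the fields added in dist_handler.go and scheduler.go.
	logger.Info("pull and handle distribution done",
		zap.Duration("pullDur", pullDur),
		zap.Duration("handleDur", handleDur),
		zap.Duration("totalDur", tr.ElapseSpan()),
	)
}

In the patch itself the equivalent line goes through log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).RatedInfo(120.0, ...), which rate-limits the message so the periodic pullDist loop reports these durations without flooding the log; the sketch logs unconditionally for simplicity.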