Skip to content

Commit

Permalink
enhance: Print observe time (milvus-io#38575)
Browse files Browse the repository at this point in the history
Print observe, dist handing and schedule time.

issue: milvus-io#37630

pr: milvus-io#38566

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 19, 2024
1 parent ca234e7 commit c3d4469
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
10 changes: 9 additions & 1 deletion internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
}

func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
tr := timerecord.NewTimeRecorder("")
resp, err := dh.getDistribution(ctx)
d1 := tr.RecordSpan()
if err != nil {
node := dh.nodeManager.Get(dh.nodeID)
*failures = *failures + 1
Expand All @@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
}
fields = append(fields, zap.Error(err))
log.RatedWarn(30.0, "failed to get data distribution", fields...)
log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
RatedWarn(30.0, "failed to get data distribution", fields...)
} else {
*failures = 0
dh.handleDistResp(resp, dispatchTask)
}
log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
RatedInfo(120.0, "pull and handle distribution done",
zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
}

func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
Expand Down
9 changes: 5 additions & 4 deletions internal/querycoordv2/meta/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
}

return &CollectionTarget{
segments: segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
segments: segments,
partition2Segments: partition2Segments,
dmChannels: dmChannels,
partitions: typeutil.NewSet(partitions...),
version: target.GetVersion(),
}
}

Expand Down
16 changes: 9 additions & 7 deletions internal/querycoordv2/observers/collection_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(collectionID int64) bool {

func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
loading := false
observeTaskNum := 0
observeStart := time.Now()
ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
loading = true
observeTaskNum++

start := time.Now()
collection := ob.meta.CollectionManager.GetCollection(task.CollectionID)
if collection == nil {
return true
Expand Down Expand Up @@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
ob.loadTasks.Remove(traceID)
}

log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start)))
return true
})

log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart)))

// trigger check logic when loading collections/partitions
if loading {
ob.checkerController.Check()
Expand Down Expand Up @@ -352,13 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
loadedCount += len(group)
}
if loadedCount > 0 {
log.Ctx(ctx).Info("partition load progress",
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()),
zap.Int("subChannelCount", subChannelCount),
zap.Int("loadSegmentCount", loadedCount-subChannelCount))
}
loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum)))

if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
Expand Down Expand Up @@ -386,6 +386,8 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
zap.Int64("collectionID", partition.GetCollectionID()),
zap.Int64("partitionID", partition.GetPartitionID()),
zap.Int32("partitionLoadPercentage", loadPercentage),
zap.Int("subChannelCount", subChannelCount),
zap.Int("loadSegmentCount", loadedCount-subChannelCount),
)
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
return true
Expand Down
9 changes: 9 additions & 0 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/timerecord"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -576,11 +577,13 @@ func (scheduler *taskScheduler) schedule(node int64) {
return
}

tr := timerecord.NewTimeRecorder("")
log := log.With(
zap.Int64("nodeID", node),
)

scheduler.tryPromoteAll()
promoteDur := tr.RecordSpan()

log.Debug("process tasks related to node",
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
Expand All @@ -602,6 +605,7 @@ func (scheduler *taskScheduler) schedule(node int64) {

return true
})
preprocessDur := tr.RecordSpan()

// The scheduler doesn't limit the number of tasks,
// to commit tasks to executors as soon as possible, to reach higher merge possibility
Expand All @@ -612,6 +616,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
}
return nil
}, "process")
processDur := tr.RecordSpan()

for _, task := range toRemove {
scheduler.remove(task)
Expand All @@ -623,6 +628,10 @@ func (scheduler *taskScheduler) schedule(node int64) {
zap.Int("toProcessNum", len(toProcess)),
zap.Int32("committedNum", commmittedNum.Load()),
zap.Int("toRemoveNum", len(toRemove)),
zap.Duration("promoteDur", promoteDur),
zap.Duration("preprocessDUr", preprocessDur),
zap.Duration("processDUr", processDur),
zap.Duration("totalDur", tr.ElapseSpan()),
)

log.Info("process tasks related to node done",
Expand Down

0 comments on commit c3d4469

Please sign in to comment.