diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index 828cedc6e5ce3..55d3371252d94 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -37,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
 }
 
 func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
+	tr := timerecord.NewTimeRecorder("")
 	resp, err := dh.getDistribution(ctx)
+	d1 := tr.RecordSpan()
 	if err != nil {
 		node := dh.nodeManager.Get(dh.nodeID)
 		*failures = *failures + 1
@@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
 			fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
 		}
 		fields = append(fields, zap.Error(err))
-		log.RatedWarn(30.0, "failed to get data distribution", fields...)
+		log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
+			RatedWarn(30.0, "failed to get data distribution", fields...)
 	} else {
 		*failures = 0
 		dh.handleDistResp(ctx, resp, dispatchTask)
 	}
+	log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
+		RatedInfo(120.0, "pull and handle distribution done",
+			zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
 }
 
 func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
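
Note on the instrumentation above: `timerecord.NewTimeRecorder`/`RecordSpan` behave like a stopwatch with a resettable checkpoint, so consecutive calls yield per-phase durations (`pullDur`, `handleDur`) rather than cumulative ones. Below is a minimal, self-contained sketch of that pattern; it is a simplified stand-in, not the actual `pkg/util/timerecord` implementation:

```go
package main

import (
	"fmt"
	"time"
)

// timeRecorder mimics the behavior pullDist relies on: RecordSpan returns
// the duration since the previous checkpoint and advances it, so successive
// calls measure individual phases instead of totals.
type timeRecorder struct {
	start time.Time // fixed at construction, used for total elapsed time
	last  time.Time // moved forward on every RecordSpan call
}

func newTimeRecorder() *timeRecorder {
	now := time.Now()
	return &timeRecorder{start: now, last: now}
}

// RecordSpan returns the time since the last checkpoint and resets it.
func (tr *timeRecorder) RecordSpan() time.Duration {
	now := time.Now()
	span := now.Sub(tr.last)
	tr.last = now
	return span
}

// ElapseSpan returns the total time since construction.
func (tr *timeRecorder) ElapseSpan() time.Duration {
	return time.Since(tr.start)
}

func main() {
	tr := newTimeRecorder()
	time.Sleep(20 * time.Millisecond) // stands in for dh.getDistribution(ctx)
	pullDur := tr.RecordSpan()
	time.Sleep(10 * time.Millisecond) // stands in for dh.handleDistResp(...)
	fmt.Printf("pullDur=%v handleDur=%v totalDur=%v\n", pullDur, tr.RecordSpan(), tr.ElapseSpan())
}
```
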
diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index 4795eade4cfae..1ed9714b6239f 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -32,37 +32,63 @@ import (
 
 // CollectionTarget collection target is immutable,
 type CollectionTarget struct {
-	segments   map[int64]*datapb.SegmentInfo
-	dmChannels map[string]*DmChannel
-	partitions typeutil.Set[int64] // stores target partitions info
-	version    int64
+	segments           map[int64]*datapb.SegmentInfo
+	channel2Segments   map[string][]*datapb.SegmentInfo
+	partition2Segments map[int64][]*datapb.SegmentInfo
+	dmChannels         map[string]*DmChannel
+	partitions         typeutil.Set[int64] // stores target partitions info
+	version            int64
 
 	// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
 	lackSegmentInfo bool
 }
 
 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+	channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels))
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+	for _, segment := range segments {
+		channel := segment.GetInsertChannel()
+		if _, ok := channel2Segments[channel]; !ok {
+			channel2Segments[channel] = make([]*datapb.SegmentInfo, 0)
+		}
+		channel2Segments[channel] = append(channel2Segments[channel], segment)
+		partitionID := segment.GetPartitionID()
+		if _, ok := partition2Segments[partitionID]; !ok {
+			partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
+		}
+		partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+	}
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitionIDs...),
-		version:    time.Now().UnixNano(),
+		segments:           segments,
+		channel2Segments:   channel2Segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitionIDs...),
+		version:            time.Now().UnixNano(),
 	}
 }
 
 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
 	segments := make(map[int64]*datapb.SegmentInfo)
 	dmChannels := make(map[string]*DmChannel)
+	channel2Segments := make(map[string][]*datapb.SegmentInfo)
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo)
 	var partitions []int64
 
 	lackSegmentInfo := false
 	for _, t := range target.GetChannelTargets() {
+		if _, ok := channel2Segments[t.GetChannelName()]; !ok {
+			channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0)
+		}
 		for _, partition := range t.GetPartitionTargets() {
+			if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+				partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+			}
 			for _, segment := range partition.GetSegments() {
 				if segment.GetNumOfRows() <= 0 {
 					lackSegmentInfo = true
 				}
-				segments[segment.GetID()] = &datapb.SegmentInfo{
+				info := &datapb.SegmentInfo{
 					ID:           segment.GetID(),
 					Level:        segment.GetLevel(),
 					CollectionID: target.GetCollectionID(),
@@ -70,6 +95,9 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 					InsertChannel: t.GetChannelName(),
 					NumOfRows:     segment.GetNumOfRows(),
 				}
+				segments[segment.GetID()] = info
+				channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info)
+				partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
 			}
 			partitions = append(partitions, partition.GetPartitionID())
 		}
@@ -90,11 +118,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 	}
 
 	return &CollectionTarget{
-		segments:        segments,
-		dmChannels:      dmChannels,
-		partitions:      typeutil.NewSet(partitions...),
-		version:         target.GetVersion(),
-		lackSegmentInfo: lackSegmentInfo,
+		segments:           segments,
+		channel2Segments:   channel2Segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitions...),
+		version:            target.GetVersion(),
+		lackSegmentInfo:    lackSegmentInfo,
 	}
 }
 
@@ -155,6 +185,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
 	return p.segments
 }
 
+func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo {
+	return p.channel2Segments[channel]
+}
+
+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+	return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
 	return p.version
 }
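
The new `channel2Segments` and `partition2Segments` maps are secondary indexes built once, at construction time, so that later per-channel and per-partition lookups become a single map access instead of a scan over every segment in the target. The sketch below shows the indexing idea in isolation, using a hypothetical `segment` type and a generic helper rather than the actual `datapb` types:

```go
package main

import "fmt"

// segment is a stripped-down stand-in for datapb.SegmentInfo; only the
// fields the lookup maps key on are kept.
type segment struct {
	ID        int64
	Channel   string
	Partition int64
}

// groupBy builds the kind of secondary index CollectionTarget now keeps:
// one pass over the segments, appending each to the bucket of its key.
func groupBy[K comparable](segments []*segment, key func(*segment) K) map[K][]*segment {
	out := make(map[K][]*segment)
	for _, s := range segments {
		k := key(s)
		out[k] = append(out[k], s)
	}
	return out
}

func main() {
	segments := []*segment{
		{ID: 1, Channel: "dml-0", Partition: 100},
		{ID: 2, Channel: "dml-0", Partition: 101},
		{ID: 3, Channel: "dml-1", Partition: 100},
	}
	byChannel := groupBy(segments, func(s *segment) string { return s.Channel })
	byPartition := groupBy(segments, func(s *segment) int64 { return s.Partition })
	// Queries like GetSealedSegmentsByChannel become a map lookup instead
	// of a filter over every segment in the target.
	fmt.Println(len(byChannel["dml-0"]), len(byPartition[100])) // 2 2
}
```
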
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 10fe0b787b55d..aad32e68d9e01 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -426,12 +426,9 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(ctx context.Context, collec
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
-		ret := make(map[int64]*datapb.SegmentInfo)
-		for k, v := range t.GetAllSegments() {
-			if v.GetInsertChannel() == channelName {
-				ret[k] = v
-			}
-		}
+		ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 {
+			return s.GetID()
+		})
 
 		if len(ret) > 0 {
 			return ret
@@ -468,10 +465,8 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		segments := make(map[int64]*datapb.SegmentInfo)
-		for _, s := range t.GetAllSegments() {
-			if s.GetPartitionID() == partitionID {
-				segments[s.GetID()] = s
-			}
+		for _, s := range t.GetPartitionSegments(partitionID) {
+			segments[s.GetID()] = s
 		}
 
 		if len(segments) > 0 {
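
`lo.KeyBy` folds a slice into a map keyed by the iteratee's result, which is how the prebuilt per-channel slice is converted back into the `map[int64]*datapb.SegmentInfo` shape callers expect. A standalone example with a stand-in `segmentInfo` type:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// segmentInfo stands in for datapb.SegmentInfo; only the ID matters here.
type segmentInfo struct{ ID int64 }

func (s *segmentInfo) GetID() int64 { return s.ID }

func main() {
	// lo.KeyBy builds a map keyed by the iteratee's result, mirroring how
	// GetSealedSegmentsByChannel converts the prebuilt per-channel slice.
	channelSegments := []*segmentInfo{{ID: 10}, {ID: 11}, {ID: 12}}
	ret := lo.KeyBy(channelSegments, func(s *segmentInfo) int64 {
		return s.GetID()
	})
	fmt.Println(len(ret), ret[11].ID) // 3 11
}
```
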
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index 99f8c2f06a341..90ae9dfb23593 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(ctx context.Context, collectionID i
 
 func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
 	loading := false
+	observeTaskNum := 0
+	observeStart := time.Now()
 	ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
 		loading = true
+		observeTaskNum++
 
+		start := time.Now()
 		collection := ob.meta.CollectionManager.GetCollection(ctx, task.CollectionID)
 		if collection == nil {
 			return true
@@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
 			ob.loadTasks.Remove(traceID)
 		}
 
+		log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start)))
 		return true
 	})
 
+	log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart)))
+
 	// trigger check logic when loading collections/partitions
 	if loading {
 		ob.checkerController.Check()
@@ -325,11 +332,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti
 }
 
 func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
-		zap.Int64("collectionID", partition.GetCollectionID()),
-		zap.Int64("partitionID", partition.GetPartitionID()),
-	)
-
 	segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
 
 	targetNum := len(segmentTargets) + channelTargetNum
@@ -338,7 +340,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		return false
 	}
 
-	log.RatedInfo(10, "partition targets",
+	log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int("segmentTargetNum", len(segmentTargets)),
 		zap.Int("channelTargetNum", channelTargetNum),
 		zap.Int("totalTargetNum", targetNum),
@@ -355,11 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
 		loadedCount += len(group)
 	}
-	if loadedCount > 0 {
-		log.Info("partition load progress",
-			zap.Int("subChannelCount", subChannelCount),
-			zap.Int("loadSegmentCount", loadedCount-subChannelCount))
-	}
 	loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum)))
 
 	if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
@@ -370,30 +369,37 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 	ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
 	if loadPercentage == 100 {
 		if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
-			log.Warn("failed to manual check current target, skip update load status")
+			log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
+				zap.Int64("collectionID", partition.GetCollectionID()),
+				zap.Int64("partitionID", partition.GetPartitionID()))
 			return false
 		}
 		delete(ob.partitionLoadedCount, partition.GetPartitionID())
 	}
 	err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage)
 	if err != nil {
-		log.Warn("failed to update partition load percentage")
+		log.Ctx(ctx).Warn("failed to update partition load percentage",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()))
 	}
-	log.Info("partition load status updated",
+	log.Ctx(ctx).Info("partition load status updated",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int32("partitionLoadPercentage", loadPercentage),
+		zap.Int("subChannelCount", subChannelCount),
+		zap.Int("loadSegmentCount", loadedCount-subChannelCount),
 	)
 	eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
 	return true
 }
 
 func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
-	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
-
 	collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID)
 	if err != nil {
-		log.Warn("failed to update collection load percentage")
+		log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
 	}
-	log.Info("collection load status updated",
+	log.Ctx(ctx).Info("collection load status updated",
+		zap.Int64("collectionID", collectionID),
 		zap.Int32("collectionLoadPercentage", collectionPercentage),
 	)
 	if collectionPercentage == 100 {
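
The timers added here follow a simple pattern: one start time outside the loop for the total, and one inside the callback for each task, so a single slow collection shows up individually in the logs. A rough sketch of the pattern, assuming a plain `sync.Map` in place of the project's typed concurrent map:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// observeAll mirrors the pattern added to observeLoadStatus: record a
// start time before the Range loop for the total, and one inside the
// callback per task, so per-item and aggregate durations are both logged.
func observeAll(tasks *sync.Map, observe func(key, value any)) {
	observeStart := time.Now()
	observeTaskNum := 0
	tasks.Range(func(k, v any) bool {
		observeTaskNum++
		start := time.Now()
		observe(k, v)
		fmt.Printf("observe %v done in %v\n", k, time.Since(start))
		return true
	})
	fmt.Printf("observe all %d tasks done in %v\n", observeTaskNum, time.Since(observeStart))
}

func main() {
	var tasks sync.Map
	tasks.Store("trace-1", 100)
	tasks.Store("trace-2", 200)
	// The observe callback stands in for the per-collection load check.
	observeAll(&tasks, func(_, v any) { time.Sleep(time.Duration(v.(int)) * time.Millisecond) })
}
```
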
diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go
index dfbc4c44ddf52..2aa5a90d4b2ab 100644
--- a/internal/querycoordv2/task/action.go
+++ b/internal/querycoordv2/task/action.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
-	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -116,49 +115,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope {
 }
 
 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
-	if action.Type() == ActionTypeGrow {
-		// rpc finished
-		if !action.rpcReturned.Load() {
-			return false
-		}
-
-		// segment found in leader view
-		views := distMgr.LeaderViewManager.GetByFilter(
-			meta.WithChannelName2LeaderView(action.Shard),
-			meta.WithSegment2LeaderView(action.SegmentID, false))
-		if len(views) == 0 {
-			return false
-		}
-
-		// segment found in dist
-		segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID))
-		return len(segmentInTargetNode) > 0
-	} else if action.Type() == ActionTypeReduce {
-		// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
-		// loading segment replaces the node ID with the new one,
-		// which confuses the condition of finishing,
-		// the leader should return a map of segment ID to list of nodes,
-		// now, we just always commit the release task to executor once.
-		// NOTE: DO NOT create a task containing release action and the action is not the last action
-		sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()))
-		views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node()))
-		growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 {
-			return lo.Keys(view.GrowingSegments)
-		})
-		segments := make([]int64, 0, len(sealed)+len(growing))
-		for _, segment := range sealed {
-			segments = append(segments, segment.GetID())
-		}
-		segments = append(segments, growing...)
-		if !funcutil.SliceContain(segments, action.GetSegmentID()) {
-			return true
-		}
-		return action.rpcReturned.Load()
-	} else if action.Type() == ActionTypeUpdate {
-		return action.rpcReturned.Load()
-	}
-
-	return true
+	return action.rpcReturned.Load()
 }
 
 func (action *SegmentAction) Desc() string {
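
After this change, a segment action counts as finished exactly when its RPC has returned; verification against the distribution is left to other code paths. A minimal sketch of those semantics, using a hypothetical trimmed-down action type:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// segmentAction is a hypothetical, trimmed-down version of SegmentAction:
// after this change, "finished" means exactly "the RPC came back", with no
// extra leader-view or distribution checks layered on top.
type segmentAction struct {
	segmentID   int64
	rpcReturned atomic.Bool
}

// IsFinished mirrors the simplified semantics in the diff: the executor
// marks the action done when the load/release RPC returns.
func (a *segmentAction) IsFinished() bool {
	return a.rpcReturned.Load()
}

func main() {
	a := &segmentAction{segmentID: 42}
	fmt.Println(a.IsFinished()) // false: RPC still in flight
	a.rpcReturned.Store(true)   // executor observed the RPC response
	fmt.Println(a.IsFinished()) // true
}
```
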
"github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -657,11 +658,13 @@ func (scheduler *taskScheduler) schedule(node int64) { return } + tr := timerecord.NewTimeRecorder("") log := log.Ctx(scheduler.ctx).With( zap.Int64("nodeID", node), ) scheduler.tryPromoteAll() + promoteDur := tr.RecordSpan() log.Debug("process tasks related to node", zap.Int("processingTaskNum", scheduler.processQueue.Len()), @@ -683,6 +686,7 @@ func (scheduler *taskScheduler) schedule(node int64) { return true }) + preprocessDur := tr.RecordSpan() // The scheduler doesn't limit the number of tasks, // to commit tasks to executors as soon as possible, to reach higher merge possibility @@ -693,15 +697,22 @@ func (scheduler *taskScheduler) schedule(node int64) { } return nil }, "process") + processDur := tr.RecordSpan() for _, task := range toRemove { scheduler.remove(task) } + scheduler.updateTaskMetrics() + log.Info("processed tasks", zap.Int("toProcessNum", len(toProcess)), zap.Int32("committedNum", commmittedNum.Load()), zap.Int("toRemoveNum", len(toRemove)), + zap.Duration("promoteDur", promoteDur), + zap.Duration("preprocessDUr", preprocessDur), + zap.Duration("processDUr", processDur), + zap.Duration("totalDur", tr.ElapseSpan()), ) log.Info("process tasks related to node done", @@ -749,10 +760,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { // return true if the task should be executed, // false otherwise func (scheduler *taskScheduler) preProcess(task Task) bool { - log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With( - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("taskID", task.ID()), - ) if task.Status() != TaskStatusStarted { return false } @@ -775,7 +782,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { } if !ready { - log.RatedInfo(30, "Blocking reduce action in balance channel task") + log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task", + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("taskID", task.ID())) break } } @@ -894,7 +903,6 @@ func (scheduler *taskScheduler) remove(task Task) { log = log.With(zap.Int64("segmentID", task.SegmentID())) } - scheduler.updateTaskMetrics() log.Info("task removed") if scheduler.meta.Exist(task.Context(), task.CollectionID()) { @@ -940,14 +948,18 @@ func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string { return metrics.UnknownTaskLabel } -func (scheduler *taskScheduler) checkStale(task Task) error { - log := log.Ctx(task.Context()).With( +func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field { + res := []zap.Field{ zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), zap.Int64("replicaID", task.ReplicaID()), zap.String("source", task.Source().String()), - ) + } + res = append(res, fields...) + return res +} +func (scheduler *taskScheduler) checkStale(task Task) error { switch task := task.(type) { case *SegmentTask: if err := scheduler.checkSegmentTaskStale(task); err != nil { @@ -974,7 +986,9 @@ func (scheduler *taskScheduler) checkStale(task Task) error { zap.Int("step", step)) if scheduler.nodeMgr.Get(action.Node()) == nil { - log.Warn("the task is stale, the target node is offline") + log.Warn("the task is stale, the target node is offline", WrapTaskLog(task, + zap.Int64("nodeID", action.Node()), + zap.Int("step", step))...) 
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 482282f54f319..aebf4b996276e 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1141,22 +1141,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 	suite.dispatchAndWait(targetNode)
 	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
 
-	// Process tasks done
-	// Dist contains channels, first task stale
-	view := &meta.LeaderView{
-		ID:           targetNode,
-		CollectionID: suite.collection,
-		Segments:     map[int64]*querypb.SegmentDist{},
-		Channel:      channel.ChannelName,
-	}
-	for _, segment := range suite.loadSegments[1:] {
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
-	}
-	distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
-		return meta.SegmentFromInfo(info)
-	})
-	suite.dist.LeaderViewManager.Update(targetNode, view)
-	suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
 	segments = make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments[1:] {
 		segments = append(segments, &datapb.SegmentInfo{
diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go
index 5e283b926ee60..881873c418f27 100644
--- a/internal/querycoordv2/utils/util.go
+++ b/internal/querycoordv2/utils/util.go
@@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
 func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
 	log := log.Ctx(context.TODO()).
 		WithRateGroup("utils.CheckLeaderAvailable", 1, 60).
-		With(zap.Int64("leaderID", leader.ID))
+		With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID))
 	info := nodeMgr.Get(leader.ID)
 
 	// Check whether leader is online
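
The `WithRateGroup(name, 1, 60)`/`RatedWarn` calls used throughout this change are token-bucket rate-limited logging: a hot failure path may fire on every round, but only the occasional line reaches the log. The sketch below imitates that behavior with `golang.org/x/time/rate`; it is an approximation, not the Milvus `pkg/log` implementation:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// ratedLogger imitates the WithRateGroup/RatedWarn pair: a token bucket
// decides whether a given line is emitted, so a hot loop (such as a
// failing pullDist retry) cannot flood the log.
type ratedLogger struct{ limiter *rate.Limiter }

// newRatedLogger allows roughly one event per `every` with a small burst,
// loosely mirroring the "1 credit per second, cap 60" style arguments.
func newRatedLogger(every time.Duration, burst int) *ratedLogger {
	return &ratedLogger{limiter: rate.NewLimiter(rate.Every(every), burst)}
}

func (l *ratedLogger) Warn(msg string) {
	if l.limiter.Allow() {
		fmt.Println("WARN:", msg)
	} // otherwise the line is silently dropped
}

func main() {
	logger := newRatedLogger(30*time.Second, 1)
	for i := 0; i < 5; i++ {
		logger.Warn("failed to get data distribution") // printed once
	}
}
```
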
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 446f5fded4fa0..93492da088189 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -1208,7 +1208,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 			growingSegments[entry.SegmentID] = &msgpb.MsgPosition{}
 			continue
 		}
-		growingSegments[entry.SegmentID] = segment.StartPosition()
+		// QueryCoord only requires the timestamp from the position.
+		growingSegments[entry.SegmentID] = &msgpb.MsgPosition{
+			Timestamp: segment.StartPosition().GetTimestamp(),
+		}
 		numOfGrowingRows += segment.InsertCount()
 	}
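
Sending only the timestamp trims each growing segment's position in the `GetDataDistribution` response, which matters when a node reports thousands of growing segments; the channel name and broker message IDs that a full `MsgPosition` carries are dead weight for QueryCoord here. A sketch with a hypothetical `msgPosition` mirror of `msgpb.MsgPosition`:

```go
package main

import "fmt"

// msgPosition is a hypothetical mirror of msgpb.MsgPosition: besides the
// timestamp it drags along the channel name and raw message IDs, which add
// up when serialized for thousands of growing segments.
type msgPosition struct {
	ChannelName string
	MsgID       []byte // opaque broker cursor, often tens of bytes
	Timestamp   uint64
}

// trimForDistribution keeps only what QueryCoord consumes from a growing
// segment's start position: the timestamp. Everything else is dropped
// before the position is packed into the response.
func trimForDistribution(pos *msgPosition) *msgPosition {
	return &msgPosition{Timestamp: pos.Timestamp}
}

func main() {
	full := &msgPosition{
		ChannelName: "by-dev-rootcoord-dml_0",
		MsgID:       make([]byte, 24),
		Timestamp:   449296048735649793,
	}
	trimmed := trimForDistribution(full)
	fmt.Printf("full: %+v\n", full)
	fmt.Printf("trimmed: %+v\n", trimmed) // only Timestamp survives
}
```
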