diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index f8fcd896942cb..62ae15e842448 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -28,36 +28,52 @@ import (
 
 // CollectionTarget collection target is immutable,
 type CollectionTarget struct {
-	segments   map[int64]*datapb.SegmentInfo
-	dmChannels map[string]*DmChannel
-	partitions typeutil.Set[int64] // stores target partitions info
-	version    int64
+	segments           map[int64]*datapb.SegmentInfo
+	partition2Segments map[int64][]*datapb.SegmentInfo
+	dmChannels         map[string]*DmChannel
+	partitions         typeutil.Set[int64] // stores target partitions info
+	version            int64
 }
 
 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+	for _, segment := range segments {
+		partitionID := segment.GetPartitionID()
+		if _, ok := partition2Segments[partitionID]; !ok {
+			partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
+		}
+		partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+	}
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitionIDs...),
-		version:    time.Now().UnixNano(),
+		segments:           segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitionIDs...),
+		version:            time.Now().UnixNano(),
 	}
 }
 
 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
 	segments := make(map[int64]*datapb.SegmentInfo)
 	dmChannels := make(map[string]*DmChannel)
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo)
 	var partitions []int64
 
 	for _, t := range target.GetChannelTargets() {
 		for _, partition := range t.GetPartitionTargets() {
+			if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+				partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+			}
 			for _, segment := range partition.GetSegments() {
-				segments[segment.GetID()] = &datapb.SegmentInfo{
+				info := &datapb.SegmentInfo{
 					ID:            segment.GetID(),
 					Level:         segment.GetLevel(),
 					CollectionID:  target.GetCollectionID(),
 					PartitionID:   partition.GetPartitionID(),
 					InsertChannel: t.GetChannelName(),
 				}
+				segments[segment.GetID()] = info
+				partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
 			}
 			partitions = append(partitions, partition.GetPartitionID())
 		}
@@ -137,6 +153,10 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
 	return p.segments
 }
 
+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+	return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
 	return p.version
 }
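The target.go change trades a little memory for constant-time per-partition lookups: a partition-keyed slice index is built once when the target is constructed, so callers no longer scan every segment in the collection. A minimal sketch of the idea, using a simplified `segmentInfo` stand-in rather than the real `datapb.SegmentInfo` type:

```go
package main

import "fmt"

type segmentInfo struct {
	ID          int64
	PartitionID int64
}

// buildPartitionIndex groups segments by partition once, so later
// per-partition lookups become a single map access instead of a scan
// over every segment in the collection.
func buildPartitionIndex(segments map[int64]*segmentInfo) map[int64][]*segmentInfo {
	index := make(map[int64][]*segmentInfo, len(segments))
	for _, s := range segments {
		index[s.PartitionID] = append(index[s.PartitionID], s)
	}
	return index
}

func main() {
	segments := map[int64]*segmentInfo{
		1: {ID: 1, PartitionID: 100},
		2: {ID: 2, PartitionID: 100},
		3: {ID: 3, PartitionID: 200},
	}
	index := buildPartitionIndex(segments)
	fmt.Println(len(index[100]), len(index[200])) // 2 1
}
```

Since `append` on a nil slice allocates as needed, the explicit `make` calls in the diff are not required for correctness; the `FromPbCollectionTarget` variant uses them mainly to pre-size each slice with the known segment count.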
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 85d7657f43973..33b72a99f951e 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -468,10 +468,8 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		segments := make(map[int64]*datapb.SegmentInfo)
-		for _, s := range t.GetAllSegments() {
-			if s.GetPartitionID() == partitionID {
-				segments[s.GetID()] = s
-			}
+		for _, s := range t.GetPartitionSegments(partitionID) {
+			segments[s.GetID()] = s
 		}
 
 		if len(segments) > 0 {
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index 94ddb392dd95f..5ccd06d05bf7f 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -325,11 +325,6 @@ func (ob *CollectionObserver) observeChannelStatus(collectionID int64) (int, int
 }
 
 func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
-		zap.Int64("collectionID", partition.GetCollectionID()),
-		zap.Int64("partitionID", partition.GetPartitionID()),
-	)
-
 	segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
 
 	targetNum := len(segmentTargets) + channelTargetNum
@@ -338,7 +333,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		return false
 	}
 
-	log.RatedInfo(10, "partition targets",
+	log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int("segmentTargetNum", len(segmentTargets)),
 		zap.Int("channelTargetNum", channelTargetNum),
 		zap.Int("totalTargetNum", targetNum),
@@ -356,7 +353,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		loadedCount += len(group)
 	}
 	if loadedCount > 0 {
-		log.Info("partition load progress",
+		log.Ctx(ctx).Info("partition load progress",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()),
 			zap.Int("subChannelCount", subChannelCount),
 			zap.Int("loadSegmentCount", loadedCount-subChannelCount))
 	}
@@ -370,16 +369,22 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 	ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
 	if loadPercentage == 100 {
 		if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
-			log.Warn("failed to manual check current target, skip update load status")
+			log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
+				zap.Int64("collectionID", partition.GetCollectionID()),
+				zap.Int64("partitionID", partition.GetPartitionID()))
 			return false
 		}
 		delete(ob.partitionLoadedCount, partition.GetPartitionID())
 	}
 	err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(partition.PartitionID, loadPercentage)
 	if err != nil {
-		log.Warn("failed to update partition load percentage")
+		log.Ctx(ctx).Warn("failed to update partition load percentage",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()))
 	}
-	log.Info("partition load status updated",
+	log.Ctx(ctx).Info("partition load status updated",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int32("partitionLoadPercentage", loadPercentage),
 	)
 	eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
@@ -387,13 +392,12 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 }
 
 func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
-	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
-
 	collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(collectionID)
 	if err != nil {
-		log.Warn("failed to update collection load percentage")
+		log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
 	}
-	log.Info("collection load status updated",
+	log.Ctx(ctx).Info("collection load status updated",
+		zap.Int64("collectionID", collectionID),
 		zap.Int32("collectionLoadPercentage", collectionPercentage),
 	)
 	if collectionPercentage == 100 {
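In collection_observer.go the pre-built, rate-limited child logger is dropped; fields are now attached only on the branches that actually emit a line, so the early-return paths (the common case while a partition is still loading) no longer pay for `With(...)` allocations on every tick. A rough sketch of the pattern, assuming only the `go.uber.org/zap` API; the `observe` helper and its arguments are placeholders, not Milvus code:

```go
package main

import "go.uber.org/zap"

// observe mimics the shape of the observer methods above: fields are
// built at the call sites that actually log, instead of constructing a
// child logger with With(...) at the top of every invocation.
func observe(logger *zap.Logger, collectionID, partitionID int64, loaded bool) {
	// The early return pays no logging or allocation cost.
	if !loaded {
		return
	}
	logger.Info("partition load status updated",
		zap.Int64("collectionID", collectionID),
		zap.Int64("partitionID", partitionID))
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	observe(logger, 100, 200, true)
}
```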
diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go
index 7382c065c8c78..debf2de8f88aa 100644
--- a/internal/querycoordv2/task/action.go
+++ b/internal/querycoordv2/task/action.go
@@ -118,21 +118,7 @@ func (action *SegmentAction) Scope() querypb.DataScope {
 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
 	if action.Type() == ActionTypeGrow {
 		// rpc finished
-		if !action.rpcReturned.Load() {
-			return false
-		}
-
-		// segment found in leader view
-		views := distMgr.LeaderViewManager.GetByFilter(
-			meta.WithChannelName2LeaderView(action.Shard()),
-			meta.WithSegment2LeaderView(action.segmentID, false))
-		if len(views) == 0 {
-			return false
-		}
-
-		// segment found in dist
-		segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID()))
-		return len(segmentInTargetNode) > 0
+		return action.rpcReturned.Load()
 	} else if action.Type() == ActionTypeReduce {
 		// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
 		// loading segment replaces the node ID with the new one,
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index aab1a30f3ceb9..518cd9cd72af7 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -617,6 +617,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		scheduler.remove(task)
 	}
 
+	scheduler.updateTaskMetrics()
+
 	log.Info("processed tasks",
 		zap.Int("toProcessNum", len(toProcess)),
 		zap.Int32("committedNum", commmittedNum.Load()),
@@ -668,10 +670,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
 // return true if the task should be executed,
 // false otherwise
 func (scheduler *taskScheduler) preProcess(task Task) bool {
-	log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("taskID", task.ID()),
-	)
 	if task.Status() != TaskStatusStarted {
 		return false
 	}
@@ -694,7 +692,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 			}
 
 			if !ready {
-				log.RatedInfo(30, "Blocking reduce action in balance channel task")
+				log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task",
+					zap.Int64("collectionID", task.CollectionID()),
+					zap.Int64("taskID", task.ID()))
 				break
 			}
 		}
@@ -813,7 +813,6 @@ func (scheduler *taskScheduler) remove(task Task) {
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
 
-	scheduler.updateTaskMetrics()
 	log.Info("task removed")
 
 	if scheduler.meta.Exist(task.CollectionID()) {
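In scheduler.go the metrics refresh moves out of `remove()` and into the end of each `schedule()` pass, so removing N tasks triggers one recomputation instead of N. A sketch of that shape with a hypothetical `taskScheduler` stand-in, not the querycoordv2 type:

```go
package main

import "fmt"

type task struct{ id int64 }

// taskScheduler is a hypothetical stand-in for the real scheduler.
type taskScheduler struct {
	tasks map[int64]*task
}

func (s *taskScheduler) remove(t *task) {
	delete(s.tasks, t.id)
	// Before the change: metrics were recomputed here, once per removed task.
}

func (s *taskScheduler) schedule(toRemove []*task) {
	for _, t := range toRemove {
		s.remove(t)
	}
	// After the change: recompute once per scheduling pass.
	s.updateTaskMetrics()
}

func (s *taskScheduler) updateTaskMetrics() {
	fmt.Println("pending tasks:", len(s.tasks))
}

func main() {
	s := &taskScheduler{tasks: map[int64]*task{1: {id: 1}, 2: {id: 2}}}
	s.schedule([]*task{{id: 1}}) // pending tasks: 1
}
```

The action.go change is related: a grow action now counts as finished once its RPC returns, and the dropped leader-view and distribution checks are left to later reconciliation, which is consistent with removing the corresponding leader-view setup from `TestSegmentTaskStale` below.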
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 40e45589886ed..a648c303b5444 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1135,22 +1135,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 	suite.dispatchAndWait(targetNode)
 	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
 
-	// Process tasks done
-	// Dist contains channels, first task stale
-	view := &meta.LeaderView{
-		ID:           targetNode,
-		CollectionID: suite.collection,
-		Segments:     map[int64]*querypb.SegmentDist{},
-		Channel:      channel.ChannelName,
-	}
-	for _, segment := range suite.loadSegments[1:] {
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
-	}
-	distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
-		return meta.SegmentFromInfo(info)
-	})
-	suite.dist.LeaderViewManager.Update(targetNode, view)
-	suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
 	segments = make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments[1:] {
 		segments = append(segments, &datapb.SegmentInfo{
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 6d22586cc893f..cf72bf914835d 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -1245,7 +1245,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 			growingSegments[entry.SegmentID] = &msgpb.MsgPosition{}
 			continue
 		}
-		growingSegments[entry.SegmentID] = segment.StartPosition()
+		// QueryCoord only requires the timestamp from the position.
+		growingSegments[entry.SegmentID] = &msgpb.MsgPosition{
+			Timestamp: segment.StartPosition().GetTimestamp(),
+		}
 		numOfGrowingRows += segment.InsertCount()
 	}
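The services.go change shrinks the `GetDataDistribution` response: for growing segments only the start position's timestamp is returned, dropping the channel name and message ID payload that the coordinator does not read. A sketch of the trimming idea, with a simplified `msgPosition` stand-in for `msgpb.MsgPosition`:

```go
package main

import "fmt"

// msgPosition is a simplified stand-in for msgpb.MsgPosition.
type msgPosition struct {
	ChannelName string
	MsgID       []byte
	Timestamp   uint64
}

// trimToTimestamp keeps only the field the coordinator consumes,
// dropping the channel name and message ID bytes from the response.
func trimToTimestamp(pos *msgPosition) *msgPosition {
	return &msgPosition{Timestamp: pos.Timestamp}
}

func main() {
	full := &msgPosition{ChannelName: "dml-ch-0", MsgID: make([]byte, 64), Timestamp: 42}
	fmt.Printf("%+v\n", trimToTimestamp(full)) // only Timestamp survives
}
```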