diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index b017019bec086..cd9f5ee1ba4a0 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -390,7 +390,7 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error { log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60). With(zap.Int64("collectionID", collectionID)) - log.RatedInfo(10, "observer trigger update next target") + log.Info("observer trigger update next target") err := ob.targetMgr.UpdateCollectionNextTarget(collectionID) if err != nil { log.Warn("failed to update next target for collection", @@ -422,7 +422,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect channelNames := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget) if len(channelNames) == 0 { // next target is empty, no need to update - log.RatedInfo(10, "next target is empty, no need to update") + log.Info("next target is empty, no need to update") return false } @@ -434,7 +434,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect // to avoid stuck here in dynamic increase replica case, we just check available delegator number if int32(len(channelReadyLeaders)) < replicaNum { - log.RatedInfo(10, "channel not ready", + log.Info("channel not ready", zap.Int("readyReplicaNum", len(channelReadyLeaders)), zap.String("channelName", channel), ) @@ -573,7 +573,7 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead func (ob *TargetObserver) updateCurrentTarget(collectionID int64) { log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60) - log.RatedInfo(10, "observer trigger update current target", zap.Int64("collectionID", collectionID)) + log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID)) if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) { ob.mut.Lock() defer ob.mut.Unlock() diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index debf2de8f88aa..d90f310f95113 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -25,7 +25,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -116,35 +115,7 @@ func (action *SegmentAction) Scope() querypb.DataScope { } func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool { - if action.Type() == ActionTypeGrow { - // rpc finished - return action.rpcReturned.Load() - } else if action.Type() == ActionTypeReduce { - // FIXME: Now shard leader's segment view is a map of segment ID to node ID, - // loading segment replaces the node ID with the new one, - // which confuses the condition of finishing, - // the leader should return a map of segment ID to list of nodes, - // now, we just always commit the release task to executor once. - // NOTE: DO NOT create a task containing release action and the action is not the last action - sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node())) - views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node())) - growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 { - return lo.Keys(view.GrowingSegments) - }) - segments := make([]int64, 0, len(sealed)+len(growing)) - for _, segment := range sealed { - segments = append(segments, segment.GetID()) - } - segments = append(segments, growing...) - if !funcutil.SliceContain(segments, action.SegmentID()) { - return true - } - return action.rpcReturned.Load() - } else if action.Type() == ActionTypeUpdate { - return action.rpcReturned.Load() - } - - return true + return action.rpcReturned.Load() } func (action *SegmentAction) String() string { diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index e37961c8bc0b4..ccdfc029ad753 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error { func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error { log := log.Ctx(context.TODO()). WithRateGroup("utils.CheckLeaderAvailable", 1, 60). - With(zap.Int64("leaderID", leader.ID)) + With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID)) info := nodeMgr.Get(leader.ID) // Check whether leader is online