Skip to content

Commit

Permalink
fix: [10kcp] Fix slow preprocess in qc scheduler (#38784)
Browse files Browse the repository at this point in the history
supplement to pr: #38566

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 26, 2024
1 parent 7f54675 commit 05f50b1
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 35 deletions.
8 changes: 4 additions & 4 deletions internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60).
With(zap.Int64("collectionID", collectionID))

log.RatedInfo(10, "observer trigger update next target")
log.Info("observer trigger update next target")
err := ob.targetMgr.UpdateCollectionNextTarget(collectionID)
if err != nil {
log.Warn("failed to update next target for collection",
Expand Down Expand Up @@ -422,7 +422,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
channelNames := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
if len(channelNames) == 0 {
// next target is empty, no need to update
log.RatedInfo(10, "next target is empty, no need to update")
log.Info("next target is empty, no need to update")
return false
}

Expand All @@ -434,7 +434,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect

// to avoid stuck here in dynamic increase replica case, we just check available delegator number
if int32(len(channelReadyLeaders)) < replicaNum {
log.RatedInfo(10, "channel not ready",
log.Info("channel not ready",
zap.Int("readyReplicaNum", len(channelReadyLeaders)),
zap.String("channelName", channel),
)
Expand Down Expand Up @@ -573,7 +573,7 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead

func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60)
log.RatedInfo(10, "observer trigger update current target", zap.Int64("collectionID", collectionID))
log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID))
if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) {
ob.mut.Lock()
defer ob.mut.Unlock()
Expand Down
31 changes: 1 addition & 30 deletions internal/querycoordv2/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -116,35 +115,7 @@ func (action *SegmentAction) Scope() querypb.DataScope {
}

func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
if action.Type() == ActionTypeGrow {
// rpc finished
return action.rpcReturned.Load()
} else if action.Type() == ActionTypeReduce {
// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
// loading segment replaces the node ID with the new one,
// which confuses the condition of finishing,
// the leader should return a map of segment ID to list of nodes,
// now, we just always commit the release task to executor once.
// NOTE: DO NOT create a task containing release action and the action is not the last action
sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()))
views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node()))
growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 {
return lo.Keys(view.GrowingSegments)
})
segments := make([]int64, 0, len(sealed)+len(growing))
for _, segment := range sealed {
segments = append(segments, segment.GetID())
}
segments = append(segments, growing...)
if !funcutil.SliceContain(segments, action.SegmentID()) {
return true
}
return action.rpcReturned.Load()
} else if action.Type() == ActionTypeUpdate {
return action.rpcReturned.Load()
}

return true
return action.rpcReturned.Load()
}

func (action *SegmentAction) String() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
log := log.Ctx(context.TODO()).
WithRateGroup("utils.CheckLeaderAvailable", 1, 60).
With(zap.Int64("leaderID", leader.ID))
With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID))
info := nodeMgr.Get(leader.ID)

// Check whether leader is online
Expand Down

0 comments on commit 05f50b1

Please sign in to comment.