test: Test ci #37422

Closed
wants to merge 8 commits
4 changes: 4 additions & 0 deletions internal/datacoord/services.go
@@ -849,6 +849,10 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
zap.Int64s("partitionIDs", partitionIDs),
)
log.Info("get recovery info request received")
start := time.Now()
defer func() {
log.Info("datacoord GetRecoveryInfoV2 done", zap.Duration("dur", time.Since(start)))
}()
resp := &datapb.GetRecoveryInfoResponseV2{
Status: merr.Success(),
}
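The lines added above follow the common defer + time.Since timing pattern, so the elapsed time is logged no matter how the handler returns. A minimal standalone sketch of that pattern; the handler name and logger setup here are illustrative, not part of this PR:

package main

import (
	"time"

	"go.uber.org/zap"
)

// getRecoveryInfo stands in for the instrumented RPC handler; only the timing pattern matters.
func getRecoveryInfo(log *zap.Logger) {
	start := time.Now()
	defer func() {
		// The deferred call runs after the handler body finishes, capturing total elapsed time.
		log.Info("GetRecoveryInfoV2 done", zap.Duration("dur", time.Since(start)))
	}()
	time.Sleep(10 * time.Millisecond) // placeholder for the real recovery-info work
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	getRecoveryInfo(logger)
}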
1 change: 1 addition & 0 deletions internal/querycoordv2/meta/collection_manager.go
@@ -606,6 +606,7 @@ func (m *CollectionManager) RemoveCollection(collectionID typeutil.UniqueID) err
delete(m.partitions, partition)
}
delete(m.collectionPartitions, collectionID)
log.Info("collection removed", zap.Int64("collectionID", collectionID))
}
metrics.CleanQueryCoordMetricsWithCollectionID(collectionID)
return nil
5 changes: 4 additions & 1 deletion internal/querycoordv2/meta/target_manager.go
@@ -21,6 +21,7 @@ import (
"fmt"
"runtime"
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
@@ -136,6 +137,7 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool
// WARN: DO NOT call this method for an existing collection while the target observer is running, or it will lead to a double update,
// which may make the current target unavailable
func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
start := time.Now()
var vChannelInfos []*datapb.VchannelInfo
var segmentInfos []*datapb.SegmentInfo
err := retry.Handle(context.TODO(), func() (bool, error) {
@@ -197,7 +199,8 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs),
zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()),
zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()))
zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()),
zap.Duration("dur", time.Since(start)))

return nil
}
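In this hunk the timer starts before retry.Handle, so the logged duration covers every pull attempt rather than only the last one. A minimal sketch of that behaviour using a hand-rolled retry loop; the function name, attempt count, and back-off are illustrative:

package main

import (
	"errors"
	"fmt"
	"time"
)

// pullNextTarget fails the first attempts, mimicking a not-yet-ready datacoord.
func pullNextTarget(attempt int) error {
	if attempt < 2 {
		return errors.New("datacoord not ready")
	}
	return nil
}

func main() {
	start := time.Now() // started before the retry loop, as in UpdateCollectionNextTarget
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		if err = pullNextTarget(attempt); err == nil {
			break
		}
		time.Sleep(20 * time.Millisecond)
	}
	// The reported duration includes the failed attempts and the back-off sleeps.
	fmt.Printf("UpdateCollectionNextTarget err=%v dur=%v\n", err, time.Since(start))
}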
3 changes: 2 additions & 1 deletion internal/querycoordv2/observers/collection_observer.go
@@ -308,7 +308,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
return
}

log.RatedInfo(10, "partition targets",
log.Info("partition targets",
zap.Int("segmentTargetNum", len(segmentTargets)),
zap.Int("channelTargetNum", len(channelTargets)),
zap.Int("totalTargetNum", targetNum),
@@ -343,6 +343,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
}

ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount

if loadPercentage == 100 {
if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
log.Warn("failed to manual check current target, skip update load status")
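This hunk swaps log.RatedInfo (rate-limited) for log.Info so every observation pass is visible while debugging. A stdlib-only sketch of what rate-limited logging suppresses; milvus' real implementation in pkg/log differs, this only illustrates the idea:

package main

import (
	"fmt"
	"time"
)

// ratedLogger emits at most one message per interval and drops the rest.
type ratedLogger struct {
	last     time.Time
	interval time.Duration
}

func (r *ratedLogger) Info(msg string) {
	if time.Since(r.last) < r.interval {
		return // suppressed: still inside the current window
	}
	r.last = time.Now()
	fmt.Println(msg)
}

func main() {
	rl := &ratedLogger{interval: 100 * time.Millisecond}
	for i := 0; i < 5; i++ {
		rl.Info("partition targets") // only the first call in each window prints
		time.Sleep(30 * time.Millisecond)
	}
}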
68 changes: 52 additions & 16 deletions internal/querycoordv2/observers/target_observer.go
@@ -165,7 +165,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) {

case <-ticker.C:
ob.clean()
ob.dispatcher.AddTask(ob.meta.GetAll()...)
all := ob.meta.GetAll()
log.Info("TargetObserver ticker all", zap.Int("len(all)", len(all)), zap.Any("all", all))
ob.dispatcher.AddTask(all...)

case req := <-ob.updateChan:
log.Info("manually trigger update target",
@@ -214,6 +216,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
// If not, submit an async task into dispatcher.
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
log.Info("check aaaa", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Bool("result", result))
if !result {
ob.dispatcher.AddTask(collectionID)
}
@@ -326,7 +329,12 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60).
With(zap.Int64("collectionID", collectionID))

log.RatedInfo(10, "observer trigger update next target")
log.Info("observer updateNextTarget start")
start := time.Now()
defer func() {
log.Info("observer updateNextTarget done", zap.Duration("dur", time.Since(start)))
}()

err := ob.targetMgr.UpdateCollectionNextTarget(collectionID)
if err != nil {
log.Warn("failed to update next target for collection",
@@ -341,7 +349,7 @@ func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) {
ob.nextTargetLastUpdate.Insert(collectionID, time.Now())
}

func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collectionID int64) bool {
func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collectionID int64) (ok bool) {
replicaNum := ob.meta.CollectionManager.GetReplicaNumber(collectionID)
log := log.Ctx(ctx).WithRateGroup(
fmt.Sprintf("qcv2.TargetObserver-%d", collectionID),
@@ -351,12 +359,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
zap.Int64("collectionID", collectionID),
zap.Int32("replicaNum", replicaNum),
)

start := time.Now()
log.Info("shouldUpdateCurrentTarget")
defer func() {
log.Info("shouldUpdateCurrentTarget done", zap.Duration("dur", time.Since(start)))
}()
// check channel first
channelNames := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
if len(channelNames) == 0 {
// next target is empty, no need to update
log.RatedInfo(10, "next target is empty, no need to update")
log.Info("next target is empty, no need to update")
return false
}

Expand All @@ -365,7 +377,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID })
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
if int32(len(group)) < replicaNum {
log.RatedInfo(10, "channel not ready",
log.Info("channel not ready",
zap.Int("readyReplicaNum", len(group)),
zap.String("channelName", channel.GetChannelName()),
)
@@ -380,7 +392,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
nodes := lo.Map(views, func(view *meta.LeaderView, _ int) int64 { return view.ID })
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
if int32(len(group)) < replicaNum {
log.RatedInfo(10, "segment not ready",
log.Info("segment not ready",
zap.Int("readyReplicaNum", len(group)),
zap.Int64("segmentID", segment.GetID()),
)
@@ -396,7 +408,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
actions = actions[:0]
leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
log.RatedInfo(10, "leader view not ready",
log.Info("leader view not ready",
zap.Int64("nodeID", leaderID),
zap.String("channel", ch),
)
@@ -406,6 +418,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
log.Info("shouldUpdateCurrentTarget begin to sync", zap.Any("actions", actions))
if !ob.sync(ctx, replica, leaderView, actions) {
return false
}
@@ -415,35 +428,45 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
return true
}

func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
if len(diffs) == 0 {
return true
}
replicaID := replica.GetID()

func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) (ok bool) {
log := log.With(
zap.Int64("leaderID", leaderView.ID),
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
)

log.Info("TargetObserver begin to sync", zap.Any("actions", diffs))
start := time.Now()
defer func() {
log.Info("TargetObserver sync done", zap.Any("actions", diffs), zap.Duration("dur", time.Since(start)))
}()

if len(diffs) == 0 {
return true
}
replicaID := replica.GetID()

t1 := time.Now()
collectionInfo, err := ob.broker.DescribeCollection(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
log.Info("TargetObserver sync DescribeCollection done", zap.Any("dur", time.Since(t1)))
partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}

// Get collection index info
t2 := time.Now()
indexInfo, err := ob.broker.ListIndexes(ctx, collectionInfo.GetCollectionID())
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
log.Info("TargetObserver sync ListIndexes done", zap.Any("dur", time.Since(t2)))

req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
@@ -464,13 +487,15 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
Version: time.Now().UnixNano(),
IndexInfoList: indexInfo,
}
t3 := time.Now()
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
log.Warn("failed to sync distribution", zap.Error(err))
return false
}
log.Info("TargetObserver sync SyncDistribution done", zap.Any("dur", time.Since(t3)))

if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason()))
@@ -503,11 +528,18 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget)

log.Info("checkNeedUpdateTargetVersion",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channelName", leaderView.Channel),
zap.Int64("nodeID", leaderView.ID),
zap.Int64("oldVersion", leaderView.TargetVersion),
zap.Int64("newVersion", targetVersion))

if targetVersion <= leaderView.TargetVersion {
return nil
}

log.RatedInfo(10, "Update readable segment version",
log.Info("Update readable segment version",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channelName", leaderView.Channel),
zap.Int64("nodeID", leaderView.ID),
@@ -537,7 +569,11 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead

func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60)
log.RatedInfo(10, "observer trigger update current target", zap.Int64("collectionID", collectionID))
log.Info("observer updateCurrentTarget start", zap.Int64("collectionID", collectionID))
start := time.Now()
defer func() {
log.Info("observer updateCurrentTarget done", zap.Int64("collectionID", collectionID), zap.Duration("dur", time.Since(start)))
}()
if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) {
ob.mut.Lock()
defer ob.mut.Unlock()
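sync() now times each remote call (DescribeCollection, ListIndexes, SyncDistribution) separately so a slow step stands out in the logs. A minimal sketch of that per-step timing pattern; the step bodies are placeholders, not the real broker/cluster RPCs:

package main

import (
	"time"

	"go.uber.org/zap"
)

// timedStep logs how long a single step took, mirroring the t1/t2/t3 timers above.
func timedStep(log *zap.Logger, name string, step func()) {
	t := time.Now()
	step()
	log.Info(name+" done", zap.Duration("dur", time.Since(t)))
}

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	timedStep(log, "DescribeCollection", func() { time.Sleep(2 * time.Millisecond) })
	timedStep(log, "ListIndexes", func() { time.Sleep(3 * time.Millisecond) })
	timedStep(log, "SyncDistribution", func() { time.Sleep(5 * time.Millisecond) })
}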
10 changes: 10 additions & 0 deletions internal/querycoordv2/observers/task_dispatcher.go
@@ -18,7 +18,10 @@ package observers

import (
"context"
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"
"sync"
"time"

"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -76,6 +79,7 @@ func (d *taskDispatcher[K]) AddTask(keys ...K) {
_, loaded := d.tasks.GetOrInsert(key, false)
added = added || !loaded
}
log.Info("taskDispatcher add tasks", zap.Bool("added", added), zap.Any("keys", keys))
if added {
d.notify()
}
@@ -94,12 +98,18 @@ func (d *taskDispatcher[K]) schedule(ctx context.Context) {
case <-ctx.Done():
return
case <-d.notifyCh:
log.Info("taskDispatcher dddddd", zap.Int("len", d.tasks.Len()), zap.Any("keys", d.tasks.Keys()))
d.tasks.Range(func(k K, submitted bool) bool {
log.Info("taskDispatcher schedule", zap.Bool("submitted", submitted), zap.Any("k", k))
if !submitted {
queueStart := time.Now()
d.tasks.Insert(k, true)
d.pool.Submit(func() (any, error) {
start := time.Now()
log.Info("taskDispatcher begin to run", zap.Bool("submitted", submitted), zap.Any("k", k), zap.Duration("queueDur", time.Since(queueStart)))
d.taskRunner(ctx, k)
d.tasks.Remove(k)
log.Info("taskDispatcher task done", zap.Any("k", k), zap.Duration("dur", time.Since(start)))
return struct{}{}, nil
})
}
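The dispatcher now records when a key was enqueued (queueStart) and when its runner actually started, so queueing delay and run time can be told apart. A minimal sketch of that split using a plain channel-based worker instead of milvus' conc.Pool; the types and timings are illustrative:

package main

import (
	"fmt"
	"time"
)

type task struct {
	key      int64
	enqueued time.Time
}

func main() {
	queue := make(chan task, 8)
	done := make(chan struct{})

	go func() {
		for t := range queue {
			queueDur := time.Since(t.enqueued) // time spent waiting in the queue
			runStart := time.Now()
			time.Sleep(5 * time.Millisecond) // stand-in for taskRunner(ctx, k)
			fmt.Printf("key=%d queueDur=%v runDur=%v\n", t.key, queueDur, time.Since(runStart))
		}
		close(done)
	}()

	for k := int64(0); k < 3; k++ {
		queue <- task{key: k, enqueued: time.Now()}
	}
	close(queue)
	<-done
}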
15 changes: 12 additions & 3 deletions internal/querynodev2/services.go
@@ -1215,22 +1215,31 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
return true
})

return &querypb.GetDataDistributionResponse{
resp := &querypb.GetDataDistributionResponse{
Status: merr.Success(),
NodeID: node.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
LastModifyTs: lastModifyTs,
MemCapacityInMB: float64(hardware.GetMemoryCount() / 1024 / 1024),
}, nil
}
log.Info("GetDataDistribution done", zap.Any("resp", resp))
return resp, nil
}

func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
defer node.updateDistributionModifyTS()

log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()),
zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID()))
zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID()), zap.Any("req", req))

start := time.Now()
log.Info("querynode receive SyncDistribution")
defer func() {
log.Info("querynode SyncDistribution done", zap.Duration("dur", time.Since(start)))
}()

// check node healthy
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
return merr.Status(err), nil