diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 2fd524fddde91..f2b8c05df5cfe 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -849,6 +849,10 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
 		zap.Int64s("partitionIDs", partitionIDs),
 	)
 	log.Info("get recovery info request received")
+	start := time.Now()
+	defer func() {
+		log.Info("datacoord GetRecoveryInfoV2 done", zap.Duration("dur", time.Since(start)))
+	}()
 	resp := &datapb.GetRecoveryInfoResponseV2{
 		Status: merr.Success(),
 	}
diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go
index 085bc901e89db..eaf01b0cd4596 100644
--- a/internal/querycoordv2/meta/collection_manager.go
+++ b/internal/querycoordv2/meta/collection_manager.go
@@ -606,6 +606,7 @@ func (m *CollectionManager) RemoveCollection(collectionID typeutil.UniqueID) err
 			delete(m.partitions, partition)
 		}
 		delete(m.collectionPartitions, collectionID)
+		log.Info("collection removed", zap.Int64("collectionID", collectionID))
 	}
 	metrics.CleanQueryCoordMetricsWithCollectionID(collectionID)
 	return nil
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 6f05d4c96e004..f777b93c7a3c0 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"runtime"
 	"sync"
+	"time"
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -136,6 +137,7 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool
 // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
 // which may make the current target not available
 func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
+	start := time.Now()
 	var vChannelInfos []*datapb.VchannelInfo
 	var segmentInfos []*datapb.SegmentInfo
 	err := retry.Handle(context.TODO(), func() (bool, error) {
@@ -197,7 +199,8 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
 		zap.Int64("collectionID", collectionID),
 		zap.Int64s("PartitionIDs", partitionIDs),
 		zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()),
-		zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()))
+		zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()),
+		zap.Duration("dur", time.Since(start)))
 	return nil
 }
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index 843e3a4d55065..de5efc4345575 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -308,7 +308,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		return
 	}
 
-	log.RatedInfo(10, "partition targets",
+	log.Info("partition targets",
 		zap.Int("segmentTargetNum", len(segmentTargets)),
 		zap.Int("channelTargetNum", len(channelTargets)),
 		zap.Int("totalTargetNum", targetNum),
@@ -343,6 +343,7 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 	}
 
 	ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
+
 	if loadPercentage == 100 {
 		if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
 			log.Warn("failed to manual check current target, skip update load status")
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 8e69a329930af..d56fcb4d2e195 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -165,7 +165,9 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
 
 		case <-ticker.C:
 			ob.clean()
-			ob.dispatcher.AddTask(ob.meta.GetAll()...)
+			all := ob.meta.GetAll()
+			log.Info("TargetObserver ticker all", zap.Int("len(all)", len(all)), zap.Any("all", all))
+			ob.dispatcher.AddTask(all...)
 
 		case req := <-ob.updateChan:
 			log.Info("manually trigger update target",
@@ -214,6 +216,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
 // If not, submit an async task into dispatcher.
 func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
 	result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
+	log.Info("check whether current target exists", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Bool("result", result))
 	if !result {
 		ob.dispatcher.AddTask(collectionID)
 	}
@@ -326,7 +329,12 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
 	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60).
 		With(zap.Int64("collectionID", collectionID))
 
-	log.RatedInfo(10, "observer trigger update next target")
+	log.Info("observer updateNextTarget start")
+	start := time.Now()
+	defer func() {
+		log.Info("observer updateNextTarget done", zap.Duration("dur", time.Since(start)))
+	}()
+
 	err := ob.targetMgr.UpdateCollectionNextTarget(collectionID)
 	if err != nil {
 		log.Warn("failed to update next target for collection",
@@ -341,7 +349,7 @@ func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) {
 	ob.nextTargetLastUpdate.Insert(collectionID, time.Now())
 }
 
-func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collectionID int64) bool {
+func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collectionID int64) (ok bool) {
 	replicaNum := ob.meta.CollectionManager.GetReplicaNumber(collectionID)
 	log := log.Ctx(ctx).WithRateGroup(
 		fmt.Sprintf("qcv2.TargetObserver-%d", collectionID),
@@ -351,12 +359,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		zap.Int64("collectionID", collectionID),
 		zap.Int32("replicaNum", replicaNum),
 	)
-
+	start := time.Now()
+	log.Info("shouldUpdateCurrentTarget start")
+	defer func() {
+		log.Info("shouldUpdateCurrentTarget done", zap.Duration("dur", time.Since(start)))
+	}()
 	// check channel first
 	channelNames := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
 	if len(channelNames) == 0 {
 		// next target is empty, no need to update
-		log.RatedInfo(10, "next target is empty, no need to update")
+		log.Info("next target is empty, no need to update")
 		return false
 	}
 
@@ -365,7 +377,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID })
 		group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
 		if int32(len(group)) < replicaNum {
-			log.RatedInfo(10, "channel not ready",
+			log.Info("channel not ready",
 				zap.Int("readyReplicaNum", len(group)),
 				zap.String("channelName", channel.GetChannelName()),
 			)
@@ -380,7 +392,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		nodes := lo.Map(views, func(view *meta.LeaderView, _ int) int64 { return view.ID })
 		group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
 		if int32(len(group)) < replicaNum {
-			log.RatedInfo(10, "segment not ready",
+			log.Info("segment not ready",
 				zap.Int("readyReplicaNum", len(group)),
 				zap.Int64("segmentID", segment.GetID()),
 			)
@@ -396,7 +408,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		actions = actions[:0]
 		leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch)
 		if leaderView == nil {
-			log.RatedInfo(10, "leader view not ready",
+			log.Info("leader view not ready",
 				zap.Int64("nodeID", leaderID),
 				zap.String("channel", ch),
 			)
@@ -406,6 +418,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		if updateVersionAction != nil {
 			actions = append(actions, updateVersionAction)
 		}
+		log.Info("shouldUpdateCurrentTarget begin to sync", zap.Any("actions", actions))
 		if !ob.sync(ctx, replica, leaderView, actions) {
 			return false
 		}
@@ -415,23 +428,31 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 	return true
 }
 
-func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
-	if len(diffs) == 0 {
-		return true
-	}
-	replicaID := replica.GetID()
-
+func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) (ok bool) {
 	log := log.With(
 		zap.Int64("leaderID", leaderView.ID),
 		zap.Int64("collectionID", leaderView.CollectionID),
 		zap.String("channel", leaderView.Channel),
 	)
+	log.Info("TargetObserver begin to sync", zap.Any("actions", diffs))
+	start := time.Now()
+	defer func() {
+		log.Info("TargetObserver sync done", zap.Any("actions", diffs), zap.Duration("dur", time.Since(start)))
+	}()
+
+	if len(diffs) == 0 {
+		return true
+	}
+	replicaID := replica.GetID()
+
+	t1 := time.Now()
 	collectionInfo, err := ob.broker.DescribeCollection(ctx, leaderView.CollectionID)
 	if err != nil {
 		log.Warn("failed to get collection info", zap.Error(err))
 		return false
 	}
+	log.Info("TargetObserver sync DescribeCollection done", zap.Duration("dur", time.Since(t1)))
 	partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID)
 	if err != nil {
 		log.Warn("failed to get partitions", zap.Error(err))
@@ -439,11 +460,13 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
 	}
 
 	// Get collection index info
+	t2 := time.Now()
 	indexInfo, err := ob.broker.ListIndexes(ctx, collectionInfo.GetCollectionID())
 	if err != nil {
 		log.Warn("fail to get index info of collection", zap.Error(err))
 		return false
 	}
+	log.Info("TargetObserver sync ListIndexes done", zap.Duration("dur", time.Since(t2)))
 
 	req := &querypb.SyncDistributionRequest{
 		Base: commonpbutil.NewMsgBase(
@@ -464,6 +487,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
 		Version: time.Now().UnixNano(),
 		IndexInfoList: indexInfo,
 	}
+	t3 := time.Now()
 	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
 	defer cancel()
 	resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
@@ -471,6 +495,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
 		log.Warn("failed to sync distribution", zap.Error(err))
 		return false
 	}
+	log.Info("TargetObserver sync SyncDistribution done", zap.Duration("dur", time.Since(t3)))
 
 	if resp.ErrorCode != commonpb.ErrorCode_Success {
 		log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason()))
@@ -503,11 +528,18 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
 	log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
 	targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget)
 
+	log.Info("checkNeedUpdateTargetVersion",
+		zap.Int64("collectionID", leaderView.CollectionID),
+		zap.String("channelName", leaderView.Channel),
+		zap.Int64("nodeID", leaderView.ID),
+		zap.Int64("oldVersion", leaderView.TargetVersion),
+		zap.Int64("newVersion", targetVersion))
+
 	if targetVersion <= leaderView.TargetVersion {
 		return nil
 	}
 
-	log.RatedInfo(10, "Update readable segment version",
+	log.Info("Update readable segment version",
 		zap.Int64("collectionID", leaderView.CollectionID),
 		zap.String("channelName", leaderView.Channel),
 		zap.Int64("nodeID", leaderView.ID),
@@ -537,7 +569,11 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
 
 func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
 	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60)
-	log.RatedInfo(10, "observer trigger update current target", zap.Int64("collectionID", collectionID))
+	log.Info("observer updateCurrentTarget start", zap.Int64("collectionID", collectionID))
+	start := time.Now()
+	defer func() {
+		log.Info("observer updateCurrentTarget done", zap.Int64("collectionID", collectionID), zap.Duration("dur", time.Since(start)))
+	}()
 	if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) {
 		ob.mut.Lock()
 		defer ob.mut.Unlock()
diff --git a/internal/querycoordv2/observers/task_dispatcher.go b/internal/querycoordv2/observers/task_dispatcher.go
index 29ede76b1bb28..afbe6e0153645 100644
--- a/internal/querycoordv2/observers/task_dispatcher.go
+++ b/internal/querycoordv2/observers/task_dispatcher.go
@@ -18,7 +18,10 @@ package observers
 
 import (
 	"context"
+	"github.com/milvus-io/milvus/pkg/log"
+	"go.uber.org/zap"
 	"sync"
+	"time"
 
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -76,6 +79,7 @@ func (d *taskDispatcher[K]) AddTask(keys ...K) {
 		_, loaded := d.tasks.GetOrInsert(key, false)
 		added = added || !loaded
 	}
+	log.Info("taskDispatcher add tasks", zap.Bool("added", added), zap.Any("keys", keys))
 	if added {
 		d.notify()
 	}
@@ -94,12 +98,18 @@ func (d *taskDispatcher[K]) schedule(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-d.notifyCh:
+			log.Info("taskDispatcher notified", zap.Int("len", d.tasks.Len()), zap.Any("keys", d.tasks.Keys()))
 			d.tasks.Range(func(k K, submitted bool) bool {
+				log.Info("taskDispatcher schedule", zap.Bool("submitted", submitted), zap.Any("k", k))
 				if !submitted {
+					queueStart := time.Now()
 					d.tasks.Insert(k, true)
 					d.pool.Submit(func() (any, error) {
+						start := time.Now()
+						log.Info("taskDispatcher begin to run", zap.Bool("submitted", submitted), zap.Any("k", k), zap.Duration("queueDur", time.Since(queueStart)))
 						d.taskRunner(ctx, k)
 						d.tasks.Remove(k)
+						log.Info("taskDispatcher task done", zap.Any("k", k), zap.Duration("dur", time.Since(start)))
 						return struct{}{}, nil
 					})
 				}
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 085154f6968f0..87142fba61fe1 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -1215,7 +1215,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 		return true
 	})
 
-	return &querypb.GetDataDistributionResponse{
+	resp := &querypb.GetDataDistributionResponse{
 		Status: merr.Success(),
 		NodeID: node.GetNodeID(),
 		Segments: segmentVersionInfos,
@@ -1223,14 +1223,23 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 		LeaderViews: leaderViews,
 		LastModifyTs: lastModifyTs,
 		MemCapacityInMB: float64(hardware.GetMemoryCount() / 1024 / 1024),
-	}, nil
+	}
+	log.Info("GetDataDistribution done", zap.Any("resp", resp))
+	return resp, nil
 }
 
 func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
 	defer node.updateDistributionModifyTS()
 	log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()),
-		zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID()))
+		zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID()), zap.Any("req", req))
+
+	start := time.Now()
+	log.Info("querynode receive SyncDistribution")
+	defer func() {
+		log.Info("querynode SyncDistribution done", zap.Duration("dur", time.Since(start)))
+	}()
+
 	// check node healthy
 	if err := node.lifetime.Add(merr.IsHealthy); err != nil {
 		return merr.Status(err), nil
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index f3c19842bebb9..fa88751882a1e 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -19,9 +19,17 @@ package storage
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"io"
+	"math/rand"
+	"sort"
 	"strconv"
 	"testing"
+	"time"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/pkg/log"
+	"go.uber.org/zap"
 
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
@@ -490,3 +498,75 @@ func readDeltaLog(size int, blob *Blob) error {
 	}
 	return nil
 }
+
+func BenchmarkSerializeWriter(b *testing.B) {
+	const (
+		dim     = 128
+		numRows = 200000
+	)
+
+	var (
+		rId  = &schemapb.FieldSchema{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64}
+		ts   = &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64}
+		pk   = &schemapb.FieldSchema{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "100"}}}
+		f    = &schemapb.FieldSchema{FieldID: 101, Name: "random", DataType: schemapb.DataType_Double}
+		fVec = &schemapb.FieldSchema{FieldID: 102, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: strconv.Itoa(dim)}}}
+	)
+	schema := &schemapb.CollectionSchema{Name: "test-aaa", Fields: []*schemapb.FieldSchema{rId, ts, pk, f, fVec}}
+
+	// prepare data values
+	start := time.Now()
+	vec := make([]float32, dim)
+	for j := 0; j < dim; j++ {
+		vec[j] = rand.Float32()
+	}
+	values := make([]*Value, numRows)
+	for i := 0; i < numRows; i++ {
+		value := &Value{}
+		value.Value = make(map[int64]interface{}, len(schema.GetFields()))
+		m := value.Value.(map[int64]interface{})
+		for _, field := range schema.GetFields() {
+			switch field.GetDataType() {
+			case schemapb.DataType_Int64:
+				m[field.GetFieldID()] = int64(i)
+			case schemapb.DataType_VarChar:
+				k := fmt.Sprintf("test_pk_%d", i)
+				m[field.GetFieldID()] = k
+				value.PK = &VarCharPrimaryKey{
+					Value: k,
+				}
+			case schemapb.DataType_Double:
+				m[field.GetFieldID()] = float64(i)
+			case schemapb.DataType_FloatVector:
+				m[field.GetFieldID()] = vec
+			}
+		}
+		value.ID = int64(i)
+		value.Timestamp = int64(0)
+		value.IsDeleted = false
+		value.Value = m
+		values[i] = value
+	}
+	sort.Slice(values, func(i, j int) bool {
+		return values[i].PK.LT(values[j].PK)
+	})
+	log.Info("prepare data done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start)))
+
+	b.ResetTimer()
+
+	sizes := []int{100, 1000, 10000, 100000}
+	for _, s := range sizes {
+		b.Run(fmt.Sprintf("batch size=%d", s), func(b *testing.B) {
+			for i := 0; i < b.N; i++ {
+				writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
+				writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, s)
+				assert.NoError(b, err)
+				for _, v := range values {
+					// check the error of each Write call, not the stale err from writer creation
+					err = writer.Write(v)
+					assert.NoError(b, err)
+				}
+				writer.Close()
+			}
+		})
+	}
+}
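
The patch instruments several paths (GetRecoveryInfoV2, updateNextTarget, sync, the task dispatcher) with the same elapsed-time logging idiom: capture time.Now() once at entry and log time.Since(start) in a deferred closure so every return path is covered. Below is a minimal, self-contained sketch of that idiom, not code from the patch; it uses go.uber.org/zap directly instead of the milvus pkg/log wrapper, and doWork is a hypothetical stand-in for any of the instrumented calls.

package main

import (
	"time"

	"go.uber.org/zap"
)

// doWork stands in for an instrumented operation such as
// UpdateCollectionNextTarget or SyncDistribution.
func doWork() {
	time.Sleep(50 * time.Millisecond)
}

func instrumented(logger *zap.Logger) {
	// Same shape as the patch: record the start time once, then log the
	// elapsed duration in a deferred closure so it fires on every return path.
	start := time.Now()
	defer func() {
		logger.Info("operation done", zap.Duration("dur", time.Since(start)))
	}()

	doWork()
}

func main() {
	logger := zap.NewExample()
	defer logger.Sync()

	instrumented(logger)
}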