diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 836ad42cf40c0..3dc101ced7797 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -96,6 +96,7 @@ func newMockIDAllocatorInterface() allocator.Interface { } type mockTask struct { + baseTask *TaskCondition id UniqueID name string diff --git a/internal/proxy/task.go b/internal/proxy/task.go index b7c034b3f2db1..96dd1bd0389da 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" @@ -107,6 +108,20 @@ type task interface { PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) + SetOnEnqueueTime() + GetDurationInQueue() time.Duration +} + +type baseTask struct { + onEnqueueTime time.Time +} + +func (bt *baseTask) SetOnEnqueueTime() { + bt.onEnqueueTime = time.Now() +} + +func (bt *baseTask) GetDurationInQueue() time.Duration { + return time.Since(bt.onEnqueueTime) } type dmlTask interface { @@ -362,12 +377,12 @@ func (t *dropCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *dropCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropCollection - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -427,12 +442,12 @@ func (t *hasCollectionTask) SetTs(ts Timestamp) { func (t *hasCollectionTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_HasCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *hasCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_HasCollection - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -501,12 +516,12 @@ func (t *describeCollectionTask) SetTs(ts Timestamp) { func (t *describeCollectionTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_DescribeCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *describeCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DescribeCollection - t.Base.SourceID = paramtable.GetNodeID() if t.CollectionID != 0 && len(t.CollectionName) == 0 { return nil @@ -636,12 +651,12 @@ func (t *showCollectionsTask) SetTs(ts Timestamp) { func (t *showCollectionsTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_ShowCollections + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *showCollectionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ShowCollections - t.Base.SourceID = paramtable.GetNodeID() if t.GetType() == milvuspb.ShowType_InMemory { for _, collectionName := range t.CollectionNames { if err := validateCollectionName(collectionName); err != nil { @@ -796,12 +811,12 @@ func (t *alterCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *alterCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterCollection - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -860,12 +875,12 @@ func (t *createPartitionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreatePartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *createPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreatePartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -948,12 +963,12 @@ func (t *dropPartitionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropPartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *dropPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropPartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1059,12 +1074,12 @@ func (t *hasPartitionTask) SetTs(ts Timestamp) { func (t *hasPartitionTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_HasPartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *hasPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_HasPartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1136,12 +1151,12 @@ func (t *showPartitionsTask) SetTs(ts Timestamp) { func (t *showPartitionsTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_ShowPartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *showPartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ShowPartitions - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -1301,12 +1316,12 @@ func (t *flushTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_Flush + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *flushTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_Flush - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -1407,14 +1422,14 @@ func (t *loadCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_LoadCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *loadCollectionTask) PreExecute(ctx context.Context) error { log.Ctx(ctx).Debug("loadCollectionTask PreExecute", zap.String("role", typeutil.ProxyRole)) - t.Base.MsgType = commonpb.MsgType_LoadCollection - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1558,12 +1573,12 @@ func (t *releaseCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_ReleaseCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *releaseCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ReleaseCollection - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1652,12 +1667,12 @@ func (t *loadPartitionsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_LoadPartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *loadPartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_LoadPartitions - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1801,12 +1816,12 @@ func (t *releasePartitionsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_ReleasePartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *releasePartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ReleasePartitions - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1905,12 +1920,12 @@ func (t *CreateResourceGroupTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreateResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreateResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -1969,12 +1984,12 @@ func (t *DropResourceGroupTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2030,13 +2045,15 @@ func (t *DescribeResourceGroupTask) SetTs(ts Timestamp) { } func (t *DescribeResourceGroupTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2154,12 +2171,12 @@ func (t *TransferNodeTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_TransferNode + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *TransferNodeTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_TransferNode - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2218,12 +2235,12 @@ func (t *TransferReplicaTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_TransferReplica + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *TransferReplicaTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_TransferReplica - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2288,13 +2305,15 @@ func (t *ListResourceGroupsTask) SetTs(ts Timestamp) { } func (t *ListResourceGroupsTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_ListResourceGroups + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ListResourceGroups - t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_alias.go b/internal/proxy/task_alias.go index 863f91e4d2251..d60ded35c0fac 100644 --- a/internal/proxy/task_alias.go +++ b/internal/proxy/task_alias.go @@ -80,13 +80,13 @@ func (t *CreateAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreateAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } // PreExecute defines the tion before task execution func (t *CreateAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreateAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias // collection alias uses the same format as collection name @@ -162,12 +162,12 @@ func (t *DropAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DropAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias if err := ValidateCollectionAlias(collAlias); err != nil { return err @@ -230,12 +230,12 @@ func (t *AlterAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *AlterAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias // collection alias uses the same format as collection name @@ -305,12 +305,12 @@ func (a *DescribeAliasTask) SetTs(ts Timestamp) { func (a *DescribeAliasTask) OnEnqueue() error { a.Base = commonpbutil.NewMsgBase() + a.Base.MsgType = commonpb.MsgType_DescribeAlias + a.Base.SourceID = a.nodeID return nil } func (a *DescribeAliasTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_DescribeAlias - a.Base.SourceID = a.nodeID // collection alias uses the same format as collection name if err := ValidateCollectionAlias(a.GetAlias()); err != nil { return err @@ -372,12 +372,12 @@ func (a *ListAliasesTask) SetTs(ts Timestamp) { func (a *ListAliasesTask) OnEnqueue() error { a.Base = commonpbutil.NewMsgBase() + a.Base.MsgType = commonpb.MsgType_ListAliases + a.Base.SourceID = a.nodeID return nil } func (a *ListAliasesTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_ListAliases - a.Base.SourceID = a.nodeID if len(a.GetCollectionName()) > 0 { if err := validateCollectionName(a.GetCollectionName()); err != nil { diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index bcab2295857fb..bb7eb01e9bc3a 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -95,6 +95,11 @@ func (dt *deleteTask) SetTs(ts Timestamp) { } func (dt *deleteTask) OnEnqueue() error { + if dt.req.Base == nil { + dt.req.Base = commonpbutil.NewMsgBase() + } + dt.req.Base.MsgType = commonpb.MsgType_Delete + dt.req.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 1e908ed8342a3..caa0a599ad9d0 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -106,6 +106,8 @@ func (cit *createIndexTask) OnEnqueue() error { if cit.req.Base == nil { cit.req.Base = commonpbutil.NewMsgBase() } + cit.req.Base.MsgType = commonpb.MsgType_CreateIndex + cit.req.Base.SourceID = paramtable.GetNodeID() return nil } @@ -359,8 +361,6 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro } func (cit *createIndexTask) PreExecute(ctx context.Context) error { - cit.req.Base.MsgType = commonpb.MsgType_CreateIndex - cit.req.Base.SourceID = paramtable.GetNodeID() collName := cit.req.GetCollectionName() @@ -465,12 +465,12 @@ func (dit *describeIndexTask) SetTs(ts Timestamp) { func (dit *describeIndexTask) OnEnqueue() error { dit.Base = commonpbutil.NewMsgBase() + dit.Base.MsgType = commonpb.MsgType_DescribeIndex + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *describeIndexTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_DescribeIndex - dit.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(dit.CollectionName); err != nil { return err @@ -589,12 +589,12 @@ func (dit *getIndexStatisticsTask) SetTs(ts Timestamp) { func (dit *getIndexStatisticsTask) OnEnqueue() error { dit.Base = commonpbutil.NewMsgBase() + dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *getIndexStatisticsTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics - dit.Base.SourceID = dit.nodeID if err := validateCollectionName(dit.CollectionName); err != nil { return err @@ -709,12 +709,12 @@ func (dit *dropIndexTask) OnEnqueue() error { if dit.Base == nil { dit.Base = commonpbutil.NewMsgBase() } + dit.Base.MsgType = commonpb.MsgType_DropIndex + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *dropIndexTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_DropIndex - dit.Base.SourceID = paramtable.GetNodeID() collName, fieldName := dit.CollectionName, dit.FieldName @@ -825,12 +825,12 @@ func (gibpt *getIndexBuildProgressTask) SetTs(ts Timestamp) { func (gibpt *getIndexBuildProgressTask) OnEnqueue() error { gibpt.Base = commonpbutil.NewMsgBase() + gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress + gibpt.Base.SourceID = paramtable.GetNodeID() return nil } func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error { - gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress - gibpt.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(gibpt.CollectionName); err != nil { return err @@ -914,12 +914,12 @@ func (gist *getIndexStateTask) SetTs(ts Timestamp) { func (gist *getIndexStateTask) OnEnqueue() error { gist.Base = commonpbutil.NewMsgBase() + gist.Base.MsgType = commonpb.MsgType_GetIndexState + gist.Base.SourceID = paramtable.GetNodeID() return nil } func (gist *getIndexStateTask) PreExecute(ctx context.Context) error { - gist.Base.MsgType = commonpb.MsgType_GetIndexState - gist.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(gist.CollectionName); err != nil { return err diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 36dbe61174981..58678c03136e2 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -90,6 +91,11 @@ func (it *insertTask) getChannels() []pChan { } func (it *insertTask) OnEnqueue() error { + if it.insertMsg.Base == nil { + it.insertMsg.Base = commonpbutil.NewMsgBase() + } + it.insertMsg.Base.MsgType = commonpb.MsgType_Insert + it.insertMsg.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 0cdc3bfb207d1..333459616ae3f 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -23,6 +23,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -42,6 +43,7 @@ const ( ) type queryTask struct { + baseTask Condition *internalpb.RetrieveRequest @@ -666,6 +668,10 @@ func (t *queryTask) SetTs(ts Timestamp) { } func (t *queryTask) OnEnqueue() error { + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } t.Base.MsgType = commonpb.MsgType_Retrieve + t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index f28a3a7e81e38..70b65df22ed52 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -19,6 +19,7 @@ package proxy import ( "container/list" "context" + "strconv" "sync" "time" @@ -26,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" @@ -179,6 +181,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error { // we always use same msg id and ts for now. t.SetID(UniqueID(ts)) + t.SetOnEnqueueTime() return queue.addUnissuedTask(t) } @@ -440,6 +443,11 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { }() span.AddEvent("scheduler process PreExecute") + waitDuration := t.GetDurationInQueue() + metrics.ProxyReqInQueueLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.Type().String()). + Observe(float64(waitDuration.Milliseconds())) + err := t.PreExecute(ctx) defer func() { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 47532f323d0a7..e6c8765d0a256 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -46,6 +46,7 @@ const ( ) type searchTask struct { + baseTask Condition *internalpb.SearchRequest ctx context.Context diff --git a/internal/proxy/task_statistic.go b/internal/proxy/task_statistic.go index 16a2bca039042..66efb2e6ff4a5 100644 --- a/internal/proxy/task_statistic.go +++ b/internal/proxy/task_statistic.go @@ -93,6 +93,9 @@ func (g *getStatisticsTask) OnEnqueue() error { g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{ Base: commonpbutil.NewMsgBase(), } + + g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } @@ -106,10 +109,6 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics-PreExecute") defer sp.End() - // TODO: Maybe we should create a new MsgType: GetStatistics? - g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics - g.Base.SourceID = paramtable.GetNodeID() - collID, err := globalMetaCache.GetCollectionID(ctx, g.request.GetDbName(), g.collectionName) if err != nil { // err is not nil if collection not exists return err @@ -632,12 +631,12 @@ func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) { func (g *getCollectionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() + g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error { - g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics - g.Base.SourceID = paramtable.GetNodeID() return nil } @@ -717,12 +716,12 @@ func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) { func (g *getPartitionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() + g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error { - g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics - g.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 6f041faa66fba..7731d3d53ac5e 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -2549,6 +2549,7 @@ func TestCreateResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_CreateResourceGroup, task.Type()) @@ -2588,6 +2589,7 @@ func TestDropResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DropResourceGroup, task.Type()) @@ -2629,6 +2631,7 @@ func TestTransferNodeTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_TransferNode, task.Type()) @@ -2671,6 +2674,7 @@ func TestTransferReplicaTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_TransferReplica, task.Type()) @@ -2710,6 +2714,7 @@ func TestListResourceGroupsTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_ListResourceGroups, task.Type()) @@ -2762,6 +2767,7 @@ func TestDescribeResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type()) @@ -2808,6 +2814,7 @@ func TestDescribeResourceGroupTaskFailed(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type()) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index e85ced6f810e3..8a2e84a24c37f 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -133,6 +133,11 @@ func (it *upsertTask) getChannels() []pChan { } func (it *upsertTask) OnEnqueue() error { + if it.req.Base == nil { + it.req.Base = commonpbutil.NewMsgBase() + } + it.req.Base.MsgType = commonpb.MsgType_Upsert + it.req.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 66a8ee01d7d47..850110160e93d 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -322,6 +322,16 @@ var ( Name: "slow_query_count", Help: "count of slow query executed", }, []string{nodeIDLabelName, msgTypeLabelName}) + + // ProxyReqInQueueLatency records the latency that requests wait in the queue, like "CreateCollection". + ProxyReqInQueueLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "req_in_queue_latency", + Help: "latency which request waits in the queue", + Buckets: buckets, // unit: ms + }, []string{nodeIDLabelName, functionLabelName}) ) // RegisterProxy registers Proxy metrics @@ -370,6 +380,7 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyRateLimitReqCount) registry.MustRegister(ProxySlowQueryCount) + registry.MustRegister(ProxyReqInQueueLatency) } func CleanupCollectionMetrics(nodeID int64, collection string) {