From a2ac84bd64bf7567188f754698e169cc609fb49d Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Tue, 23 Jul 2024 14:23:52 +0800 Subject: [PATCH] feat: record the duration waiting in the proxy queue (#34744) fix: https://github.com/milvus-io/milvus/issues/34743 --------- Signed-off-by: longjiquan --- internal/proxy/mock_test.go | 1 + internal/proxy/task.go | 123 +++++++++++++++++++------------ internal/proxy/task_alias.go | 20 ++--- internal/proxy/task_database.go | 8 +- internal/proxy/task_delete.go | 5 ++ internal/proxy/task_index.go | 28 +++---- internal/proxy/task_insert.go | 6 ++ internal/proxy/task_query.go | 6 ++ internal/proxy/task_scheduler.go | 8 ++ internal/proxy/task_search.go | 1 + internal/proxy/task_statistic.go | 15 ++-- internal/proxy/task_test.go | 13 ++++ internal/proxy/task_upsert.go | 5 ++ pkg/metrics/proxy_metrics.go | 11 +++ 14 files changed, 165 insertions(+), 85 deletions(-) diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index d7d69a90c1376..8132e5bb3a06d 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -96,6 +96,7 @@ func newMockIDAllocatorInterface() allocator.Interface { } type mockTask struct { + baseTask *TaskCondition id UniqueID name string diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 9883baebe9658..7b64ce7d8cf50 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" @@ -118,14 +119,26 @@ type task interface { WaitToFinish() error Notify(err error) CanSkipAllocTimestamp() bool + SetOnEnqueueTime() + GetDurationInQueue() time.Duration } -type baseTask struct{} +type baseTask struct { + onEnqueueTime time.Time +} func (bt *baseTask) CanSkipAllocTimestamp() bool { return false } +func (bt *baseTask) SetOnEnqueueTime() { + bt.onEnqueueTime = time.Now() +} + +func (bt *baseTask) GetDurationInQueue() time.Duration { + return time.Since(bt.onEnqueueTime) +} + type dmlTask interface { task setChannels() error @@ -440,12 +453,12 @@ func (t *dropCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *dropCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropCollection - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -505,13 +518,15 @@ func (t *hasCollectionTask) SetTs(ts Timestamp) { } func (t *hasCollectionTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_HasCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *hasCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_HasCollection - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -571,13 +586,15 @@ func (t *describeCollectionTask) SetTs(ts Timestamp) { } func (t *describeCollectionTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_DescribeCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *describeCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DescribeCollection - t.Base.SourceID = paramtable.GetNodeID() if t.CollectionID != 0 && len(t.CollectionName) == 0 { return nil @@ -712,12 +729,12 @@ func (t *showCollectionsTask) SetTs(ts Timestamp) { func (t *showCollectionsTask) OnEnqueue() error { t.Base = commonpbutil.NewMsgBase() + t.Base.MsgType = commonpb.MsgType_ShowCollections + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *showCollectionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ShowCollections - t.Base.SourceID = paramtable.GetNodeID() if t.GetType() == milvuspb.ShowType_InMemory { for _, collectionName := range t.CollectionNames { if err := validateCollectionName(collectionName); err != nil { @@ -868,6 +885,8 @@ func (t *alterCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -916,8 +935,6 @@ func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, p } func (t *alterCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterCollection - t.Base.SourceID = paramtable.GetNodeID() collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), t.CollectionName) if err != nil { @@ -1049,12 +1066,12 @@ func (t *createPartitionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreatePartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *createPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreatePartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1132,12 +1149,12 @@ func (t *dropPartitionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropPartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *dropPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropPartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1237,13 +1254,15 @@ func (t *hasPartitionTask) SetTs(ts Timestamp) { } func (t *hasPartitionTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_HasPartition + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *hasPartitionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_HasPartition - t.Base.SourceID = paramtable.GetNodeID() collName, partitionTag := t.CollectionName, t.PartitionName @@ -1309,13 +1328,15 @@ func (t *showPartitionsTask) SetTs(ts Timestamp) { } func (t *showPartitionsTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_ShowPartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *showPartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ShowPartitions - t.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(t.CollectionName); err != nil { return err @@ -1460,12 +1481,12 @@ func (t *flushTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_Flush + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *flushTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_Flush - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -1563,14 +1584,14 @@ func (t *loadCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_LoadCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *loadCollectionTask) PreExecute(ctx context.Context) error { log.Ctx(ctx).Debug("loadCollectionTask PreExecute", zap.String("role", typeutil.ProxyRole)) - t.Base.MsgType = commonpb.MsgType_LoadCollection - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1715,12 +1736,12 @@ func (t *releaseCollectionTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_ReleaseCollection + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *releaseCollectionTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ReleaseCollection - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1809,12 +1830,12 @@ func (t *loadPartitionsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_LoadPartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *loadPartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_LoadPartitions - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -1959,12 +1980,12 @@ func (t *releasePartitionsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_ReleasePartitions + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *releasePartitionsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ReleasePartitions - t.Base.SourceID = paramtable.GetNodeID() collName := t.CollectionName @@ -2064,12 +2085,12 @@ func (t *CreateResourceGroupTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreateResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreateResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2129,12 +2150,12 @@ func (t *UpdateResourceGroupsTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *UpdateResourceGroupsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_UpdateResourceGroups - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2197,12 +2218,12 @@ func (t *DropResourceGroupTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2259,13 +2280,15 @@ func (t *DescribeResourceGroupTask) SetTs(ts Timestamp) { } func (t *DescribeResourceGroupTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DescribeResourceGroupTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DescribeResourceGroup - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2386,12 +2409,12 @@ func (t *TransferNodeTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_TransferNode + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *TransferNodeTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_TransferNode - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2451,12 +2474,12 @@ func (t *TransferReplicaTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_TransferReplica + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *TransferReplicaTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_TransferReplica - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -2522,13 +2545,15 @@ func (t *ListResourceGroupsTask) SetTs(ts Timestamp) { } func (t *ListResourceGroupsTask) OnEnqueue() error { - t.Base = commonpbutil.NewMsgBase() + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } + t.Base.MsgType = commonpb.MsgType_ListResourceGroups + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_ListResourceGroups - t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_alias.go b/internal/proxy/task_alias.go index 21606394de663..5d46fbfe8c977 100644 --- a/internal/proxy/task_alias.go +++ b/internal/proxy/task_alias.go @@ -82,13 +82,13 @@ func (t *CreateAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_CreateAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } // PreExecute defines the tion before task execution func (t *CreateAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_CreateAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias // collection alias uses the same format as collection name @@ -165,12 +165,12 @@ func (t *DropAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DropAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *DropAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_DropAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias if err := ValidateCollectionAlias(collAlias); err != nil { return err @@ -234,12 +234,12 @@ func (t *AlterAliasTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterAlias + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *AlterAliasTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterAlias - t.Base.SourceID = paramtable.GetNodeID() collAlias := t.Alias // collection alias uses the same format as collection name @@ -310,12 +310,12 @@ func (a *DescribeAliasTask) SetTs(ts Timestamp) { func (a *DescribeAliasTask) OnEnqueue() error { a.Base = commonpbutil.NewMsgBase() + a.Base.MsgType = commonpb.MsgType_DescribeAlias + a.Base.SourceID = a.nodeID return nil } func (a *DescribeAliasTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_DescribeAlias - a.Base.SourceID = a.nodeID // collection alias uses the same format as collection name if err := ValidateCollectionAlias(a.GetAlias()); err != nil { return err @@ -378,12 +378,12 @@ func (a *ListAliasesTask) SetTs(ts Timestamp) { func (a *ListAliasesTask) OnEnqueue() error { a.Base = commonpbutil.NewMsgBase() + a.Base.MsgType = commonpb.MsgType_ListAliases + a.Base.SourceID = a.nodeID return nil } func (a *ListAliasesTask) PreExecute(ctx context.Context) error { - a.Base.MsgType = commonpb.MsgType_ListAliases - a.Base.SourceID = a.nodeID if len(a.GetCollectionName()) > 0 { if err := validateCollectionName(a.GetCollectionName()); err != nil { diff --git a/internal/proxy/task_database.go b/internal/proxy/task_database.go index 06d7406050468..32c19117171da 100644 --- a/internal/proxy/task_database.go +++ b/internal/proxy/task_database.go @@ -266,12 +266,12 @@ func (t *alterDatabaseTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_AlterDatabase + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *alterDatabaseTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterDatabase - t.Base.SourceID = paramtable.GetNodeID() return nil } @@ -345,12 +345,12 @@ func (t *describeDatabaseTask) OnEnqueue() error { if t.Base == nil { t.Base = commonpbutil.NewMsgBase() } + t.Base.MsgType = commonpb.MsgType_DescribeDatabase + t.Base.SourceID = paramtable.GetNodeID() return nil } func (t *describeDatabaseTask) PreExecute(ctx context.Context) error { - t.Base.MsgType = commonpb.MsgType_AlterCollection - t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 4da494b39d4c3..c082fb3081225 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -98,6 +98,11 @@ func (dt *deleteTask) SetTs(ts Timestamp) { } func (dt *deleteTask) OnEnqueue() error { + if dt.req.Base == nil { + dt.req.Base = commonpbutil.NewMsgBase() + } + dt.req.Base.MsgType = commonpb.MsgType_Delete + dt.req.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 56f9e4fb68432..1b844cc2113fa 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -111,6 +111,8 @@ func (cit *createIndexTask) OnEnqueue() error { if cit.req.Base == nil { cit.req.Base = commonpbutil.NewMsgBase() } + cit.req.Base.MsgType = commonpb.MsgType_CreateIndex + cit.req.Base.SourceID = paramtable.GetNodeID() return nil } @@ -438,8 +440,6 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro } func (cit *createIndexTask) PreExecute(ctx context.Context) error { - cit.req.Base.MsgType = commonpb.MsgType_CreateIndex - cit.req.Base.SourceID = paramtable.GetNodeID() collName := cit.req.GetCollectionName() @@ -548,12 +548,12 @@ func (t *alterIndexTask) OnEnqueue() error { if t.req.Base == nil { t.req.Base = commonpbutil.NewMsgBase() } + t.req.Base.MsgType = commonpb.MsgType_AlterIndex + t.req.Base.SourceID = paramtable.GetNodeID() return nil } func (t *alterIndexTask) PreExecute(ctx context.Context) error { - t.req.Base.MsgType = commonpb.MsgType_AlterIndex - t.req.Base.SourceID = paramtable.GetNodeID() for _, param := range t.req.GetExtraParams() { if !indexparams.IsConfigableIndexParam(param.GetKey()) { @@ -660,12 +660,12 @@ func (dit *describeIndexTask) SetTs(ts Timestamp) { func (dit *describeIndexTask) OnEnqueue() error { dit.Base = commonpbutil.NewMsgBase() + dit.Base.MsgType = commonpb.MsgType_DescribeIndex + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *describeIndexTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_DescribeIndex - dit.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(dit.CollectionName); err != nil { return err @@ -785,12 +785,12 @@ func (dit *getIndexStatisticsTask) SetTs(ts Timestamp) { func (dit *getIndexStatisticsTask) OnEnqueue() error { dit.Base = commonpbutil.NewMsgBase() + dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *getIndexStatisticsTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_GetIndexStatistics - dit.Base.SourceID = dit.nodeID if err := validateCollectionName(dit.CollectionName); err != nil { return err @@ -903,12 +903,12 @@ func (dit *dropIndexTask) OnEnqueue() error { if dit.Base == nil { dit.Base = commonpbutil.NewMsgBase() } + dit.Base.MsgType = commonpb.MsgType_DropIndex + dit.Base.SourceID = paramtable.GetNodeID() return nil } func (dit *dropIndexTask) PreExecute(ctx context.Context) error { - dit.Base.MsgType = commonpb.MsgType_DropIndex - dit.Base.SourceID = paramtable.GetNodeID() collName, fieldName := dit.CollectionName, dit.FieldName @@ -1014,12 +1014,12 @@ func (gibpt *getIndexBuildProgressTask) SetTs(ts Timestamp) { func (gibpt *getIndexBuildProgressTask) OnEnqueue() error { gibpt.Base = commonpbutil.NewMsgBase() + gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress + gibpt.Base.SourceID = paramtable.GetNodeID() return nil } func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error { - gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress - gibpt.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(gibpt.CollectionName); err != nil { return err @@ -1104,12 +1104,12 @@ func (gist *getIndexStateTask) SetTs(ts Timestamp) { func (gist *getIndexStateTask) OnEnqueue() error { gist.Base = commonpbutil.NewMsgBase() + gist.Base.MsgType = commonpb.MsgType_GetIndexState + gist.Base.SourceID = paramtable.GetNodeID() return nil } func (gist *getIndexStateTask) PreExecute(ctx context.Context) error { - gist.Base.MsgType = commonpb.MsgType_GetIndexState - gist.Base.SourceID = paramtable.GetNodeID() if err := validateCollectionName(gist.CollectionName); err != nil { return err diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 8b45a8621380c..c0c8a38f5fe48 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" @@ -91,6 +92,11 @@ func (it *insertTask) getChannels() []pChan { } func (it *insertTask) OnEnqueue() error { + if it.insertMsg.Base == nil { + it.insertMsg.Base = commonpbutil.NewMsgBase() + } + it.insertMsg.Base.MsgType = commonpb.MsgType_Insert + it.insertMsg.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 8af0c2099799a..9282c72edb633 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -43,6 +44,7 @@ const ( ) type queryTask struct { + baseTask Condition *internalpb.RetrieveRequest @@ -710,6 +712,10 @@ func (t *queryTask) SetTs(ts Timestamp) { } func (t *queryTask) OnEnqueue() error { + if t.Base == nil { + t.Base = commonpbutil.NewMsgBase() + } t.Base.MsgType = commonpb.MsgType_Retrieve + t.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 6c3adbfda34de..05a8199844f94 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -19,6 +19,7 @@ package proxy import ( "container/list" "context" + "strconv" "sync" "time" @@ -26,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" @@ -190,6 +192,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error { t.SetTs(ts) t.SetID(id) + t.SetOnEnqueueTime() return queue.addUnissuedTask(t) } @@ -454,6 +457,11 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { }() span.AddEvent("scheduler process PreExecute") + waitDuration := t.GetDurationInQueue() + metrics.ProxyReqInQueueLatency. + WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.Type().String()). + Observe(float64(waitDuration.Milliseconds())) + err := t.PreExecute(ctx) defer func() { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 45a2af64d6fe5..59d58f83bec6b 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -46,6 +46,7 @@ const ( ) type searchTask struct { + baseTask Condition ctx context.Context *internalpb.SearchRequest diff --git a/internal/proxy/task_statistic.go b/internal/proxy/task_statistic.go index 58cfcbb418b7f..20425151fa0d4 100644 --- a/internal/proxy/task_statistic.go +++ b/internal/proxy/task_statistic.go @@ -94,6 +94,9 @@ func (g *getStatisticsTask) OnEnqueue() error { g.GetStatisticsRequest = &internalpb.GetStatisticsRequest{ Base: commonpbutil.NewMsgBase(), } + + g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } @@ -107,10 +110,6 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics-PreExecute") defer sp.End() - // TODO: Maybe we should create a new MsgType: GetStatistics? - g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics - g.Base.SourceID = paramtable.GetNodeID() - collID, err := globalMetaCache.GetCollectionID(ctx, g.request.GetDbName(), g.collectionName) if err != nil { // err is not nil if collection not exists return err @@ -634,12 +633,12 @@ func (g *getCollectionStatisticsTask) SetTs(ts Timestamp) { func (g *getCollectionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() + g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getCollectionStatisticsTask) PreExecute(ctx context.Context) error { - g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics - g.Base.SourceID = paramtable.GetNodeID() return nil } @@ -717,12 +716,12 @@ func (g *getPartitionStatisticsTask) SetTs(ts Timestamp) { func (g *getPartitionStatisticsTask) OnEnqueue() error { g.Base = commonpbutil.NewMsgBase() + g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics + g.Base.SourceID = paramtable.GetNodeID() return nil } func (g *getPartitionStatisticsTask) PreExecute(ctx context.Context) error { - g.Base.MsgType = commonpb.MsgType_GetPartitionStatistics - g.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 54fda37a135d0..c85632018c506 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -1022,6 +1022,7 @@ func TestHasCollectionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_HasCollection, task.Type()) @@ -1084,6 +1085,7 @@ func TestDescribeCollectionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeCollection, task.Type()) @@ -1332,6 +1334,7 @@ func TestCreatePartitionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_CreatePartition, task.Type()) @@ -1407,6 +1410,7 @@ func TestDropPartitionTask(t *testing.T) { queryCoord: qc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DropPartition, task.Type()) @@ -1524,6 +1528,7 @@ func TestHasPartitionTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_HasPartition, task.Type()) @@ -1571,6 +1576,7 @@ func TestShowPartitionsTask(t *testing.T) { rootCoord: rc, result: nil, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_ShowPartitions, task.Type()) @@ -2693,6 +2699,7 @@ func TestCreateResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_CreateResourceGroup, task.Type()) @@ -2732,6 +2739,7 @@ func TestDropResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DropResourceGroup, task.Type()) @@ -2773,6 +2781,7 @@ func TestTransferNodeTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_TransferNode, task.Type()) @@ -2815,6 +2824,7 @@ func TestTransferReplicaTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_TransferReplica, task.Type()) @@ -2854,6 +2864,7 @@ func TestListResourceGroupsTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_ListResourceGroups, task.Type()) @@ -2906,6 +2917,7 @@ func TestDescribeResourceGroupTask(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type()) @@ -2952,6 +2964,7 @@ func TestDescribeResourceGroupTaskFailed(t *testing.T) { ctx: ctx, queryCoord: qc, } + task.OnEnqueue() task.PreExecute(ctx) assert.Equal(t, commonpb.MsgType_DescribeResourceGroup, task.Type()) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index c551843e94502..8132430c67b64 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -134,6 +134,11 @@ func (it *upsertTask) getChannels() []pChan { } func (it *upsertTask) OnEnqueue() error { + if it.req.Base == nil { + it.req.Base = commonpbutil.NewMsgBase() + } + it.req.Base.MsgType = commonpb.MsgType_Upsert + it.req.Base.SourceID = paramtable.GetNodeID() return nil } diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 46e67ce29dd39..43fbd49c83734 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -334,6 +334,16 @@ var ( Name: "slow_query_count", Help: "count of slow query executed", }, []string{nodeIDLabelName, msgTypeLabelName}) + + // ProxyReqInQueueLatency records the latency that requests wait in the queue, like "CreateCollection". + ProxyReqInQueueLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "req_in_queue_latency", + Help: "latency which request waits in the queue", + Buckets: buckets, // unit: ms + }, []string{nodeIDLabelName, functionLabelName}) ) // RegisterProxy registers Proxy metrics @@ -383,6 +393,7 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxySlowQueryCount) registry.MustRegister(ProxyReportValue) + registry.MustRegister(ProxyReqInQueueLatency) } func CleanupProxyDBMetrics(nodeID int64, dbName string) {