From ed2a868837c0cf34134d73b06c7083206f9c8572 Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 23 Dec 2024 16:16:48 +0800 Subject: [PATCH] enhance: add db name in replica description Signed-off-by: jaime --- internal/datacoord/compaction_task_meta.go | 4 ++-- internal/datacoord/index_meta.go | 4 ++-- internal/proto/query_coord.proto | 1 + internal/querycoordv2/job/job_load.go | 23 +++++++++++-------- .../querycoordv2/meta/collection_manager.go | 11 +++++++-- internal/querycoordv2/meta/replica_manager.go | 12 +++++++++- .../querycoordv2/meta/replica_manager_test.go | 23 ++++++++++++++++++- internal/querycoordv2/server.go | 2 +- internal/querycoordv2/utils/meta.go | 1 - pkg/util/metricsinfo/metrics_info.go | 9 ++++---- 10 files changed, 67 insertions(+), 23 deletions(-) diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index 33a2ad20b59cb..be18fd0884fae 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction Type: task.Type.String(), State: task.State.String(), FailReason: task.FailReason, - StartTime: typeutil.TimestampToString(uint64(task.StartTime)), - EndTime: typeutil.TimestampToString(uint64(task.EndTime)), + StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000), + EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000), TotalRows: task.TotalRows, InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string { return strconv.FormatInt(t, 10) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index bc12f4f88da1c..37e744bd8a4d5 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats { FailReason: s.FailReason, IndexSize: s.IndexSize, IndexVersion: s.IndexVersion, - CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime), - FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime), + CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000), + FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000), } } diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index f05a07dc7b028..0d4819e22d7ba 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -668,6 +668,7 @@ message CollectionLoadInfo { LoadType load_type = 6; int32 recover_times = 7; repeated int64 load_fields = 8; + string dbName = 9; } message PartitionLoadInfo { diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 5009be26fb4ad..f1db70bb59180 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error { } } + collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) + if err != nil { + log.Warn("failed to describe collection from RootCoord", zap.Error(err)) + return err + } + // 2. create replica if not exist replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID()) if len(replicas) == 0 { - collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) - if err != nil { - return err - } - // API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API. // Then we can implement dynamic replica changed in different resource group independently. _, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames()) @@ -213,6 +214,7 @@ func (job *LoadCollectionJob) Execute() error { FieldIndexID: req.GetFieldIndexID(), LoadType: querypb.LoadType_LoadCollection, LoadFields: req.GetLoadFields(), + DbName: collectionInfo.GetDbName(), }, CreatedAt: time.Now(), LoadSpan: sp, @@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error { } } + collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) + if err != nil { + log.Warn("failed to describe collection from RootCoord", zap.Error(err)) + return err + } + // 2. create replica if not exist replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID()) if len(replicas) == 0 { - collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID()) - if err != nil { - return err - } _, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames()) if err != nil { msg := "failed to spawn replica for collection" @@ -412,6 +416,7 @@ func (job *LoadPartitionJob) Execute() error { FieldIndexID: req.GetFieldIndexID(), LoadType: querypb.LoadType_LoadPartition, LoadFields: req.GetLoadFields(), + DbName: collectionInfo.GetDbName(), }, CreatedAt: time.Now(), LoadSpan: sp, diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index f95d18b0658f3..f3389c731d03d 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -285,7 +285,13 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e // we should save it's CollectionLoadInfo to meta store for _, partition := range m.GetAllPartitions(ctx) { // In old version, collection would NOT be stored if the partition existed. - if _, ok := m.collections[partition.GetCollectionID()]; !ok { + if !m.Exist(ctx, partition.GetCollectionID()) { + collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID()) + if err != nil { + log.Warn("failed to describe collection from RootCoord", zap.Error(err)) + return err + } + col := &Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: partition.GetCollectionID(), @@ -293,10 +299,11 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e Status: partition.GetStatus(), FieldIndexID: partition.GetFieldIndexID(), LoadType: querypb.LoadType_LoadPartition, + DbName: collectionInfo.GetDbName(), }, LoadPercentage: 100, } - err := m.PutCollection(ctx, col) + err = m.PutCollection(ctx, col) if err != nil { return err } diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index 2c04c52c7cddc..5296bb85203ad 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -506,7 +506,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle // It locks the ReplicaManager for reading, converts the replicas to their protobuf representation, // marshals them into a JSON string, and returns the result. // If an error occurs during marshaling, it logs a warning and returns an empty string. -func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string { +func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string { m.rwmutex.RLock() defer m.rwmutex.RUnlock() @@ -515,9 +515,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string { for k, v := range r.replicaPB.GetChannelNodeInfos() { channelTowRWNodes[k] = v.GetRwNodes() } + + collectionInfo := meta.GetCollection(ctx, r.GetCollectionID()) + dbName := "" + if collectionInfo == nil { + log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID())) + } else { + dbName = collectionInfo.GetDbName() + } + return &metricsinfo.Replica{ ID: r.GetID(), CollectionID: r.GetCollectionID(), + DBName: dbName, RWNodes: r.GetNodes(), ResourceGroup: r.GetResourceGroup(), RONodes: r.GetRONodes(), diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index cdfc1cbdfc8b5..04174fafe15dc 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) { err = replicaManager.put(ctx, replica2) assert.NoError(t, err) - jsonOutput := replicaManager.GetReplicasJSON(ctx) + meta := &Meta{ + CollectionManager: NewCollectionManager(catalog), + } + + err = meta.PutCollectionWithoutSave(ctx, &Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 100, + DbName: "tt", + }, + }) + assert.NoError(t, err) + + err = meta.PutCollectionWithoutSave(ctx, &Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 200, + }, + }) + assert.NoError(t, err) + + jsonOutput := replicaManager.GetReplicasJSON(ctx, meta) var replicas []*metricsinfo.Replica err = json.Unmarshal([]byte(jsonOutput), &replicas) assert.NoError(t, err) @@ -556,10 +575,12 @@ func TestGetReplicasJSON(t *testing.T) { assert.Equal(t, int64(100), replica.CollectionID) assert.Equal(t, "rg1", replica.ResourceGroup) assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes) + assert.Equal(t, "tt", replica.DBName) } else if replica.ID == 2 { assert.Equal(t, int64(200), replica.CollectionID) assert.Equal(t, "rg2", replica.ResourceGroup) assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes) + assert.Equal(t, "", replica.DBName) } else { assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID) } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 8ae802cf7848f..58261423659c2 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -215,7 +215,7 @@ func (s *Server) registerMetricsRequest() { } QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return s.meta.GetReplicasJSON(ctx), nil + return s.meta.GetReplicasJSON(ctx, s.meta), nil } QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index 1744e2367849f..fe15c685bbf0c 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re if err != nil { return nil, err } - // Spawn it in replica manager. replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels) if err != nil { diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index 6a6b5b46f8679..c9f68f698b851 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -195,6 +195,7 @@ type ResourceGroup struct { type Replica struct { ID int64 `json:"ID,omitempty,string"` CollectionID int64 `json:"collectionID,omitempty,string"` + DBName string `json:"db_name,omitempty"` RWNodes []int64 `json:"rw_nodes,omitempty"` ResourceGroup string `json:"resource_group,omitempty"` RONodes []int64 `json:"ro_nodes,omitempty"` @@ -382,8 +383,8 @@ type ImportTask struct { } type CompactionTask struct { - PlanID int64 `json:"plan_id,omitempty"` - CollectionID int64 `json:"collection_id,omitempty"` + PlanID int64 `json:"plan_id,omitempty,string"` + CollectionID int64 `json:"collection_id,omitempty,string"` Type string `json:"type,omitempty"` State string `json:"state,omitempty"` FailReason string `json:"fail_reason,omitempty"` @@ -447,7 +448,7 @@ type Collection struct { ConsistencyLevel string `json:"consistency_level,omitempty"` Aliases []string `json:"aliases,omitempty"` Properties map[string]string `json:"properties,omitempty"` - DBName string `json:"db_name,omitempty,string"` + DBName string `json:"db_name,omitempty"` NumPartitions int `json:"num_partitions,omitempty,string"` VirtualChannelNames []string `json:"virtual_channel_names,omitempty"` PhysicalChannelNames []string `json:"physical_channel_names,omitempty"` @@ -458,7 +459,7 @@ type Collection struct { type Database struct { DBName string `json:"db_name,omitempty"` - DBID int64 `json:"dbID,omitempty"` + DBID int64 `json:"dbID,omitempty,string"` CreatedTimestamp string `json:"created_timestamp,omitempty"` Properties map[string]string `json:"properties,omitempty"` }