Skip to content

Commit

Permalink
enhance: add db name in replica description
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Dec 23, 2024
1 parent 205231b commit 4914c38
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 23 deletions.
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: typeutil.TimestampToString(uint64(task.StartTime)),
EndTime: typeutil.TimestampToString(uint64(task.EndTime)),
StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000),
EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000),
TotalRows: task.TotalRows,
InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10)
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
FailReason: s.FailReason,
IndexSize: s.IndexSize,
IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ message CollectionLoadInfo {
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
string dbName = 9;
}

message PartitionLoadInfo {
Expand Down
23 changes: 14 additions & 9 deletions internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error {
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

Check warning on line 178 in internal/querycoordv2/job/job_load.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/job/job_load.go#L176-L178

Added lines #L176 - L178 were not covered by tests

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}

// API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API.
// Then we can implement dynamic replica changed in different resource group independently.
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
Expand Down Expand Up @@ -213,6 +214,7 @@ func (job *LoadCollectionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadCollection,
LoadFields: req.GetLoadFields(),
DbName: collectionInfo.GetDbName(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
Expand Down Expand Up @@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error {
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

Check warning on line 380 in internal/querycoordv2/job/job_load.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/job/job_load.go#L378-L380

Added lines #L378 - L380 were not covered by tests

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
if err != nil {
msg := "failed to spawn replica for collection"
Expand Down Expand Up @@ -412,6 +416,7 @@ func (job *LoadPartitionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
LoadFields: req.GetLoadFields(),
DbName: collectionInfo.GetDbName(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
Expand Down
11 changes: 9 additions & 2 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,25 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e
// we should save it's CollectionLoadInfo to meta store
for _, partition := range m.GetAllPartitions(ctx) {
// In old version, collection would NOT be stored if the partition existed.
if _, ok := m.collections[partition.GetCollectionID()]; !ok {
if !m.Exist(ctx, partition.GetCollectionID()) {
collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

Check warning on line 293 in internal/querycoordv2/meta/collection_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/collection_manager.go#L291-L293

Added lines #L291 - L293 were not covered by tests

col := &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: partition.GetCollectionID(),
ReplicaNumber: partition.GetReplicaNumber(),
Status: partition.GetStatus(),
FieldIndexID: partition.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
DbName: collectionInfo.GetDbName(),
},
LoadPercentage: 100,
}
err := m.PutCollection(ctx, col)
err = m.PutCollection(ctx, col)
if err != nil {
return err
}
Expand Down
12 changes: 11 additions & 1 deletion internal/querycoordv2/meta/replica_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle
// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
// marshals them into a JSON string, and returns the result.
// If an error occurs during marshaling, it logs a warning and returns an empty string.
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

Expand All @@ -515,9 +515,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
for k, v := range r.replicaPB.GetChannelNodeInfos() {
channelTowRWNodes[k] = v.GetRwNodes()
}

collectionInfo := meta.GetCollection(ctx, r.GetCollectionID())
dbName := ""
if collectionInfo == nil {
log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID()))

Check warning on line 522 in internal/querycoordv2/meta/replica_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/replica_manager.go#L522

Added line #L522 was not covered by tests
} else {
dbName = collectionInfo.GetDbName()
}

return &metricsinfo.Replica{
ID: r.GetID(),
CollectionID: r.GetCollectionID(),
DBName: dbName,
RWNodes: r.GetNodes(),
ResourceGroup: r.GetResourceGroup(),
RONodes: r.GetRONodes(),
Expand Down
23 changes: 22 additions & 1 deletion internal/querycoordv2/meta/replica_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) {
err = replicaManager.put(ctx, replica2)
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx)
meta := &Meta{
CollectionManager: NewCollectionManager(catalog),
}

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
DbName: "tt",
},
})
assert.NoError(t, err)

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 200,
},
})
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx, meta)
var replicas []*metricsinfo.Replica
err = json.Unmarshal([]byte(jsonOutput), &replicas)
assert.NoError(t, err)
Expand All @@ -556,10 +575,12 @@ func TestGetReplicasJSON(t *testing.T) {
assert.Equal(t, int64(100), replica.CollectionID)
assert.Equal(t, "rg1", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
assert.Equal(t, "tt", replica.DBName)
} else if replica.ID == 2 {
assert.Equal(t, int64(200), replica.CollectionID)
assert.Equal(t, "rg2", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
assert.Equal(t, "", replica.DBName)
} else {
assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (s *Server) registerMetricsRequest() {
}

QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetReplicasJSON(ctx), nil
return s.meta.GetReplicasJSON(ctx, s.meta), nil

Check warning on line 218 in internal/querycoordv2/server.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/server.go#L218

Added line #L218 was not covered by tests
}

QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
Expand Down
1 change: 0 additions & 1 deletion internal/querycoordv2/utils/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
if err != nil {
return nil, err
}

// Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/util/metricsinfo/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ type ResourceGroup struct {
type Replica struct {
ID int64 `json:"ID,omitempty,string"`
CollectionID int64 `json:"collectionID,omitempty,string"`
DBName string `json:"db_name,omitempty"`
RWNodes []int64 `json:"rw_nodes,omitempty"`
ResourceGroup string `json:"resource_group,omitempty"`
RONodes []int64 `json:"ro_nodes,omitempty"`
Expand Down Expand Up @@ -382,8 +383,8 @@ type ImportTask struct {
}

type CompactionTask struct {
PlanID int64 `json:"plan_id,omitempty"`
CollectionID int64 `json:"collection_id,omitempty"`
PlanID int64 `json:"plan_id,omitempty,string"`
CollectionID int64 `json:"collection_id,omitempty,string"`
Type string `json:"type,omitempty"`
State string `json:"state,omitempty"`
FailReason string `json:"fail_reason,omitempty"`
Expand Down Expand Up @@ -447,7 +448,7 @@ type Collection struct {
ConsistencyLevel string `json:"consistency_level,omitempty"`
Aliases []string `json:"aliases,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
DBName string `json:"db_name,omitempty,string"`
DBName string `json:"db_name,omitempty"`
NumPartitions int `json:"num_partitions,omitempty,string"`
VirtualChannelNames []string `json:"virtual_channel_names,omitempty"`
PhysicalChannelNames []string `json:"physical_channel_names,omitempty"`
Expand All @@ -458,7 +459,7 @@ type Collection struct {

type Database struct {
DBName string `json:"db_name,omitempty"`
DBID int64 `json:"dbID,omitempty"`
DBID int64 `json:"dbID,omitempty,string"`
CreatedTimestamp string `json:"created_timestamp,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
}
Expand Down

0 comments on commit 4914c38

Please sign in to comment.