enhance: add db name in replica description
Signed-off-by: jaime <[email protected]>
jaime0815 committed Dec 26, 2024
1 parent 5395ec1 commit 12a7c44
Showing 15 changed files with 106 additions and 45 deletions.
9 changes: 9 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
@@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
}

func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if the task state is completed, cleaned, failed or timeout, append the end time before saving
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
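Note: this hunk (and the matching ones in compaction_task_l0.go and compaction_task_mix.go below) moves end-time stamping into updateAndSaveTaskMeta itself, so callers only pass the state change. A minimal sketch of the pattern, assuming compactionTaskOpt is a functional option over the task proto and that the datapb import is available (the real definitions live elsewhere in the datacoord package):

type compactionTaskOpt func(task *datapb.CompactionTask)

// setEndTime returns an option that stamps the end timestamp on the cloned task proto.
func setEndTime(ts int64) compactionTaskOpt {
	return func(task *datapb.CompactionTask) {
		task.EndTime = ts
	}
}

// isTerminal is a hypothetical helper: it reports whether the task will never change
// state again, which is exactly when updateAndSaveTaskMeta appends setEndTime.
func isTerminal(state datapb.CompactionTaskState) bool {
	switch state {
	case datapb.CompactionTaskState_completed,
		datapb.CompactionTaskState_cleaned,
		datapb.CompactionTaskState_failed,
		datapb.CompactionTaskState_timeout:
		return true
	default:
		return false
	}
}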
22 changes: 12 additions & 10 deletions internal/datacoord/compaction_task_l0.go
@@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
@@ -141,15 +138,13 @@ func (t *l0CompactionTask) processExecuting() bool {
return false
}

updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
@@ -159,9 +154,7 @@ func (t *l0CompactionTask) processExecuting() bool {
}

func (t *l0CompactionTask) processMetaSaved() bool {
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
err := t.updateAndSaveTaskMeta(updateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
@@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error {
}

func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if the task state is completed, cleaned, failed or timeout, append the end time before saving
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_task_meta.go
@@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: typeutil.TimestampToString(uint64(task.StartTime)),
EndTime: typeutil.TimestampToString(uint64(task.EndTime)),
StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000),
EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000),
TotalRows: task.TotalRows,
InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10)
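The two StartTime/EndTime changes above (and the matching ones in index_meta.go below) exist because the task protos store Unix seconds while typeutil.TimestampToString appears to expect milliseconds, hence the multiplication by 1000. A standalone sketch of the conversion (the format string is illustrative, not the one typeutil uses):

package main

import (
	"fmt"
	"time"
)

func main() {
	seconds := time.Now().Unix()     // task.StartTime / task.EndTime are stored as Unix seconds
	millis := uint64(seconds) * 1000 // convert to milliseconds before formatting
	fmt.Println(time.UnixMilli(int64(millis)).UTC().Format("2006-01-02 15:04:05"))
}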
26 changes: 14 additions & 12 deletions internal/datacoord/compaction_task_mix.go
@@ -97,9 +97,7 @@ func (t *mixCompactionTask) processPipelining() bool {

func (t *mixCompactionTask) processMetaSaved() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
}
@@ -119,15 +117,12 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@@ -137,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool {
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@@ -154,7 +149,7 @@ func (t *mixCompactionTask) processExecuting() bool {
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
log.Info("mixCompactionTask fail in datanode")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
@@ -240,10 +235,8 @@ func (t *mixCompactionTask) processCompleted() bool {

t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("mixCompactionTask processCompleted done")

task := t.GetTaskProto()
log.Info("mixCompactionTask processCompleted done",
zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
return true
}

@@ -289,6 +282,15 @@ func (t *mixCompactionTask) doClean() error {
}

func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if the task state is completed, cleaned, failed or timeout, append the end time before saving
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta.go
@@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
FailReason: s.FailReason,
IndexSize: s.IndexSize,
IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
}
}

3 changes: 3 additions & 0 deletions internal/metastore/model/segment_index.go
@@ -50,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
IndexSize: segIndex.SerializeSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.GetCurrentIndexVersion(),
FinishedUTCTime: segIndex.FinishedTime,
}
}

@@ -75,6 +76,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
SerializeSize: segIdx.IndexSize,
WriteHandoff: segIdx.WriteHandoff,
CurrentIndexVersion: segIdx.CurrentIndexVersion,
FinishedTime: segIdx.FinishedUTCTime,
}
}

@@ -96,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
IndexSize: segIndex.IndexSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.CurrentIndexVersion,
FinishedUTCTime: segIndex.FinishedUTCTime,
}
}
1 change: 1 addition & 0 deletions internal/proto/index_coord.proto
@@ -80,6 +80,7 @@ message SegmentIndex {
bool write_handoff = 15;
int32 current_index_version = 16;
int64 index_store_version = 17;
uint64 finished_time = 18;
}

message RegisterNodeRequest {
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
@@ -668,6 +668,7 @@ message CollectionLoadInfo {
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
int64 dbID = 9;
}

message PartitionLoadInfo {
23 changes: 14 additions & 9 deletions internal/querycoordv2/job/job_load.go
@@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error {
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

Check warning on line 178 in internal/querycoordv2/job/job_load.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/job/job_load.go#L176-L178

Added lines #L176 - L178 were not covered by tests

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}

// API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API.
// Then we can implement dynamic replica changed in different resource group independently.
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
@@ -213,6 +214,7 @@ func (job *LoadCollectionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadCollection,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
@@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error {
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

Check warning on line 380 in internal/querycoordv2/job/job_load.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/job/job_load.go#L378-L380

Added lines #L378 - L380 were not covered by tests

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
if err != nil {
msg := "failed to spawn replica for collection"
@@ -412,6 +416,7 @@ func (job *LoadPartitionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
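In both load jobs above, DescribeCollection is now called unconditionally rather than only when replicas need to be spawned, so its DbId can be copied into the stored CollectionLoadInfo. A condensed sketch of that flow with a hypothetical helper name, assuming the querypb import and the Broker interface shown in the diff:

func buildCollectionLoadInfo(ctx context.Context, broker Broker, collectionID int64) (*querypb.CollectionLoadInfo, error) {
	// One DescribeCollection call serves both replica spawning (virtual channels)
	// and the persisted load info (database ID).
	info, err := broker.DescribeCollection(ctx, collectionID)
	if err != nil {
		return nil, err
	}
	return &querypb.CollectionLoadInfo{
		CollectionID: collectionID,
		DbID:         info.GetDbId(),
	}, nil
}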
11 changes: 9 additions & 2 deletions internal/querycoordv2/meta/collection_manager.go
@@ -285,18 +285,25 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e
// we should save it's CollectionLoadInfo to meta store
for _, partition := range m.GetAllPartitions(ctx) {
// In old version, collection would NOT be stored if the partition existed.
if _, ok := m.collections[partition.GetCollectionID()]; !ok {
if !m.Exist(ctx, partition.GetCollectionID()) {
collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

Check warning on line 293 in internal/querycoordv2/meta/collection_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/collection_manager.go#L291-L293

Added lines #L291 - L293 were not covered by tests

col := &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: partition.GetCollectionID(),
ReplicaNumber: partition.GetReplicaNumber(),
Status: partition.GetStatus(),
FieldIndexID: partition.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
DbID: collectionInfo.GetDbId(),
},
LoadPercentage: 100,
}
err := m.PutCollection(ctx, col)
err = m.PutCollection(ctx, col)
if err != nil {
return err
}
12 changes: 11 additions & 1 deletion internal/querycoordv2/meta/replica_manager.go
@@ -506,7 +506,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle
// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
// marshals them into a JSON string, and returns the result.
// If an error occurs during marshaling, it logs a warning and returns an empty string.
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

@@ -515,9 +515,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
for k, v := range r.replicaPB.GetChannelNodeInfos() {
channelTowRWNodes[k] = v.GetRwNodes()
}

collectionInfo := meta.GetCollection(ctx, r.GetCollectionID())
dbID := int64(-1)
if collectionInfo == nil {
log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID()))
} else {
dbID = collectionInfo.GetDbID()
}

return &metricsinfo.Replica{
ID: r.GetID(),
CollectionID: r.GetCollectionID(),
DatabaseID: dbID,
RWNodes: r.GetNodes(),
ResourceGroup: r.GetResourceGroup(),
RONodes: r.GetRONodes(),
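The replica JSON now carries a DatabaseID resolved from the collection's load info. A tiny sketch of the fallback behavior added above (hypothetical helper, not the Milvus source):

// databaseIDForReplica mirrors the lookup in GetReplicasJSON: -1 when the collection
// is no longer tracked by the CollectionManager, otherwise the DbID stored in its
// CollectionLoadInfo (0 for entries written before the field existed).
func databaseIDForReplica(coll *Collection) int64 {
	if coll == nil {
		return -1
	}
	return coll.GetDbID()
}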
23 changes: 22 additions & 1 deletion internal/querycoordv2/meta/replica_manager_test.go
@@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) {
err = replicaManager.put(ctx, replica2)
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx)
meta := &Meta{
CollectionManager: NewCollectionManager(catalog),
}

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
DbID: int64(1),
},
})
assert.NoError(t, err)

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 200,
},
})
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx, meta)
var replicas []*metricsinfo.Replica
err = json.Unmarshal([]byte(jsonOutput), &replicas)
assert.NoError(t, err)
@@ -556,10 +575,12 @@
assert.Equal(t, int64(100), replica.CollectionID)
assert.Equal(t, "rg1", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
assert.Equal(t, int64(1), replica.DatabaseID)
} else if replica.ID == 2 {
assert.Equal(t, int64(200), replica.CollectionID)
assert.Equal(t, "rg2", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
assert.Equal(t, int64(0), replica.DatabaseID)
} else {
assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
}
2 changes: 1 addition & 1 deletion internal/querycoordv2/server.go
@@ -215,7 +215,7 @@ func (s *Server) registerMetricsRequest() {
}

QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetReplicasJSON(ctx), nil
return s.meta.GetReplicasJSON(ctx, s.meta), nil
}

QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
1 change: 0 additions & 1 deletion internal/querycoordv2/utils/meta.go
@@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
if err != nil {
return nil, err
}

// Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels)
if err != nil {