enhance: add db name in replica description #38673

Open · wants to merge 1 commit into base: 2.5
9 changes: 9 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
@@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
}

func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// If the task state is completed, cleaned, failed, or timeout, append the end time before saving.
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
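The guard above centralizes end-time bookkeeping: whenever `updateAndSaveTaskMeta` is called while the task is already in a terminal state, the end timestamp is appended automatically, so callers no longer pass `setEndTime` themselves (see the matching hunks in compaction_task_l0.go and compaction_task_mix.go below). A minimal sketch of the shared predicate this repeats (a hypothetical helper, not part of this PR):

```go
// Hypothetical helper (not in this PR): the terminal-state check duplicated in
// the clustering, L0, and mix compaction tasks, assuming the datapb package
// from internal/proto.
func isTerminalState(state datapb.CompactionTaskState) bool {
	switch state {
	case datapb.CompactionTaskState_completed,
		datapb.CompactionTaskState_cleaned,
		datapb.CompactionTaskState_failed,
		datapb.CompactionTaskState_timeout:
		return true
	default:
		return false
	}
}
```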
22 changes: 12 additions & 10 deletions internal/datacoord/compaction_task_l0.go
@@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
@@ -141,15 +138,13 @@ return false
return false
}

updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
@@ -159,9 +154,7 @@ }
}

func (t *l0CompactionTask) processMetaSaved() bool {
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
err := t.updateAndSaveTaskMeta(updateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
@@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error {
}

func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// If the task state is completed, cleaned, failed, or timeout, append the end time before saving.
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_task_meta.go
@@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: typeutil.TimestampToString(uint64(task.StartTime)),
EndTime: typeutil.TimestampToString(uint64(task.EndTime)),
StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000),
EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000),
TotalRows: task.TotalRows,
InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10)
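The `* 1000` above scales the stored Unix-second start/end times to milliseconds before formatting; the same fix is applied to the index task stats in index_meta.go below. A small self-contained sketch of the conversion being relied on, assuming `typeutil.TimestampToString` expects Unix milliseconds (its signature is not shown in this diff):

```go
package main

import (
	"fmt"
	"time"
)

// secondsToString mirrors the intended behavior: task.StartTime/EndTime are
// stored as Unix seconds, so they are scaled to milliseconds before formatting.
func secondsToString(sec int64) string {
	return time.UnixMilli(sec * 1000).UTC().Format("2006-01-02 15:04:05")
}

func main() {
	fmt.Println(secondsToString(1700000000)) // 2023-11-14 22:13:20
}
```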
26 changes: 14 additions & 12 deletions internal/datacoord/compaction_task_mix.go
@@ -97,9 +97,7 @@ func (t *mixCompactionTask) processPipelining() bool {

func (t *mixCompactionTask) processMetaSaved() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
}
@@ -119,15 +117,12 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@@ -137,7 +132,7 @@ func (t *mixCompactionTask) processExecuting() bool {
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@@ -154,7 +149,7 @@ func (t *mixCompactionTask) processExecuting() bool {
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
log.Info("mixCompactionTask fail in datanode")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
@@ -240,10 +235,8 @@ func (t *mixCompactionTask) processCompleted() bool {

t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("mixCompactionTask processCompleted done")

task := t.GetTaskProto()
log.Info("mixCompactionTask processCompleted done",
zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
return true
}

@@ -289,6 +282,15 @@ func (t *mixCompactionTask) doClean() error {
}

func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// If the task state is completed, cleaned, failed, or timeout, append the end time before saving.
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta.go
@@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
FailReason: s.FailReason,
IndexSize: s.IndexSize,
IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
}
}

3 changes: 3 additions & 0 deletions internal/metastore/model/segment_index.go
@@ -50,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
IndexSize: segIndex.SerializeSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.GetCurrentIndexVersion(),
FinishedUTCTime: segIndex.FinishedTime,
}
}

@@ -75,6 +76,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
SerializeSize: segIdx.IndexSize,
WriteHandoff: segIdx.WriteHandoff,
CurrentIndexVersion: segIdx.CurrentIndexVersion,
FinishedTime: segIdx.FinishedUTCTime,
}
}

@@ -96,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
IndexSize: segIndex.IndexSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.CurrentIndexVersion,
FinishedUTCTime: segIndex.FinishedUTCTime,
}
}
1 change: 1 addition & 0 deletions internal/proto/index_coord.proto
@@ -80,6 +80,7 @@ message SegmentIndex {
bool write_handoff = 15;
int32 current_index_version = 16;
int64 index_store_version = 17;
uint64 finished_time = 18;
}

message RegisterNodeRequest {
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
@@ -668,6 +668,7 @@ message CollectionLoadInfo {
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
int64 dbID = 9;
}

message PartitionLoadInfo {
23 changes: 14 additions & 9 deletions internal/querycoordv2/job/job_load.go
@@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error {
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}

// API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API.
// Then we can implement dynamic replica changed in different resource group independently.
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
@@ -213,6 +214,7 @@ func (job *LoadCollectionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadCollection,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
@@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error {
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
if err != nil {
msg := "failed to spawn replica for collection"
@@ -412,6 +416,7 @@ func (job *LoadPartitionJob) Execute() error {
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
11 changes: 9 additions & 2 deletions internal/querycoordv2/meta/collection_manager.go
@@ -285,18 +285,25 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e
// we should save it's CollectionLoadInfo to meta store
for _, partition := range m.GetAllPartitions(ctx) {
// In old version, collection would NOT be stored if the partition existed.
if _, ok := m.collections[partition.GetCollectionID()]; !ok {
if !m.Exist(ctx, partition.GetCollectionID()) {
collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

col := &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: partition.GetCollectionID(),
ReplicaNumber: partition.GetReplicaNumber(),
Status: partition.GetStatus(),
FieldIndexID: partition.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
DbID: collectionInfo.GetDbId(),
},
LoadPercentage: 100,
}
err := m.PutCollection(ctx, col)
err = m.PutCollection(ctx, col)
if err != nil {
return err
}
12 changes: 11 additions & 1 deletion internal/querycoordv2/meta/replica_manager.go
@@ -506,7 +506,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle
// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
// marshals them into a JSON string, and returns the result.
// If an error occurs during marshaling, it logs a warning and returns an empty string.
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

@@ -515,9 +515,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
for k, v := range r.replicaPB.GetChannelNodeInfos() {
channelTowRWNodes[k] = v.GetRwNodes()
}

collectionInfo := meta.GetCollection(ctx, r.GetCollectionID())
dbID := int64(-1)
if collectionInfo == nil {
log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID()))
} else {
dbID = collectionInfo.GetDbID()
}

return &metricsinfo.Replica{
ID: r.GetID(),
CollectionID: r.GetCollectionID(),
DatabaseID: dbID,
RWNodes: r.GetNodes(),
ResourceGroup: r.GetResourceGroup(),
RONodes: r.GetRONodes(),
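With this change the replica description JSON carries the owning database ID, falling back to -1 when the collection's load info cannot be found. For illustration only, a simplified stand-in for `metricsinfo.Replica` (field and JSON tag names here are assumptions, not the authoritative definition) showing roughly what the serialized output gains:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// replica is a simplified stand-in for metricsinfo.Replica, reduced to the
// fields touched by this PR; JSON tags are illustrative assumptions.
type replica struct {
	ID            int64   `json:"ID"`
	CollectionID  int64   `json:"collectionID"`
	DatabaseID    int64   `json:"databaseID"`
	ResourceGroup string  `json:"resource_group"`
	RWNodes       []int64 `json:"rw_nodes"`
}

func main() {
	out, _ := json.Marshal(replica{
		ID:            1,
		CollectionID:  100,
		DatabaseID:    1, // -1 when the collection's load info is missing
		ResourceGroup: "rg1",
		RWNodes:       []int64{1, 2, 3},
	})
	fmt.Println(string(out))
}
```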
23 changes: 22 additions & 1 deletion internal/querycoordv2/meta/replica_manager_test.go
@@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) {
err = replicaManager.put(ctx, replica2)
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx)
meta := &Meta{
CollectionManager: NewCollectionManager(catalog),
}

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
DbID: int64(1),
},
})
assert.NoError(t, err)

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 200,
},
})
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx, meta)
var replicas []*metricsinfo.Replica
err = json.Unmarshal([]byte(jsonOutput), &replicas)
assert.NoError(t, err)
@@ -556,10 +575,12 @@
assert.Equal(t, int64(100), replica.CollectionID)
assert.Equal(t, "rg1", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
assert.Equal(t, int64(1), replica.DatabaseID)
} else if replica.ID == 2 {
assert.Equal(t, int64(200), replica.CollectionID)
assert.Equal(t, "rg2", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
assert.Equal(t, int64(0), replica.DatabaseID)
} else {
assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
}
2 changes: 1 addition & 1 deletion internal/querycoordv2/server.go
@@ -215,7 +215,7 @@ func (s *Server) registerMetricsRequest() {
}

QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetReplicasJSON(ctx), nil
return s.meta.GetReplicasJSON(ctx, s.meta), nil
}

QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
1 change: 0 additions & 1 deletion internal/querycoordv2/utils/meta.go
@@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
if err != nil {
return nil, err
}

// Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels)
if err != nil {