enhance: add db name in replica #38672

Open · wants to merge 1 commit into master
9 changes: 9 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
@@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
}

func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if the task state is completed, cleaned, failed, or timeout, append the end time before saving
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
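The same terminal-state guard is added verbatim to the l0 and mix compaction tasks below, so the end time is stamped once inside updateAndSaveTaskMeta instead of at every call site. A minimal, self-contained sketch of that pattern follows; the option type, state names, and EndTime field are simplified stand-ins for the real datapb/datacoord types, not the project's actual API.

package main

import (
	"fmt"
	"time"
)

// Local stand-ins for datapb.CompactionTaskState and the compaction task proto.
type taskState int

const (
	stateExecuting taskState = iota
	stateCompleted
	stateCleaned
	stateFailed
	stateTimeout
)

type compactionTask struct {
	State   taskState
	EndTime int64 // Unix seconds, matching time.Now().Unix() in the diff
}

// compactionTaskOpt mirrors the functional-option style applied via ShadowClone.
type compactionTaskOpt func(t *compactionTask)

func setEndTime(ts int64) compactionTaskOpt {
	return func(t *compactionTask) { t.EndTime = ts }
}

// updateAndSave mimics the added guard: stamp the end time only when the task
// is already in a terminal state, then apply the remaining options and "save".
func updateAndSave(t *compactionTask, opts ...compactionTaskOpt) {
	switch t.State {
	case stateCompleted, stateCleaned, stateFailed, stateTimeout:
		opts = append(opts, setEndTime(time.Now().Unix()))
	}
	for _, opt := range opts {
		opt(t)
	}
	fmt.Printf("saved task: state=%d endTime=%d\n", t.State, t.EndTime)
}

func main() {
	task := &compactionTask{State: stateCompleted}
	updateAndSave(task)
}

Centralizing the stamp keeps every terminal transition's end time consistent and removes the repeated setEndTime bookkeeping from the per-state branches.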
22 changes: 12 additions & 10 deletions internal/datacoord/compaction_task_l0.go
@@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
@@ -141,15 +138,13 @@
return false
}

updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed))
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
@@ -159,9 +154,7 @@
}

func (t *l0CompactionTask) processMetaSaved() bool {
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
err := t.updateAndSaveTaskMeta(updateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
@@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error {
}

func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if the task state is completed, cleaned, failed, or timeout, append the end time before saving
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_task_meta.go
@@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: typeutil.TimestampToString(uint64(task.StartTime)),
EndTime: typeutil.TimestampToString(uint64(task.EndTime)),
StartTime: typeutil.TimestampToString(uint64(task.StartTime) * 1000),
EndTime: typeutil.TimestampToString(uint64(task.EndTime) * 1000),
TotalRows: task.TotalRows,
InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
return strconv.FormatInt(t, 10)
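The only change in this file is multiplying the stored Unix-second timestamps by 1000, which implies that typeutil.TimestampToString interprets its argument as milliseconds. A small self-contained illustration of that conversion; timestampToString below is a stand-in, not the real helper from pkg/util/typeutil.

package main

import (
	"fmt"
	"time"
)

// timestampToString is a stand-in for typeutil.TimestampToString, assumed here
// to take milliseconds since the Unix epoch.
func timestampToString(ms uint64) string {
	return time.UnixMilli(int64(ms)).UTC().Format("2006-01-02 15:04:05")
}

func main() {
	startTime := time.Now().Unix() // task.StartTime is stored as Unix seconds
	// Without the *1000 the seconds value is read as milliseconds and renders
	// as a date in January 1970; with it, the actual wall-clock time appears.
	fmt.Println("without *1000:", timestampToString(uint64(startTime)))
	fmt.Println("with    *1000:", timestampToString(uint64(startTime)*1000))
}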
26 changes: 14 additions & 12 deletions internal/datacoord/compaction_task_mix.go
@@ -97,9 +97,7 @@

func (t *mixCompactionTask) processMetaSaved() bool {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
ts := time.Now().Unix()
updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
}
@@ -119,15 +117,12 @@
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
return false
}

ts := time.Now().Unix()
failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)}
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))

if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@@ -137,7 +132,7 @@
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))

if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
@@ -154,7 +149,7 @@
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
log.Info("mixCompactionTask fail in datanode")
err := t.updateAndSaveTaskMeta(failedUpdateOps...)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))

if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
@@ -240,10 +235,8 @@

t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("mixCompactionTask processCompleted done")

task := t.GetTaskProto()
log.Info("mixCompactionTask processCompleted done",
zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
return true
}

@@ -289,6 +282,15 @@
}

func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
// if the task state is completed, cleaned, failed, or timeout, append the end time before saving
if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
ts := time.Now().Unix()
opts = append(opts, setEndTime(ts))
}

task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
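The restored completion log derives the task cost from the start and end timestamps that updateAndSaveTaskMeta now stamps. Both values are Unix seconds, so the difference has to be scaled by time.Second to become a time.Duration, as the added log line does. A tiny sketch with invented values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// StartTime and EndTime on the compaction task proto are Unix seconds.
	startTime := int64(1735000000)
	endTime := int64(1735000042)

	// time.Duration counts nanoseconds, so the raw second difference must be
	// multiplied by time.Second, exactly as the added log line does.
	costs := time.Duration(endTime-startTime) * time.Second
	fmt.Println("costs:", costs) // 42s
}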
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta.go
@@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
FailReason: s.FailReason,
IndexSize: s.IndexSize,
IndexVersion: s.IndexVersion,
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
CreatedUTCTime: typeutil.TimestampToString(s.CreatedUTCTime * 1000),
FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
}
}

3 changes: 3 additions & 0 deletions internal/metastore/model/segment_index.go
@@ -50,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
IndexSize: segIndex.SerializeSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.GetCurrentIndexVersion(),
FinishedUTCTime: segIndex.FinishedTime,
}
}

@@ -75,6 +76,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
SerializeSize: segIdx.IndexSize,
WriteHandoff: segIdx.WriteHandoff,
CurrentIndexVersion: segIdx.CurrentIndexVersion,
FinishedTime: segIdx.FinishedUTCTime,
}
}

@@ -96,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
IndexSize: segIndex.IndexSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.CurrentIndexVersion,
FinishedUTCTime: segIndex.FinishedUTCTime,
}
}
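FinishedUTCTime is threaded through all three conversion paths (unmarshal, marshal, clone); omitting it from any one of them would silently reset the finish time to zero on the next round trip. A simplified stand-in, not the real model package, showing the failure mode the change guards against:

package main

import "fmt"

type segmentIndex struct {
	SegmentID       int64
	FinishedUTCTime uint64
}

// cloneDroppingField mimics a clone helper written before the field existed.
func cloneDroppingField(s *segmentIndex) *segmentIndex {
	return &segmentIndex{SegmentID: s.SegmentID}
}

// cloneKeepingField mirrors the updated CloneSegmentIndex.
func cloneKeepingField(s *segmentIndex) *segmentIndex {
	return &segmentIndex{SegmentID: s.SegmentID, FinishedUTCTime: s.FinishedUTCTime}
}

func main() {
	orig := &segmentIndex{SegmentID: 7, FinishedUTCTime: 1735000000}
	fmt.Println(cloneDroppingField(orig).FinishedUTCTime) // 0: finish time silently lost
	fmt.Println(cloneKeepingField(orig).FinishedUTCTime)  // 1735000000: preserved
}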
1 change: 1 addition & 0 deletions internal/proto/index_coord.proto
@@ -80,6 +80,7 @@ message SegmentIndex {
bool write_handoff = 15;
int32 current_index_version = 16;
int64 index_store_version = 17;
uint64 finished_time = 18;
}

message RegisterNodeRequest {
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
@@ -668,6 +668,7 @@ message CollectionLoadInfo {
LoadType load_type = 6;
int32 recover_times = 7;
repeated int64 load_fields = 8;
int64 dbID = 9;
}

message PartitionLoadInfo {
23 changes: 14 additions & 9 deletions internal/querycoordv2/job/job_load.go
@@ -171,14 +171,15 @@
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}

// API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API.
// Then we can implement dynamic replica changed in different resource group independently.
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
@@ -213,6 +214,7 @@
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadCollection,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
@@ -371,13 +373,15 @@
}
}

collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

// 2. create replica if not exist
replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID())
if len(replicas) == 0 {
collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
if err != nil {
return err
}
_, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
if err != nil {
msg := "failed to spawn replica for collection"
@@ -412,6 +416,7 @@
FieldIndexID: req.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
LoadFields: req.GetLoadFields(),
DbID: collectionInfo.GetDbId(),
},
CreatedAt: time.Now(),
LoadSpan: sp,
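Both load jobs now call DescribeCollection up front, because the database ID is needed to fill CollectionLoadInfo even when replicas already exist; previously the call only happened inside the replica-creation branch. A compressed, self-contained sketch of the reordered flow; the broker interface and collectionInfo struct below are stand-ins for the real querycoord types.

package main

import (
	"context"
	"fmt"
)

// collectionInfo and broker are stand-ins for the RootCoord describe response
// and the querycoord broker; only the fields used by the sketch are modeled.
type collectionInfo struct {
	DbID            int64
	VirtualChannels []string
}

type broker interface {
	DescribeCollection(ctx context.Context, collectionID int64) (*collectionInfo, error)
}

type fakeBroker struct{}

func (fakeBroker) DescribeCollection(ctx context.Context, collectionID int64) (*collectionInfo, error) {
	return &collectionInfo{DbID: 1, VirtualChannels: []string{"ch1"}}, nil
}

// loadCollection sketches the reordered flow: describe first, spawn replicas
// only when none exist yet, then record the DbID on the load info.
func loadCollection(ctx context.Context, b broker, collectionID int64, existingReplicas int) error {
	info, err := b.DescribeCollection(ctx, collectionID)
	if err != nil {
		return err // the describe call no longer lives inside the replica branch
	}
	if existingReplicas == 0 {
		fmt.Println("spawning replicas on channels", info.VirtualChannels)
	}
	fmt.Println("storing CollectionLoadInfo with DbID =", info.DbID)
	return nil
}

func main() {
	_ = loadCollection(context.Background(), fakeBroker{}, 100, 0)
}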
11 changes: 9 additions & 2 deletions internal/querycoordv2/meta/collection_manager.go
@@ -285,18 +285,25 @@
// we should save it's CollectionLoadInfo to meta store
for _, partition := range m.GetAllPartitions(ctx) {
// In old version, collection would NOT be stored if the partition existed.
if _, ok := m.collections[partition.GetCollectionID()]; !ok {
if !m.Exist(ctx, partition.GetCollectionID()) {
collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID())
if err != nil {
log.Warn("failed to describe collection from RootCoord", zap.Error(err))
return err
}

col := &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: partition.GetCollectionID(),
ReplicaNumber: partition.GetReplicaNumber(),
Status: partition.GetStatus(),
FieldIndexID: partition.GetFieldIndexID(),
LoadType: querypb.LoadType_LoadPartition,
DbID: collectionInfo.GetDbId(),
},
LoadPercentage: 100,
}
err := m.PutCollection(ctx, col)
err = m.PutCollection(ctx, col)
if err != nil {
return err
}
12 changes: 11 additions & 1 deletion internal/querycoordv2/meta/replica_manager.go
@@ -506,7 +506,7 @@
// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
// marshals them into a JSON string, and returns the result.
// If an error occurs during marshaling, it logs a warning and returns an empty string.
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

@@ -515,9 +515,19 @@
for k, v := range r.replicaPB.GetChannelNodeInfos() {
channelTowRWNodes[k] = v.GetRwNodes()
}

collectionInfo := meta.GetCollection(ctx, r.GetCollectionID())
dbID := int64(-1)
if collectionInfo == nil {
log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID()))

} else {
dbID = collectionInfo.GetDbID()
}

return &metricsinfo.Replica{
ID: r.GetID(),
CollectionID: r.GetCollectionID(),
DatabaseID: dbID,
RWNodes: r.GetNodes(),
ResourceGroup: r.GetResourceGroup(),
RONodes: r.GetRONodes(),
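GetReplicasJSON now needs collection metadata to resolve each replica's database ID, falling back to -1 when the collection info cannot be found. A stand-alone sketch of that lookup with fallback; the JSON field names and the map standing in for meta.GetCollection are assumptions, not the real metricsinfo.Replica definition.

package main

import (
	"encoding/json"
	"fmt"
)

type replicaJSON struct {
	ID           int64 `json:"ID"`
	CollectionID int64 `json:"collectionID"`
	DatabaseID   int64 `json:"databaseID"`
}

// dbIDByCollection stands in for meta.GetCollection(ctx, collectionID).GetDbID().
var dbIDByCollection = map[int64]int64{100: 1}

func buildReplica(id, collectionID int64) replicaJSON {
	dbID := int64(-1) // fallback when the collection info cannot be found
	if v, ok := dbIDByCollection[collectionID]; ok {
		dbID = v
	}
	return replicaJSON{ID: id, CollectionID: collectionID, DatabaseID: dbID}
}

func main() {
	out, _ := json.Marshal([]replicaJSON{
		buildReplica(1, 100), // known collection   -> DatabaseID 1
		buildReplica(2, 200), // unknown collection -> DatabaseID -1
	})
	fmt.Println(string(out))
}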
23 changes: 22 additions & 1 deletion internal/querycoordv2/meta/replica_manager_test.go
@@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) {
err = replicaManager.put(ctx, replica2)
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx)
meta := &Meta{
CollectionManager: NewCollectionManager(catalog),
}

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 100,
DbID: int64(1),
},
})
assert.NoError(t, err)

err = meta.PutCollectionWithoutSave(ctx, &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: 200,
},
})
assert.NoError(t, err)

jsonOutput := replicaManager.GetReplicasJSON(ctx, meta)
var replicas []*metricsinfo.Replica
err = json.Unmarshal([]byte(jsonOutput), &replicas)
assert.NoError(t, err)
@@ -556,10 +575,12 @@
assert.Equal(t, int64(100), replica.CollectionID)
assert.Equal(t, "rg1", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
assert.Equal(t, int64(1), replica.DatabaseID)
} else if replica.ID == 2 {
assert.Equal(t, int64(200), replica.CollectionID)
assert.Equal(t, "rg2", replica.ResourceGroup)
assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
assert.Equal(t, int64(0), replica.DatabaseID)
} else {
assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
}
2 changes: 1 addition & 1 deletion internal/querycoordv2/server.go
@@ -215,7 +215,7 @@
}

QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.meta.GetReplicasJSON(ctx), nil
return s.meta.GetReplicasJSON(ctx, s.meta), nil

}

QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
1 change: 0 additions & 1 deletion internal/querycoordv2/utils/meta.go
@@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
if err != nil {
return nil, err
}

// Spawn it in replica manager.
replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels)
if err != nil {