diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 53a5fbd97e3a6..4f939fefbf043 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -716,6 +716,15 @@ func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datap
 }
 
 func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
+    // if task state is completed, cleaned, failed, timeout, then do append end time and save
+    if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
+        ts := time.Now().Unix()
+        opts = append(opts, setEndTime(ts))
+    }
+
     task := t.ShadowClone(opts...)
     err := t.saveTaskMeta(task)
     if err != nil {
diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go
index e7535bca8e4b5..96bdb207059cd 100644
--- a/internal/datacoord/compaction_task_l0.go
+++ b/internal/datacoord/compaction_task_l0.go
@@ -130,9 +130,6 @@ func (t *l0CompactionTask) processExecuting() bool {
         log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
         return false
     }
-
-    ts := time.Now().Unix()
-    updateOps := []compactionTaskOpt{setEndTime(ts)}
     switch result.GetState() {
     case datapb.CompactionTaskState_completed:
         t.result = result
@@ -141,15 +138,13 @@ func (t *l0CompactionTask) processExecuting() bool {
             return false
         }
 
-        updateOps = append(updateOps, setState(datapb.CompactionTaskState_meta_saved))
-        if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
+        if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
             log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
             return false
         }
         return t.processMetaSaved()
     case datapb.CompactionTaskState_failed:
-        updateOps = append(updateOps, setState(datapb.CompactionTaskState_failed))
-        if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
+        if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
             log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
             return false
         }
@@ -159,9 +154,7 @@
 }
 
 func (t *l0CompactionTask) processMetaSaved() bool {
-    ts := time.Now().Unix()
-    updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
-    err := t.updateAndSaveTaskMeta(updateOps...)
+    err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
     if err != nil {
         log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
         return false
@@ -358,6 +351,15 @@ func (t *l0CompactionTask) SaveTaskMeta() error {
 }
 
 func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
+    // if task state is completed, cleaned, failed, timeout, then do append end time and save
+    if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
+        ts := time.Now().Unix()
+        opts = append(opts, setEndTime(ts))
+    }
+
     task := t.ShadowClone(opts...)
     err := t.saveTaskMeta(task)
     if err != nil {
diff --git a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go
index 33a2ad20b59cb..be18fd0884fae 100644
--- a/internal/datacoord/compaction_task_meta.go
+++ b/internal/datacoord/compaction_task_meta.go
@@ -43,8 +43,8 @@ func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.Compaction
         Type:       task.Type.String(),
         State:      task.State.String(),
         FailReason: task.FailReason,
-        StartTime:  typeutil.TimestampToString(uint64(task.StartTime)),
-        EndTime:    typeutil.TimestampToString(uint64(task.EndTime)),
+        StartTime:  typeutil.TimestampToString(uint64(task.StartTime) * 1000),
+        EndTime:    typeutil.TimestampToString(uint64(task.EndTime) * 1000),
         TotalRows:  task.TotalRows,
         InputSegments: lo.Map(task.InputSegments, func(t int64, i int) string {
             return strconv.FormatInt(t, 10)
diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go
index c61756949574a..90b43432e065d 100644
--- a/internal/datacoord/compaction_task_mix.go
+++ b/internal/datacoord/compaction_task_mix.go
@@ -97,9 +97,7 @@ func (t *mixCompactionTask) processPipelining() bool {
 
 func (t *mixCompactionTask) processMetaSaved() bool {
     log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
-    ts := time.Now().Unix()
-    updateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_completed)}
-    if err := t.updateAndSaveTaskMeta(updateOps...); err != nil {
+    if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
         log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
         return false
     }
@@ -119,15 +117,12 @@ func (t *mixCompactionTask) processExecuting() bool {
         log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
         return false
     }
-
-    ts := time.Now().Unix()
-    failedUpdateOps := []compactionTaskOpt{setEndTime(ts), setState(datapb.CompactionTaskState_failed)}
     switch result.GetState() {
     case datapb.CompactionTaskState_completed:
         t.result = result
         if len(result.GetSegments()) == 0 {
             log.Info("illegal compaction results")
-            err := t.updateAndSaveTaskMeta(failedUpdateOps...)
+            err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
             if err != nil {
                 log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
                 return false
@@ -137,7 +132,7 @@
         if err := t.saveSegmentMeta(); err != nil {
             log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
             if errors.Is(err, merr.ErrIllegalCompactionPlan) {
-                err := t.updateAndSaveTaskMeta(failedUpdateOps...)
+                err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
                 if err != nil {
                     log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
                     return false
@@ -154,7 +149,7 @@
         return t.processMetaSaved()
     case datapb.CompactionTaskState_failed:
         log.Info("mixCompactionTask fail in datanode")
-        err := t.updateAndSaveTaskMeta(failedUpdateOps...)
+        err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
         if err != nil {
             log.Warn("fail to updateAndSaveTaskMeta")
         }
@@ -240,10 +235,8 @@ func (t *mixCompactionTask) processCompleted() bool {
 
     t.resetSegmentCompacting()
     UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
+    log.Info("mixCompactionTask processCompleted done")
 
-    task := t.GetTaskProto()
-    log.Info("mixCompactionTask processCompleted done",
-        zap.Int64("planID", task.GetPlanID()), zap.Duration("costs", time.Duration(task.GetEndTime()-task.GetStartTime())*time.Second))
     return true
 }
 
@@ -289,6 +282,15 @@ func (t *mixCompactionTask) doClean() error {
 }
 
 func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
+    // if task state is completed, cleaned, failed, timeout, then do append end time and save
+    if t.GetTaskProto().State == datapb.CompactionTaskState_completed ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_cleaned ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_failed ||
+        t.GetTaskProto().State == datapb.CompactionTaskState_timeout {
+        ts := time.Now().Unix()
+        opts = append(opts, setEndTime(ts))
+    }
+
     task := t.ShadowClone(opts...)
     err := t.saveTaskMeta(task)
     if err != nil {
diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go
index bc12f4f88da1c..37e744bd8a4d5 100644
--- a/internal/datacoord/index_meta.go
+++ b/internal/datacoord/index_meta.go
@@ -76,8 +76,8 @@ func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
         FailReason:      s.FailReason,
         IndexSize:       s.IndexSize,
         IndexVersion:    s.IndexVersion,
-        CreatedUTCTime:  typeutil.TimestampToString(s.CreatedUTCTime),
-        FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime),
+        CreatedUTCTime:  typeutil.TimestampToString(s.CreatedUTCTime * 1000),
+        FinishedUTCTime: typeutil.TimestampToString(s.FinishedUTCTime * 1000),
     }
 }
 
diff --git a/internal/metastore/model/segment_index.go b/internal/metastore/model/segment_index.go
index 9f6dfd8dce537..8c2e8e3ef3827 100644
--- a/internal/metastore/model/segment_index.go
+++ b/internal/metastore/model/segment_index.go
@@ -50,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
         IndexSize:           segIndex.SerializeSize,
         WriteHandoff:        segIndex.WriteHandoff,
         CurrentIndexVersion: segIndex.GetCurrentIndexVersion(),
+        FinishedUTCTime:     segIndex.FinishedTime,
     }
 }
 
@@ -75,6 +76,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
         SerializeSize:       segIdx.IndexSize,
         WriteHandoff:        segIdx.WriteHandoff,
         CurrentIndexVersion: segIdx.CurrentIndexVersion,
+        FinishedTime:        segIdx.FinishedUTCTime,
     }
 }
 
@@ -96,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
         IndexSize:           segIndex.IndexSize,
         WriteHandoff:        segIndex.WriteHandoff,
         CurrentIndexVersion: segIndex.CurrentIndexVersion,
+        FinishedUTCTime:     segIndex.FinishedUTCTime,
     }
 }
diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto
index adcd0aed7bd37..b736098d46b82 100644
--- a/internal/proto/index_coord.proto
+++ b/internal/proto/index_coord.proto
@@ -80,6 +80,7 @@ message SegmentIndex {
     bool write_handoff = 15;
    int32 current_index_version = 16;
     int64 index_store_version = 17;
+    uint64 finished_time = 18;
 }
 
 message RegisterNodeRequest {
diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index d469841c525ff..9b44e6f5c9560 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -668,6 +668,7 @@ message CollectionLoadInfo {
     LoadType load_type = 6;
     int32 recover_times = 7;
     repeated int64 load_fields = 8;
+    int64 dbID = 9;
 }
 
 message PartitionLoadInfo {
diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go
index 5009be26fb4ad..998335f9e9a41 100644
--- a/internal/querycoordv2/job/job_load.go
+++ b/internal/querycoordv2/job/job_load.go
@@ -171,14 +171,15 @@ func (job *LoadCollectionJob) Execute() error {
         }
     }
 
+    collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
+    if err != nil {
+        log.Warn("failed to describe collection from RootCoord", zap.Error(err))
+        return err
+    }
+
     // 2. create replica if not exist
     replicas := job.meta.ReplicaManager.GetByCollection(job.ctx, req.GetCollectionID())
     if len(replicas) == 0 {
-        collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
-        if err != nil {
-            return err
-        }
-
         // API of LoadCollection is wired, we should use map[resourceGroupNames]replicaNumber as input, to keep consistency with `TransferReplica` API.
         // Then we can implement dynamic replica changed in different resource group independently.
         _, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
@@ -213,6 +214,7 @@
                 FieldIndexID: req.GetFieldIndexID(),
                 LoadType:     querypb.LoadType_LoadCollection,
                 LoadFields:   req.GetLoadFields(),
+                DbID:         collectionInfo.GetDbId(),
             },
             CreatedAt: time.Now(),
             LoadSpan:  sp,
@@ -371,13 +373,15 @@ func (job *LoadPartitionJob) Execute() error {
         }
     }
 
+    collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
+    if err != nil {
+        log.Warn("failed to describe collection from RootCoord", zap.Error(err))
+        return err
+    }
+
     // 2. create replica if not exist
     replicas := job.meta.ReplicaManager.GetByCollection(context.TODO(), req.GetCollectionID())
     if len(replicas) == 0 {
-        collectionInfo, err := job.broker.DescribeCollection(job.ctx, req.GetCollectionID())
-        if err != nil {
-            return err
-        }
         _, err = utils.SpawnReplicasWithRG(job.ctx, job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber(), collectionInfo.GetVirtualChannelNames())
         if err != nil {
             msg := "failed to spawn replica for collection"
@@ -412,6 +416,7 @@
                 FieldIndexID: req.GetFieldIndexID(),
                 LoadType:     querypb.LoadType_LoadPartition,
                 LoadFields:   req.GetLoadFields(),
+                DbID:         collectionInfo.GetDbId(),
             },
             CreatedAt: time.Now(),
             LoadSpan:  sp,
diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go
index f95d18b0658f3..980331ff64c80 100644
--- a/internal/querycoordv2/meta/collection_manager.go
+++ b/internal/querycoordv2/meta/collection_manager.go
@@ -285,7 +285,13 @@ func (m *CollectionManager) upgradeRecover(ctx context.Context, broker Broker) e
     // we should save it's CollectionLoadInfo to meta store
     for _, partition := range m.GetAllPartitions(ctx) {
         // In old version, collection would NOT be stored if the partition existed.
-        if _, ok := m.collections[partition.GetCollectionID()]; !ok {
+        if !m.Exist(ctx, partition.GetCollectionID()) {
+            collectionInfo, err := broker.DescribeCollection(ctx, partition.GetCollectionID())
+            if err != nil {
+                log.Warn("failed to describe collection from RootCoord", zap.Error(err))
+                return err
+            }
+
             col := &Collection{
                 CollectionLoadInfo: &querypb.CollectionLoadInfo{
                     CollectionID: partition.GetCollectionID(),
@@ -293,10 +299,11 @@
                     Status:       partition.GetStatus(),
                     FieldIndexID: partition.GetFieldIndexID(),
                     LoadType:     querypb.LoadType_LoadPartition,
+                    DbID:         collectionInfo.GetDbId(),
                 },
                 LoadPercentage: 100,
             }
-            err := m.PutCollection(ctx, col)
+            err = m.PutCollection(ctx, col)
             if err != nil {
                 return err
             }
diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index 2c04c52c7cddc..a9de0566d2c5d 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -506,7 +506,7 @@ func (m *ReplicaManager) GetResourceGroupByCollection(ctx context.Context, colle
 // It locks the ReplicaManager for reading, converts the replicas to their protobuf representation,
 // marshals them into a JSON string, and returns the result.
 // If an error occurs during marshaling, it logs a warning and returns an empty string.
-func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
+func (m *ReplicaManager) GetReplicasJSON(ctx context.Context, meta *Meta) string {
     m.rwmutex.RLock()
     defer m.rwmutex.RUnlock()
 
@@ -515,9 +515,19 @@ func (m *ReplicaManager) GetReplicasJSON(ctx context.Context) string {
         for k, v := range r.replicaPB.GetChannelNodeInfos() {
             channelTowRWNodes[k] = v.GetRwNodes()
         }
+
+        collectionInfo := meta.GetCollection(ctx, r.GetCollectionID())
+        dbID := int64(-1)
+        if collectionInfo == nil {
+            log.Ctx(ctx).Warn("failed to get collection info", zap.Int64("collectionID", r.GetCollectionID()))
+        } else {
+            dbID = collectionInfo.GetDbID()
+        }
+
         return &metricsinfo.Replica{
             ID:            r.GetID(),
             CollectionID:  r.GetCollectionID(),
+            DatabaseID:    dbID,
             RWNodes:       r.GetNodes(),
             ResourceGroup: r.GetResourceGroup(),
             RONodes:       r.GetRONodes(),
diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go
index cdfc1cbdfc8b5..7921129590190 100644
--- a/internal/querycoordv2/meta/replica_manager_test.go
+++ b/internal/querycoordv2/meta/replica_manager_test.go
@@ -545,7 +545,26 @@ func TestGetReplicasJSON(t *testing.T) {
     err = replicaManager.put(ctx, replica2)
     assert.NoError(t, err)
 
-    jsonOutput := replicaManager.GetReplicasJSON(ctx)
+    meta := &Meta{
+        CollectionManager: NewCollectionManager(catalog),
+    }
+
+    err = meta.PutCollectionWithoutSave(ctx, &Collection{
+        CollectionLoadInfo: &querypb.CollectionLoadInfo{
+            CollectionID: 100,
+            DbID:         int64(1),
+        },
+    })
+    assert.NoError(t, err)
+
+    err = meta.PutCollectionWithoutSave(ctx, &Collection{
+        CollectionLoadInfo: &querypb.CollectionLoadInfo{
+            CollectionID: 200,
+        },
+    })
+    assert.NoError(t, err)
+
+    jsonOutput := replicaManager.GetReplicasJSON(ctx, meta)
     var replicas []*metricsinfo.Replica
     err = json.Unmarshal([]byte(jsonOutput), &replicas)
     assert.NoError(t, err)
@@ -556,10 +575,12 @@
             assert.Equal(t, int64(100), replica.CollectionID)
             assert.Equal(t, "rg1", replica.ResourceGroup)
             assert.ElementsMatch(t, []int64{1, 2, 3}, replica.RWNodes)
+            assert.Equal(t, int64(1), replica.DatabaseID)
         } else if replica.ID == 2 {
             assert.Equal(t, int64(200), replica.CollectionID)
             assert.Equal(t, "rg2", replica.ResourceGroup)
             assert.ElementsMatch(t, []int64{4, 5, 6}, replica.RWNodes)
+            assert.Equal(t, int64(0), replica.DatabaseID)
         } else {
             assert.Failf(t, "unexpected replica id", "unexpected replica id %d", replica.ID)
         }
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 8ae802cf7848f..58261423659c2 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -215,7 +215,7 @@ func (s *Server) registerMetricsRequest() {
     }
 
     QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
-        return s.meta.GetReplicasJSON(ctx), nil
+        return s.meta.GetReplicasJSON(ctx, s.meta), nil
     }
 
     QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go
index 1744e2367849f..fe15c685bbf0c 100644
--- a/internal/querycoordv2/utils/meta.go
+++ b/internal/querycoordv2/utils/meta.go
@@ -153,7 +153,6 @@ func SpawnReplicasWithRG(ctx context.Context, m *meta.Meta, collection int64, re
     if err != nil {
         return nil, err
     }
-
     // Spawn it in replica manager.
     replicas, err := m.ReplicaManager.Spawn(ctx, collection, replicaNumInRG, channels)
     if err != nil {
diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go
index 6a6b5b46f8679..02e7a2fbf5d0b 100644
--- a/pkg/util/metricsinfo/metrics_info.go
+++ b/pkg/util/metricsinfo/metrics_info.go
@@ -195,6 +195,7 @@ type ResourceGroup struct {
 type Replica struct {
     ID            int64    `json:"ID,omitempty,string"`
     CollectionID  int64    `json:"collectionID,omitempty,string"`
+    DatabaseID    int64    `json:"database_id,omitempty,string"`
     RWNodes       []int64  `json:"rw_nodes,omitempty"`
     ResourceGroup string   `json:"resource_group,omitempty"`
     RONodes       []int64  `json:"ro_nodes,omitempty"`
@@ -382,8 +383,8 @@ type ImportTask struct {
 }
 
 type CompactionTask struct {
-    PlanID       int64  `json:"plan_id,omitempty"`
-    CollectionID int64  `json:"collection_id,omitempty"`
+    PlanID       int64  `json:"plan_id,omitempty,string"`
+    CollectionID int64  `json:"collection_id,omitempty,string"`
     Type         string `json:"type,omitempty"`
     State        string `json:"state,omitempty"`
     FailReason   string `json:"fail_reason,omitempty"`
@@ -447,7 +448,7 @@ type Collection struct {
     ConsistencyLevel     string            `json:"consistency_level,omitempty"`
     Aliases              []string          `json:"aliases,omitempty"`
     Properties           map[string]string `json:"properties,omitempty"`
-    DBName               string            `json:"db_name,omitempty,string"`
+    DBName               string            `json:"db_name,omitempty"`
     NumPartitions        int               `json:"num_partitions,omitempty,string"`
     VirtualChannelNames  []string          `json:"virtual_channel_names,omitempty"`
     PhysicalChannelNames []string          `json:"physical_channel_names,omitempty"`
@@ -458,7 +459,7 @@ type Collection struct {
 
 type Database struct {
     DBName           string            `json:"db_name,omitempty"`
-    DBID             int64             `json:"dbID,omitempty"`
+    DBID             int64             `json:"dbID,omitempty,string"`
     CreatedTimestamp string            `json:"created_timestamp,omitempty"`
     Properties       map[string]string `json:"properties,omitempty"`
 }
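For readers tracing the compaction changes above: the three `updateAndSaveTaskMeta` implementations now share one guard that stamps an end time only when the task is already in a terminal state (completed, cleaned, failed, timeout) at the moment a save is requested. Below is a minimal, self-contained Go sketch of that pattern, not the Milvus code itself; `task`, `taskOpt`, and `isTerminal` are illustrative stand-ins for the real `datapb`/`compactionTaskOpt` types.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative stand-in for the datapb compaction task states referenced in the diff.
type taskState int

const (
	stateExecuting taskState = iota
	stateCompleted
	stateFailed
	stateTimeout
	stateCleaned
)

// task mimics the shape of a compaction task proto: a state plus an end timestamp.
type task struct {
	state   taskState
	endTime int64
}

// taskOpt mirrors compactionTaskOpt: a functional option applied to a cloned task.
type taskOpt func(*task)

func setState(s taskState) taskOpt { return func(t *task) { t.state = s } }
func setEndTime(ts int64) taskOpt  { return func(t *task) { t.endTime = ts } }

// isTerminal reports whether the state is one of the four terminal states the diff checks.
func isTerminal(s taskState) bool {
	return s == stateCompleted || s == stateCleaned || s == stateFailed || s == stateTimeout
}

// updateAndSave follows the centralized pattern from the diff: if the task is already
// terminal when a save is requested, append an end-time option before cloning and persisting.
func updateAndSave(t *task, opts ...taskOpt) *task {
	if isTerminal(t.state) {
		opts = append(opts, setEndTime(time.Now().Unix()))
	}
	clone := *t // shadow clone
	for _, opt := range opts {
		opt(&clone)
	}
	// A real implementation would persist `clone` to the meta store here.
	return &clone
}

func main() {
	t := &task{state: stateExecuting}

	// First save transitions to completed; the state checked is still "executing",
	// so no end time is stamped on this call.
	t = updateAndSave(t, setState(stateCompleted))
	fmt.Println("after completion:", t.state, "endTime:", t.endTime)

	// A later save (e.g. moving to cleaned) sees a terminal state,
	// so the end time is stamped automatically.
	t = updateAndSave(t, setState(stateCleaned))
	fmt.Println("after cleanup:", t.state, "endTime:", t.endTime)
}
```

The same guard also explains the metrics change in `compaction_task_meta.go`: the stored second-resolution timestamps are multiplied by 1000 before formatting, assuming `typeutil.TimestampToString` expects milliseconds.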