Skip to content

Commit

Permalink
enhance: add list index and segment index retrieval API for WebUI (#3…
Browse files Browse the repository at this point in the history
…7861)

issue: #36621

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Nov 22, 2024
1 parent 7c5a801 commit 7bbfe86
Show file tree
Hide file tree
Showing 24 changed files with 616 additions and 122 deletions.
71 changes: 71 additions & 0 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -1068,3 +1070,72 @@ func (m *indexMeta) TaskStatsJSON() string {
}
return string(ret)
}

func (m *indexMeta) GetIndexJSON(collectionID int64) string {
m.RLock()
defer m.RUnlock()

var indexMetrics []*metricsinfo.Index
for collID, indexes := range m.indexes {
for _, index := range indexes {
if collectionID == 0 || collID == collectionID {
im := &metricsinfo.Index{
CollectionID: collID,
IndexID: index.IndexID,
FieldID: index.FieldID,
Name: index.IndexName,
IsDeleted: index.IsDeleted,
CreateTime: tsoutil.PhysicalTimeFormat(index.CreateTime),
IndexParams: funcutil.KeyValuePair2Map(index.IndexParams),
IsAutoIndex: index.IsAutoIndex,
UserIndexParams: funcutil.KeyValuePair2Map(index.UserIndexParams),
}
indexMetrics = append(indexMetrics, im)
}
}
}

ret, err := json.Marshal(indexMetrics)
if err != nil {
return ""
}
return string(ret)
}

func (m *indexMeta) GetSegmentIndexedFields(collectionID UniqueID, segmentID UniqueID) (bool, []*metricsinfo.IndexedField) {
m.RLock()
defer m.RUnlock()
fieldIndexes, ok := m.indexes[collectionID]
if !ok {
// the segment should be unindexed status if the collection has no indexes
return false, []*metricsinfo.IndexedField{}
}

// the segment should be unindexed status if the segment indexes is not found
segIndexInfos, ok := m.segmentIndexes[segmentID]
if !ok || len(segIndexInfos) == 0 {
return false, []*metricsinfo.IndexedField{}
}

isIndexed := true
var segmentIndexes []*metricsinfo.IndexedField
for _, index := range fieldIndexes {
if si, ok := segIndexInfos[index.IndexID]; !index.IsDeleted {
buildID := int64(-1)
if !ok {
// the segment should be unindexed status if the segment index is not found within field indexes
isIndexed = false
} else {
buildID = si.BuildID
}

segmentIndexes = append(segmentIndexes, &metricsinfo.IndexedField{
IndexFieldID: index.IndexID,
IndexID: index.IndexID,
BuildID: buildID,
IndexSize: int64(si.IndexSize),
})
}
}
return isIndexed, segmentIndexes
}
110 changes: 110 additions & 0 deletions internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,3 +1567,113 @@ func TestBuildIndexTaskStatsJSON(t *testing.T) {
im.segmentBuildInfo.Remove(si1.BuildID)
assert.Equal(t, 1, len(im.segmentBuildInfo.List()))
}

func TestMeta_GetIndexJSON(t *testing.T) {
m := &indexMeta{
indexes: map[UniqueID]map[UniqueID]*model.Index{
1: {
1: &model.Index{
CollectionID: 1,
FieldID: 1,
IndexID: 1,
IndexName: "index1",
IsDeleted: false,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "param1",
Value: "value1",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "param1",
Value: "value1",
},
},
IsAutoIndex: true,
UserIndexParams: []*commonpb.KeyValuePair{
{
Key: "param1",
Value: "value1",
},
},
},
},
},
}

actualJSON := m.GetIndexJSON(0)
var actualIndex []*metricsinfo.Index
err := json.Unmarshal([]byte(actualJSON), &actualIndex)
assert.NoError(t, err)
assert.Equal(t, int64(1), actualIndex[0].CollectionID)
assert.Equal(t, int64(1), actualIndex[0].FieldID)
assert.Equal(t, int64(1), actualIndex[0].IndexID)
assert.Equal(t, map[string]string{"param1": "value1"}, actualIndex[0].IndexParams)
assert.Equal(t, map[string]string{"param1": "value1"}, actualIndex[0].UserIndexParams)
}

func TestMeta_GetSegmentIndexStatus(t *testing.T) {
var (
collID = UniqueID(1)
partID = UniqueID(2)
indexID = UniqueID(10)
fieldID = UniqueID(100)
segID = UniqueID(1000)
buildID = UniqueID(10000)
)

m := &indexMeta{}
m.indexes = map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: "test_index",
IsDeleted: false,
},
},
}
m.segmentIndexes = map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10250,
IndexID: indexID,
BuildID: buildID,
NodeID: 1,
IndexVersion: 0,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 12,
IndexFileKeys: nil,
IndexSize: 0,
},
},
segID + 1: {},
}

t.Run("index exists", func(t *testing.T) {
isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID, segID)
assert.True(t, isIndexed)
assert.Len(t, segmentIndexes, 1)
assert.Equal(t, indexID, segmentIndexes[0].IndexID)
assert.Equal(t, buildID, segmentIndexes[0].BuildID)
})

t.Run("index does not exist", func(t *testing.T) {
isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID+1, segID)
assert.False(t, isIndexed)
assert.Empty(t, segmentIndexes)
})

t.Run("segment does not exist", func(t *testing.T) {
isIndexed, segmentIndexes := m.GetSegmentIndexedFields(collID, segID+1)
assert.False(t, isIndexed)
assert.Empty(t, segmentIndexes)
})
}
32 changes: 17 additions & 15 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2046,26 +2046,28 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
return metricMutation, nil
}

func (m *meta) getSegmentsMetrics() []*metricsinfo.Segment {
func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment {
m.RLock()
defer m.RUnlock()

segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
for _, s := range m.segments.segments {
segments = append(segments, &metricsinfo.Segment{
SegmentID: s.ID,
CollectionID: s.CollectionID,
PartitionID: s.PartitionID,
Channel: s.InsertChannel,
NumOfRows: s.NumOfRows,
State: s.State.String(),
MemSize: s.size.Load(),
Level: s.Level.String(),
IsImporting: s.IsImporting,
Compacted: s.Compacted,
IsSorted: s.IsSorted,
NodeID: paramtable.GetNodeID(),
})
if collectionID <= 0 || s.GetCollectionID() == collectionID {
segments = append(segments, &metricsinfo.Segment{
SegmentID: s.ID,
CollectionID: s.CollectionID,
PartitionID: s.PartitionID,
Channel: s.InsertChannel,
NumOfRows: s.NumOfRows,
State: s.State.String(),
MemSize: s.size.Load(),
Level: s.Level.String(),
IsImporting: s.IsImporting,
Compacted: s.Compacted,
IsSorted: s.IsSorted,
NodeID: paramtable.GetNodeID(),
})
}
}

return segments
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,7 @@ func TestMeta_GetSegmentsJSON(t *testing.T) {
},
}

segments := m.getSegmentsMetrics()
segments := m.getSegmentsMetrics(0)

// Check the length of the segments
assert.Equal(t, 2, len(segments))
Expand Down
43 changes: 41 additions & 2 deletions internal/datacoord/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package datacoord

import (
"context"
"fmt"
"sync"

"github.com/cockroachdb/errors"
"github.com/tidwall/gjson"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -132,8 +134,45 @@ func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[s
return mergedChannels
}

func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
v := jsonReq.Get(metricsinfo.MetricRequestParamINKey)
if !v.Exists() {
// default to get all segments from dataanode
return s.getDataNodeSegmentsJSON(ctx, req)
}

in := v.String()
if in == "dn" {
// TODO: support filter by collection id
return s.getDataNodeSegmentsJSON(ctx, req)
}

if in == "dc" {
v = jsonReq.Get(metricsinfo.MetricRequestParamCollectionIDKey)
collectionID := int64(0)
if v.Exists() {
collectionID = v.Int()
}

segments := s.meta.getSegmentsMetrics(collectionID)
for _, seg := range segments {
isIndexed, indexedFields := s.meta.indexMeta.GetSegmentIndexedFields(seg.CollectionID, seg.SegmentID)
seg.IndexedFields = indexedFields
seg.IsIndexed = isIndexed
}

bs, err := json.Marshal(segments)
if err != nil {
log.Warn("marshal segment value failed", zap.Int64("collectionID", collectionID), zap.String("err", err.Error()))
return "", nil
}
return string(bs), nil
}
return "", fmt.Errorf("invalid param value in=[%s], it should be dc or dn", in)
}

func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) string {
segments := s.meta.getSegmentsMetrics()
segments := s.meta.getSegmentsMetrics(-1)
var channels []*metricsinfo.DmChannel
for nodeID, ch := range s.channelManager.GetChannelWatchInfos() {
for _, chInfo := range ch {
Expand All @@ -158,7 +197,7 @@ func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsReques
return string(bs)
}

func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
func (s *Server) getDataNodeSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
ret, err := getMetrics[*metricsinfo.Segment](s, ctx, req)
return metricsinfo.MarshalGetMetricsValues(ret, err)
}
Expand Down
Loading

0 comments on commit 7bbfe86

Please sign in to comment.