Skip to content

Commit

Permalink
feat: add segment,pipeline, replica and resourcegroup api for WebUI
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Nov 1, 2024
1 parent cdee149 commit 9c21615
Show file tree
Hide file tree
Showing 52 changed files with 2,624 additions and 171 deletions.
19 changes: 19 additions & 0 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -53,6 +54,8 @@ type ChannelManager interface {
GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
GetChannelsByCollectionID(collectionID int64) []RWChannel
GetChannelNamesByCollectionID(collectionID int64) []string

GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo
}

// An interface sessionManager implements
Expand Down Expand Up @@ -730,6 +733,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
return nil
}

// GetChannelWatchInfos returns a snapshot of the watch infos of the channels
// held by the channel store, keyed first by node ID and then by channel name.
//
// Every ChannelWatchInfo is deep-copied with proto.Clone while the read lock
// is held, so the returned messages are independent of the ones in the store.
// Channels without a watch info, or whose watch info carries no Vchan, are
// skipped: they have no channel name to key on, and dereferencing them would
// panic.
func (m *ChannelManagerImpl) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
	m.mu.RLock()
	defer m.mu.RUnlock()

	infos := make(map[int64]map[string]*datapb.ChannelWatchInfo)
	for _, nc := range m.store.GetNodesChannels() {
		for _, ch := range nc.Channels {
			src := ch.GetWatchInfo()
			// Generated getters are nil-safe, so this single check covers
			// both a nil watch info and a nil Vchan.
			if src.GetVchan() == nil {
				continue
			}
			watchInfo := proto.Clone(src).(*datapb.ChannelWatchInfo)
			if _, ok := infos[nc.NodeID]; !ok {
				infos[nc.NodeID] = make(map[string]*datapb.ChannelWatchInfo)
			}
			infos[nc.NodeID][watchInfo.GetVchan().GetChannelName()] = watchInfo
		}
	}
	return infos
}

func inferStateByOpType(opType ChannelOpType) datapb.ChannelWatchState {
switch opType {
case Watch:
Expand Down
48 changes: 48 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,3 +805,51 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {

func (s *ChannelManagerSuite) TestCheckLoop() {}
func (s *ChannelManagerSuite) TestGet() {}

// TestGetChannelWatchInfos checks that GetChannelWatchInfos groups the watch
// infos reported by the store by node ID and channel name, and that a store
// with no nodes yields an empty result.
func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
	ch1Info := &datapb.ChannelWatchInfo{
		Vchan:   &datapb.VchannelInfo{ChannelName: "ch1"},
		StartTs: 100,
		State:   datapb.ChannelWatchState_ToWatch,
		OpID:    1,
	}
	ch2Info := &datapb.ChannelWatchInfo{
		Vchan:   &datapb.VchannelInfo{ChannelName: "ch2"},
		StartTs: 10,
		State:   datapb.ChannelWatchState_WatchSuccess,
		OpID:    1,
	}
	nodes := []*NodeChannelInfo{
		{
			NodeID:   1,
			Channels: map[string]RWChannel{"ch1": &channelMeta{WatchInfo: ch1Info}},
		},
		{
			NodeID:   2,
			Channels: map[string]RWChannel{"ch2": &channelMeta{WatchInfo: ch2Info}},
		},
	}

	store := NewMockRWChannelStore(s.T())
	store.EXPECT().GetNodesChannels().Return(nodes)

	cm := &ChannelManagerImpl{store: store}
	got := cm.GetChannelWatchInfos()
	s.Len(got, 2)
	s.Equal("ch1", got[1]["ch1"].GetVchan().ChannelName)
	s.Equal("ch2", got[2]["ch2"].GetVchan().ChannelName)

	// test empty value: drop the previous expectation before installing the
	// one that returns no nodes.
	store.EXPECT().GetNodesChannels().Unset()
	store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{})
	got = cm.GetChannelWatchInfos()
	s.Len(got, 0)
}
31 changes: 31 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord

import (
"context"
"encoding/json"
"fmt"
"math"
"path"
Expand Down Expand Up @@ -2025,3 +2026,33 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats

return metricMutation, nil
}

// getSegmentsJSON serializes a summary of every segment held in meta as a
// JSON array of metricsinfo.Segment and returns it as a string.
//
// The segments are read under the meta read lock. They are collected by
// ranging over a map, so the order of the entries in the output is not
// stable. If marshaling fails, a warning is logged and "" is returned.
func (m *meta) getSegmentsJSON() string {
	m.RLock()
	defer m.RUnlock()

	all := m.segments.segments
	out := make([]*metricsinfo.Segment, 0, len(all))
	for _, info := range all {
		entry := &metricsinfo.Segment{
			SegmentID:    info.ID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			Channel:      info.InsertChannel,
			NumOfRows:    info.NumOfRows,
			State:        info.State.String(),
			MemSize:      info.size.Load(),
			Level:        info.Level.String(),
			IsImporting:  info.IsImporting,
			Compacted:    info.Compacted,
			IsSorted:     info.IsSorted,
			NodeID:       paramtable.GetNodeID(),
		}
		out = append(out, entry)
	}

	bs, err := json.Marshal(out)
	if err == nil {
		return string(bs)
	}
	log.Warn("Failed to marshal segments to JSON", zap.Error(err))
	return ""
}
64 changes: 64 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package datacoord

import (
"context"
"encoding/json"
"sync/atomic"
"testing"

Expand All @@ -43,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
)
Expand Down Expand Up @@ -1246,3 +1248,65 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
assert.NotNil(t, c)
})
}

// TestMeta_GetSegmentsJSON verifies that getSegmentsJSON emits one JSON entry
// per segment stored in meta and that each entry carries the expected fields.
func TestMeta_GetSegmentsJSON(t *testing.T) {
	// Create a mock meta object
	m := &meta{
		segments: &SegmentsInfo{
			segments: map[int64]*SegmentInfo{
				1: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:            1,
						CollectionID:  1,
						PartitionID:   1,
						InsertChannel: "channel1",
						NumOfRows:     100,
						State:         commonpb.SegmentState_Growing,
						MaxRowNum:     1000,
						Compacted:     false,
					},
				},
				2: {
					SegmentInfo: &datapb.SegmentInfo{
						ID:            2,
						CollectionID:  2,
						PartitionID:   2,
						InsertChannel: "channel2",
						NumOfRows:     200,
						State:         commonpb.SegmentState_Sealed,
						MaxRowNum:     2000,
						Compacted:     true,
					},
				},
			},
		},
	}

	// Call the method
	jsonStr := m.getSegmentsJSON()

	var segments []*metricsinfo.Segment
	err := json.Unmarshal([]byte(jsonStr), &segments)
	assert.NoError(t, err)

	// Check the length of the segments
	assert.Equal(t, 2, len(segments))

	// getSegmentsJSON collects segments by ranging over a map, so the order of
	// the entries is random. Look them up by segment ID instead of relying on
	// slice positions, which made this test flaky.
	byID := make(map[int64]*metricsinfo.Segment, len(segments))
	for _, seg := range segments {
		byID[seg.SegmentID] = seg
	}

	// Check the first segment
	if first := byID[1]; assert.NotNil(t, first) {
		assert.Equal(t, int64(1), first.SegmentID)
		assert.Equal(t, int64(1), first.CollectionID)
		assert.Equal(t, int64(1), first.PartitionID)
		assert.Equal(t, "channel1", first.Channel)
		assert.Equal(t, int64(100), first.NumOfRows)
		assert.Equal(t, "Growing", first.State)
		assert.False(t, first.Compacted)
	}

	// Check the second segment
	if second := byID[2]; assert.NotNil(t, second) {
		assert.Equal(t, int64(2), second.SegmentID)
		assert.Equal(t, int64(2), second.CollectionID)
		assert.Equal(t, int64(2), second.PartitionID)
		assert.Equal(t, "channel2", second.Channel)
		assert.Equal(t, int64(200), second.NumOfRows)
		assert.Equal(t, "Sealed", second.State)
		assert.True(t, second.Compacted)
	}
}
147 changes: 90 additions & 57 deletions internal/datacoord/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"encoding/json"
"sync"

"github.com/cockroachdb/errors"
"go.uber.org/zap"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -82,74 +84,64 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor
return ret
}

// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode.
func (s *Server) GetSyncTaskMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (string, error) {
resp, err := s.requestDataNodeGetMetrics(ctx, req)
if err != nil {
return "", err
// getChannelsJSON collects channel metrics from the data nodes, merges them
// with the watch infos held by the channel manager, fills in each channel's
// checkpoint timestamp from meta, and marshals the merged list.
//
// An error from collecting the data node metrics does not short-circuit the
// merge; it is handed to metricsinfo.MarshalGetMetricsValues together with
// the merged channels.
func (s *Server) getChannelsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
	fromDataNodes, err := getMetrics[*metricsinfo.Channel](s, ctx, req)
	fromManager := s.channelManager.GetChannelWatchInfos()
	channels := mergeChannels(fromDataNodes, fromManager)

	// fill checkpoint timestamp
	checkpoints := s.meta.GetChannelCheckpoints()
	for _, ch := range channels {
		cp, found := checkpoints[ch.Name]
		if !found {
			log.Warn("channel not found in meta cache", zap.String("channel", ch.Name))
			continue
		}
		ch.CheckpointTS = typeutil.TimestampToString(cp.GetTimestamp())
	}
	return metricsinfo.MarshalGetMetricsValues(channels, err)
}

tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len())
resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool {
if value.Response != "" {
var sts []*metricsinfo.SyncTask
if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil {
log.Warn("failed to unmarshal sync task metrics")
err = err1
return false
// mergeChannels merges the channel metrics from data nodes and channel watch infos from channel manager
// dnChannels: a slice of Channel metrics from data nodes
// dcChannels: a map of channel watch infos from the channel manager, keyed by node ID and channel name
func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[string]*datapb.ChannelWatchInfo) []*metricsinfo.Channel {
mergedChannels := make([]*metricsinfo.Channel, 0)

// Add or update channels from data nodes
for _, dnChannel := range dnChannels {
if dcChannelMap, ok := dcChannels[dnChannel.NodeID]; ok {
if dcChannel, ok := dcChannelMap[dnChannel.Name]; ok {
dnChannel.WatchState = dcChannel.State.String()
delete(dcChannelMap, dnChannel.Name)
}
tasks[key] = sts
}
return true
})

if err != nil {
return "", err
mergedChannels = append(mergedChannels, dnChannel)
}

if len(tasks) == 0 {
return "", nil
// Add remaining channels from channel manager
for nodeID, dcChannelMap := range dcChannels {
for _, dcChannel := range dcChannelMap {
mergedChannels = append(mergedChannels, &metricsinfo.Channel{
Name: dcChannel.Vchan.ChannelName,
CollectionID: dcChannel.Vchan.CollectionID,
WatchState: dcChannel.State.String(),
AssignSate: "Unassigned",
NodeID: nodeID,
})
}
}

bs, err := json.Marshal(tasks)
if err != nil {
return "", err
}
return (string)(bs), nil
return mergedChannels
}

func (s *Server) requestDataNodeGetMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (*typeutil.ConcurrentMap[string, *milvuspb.GetMetricsResponse], error) {
nodes := s.cluster.GetSessions()

rets := typeutil.NewConcurrentMap[string, *milvuspb.GetMetricsResponse]()
wg, ctx := errgroup.WithContext(ctx)
for _, node := range nodes {
wg.Go(func() error {
cli, err := node.GetOrCreateClient(ctx)
if err != nil {
return err
}
ret, err := cli.GetMetrics(ctx, req)
if err != nil {
return err
}
key := metricsinfo.ConstructComponentName(typeutil.DataNodeRole, node.NodeID())
rets.Insert(key, ret)
return nil
})
}
// getSegmentsJSON collects the segment metrics reported by the data nodes and
// marshals them, together with any collection error, through
// metricsinfo.MarshalGetMetricsValues.
func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
	segments, collectErr := getMetrics[*metricsinfo.Segment](s, ctx, req)
	return metricsinfo.MarshalGetMetricsValues(segments, collectErr)
}

err := wg.Wait()
if err != nil {
return nil, err
}
return rets, nil
// getSyncTaskJSON collects the sync task metrics reported by the data nodes
// and marshals them, together with any collection error, through
// metricsinfo.MarshalGetMetricsValues.
func (s *Server) getSyncTaskJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
	tasks, collectErr := getMetrics[*metricsinfo.SyncTask](s, ctx, req)
	return metricsinfo.MarshalGetMetricsValues(tasks, collectErr)
}

// getSystemInfoMetrics composes data cluster metrics
Expand Down Expand Up @@ -322,3 +314,44 @@ func (s *Server) getIndexNodeMetrics(ctx context.Context, req *milvuspb.GetMetri
infos.BaseComponentInfos.HasError = false
return infos, nil
}

// getMetrics sends req to every DataNode session of the cluster, decodes each
// non-empty response as a JSON array of T, and returns all decoded elements
// concatenated into one slice.
//
// The requests run concurrently in an errgroup created with WithContext, so
// the first failing request cancels the context shared by the others and its
// error is the one returned. Results are appended under a mutex as the
// responses arrive, so the order of elements across nodes is not
// deterministic. The slice is returned alongside the error and may then hold
// only the results gathered before the failure.
//
// NOTE(review): the goroutine closure captures the loop variable node, which
// relies on per-iteration loop variables (Go 1.22+) — confirm against go.mod.
func getMetrics[T any](s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest) ([]T, error) {
	var metrics []T
	var mu sync.Mutex
	errorGroup, ctx := errgroup.WithContext(ctx)

	nodes := s.cluster.GetSessions()
	for _, node := range nodes {
		errorGroup.Go(func() error {
			cli, err := node.GetOrCreateClient(ctx)
			if err != nil {
				return err
			}
			resp, err := cli.GetMetrics(ctx, req)
			if err != nil {
				// Include the error itself; the node ID alone does not say
				// why the request failed.
				log.Warn("failed to get metric from DataNode",
					zap.Int64("nodeID", node.NodeID()),
					zap.Error(err))
				return err
			}

			// The generated getter tolerates a nil response, which direct
			// field access would not.
			payload := resp.GetResponse()
			if payload == "" {
				return nil
			}

			var infos []T
			err = json.Unmarshal([]byte(payload), &infos)
			if err != nil {
				// Identify which node produced the undecodable payload.
				log.Warn("invalid metrics of data node was found",
					zap.Int64("nodeID", node.NodeID()),
					zap.Error(err))
				return err
			}

			mu.Lock()
			metrics = append(metrics, infos...)
			mu.Unlock()
			return nil
		})
	}

	err := errorGroup.Wait()
	return metrics, err
}
Loading

0 comments on commit 9c21615

Please sign in to comment.