diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 01a9a0162552d..5ff3fe8cb1edb 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/internal/allocator"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -53,6 +54,8 @@ type ChannelManager interface {
 	GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
 	GetChannelsByCollectionID(collectionID int64) []RWChannel
 	GetChannelNamesByCollectionID(collectionID int64) []string
+
+	GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo
 }
 
 // An interface sessionManager implements
@@ -730,6 +733,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
 	return nil
 }
 
+func (m *ChannelManagerImpl) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	infos := make(map[int64]map[string]*datapb.ChannelWatchInfo)
+	for _, nc := range m.store.GetNodesChannels() {
+		for _, ch := range nc.Channels {
+			watchInfo := proto.Clone(ch.GetWatchInfo()).(*datapb.ChannelWatchInfo)
+			if _, ok := infos[nc.NodeID]; !ok {
+				infos[nc.NodeID] = make(map[string]*datapb.ChannelWatchInfo)
+			}
+			infos[nc.NodeID][watchInfo.Vchan.ChannelName] = watchInfo
+		}
+	}
+	return infos
+}
+
 func inferStateByOpType(opType ChannelOpType) datapb.ChannelWatchState {
 	switch opType {
 	case Watch:
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index 919fbd1831061..450a260754eef 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -805,3 +805,51 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
 func (s *ChannelManagerSuite) TestCheckLoop() {}
 
 func (s *ChannelManagerSuite) TestGet() {}
+
+func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
+	store := NewMockRWChannelStore(s.T())
+	store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
+		{
+			NodeID: 1,
+			Channels: map[string]RWChannel{
+				"ch1": &channelMeta{
+					WatchInfo: &datapb.ChannelWatchInfo{
+						Vchan: &datapb.VchannelInfo{
+							ChannelName: "ch1",
+						},
+						StartTs: 100,
+						State:   datapb.ChannelWatchState_ToWatch,
+						OpID:    1,
+					},
+				},
+			},
+		},
+		{
+			NodeID: 2,
+			Channels: map[string]RWChannel{
+				"ch2": &channelMeta{
+					WatchInfo: &datapb.ChannelWatchInfo{
+						Vchan: &datapb.VchannelInfo{
+							ChannelName: "ch2",
+						},
+						StartTs: 10,
+						State:   datapb.ChannelWatchState_WatchSuccess,
+						OpID:    1,
+					},
+				},
+			},
+		},
+	})
+
+	cm := &ChannelManagerImpl{store: store}
+	infos := cm.GetChannelWatchInfos()
+	s.Equal(2, len(infos))
+	s.Equal("ch1", infos[1]["ch1"].GetVchan().ChannelName)
+	s.Equal("ch2", infos[2]["ch2"].GetVchan().ChannelName)
+
+	// test empty value
+	store.EXPECT().GetNodesChannels().Unset()
+	store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{})
+	infos = cm.GetChannelWatchInfos()
+	s.Equal(0, len(infos))
+}
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index dec075915c11d..2e63576cb97a3 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -19,6 +19,7 @@ package datacoord
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"path"
@@ -2025,3 +2026,33 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 	return metricMutation, nil
 }
+
+func (m *meta) getSegmentsJSON() string {
+	m.RLock()
+	defer m.RUnlock()
+
+	segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
+	for _, s := range m.segments.segments {
+		segments = append(segments, &metricsinfo.Segment{
+			SegmentID:    s.ID,
+			CollectionID: s.CollectionID,
+			PartitionID:  s.PartitionID,
+			Channel:      s.InsertChannel,
+			NumOfRows:    s.NumOfRows,
+			State:        s.State.String(),
+			MemSize:      s.size.Load(),
+			Level:        s.Level.String(),
+			IsImporting:  s.IsImporting,
+			Compacted:    s.Compacted,
+			IsSorted:     s.IsSorted,
+			NodeID:       paramtable.GetNodeID(),
+		})
+	}
+
+	data, err := json.Marshal(segments)
+	if err != nil {
+		log.Warn("Failed to marshal segments to JSON", zap.Error(err))
+		return ""
+	}
+	return string(data)
+}
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 8492173123b7b..4714133b19e85 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -18,6 +18,8 @@ package datacoord
 
 import (
 	"context"
+	"encoding/json"
+	"sort"
 	"sync/atomic"
 	"testing"
 
@@ -43,6 +44,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/testutils"
 )
@@ -1246,3 +1249,69 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
 		assert.NotNil(t, c)
 	})
 }
+
+func TestMeta_GetSegmentsJSON(t *testing.T) {
+	// Create a mock meta object
+	m := &meta{
+		segments: &SegmentsInfo{
+			segments: map[int64]*SegmentInfo{
+				1: {
+					SegmentInfo: &datapb.SegmentInfo{
+						ID:            1,
+						CollectionID:  1,
+						PartitionID:   1,
+						InsertChannel: "channel1",
+						NumOfRows:     100,
+						State:         commonpb.SegmentState_Growing,
+						MaxRowNum:     1000,
+						Compacted:     false,
+					},
+				},
+				2: {
+					SegmentInfo: &datapb.SegmentInfo{
+						ID:            2,
+						CollectionID:  2,
+						PartitionID:   2,
+						InsertChannel: "channel2",
+						NumOfRows:     200,
+						State:         commonpb.SegmentState_Sealed,
+						MaxRowNum:     2000,
+						Compacted:     true,
+					},
+				},
+			},
+		},
+	}
+
+	// Call the method
+	jsonStr := m.getSegmentsJSON()
+
+	var segments []*metricsinfo.Segment
+	err := json.Unmarshal([]byte(jsonStr), &segments)
+	assert.NoError(t, err)
+
+	// Map iteration order is not deterministic, so sort by segment ID before asserting
+	sort.Slice(segments, func(i, j int) bool {
+		return segments[i].SegmentID < segments[j].SegmentID
+	})
+
+	// Check the length of the segments
+	assert.Equal(t, 2, len(segments))
+
+	// Check the first segment
+	assert.Equal(t, int64(1), segments[0].SegmentID)
+	assert.Equal(t, int64(1), segments[0].CollectionID)
+	assert.Equal(t, int64(1), segments[0].PartitionID)
+	assert.Equal(t, "channel1", segments[0].Channel)
+	assert.Equal(t, int64(100), segments[0].NumOfRows)
+	assert.Equal(t, "Growing", segments[0].State)
+	assert.False(t, segments[0].Compacted)
+
+	// Check the second segment
+	assert.Equal(t, int64(2), segments[1].SegmentID)
+	assert.Equal(t, int64(2), segments[1].CollectionID)
+	assert.Equal(t, int64(2), segments[1].PartitionID)
+	assert.Equal(t, "channel2", segments[1].Channel)
+	assert.Equal(t, int64(200), segments[1].NumOfRows)
+	assert.Equal(t, "Sealed", segments[1].State)
+	assert.True(t, segments[1].Compacted)
+}
diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go
index 1728aee08d0d9..a23064322f864 100644
--- a/internal/datacoord/metrics_info.go
+++ b/internal/datacoord/metrics_info.go
@@ -19,6 +19,7 @@ package datacoord
 
 import (
 	"context"
 	"encoding/json"
+	"sync"
 
 	"github.com/cockroachdb/errors"
 	"go.uber.org/zap"
@@ -27,6 +28,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/datacoord/session" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" @@ -82,74 +84,64 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor return ret } -// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode. -func (s *Server) GetSyncTaskMetrics( - ctx context.Context, - req *milvuspb.GetMetricsRequest, -) (string, error) { - resp, err := s.requestDataNodeGetMetrics(ctx, req) - if err != nil { - return "", err +func (s *Server) getChannelsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { + dnChannelWatchInfos, err := getMetrics[*metricsinfo.Channel](s, ctx, req) + dcChannelWatchInfos := s.channelManager.GetChannelWatchInfos() + newChannels := mergeChannels(dnChannelWatchInfos, dcChannelWatchInfos) + + // fill checkpoint timestamp + channel2Checkpoints := s.meta.GetChannelCheckpoints() + for _, channel := range newChannels { + if cp, ok := channel2Checkpoints[channel.Name]; ok { + channel.CheckpointTS = typeutil.TimestampToString(cp.GetTimestamp()) + } else { + log.Warn("channel not found in meta cache", zap.String("channel", channel.Name)) + } } + return metricsinfo.MarshalGetMetricsValues(newChannels, err) +} - tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len()) - resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool { - if value.Response != "" { - var sts []*metricsinfo.SyncTask - if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil { - log.Warn("failed to unmarshal sync task metrics") - err = err1 - return false +// mergeChannels merges the channel metrics from data nodes and channel watch infos from channel manager +// dnChannels: a slice of Channel metrics from data nodes +// dcChannels: a map of channel watch infos from the channel manager, keyed by node ID and channel name +func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[string]*datapb.ChannelWatchInfo) []*metricsinfo.Channel { + mergedChannels := make([]*metricsinfo.Channel, 0) + + // Add or update channels from data nodes + for _, dnChannel := range dnChannels { + if dcChannelMap, ok := dcChannels[dnChannel.NodeID]; ok { + if dcChannel, ok := dcChannelMap[dnChannel.Name]; ok { + dnChannel.WatchState = dcChannel.State.String() + delete(dcChannelMap, dnChannel.Name) } - tasks[key] = sts } - return true - }) - - if err != nil { - return "", err + mergedChannels = append(mergedChannels, dnChannel) } - if len(tasks) == 0 { - return "", nil + // Add remaining channels from channel manager + for nodeID, dcChannelMap := range dcChannels { + for _, dcChannel := range dcChannelMap { + mergedChannels = append(mergedChannels, &metricsinfo.Channel{ + Name: dcChannel.Vchan.ChannelName, + CollectionID: dcChannel.Vchan.CollectionID, + WatchState: dcChannel.State.String(), + AssignSate: "Unassigned", + NodeID: nodeID, + }) + } } - bs, err := json.Marshal(tasks) - if err != nil { - return "", err - } - return (string)(bs), nil + return mergedChannels } -func (s *Server) requestDataNodeGetMetrics( - ctx context.Context, - req *milvuspb.GetMetricsRequest, -) (*typeutil.ConcurrentMap[string, *milvuspb.GetMetricsResponse], error) { - nodes := s.cluster.GetSessions() - - rets := typeutil.NewConcurrentMap[string, *milvuspb.GetMetricsResponse]() - wg, ctx := errgroup.WithContext(ctx) - for _, node := range nodes { - 
wg.Go(func() error { - cli, err := node.GetOrCreateClient(ctx) - if err != nil { - return err - } - ret, err := cli.GetMetrics(ctx, req) - if err != nil { - return err - } - key := metricsinfo.ConstructComponentName(typeutil.DataNodeRole, node.NodeID()) - rets.Insert(key, ret) - return nil - }) - } +func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { + ret, err := getMetrics[*metricsinfo.Segment](s, ctx, req) + return metricsinfo.MarshalGetMetricsValues(ret, err) +} - err := wg.Wait() - if err != nil { - return nil, err - } - return rets, nil +func (s *Server) getSyncTaskJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { + ret, err := getMetrics[*metricsinfo.SyncTask](s, ctx, req) + return metricsinfo.MarshalGetMetricsValues(ret, err) } // getSystemInfoMetrics composes data cluster metrics @@ -322,3 +314,44 @@ func (s *Server) getIndexNodeMetrics(ctx context.Context, req *milvuspb.GetMetri infos.BaseComponentInfos.HasError = false return infos, nil } + +// getMetrics retrieves and aggregates the metrics of the datanode to a slice +func getMetrics[T any](s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest) ([]T, error) { + var metrics []T + var mu sync.Mutex + errorGroup, ctx := errgroup.WithContext(ctx) + + nodes := s.cluster.GetSessions() + for _, node := range nodes { + errorGroup.Go(func() error { + cli, err := node.GetOrCreateClient(ctx) + if err != nil { + return err + } + resp, err := cli.GetMetrics(ctx, req) + if err != nil { + log.Warn("failed to get metric from DataNode", zap.Int64("nodeID", node.NodeID())) + return err + } + + if resp.Response == "" { + return nil + } + + var infos []T + err = json.Unmarshal([]byte(resp.Response), &infos) + if err != nil { + log.Warn("invalid metrics of data node was found", zap.Error(err)) + return err + } + + mu.Lock() + metrics = append(metrics, infos...) 
+			mu.Unlock()
+			return nil
+		})
+	}
+
+	err := errorGroup.Wait()
+	return metrics, err
+}
diff --git a/internal/datacoord/metrics_info_test.go b/internal/datacoord/metrics_info_test.go
index f85d43998ef95..8f17f3237c8ed 100644
--- a/internal/datacoord/metrics_info_test.go
+++ b/internal/datacoord/metrics_info_test.go
@@ -18,6 +18,7 @@ package datacoord
 
 import (
 	"context"
+	"encoding/json"
 	"testing"
 
 	"github.com/cockroachdb/errors"
@@ -25,7 +26,9 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/datacoord/session"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@@ -200,16 +203,95 @@ func TestGetIndexNodeMetrics(t *testing.T) {
 	assert.Equal(t, metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, 100), info.BaseComponentInfos.Name)
 }
 
+func TestMergeChannels(t *testing.T) {
+	dnChannels := []*metricsinfo.Channel{
+		{
+			Name:         "channel1",
+			CollectionID: 100,
+			NodeID:       1,
+			AssignSate:   "Assigned",
+		},
+		{
+			Name:         "channel2",
+			CollectionID: 101,
+			NodeID:       2,
+			AssignSate:   "Assigned",
+		},
+	}
+
+	dcChannels := map[int64]map[string]*datapb.ChannelWatchInfo{
+		1: {
+			"channel1": {
+				Vchan: &datapb.VchannelInfo{
+					ChannelName:  "channel1",
+					CollectionID: 100,
+				},
+				State: datapb.ChannelWatchState_WatchSuccess,
+			},
+		},
+		3: {
+			"channel3": {
+				Vchan: &datapb.VchannelInfo{
+					ChannelName:  "channel3",
+					CollectionID: 102,
+				},
+				State: datapb.ChannelWatchState_ToWatch,
+			},
+		},
+	}
+
+	expectedChannels := []*metricsinfo.Channel{
+		{
+			Name:         "channel1",
+			CollectionID: 100,
+			NodeID:       1,
+			WatchState:   "WatchSuccess",
+			AssignSate:   "Assigned",
+		},
+		{
+			Name:         "channel2",
+			CollectionID: 101,
+			NodeID:       2,
+			AssignSate:   "Assigned",
+		},
+		{
+			Name:         "channel3",
+			CollectionID: 102,
+			NodeID:       3,
+			WatchState:   "ToWatch",
+			AssignSate:   "Unassigned",
+		},
+	}
+
+	result := mergeChannels(dnChannels, dcChannels)
+	assert.ElementsMatch(t, expectedChannels, result)
+}
+
 func TestGetSyncTaskMetrics(t *testing.T) {
 	svr := Server{}
 	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
 		req := &milvuspb.GetMetricsRequest{}
 		ctx := context.Background()
 
-		task := `[{"segment_id": 1, "batch_rows": 100, "segment_level": "L0", "ts_from": 1000, "ts_to": 2000,"delta_row_count": 10, "flush_size": 1024, "running_time": 2000000000}]`
+		tasks := []metricsinfo.SyncTask{
+			{
+				SegmentID:     1,
+				BatchRows:     100,
+				SegmentLevel:  "L0",
+				TSFrom:        1000,
+				TSTo:          2000,
+				DeltaRowCount: 10,
+				FlushSize:     1024,
+				RunningTime:   "2h",
+			},
+		}
+		tasksBytes, err := json.Marshal(tasks)
+		assert.NoError(t, err)
+		expectedJSON := string(tasksBytes)
+
 		mockResp := &milvuspb.GetMetricsResponse{
 			Status:   merr.Success(),
-			Response: task,
+			Response: expectedJSON,
 		}
 
 		mockClient := &mockMetricDataNodeClient{
@@ -226,9 +308,8 @@ func TestGetSyncTaskMetrics(t *testing.T) {
 		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
 		svr.cluster = mockCluster
 
-		actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
+		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
 		assert.NoError(t, err)
-		expectedJSON := `{"datanode1":[{"segment_id":1,"batch_rows":100,"segment_level":"L0","ts_from":1000,"ts_to":2000,"delta_row_count":10,"flush_size":1024,"running_time":2000000000}]}`
 		assert.Equal(t, expectedJSON, actualJSON)
 	})
 
@@ -250,7 +331,7 @@ func TestGetSyncTaskMetrics(t *testing.T) {
 		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
 		svr.cluster = mockCluster
 
-		actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
+		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
 		assert.Error(t, err)
 		assert.Equal(t, "", actualJSON)
 	})
@@ -278,7 +359,7 @@ func TestGetSyncTaskMetrics(t *testing.T) {
 		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
 		svr.cluster = mockCluster
 
-		actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
+		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
 		assert.Error(t, err)
 		assert.Equal(t, "", actualJSON)
 	})
@@ -307,7 +388,291 @@ func TestGetSyncTaskMetrics(t *testing.T) {
 		svr.cluster = mockCluster
 
 		expectedJSON := ""
-		actualJSON, err := svr.GetSyncTaskMetrics(ctx, req)
+		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
+		assert.NoError(t, err)
+		assert.Equal(t, expectedJSON, actualJSON)
+	})
+}
+
+func TestGetSegmentsJSON(t *testing.T) {
+	svr := Server{}
+	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		segments := []*metricsinfo.Segment{
+			{
+				SegmentID:    1,
+				CollectionID: 100,
+				PartitionID:  10,
+				NumOfRows:    1000,
+				State:        "Flushed",
+			},
+		}
+		segmentsBytes, err := json.Marshal(segments)
+		assert.NoError(t, err)
+		expectedJSON := string(segmentsBytes)
+
+		mockResp := &milvuspb.GetMetricsResponse{
+			Status:   merr.Success(),
+			Response: expectedJSON,
+		}
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return mockResp, nil
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		actualJSON, err := svr.getSegmentsJSON(ctx, req)
+		assert.NoError(t, err)
+		assert.Equal(t, expectedJSON, actualJSON)
+	})
+
+	t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return nil, errors.New("request failed")
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		actualJSON, err := svr.getSegmentsJSON(ctx, req)
+		assert.Error(t, err)
+		assert.Equal(t, "", actualJSON)
+	})
+
+	t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		mockResp := &milvuspb.GetMetricsResponse{
+			Status:   merr.Success(),
+			Response: `invalid json`,
+		}
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return mockResp, nil
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		actualJSON, err := svr.getSegmentsJSON(ctx, req)
+		assert.Error(t, err)
+		assert.Equal(t, "", actualJSON)
+	})
+
+	t.Run("ReturnsEmptyJSONWhenNoSegments", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		mockResp := &milvuspb.GetMetricsResponse{
+			Status:   merr.Success(),
+			Response: "",
+		}
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return mockResp, nil
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		expectedJSON := ""
+		actualJSON, err := svr.getSegmentsJSON(ctx, req)
+		assert.NoError(t, err)
+		assert.Equal(t, expectedJSON, actualJSON)
+	})
+}
+
+func TestGetChannelsJSON(t *testing.T) {
+	svr := Server{}
+	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+
+		channels := []*metricsinfo.Channel{
+			{
+				Name:         "channel1",
+				CollectionID: 100,
+				NodeID:       1,
+				AssignSate:   "Assigned",
+			},
+		}
+		channelsBytes, err := json.Marshal(channels)
+		assert.NoError(t, err)
+		channelJSON := string(channelsBytes)
+
+		mockResp := &milvuspb.GetMetricsResponse{
+			Status:   merr.Success(),
+			Response: channelJSON,
+		}
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return mockResp, nil
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		cm := NewMockChannelManager(t)
+		cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{})
+		svr.channelManager = cm
+		svr.meta = &meta{channelCPs: newChannelCps()}
+		svr.meta.channelCPs.checkpoints["channel1"] = &msgpb.MsgPosition{Timestamp: 1000}
+
+		actualJSON, err := svr.getChannelsJSON(context.TODO(), req)
+		assert.NoError(t, err)
+
+		channels = []*metricsinfo.Channel{
+			{
+				Name:         "channel1",
+				CollectionID: 100,
+				NodeID:       1,
+				AssignSate:   "Assigned",
+				CheckpointTS: typeutil.TimestampToString(1000),
+			},
+		}
+		channelsBytes, err = json.Marshal(channels)
+		assert.NoError(t, err)
+		expectedJSON := string(channelsBytes)
+
+		assert.Equal(t, expectedJSON, actualJSON)
+	})
+
+	t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return nil, errors.New("request failed")
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		cm := NewMockChannelManager(t)
+		cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{})
+		svr.channelManager = cm
+
+		svr.meta = &meta{channelCPs: newChannelCps()}
+
+		actualJSON, err := svr.getChannelsJSON(ctx, req)
+		assert.Error(t, err)
+		assert.Equal(t, "", actualJSON)
+	})
+
+	t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		mockResp := &milvuspb.GetMetricsResponse{
+			Status:   merr.Success(),
+			Response: `invalid json`,
+		}
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return mockResp, nil
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		cm := NewMockChannelManager(t)
+		cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{})
+		svr.channelManager = cm
+
+		svr.meta = &meta{channelCPs: newChannelCps()}
+
+		actualJSON, err := svr.getChannelsJSON(ctx, req)
+		assert.Error(t, err)
+		assert.Equal(t, "", actualJSON)
+	})
+
+	t.Run("ReturnsEmptyJSONWhenNoChannels", func(t *testing.T) {
+		req := &milvuspb.GetMetricsRequest{}
+		ctx := context.Background()
+
+		mockResp := &milvuspb.GetMetricsResponse{
+			Status:   merr.Success(),
+			Response: "",
+		}
+
+		mockClient := &mockMetricDataNodeClient{
+			mock: func() (*milvuspb.GetMetricsResponse, error) {
+				return mockResp, nil
+			},
+		}
+
+		dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
+			return mockClient, nil
+		}
+
+		mockCluster := NewMockCluster(t)
+		mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
+		svr.cluster = mockCluster
+
+		cm := NewMockChannelManager(t)
+		cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{})
+		svr.channelManager = cm
+
+		svr.meta = &meta{channelCPs: newChannelCps()}
+
+		expectedJSON := ""
+		actualJSON, err := svr.getChannelsJSON(ctx, req)
 		assert.NoError(t, err)
 		assert.Equal(t, expectedJSON, actualJSON)
 	})
diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go
index 9a61852db87c3..fccecee419de2 100644
--- a/internal/datacoord/mock_channelmanager.go
+++ b/internal/datacoord/mock_channelmanager.go
@@ -5,6 +5,7 @@ package datacoord
 import (
 	context "context"
 
+	datapb "github.com/milvus-io/milvus/internal/proto/datapb"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -308,6 +309,53 @@ func (_c *MockChannelManager_GetChannelNamesByCollectionID_Call) RunAndReturn(ru
 	return _c
 }
 
+// GetChannelWatchInfos provides a mock function with given fields:
+func (_m *MockChannelManager) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetChannelWatchInfos")
+	}
+
+	var r0 map[int64]map[string]*datapb.ChannelWatchInfo
+	if rf, ok := ret.Get(0).(func() map[int64]map[string]*datapb.ChannelWatchInfo); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]map[string]*datapb.ChannelWatchInfo)
+		}
+	}
+
+	return r0
+}
+
+// MockChannelManager_GetChannelWatchInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelWatchInfos'
+type MockChannelManager_GetChannelWatchInfos_Call struct {
+	*mock.Call
+}
+
+// GetChannelWatchInfos is a helper method to define mock.On call
+func (_e *MockChannelManager_Expecter) GetChannelWatchInfos() *MockChannelManager_GetChannelWatchInfos_Call {
+	return &MockChannelManager_GetChannelWatchInfos_Call{Call: _e.mock.On("GetChannelWatchInfos")}
+}
+
+func (_c *MockChannelManager_GetChannelWatchInfos_Call) Run(run func()) *MockChannelManager_GetChannelWatchInfos_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockChannelManager_GetChannelWatchInfos_Call) Return(_a0 map[int64]map[string]*datapb.ChannelWatchInfo) *MockChannelManager_GetChannelWatchInfos_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockChannelManager_GetChannelWatchInfos_Call) RunAndReturn(run func() map[int64]map[string]*datapb.ChannelWatchInfo) *MockChannelManager_GetChannelWatchInfos_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetChannelsByCollectionID provides a mock function with given fields: collectionID
 func (_m *MockChannelManager) GetChannelsByCollectionID(collectionID int64) []RWChannel {
 	ret := _m.Called(collectionID)
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 3046183c2de68..b145b1889498f 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -1137,6 +1137,11 @@ func (s *Server) registerMetricsRequest() {
 			return s.getSystemInfoMetrics(ctx, req)
 		})
 
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataDist,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return s.meta.getSegmentsJSON(), nil
+		})
+
 	s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTasks,
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
 			return s.importMeta.TaskStatsJSON(), nil
@@ -1154,8 +1159,19 @@ func (s *Server) registerMetricsRequest() {
 	s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
-			return s.GetSyncTaskMetrics(ctx, req)
+			return s.getSyncTaskJSON(ctx, req)
+		})
+
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return s.getSegmentsJSON(ctx, req)
 		})
+
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return s.getChannelsJSON(ctx, req)
+		})
+
 	log.Info("register metrics actions finished")
 }
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index eccba38a1580c..bbf62f484db9d 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -284,10 +284,21 @@ func (node *DataNode) registerMetricsRequest() {
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
 			return node.getSystemInfoMetrics(ctx, req)
 		})
+
 	node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
 			return node.syncMgr.TaskStatsJSON(), nil
 		})
+
+	node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return node.flowgraphManager.GetSegmentsJSON(), nil
+		})
+
+	node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return node.flowgraphManager.GetChannelsJSON(), nil
+		})
 	log.Info("register metrics actions finished")
 }
diff --git a/internal/datanode/importv2/pool_test.go b/internal/datanode/importv2/pool_test.go
index 06873c6d31ae5..4449a5031c812 100644
--- a/internal/datanode/importv2/pool_test.go
+++ b/internal/datanode/importv2/pool_test.go
@@ -20,9 +20,10 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/milvus-io/milvus/pkg/config"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/stretchr/testify/assert"
 )
 
 func TestResizePools(t *testing.T) {
diff --git a/internal/flushcommon/metacache/segment.go b/internal/flushcommon/metacache/segment.go
index 8c4906ff7201e..4bdadda6177ad 100644
--- a/internal/flushcommon/metacache/segment.go
+++ b/internal/flushcommon/metacache/segment.go
@@ -87,6 +87,14 @@ func (s *SegmentInfo) Level() datapb.SegmentLevel {
 	return s.level
 }
 
+func (s *SegmentInfo) BufferRows() int64 {
+	return s.bufferRows
+}
+
+func (s *SegmentInfo) SyncingRows() int64 {
+	return s.syncingRows
+}
+
 func (s *SegmentInfo) Clone() *SegmentInfo {
 	return &SegmentInfo{
 		segmentID: s.segmentID,
diff --git a/internal/flushcommon/pipeline/flow_graph_manager.go b/internal/flushcommon/pipeline/flow_graph_manager.go
index 2d8b930442f81..42c4123b2816f 100644
--- a/internal/flushcommon/pipeline/flow_graph_manager.go
+++ b/internal/flushcommon/pipeline/flow_graph_manager.go
@@ -18,6 +18,7 @@ package pipeline
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 
 	"go.uber.org/zap"
@@ -25,6 +26,7 @@ import (
 	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -40,6 +42,8 @@ type FlowgraphManager interface {
 	GetFlowgraphCount() int
 	GetCollectionIDs() []int64
 
+	GetChannelsJSON() string
+	GetSegmentsJSON() string
 	Close()
 }
 
@@ -115,6 +119,60 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
 	return collectionSet.Collect()
 }
 
+// GetChannelsJSON returns all channels in json format.
+func (fm *fgManagerImpl) GetChannelsJSON() string {
+	var channels []*metricsinfo.Channel
+	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
+		latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
+		channels = append(channels, &metricsinfo.Channel{
+			Name:           ch,
+			WatchState:     ds.fg.Status(),
+			AssignSate:     "Assigned",
+			LatestTimeTick: typeutil.TimestampToString(latestTimeTick),
+			NodeID:         paramtable.GetNodeID(),
+			CollectionID:   ds.metacache.Collection(),
+		})
+		return true
+	})
+
+	ret, err := json.Marshal(channels)
+	if err != nil {
+		log.Warn("failed to marshal channels", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
+
+func (fm *fgManagerImpl) GetSegmentsJSON() string {
+	var segments []*metricsinfo.Segment
+	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
+		meta := ds.metacache
+		for _, segment := range meta.GetSegmentsBy() {
+			segments = append(segments, &metricsinfo.Segment{
+				SegmentID:      segment.SegmentID(),
+				CollectionID:   meta.Collection(),
+				PartitionID:    segment.PartitionID(),
+				State:          segment.State().String(),
+				Level:          segment.Level().String(),
+				NodeID:         paramtable.GetNodeID(),
+				NumOfRows:      segment.NumOfRows(),
+				FlushedRows:    segment.FlushedRows(),
+				SyncBufferRows: segment.BufferRows(),
+				SyncingRows:    segment.SyncingRows(),
+			})
+		}
+		return true
+	})
+
+	ret, err := json.Marshal(segments)
+	if err != nil {
+		log.Warn("failed to marshal segments", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
+
 func (fm *fgManagerImpl) Close() {
 	fm.cancelFunc()
 }
diff --git a/internal/flushcommon/pipeline/flow_graph_manager_test.go b/internal/flushcommon/pipeline/flow_graph_manager_test.go
index aaa25cb4f5196..4453a2f9f73ef 100644
--- a/internal/flushcommon/pipeline/flow_graph_manager_test.go
+++ b/internal/flushcommon/pipeline/flow_graph_manager_test.go
@@ -19,7 +19,7 @@ package pipeline
 import (
 	"context"
+	"encoding/json"
 	"fmt"
-	"math/rand"
 	"os"
 	"testing"
 
@@ -30,6 +30,8 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
+	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
+	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
 	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
 	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
@@ -39,6 +41,8 @@ import (
 	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 func TestMain(t *testing.M) {
@@ -98,7 +102,7 @@ func TestFlowGraphManager(t *testing.T) {
 }
 
 func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
-	collectionID := int64(rand.Uint32())
+	collectionID := int64(1)
 	dmChannelName := fmt.Sprintf("%s_%d", "fake-ch-", collectionID)
 	schema := &schemapb.CollectionSchema{
 		Name: fmt.Sprintf("%s_%d", "collection_", collectionID),
@@ -124,3 +128,94 @@ func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
 		Schema: schema,
 	}
 }
+
+type mockTimeSender struct{}
+
+func (m *mockTimeSender) Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats) {
+	panic("implement me")
+}
+
+func (m *mockTimeSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
+	return 0
+}
+
+func newFlowGraphManager(t *testing.T) (string, FlowgraphManager) {
+	mockBroker := broker.NewMockBroker(t)
+	mockBroker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
+	mockBroker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
+	mockBroker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
+
+	wbm := writebuffer.NewMockBufferManager(t)
+	wbm.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+	dispClient := msgdispatcher.NewMockClient(t)
+	dispClient.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(make(chan *msgstream.MsgPack), nil)
+
+	pipelineParams := &util.PipelineParams{
+		Ctx:                context.TODO(),
+		Session:            &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}},
+		Broker:             mockBroker,
+		TimeTickSender:     &mockTimeSender{},
+		DispClient:         dispClient,
+		WriteBufferManager: wbm,
+	}
+
+	chanWatchInfo := generateChannelWatchInfo()
+	ds, err := NewDataSyncService(
+		context.TODO(),
+		pipelineParams,
+		chanWatchInfo,
+		util.NewTickler(),
+	)
+	assert.NoError(t, err)
+
+	fm := NewFlowgraphManager()
+	fm.AddFlowgraph(ds)
+	return ds.vchannelName, fm
+}
+
+func TestGetChannelsJSON(t *testing.T) {
+	paramtable.SetNodeID(1)
+	_, fm := newFlowGraphManager(t)
+
+	// Build the expected payload with the same helpers the implementation uses,
+	// so the assertion does not depend on JSON tag names or the local timezone.
+	channels := []*metricsinfo.Channel{
+		{
+			Name:           "fake-ch-_1",
+			WatchState:     "Healthy",
+			AssignSate:     "Assigned",
+			LatestTimeTick: typeutil.TimestampToString(0),
+			NodeID:         1,
+			CollectionID:   1,
+		},
+	}
+	channelsBytes, err := json.Marshal(channels)
+	assert.NoError(t, err)
+	expectedJSON := string(channelsBytes)
+
+	jsonResult := fm.GetChannelsJSON()
+	assert.JSONEq(t, expectedJSON, jsonResult)
+}
+
+func TestGetSegmentJSON(t *testing.T) {
+	ch, fm := newFlowGraphManager(t)
+	ds, ok := fm.GetFlowgraphService(ch)
+	assert.True(t, ok)
+
+	pkStatsFactory := func(*datapb.SegmentInfo) pkoracle.PkStat {
+		return pkoracle.NewBloomFilterSet()
+	}
+	segment := &datapb.SegmentInfo{
+		ID:            1,
+		PartitionID:   10,
+		State:         commonpb.SegmentState_Flushed,
+		Level:         datapb.SegmentLevel_L1,
+		NumOfRows:     10240,
+		CollectionID:  100,
+		InsertChannel: "ch",
+	}
+	ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory)
+
+	jsonResult := fm.GetSegmentsJSON()
+	expectedJSON := `[{"segment_id":1,"collection_id":1,"partition_id":10,"num_of_rows":10240,"state":"Flushed","level":"L1","node_id":1,"flushed_rows":10240}]`
+	assert.JSONEq(t, expectedJSON, jsonResult)
+}
diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go
index 6945e21ff271f..cf8cd6b2aa1ca 100644
--- a/internal/flushcommon/pipeline/mock_fgmanager.go
+++ b/internal/flushcommon/pipeline/mock_fgmanager.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.32.4. DO NOT EDIT.
+// Code generated by mockery v2.46.0. DO NOT EDIT.
 
 package pipeline
 
@@ -114,10 +114,59 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra
 	return _c
 }
 
+// GetChannelsJSON provides a mock function with given fields:
+func (_m *MockFlowgraphManager) GetChannelsJSON() string {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetChannelsJSON")
+	}
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func() string); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// MockFlowgraphManager_GetChannelsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelsJSON'
+type MockFlowgraphManager_GetChannelsJSON_Call struct {
+	*mock.Call
+}
+
+// GetChannelsJSON is a helper method to define mock.On call
+func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON() *MockFlowgraphManager_GetChannelsJSON_Call {
+	return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON")}
+}
+
+func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func()) *MockFlowgraphManager_GetChannelsJSON_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetChannelsJSON_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetChannelsJSON_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetCollectionIDs provides a mock function with given fields:
 func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetCollectionIDs")
+	}
+
 	var r0 []int64
 	if rf, ok := ret.Get(0).(func() []int64); ok {
 		r0 = rf()
@@ -161,6 +210,10 @@ func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() []
 func (_m *MockFlowgraphManager) GetFlowgraphCount() int {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetFlowgraphCount")
+	}
+
 	var r0 int
 	if rf, ok := ret.Get(0).(func() int); ok {
 		r0 = rf()
@@ -202,6 +255,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() i
 func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*DataSyncService, bool) {
 	ret := _m.Called(channel)
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetFlowgraphService")
+	}
+
 	var r0 *DataSyncService
 	var r1 bool
 	if rf, ok := ret.Get(0).(func(string) (*DataSyncService, bool)); ok {
@@ -252,10 +309,59 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
 	return _c
 }
 
+// GetSegmentsJSON provides a mock function with given fields:
+func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetSegmentsJSON")
+	}
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func() string); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// MockFlowgraphManager_GetSegmentsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentsJSON'
+type MockFlowgraphManager_GetSegmentsJSON_Call struct {
+	*mock.Call
+}
+
+// GetSegmentsJSON is a helper method to define mock.On call
+func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON() *MockFlowgraphManager_GetSegmentsJSON_Call {
+	return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON")}
+}
+
+func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func()) *MockFlowgraphManager_GetSegmentsJSON_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetSegmentsJSON_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetSegmentsJSON_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // HasFlowgraph provides a mock function with given fields: channel
 func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool {
 	ret := _m.Called(channel)
 
+	if len(ret) == 0 {
+		panic("no return value specified for HasFlowgraph")
+	}
+
 	var r0 bool
 	if rf, ok := ret.Get(0).(func(string) bool); ok {
 		r0 = rf(channel)
@@ -298,6 +404,10 @@ func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string)
 func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool {
 	ret := _m.Called(channel, opID)
 
+	if len(ret) == 0 {
+		panic("no return value specified for HasFlowgraphWithOpID")
+	}
+
 	var r0 bool
 	if rf, ok := ret.Get(0).(func(string, int64) bool); ok {
 		r0 = rf(channel, opID)
diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go
index 86ae63df67a86..9ebcbaae38291 100644
--- a/internal/flushcommon/syncmgr/task.go
+++ b/internal/flushcommon/syncmgr/task.go
@@ -422,10 +422,11 @@ func (t *SyncTask) MarshalJSON() ([]byte, error) {
 		SegmentID:     t.segmentID,
 		BatchRows:     t.batchRows,
 		SegmentLevel:  t.level.String(),
-		TsFrom:        t.tsFrom,
-		TsTo:          t.tsTo,
+		TSFrom:        t.tsFrom,
+		TSTo:          t.tsTo,
 		DeltaRowCount: t.deltaRowCount,
 		FlushSize:     t.flushedSize,
-		RunningTime:   t.execTime,
+		RunningTime:   t.execTime.String(),
+		NodeID:        paramtable.GetNodeID(),
 	})
 }
diff --git a/internal/flushcommon/util/timetick_sender.go b/internal/flushcommon/util/timetick_sender.go
index 25e889b5a2e4c..d18e4644338c0 100644
--- a/internal/flushcommon/util/timetick_sender.go
+++ b/internal/flushcommon/util/timetick_sender.go
@@ -36,6 +36,7 @@ import (
 
 type StatsUpdater interface {
 	Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats)
+	GetLatestTimestamp(channel string) typeutil.Timestamp
 }
 
 // TimeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
@@ -126,6 +127,17 @@ func (m *TimeTickSender) Update(channelName string, timestamp uint64, segStats [
 	m.statsCache[channelName].lastTs = timestamp
 }
 
+func (m *TimeTickSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	chStats, ok := m.statsCache[channel]
+	if !ok {
+		log.Warn("channel not found in TimeTickSender", zap.String("channel", channel))
+		return 0
+	}
+	return chStats.lastTs
+}
+
 func (m *TimeTickSender) assembleDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
diff --git a/internal/http/router.go b/internal/http/router.go
index 4d6acec3f7adc..2859704f480d8 100644
--- a/internal/http/router.go
+++ b/internal/http/router.go
@@ -65,19 +65,46 @@ const (
 
 // for WebUI restful api root path
 const (
-	ClusterInfoPath    = "/_cluster/info"
-	ClusterConfigsPath = "/_cluster/configs"
-	ClusterClientsPath = "/_cluster/clients"
+	// ClusterInfoPath is the path to get cluster information.
+	ClusterInfoPath = "/_cluster/info"
+	// ClusterConfigsPath is the path to get cluster configurations.
+	ClusterConfigsPath = "/_cluster/configs"
+	// ClusterClientsPath is the path to get connected clients.
+	ClusterClientsPath = "/_cluster/clients"
+	// ClusterDependenciesPath is the path to get cluster dependencies.
 	ClusterDependenciesPath = "/_cluster/dependencies"
-	HookConfigsPath    = "/_hook/configs"
-	QCoordSegmentsPath = "/_qcoord/segments"
-	QCoordChannelsPath = "/_qcoord/channels"
-	QCoordAllTasksPath = "/_qcoord/tasks/all"
+	// HookConfigsPath is the path to get hook configurations.
+	HookConfigsPath = "/_hook/configs"
 
-	DCoordAllTasksPath        = "/_dcoord/tasks/all"
-	DCoordImportTasksPath     = "/_dcoord/tasks/import"
-	DCoordCompactionTasksPath = "/_dcoord/tasks/compaction"
-	DCoordBuildIndexTasksPath = "/_dcoord/tasks/build_index"
+	// QCDistPath is the path to get QueryCoord distribution.
+	QCDistPath = "/_qc/dist"
+	// QCTargetPath is the path to get QueryCoord target.
+	QCTargetPath = "/_qc/target"
+	// QCReplicaPath is the path to get QueryCoord replica.
+	QCReplicaPath = "/_qc/replica"
+	// QCResourceGroupPath is the path to get QueryCoord resource group.
+	QCResourceGroupPath = "/_qc/resource_group"
+	// QCAllTasksPath is the path to get all tasks in QueryCoord.
+	QCAllTasksPath = "/_qc/tasks"
 
-	DNodeSyncTasksPath = "/_dnode/tasks/sync"
+	// QNSegmentsPath is the path to get segments in QueryNode.
+	QNSegmentsPath = "/_qn/segments"
+	// QNChannelsPath is the path to get channels in QueryNode.
+	QNChannelsPath = "/_qn/channels"
+
+	// DCDistPath is the path to get all segments and channels distribution in DataCoord.
+	DCDistPath = "/_dc/dist"
+	// DCImportTasksPath is the path to get import tasks in DataCoord.
+	DCImportTasksPath = "/_dc/tasks/import"
+	// DCCompactionTasksPath is the path to get compaction tasks in DataCoord.
+	DCCompactionTasksPath = "/_dc/tasks/compaction"
+	// DCBuildIndexTasksPath is the path to get build index tasks in DataCoord.
+	DCBuildIndexTasksPath = "/_dc/tasks/build_index"
+
+	// DNSyncTasksPath is the path to get sync tasks in DataNode.
+	DNSyncTasksPath = "/_dn/tasks/sync"
+	// DNSegmentsPath is the path to get segments in DataNode.
+	DNSegmentsPath = "/_dn/segments"
+	// DNChannelsPath is the path to get channels in DataNode.
+	DNChannelsPath = "/_dn/channels"
 )
diff --git a/internal/http/webui/static/js/mockdata.js b/internal/http/webui/static/js/mockdata.js
index f5246d0b72372..1f728c1041eea 100644
--- a/internal/http/webui/static/js/mockdata.js
+++ b/internal/http/webui/static/js/mockdata.js
@@ -412,6 +412,179 @@ var mconfigs = `
 }
 `;
 
+var qcTargets = `
+[
+  {
+    "collection_id": 1,
+    "segments": [
+      {
+        "segment_id": 1,
+        "collection_id": 1,
+        "partition_id": 1,
+        "channel": "channel1",
+        "num_of_rows": 1000,
+        "state": "Sealed",
+        "is_importing": false,
+        "compacted": false,
+        "level": "L0",
+        "is_sorted": true,
+        "node_id": 1,
+        "is_invisible": false,
+        "loaded_timestamp": 1633072800,
+        "index": [
+          {
+            "field_id": 1,
+            "index_id": 1,
+            "build_id": 1,
+            "index_size": 1024,
+            "is_loaded": true
+          }
+        ],
+        "resource_group": "rg1",
+        "loaded_insert_row_count": 1000,
+        "mem_size": 2048
+      }
+    ],
+    "dm_channels": [
+      {
+        "node_id": 1,
+        "version": 1,
+        "collection_id": 1,
+        "channel_name": "channel1",
+        "unflushed_segment_ids": [1],
+        "flushed_segment_ids": [2],
+        "dropped_segment_ids": [3],
+        "level_zero_segment_ids": [4],
+        "partition_stats_versions": {
+          "1": 1
+        }
+      }
+    ]
+  }
+]
+`
+
+var qcDist = `
+{
+  "segments": [
+    {
+      "segment_id": 1,
+      "collection_id": 1,
+      "partition_id": 1,
+      "channel": "channel1",
+      "num_of_rows": 1000,
+      "state": "Sealed",
+      "is_importing": false,
+      "compacted": false,
+      "level": "L0",
+      "is_sorted": true,
+      "node_id": 1,
+      "is_invisible": false,
+      "loaded_timestamp": 1633072800,
+      "index": [
+        {
+          "field_id": 1,
+          "index_id": 1,
+          "build_id": 1,
+          "index_size": 1024,
+          "is_loaded": true
+        }
+      ],
+      "resource_group": "rg1",
+      "loaded_insert_row_count": 1000,
+      "mem_size": 2048
+    }
+  ],
+  "dm_channels": [
+    {
+      "node_id": 1,
+      "version": 1,
+      "collection_id": 1,
+      "channel_name": "channel1",
+      "unflushed_segment_ids": [1],
+      "flushed_segment_ids": [2],
+      "dropped_segment_ids": [3],
+      "level_zero_segment_ids": [4],
+      "partition_stats_versions": {
+        "1": 1
+      }
+    }
+  ],
+  "leader_views": [
+    {
+      "node_id": 1,
+      "collection_id": 1,
+      "channel_name": "channel1",
+      "segments": [
+        {
+          "segment_id": 1,
+          "partition_id": 1,
+          "num_of_rows": 1000,
+          "state": "Sealed",
+          "is_importing": false,
+          "compacted": false,
+          "level": "L0",
+          "is_sorted": true,
+          "node_id": 1,
+          "is_invisible": false,
+          "loaded_timestamp": 1633072800,
+          "index": [
+            {
+              "field_id": 1,
+              "index_id": 1,
+              "build_id": 1,
+              "index_size": 1024,
+              "is_loaded": true
+            }
+          ],
+          "resource_group": "rg1",
+          "loaded_insert_row_count": 1000,
+          "mem_size": 2048
+        }
+      ]
+    }
+  ]
+}
+`
+
+var qcReplica = `
+[
+  {
+    "ID": 1,
+    "CollectionID": 1,
+    "RWNodes": [1, 2],
+    "ResourceGroup": "rg1",
+    "RONodes": [3],
+    "ChannelToRWNodes": {
+      "channel1": [1, 2]
+    }
+  },
+  {
+    "ID": 2,
+    "CollectionID": 2,
+    "RWNodes": [4, 5],
+    "ResourceGroup": "rg2",
+    "RONodes": [6],
+    "ChannelToRWNodes": {
+      "channel2": [4, 5]
+    }
+  }
+]
+`
+
+var qcResourceGroup = `
+[
+  {
+    "Name": "rg1",
+    "Nodes": [1, 2]
+  },
+  {
+    "Name": "rg2",
+    "Nodes": [3, 4]
+  }
+]
+`
+
 var qcTasks = `
 [
 	{
@@ -456,6 +629,119 @@ var qcTasks = `
 ]
 `
 
+var qn_segments = `
+[
+  {
+    "segment_id": 1,
+    "collection_id": 1,
+    "partition_id": 1,
+    "channel": "channel1",
+    "num_of_rows": 1000,
+    "state": "Sealed",
+    "is_importing": false,
+    "compacted": false,
+    "level": "L1",
+    "is_sorted": true,
+    "node_id": 1,
+    "is_invisible": false,
+    "loaded_timestamp": 1620000000,
+    "index": [
+      {
+        "field_id": 1,
+        "index_id": 1,
+        "build_id": 1,
+        "index_size": 1024,
+        "is_loaded": true
+      }
+    ],
+    "resource_group": "rg1",
+    "loaded_insert_row_count": 1000,
+    "mem_size": 2048
+  },
+  {
+    "segment_id": 2,
+    "collection_id": 2,
+    "partition_id": 2,
+    "channel": "channel2",
+    "num_of_rows": 2000,
+    "state": "Sealed",
+    "is_importing": false,
+    "compacted": false,
+    "level": "L2",
+    "is_sorted": true,
+    "node_id": 2,
+    "is_invisible": false,
+    "loaded_timestamp": 1620000001,
+    "index": [
+      {
+        "field_id": 2,
+        "index_id": 2,
+        "build_id": 2,
+        "index_size": 2048,
+        "is_loaded": true
+      }
+    ],
+    "resource_group": "rg2",
+    "loaded_insert_row_count": 2000,
+    "mem_size": 4096
+  }
+]
+`
+
+var qn_channels = `
+[
+  {
+    "name": "channel1",
+    "watched_status": "Healthy",
+    "assign_state": "assigned",
+    "latest_time_tick": "2023-10-01 12:00:00",
+    "node_id": 1,
+    "collection_id": 1
+  },
+  {
+    "name": "channel2",
+    "watched_status": "Healthy",
+    "assign_state": "assigned",
+    "latest_time_tick": "2023-10-01 12:05:00",
+    "node_id": 2,
+    "collection_id": 2
+  }
+]
+`
+
+var dc_dist = `
+[
+  {
+    "segment_id": 1,
+    "collection_id": 1,
+    "partition_id": 1,
+    "channel": "channel1",
+    "num_of_rows": 1000,
+    "state": "active",
+    "is_importing": false,
+    "compacted": false,
+    "level": "L1",
+    "is_sorted": true,
+    "node_id": 1,
+    "mem_size": 2048
+  },
+  {
+    "segment_id": 2,
+    "collection_id": 2,
+    "partition_id": 2,
+    "channel": "channel2",
+    "num_of_rows": 2000,
+    "state": "inactive",
+    "is_importing": true,
+    "compacted": true,
+    "level": "L2",
+    "is_sorted": false,
+    "node_id": 2,
+    "mem_size": 4096
+  }
+]
+`
+
 var dc_build_index_task = `
 [
 	{
@@ -485,30 +771,31 @@ var dc_compaction_task = `
 [
 	{
 		"plan_id": 1,
-		"collection_id": 1001,
+		"collection_id": 1,
 		"type": "Merge",
 		"state": "Completed",
-		"start_time": 1633036800,
-		"end_time": 1633040400,
-		"total_rows": 100000,
+		"fail_reason": "",
+		"start_time": 1620000000,
+		"end_time": 1620003600,
+		"total_rows": 10000,
 		"input_segments": [1, 2, 3],
 		"result_segments": [4]
 	},
 	{
 		"plan_id": 2,
-		"collection_id": 1002,
+		"collection_id": 2,
 		"type": "Merge",
 		"state": "Failed",
 		"fail_reason": "Disk full",
-		"start_time": 1633123200,
-		"end_time": 1633126800,
-		"total_rows": 200000,
+		"start_time": 1620007200,
+		"end_time": 1620010800,
+		"total_rows": 20000,
 		"input_segments": [5, 6, 7],
-		"result_segments": [8]
+		"result_segments": []
 	}
 ]`
 
-var dc_sync_task = `
+var dn_sync_task = `
 [
 	{
 		"segment_id": 1,
@@ -518,7 +805,8 @@ var dn_sync_task = `
 		"batch_rows": 100,
 		"segment_level": "L0",
 		"ts_from": 1633036800,
 		"ts_to": 1633040400,
 		"delta_row_count": 10,
 		"flush_size": 1024,
-		"running_time": 100000000
+		"running_time": "100000000",
+		"node_id": 1
 	},
 	{
 		"segment_id": 2,
 		"batch_rows": 200,
 		"segment_level": "L1",
 		"ts_from": 1633123200,
 		"ts_to": 1633126800,
 		"delta_row_count": 20,
 		"flush_size": 2048,
-		"running_time": 200000000
+		"running_time": "200000000",
+		"node_id": 2
 	}
 ]
 `
@@ -569,4 +858,64 @@ var dc_import_task = `
 	"complete_time": "2023-10-01T01:00:00Z"
 	}
 ]
 `
+
+var dn_segments = `
+[
+  {
+    "segment_id": 1,
+    "collection_id": 1,
+    "partition_id": 1,
+    "channel": "channel1",
+    "num_of_rows": 1000,
+    "state": "active",
+    "is_importing": false,
+    "compacted": false,
+    "level": "L1",
+    "is_sorted": true,
+    "node_id": 1,
+    "flushed_rows": 1000,
+    "sync_buffer_rows": 0,
+    "syncing_rows": 0
+  },
+  {
+    "segment_id": 2,
+    "collection_id": 2,
+    "partition_id": 2,
+    "channel": "channel2",
+    "num_of_rows": 2000,
+    "state": "inactive",
+    "is_importing": true,
+    "compacted": true,
+    "level": "L2",
+    "is_sorted": false,
+    "node_id": 2,
+    "flushed_rows": 2000,
+    "sync_buffer_rows": 100,
+    "syncing_rows": 50
+  }
+]
+`
+
+var dn_channels = `
+[
+  {
+    "name": "channel1",
+ "watched_status": "Healthy", + "assign_state": "assigned", + "latest_time_tick": "2023-10-01 12:00:00", + "node_id": 1, + "collection_id": 1, + "check_point_ts": "2023-10-01 12:00:00" + }, + { + "name": "channel2", + "watched_status": "Healthy", + "assign_state": "assigned", + "latest_time_tick": "2023-10-01 12:05:00", + "node_id": 2, + "collection_id": 2, + "check_point_ts": "2023-10-01 12:05:00" + } +] ` \ No newline at end of file diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a1632b736d021..f5949e1fd8e2a 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6503,26 +6503,34 @@ func DeregisterSubLabel(subLabel string) { // RegisterRestRouter registers the router for the proxy func (node *Proxy) RegisterRestRouter(router gin.IRouter) { - // Cluster request + // Cluster request that executed by proxy router.GET(http.ClusterInfoPath, getClusterInfo(node)) router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetAll())) router.GET(http.ClusterClientsPath, getConnectedClients) router.GET(http.ClusterDependenciesPath, getDependencies) - // Hook request + // Hook request that executed by proxy router.GET(http.HookConfigsPath, getConfigs(paramtable.GetHookParams().GetAll())) - // QueryCoord request - router.GET(http.QCoordSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegmentDist)) - router.GET(http.QCoordChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannelDist)) - router.GET(http.QCoordAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks)) - - // DataCoord request - router.GET(http.DCoordAllTasksPath, getDataComponentMetrics(node, metricsinfo.DataCoordAllTasks)) - router.GET(http.DCoordCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks)) - router.GET(http.DCoordImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks)) - router.GET(http.DCoordBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks)) - - // Datanode request - router.GET(http.DNodeSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks)) + // QueryCoord requests that are forwarded from proxy + router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.QueryTarget)) + router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.QueryDist)) + router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.QueryReplicas)) + router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.QueryResourceGroups)) + router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks)) + + // QueryNode requests that are forwarded from querycoord + router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegments)) + router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannels)) + + // DataCoord requests that are forwarded from proxy + router.GET(http.DCDistPath, getDataComponentMetrics(node, metricsinfo.DataDist)) + router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks)) + router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks)) + router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks)) + + // Datanode requests that are forwarded from datacoord + router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks)) + router.GET(http.DNSegmentsPath, getDataComponentMetrics(node, metricsinfo.DataSegments)) + 
	router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.DataChannels))
 }
diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go
index 492841be742f6..4171482f8f06b 100644
--- a/internal/proxy/impl_test.go
+++ b/internal/proxy/impl_test.go
@@ -1880,14 +1880,14 @@ func TestRegisterRestRouter(t *testing.T) {
 		path       string
 		statusCode int
 	}{
-		{path: mhttp.QCoordSegmentsPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.QCoordChannelsPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.QCoordAllTasksPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.DNodeSyncTasksPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.DCoordCompactionTasksPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.DCoordImportTasksPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.DCoordBuildIndexTasksPath, statusCode: http.StatusInternalServerError},
-		{path: mhttp.DNodeSyncTasksPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.QCTargetPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.QCDistPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.QCAllTasksPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.DNSyncTasksPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.DCCompactionTasksPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.DCImportTasksPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.DCBuildIndexTasksPath, statusCode: http.StatusInternalServerError},
+		{path: mhttp.DNChannelsPath, statusCode: http.StatusInternalServerError},
 	}
 
 	for _, tt := range tests {
diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go
index b67246ffd006f..bc8d502f95853 100644
--- a/internal/proxy/rootcoord_mock_test.go
+++ b/internal/proxy/rootcoord_mock_test.go
@@ -101,7 +101,7 @@ type RootCoordMock struct {
 
 	// TODO(dragondriver): segment-related
 
-	// TODO(dragondriver): TimeTick-related
+	// TODO(dragondriver): LatestTimeTick-related
 
 	lastTs    typeutil.Timestamp
 	lastTsMtx sync.Mutex
diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go
index 7814e214daa7b..e62a642de4c85 100644
--- a/internal/querycoordv2/handlers.go
+++ b/internal/querycoordv2/handlers.go
@@ -18,12 +18,14 @@ package querycoordv2
 
 import (
 	"context"
+	"encoding/json"
 	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@@ -34,6 +36,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -242,6 +245,51 @@ func (s *Server) balanceChannels(ctx context.Context,
 	return nil
 }
 
+func getMetrics[T any](s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest) ([]T, error) {
+	var metrics []T
+	var mu sync.Mutex
+	errorGroup, ctx := errgroup.WithContext(ctx)
+
+	for _, node := range s.nodeMgr.GetAll() {
+		errorGroup.Go(func() error {
+			resp, err := s.cluster.GetMetrics(ctx, node.ID(), req)
+			if err := merr.CheckRPCCall(resp, err); err != nil {
+				log.Warn("failed to get metric from QueryNode", zap.Int64("nodeID", node.ID()), zap.Error(err))
return err
+			}
+
+			if resp.Response == "" {
+				return nil
+			}
+
+			infos := make([]T, 0)
+			err = json.Unmarshal([]byte(resp.Response), &infos)
+			if err != nil {
+				log.Warn("failed to unmarshal metrics from QueryNode", zap.Error(err))
+				return err
+			}
+
+			mu.Lock()
+			metrics = append(metrics, infos...)
+			mu.Unlock()
+			return nil
+		})
+	}
+	err := errorGroup.Wait()
+
+	return metrics, err
+}
+
+func (s *Server) getChannelsFromQueryNode(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
+	channels, err := getMetrics[*metricsinfo.Channel](s, ctx, req)
+	return metricsinfo.MarshalGetMetricsValues(channels, err)
+}
+
+func (s *Server) getSegmentsFromQueryNode(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
+	segments, err := getMetrics[*metricsinfo.Segment](s, ctx, req)
+	return metricsinfo.MarshalGetMetricsValues(segments, err)
+}
+
 // TODO(dragondriver): add more detail metrics
 func (s *Server) getSystemInfoMetrics(
 	ctx context.Context,
diff --git a/internal/querycoordv2/handlers_test.go b/internal/querycoordv2/handlers_test.go
new file mode 100644
index 0000000000000..d2b2595ac2a57
--- /dev/null
+++ b/internal/querycoordv2/handlers_test.go
@@ -0,0 +1,108 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package querycoordv2 + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" +) + +func TestGetChannelsFromQueryNode(t *testing.T) { + mockCluster := session.NewMockCluster(t) + nodeManager := session.NewNodeManager() + nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + server := &Server{cluster: mockCluster, nodeMgr: nodeManager} + req := &milvuspb.GetMetricsRequest{} + expectedChannels := []*metricsinfo.Channel{ + { + Name: "channel1", + WatchState: "Healthy", + LatestTimeTick: "1", + NodeID: int64(1), + CollectionID: int64(100), + }, + { + Name: "channel2", + WatchState: "Healthy", + LatestTimeTick: "2", + NodeID: int64(2), + CollectionID: int64(200), + }, + } + resp := &milvuspb.GetMetricsResponse{ + Response: func() string { + data, _ := json.Marshal(expectedChannels) + return string(data) + }(), + } + mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil) + result, err := server.getChannelsFromQueryNode(context.Background(), req) + assert.NoError(t, err) + + var actualChannels []*metricsinfo.Channel + err = json.Unmarshal([]byte(result), &actualChannels) + assert.NoError(t, err) + assert.Equal(t, expectedChannels, actualChannels) +} + +func TestGetSegmentsFromQueryNode(t *testing.T) { + mockCluster := session.NewMockCluster(t) + nodeManager := session.NewNodeManager() + nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + server := &Server{cluster: mockCluster, nodeMgr: nodeManager} + expectedSegments := []*metricsinfo.Segment{ + { + SegmentID: 1, + PartitionID: 1, + Channel: "channel1", + ResourceGroup: "default", + MemSize: int64(1024), + LoadedInsertRowCount: 100, + }, + { + SegmentID: 2, + PartitionID: 1, + Channel: "channel2", + ResourceGroup: "default", + MemSize: int64(1024), + LoadedInsertRowCount: 200, + }, + } + resp := &milvuspb.GetMetricsResponse{ + Response: func() string { + data, _ := json.Marshal(expectedSegments) + return string(data) + }(), + } + req := &milvuspb.GetMetricsRequest{} + mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil) + + result, err := server.getSegmentsFromQueryNode(context.Background(), req) + assert.NoError(t, err) + + var actualSegments []*metricsinfo.Segment + err = json.Unmarshal([]byte(result), &actualSegments) + assert.NoError(t, err) + assert.Equal(t, expectedSegments, actualSegments) +} diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index 9ecd29d06efe5..db3dc8720100a 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -23,6 +23,8 @@ import ( "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -130,6 +132,13 @@ func (channel *DmChannel) Clone() *DmChannel { } } +func newDmChannelMetricsFrom(channel *DmChannel) *metricsinfo.DmChannel { + dmChannel := metrics.NewDMChannelFrom(channel.VchannelInfo) + dmChannel.NodeID = channel.Node + dmChannel.Version = channel.Version + return dmChannel +} + type nodeChannels struct { channels []*DmChannel // collection 
id => channels
@@ -290,3 +299,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
 		}
 	}
 }
+
+func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel {
+	m.rwmutex.RLock()
+	defer m.rwmutex.RUnlock()
+
+	var channels []*metricsinfo.DmChannel
+	for _, nodeChannels := range m.channels {
+		for _, channel := range nodeChannels.channels {
+			channels = append(channels, newDmChannelMetricsFrom(channel))
+		}
+	}
+	return channels
+}
diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go
index 4960aae25adeb..18ffe9dccb29f 100644
--- a/internal/querycoordv2/meta/channel_dist_manager_test.go
+++ b/internal/querycoordv2/meta/channel_dist_manager_test.go
@@ -19,6 +19,7 @@ package meta
 import (
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -185,3 +186,37 @@ func (suite *ChannelDistManagerSuite) AssertCollection(channels []*DmChannel, co
 
 func TestChannelDistManager(t *testing.T) {
 	suite.Run(t, new(ChannelDistManagerSuite))
 }
+
+func TestGetChannelDistJSON(t *testing.T) {
+	manager := NewChannelDistManager()
+	channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
+		CollectionID: 100,
+		ChannelName:  "channel-1",
+	})
+	channel1.Node = 1
+	channel1.Version = 1
+
+	channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{
+		CollectionID: 200,
+		ChannelName:  "channel-2",
+	})
+	channel2.Node = 2
+	channel2.Version = 1
+
+	manager.Update(1, channel1)
+	manager.Update(2, channel2)
+
+	channels := manager.GetChannelDist()
+	assert.Len(t, channels, 2)
+	// node map iteration order is not deterministic, normalize before asserting
+	if channels[0].NodeID != 1 {
+		channels[0], channels[1] = channels[1], channels[0]
+	}
+	assert.Equal(t, int64(1), channels[0].NodeID)
+	assert.Equal(t, "channel-1", channels[0].ChannelName)
+	assert.Equal(t, int64(100), channels[0].CollectionID)
+
+	assert.Equal(t, int64(2), channels[1].NodeID)
+	assert.Equal(t, "channel-2", channels[1].ChannelName)
+	assert.Equal(t, int64(200), channels[1].CollectionID)
+}
diff --git a/internal/querycoordv2/meta/dist_manager.go b/internal/querycoordv2/meta/dist_manager.go
index 39aca9551abdd..512a926da30be 100644
--- a/internal/querycoordv2/meta/dist_manager.go
+++ b/internal/querycoordv2/meta/dist_manager.go
@@ -16,6 +16,15 @@
 
 package meta
 
+import (
+	"encoding/json"
+
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
+)
+
 type DistributionManager struct {
 	*SegmentDistManager
 	*ChannelDistManager
@@ -29,3 +38,30 @@ func NewDistributionManager() *DistributionManager {
 		LeaderViewManager:  NewLeaderViewManager(),
 	}
 }
+
+// GetDistributionJSON returns a JSON representation of the current distribution state.
+// It includes segments, DM channels, and leader views.
+// If there are no segments, channels, or leader views, it returns an empty string.
+// If JSON marshaling fails, it logs a warning and returns an empty string.
+func (dm *DistributionManager) GetDistributionJSON() string {
+	segments := dm.GetSegmentDist()
+	channels := dm.GetChannelDist()
+	leaderView := dm.GetLeaderView()
+
+	if len(segments) == 0 && len(channels) == 0 && len(leaderView) == 0 {
+		return ""
+	}
+
+	dist := &metricsinfo.QueryCoordCollectionDistribution{
+		Segments:    segments,
+		DMChannels:  channels,
+		LeaderViews: leaderView,
+	}
+
+	v, err := json.Marshal(dist)
+	if err != nil {
+		log.Warn("failed to marshal dist", zap.Error(err))
+		return ""
+	}
+	return string(v)
+}
diff --git a/internal/querycoordv2/meta/dist_manager_test.go b/internal/querycoordv2/meta/dist_manager_test.go
new file mode 100644
index 0000000000000..8c6f45bf98287
--- /dev/null
+++ b/internal/querycoordv2/meta/dist_manager_test.go
@@ -0,0 +1,110 @@
+package meta
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
+)
+
+func TestGetDistributionJSON(t *testing.T) {
+	// Initialize DistributionManager
+	manager := NewDistributionManager()
+
+	// Add some segments to the SegmentDistManager
+	segment1 := SegmentFromInfo(&datapb.SegmentInfo{
+		ID:            1,
+		CollectionID:  100,
+		PartitionID:   10,
+		InsertChannel: "channel-1",
+		NumOfRows:     1000,
+		State:         commonpb.SegmentState_Flushed,
+	})
+	segment1.Node = 1
+	segment1.Version = 1
+
+	segment2 := SegmentFromInfo(&datapb.SegmentInfo{
+		ID:            2,
+		CollectionID:  200,
+		PartitionID:   20,
+		InsertChannel: "channel-2",
+		NumOfRows:     2000,
+		State:         commonpb.SegmentState_Flushed,
+	})
+	segment2.Node = 2
+	segment2.Version = 1
+
+	manager.SegmentDistManager.Update(1, segment1)
+	manager.SegmentDistManager.Update(2, segment2)
+
+	// Add some channels to the ChannelDistManager
+	channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
+		CollectionID: 100,
+		ChannelName:  "channel-1",
+	})
+	channel1.Node = 1
+	channel1.Version = 1
+
+	channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{
+		CollectionID: 200,
+		ChannelName:  "channel-2",
+	})
+	channel2.Node = 2
+	channel2.Version = 1
+
+	manager.ChannelDistManager.Update(1, channel1)
+	manager.ChannelDistManager.Update(2, channel2)
+
+	// Add some leader views to the LeaderViewManager
+	leaderView1 := &LeaderView{
+		ID:           1,
+		CollectionID: 100,
+		Channel:      "channel-1",
+		Version:      1,
+		Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1}},
+	}
+
+	leaderView2 := &LeaderView{
+		ID:           2,
+		CollectionID: 200,
+		Channel:      "channel-2",
+		Version:      1,
+		Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+	}
+
+	manager.LeaderViewManager.Update(1, leaderView1)
+	manager.LeaderViewManager.Update(2, leaderView2)
+
+	// Call GetDistributionJSON
+	jsonOutput := manager.GetDistributionJSON()
+
+	// Verify JSON output
+	var dist metricsinfo.QueryCoordCollectionDistribution
+	err := json.Unmarshal([]byte(jsonOutput), &dist)
+	assert.NoError(t, err)
+	assert.Len(t, dist.Segments, 2)
+	assert.Len(t, dist.DMChannels, 2)
+	assert.Len(t, dist.LeaderViews, 2)
+
+	// map iteration order is not deterministic, normalize before asserting
+	if dist.Segments[0].SegmentID != 1 {
+		dist.Segments[0], dist.Segments[1] = dist.Segments[1], dist.Segments[0]
+	}
+	if dist.DMChannels[0].ChannelName != "channel-1" {
+		dist.DMChannels[0], dist.DMChannels[1] = dist.DMChannels[1], dist.DMChannels[0]
+	}
+	if dist.LeaderViews[0].LeaderID != 1 {
+		dist.LeaderViews[0], dist.LeaderViews[1] = dist.LeaderViews[1], dist.LeaderViews[0]
+	}
+	assert.Equal(t, int64(1), dist.Segments[0].SegmentID)
+	assert.Equal(t, int64(2), dist.Segments[1].SegmentID)
+	assert.Equal(t, "channel-1", dist.DMChannels[0].ChannelName)
+	assert.Equal(t, "channel-2", dist.DMChannels[1].ChannelName)
+	assert.Equal(t, int64(1), dist.LeaderViews[0].LeaderID)
+	assert.Equal(t, int64(2), dist.LeaderViews[1].LeaderID)
+}
diff --git
a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index 963e115b69506..8e06ccc80ed05 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -22,6 +22,7 @@ import ( "github.com/samber/lo" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -308,3 +309,46 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView return v1.Version > v2.Version }) } + +// GetLeaderView returns a slice of LeaderView objects, each representing the state of a leader node. +// It traverses the views map, converts each LeaderView to a metricsinfo.LeaderView, and collects them into a slice. +// The method locks the views map for reading to ensure thread safety. +func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView { + mgr.rwmutex.RLock() + defer mgr.rwmutex.RUnlock() + + var leaderViews []*metricsinfo.LeaderView + for _, nodeViews := range mgr.views { + for _, lv := range nodeViews.views { + errString := "" + if lv.UnServiceableError != nil { + errString = lv.UnServiceableError.Error() + } + leaderView := &metricsinfo.LeaderView{ + LeaderID: lv.ID, + CollectionID: lv.CollectionID, + Channel: lv.Channel, + Version: lv.Version, + SealedSegments: make([]*metricsinfo.Segment, 0, len(lv.Segments)), + GrowingSegments: make([]*metricsinfo.Segment, 0, len(lv.GrowingSegments)), + TargetVersion: lv.TargetVersion, + NumOfGrowingRows: lv.NumOfGrowingRows, + UnServiceableError: errString, + } + + for segID, seg := range lv.Segments { + leaderView.SealedSegments = append(leaderView.SealedSegments, &metricsinfo.Segment{ + SegmentID: segID, + NodeID: seg.NodeID, + }) + } + + for _, seg := range lv.GrowingSegments { + leaderView.GrowingSegments = append(leaderView.GrowingSegments, newSegmentMetricsFrom(seg)) + } + + leaderViews = append(leaderViews, leaderView) + } + } + return leaderViews +} diff --git a/internal/querycoordv2/meta/leader_view_manager_test.go b/internal/querycoordv2/meta/leader_view_manager_test.go index b25e245e20946..56374ecd23ea2 100644 --- a/internal/querycoordv2/meta/leader_view_manager_test.go +++ b/internal/querycoordv2/meta/leader_view_manager_test.go @@ -17,13 +17,19 @@ package meta import ( + "encoding/json" "testing" "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "golang.org/x/exp/slices" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -317,3 +323,69 @@ func (suite *LeaderViewManagerSuite) TestNotifyDelegatorChanges() { func TestLeaderViewManager(t *testing.T) { suite.Run(t, new(LeaderViewManagerSuite)) } + +func TestGetLeaderView(t *testing.T) { + manager := NewLeaderViewManager() + leaderView1 := &LeaderView{ + ID: 1, + CollectionID: 100, + Channel: "channel-1", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1}}, + GrowingSegments: map[int64]*Segment{ + 1: {SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 100, PartitionID: 10, InsertChannel: "channel-1", NumOfRows: 1000, State: commonpb.SegmentState_Growing}, Node: 1}, + }, + TargetVersion: 1, + NumOfGrowingRows: 1000, + UnServiceableError: nil, + } + + 
leaderView2 := &LeaderView{
+		ID:           2,
+		CollectionID: 200,
+		Channel:      "channel-2",
+		Version:      1,
+		Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+		GrowingSegments: map[int64]*Segment{
+			2: {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 200, PartitionID: 20, InsertChannel: "channel-2", NumOfRows: 2000, State: commonpb.SegmentState_Growing}, Node: 2},
+		},
+		TargetVersion:      1,
+		NumOfGrowingRows:   2000,
+		UnServiceableError: nil,
+	}
+
+	manager.Update(1, leaderView1)
+	manager.Update(2, leaderView2)
+
+	// Call GetLeaderView
+	leaderViews := manager.GetLeaderView()
+	jsonOutput, err := json.Marshal(leaderViews)
+	assert.NoError(t, err)
+
+	var result []*metricsinfo.LeaderView
+	err = json.Unmarshal(jsonOutput, &result)
+	assert.NoError(t, err)
+	assert.Len(t, result, 2)
+
+	// sort the result to make the assertions deterministic
+	slices.SortFunc(result, func(a, b *metricsinfo.LeaderView) int {
+		return int(a.LeaderID - b.LeaderID)
+	})
+	assert.Equal(t, int64(1), result[0].LeaderID)
+	assert.Equal(t, int64(100), result[0].CollectionID)
+	assert.Equal(t, "channel-1", result[0].Channel)
+	assert.Equal(t, int64(1), result[0].Version)
+	assert.Len(t, result[0].SealedSegments, 1)
+	assert.Len(t, result[0].GrowingSegments, 1)
+	assert.Equal(t, int64(1), result[0].SealedSegments[0].SegmentID)
+	assert.Equal(t, int64(1), result[0].GrowingSegments[0].SegmentID)
+
+	assert.Equal(t, int64(2), result[1].LeaderID)
+	assert.Equal(t, int64(200), result[1].CollectionID)
+	assert.Equal(t, "channel-2", result[1].Channel)
+	assert.Equal(t, int64(1), result[1].Version)
+	assert.Len(t, result[1].SealedSegments, 1)
+	assert.Len(t, result[1].GrowingSegments, 1)
+	assert.Equal(t, int64(2), result[1].SealedSegments[0].SegmentID)
+	assert.Equal(t, int64(2), result[1].GrowingSegments[0].SegmentID)
+}
diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go
index b396afb33f2af..9968d495fe3ab 100644
--- a/internal/querycoordv2/meta/mock_target_manager.go
+++ b/internal/querycoordv2/meta/mock_target_manager.go
@@ -565,6 +565,52 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run
 	return _c
 }
 
+// GetTargetJSON provides a mock function with given fields: scope
+func (_m *MockTargetManager) GetTargetJSON(scope int32) string {
+	ret := _m.Called(scope)
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetTargetJSON")
+	}
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func(int32) string); ok {
+		r0 = rf(scope)
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetTargetJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTargetJSON'
+type MockTargetManager_GetTargetJSON_Call struct {
+	*mock.Call
+}
+
+// GetTargetJSON is a helper method to define mock.On call
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetTargetJSON(scope interface{}) *MockTargetManager_GetTargetJSON_Call {
+	return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", scope)}
+}
+
+func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(scope int32)) *MockTargetManager_GetTargetJSON_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetTargetJSON_Call) Return(_a0 string) *MockTargetManager_GetTargetJSON_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(int32) string) *MockTargetManager_GetTargetJSON_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID
 func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
 	ret := _m.Called(collectionID, partitionID)
diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go
index f59bc39cf3ed1..436f24b5f7249 100644
--- a/internal/querycoordv2/meta/replica_manager.go
+++ b/internal/querycoordv2/meta/replica_manager.go
@@ -17,6 +17,7 @@
 package meta
 
 import (
+	"encoding/json"
 	"fmt"
 	"sync"
 
@@ -28,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -466,3 +468,33 @@ func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.Unique
 	ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...)
 	return ret
 }
+
+// GetReplicasJSON returns a JSON representation of all replicas managed by the ReplicaManager.
+// It locks the ReplicaManager for reading, converts the replicas to their metricsinfo representation,
+// marshals them into a JSON string, and returns the result.
+// If an error occurs during marshaling, it logs a warning and returns an empty string.
+func (m *ReplicaManager) GetReplicasJSON() string {
+	m.rwmutex.RLock()
+	defer m.rwmutex.RUnlock()
+
+	replicas := lo.MapToSlice(m.replicas, func(i typeutil.UniqueID, r *Replica) *metricsinfo.Replica {
+		channelToRWNodes := make(map[string][]int64)
+		for k, v := range r.replicaPB.GetChannelNodeInfos() {
+			channelToRWNodes[k] = v.GetRwNodes()
+		}
+		return &metricsinfo.Replica{
+			ID:               r.GetID(),
+			CollectionID:     r.GetCollectionID(),
+			RWNodes:          r.GetNodes(),
+			ResourceGroup:    r.GetResourceGroup(),
+			RONodes:          r.GetRONodes(),
+			ChannelToRWNodes: channelToRWNodes,
+		}
+	})
+	ret, err := json.Marshal(replicas)
+	if err != nil {
+		log.Warn("failed to marshal replicas", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go
index 55d3c471a12c6..80f3abc3fe802 100644
--- a/internal/querycoordv2/meta/replica_manager_test.go
+++ b/internal/querycoordv2/meta/replica_manager_test.go
@@ -17,9 +17,12 @@
 package meta
 
 import (
+	"encoding/json"
 	"testing"
 
 	"github.com/samber/lo"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"google.golang.org/protobuf/proto"
 
@@ -27,10 +30,12 @@ import (
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
+	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -494,3 +499,43 @@ func TestReplicaManager(t *testing.T) { suite.Run(t, new(ReplicaManagerSuite)) suite.Run(t, new(ReplicaManagerV2Suite)) } + +func TestGetReplicasJSON(t *testing.T) { + catalog := mocks.NewQueryCoordCatalog(t) + catalog.EXPECT().SaveReplica(mock.Anything).Return(nil) + idAllocator := RandomIncrementIDAllocator() + replicaManager := NewReplicaManager(idAllocator, catalog) + + // Add some replicas to the ReplicaManager + replica1 := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 100, + ResourceGroup: "rg1", + Nodes: []int64{1, 2, 3}, + }) + replica2 := newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 200, + ResourceGroup: "rg2", + Nodes: []int64{4, 5, 6}, + }) + + err := replicaManager.put(replica1) + err = replicaManager.put(replica2) + assert.NoError(t, err) + + jsonOutput := replicaManager.GetReplicasJSON() + var replicas []*metricsinfo.Replica + err = json.Unmarshal([]byte(jsonOutput), &replicas) + assert.NoError(t, err) + assert.Len(t, replicas, 2) + + assert.Equal(t, int64(1), replicas[0].ID) + assert.Equal(t, int64(2), replicas[1].ID) + assert.Equal(t, int64(100), replicas[0].CollectionID) + assert.Equal(t, int64(200), replicas[1].CollectionID) + assert.Equal(t, "rg1", replicas[0].ResourceGroup) + assert.Equal(t, "rg2", replicas[1].ResourceGroup) + assert.ElementsMatch(t, []int64{1, 2, 3}, replicas[0].RWNodes) + assert.ElementsMatch(t, []int64{4, 5, 6}, replicas[1].RWNodes) +} diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 060f287bc1689..63440f37f30dc 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -17,6 +17,7 @@ package meta import ( + "encoding/json" "fmt" "sync" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -918,3 +920,23 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error } return nil } + +func (rm *ResourceManager) GetResourceGroupsJSON() string { + rm.rwmutex.RLock() + defer rm.rwmutex.RUnlock() + + rgs := lo.MapToSlice(rm.groups, func(i string, r *ResourceGroup) *metricsinfo.ResourceGroup { + return &metricsinfo.ResourceGroup{ + Name: r.GetName(), + Nodes: r.GetNodes(), + Cfg: r.GetConfig(), + } + }) + ret, err := json.Marshal(rgs) + if err != nil { + log.Error("failed to marshal resource groups", zap.Error(err)) + return "" + } + + return string(ret) +} diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index ca58a8e899e23..657b5ef61958a 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -16,8 +16,10 @@ package meta import ( + "encoding/json" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -30,7 +32,9 @@ import ( "github.com/milvus-io/milvus/pkg/kv" 
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ResourceManagerSuite struct { @@ -619,3 +623,24 @@ func (suite *ResourceManagerSuite) TestUnassignFail() { suite.manager.HandleNodeDown(1) }) } + +func TestGetResourceGroupsJSON(t *testing.T) { + manager := &ResourceManager{groups: make(map[string]*ResourceGroup)} + rg1 := NewResourceGroup("rg1", newResourceGroupConfig(0, 10)) + rg1.nodes = typeutil.NewUniqueSet(1, 2) + rg2 := NewResourceGroup("rg2", newResourceGroupConfig(0, 20)) + rg2.nodes = typeutil.NewUniqueSet(3, 4) + manager.groups["rg1"] = rg1 + manager.groups["rg2"] = rg2 + + jsonOutput := manager.GetResourceGroupsJSON() + var resourceGroups []*metricsinfo.ResourceGroup + err := json.Unmarshal([]byte(jsonOutput), &resourceGroups) + assert.NoError(t, err) + assert.Len(t, resourceGroups, 2) + + assert.Equal(t, "rg1", resourceGroups[0].Name) + assert.ElementsMatch(t, []int64{1, 2}, resourceGroups[0].Nodes) + assert.Equal(t, "rg2", resourceGroups[1].Name) + assert.ElementsMatch(t, []int64{3, 4}, resourceGroups[1].Nodes) +} diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 51d38fc0fcafe..81b41523b1bce 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -19,11 +19,13 @@ package meta import ( "sync" + "github.com/golang/protobuf/proto" "github.com/samber/lo" - "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -130,6 +132,21 @@ func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { } } +func newSegmentMetricsFrom(segment *Segment) *metricsinfo.Segment { + convertedSegment := metrics.NewSegmentFrom(segment.SegmentInfo) + convertedSegment.NodeID = segment.Node + convertedSegment.LoadedTimestamp = segment.Version + convertedSegment.Index = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.SegmentIndex { + return &metricsinfo.SegmentIndex{ + IndexFieldID: e.FieldID, + IndexID: e.IndexID, + BuildID: e.BuildID, + IndexSize: e.IndexSize, + } + }) + return convertedSegment +} + func (segment *Segment) Clone() *Segment { return &Segment{ SegmentInfo: proto.Clone(segment.SegmentInfo).(*datapb.SegmentInfo), @@ -227,3 +244,17 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen } return ret } + +func (m *SegmentDistManager) GetSegmentDist() []*metricsinfo.Segment { + m.rwmutex.RLock() + m.rwmutex.RUnlock() + + var segments []*metricsinfo.Segment + for _, nodeSeg := range m.segments { + for _, segment := range nodeSeg.segments { + segments = append(segments, newSegmentMetricsFrom(segment)) + } + } + + return segments +} diff --git a/internal/querycoordv2/meta/segment_dist_manager_test.go b/internal/querycoordv2/meta/segment_dist_manager_test.go index 79d5340ba0b21..274e0c1f14331 100644 --- a/internal/querycoordv2/meta/segment_dist_manager_test.go +++ b/internal/querycoordv2/meta/segment_dist_manager_test.go @@ -19,8 +19,10 @@ package meta import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + 
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" ) @@ -188,3 +190,53 @@ func (suite *SegmentDistManagerSuite) AssertShard(segments []*Segment, shard str func TestSegmentDistManager(t *testing.T) { suite.Run(t, new(SegmentDistManagerSuite)) } + +func TestGetSegmentDistJSON(t *testing.T) { + // Initialize SegmentDistManager + manager := NewSegmentDistManager() + + // Add some segments to the SegmentDistManager + segment1 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + InsertChannel: "channel-1", + NumOfRows: 1000, + State: commonpb.SegmentState_Flushed, + }) + segment1.Node = 1 + segment1.Version = 1 + + segment2 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 2, + CollectionID: 200, + PartitionID: 20, + InsertChannel: "channel-2", + NumOfRows: 2000, + State: commonpb.SegmentState_Flushed, + }) + segment2.Node = 2 + segment2.Version = 1 + + manager.Update(1, segment1) + manager.Update(2, segment2) + + segments := manager.GetSegmentDist() + assert.Equal(t, int64(1), segments[0].SegmentID) + assert.Equal(t, int64(100), segments[0].CollectionID) + assert.Equal(t, int64(10), segments[0].PartitionID) + assert.Equal(t, "channel-1", segments[0].Channel) + assert.Equal(t, int64(1000), segments[0].NumOfRows) + assert.Equal(t, "Flushed", segments[0].State) + assert.Equal(t, int64(1), segments[0].NodeID) + assert.Equal(t, int64(1), segments[0].LoadedTimestamp) + + assert.Equal(t, int64(2), segments[1].SegmentID) + assert.Equal(t, int64(200), segments[1].CollectionID) + assert.Equal(t, int64(20), segments[1].PartitionID) + assert.Equal(t, "channel-2", segments[1].Channel) + assert.Equal(t, int64(2000), segments[1].NumOfRows) + assert.Equal(t, "Flushed", segments[1].State) + assert.Equal(t, int64(2), segments[1].NodeID) + assert.Equal(t, int64(1), segments[1].LoadedTimestamp) +} diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index f8fcd896942cb..5755621b1e807 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -23,6 +23,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -183,3 +185,21 @@ func (t *target) removeCollectionTarget(collectionID int64) { func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget { return t.collectionTargetMap[collectionID] } + +func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordCollectionTarget { + return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordCollectionTarget { + segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment { + return metrics.NewSegmentFrom(s) + }) + + dmChannels := lo.MapToSlice(v.GetAllDmChannels(), func(k string, ch *DmChannel) *metricsinfo.DmChannel { + return metrics.NewDMChannelFrom(ch.VchannelInfo) + }) + + return &metricsinfo.QueryCoordCollectionTarget{ + CollectionID: k, + Segments: segments, + DMChannels: dmChannels, + } + }) +} diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 6f05d4c96e004..fd2ef164892a1 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ 
@@ -18,6 +18,7 @@ package meta
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"runtime"
 	"sync"
@@ -68,6 +69,7 @@ type TargetManagerInterface interface {
 	SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
 	Recover(catalog metastore.QueryCoordCatalog) error
 	CanSegmentBeMoved(collectionID, segmentID int64) bool
+	GetTargetJSON(scope TargetScope) string
 }
 
 type TargetManager struct {
@@ -632,3 +634,28 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool
 
 	return false
 }
+
+func (mgr *TargetManager) GetTargetJSON(scope TargetScope) string {
+	mgr.rwMutex.RLock()
+	defer mgr.rwMutex.RUnlock()
+
+	ret := mgr.getTarget(scope)
+	if ret == nil {
+		return ""
+	}
+
+	v, err := json.Marshal(ret.toQueryCoordCollectionTargets())
+	if err != nil {
+		log.Warn("failed to marshal target", zap.Error(err))
+		return ""
+	}
+	return string(v)
+}
+
+func (mgr *TargetManager) getTarget(scope TargetScope) *target {
+	if scope == CurrentTarget {
+		return mgr.current
+	}
+
+	return mgr.next
+}
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 960e8697db648..d0e2c1af23146 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -199,8 +199,41 @@ func (s *Server) registerMetricsRequest() {
 		return s.taskScheduler.GetTasksJSON(), nil
 	}
 
+	QueryDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+		return s.dist.GetDistributionJSON(), nil
+	}
+
+	QueryTargetAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+		return s.targetMgr.GetTargetJSON(meta.CurrentTarget), nil
+	}
+
+	QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+		return s.meta.GetReplicasJSON(), nil
+	}
+
+	QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+		return s.meta.GetResourceGroupsJSON(), nil
+	}
+
+	QuerySegmentsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+		return s.getSegmentsFromQueryNode(ctx, req)
+	}
+
+	QueryChannelsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+		return s.getChannelsFromQueryNode(ctx, req)
+	}
+
+	// register actions whose requests are processed in querycoord
 	s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, getSystemInfoAction)
 	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryCoordAllTasks, QueryTasksAction)
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryDist, QueryDistAction)
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryTarget, QueryTargetAction)
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryReplicas, QueryReplicasAction)
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryResourceGroups, QueryResourceGroupsAction)
+
+	// register actions whose requests are processed in querynode
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, QuerySegmentsAction)
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, QueryChannelsAction)
 	log.Info("register metrics actions finished")
 }
diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go
index 44fae7020e06b..8ac7d8130a2b8 100644
--- a/internal/querynodev2/metrics_info.go
+++ b/internal/querynodev2/metrics_info.go
@@ -18,14 +18,17 @@ package querynodev2
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 
 	"github.com/samber/lo"
+	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus/internal/querynodev2/collector"
 	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
@@ -170,6 +173,54 @@ func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetr
 	return ret, nil
 }
 
+// getChannelJSON returns the JSON string of channels
+func getChannelJSON(node *QueryNode) string {
+	stats := node.pipelineManager.GetChannelStats()
+	ret, err := json.Marshal(stats)
+	if err != nil {
+		log.Warn("failed to marshal channels", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
+
+// getSegmentJSON returns the JSON string of segments
+func getSegmentJSON(node *QueryNode) string {
+	allSegments := node.manager.Segment.GetBy()
+	var ms []*metricsinfo.Segment
+	for _, s := range allSegments {
+		indexes := make([]*metricsinfo.SegmentIndex, 0, len(s.Indexes()))
+		for _, index := range s.Indexes() {
+			indexes = append(indexes, &metricsinfo.SegmentIndex{
+				IndexFieldID: index.IndexInfo.FieldID,
+				IndexID:      index.IndexInfo.IndexID,
+				IndexSize:    index.IndexInfo.IndexSize,
+				BuildID:      index.IndexInfo.BuildID,
+				IsLoaded:     index.IsLoaded,
+			})
+		}
+
+		ms = append(ms, &metricsinfo.Segment{
+			SegmentID:            s.ID(),
+			CollectionID:         s.Collection(),
+			PartitionID:          s.Partition(),
+			MemSize:              s.MemSize(),
+			Index:                indexes,
+			State:                s.Type().String(),
+			ResourceGroup:        s.ResourceGroup(),
+			LoadedInsertRowCount: s.InsertCount(),
+			NodeID:               node.GetNodeID(),
+		})
+	}
+
+	ret, err := json.Marshal(ms)
+	if err != nil {
+		log.Warn("failed to marshal segments", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
+
 // getSystemInfoMetrics returns metrics info of QueryNode
 func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (string, error) {
 	usedMem := hardware.GetUsedMemoryCount()
diff --git a/internal/querynodev2/metrics_info_test.go b/internal/querynodev2/metrics_info_test.go
new file mode 100644
index 0000000000000..d7dbe9981f657
--- /dev/null
+++ b/internal/querynodev2/metrics_info_test.go
@@ -0,0 +1,133 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package querynodev2
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
+	"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
+	"github.com/milvus-io/milvus/internal/querynodev2/segments"
+	"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
+	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+func TestGetPipelineJSON(t *testing.T) {
+	paramtable.Init()
+
+	ch := "ch"
+	tSafeManager := tsafe.NewTSafeReplica()
+	tSafeManager.Add(context.Background(), ch, 0)
+	delegators := typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
+	d := delegator.NewMockShardDelegator(t)
+	delegators.Insert(ch, d)
+	msgDispatcher := msgdispatcher.NewMockClient(t)
+
+	collectionManager := segments.NewMockCollectionManager(t)
+	segmentManager := segments.NewMockSegmentManager(t)
+	collectionManager.EXPECT().Get(mock.Anything).Return(&segments.Collection{})
+	manager := &segments.Manager{
+		Collection: collectionManager,
+		Segment:    segmentManager,
+	}
+
+	pipelineManager := pipeline.NewManager(manager, tSafeManager, msgDispatcher, delegators)
+
+	_, err := pipelineManager.Add(1, ch)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, pipelineManager.Num())
+
+	stats := pipelineManager.GetChannelStats()
+	expectedStats := []*metricsinfo.Channel{
+		{
+			Name:           ch,
+			WatchState:     "Healthy",
+			AssignSate:     "Assigned",
+			LatestTimeTick: typeutil.TimestampToString(0),
+			NodeID:         paramtable.GetNodeID(),
+			CollectionID:   1,
+		},
+	}
+	assert.Equal(t, expectedStats, stats)
+
+	JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager})
+	assert.NotEmpty(t, JSONStr)
+
+	var actualStats []*metricsinfo.Channel
+	err = json.Unmarshal([]byte(JSONStr), &actualStats)
+	assert.NoError(t, err)
+	assert.Equal(t, expectedStats, actualStats)
+}
+
+func TestGetSegmentJSON(t *testing.T) {
+	segment := segments.NewMockSegment(t)
+	segment.EXPECT().ID().Return(int64(1))
+	segment.EXPECT().Collection().Return(int64(1001))
+	segment.EXPECT().Partition().Return(int64(2001))
+	segment.EXPECT().MemSize().Return(int64(1024))
+	segment.EXPECT().Indexes().Return([]*segments.IndexedFieldInfo{
+		{
+			IndexInfo: &querypb.FieldIndexInfo{
+				FieldID:   1,
+				IndexID:   101,
+				IndexSize: 512,
+				BuildID:   10001,
+			},
+			IsLoaded: true,
+		},
+	})
+	segment.EXPECT().Type().Return(segments.SegmentTypeGrowing)
+	segment.EXPECT().ResourceGroup().Return("default")
+	segment.EXPECT().InsertCount().Return(int64(100))
+
+	node := &QueryNode{}
+	mockedSegmentManager := segments.NewMockSegmentManager(t)
+	mockedSegmentManager.EXPECT().GetBy().Return([]segments.Segment{segment})
+	node.manager = &segments.Manager{Segment: mockedSegmentManager}
+
+	jsonStr := getSegmentJSON(node)
+	assert.NotEmpty(t, jsonStr)
+
+	var segments []*metricsinfo.Segment
+	err := json.Unmarshal([]byte(jsonStr), &segments)
+	assert.NoError(t, err)
+	assert.NotNil(t, segments)
+	assert.Equal(t, 1, len(segments))
+	assert.Equal(t, int64(1), segments[0].SegmentID)
+	assert.Equal(t, int64(1001), segments[0].CollectionID)
+	assert.Equal(t, int64(2001), segments[0].PartitionID)
+	assert.Equal(t, int64(1024), segments[0].MemSize)
+	assert.Equal(t, 1, len(segments[0].Index))
+	assert.Equal(t, int64(1), segments[0].Index[0].IndexFieldID)
+	assert.Equal(t, int64(101), segments[0].Index[0].IndexID)
+	assert.Equal(t, int64(512),
segments[0].Index[0].IndexSize) + assert.Equal(t, int64(10001), segments[0].Index[0].BuildID) + assert.True(t, segments[0].Index[0].IsLoaded) + assert.Equal(t, "Growing", segments[0].State) + assert.Equal(t, "default", segments[0].ResourceGroup) + assert.Equal(t, int64(100), segments[0].LoadedInsertRowCount) + assert.Equal(t, node.GetNodeID(), segments[0].NodeID) +} diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 453c9638430f7..44bedc0b6c48d 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -40,6 +41,7 @@ type Manager interface { Remove(channels ...string) Start(channels ...string) error Close() + GetChannelStats() []*metricsinfo.Channel } type manager struct { @@ -155,6 +157,28 @@ func (m *manager) Close() { } } +func (m *manager) GetChannelStats() []*metricsinfo.Channel { + m.mu.RLock() + defer m.mu.RUnlock() + + ret := make([]*metricsinfo.Channel, 0, len(m.channel2Pipeline)) + for ch, p := range m.channel2Pipeline { + tt, err := m.tSafeManager.Get(ch) + if err != nil { + log.Warn("get tSafe failed", zap.String("channel", ch), zap.Error(err)) + } + ret = append(ret, &metricsinfo.Channel{ + Name: ch, + WatchState: p.Status(), + AssignSate: "Assigned", + LatestTimeTick: typeutil.TimestampToString(tt), + NodeID: paramtable.GetNodeID(), + CollectionID: p.GetCollectionID(), + }) + } + return ret +} + func NewManager(dataManager *DataManager, tSafeManager TSafeManager, dispatcher msgdispatcher.Client, diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index fb8d72f4d9901..17e1ec56500d4 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -26,6 +26,7 @@ import ( // pipeline used for querynode type Pipeline interface { base.StreamPipeline + GetCollectionID() UniqueID } type pipeline struct { @@ -35,6 +36,10 @@ type pipeline struct { embeddingNode embeddingNode } +func (p *pipeline) GetCollectionID() UniqueID { + return p.collectionID +} + func (p *pipeline) Close() { p.StreamPipeline.Close() } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index c24b59bf66fa8..6e6407df4e534 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -283,6 +283,16 @@ func (node *QueryNode) registerMetricsRequest() { func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return getSystemInfoMetrics(ctx, req, node) }) + + node.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return getSegmentJSON(node), nil + }) + + node.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return getChannelJSON(node), nil + }) log.Info("register metrics actions finished") } diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 6b67e0a32d11f..91bfeb7989b8c 100644 --- 
a/internal/util/flowgraph/flow_graph.go
+++ b/internal/util/flowgraph/flow_graph.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/cockroachdb/errors"
 	"go.uber.org/atomic"
@@ -123,6 +124,16 @@ func (fg *TimeTickedFlowGraph) Close() {
 	})
 }
 
+// Status returns the status of the pipeline; it returns "Healthy" if the input node
+// has received a msg within the last nodeCtxTtInterval
+func (fg *TimeTickedFlowGraph) Status() string {
+	diff := time.Since(fg.nodeCtxManager.lastAccessTime.Load())
+	if diff > nodeCtxTtInterval {
+		return fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String())
+	}
+	return "Healthy"
+}
+
 // NewTimeTickedFlowGraph create timetick flowgraph
 func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
 	flowGraph := TimeTickedFlowGraph{
diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go
index 1c1685efcd27c..40589c3f4c2c0 100644
--- a/internal/util/flowgraph/node.go
+++ b/internal/util/flowgraph/node.go
@@ -21,6 +21,7 @@ import (
 	"sync"
 	"time"
 
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/pkg/log"
@@ -59,14 +60,17 @@ type nodeCtxManager struct {
 	closeWg   *sync.WaitGroup
 	closeOnce sync.Once
 	closeCh   chan struct{} // notify nodes to exit
+
+	lastAccessTime *atomic.Time
 }
 
 // NewNodeCtxManager init with the inputNode and fg.closeWg
 func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager {
 	return &nodeCtxManager{
-		inputNodeCtx: nodeCtx,
-		closeWg:      closeWg,
-		closeCh:      make(chan struct{}),
+		inputNodeCtx:   nodeCtx,
+		closeWg:        closeWg,
+		closeCh:        make(chan struct{}),
+		lastAccessTime: atomic.NewTime(time.Now()),
 	}
 }
 
@@ -119,6 +123,7 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() {
 				continue
 			}
 
+			nodeCtxManager.lastAccessTime.Store(time.Now())
 			output = n.Operate(input)
 			curNode.blockMutex.RUnlock()
 			// the output decide whether the node should be closed.
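
The flowgraph and stream-pipeline changes above share one liveness pattern: the worker goroutine stamps an atomic timestamp on every message it processes, and `Status()` reports "Healthy" only while that stamp is fresher than the time-tick interval. Below is a minimal, self-contained sketch of that pattern; it is illustrative only and not part of the patch, and `worker`, `ttInterval`, and `healthCheck` are made-up names:

```go
package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic" // the same package the patch uses for lastAccessTime
)

type worker struct {
	lastAccessTime *atomic.Time  // stamped before each message is handled
	ttInterval     time.Duration // staleness threshold, like nodeCtxTtInterval
}

func newWorker(ttInterval time.Duration) *worker {
	return &worker{
		lastAccessTime: atomic.NewTime(time.Now()),
		ttInterval:     ttInterval,
	}
}

// process mirrors nodeCtxManager.workNodeStart / streamPipeline.work:
// record the access time, then handle the message.
func (w *worker) process(msg string) {
	w.lastAccessTime.Store(time.Now())
	_ = msg // real code would run the node's Operate() here
}

// healthCheck mirrors Status(): healthy only if a msg arrived recently.
func (w *worker) healthCheck() string {
	if diff := time.Since(w.lastAccessTime.Load()); diff > w.ttInterval {
		return fmt.Sprintf("input node hasn't received any msg in the last %s", diff)
	}
	return "Healthy"
}

func main() {
	w := newWorker(200 * time.Millisecond)
	w.process("time tick")
	fmt.Println(w.healthCheck()) // "Healthy"

	time.Sleep(300 * time.Millisecond)
	fmt.Println(w.healthCheck()) // reports the staleness instead
}
```

The probe stays off the hot path: the worker does one atomic store per message and the health check does one atomic load, which is why the patch can expose `Status()` through the metrics routes without affecting pipeline throughput.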
diff --git a/internal/util/metrics/utils.go b/internal/util/metrics/utils.go index 4c5b1fc230f61..8168fb78de113 100644 --- a/internal/util/metrics/utils.go +++ b/internal/util/metrics/utils.go @@ -1,46 +1,34 @@ package metrics import ( - "github.com/samber/lo" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) -func PruneFieldIndexInfo(f *querypb.FieldIndexInfo) *querypb.FieldIndexInfo { - return &querypb.FieldIndexInfo{ - FieldID: f.FieldID, - IndexID: f.IndexID, - BuildID: f.BuildID, - IndexSize: f.IndexSize, - NumRows: f.NumRows, - } -} - -func PruneSegmentInfo(s *datapb.SegmentInfo) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: s.ID, - NumOfRows: s.NumOfRows, - State: s.State, - Compacted: s.Compacted, - Level: s.Level, +func NewSegmentFrom(segment *datapb.SegmentInfo) *metricsinfo.Segment { + return &metricsinfo.Segment{ + SegmentID: segment.GetID(), + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + Channel: segment.GetInsertChannel(), + NumOfRows: segment.GetNumOfRows(), + State: segment.GetState().String(), + IsImporting: segment.GetIsImporting(), + Compacted: segment.GetCompacted(), + Level: segment.GetLevel().String(), + IsSorted: segment.GetIsSorted(), + IsInvisible: segment.GetIsInvisible(), } } -func PruneVChannelInfo(channel *datapb.VchannelInfo) *datapb.VchannelInfo { - return &datapb.VchannelInfo{ - ChannelName: channel.ChannelName, - UnflushedSegments: lo.Map(channel.UnflushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - FlushedSegments: lo.Map(channel.FlushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - DroppedSegments: lo.Map(channel.DroppedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - IndexedSegments: lo.Map(channel.IndexedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), +func NewDMChannelFrom(channel *datapb.VchannelInfo) *metricsinfo.DmChannel { + return &metricsinfo.DmChannel{ + CollectionID: channel.GetCollectionID(), + ChannelName: channel.GetChannelName(), + UnflushedSegmentIds: channel.GetUnflushedSegmentIds(), + FlushedSegmentIds: channel.GetFlushedSegmentIds(), + DroppedSegmentIds: channel.GetDroppedSegmentIds(), + LevelZeroSegmentIds: channel.GetLevelZeroSegmentIds(), + PartitionStatsVersions: channel.GetPartitionStatsVersions(), } } diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 2765e11492605..c18dd196517e6 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -18,9 +18,11 @@ package pipeline import ( "context" + "fmt" "sync" "time" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -39,6 +41,7 @@ import ( type StreamPipeline interface { Pipeline ConsumeMsgStream(position *msgpb.MsgPosition) error + Status() string } type streamPipeline struct { @@ -52,6 +55,8 @@ type streamPipeline struct { closeCh chan struct{} // notify work to exit closeWg sync.WaitGroup closeOnce sync.Once + + lastAccessTime *atomic.Time } func (p *streamPipeline) work() { @@ -62,6 +67,7 @@ func (p *streamPipeline) work() { log.Debug("stream pipeline input closed") return case msg := <-p.input: + p.lastAccessTime.Store(time.Now()) log.RatedDebug(10, 
"stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs))) p.pipeline.inputChannel <- msg p.pipeline.process() @@ -69,6 +75,16 @@ func (p *streamPipeline) work() { } } +// Status returns the status of the pipeline, it will return "Healthy" if the input node +// has received any msg in the last nodeTtInterval +func (p *streamPipeline) Status() string { + diff := time.Since(p.lastAccessTime.Load()) + if diff > p.pipeline.nodeTtInterval { + return fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String()) + } + return "Healthy" +} + func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { var err error if position == nil { @@ -150,10 +166,11 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time. nodeTtInterval: nodeTtInterval, enableTtChecker: enableTtChecker, }, - dispatcher: dispatcher, - vChannel: vChannel, - closeCh: make(chan struct{}), - closeWg: sync.WaitGroup{}, + dispatcher: dispatcher, + vChannel: vChannel, + closeCh: make(chan struct{}), + closeWg: sync.WaitGroup{}, + lastAccessTime: atomic.NewTime(time.Now()), } return pipeline diff --git a/pkg/util/metricsinfo/metric_request.go b/pkg/util/metricsinfo/metric_request.go index 07dd4a6dec034..63504ab6717a6 100644 --- a/pkg/util/metricsinfo/metric_request.go +++ b/pkg/util/metricsinfo/metric_request.go @@ -45,10 +45,16 @@ const ( MetricRequestParamsSeparator = "," // QuerySegmentDist request for segment distribution on the query node - QuerySegmentDist = "qc_segment_dist" + QuerySegments = "qn_segments" // QueryChannelDist request for channel distribution on the query node - QueryChannelDist = "qc_channel_dist" + QueryChannels = "qn_channels" + + // QueryDist request for segment/channel/leader view distribution on querycoord + QueryDist = "qc_dist" + + // QueryTarget request for segment/channel target on the querycoord + QueryTarget = "qc_target" // QueryCoordAllTasks request for get tasks on the querycoord QueryCoordAllTasks = "qc_tasks_all" @@ -59,8 +65,8 @@ const ( // QueryResourceGroups request for get resource groups on the querycoord QueryResourceGroups = "qc_resource_group" - // DataCoordAllTasks request for get tasks on the datacoord - DataCoordAllTasks = "dc_tasks_all" + // DataDist request for get segments on the datacoord + DataDist = "dc_segments" // ImportTasks request for get import tasks from the datacoord ImportTasks = "dc_import_tasks" @@ -74,6 +80,12 @@ const ( // SyncTasks request for get sync tasks from the datanode SyncTasks = "dn_sync_tasks" + // DataSegments request for get segments from the datanode + DataSegments = "dn_segments" + + // DataChannels request for get channels from the datanode + DataChannels = "dn_channels" + // MetricRequestParamVerboseKey as a request parameter decide to whether return verbose value MetricRequestParamVerboseKey = "verbose" ) diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index a1bb87d2f872f..43aa6e873282f 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -13,8 +13,8 @@ package metricsinfo import ( "encoding/json" - "time" + "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -70,6 +70,103 @@ const ( MilvusUsedGoVersion = "MILVUS_USED_GO_VERSION" ) +type DmChannel struct { + NodeID int64 `json:"node_id,omitempty"` + Version int64 `json:"version,omitempty"` + CollectionID int64 `json:"collection_id,omitempty"` + ChannelName string `json:"channel_name,omitempty"` + 
diff --git a/pkg/util/metricsinfo/metric_request.go b/pkg/util/metricsinfo/metric_request.go index 07dd4a6dec034..63504ab6717a6 100644
--- a/pkg/util/metricsinfo/metric_request.go
+++ b/pkg/util/metricsinfo/metric_request.go
@@ -45,10 +45,16 @@ const (
 	MetricRequestParamsSeparator = ","
 
-	// QuerySegmentDist request for segment distribution on the query node
-	QuerySegmentDist = "qc_segment_dist"
+	// QuerySegments request for segments on the query node
+	QuerySegments = "qn_segments"
 
-	// QueryChannelDist request for channel distribution on the query node
-	QueryChannelDist = "qc_channel_dist"
+	// QueryChannels request for channels on the query node
+	QueryChannels = "qn_channels"
+
+	// QueryDist request for segment/channel/leader view distribution on the querycoord
+	QueryDist = "qc_dist"
+
+	// QueryTarget request for segment/channel target on the querycoord
+	QueryTarget = "qc_target"
 
 	// QueryCoordAllTasks request for get tasks on the querycoord
 	QueryCoordAllTasks = "qc_tasks_all"
@@ -59,8 +65,8 @@ const (
 	// QueryResourceGroups request for get resource groups on the querycoord
 	QueryResourceGroups = "qc_resource_group"
 
-	// DataCoordAllTasks request for get tasks on the datacoord
-	DataCoordAllTasks = "dc_tasks_all"
+	// DataDist request for get segments on the datacoord
+	DataDist = "dc_segments"
 
 	// ImportTasks request for get import tasks from the datacoord
 	ImportTasks = "dc_import_tasks"
@@ -74,6 +80,12 @@ const (
 	// SyncTasks request for get sync tasks from the datanode
 	SyncTasks = "dn_sync_tasks"
 
+	// DataSegments request for get segments from the datanode
+	DataSegments = "dn_segments"
+
+	// DataChannels request for get channels from the datanode
+	DataChannels = "dn_channels"
+
-	// MetricRequestParamVerboseKey as a request parameter decide to whether return verbose value
+	// MetricRequestParamVerboseKey is a request parameter that decides whether to return verbose values
 	MetricRequestParamVerboseKey = "verbose"
 )
diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index a1bb87d2f872f..43aa6e873282f 100644
--- a/pkg/util/metricsinfo/metrics_info.go
+++ b/pkg/util/metricsinfo/metrics_info.go
@@ -13,8 +13,8 @@ package metricsinfo
 
 import (
 	"encoding/json"
-	"time"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -70,6 +70,112 @@ const (
 	MilvusUsedGoVersion = "MILVUS_USED_GO_VERSION"
 )
 
+// DmChannel is the JSON model of a DM channel exposed by the metrics API.
+type DmChannel struct {
+	NodeID                 int64           `json:"node_id,omitempty"`
+	Version                int64           `json:"version,omitempty"`
+	CollectionID           int64           `json:"collection_id,omitempty"`
+	ChannelName            string          `json:"channel_name,omitempty"`
+	UnflushedSegmentIds    []int64         `json:"unflushed_segment_ids,omitempty"`
+	FlushedSegmentIds      []int64         `json:"flushed_segment_ids,omitempty"`
+	DroppedSegmentIds      []int64         `json:"dropped_segment_ids,omitempty"`
+	LevelZeroSegmentIds    []int64         `json:"level_zero_segment_ids,omitempty"`
+	PartitionStatsVersions map[int64]int64 `json:"partition_stats_versions,omitempty"`
+}
+
+// Segment is the JSON model of a segment exposed by the metrics API.
+type Segment struct {
+	SegmentID    int64  `json:"segment_id,omitempty"`
+	CollectionID int64  `json:"collection_id,omitempty"`
+	PartitionID  int64  `json:"partition_id,omitempty"`
+	Channel      string `json:"channel,omitempty"`
+	NumOfRows    int64  `json:"num_of_rows,omitempty"`
+	State        string `json:"state,omitempty"`
+	IsImporting  bool   `json:"is_importing,omitempty"`
+	Compacted    bool   `json:"compacted,omitempty"`
+	Level        string `json:"level,omitempty"`
+	IsSorted     bool   `json:"is_sorted,omitempty"`
+	NodeID       int64  `json:"node_id,omitempty"`
+
+	// load related
+	IsInvisible          bool            `json:"is_invisible,omitempty"`
+	LoadedTimestamp      int64           `json:"loaded_timestamp,omitempty"`
+	Index                []*SegmentIndex `json:"index,omitempty"`
+	ResourceGroup        string          `json:"resource_group,omitempty"`
+	LoadedInsertRowCount int64           `json:"loaded_insert_row_count,omitempty"` // insert row count of a growing segment, excluding deleted rows, in QueryNode
+	MemSize              int64           `json:"mem_size,omitempty"`                // memory size of the segment in QueryNode
+
+	// flush related
+	FlushedRows    int64 `json:"flushed_rows,omitempty"`
+	SyncBufferRows int64 `json:"sync_buffer_rows,omitempty"`
+	SyncingRows    int64 `json:"syncing_rows,omitempty"`
+	// TODO add checkpoints
+}
+
+// SegmentIndex describes an index built on a segment.
+type SegmentIndex struct {
+	IndexFieldID int64 `json:"field_id,omitempty"`
+	IndexID      int64 `json:"index_id,omitempty"`
+	BuildID      int64 `json:"build_id,omitempty"`
+	IndexSize    int64 `json:"index_size,omitempty"`
+	IsLoaded     bool  `json:"is_loaded,omitempty"`
+}
+
+// QueryCoordCollectionTarget is the segment/channel target of a collection on the querycoord.
+type QueryCoordCollectionTarget struct {
+	CollectionID int64        `json:"collection_id,omitempty"`
+	Segments     []*Segment   `json:"segments,omitempty"`
+	DMChannels   []*DmChannel `json:"dm_channels,omitempty"`
+}
+
+// LeaderView is a shard leader's view of its sealed and growing segments.
+type LeaderView struct {
+	LeaderID           int64      `json:"leader_id"`
+	CollectionID       int64      `json:"collection_id"`
+	Channel            string     `json:"channel"`
+	Version            int64      `json:"version"`
+	SealedSegments     []*Segment `json:"sealed_segments"`
+	GrowingSegments    []*Segment `json:"growing_segments"`
+	TargetVersion      int64      `json:"target_version"`
+	NumOfGrowingRows   int64      `json:"num_of_growing_rows"`
+	UnServiceableError string     `json:"unserviceable_error"`
+}
+
+// QueryCoordCollectionDistribution is the segment/channel/leader view distribution on the querycoord.
+type QueryCoordCollectionDistribution struct {
+	Segments    []*Segment    `json:"segments,omitempty"`
+	DMChannels  []*DmChannel  `json:"dm_channels,omitempty"`
+	LeaderViews []*LeaderView `json:"leader_views,omitempty"`
+}
+
+// ResourceGroup is the JSON model of a querycoord resource group.
+type ResourceGroup struct {
+	Name  string                    `json:"name,omitempty"`
+	Nodes []int64                   `json:"nodes,omitempty"`
+	Cfg   *rgpb.ResourceGroupConfig `json:"cfg,omitempty"`
+}
+
+// Replica is the JSON model of a collection replica.
+type Replica struct {
+	ID               int64              `json:"ID,omitempty"`
+	CollectionID     int64              `json:"collectionID,omitempty"`
+	RWNodes          []int64            `json:"rw_nodes,omitempty"`
+	ResourceGroup    string             `json:"resource_group,omitempty"`
+	RONodes          []int64            `json:"ro_nodes,omitempty"`
+	ChannelToRWNodes map[string][]int64 `json:"channel_to_rw_nodes,omitempty"`
+}
+
+// Channel is the JSON model of a DML channel and its watch/assign status.
+type Channel struct {
+	Name           string `json:"name,omitempty"`
+	WatchState     string `json:"watched_status,omitempty"`
+	AssignState    string `json:"assign_state,omitempty"`
+	LatestTimeTick string `json:"latest_time_tick,omitempty"` // a time string indicating when the channel's latest time tick was received
+	NodeID         int64  `json:"node_id,omitempty"`
+	CollectionID   int64  `json:"collection_id,omitempty"`
+	CheckpointTS   string `json:"check_point_ts,omitempty"` // a time string in the format "2006-01-02 15:04:05"
+}
+
 // DeployMetrics records the deploy information of nodes.
 type DeployMetrics struct {
 	SystemVersion string `json:"system_version"`
@@ -167,11 +273,12 @@ type SyncTask struct {
 	SegmentID     int64              `json:"segment_id,omitempty"`
 	BatchRows     int64              `json:"batch_rows,omitempty"`
 	SegmentLevel  string             `json:"segment_level,omitempty"`
-	TsFrom        typeutil.Timestamp `json:"ts_from,omitempty"`
-	TsTo          typeutil.Timestamp `json:"ts_to,omitempty"`
+	TSFrom        typeutil.Timestamp `json:"ts_from,omitempty"`
+	TSTo          typeutil.Timestamp `json:"ts_to,omitempty"`
 	DeltaRowCount int64              `json:"delta_row_count,omitempty"`
 	FlushSize     int64              `json:"flush_size,omitempty"`
-	RunningTime   time.Duration      `json:"running_time,omitempty"`
+	RunningTime   string             `json:"running_time,omitempty"`
+	NodeID        int64              `json:"node_id,omitempty"`
 }
 
 // DataNodeInfos implements ComponentInfos
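Note: since nearly every field above carries omitempty, zero values vanish from the payload entirely. A minimal illustration (the values are made up):

    package example

    import (
    	"encoding/json"
    	"fmt"

    	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
    )

    func main() {
    	seg := &metricsinfo.Segment{
    		SegmentID:    1,
    		CollectionID: 100,
    		Channel:      "by-dev-rootcoord-dml_0",
    		NumOfRows:    1024,
    		State:        "Flushed",
    	}
    	bs, _ := json.Marshal(seg)
    	// Zero-valued fields (PartitionID, Compacted, ...) are omitted:
    	// {"segment_id":1,"collection_id":100,"channel":"by-dev-rootcoord-dml_0","num_of_rows":1024,"state":"Flushed"}
    	fmt.Println(string(bs))
    }

The flip side of omitempty is that a legitimate false or 0 (e.g. Compacted=false) is indistinguishable from "not set", which consumers of these payloads should keep in mind.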
diff --git a/pkg/util/metricsinfo/utils.go b/pkg/util/metricsinfo/utils.go index cd038c9b28236..72e98378b110a 100644
--- a/pkg/util/metricsinfo/utils.go
+++ b/pkg/util/metricsinfo/utils.go
@@ -12,6 +12,7 @@
 package metricsinfo
 
 import (
+	"encoding/json"
 	"os"
 )
 
@@ -23,3 +24,21 @@ func FillDeployMetricsWithEnv(m *DeployMetrics) {
 	m.UsedGoVersion = os.Getenv(MilvusUsedGoVersion)
 	m.BuildTime = os.Getenv(MilvusBuildTimeEnvKey)
 }
+
+// MarshalGetMetricsValues marshals a slice of metric values to JSON,
+// propagating the caller's error and returning "" for an empty slice.
+func MarshalGetMetricsValues[T any](metrics []T, err error) (string, error) {
+	if err != nil {
+		return "", err
+	}
+
+	if len(metrics) == 0 {
+		return "", nil
+	}
+
+	bs, err := json.Marshal(metrics)
+	if err != nil {
+		return "", err
+	}
+	return string(bs), nil
+}
diff --git a/pkg/util/typeutil/time.go b/pkg/util/typeutil/time.go index 565dba034e1b9..86ac06da88a7e 100644
--- a/pkg/util/typeutil/time.go
+++ b/pkg/util/typeutil/time.go
@@ -45,3 +45,10 @@ func ParseTimestamp(data []byte) (time.Time, error) {
 func SubTimeByWallClock(after, before time.Time) time.Duration {
 	return time.Duration(after.UnixNano() - before.UnixNano())
 }
+
+// TimestampToString formats a Unix timestamp, in seconds, as a local
+// "2006-01-02 15:04:05" time string.
+func TimestampToString(ts uint64) string {
+	ut := time.Unix(int64(ts), 0)
+	return ut.Format(time.DateTime)
+}
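Note: a sketch of the intended call pattern for the two helpers above; getSyncTasks is a hypothetical stand-in for a real (slice, error) metrics source, not an API introduced by this patch.

    package example

    import (
    	"fmt"

    	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
    	"github.com/milvus-io/milvus/pkg/util/typeutil"
    )

    // getSyncTasks is a hypothetical metrics source used only for illustration.
    func getSyncTasks() ([]*metricsinfo.SyncTask, error) {
    	return []*metricsinfo.SyncTask{{SegmentID: 1, NodeID: 7, RunningTime: "3s"}}, nil
    }

    func main() {
    	tasks, err := getSyncTasks()
    	// MarshalGetMetricsValues folds the three outcomes into one call:
    	// propagate err, return "" for an empty slice, else return JSON.
    	body, err := metricsinfo.MarshalGetMetricsValues(tasks, err)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(body) // prints roughly: [{"segment_id":1,"running_time":"3s","node_id":7}]

    	// TimestampToString expects plain Unix seconds (e.g. a physical time
    	// already extracted from a Milvus hybrid timestamp).
    	fmt.Println(typeutil.TimestampToString(1700000000)) // local time; "2023-11-14 22:13:20" in UTC
    }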