diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go
index 01a9a0162552d..5ff3fe8cb1edb 100644
--- a/internal/datacoord/channel_manager.go
+++ b/internal/datacoord/channel_manager.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/internal/allocator"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -53,6 +54,8 @@ type ChannelManager interface {
 	GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
 	GetChannelsByCollectionID(collectionID int64) []RWChannel
 	GetChannelNamesByCollectionID(collectionID int64) []string
+
+	GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo
 }
 
 // An interface sessionManager implments
@@ -730,6 +733,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
 	return nil
 }
 
+func (m *ChannelManagerImpl) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	infos := make(map[int64]map[string]*datapb.ChannelWatchInfo)
+	for _, nc := range m.store.GetNodesChannels() {
+		for _, ch := range nc.Channels {
+			watchInfo := proto.Clone(ch.GetWatchInfo()).(*datapb.ChannelWatchInfo)
+			if _, ok := infos[nc.NodeID]; !ok {
+				infos[nc.NodeID] = make(map[string]*datapb.ChannelWatchInfo)
+			}
+			infos[nc.NodeID][watchInfo.Vchan.ChannelName] = watchInfo
+		}
+	}
+	return infos
+}
+
 func inferStateByOpType(opType ChannelOpType) datapb.ChannelWatchState {
 	switch opType {
 	case Watch:
diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go
index 919fbd1831061..450a260754eef 100644
--- a/internal/datacoord/channel_manager_test.go
+++ b/internal/datacoord/channel_manager_test.go
@@ -805,3 +805,51 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
 func (s *ChannelManagerSuite) TestCheckLoop() {}
 
 func (s *ChannelManagerSuite) TestGet() {}
+
+func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
+	store := NewMockRWChannelStore(s.T())
+	store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
+		{
+			NodeID: 1,
+			Channels: map[string]RWChannel{
+				"ch1": &channelMeta{
+					WatchInfo: &datapb.ChannelWatchInfo{
+						Vchan: &datapb.VchannelInfo{
+							ChannelName: "ch1",
+						},
+						StartTs: 100,
+						State:   datapb.ChannelWatchState_ToWatch,
+						OpID:    1,
+					},
+				},
+			},
+		},
+		{
+			NodeID: 2,
+			Channels: map[string]RWChannel{
+				"ch2": &channelMeta{
+					WatchInfo: &datapb.ChannelWatchInfo{
+						Vchan: &datapb.VchannelInfo{
+							ChannelName: "ch2",
+						},
+						StartTs: 10,
+						State:   datapb.ChannelWatchState_WatchSuccess,
+						OpID:    1,
+					},
+				},
+			},
+		},
+	})
+
+	cm := &ChannelManagerImpl{store: store}
+	infos := cm.GetChannelWatchInfos()
+	s.Equal(2, len(infos))
+	s.Equal("ch1", infos[1]["ch1"].GetVchan().ChannelName)
+	s.Equal("ch2", infos[2]["ch2"].GetVchan().ChannelName)
+
+	// test empty value
+	store.EXPECT().GetNodesChannels().Unset()
+	store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{})
+	infos = cm.GetChannelWatchInfos()
+	s.Equal(0, len(infos))
+}
diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go
index cf8eb7d42d7a2..dbc60101d00cd 100644
--- a/internal/datacoord/index_meta.go
+++ b/internal/datacoord/index_meta.go
@@ -25,11 +25,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/golang/protobuf/proto"
 	"github.com/hashicorp/golang-lru/v2/expirable"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index dec075915c11d..9c947c88b9665 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -2025,3 +2025,28 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 
 	return metricMutation, nil
 }
+
+func (m *meta) getSegmentsMetrics() []*metricsinfo.Segment {
+	m.RLock()
+	defer m.RUnlock()
+
+	segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
+	for _, s := range m.segments.segments {
+		segments = append(segments, &metricsinfo.Segment{
+			SegmentID:    s.ID,
+			CollectionID: s.CollectionID,
+			PartitionID:  s.PartitionID,
+			Channel:      s.InsertChannel,
+			NumOfRows:    s.NumOfRows,
+			State:        s.State.String(),
+			MemSize:      s.size.Load(),
+			Level:        s.Level.String(),
+			IsImporting:  s.IsImporting,
+			Compacted:    s.Compacted,
+			IsSorted:     s.IsSorted,
+			NodeID:       paramtable.GetNodeID(),
+		})
+	}
+
+	return segments
+}
diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go
index 8492173123b7b..f4d2d0060d07d 100644
--- a/internal/datacoord/meta_test.go
+++ b/internal/datacoord/meta_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
+	"golang.org/x/exp/slices"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -43,6 +44,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/testutils"
 )
@@ -1246,3 +1248,62 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
 		assert.NotNil(t, c)
 	})
 }
+
+func TestMeta_GetSegmentsJSON(t *testing.T) {
+	// Create a mock meta object
+	m := &meta{
+		segments: &SegmentsInfo{
+			segments: map[int64]*SegmentInfo{
+				1: {
+					SegmentInfo: &datapb.SegmentInfo{
+						ID:            1,
+						CollectionID:  1,
+						PartitionID:   1,
+						InsertChannel: "channel1",
+						NumOfRows:     100,
+						State:         commonpb.SegmentState_Growing,
+						MaxRowNum:     1000,
+						Compacted:     false,
+					},
+				},
+				2: {
+					SegmentInfo: &datapb.SegmentInfo{
+						ID:            2,
+						CollectionID:  2,
+						PartitionID:   2,
+						InsertChannel: "channel2",
+						NumOfRows:     200,
+						State:         commonpb.SegmentState_Sealed,
+						MaxRowNum:     2000,
+						Compacted:     true,
+					},
+				},
+			},
+		},
+	}
+
+	segments := m.getSegmentsMetrics()
+
+	// Check the length of the segments
+	assert.Equal(t, 2, len(segments))
+
+	slices.SortFunc(segments, func(i, j *metricsinfo.Segment) int { return int(i.SegmentID - j.SegmentID) })
+
+	// Check the first segment
+	assert.Equal(t, int64(1), segments[0].SegmentID)
+	assert.Equal(t, int64(1), segments[0].CollectionID)
+	assert.Equal(t, int64(1), segments[0].PartitionID)
+	assert.Equal(t, "channel1", segments[0].Channel)
+	assert.Equal(t, int64(100), segments[0].NumOfRows)
+	assert.Equal(t, "Growing", segments[0].State)
+	assert.False(t, segments[0].Compacted)
+
+	// Check the second segment
+	assert.Equal(t, int64(2), segments[1].SegmentID)
+	assert.Equal(t, int64(2), segments[1].CollectionID)
+	assert.Equal(t, int64(2), segments[1].PartitionID)
+	assert.Equal(t, "channel2", segments[1].Channel)
+	assert.Equal(t, int64(200), segments[1].NumOfRows)
+	assert.Equal(t, "Sealed", segments[1].State)
+	assert.True(t, segments[1].Compacted)
+}
diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go
index 1728aee08d0d9..60ca296679dcb 100644
--- a/internal/datacoord/metrics_info.go
+++ b/internal/datacoord/metrics_info.go
@@ -19,6 +19,7 @@ package datacoord
 import (
 	"context"
 	"encoding/json"
+	"sync"
 
 	"github.com/cockroachdb/errors"
 	"go.uber.org/zap"
@@ -27,8 +28,10 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus/internal/datacoord/session"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/types"
+	"github.com/milvus-io/milvus/internal/util/metrics"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -82,74 +85,90 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor
 	return ret
 }
 
-// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode.
-func (s *Server) GetSyncTaskMetrics(
-	ctx context.Context,
-	req *milvuspb.GetMetricsRequest,
-) (string, error) {
-	resp, err := s.requestDataNodeGetMetrics(ctx, req)
-	if err != nil {
-		return "", err
+func (s *Server) getChannelsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
+	channels, err := getMetrics[*metricsinfo.Channel](s, ctx, req)
+	// fill checkpoint timestamp
+	channel2Checkpoints := s.meta.GetChannelCheckpoints()
+	for _, channel := range channels {
+		if cp, ok := channel2Checkpoints[channel.Name]; ok {
+			channel.CheckpointTS = typeutil.TimestampToString(cp.GetTimestamp())
+		} else {
+			log.Warn("channel not found in meta cache", zap.String("channel", channel.Name))
+		}
 	}
+	return metricsinfo.MarshalGetMetricsValues(channels, err)
+}
 
-	tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len())
-	resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool {
-		if value.Response != "" {
-			var sts []*metricsinfo.SyncTask
-			if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil {
-				log.Warn("failed to unmarshal sync task metrics")
-				err = err1
-				return false
+// mergeChannels merges the channel metrics from data nodes and channel watch infos from channel manager
+// dnChannels: a slice of Channel metrics from data nodes
+// dcChannels: a map of channel watch infos from the channel manager, keyed by node ID and channel name
+func mergeChannels(dnChannels []*metricsinfo.Channel, dcChannels map[int64]map[string]*datapb.ChannelWatchInfo) []*metricsinfo.Channel {
+	mergedChannels := make([]*metricsinfo.Channel, 0)
+
+	// Add or update channels from data nodes
+	for _, dnChannel := range dnChannels {
+		if dcChannelMap, ok := dcChannels[dnChannel.NodeID]; ok {
+			if dcChannel, ok := dcChannelMap[dnChannel.Name]; ok {
+				dnChannel.WatchState = dcChannel.State.String()
+				delete(dcChannelMap, dnChannel.Name)
 			}
-			tasks[key] = sts
 		}
-		return true
-	})
-
-	if err != nil {
-		return "", err
+		mergedChannels = append(mergedChannels, dnChannel)
 	}
 
-	if len(tasks) == 0 {
-		return "", nil
+	// Add remaining channels from channel manager
+	for nodeID, dcChannelMap := range dcChannels {
+		for _, dcChannel := range dcChannelMap {
+			mergedChannels = append(mergedChannels, &metricsinfo.Channel{
+				Name:         dcChannel.Vchan.ChannelName,
+				CollectionID: dcChannel.Vchan.CollectionID,
+				WatchState:   dcChannel.State.String(),
+				NodeID:       nodeID,
+			})
+		}
 	}
 
-	bs, err := json.Marshal(tasks)
-	if err != nil {
-		return "", err
-	}
-	return (string)(bs), nil
+	return mergedChannels
 }
 
-func (s *Server) requestDataNodeGetMetrics(
-	ctx context.Context,
-	req *milvuspb.GetMetricsRequest,
-) (*typeutil.ConcurrentMap[string, *milvuspb.GetMetricsResponse], error) {
-	nodes := s.cluster.GetSessions()
+func (s *Server) getDistJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) string {
+	segments := s.meta.getSegmentsMetrics()
+	var channels []*metricsinfo.DmChannel
+	for nodeID, ch := range s.channelManager.GetChannelWatchInfos() {
+		for _, chInfo := range ch {
+			dmChannel := metrics.NewDMChannelFrom(chInfo.GetVchan())
+			dmChannel.NodeID = nodeID
+			dmChannel.WatchState = chInfo.State.String()
+			dmChannel.StartWatchTS = chInfo.GetStartTs()
+			channels = append(channels, dmChannel)
+		}
+	}
 
-	rets := typeutil.NewConcurrentMap[string, *milvuspb.GetMetricsResponse]()
-	wg, ctx := errgroup.WithContext(ctx)
-	for _, node := range nodes {
-		wg.Go(func() error {
-			cli, err := node.GetOrCreateClient(ctx)
-			if err != nil {
-				return err
-			}
-			ret, err := cli.GetMetrics(ctx, req)
-			if err != nil {
-				return err
-			}
-			key := metricsinfo.ConstructComponentName(typeutil.DataNodeRole, node.NodeID())
-			rets.Insert(key, ret)
-			return nil
-		})
+	if len(segments) == 0 && len(channels) == 0 {
+		return ""
 	}
 
-	err := wg.Wait()
+	dist := &metricsinfo.DataCoordDist{
+		Segments:   segments,
+		DMChannels: channels,
+	}
+
+	bs, err := json.Marshal(dist)
 	if err != nil {
-		return nil, err
+		log.Warn("marshal dist value failed", zap.String("err", err.Error()))
+		return ""
 	}
-	return rets, nil
+	return string(bs)
+}
+
+func (s *Server) getSegmentsJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
+	ret, err := getMetrics[*metricsinfo.Segment](s, ctx, req)
+	return metricsinfo.MarshalGetMetricsValues(ret, err)
+}
+
+func (s *Server) getSyncTaskJSON(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) {
+	ret, err := getMetrics[*metricsinfo.SyncTask](s, ctx, req)
+	return metricsinfo.MarshalGetMetricsValues(ret, err)
 }
 
 // getSystemInfoMetrics composes data cluster metrics
@@ -322,3 +341,44 @@ func (s *Server) getIndexNodeMetrics(ctx context.Context, req *milvuspb.GetMetri
 	infos.BaseComponentInfos.HasError = false
 	return infos, nil
 }
+
+// getMetrics retrieves and aggregates the metrics of the datanode to a slice
+func getMetrics[T any](s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest) ([]T, error) {
+	var metrics []T
+	var mu sync.Mutex
+	errorGroup, ctx := errgroup.WithContext(ctx)
+
+	nodes := s.cluster.GetSessions()
+	for _, node := range nodes {
+		errorGroup.Go(func() error {
+			cli, err := node.GetOrCreateClient(ctx)
+			if err != nil {
+				return err
+			}
+			resp, err := cli.GetMetrics(ctx, req)
+			if err != nil {
+				log.Warn("failed to get metric from DataNode", zap.Int64("nodeID", node.NodeID()))
+				return err
+			}
+
+			if resp.Response == "" {
+				return nil
+			}
+
+			var infos []T
+			err = json.Unmarshal([]byte(resp.Response), &infos)
+			if err != nil {
+				log.Warn("invalid metrics of data node was found", zap.Error(err))
+				return err
+			}
+
+			mu.Lock()
+			metrics = append(metrics, infos...)
+			mu.Unlock()
+			return nil
+		})
+	}
+
+	err := errorGroup.Wait()
+	return metrics, err
+}
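Editor's note: the generic getMetrics[T] above replaces the per-type plumbing that GetSyncTaskMetrics and requestDataNodeGetMetrics used to carry — fan out one GetMetrics RPC per DataNode session through an errgroup, unmarshal each non-empty JSON response into []T, and append under a mutex. A self-contained sketch of the same pattern, decoupled from Milvus types (fetch is a stand-in for the DataNode RPC and is an assumption, not a Milvus API):

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// collect queries every node concurrently, decodes each JSON response
// into []T, and merges the slices under a mutex, mirroring getMetrics[T].
func collect[T any](ctx context.Context, fetch []func(context.Context) (string, error)) ([]T, error) {
	var (
		mu  sync.Mutex
		out []T
	)
	g, ctx := errgroup.WithContext(ctx)
	for _, f := range fetch {
		f := f // capture the loop variable (needed before Go 1.22)
		g.Go(func() error {
			resp, err := f(ctx)
			if err != nil || resp == "" {
				return err // empty responses are skipped, not errors
			}
			var items []T
			if err := json.Unmarshal([]byte(resp), &items); err != nil {
				return err
			}
			mu.Lock()
			out = append(out, items...)
			mu.Unlock()
			return nil
		})
	}
	err := g.Wait()
	return out, err
}

func main() {
	type seg struct {
		SegmentID int64 `json:"segment_id"`
	}
	node := func(context.Context) (string, error) { return `[{"segment_id":1}]`, nil }
	got, err := collect[seg](context.Background(), []func(context.Context) (string, error){node, node})
	fmt.Println(got, err)
}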
&milvuspb.GetMetricsRequest{} + ctx := context.Background() + + segments := []*metricsinfo.Segment{ + { + SegmentID: 1, + CollectionID: 100, + PartitionID: 10, + NumOfRows: 1000, + State: "Flushed", + }, + } + segmentsBytes, err := json.Marshal(segments) + assert.NoError(t, err) + expectedJSON := string(segmentsBytes) + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: expectedJSON, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + actualJSON, err := svr.getSegmentsJSON(ctx, req) + assert.NoError(t, err) + assert.Equal(t, expectedJSON, actualJSON) + }) + + t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return nil, errors.New("request failed") + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + actualJSON, err := svr.getSegmentsJSON(ctx, req) + assert.Error(t, err) + assert.Equal(t, "", actualJSON) + }) + + t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: `invalid json`, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + actualJSON, err := svr.getSegmentsJSON(ctx, req) + assert.Error(t, err) + assert.Equal(t, "", actualJSON) + }) + + t.Run("ReturnsEmptyJSONWhenNoSegments", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: "", + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + expectedJSON := "" + actualJSON, err := svr.getSegmentsJSON(ctx, req) + assert.NoError(t, err) + assert.Equal(t, expectedJSON, actualJSON) + }) +} + +func TestGetChannelsJSON(t *testing.T) { + svr := Server{} + t.Run("ReturnsCorrectJSON", func(t *testing.T) { + req := 
&milvuspb.GetMetricsRequest{} + + channels := []*metricsinfo.Channel{ + { + Name: "channel1", + CollectionID: 100, + NodeID: 1, + }, + } + channelsBytes, err := json.Marshal(channels) + assert.NoError(t, err) + channelJSON := string(channelsBytes) + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: channelJSON, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + svr.meta = &meta{channelCPs: newChannelCps()} + svr.meta.channelCPs.checkpoints["channel1"] = &msgpb.MsgPosition{Timestamp: 1000} + + actualJSON, err := svr.getChannelsJSON(context.TODO(), req) + assert.NoError(t, err) + + channels = []*metricsinfo.Channel{ + { + Name: "channel1", + CollectionID: 100, + NodeID: 1, + CheckpointTS: typeutil.TimestampToString(1000), + }, + } + channelsBytes, err = json.Marshal(channels) + assert.NoError(t, err) + expectedJSON := string(channelsBytes) + + assert.Equal(t, expectedJSON, actualJSON) + }) + + t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return nil, errors.New("request failed") + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + svr.meta = &meta{channelCPs: newChannelCps()} + + actualJSON, err := svr.getChannelsJSON(ctx, req) + assert.Error(t, err) + assert.Equal(t, "", actualJSON) + }) + + t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: `invalid json`, + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + + svr.meta = &meta{channelCPs: newChannelCps()} + + actualJSON, err := svr.getChannelsJSON(ctx, req) + assert.Error(t, err) + assert.Equal(t, "", actualJSON) + }) + + t.Run("ReturnsEmptyJSONWhenNoChannels", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + mockResp := &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: "", + } + + mockClient := &mockMetricDataNodeClient{ + mock: func() (*milvuspb.GetMetricsResponse, error) { + return mockResp, nil + }, + } + + dataNodeCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { + return mockClient, nil + } + + mockCluster := NewMockCluster(t) + 
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)}) + svr.cluster = mockCluster + svr.meta = &meta{channelCPs: newChannelCps()} + + expectedJSON := "" + actualJSON, err := svr.getChannelsJSON(ctx, req) + assert.NoError(t, err) + assert.Equal(t, expectedJSON, actualJSON) + }) +} + +func TestGetDistJSON(t *testing.T) { + svr := Server{} + nodeID := paramtable.GetNodeID() + paramtable.SetNodeID(1) + defer paramtable.SetNodeID(nodeID) + + t.Run("ReturnsCorrectJSON", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + svr.meta = &meta{ + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + PartitionID: 1, + InsertChannel: "channel1", + Level: datapb.SegmentLevel_L1, + State: commonpb.SegmentState_Flushed, + }, + }, + }, + }, + } + + cm := NewMockChannelManager(t) + cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{ + 1: { + "channel1": { + State: datapb.ChannelWatchState_ToWatch, + Vchan: &datapb.VchannelInfo{ + ChannelName: "channel1", + }, + }, + }, + }) + + svr.channelManager = cm + + segments := []*metricsinfo.Segment{ + { + SegmentID: 1, + State: commonpb.SegmentState_Flushed.String(), + CollectionID: 1, + PartitionID: 1, + Channel: "channel1", + Level: datapb.SegmentLevel_L1.String(), + NodeID: 1, + }, + } + channels := []*metricsinfo.DmChannel{ + { + ChannelName: "channel1", + NodeID: 1, + WatchState: datapb.ChannelWatchState_ToWatch.String(), + }, + } + dist := &metricsinfo.DataCoordDist{ + Segments: segments, + DMChannels: channels, + } + distBytes, err := json.Marshal(dist) assert.NoError(t, err) + expectedJSON := string(distBytes) + + actualJSON := svr.getDistJSON(ctx, req) + assert.Equal(t, expectedJSON, actualJSON) + }) + + t.Run("ReturnsEmptyJSONWhenNoDist", func(t *testing.T) { + req := &milvuspb.GetMetricsRequest{} + ctx := context.Background() + + svr.meta = &meta{segments: &SegmentsInfo{segments: map[int64]*SegmentInfo{}}} + cm := NewMockChannelManager(t) + cm.EXPECT().GetChannelWatchInfos().Return(map[int64]map[string]*datapb.ChannelWatchInfo{}) + + svr.channelManager = cm + expectedJSON := "" + actualJSON := svr.getDistJSON(ctx, req) assert.Equal(t, expectedJSON, actualJSON) }) } diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go index 9a61852db87c3..fccecee419de2 100644 --- a/internal/datacoord/mock_channelmanager.go +++ b/internal/datacoord/mock_channelmanager.go @@ -5,6 +5,7 @@ package datacoord import ( context "context" + datapb "github.com/milvus-io/milvus/internal/proto/datapb" mock "github.com/stretchr/testify/mock" ) @@ -308,6 +309,53 @@ func (_c *MockChannelManager_GetChannelNamesByCollectionID_Call) RunAndReturn(ru return _c } +// GetChannelWatchInfos provides a mock function with given fields: +func (_m *MockChannelManager) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetChannelWatchInfos") + } + + var r0 map[int64]map[string]*datapb.ChannelWatchInfo + if rf, ok := ret.Get(0).(func() map[int64]map[string]*datapb.ChannelWatchInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]map[string]*datapb.ChannelWatchInfo) + } + } + + return r0 +} + +// MockChannelManager_GetChannelWatchInfos_Call is a *mock.Call that shadows Run/Return methods with 
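Editor's note on a behavioral detail of mergeChannels that the tests above exercise only indirectly: the function consumes matches out of dcChannels via delete, so whatever is left afterwards is exactly the set of watch infos that no DataNode reported, and the input map should not be reused by the caller. A small worked example in package datacoord terms (exampleMerge is hypothetical; values are illustrative):

// exampleMerge sketches the two-step merge: ch1 is reported by DataNode 1
// and gets its WatchState filled in from the coordinator's watch info;
// ch2 has no DataNode report, so it is appended as a coordinator-only
// entry carrying NodeID 2.
func exampleMerge() []*metricsinfo.Channel {
	dnChannels := []*metricsinfo.Channel{{Name: "ch1", NodeID: 1}}
	dcChannels := map[int64]map[string]*datapb.ChannelWatchInfo{
		1: {"ch1": {State: datapb.ChannelWatchState_WatchSuccess, Vchan: &datapb.VchannelInfo{ChannelName: "ch1"}}},
		2: {"ch2": {State: datapb.ChannelWatchState_ToWatch, Vchan: &datapb.VchannelInfo{ChannelName: "ch2"}}},
	}
	// Result: ch1 with WatchState "WatchSuccess", plus ch2 with WatchState "ToWatch".
	return mergeChannels(dnChannels, dcChannels)
}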
diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go
index 9a61852db87c3..fccecee419de2 100644
--- a/internal/datacoord/mock_channelmanager.go
+++ b/internal/datacoord/mock_channelmanager.go
@@ -5,6 +5,7 @@ package datacoord
 import (
 	context "context"
 
+	datapb "github.com/milvus-io/milvus/internal/proto/datapb"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -308,6 +309,53 @@ func (_c *MockChannelManager_GetChannelNamesByCollectionID_Call) RunAndReturn(ru
 	return _c
 }
 
+// GetChannelWatchInfos provides a mock function with given fields:
+func (_m *MockChannelManager) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetChannelWatchInfos")
+	}
+
+	var r0 map[int64]map[string]*datapb.ChannelWatchInfo
+	if rf, ok := ret.Get(0).(func() map[int64]map[string]*datapb.ChannelWatchInfo); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]map[string]*datapb.ChannelWatchInfo)
+		}
+	}
+
+	return r0
+}
+
+// MockChannelManager_GetChannelWatchInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelWatchInfos'
+type MockChannelManager_GetChannelWatchInfos_Call struct {
+	*mock.Call
+}
+
+// GetChannelWatchInfos is a helper method to define mock.On call
+func (_e *MockChannelManager_Expecter) GetChannelWatchInfos() *MockChannelManager_GetChannelWatchInfos_Call {
+	return &MockChannelManager_GetChannelWatchInfos_Call{Call: _e.mock.On("GetChannelWatchInfos")}
+}
+
+func (_c *MockChannelManager_GetChannelWatchInfos_Call) Run(run func()) *MockChannelManager_GetChannelWatchInfos_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockChannelManager_GetChannelWatchInfos_Call) Return(_a0 map[int64]map[string]*datapb.ChannelWatchInfo) *MockChannelManager_GetChannelWatchInfos_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockChannelManager_GetChannelWatchInfos_Call) RunAndReturn(run func() map[int64]map[string]*datapb.ChannelWatchInfo) *MockChannelManager_GetChannelWatchInfos_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetChannelsByCollectionID provides a mock function with given fields: collectionID
 func (_m *MockChannelManager) GetChannelsByCollectionID(collectionID int64) []RWChannel {
 	ret := _m.Called(collectionID)
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 3046183c2de68..001663cbb041a 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -1137,6 +1137,11 @@ func (s *Server) registerMetricsRequest() {
 			return s.getSystemInfoMetrics(ctx, req)
 		})
 
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataDist,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return s.getDistJSON(ctx, req), nil
+		})
+
 	s.metricsRequest.RegisterMetricsRequest(metricsinfo.ImportTasks,
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
 			return s.importMeta.TaskStatsJSON(), nil
@@ -1154,8 +1159,19 @@ func (s *Server) registerMetricsRequest() {
 
 	s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
-			return s.GetSyncTaskMetrics(ctx, req)
+			return s.getSyncTaskJSON(ctx, req)
+		})
+
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return s.getSegmentsJSON(ctx, req)
 		})
+
+	s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return s.getChannelsJSON(ctx, req)
+		})
+
 	log.Info("register metrics actions finished")
 }
diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go
index 2c46d68cfd6c2..4734ee45c76b9 100644
--- a/internal/datacoord/stats_task_meta.go
+++ b/internal/datacoord/stats_task_meta.go
@@ -22,8 +22,8 @@ import (
 	"strconv"
 	"sync"
 
-	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index eccba38a1580c..bbf62f484db9d 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -284,10 +284,21 @@ func (node *DataNode) registerMetricsRequest() {
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
 			return node.getSystemInfoMetrics(ctx, req)
 		})
+
 	node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
 		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
 			return node.syncMgr.TaskStatsJSON(), nil
 		})
+
+	node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return node.flowgraphManager.GetSegmentsJSON(), nil
+		})
+
+	node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
+		func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+			return node.flowgraphManager.GetChannelsJSON(), nil
+		})
 	log.Info("register metrics actions finished")
 }
diff --git a/internal/flushcommon/metacache/segment.go b/internal/flushcommon/metacache/segment.go
index 8c4906ff7201e..4bdadda6177ad 100644
--- a/internal/flushcommon/metacache/segment.go
+++ b/internal/flushcommon/metacache/segment.go
@@ -87,6 +87,14 @@ func (s *SegmentInfo) Level() datapb.SegmentLevel {
 	return s.level
 }
 
+func (s *SegmentInfo) BufferRows() int64 {
+	return s.bufferRows
+}
+
+func (s *SegmentInfo) SyncingRows() int64 {
+	return s.syncingRows
+}
+
 func (s *SegmentInfo) Clone() *SegmentInfo {
 	return &SegmentInfo{
 		segmentID: s.segmentID,
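Editor's note on how the registrations above are reached: every metric type registered with metricsRequest is served through the ordinary GetMetrics RPC, dispatched on the "metric_type" field of the JSON request body. A hedged sketch of requesting the new DataCoord distribution view (requestDataDist is a hypothetical helper; the envelope shape follows the existing metricsinfo conventions, and metricsinfo.DataDist is the key registered in server.go above, whatever wire value it carries):

func requestDataDist(ctx context.Context, dc types.DataCoordClient) (string, error) {
	// Build the standard metrics request envelope.
	body, err := json.Marshal(map[string]any{"metric_type": metricsinfo.DataDist})
	if err != nil {
		return "", err
	}
	resp, err := dc.GetMetrics(ctx, &milvuspb.GetMetricsRequest{Request: string(body)})
	if err != nil {
		return "", err
	}
	// Response carries the JSON produced by getDistJSON.
	return resp.GetResponse(), nil
}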
diff --git a/internal/flushcommon/pipeline/flow_graph_manager.go b/internal/flushcommon/pipeline/flow_graph_manager.go
index 2d8b930442f81..1cf36a177417f 100644
--- a/internal/flushcommon/pipeline/flow_graph_manager.go
+++ b/internal/flushcommon/pipeline/flow_graph_manager.go
@@ -18,6 +18,7 @@ package pipeline
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 
 	"go.uber.org/zap"
@@ -25,6 +26,7 @@ import (
 	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -40,6 +42,8 @@ type FlowgraphManager interface {
 	GetFlowgraphCount() int
 	GetCollectionIDs() []int64
 
+	GetChannelsJSON() string
+	GetSegmentsJSON() string
 	Close()
 }
 
@@ -115,6 +119,59 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
 	return collectionSet.Collect()
 }
 
+// GetChannelsJSON returns all channels in json format.
+func (fm *fgManagerImpl) GetChannelsJSON() string {
+	var channels []*metricsinfo.Channel
+	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
+		latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
+		channels = append(channels, &metricsinfo.Channel{
+			Name:           ch,
+			WatchState:     ds.fg.Status(),
+			LatestTimeTick: typeutil.TimestampToString(latestTimeTick),
+			NodeID:         paramtable.GetNodeID(),
+			CollectionID:   ds.metacache.Collection(),
+		})
+		return true
+	})
+
+	ret, err := json.Marshal(channels)
+	if err != nil {
+		log.Warn("failed to marshal channels", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
+
+func (fm *fgManagerImpl) GetSegmentsJSON() string {
+	var segments []*metricsinfo.Segment
+	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
+		meta := ds.metacache
+		for _, segment := range meta.GetSegmentsBy() {
+			segments = append(segments, &metricsinfo.Segment{
+				SegmentID:      segment.SegmentID(),
+				CollectionID:   meta.Collection(),
+				PartitionID:    segment.PartitionID(),
+				Channel:        ch,
+				State:          segment.State().String(),
+				Level:          segment.Level().String(),
+				NodeID:         paramtable.GetNodeID(),
+				NumOfRows:      segment.NumOfRows(),
+				FlushedRows:    segment.FlushedRows(),
+				SyncBufferRows: segment.BufferRows(),
+				SyncingRows:    segment.SyncingRows(),
+			})
+		}
+		return true
+	})
+
+	ret, err := json.Marshal(segments)
+	if err != nil {
+		log.Warn("failed to marshal segments", zap.Error(err))
+		return ""
+	}
+	return string(ret)
+}
+
 func (fm *fgManagerImpl) Close() {
 	fm.cancelFunc()
 }
diff --git a/internal/flushcommon/pipeline/flow_graph_manager_test.go b/internal/flushcommon/pipeline/flow_graph_manager_test.go
index aaa25cb4f5196..3cb27e74459f7 100644
--- a/internal/flushcommon/pipeline/flow_graph_manager_test.go
+++ b/internal/flushcommon/pipeline/flow_graph_manager_test.go
@@ -18,8 +18,8 @@ package pipeline
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
-	"math/rand"
 	"os"
 	"testing"
 
@@ -30,15 +30,20 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
+	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
+	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
 	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
 	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/util/metrics"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
 	"github.com/milvus-io/milvus/pkg/mq/msgstream"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 func TestMain(t *testing.M) {
@@ -98,7 +103,7 @@ func TestFlowGraphManager(t *testing.T) {
 }
 
 func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
-	collectionID := int64(rand.Uint32())
+	collectionID := int64(1)
 	dmChannelName := fmt.Sprintf("%s_%d", "fake-ch-", collectionID)
 	schema := &schemapb.CollectionSchema{
 		Name: fmt.Sprintf("%s_%d", "collection_", collectionID),
@@ -124,3 +129,105 @@ func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
 		Schema: schema,
 	}
 }
+
+type mockTimeSender struct{}
+
+func (m *mockTimeSender) Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats) {
+	panic("implement me")
+}
+
+func (m *mockTimeSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
+	return 0
+}
+
+func newFlowGraphManager(t *testing.T) (string, FlowgraphManager) {
+	mockBroker := broker.NewMockBroker(t)
+	mockBroker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
+	mockBroker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
+	mockBroker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
+
+	wbm := writebuffer.NewMockBufferManager(t)
+	wbm.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+	dispClient := msgdispatcher.NewMockClient(t)
+	dispClient.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(make(chan *msgstream.MsgPack), nil)
+
+	pipelineParams := &util.PipelineParams{
+		Ctx:                context.TODO(),
+		Session:            &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}},
+		Broker:             mockBroker,
+		TimeTickSender:     &mockTimeSender{},
+		DispClient:         dispClient,
+		WriteBufferManager: wbm,
+	}
+
+	chanWatchInfo := generateChannelWatchInfo()
+	ds, err := NewDataSyncService(
+		context.TODO(),
+		pipelineParams,
+		chanWatchInfo,
+		util.NewTickler(),
+	)
+	assert.NoError(t, err)
+
+	fm := NewFlowgraphManager()
+	fm.AddFlowgraph(ds)
+	return ds.vchannelName, fm
+}
+
+func TestGetChannelsJSON(t *testing.T) {
+	paramtable.SetNodeID(1)
+	_, fm := newFlowGraphManager(t)
+	obj := []*metricsinfo.Channel{
+		{
+			Name:           "fake-ch-_1",
+			WatchState:     "Healthy",
+			LatestTimeTick: typeutil.TimestampToString(0),
+			NodeID:         paramtable.GetNodeID(),
+			CollectionID:   1,
+		},
+	}
+
+	expectedBytes, err := json.Marshal(obj)
+	assert.NoError(t, err)
+	expectedJSON := string(expectedBytes)
+
+	jsonResult := fm.GetChannelsJSON()
+	assert.JSONEq(t, expectedJSON, jsonResult)
+}
+
+func TestGetSegmentJSON(t *testing.T) {
+	ch, fm := newFlowGraphManager(t)
+	ds, ok := fm.GetFlowgraphService(ch)
+	assert.True(t, ok)
+
+	nodeID := paramtable.GetNodeID()
+	paramtable.SetNodeID(1)
+	defer paramtable.SetNodeID(nodeID)
+
+	pkStatsFactory := func(*datapb.SegmentInfo) pkoracle.PkStat {
+		return pkoracle.NewBloomFilterSet()
+	}
+	segment := &datapb.SegmentInfo{
+		ID:           1,
+		PartitionID:  10,
+		State:        commonpb.SegmentState_Flushed,
+		Level:        datapb.SegmentLevel_L1,
+		NumOfRows:    10240,
+		CollectionID: 1,
+	}
+
+	s := metrics.NewSegmentFrom(segment)
+	s.NodeID = 1
+	s.Channel = "fake-ch-_1"
+	s.FlushedRows = 10240
+
+	expectedBytes, err := json.Marshal([]*metricsinfo.Segment{s})
+	assert.NoError(t, err)
+	expectedJSON := string(expectedBytes)
+
+	ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory)
+	jsonResult := fm.GetSegmentsJSON()
+	fmt.Println(jsonResult)
+	assert.JSONEq(t, expectedJSON, jsonResult)
+}
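Editor's note: both new manager methods return a JSON string ("" when marshaling fails), so a caller that wants typed data has to round-trip it. A minimal sketch in package pipeline terms (decodeChannels is a hypothetical helper, not part of this patch):

func decodeChannels(fm FlowgraphManager) ([]*metricsinfo.Channel, error) {
	raw := fm.GetChannelsJSON()
	if raw == "" {
		// GetChannelsJSON returns "" only on a marshal failure.
		return nil, nil
	}
	var channels []*metricsinfo.Channel
	if err := json.Unmarshal([]byte(raw), &channels); err != nil {
		return nil, err
	}
	return channels, nil
}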
diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go
index 6945e21ff271f..cf8cd6b2aa1ca 100644
--- a/internal/flushcommon/pipeline/mock_fgmanager.go
+++ b/internal/flushcommon/pipeline/mock_fgmanager.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.32.4. DO NOT EDIT.
+// Code generated by mockery v2.46.0. DO NOT EDIT.
 
 package pipeline
 
@@ -114,10 +114,59 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra
 	return _c
 }
 
+// GetChannelsJSON provides a mock function with given fields:
+func (_m *MockFlowgraphManager) GetChannelsJSON() string {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetChannelsJSON")
+	}
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func() string); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// MockFlowgraphManager_GetChannelsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelsJSON'
+type MockFlowgraphManager_GetChannelsJSON_Call struct {
+	*mock.Call
+}
+
+// GetChannelsJSON is a helper method to define mock.On call
+func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON() *MockFlowgraphManager_GetChannelsJSON_Call {
+	return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON")}
+}
+
+func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func()) *MockFlowgraphManager_GetChannelsJSON_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetChannelsJSON_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetChannelsJSON_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetCollectionIDs provides a mock function with given fields:
 func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetCollectionIDs")
+	}
+
 	var r0 []int64
 	if rf, ok := ret.Get(0).(func() []int64); ok {
 		r0 = rf()
@@ -161,6 +210,10 @@ func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() []
 func (_m *MockFlowgraphManager) GetFlowgraphCount() int {
 	ret := _m.Called()
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetFlowgraphCount")
+	}
+
 	var r0 int
 	if rf, ok := ret.Get(0).(func() int); ok {
 		r0 = rf()
@@ -202,6 +255,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() i
 func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*DataSyncService, bool) {
 	ret := _m.Called(channel)
 
+	if len(ret) == 0 {
+		panic("no return value specified for GetFlowgraphService")
+	}
+
 	var r0 *DataSyncService
 	var r1 bool
 	if rf, ok := ret.Get(0).(func(string) (*DataSyncService, bool)); ok {
@@ -252,10 +309,59 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
 	return _c
 }
 
+// GetSegmentsJSON provides a mock function with given fields:
+func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetSegmentsJSON")
+	}
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func() string); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// MockFlowgraphManager_GetSegmentsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentsJSON'
+type MockFlowgraphManager_GetSegmentsJSON_Call struct {
+	*mock.Call
+}
+
+// GetSegmentsJSON is a helper method to define mock.On call
+func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON() *MockFlowgraphManager_GetSegmentsJSON_Call {
+	return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON")}
+}
+
+func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func()) *MockFlowgraphManager_GetSegmentsJSON_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetSegmentsJSON_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetSegmentsJSON_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // HasFlowgraph provides a mock function with given fields: channel
 func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool {
 	ret := _m.Called(channel)
 
+	if len(ret) == 0 {
+		panic("no return value specified for HasFlowgraph")
+	}
+
 	var r0 bool
 	if rf, ok := ret.Get(0).(func(string) bool); ok {
 		r0 = rf(channel)
@@ -298,6 +404,10 @@ func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string)
 func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool {
 	ret := _m.Called(channel, opID)
 
+	if len(ret) == 0 {
+		panic("no return value specified for HasFlowgraphWithOpID")
+	}
+
 	var r0 bool
 	if rf, ok := ret.Get(0).(func(string, int64) bool); ok {
 		r0 = rf(channel, opID)
diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go
index 86ae63df67a86..9ebcbaae38291 100644
--- a/internal/flushcommon/syncmgr/task.go
+++ b/internal/flushcommon/syncmgr/task.go
@@ -422,10 +422,11 @@ func (t *SyncTask) MarshalJSON() ([]byte, error) {
 		SegmentID:     t.segmentID,
 		BatchRows:     t.batchRows,
 		SegmentLevel:  t.level.String(),
-		TsFrom:        t.tsFrom,
-		TsTo:          t.tsTo,
+		TSFrom:        t.tsFrom,
+		TSTo:          t.tsTo,
 		DeltaRowCount: t.deltaRowCount,
 		FlushSize:     t.flushedSize,
-		RunningTime:   t.execTime,
+		RunningTime:   t.execTime.String(),
+		NodeID:        paramtable.GetNodeID(),
 	})
 }
diff --git a/internal/flushcommon/syncmgr/task_test.go b/internal/flushcommon/syncmgr/task_test.go
index e132cdb963b96..bb2c334e785d9 100644
--- a/internal/flushcommon/syncmgr/task_test.go
+++ b/internal/flushcommon/syncmgr/task_test.go
@@ -35,10 +35,12 @@ import (
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
 	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
 	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
+	"github.com/milvus-io/milvus/internal/json"
 	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -383,7 +385,7 @@ func (s *SyncTaskSuite) TestNextID() {
 }
 
 func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() {
-	task := &SyncTask{
+	t := &SyncTask{
 		segmentID: 12345,
 		batchRows: 100,
 		level:     datapb.SegmentLevel_L0,
@@ -394,18 +396,22 @@ func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() {
 		execTime:  2 * time.Second,
 	}
 
-	expectedJSON := `{
-		"segment_id": 12345,
-		"batch_rows": 100,
-		"segment_level": "L0",
-		"ts_from": 1000,
-		"ts_to": 2000,
-		"delta_row_count": 10,
-		"flush_size": 1024,
-		"running_time": 2000000000
-	}`
-
-	data, err := task.MarshalJSON()
+	tm := &metricsinfo.SyncTask{
+		SegmentID:     t.segmentID,
+		BatchRows:     t.batchRows,
+		SegmentLevel:  t.level.String(),
+		TSFrom:        t.tsFrom,
+		TSTo:          t.tsTo,
+		DeltaRowCount: t.deltaRowCount,
+		FlushSize:     t.flushedSize,
+		RunningTime:   t.execTime.String(),
+		NodeID:        paramtable.GetNodeID(),
+	}
+	expectedBytes, err := json.Marshal(tm)
+	s.NoError(err)
+	expectedJSON := string(expectedBytes)
+
+	data, err := t.MarshalJSON()
 	s.NoError(err)
 	s.JSONEq(expectedJSON, string(data))
 }
diff --git a/internal/flushcommon/util/timetick_sender.go b/internal/flushcommon/util/timetick_sender.go
index 25e889b5a2e4c..d18e4644338c0 100644
--- a/internal/flushcommon/util/timetick_sender.go
+++ b/internal/flushcommon/util/timetick_sender.go
@@ -36,6 +36,7 @@ import (
 
 type StatsUpdater interface {
 	Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats)
+	GetLatestTimestamp(channel string) typeutil.Timestamp
 }
 
 // TimeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
@@ -126,6 +127,17 @@ func (m *TimeTickSender) Update(channelName string, timestamp uint64, segStats [
 	m.statsCache[channelName].lastTs = timestamp
 }
 
+func (m *TimeTickSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	chStats, ok := m.statsCache[channel]
+	if !ok {
+		log.Warn("channel not found in TimeTickSender", zap.String("channel", channel))
+		return 0
+	}
+	return chStats.lastTs
+}
+
 func (m *TimeTickSender) assembleDatanodeTtMsg() ([]*msgpb.DataNodeTtMsg, map[string]uint64) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
diff --git a/internal/http/router.go b/internal/http/router.go
index 4d6acec3f7adc..2859704f480d8 100644
--- a/internal/http/router.go
+++ b/internal/http/router.go
@@ -65,19 +65,46 @@ const (
 
 // for WebUI restful api root path
 const (
-	ClusterInfoPath    = "/_cluster/info"
-	ClusterConfigsPath = "/_cluster/configs"
-	ClusterClientsPath = "/_cluster/clients"
+	// ClusterInfoPath is the path to get cluster information.
+	ClusterInfoPath = "/_cluster/info"
+	// ClusterConfigsPath is the path to get cluster configurations.
+	ClusterConfigsPath = "/_cluster/configs"
+	// ClusterClientsPath is the path to get connected clients.
+	ClusterClientsPath = "/_cluster/clients"
+	// ClusterDependenciesPath is the path to get cluster dependencies.
 	ClusterDependenciesPath = "/_cluster/dependencies"
-	HookConfigsPath    = "/_hook/configs"
-	QCoordSegmentsPath = "/_qcoord/segments"
-	QCoordChannelsPath = "/_qcoord/channels"
-	QCoordAllTasksPath = "/_qcoord/tasks/all"
+	// HookConfigsPath is the path to get hook configurations.
+	HookConfigsPath = "/_hook/configs"
 
-	DCoordAllTasksPath        = "/_dcoord/tasks/all"
-	DCoordImportTasksPath     = "/_dcoord/tasks/import"
-	DCoordCompactionTasksPath = "/_dcoord/tasks/compaction"
-	DCoordBuildIndexTasksPath = "/_dcoord/tasks/build_index"
+	// QCDistPath is the path to get QueryCoord distribution.
+	QCDistPath = "/_qc/dist"
+	// QCTargetPath is the path to get QueryCoord target.
+	QCTargetPath = "/_qc/target"
+	// QCReplicaPath is the path to get QueryCoord replica.
+	QCReplicaPath = "/_qc/replica"
+	// QCResourceGroupPath is the path to get QueryCoord resource group.
+	QCResourceGroupPath = "/_qc/resource_group"
+	// QCAllTasksPath is the path to get all tasks in QueryCoord.
+	QCAllTasksPath = "/_qc/tasks"
 
-	DNodeSyncTasksPath = "/_dnode/tasks/sync"
+	// QNSegmentsPath is the path to get segments in QueryNode.
+	QNSegmentsPath = "/_qn/segments"
+	// QNChannelsPath is the path to get channels in QueryNode.
+	QNChannelsPath = "/_qn/channels"
+
+	// DCDistPath is the path to get all segments and channels distribution in DataCoord.
+	DCDistPath = "/_dc/dist"
+	// DCImportTasksPath is the path to get import tasks in DataCoord.
+	DCImportTasksPath = "/_dc/tasks/import"
+	// DCCompactionTasksPath is the path to get compaction tasks in DataCoord.
+	DCCompactionTasksPath = "/_dc/tasks/compaction"
+	// DCBuildIndexTasksPath is the path to get build index tasks in DataCoord.
+	DCBuildIndexTasksPath = "/_dc/tasks/build_index"
+
+	// DNSyncTasksPath is the path to get sync tasks in DataNode.
+	DNSyncTasksPath = "/_dn/tasks/sync"
+	// DNSegmentsPath is the path to get segments in DataNode.
+	DNSegmentsPath = "/_dn/segments"
+	// DNChannelsPath is the path to get channels in DataNode.
+	DNChannelsPath = "/_dn/channels"
 )
diff --git a/internal/http/webui/header.html b/internal/http/webui/header.html
index 4fda9e47362df..e87bd4472adfb 100644
--- a/internal/http/webui/header.html
+++ b/internal/http/webui/header.html
@@ -7,9 +7,6 @@
             Home
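Editor's note: the renamed WebUI routes above are plain GET endpoints on the management HTTP listener, so the new DataCoord distribution view can be fetched directly. A sketch under the assumption that the listener is on the default metrics port 9091 (adjust host/port for your deployment; fetchDCDist is a hypothetical helper):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func fetchDCDist() (string, error) {
	// "/_dc/dist" is the DCDistPath constant defined in router.go above.
	resp, err := http.Get("http://localhost:9091/_dc/dist")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	dist, err := fetchDCDist()
	fmt.Println(dist, err)
}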