diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 1728aee08d0d9..359375127c475 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -82,8 +82,9 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor return ret } -// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode. -func (s *Server) GetSyncTaskMetrics( +// getSliceMetrics retrieves and aggregates the metrics of the datanode to a slice and complete marshal. +func getSliceMetrics[T any]( + s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest, ) (string, error) { @@ -92,10 +93,10 @@ func (s *Server) GetSyncTaskMetrics( return "", err } - tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len()) + tasks := make(map[string][]T, resp.Len()) resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool { if value.Response != "" { - var sts []*metricsinfo.SyncTask + var sts []T if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil { log.Warn("failed to unmarshal sync task metrics") err = err1 diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 3046183c2de68..050934106af70 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1154,8 +1154,21 @@ func (s *Server) registerMetricsRequest() { s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { - return s.GetSyncTaskMetrics(ctx, req) + return getSliceMetrics[*metricsinfo.SyncTask](s, ctx, req) }) + + s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return getSliceMetrics[*metricsinfo.Segment](s, ctx, req) + }) + + // TODO merge channel manager information into return value + 
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return getSliceMetrics[*metricsinfo.Channel](s, ctx, req) + }) + + // TODO add checkpoint metrics log.Info("register metrics actions finished") } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index eccba38a1580c..bbf62f484db9d 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -284,10 +284,21 @@ func (node *DataNode) registerMetricsRequest() { func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return node.getSystemInfoMetrics(ctx, req) }) + node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks, func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return node.syncMgr.TaskStatsJSON(), nil }) + + node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return node.flowgraphManager.GetSegmentsJSON(), nil + }) + + node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return node.flowgraphManager.GetChannelsJSON(), nil + }) log.Info("register metrics actions finished") } diff --git a/internal/datanode/importv2/pool_test.go b/internal/datanode/importv2/pool_test.go index 06873c6d31ae5..4449a5031c812 100644 --- a/internal/datanode/importv2/pool_test.go +++ b/internal/datanode/importv2/pool_test.go @@ -20,9 +20,10 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/stretchr/testify/assert" ) func TestResizePools(t *testing.T) { diff --git a/internal/flushcommon/metacache/segment.go 
b/internal/flushcommon/metacache/segment.go index 8c4906ff7201e..4bdadda6177ad 100644 --- a/internal/flushcommon/metacache/segment.go +++ b/internal/flushcommon/metacache/segment.go @@ -87,6 +87,14 @@ func (s *SegmentInfo) Level() datapb.SegmentLevel { return s.level } +func (s *SegmentInfo) BufferRows() int64 { + return s.bufferRows +} + +func (s *SegmentInfo) SyncingRows() int64 { + return s.syncingRows +} + func (s *SegmentInfo) Clone() *SegmentInfo { return &SegmentInfo{ segmentID: s.segmentID, diff --git a/internal/flushcommon/pipeline/flow_graph_manager.go b/internal/flushcommon/pipeline/flow_graph_manager.go index 2d8b930442f81..055a7705126b4 100644 --- a/internal/flushcommon/pipeline/flow_graph_manager.go +++ b/internal/flushcommon/pipeline/flow_graph_manager.go @@ -18,6 +18,7 @@ package pipeline import ( "context" + "encoding/json" "fmt" "go.uber.org/zap" @@ -25,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -40,6 +42,8 @@ type FlowgraphManager interface { GetFlowgraphCount() int GetCollectionIDs() []int64 + GetChannelsJSON() string + GetSegmentsJSON() string Close() } @@ -115,6 +119,59 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 { return collectionSet.Collect() } +// GetChannelsJSON returns all channels in json format. 
+func (fm *fgManagerImpl) GetChannelsJSON() string { + var channels []*metricsinfo.Channel + fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { + latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch) + channels = append(channels, &metricsinfo.Channel{ + Name: ch, + WatchState: ds.fg.Status(), + TimeTick: int64(latestTimeTick), + NodeID: paramtable.GetNodeID(), + CollectionID: ds.metacache.Collection(), + }) + return true + }) + + ret, err := json.Marshal(channels) + if err != nil { + log.Warn("failed to marshal channels", zap.Error(err)) + return "" + + } + return string(ret) +} + +func (fm *fgManagerImpl) GetSegmentsJSON() string { + var segments []*metricsinfo.Segment + fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { + meta := ds.metacache + for _, segment := range meta.GetSegmentsBy() { + segments = append(segments, &metricsinfo.Segment{ + SegmentID: segment.SegmentID(), + CollectionID: meta.Collection(), + PartitionID: segment.PartitionID(), + State: segment.State().String(), + Level: segment.Level().String(), + NodeID: paramtable.GetNodeID(), + NumOfRows: segment.NumOfRows(), + FlushedRows: segment.FlushedRows(), + SyncBufferRows: segment.BufferRows(), + SyncingRows: segment.SyncingRows(), + }) + } + return true + }) + + ret, err := json.Marshal(segments) + if err != nil { + log.Warn("failed to marshal segments", zap.Error(err)) + return "" + } + return string(ret) +} + func (fm *fgManagerImpl) Close() { fm.cancelFunc() } diff --git a/internal/flushcommon/pipeline/flow_graph_manager_test.go b/internal/flushcommon/pipeline/flow_graph_manager_test.go index aaa25cb4f5196..4453a2f9f73ef 100644 --- a/internal/flushcommon/pipeline/flow_graph_manager_test.go +++ b/internal/flushcommon/pipeline/flow_graph_manager_test.go @@ -19,7 +19,6 @@ package pipeline import ( "context" "fmt" - "math/rand" "os" "testing" @@ -30,6 +29,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" 
"github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/flushcommon/broker" + "github.com/milvus-io/milvus/internal/flushcommon/metacache" + "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/internal/flushcommon/writebuffer" @@ -39,6 +40,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestMain(t *testing.M) { @@ -98,7 +100,7 @@ func TestFlowGraphManager(t *testing.T) { } func generateChannelWatchInfo() *datapb.ChannelWatchInfo { - collectionID := int64(rand.Uint32()) + collectionID := int64(1) dmChannelName := fmt.Sprintf("%s_%d", "fake-ch-", collectionID) schema := &schemapb.CollectionSchema{ Name: fmt.Sprintf("%s_%d", "collection_", collectionID), @@ -124,3 +126,79 @@ func generateChannelWatchInfo() *datapb.ChannelWatchInfo { Schema: schema, } } + +type mockTimeSender struct{} + +func (m *mockTimeSender) Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats) { + panic("implement me") +} + +func (m *mockTimeSender) GetLatestTimestamp(channel string) typeutil.Timestamp { + return 0 +} + +func newFlowGraphManager(t *testing.T) (string, FlowgraphManager) { + mockBroker := broker.NewMockBroker(t) + mockBroker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() + mockBroker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() + mockBroker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() + + wbm := writebuffer.NewMockBufferManager(t) + wbm.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + + dispClient := 
msgdispatcher.NewMockClient(t) + dispClient.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(make(chan *msgstream.MsgPack), nil) + + pipelineParams := &util.PipelineParams{ + Ctx: context.TODO(), + Session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}}, + Broker: mockBroker, + TimeTickSender: &mockTimeSender{}, + DispClient: dispClient, + WriteBufferManager: wbm, + } + + chanWatchInfo := generateChannelWatchInfo() + ds, err := NewDataSyncService( + context.TODO(), + pipelineParams, + chanWatchInfo, + util.NewTickler(), + ) + assert.NoError(t, err) + + fm := NewFlowgraphManager() + fm.AddFlowgraph(ds) + return ds.vchannelName, fm +} + +func TestGetChannelsJSON(t *testing.T) { + paramtable.SetNodeID(1) + _, fm := newFlowGraphManager(t) + jsonResult := fm.GetChannelsJSON() + expectedJSON := `[{"name":"fake-ch-_1","watched_status":"Healthy","node_id":1,"collection_id":1}]` + assert.JSONEq(t, expectedJSON, jsonResult) +} + +func TestGetSegmentJSON(t *testing.T) { + paramtable.SetNodeID(1) // pin node_id here so the expected JSON does not depend on another test having run first + ch, fm := newFlowGraphManager(t) + ds, ok := fm.GetFlowgraphService(ch) + assert.True(t, ok) + pkStatsFactory := func(*datapb.SegmentInfo) pkoracle.PkStat { + return pkoracle.NewBloomFilterSet() + } + segment := &datapb.SegmentInfo{ + ID: 1, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + NumOfRows: 10240, + CollectionID: 100, + InsertChannel: "ch", + } + ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory) + jsonResult := fm.GetSegmentsJSON() + expectedJSON := `[{"segment_id":1,"collection_id":1,"partition_id":10,"num_of_rows":10240,"state":"Flushed","level":"L1","node_id":1,"flushed_rows":10240}]` + assert.JSONEq(t, expectedJSON, jsonResult) +} diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go index 6945e21ff271f..cf8cd6b2aa1ca 100644 --- a/internal/flushcommon/pipeline/mock_fgmanager.go 
+++ b/internal/flushcommon/pipeline/mock_fgmanager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.46.0. DO NOT EDIT. package pipeline @@ -114,10 +114,59 @@ func (_c *MockFlowgraphManager_Close_Call) RunAndReturn(run func()) *MockFlowgra return _c } +// GetChannelsJSON provides a mock function with given fields: +func (_m *MockFlowgraphManager) GetChannelsJSON() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetChannelsJSON") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockFlowgraphManager_GetChannelsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelsJSON' +type MockFlowgraphManager_GetChannelsJSON_Call struct { + *mock.Call +} + +// GetChannelsJSON is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) GetChannelsJSON() *MockFlowgraphManager_GetChannelsJSON_Call { + return &MockFlowgraphManager_GetChannelsJSON_Call{Call: _e.mock.On("GetChannelsJSON")} +} + +func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Run(run func()) *MockFlowgraphManager_GetChannelsJSON_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_GetChannelsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetChannelsJSON_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_GetChannelsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetChannelsJSON_Call { + _c.Call.Return(run) + return _c +} + // GetCollectionIDs provides a mock function with given fields: func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCollectionIDs") + } + var r0 []int64 if rf, ok := ret.Get(0).(func() []int64); ok { r0 = rf() @@ -161,6 +210,10 @@ 
func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() [] func (_m *MockFlowgraphManager) GetFlowgraphCount() int { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetFlowgraphCount") + } + var r0 int if rf, ok := ret.Get(0).(func() int); ok { r0 = rf() @@ -202,6 +255,10 @@ func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() i func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*DataSyncService, bool) { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for GetFlowgraphService") + } + var r0 *DataSyncService var r1 bool if rf, ok := ret.Get(0).(func(string) (*DataSyncService, bool)); ok { @@ -252,10 +309,59 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s return _c } +// GetSegmentsJSON provides a mock function with given fields: +func (_m *MockFlowgraphManager) GetSegmentsJSON() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetSegmentsJSON") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockFlowgraphManager_GetSegmentsJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentsJSON' +type MockFlowgraphManager_GetSegmentsJSON_Call struct { + *mock.Call +} + +// GetSegmentsJSON is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) GetSegmentsJSON() *MockFlowgraphManager_GetSegmentsJSON_Call { + return &MockFlowgraphManager_GetSegmentsJSON_Call{Call: _e.mock.On("GetSegmentsJSON")} +} + +func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Run(run func()) *MockFlowgraphManager_GetSegmentsJSON_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) Return(_a0 string) *MockFlowgraphManager_GetSegmentsJSON_Call { 
+ _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_GetSegmentsJSON_Call) RunAndReturn(run func() string) *MockFlowgraphManager_GetSegmentsJSON_Call { + _c.Call.Return(run) + return _c +} + // HasFlowgraph provides a mock function with given fields: channel func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool { ret := _m.Called(channel) + if len(ret) == 0 { + panic("no return value specified for HasFlowgraph") + } + var r0 bool if rf, ok := ret.Get(0).(func(string) bool); ok { r0 = rf(channel) @@ -298,6 +404,10 @@ func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string) func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool { ret := _m.Called(channel, opID) + if len(ret) == 0 { + panic("no return value specified for HasFlowgraphWithOpID") + } + var r0 bool if rf, ok := ret.Get(0).(func(string, int64) bool); ok { r0 = rf(channel, opID) diff --git a/internal/flushcommon/util/timetick_sender.go b/internal/flushcommon/util/timetick_sender.go index 25e889b5a2e4c..d18e4644338c0 100644 --- a/internal/flushcommon/util/timetick_sender.go +++ b/internal/flushcommon/util/timetick_sender.go @@ -36,6 +36,7 @@ import ( type StatsUpdater interface { Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats) + GetLatestTimestamp(channel string) typeutil.Timestamp } // TimeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically @@ -126,6 +127,17 @@ func (m *TimeTickSender) Update(channelName string, timestamp uint64, segStats [ m.statsCache[channelName].lastTs = timestamp } +func (m *TimeTickSender) GetLatestTimestamp(channel string) typeutil.Timestamp { + m.mu.RLock() + defer m.mu.RUnlock() + chStats, ok := m.statsCache[channel] + if !ok { + log.Warn("channel not found in TimeTickSender", zap.String("channel", channel)) + return 0 + } + return chStats.lastTs +} + func (m *TimeTickSender) assembleDatanodeTtMsg() 
([]*msgpb.DataNodeTtMsg, map[string]uint64) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/internal/http/router.go b/internal/http/router.go index 4d6acec3f7adc..1989cbae9a02f 100644 --- a/internal/http/router.go +++ b/internal/http/router.go @@ -65,19 +65,46 @@ const ( // for WebUI restful api root path const ( - ClusterInfoPath = "/_cluster/info" - ClusterConfigsPath = "/_cluster/configs" - ClusterClientsPath = "/_cluster/clients" + // ClusterInfoPath is the path to get cluster information. + ClusterInfoPath = "/_cluster/info" + // ClusterConfigsPath is the path to get cluster configurations. + ClusterConfigsPath = "/_cluster/configs" + // ClusterClientsPath is the path to get connected clients. + ClusterClientsPath = "/_cluster/clients" + // ClusterDependenciesPath is the path to get cluster dependencies. ClusterDependenciesPath = "/_cluster/dependencies" - HookConfigsPath = "/_hook/configs" - QCoordSegmentsPath = "/_qcoord/segments" - QCoordChannelsPath = "/_qcoord/channels" - QCoordAllTasksPath = "/_qcoord/tasks/all" + // HookConfigsPath is the path to get hook configurations. + HookConfigsPath = "/_hook/configs" - DCoordAllTasksPath = "/_dcoord/tasks/all" - DCoordImportTasksPath = "/_dcoord/tasks/import" - DCoordCompactionTasksPath = "/_dcoord/tasks/compaction" - DCoordBuildIndexTasksPath = "/_dcoord/tasks/build_index" + // QCDistPath is the path to get QueryCoord distribution. + QCDistPath = "/_qc/dist" + // QCTargetPath is the path to get QueryCoord target. + QCTargetPath = "/_qc/target" + // QCReplicaPath is the path to get QueryCoord replica. + QCReplicaPath = "/_qc/replica" + // QCResourceGroupPath is the path to get QueryCoord resource group. + QCResourceGroupPath = "/_qc/resource_group" + // QCAllTasksPath is the path to get all tasks in QueryCoord. + QCAllTasksPath = "/_qc/tasks/all" - DNodeSyncTasksPath = "/_dnode/tasks/sync" + // QNSegmentsPath is the path to get segments in QueryNode. 
+ QNSegmentsPath = "/_qn/segments" + // QNChannelsPath is the path to get channels in QueryNode. + QNChannelsPath = "/_qn/channels" + + // DCAllTasksPath is the path to get all tasks in DataCoord. + DCAllTasksPath = "/_dc/tasks/all" + // DCImportTasksPath is the path to get import tasks in DataCoord. + DCImportTasksPath = "/_dc/tasks/import" + // DCCompactionTasksPath is the path to get compaction tasks in DataCoord. + DCCompactionTasksPath = "/_dc/tasks/compaction" + // DCBuildIndexTasksPath is the path to get build index tasks in DataCoord. + DCBuildIndexTasksPath = "/_dc/tasks/build_index" + + // DNSyncTasksPath is the path to get sync tasks in DataNode. + DNSyncTasksPath = "/_dn/tasks/sync" + // DNSegmentsPath is the path to get segments in DataNode. + DNSegmentsPath = "/_dn/segments" + // DNChannelsPath is the path to get channels in DataNode. + DNChannelsPath = "/_dn/channels" ) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a1632b736d021..15a0353b6eb7c 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6503,26 +6503,34 @@ func DeregisterSubLabel(subLabel string) { // RegisterRestRouter registers the router for the proxy func (node *Proxy) RegisterRestRouter(router gin.IRouter) { - // Cluster request + // Cluster request that executed by proxy router.GET(http.ClusterInfoPath, getClusterInfo(node)) router.GET(http.ClusterConfigsPath, getConfigs(paramtable.Get().GetAll())) router.GET(http.ClusterClientsPath, getConnectedClients) router.GET(http.ClusterDependenciesPath, getDependencies) - // Hook request + // Hook request that executed by proxy router.GET(http.HookConfigsPath, getConfigs(paramtable.GetHookParams().GetAll())) - // QueryCoord request - router.GET(http.QCoordSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegmentDist)) - router.GET(http.QCoordChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannelDist)) - router.GET(http.QCoordAllTasksPath, getQueryComponentMetrics(node, 
metricsinfo.QueryCoordAllTasks)) - - // DataCoord request - router.GET(http.DCoordAllTasksPath, getDataComponentMetrics(node, metricsinfo.DataCoordAllTasks)) - router.GET(http.DCoordCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks)) - router.GET(http.DCoordImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks)) - router.GET(http.DCoordBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks)) - - // Datanode request - router.GET(http.DNodeSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks)) + // QueryCoord requests that are forwarded from proxy + router.GET(http.QCTargetPath, getQueryComponentMetrics(node, metricsinfo.QueryTarget)) + router.GET(http.QCDistPath, getQueryComponentMetrics(node, metricsinfo.QueryDist)) + router.GET(http.QCReplicaPath, getQueryComponentMetrics(node, metricsinfo.QueryReplicas)) + router.GET(http.QCResourceGroupPath, getQueryComponentMetrics(node, metricsinfo.QueryResourceGroups)) + router.GET(http.QCAllTasksPath, getQueryComponentMetrics(node, metricsinfo.QueryCoordAllTasks)) + + // QueryNode requests that are forwarded from querycoord + router.GET(http.QNSegmentsPath, getQueryComponentMetrics(node, metricsinfo.QuerySegments)) + router.GET(http.QNChannelsPath, getQueryComponentMetrics(node, metricsinfo.QueryChannels)) + + // DataCoord requests that are forwarded from proxy + router.GET(http.DCAllTasksPath, getDataComponentMetrics(node, metricsinfo.DataCoordAllTasks)) + router.GET(http.DCCompactionTasksPath, getDataComponentMetrics(node, metricsinfo.CompactionTasks)) + router.GET(http.DCImportTasksPath, getDataComponentMetrics(node, metricsinfo.ImportTasks)) + router.GET(http.DCBuildIndexTasksPath, getDataComponentMetrics(node, metricsinfo.BuildIndexTasks)) + + // Datanode requests that are forwarded from datacoord + router.GET(http.DNSyncTasksPath, getDataComponentMetrics(node, metricsinfo.SyncTasks)) + router.GET(http.DNSegmentsPath, 
getDataComponentMetrics(node, metricsinfo.DataSegments)) + router.GET(http.DNChannelsPath, getDataComponentMetrics(node, metricsinfo.DataChannels)) } diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index 492841be742f6..4171482f8f06b 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -1880,14 +1880,14 @@ func TestRegisterRestRouter(t *testing.T) { path string statusCode int }{ - {path: mhttp.QCoordSegmentsPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.QCoordChannelsPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.QCoordAllTasksPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.DNodeSyncTasksPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.DCoordCompactionTasksPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.DCoordImportTasksPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.DCoordBuildIndexTasksPath, statusCode: http.StatusInternalServerError}, - {path: mhttp.DNodeSyncTasksPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.QCTargetPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.QCDistPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.QCAllTasksPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.DNSyncTasksPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.DCCompactionTasksPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.DCImportTasksPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.DCBuildIndexTasksPath, statusCode: http.StatusInternalServerError}, + {path: mhttp.DNSyncTasksPath, statusCode: http.StatusInternalServerError}, } for _, tt := range tests { diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 7814e214daa7b..9ea75c24b7a7e 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -18,6 +18,7 @@ package querycoordv2 import ( "context" 
+ "encoding/json" "sync" "time" @@ -34,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -242,6 +244,54 @@ func (s *Server) balanceChannels(ctx context.Context, return nil } +// getMetrics sends req to every QueryNode returned by the node manager and concatenates the JSON-decoded results. +// It stops at the first RPC or decode failure and returns that error. +func getMetrics[T any](s *Server, ctx context.Context, req *milvuspb.GetMetricsRequest) ([]T, error) { + metrics := make([]T, 0) + for _, node := range s.nodeMgr.GetAll() { + resp, err := s.cluster.GetMetrics(ctx, node.ID(), req) + if err := merr.CheckRPCCall(resp, err); err != nil { + log.Warn("failed to get metric from QueryNode", zap.Int64("nodeID", node.ID()), zap.Error(err)) + return nil, err + } + + infos := make([]T, 0) + err = json.Unmarshal([]byte(resp.Response), &infos) + if err != nil { + log.Warn("invalid metrics of query node was found", zap.Error(err)) + return nil, err + } + metrics = append(metrics, infos...) 
+ } + return metrics, nil +} + +// getChannelsFromQueryNode returns the channel metrics collected from all QueryNodes as a JSON array string. +func (s *Server) getChannelsFromQueryNode(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { + channels, err := getMetrics[*metricsinfo.Channel](s, ctx, req) + if err != nil { + return "", err + } + ret, err := json.Marshal(channels) + if err != nil { + return "", err + } + return string(ret), nil +} + +// getSegmentsFromQueryNode returns the segment metrics collected from all QueryNodes as a JSON array string. +func (s *Server) getSegmentsFromQueryNode(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { + segments, err := getMetrics[*metricsinfo.Segment](s, ctx, req) + if err != nil { + return "", err + } + ret, err := json.Marshal(segments) + if err != nil { + return "", err + } + return string(ret), nil +} + // TODO(dragondriver): add more detail metrics func (s *Server) getSystemInfoMetrics( ctx context.Context, diff --git a/internal/querycoordv2/handlers_test.go b/internal/querycoordv2/handlers_test.go new file mode 100644 index 0000000000000..0aa9619aec2e2 --- /dev/null +++ b/internal/querycoordv2/handlers_test.go @@ -0,0 +1,108 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package querycoordv2 + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" +) + +func TestGetChannelsFromQueryNode(t *testing.T) { + mockCluster := session.NewMockCluster(t) + nodeManager := session.NewNodeManager() + nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + server := &Server{cluster: mockCluster, nodeMgr: nodeManager} + req := &milvuspb.GetMetricsRequest{} + expectedChannels := []*metricsinfo.Channel{ + { + Name: "channel1", + WatchState: "Healthy", + TimeTick: int64(1), + NodeID: int64(1), + CollectionID: int64(100), + }, + { + Name: "channel2", + WatchState: "Healthy", + TimeTick: int64(1), + NodeID: int64(2), + CollectionID: int64(200), + }, + } + resp := &milvuspb.GetMetricsResponse{ + Response: func() string { + data, _ := json.Marshal(expectedChannels) + return string(data) + }(), + } + mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil) + result, err := server.getChannelsFromQueryNode(context.Background(), req) + assert.NoError(t, err) + + var actualChannels []*metricsinfo.Channel + err = json.Unmarshal([]byte(result), &actualChannels) + assert.NoError(t, err) + assert.Equal(t, expectedChannels, actualChannels) +} + +func TestGetSegmentsFromQueryNode(t *testing.T) { + mockCluster := session.NewMockCluster(t) + nodeManager := session.NewNodeManager() + nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + server := &Server{cluster: mockCluster, nodeMgr: nodeManager} + expectedSegments := []*metricsinfo.Segment{ + { + SegmentID: 1, + PartitionID: 1, + Channel: "channel1", + ResourceGroup: "default", + MemSize: int64(1024), + LoadedInsertRowCount: 100, + }, + { + SegmentID: 2, + PartitionID: 1, + Channel: "channel2", + 
ResourceGroup: "default", + MemSize: int64(1024), + LoadedInsertRowCount: 200, + }, + } + resp := &milvuspb.GetMetricsResponse{ + Response: func() string { + data, _ := json.Marshal(expectedSegments) + return string(data) + }(), + } + req := &milvuspb.GetMetricsRequest{} + mockCluster.EXPECT().GetMetrics(mock.Anything, mock.Anything, req).Return(resp, nil) + + result, err := server.getSegmentsFromQueryNode(context.Background(), req) + assert.NoError(t, err) + + var actualSegments []*metricsinfo.Segment + err = json.Unmarshal([]byte(result), &actualSegments) + assert.NoError(t, err) + assert.Equal(t, expectedSegments, actualSegments) +} diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index 9ecd29d06efe5..db3dc8720100a 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -23,6 +23,8 @@ import ( "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -130,6 +132,13 @@ func (channel *DmChannel) Clone() *DmChannel { } } +func newDmChannelMetricsFrom(channel *DmChannel) *metricsinfo.DmChannel { + dmChannel := metrics.NewDMChannelFrom(channel.VchannelInfo) + dmChannel.NodeID = channel.Node + dmChannel.Version = channel.Version + return dmChannel +} + type nodeChannels struct { channels []*DmChannel // collection id => channels @@ -290,3 +299,16 @@ func (m *ChannelDistManager) updateCollectionIndex() { } } } + +func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() + + var channels []*metricsinfo.DmChannel + for _, nodeChannels := range m.channels { + for _, channel := range nodeChannels.channels { + channels = append(channels, newDmChannelMetricsFrom(channel)) + } + } + return 
channels +} diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go index 4960aae25adeb..18ffe9dccb29f 100644 --- a/internal/querycoordv2/meta/channel_dist_manager_test.go +++ b/internal/querycoordv2/meta/channel_dist_manager_test.go @@ -19,6 +19,7 @@ package meta import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -185,3 +186,32 @@ func (suite *ChannelDistManagerSuite) AssertCollection(channels []*DmChannel, co func TestChannelDistManager(t *testing.T) { suite.Run(t, new(ChannelDistManagerSuite)) } + +func TestGetChannelDistJSON(t *testing.T) { + manager := NewChannelDistManager() + channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: "channel-1", + }) + channel1.Node = 1 + channel1.Version = 1 + + channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 200, + ChannelName: "channel-2", + }) + channel2.Node = 2 + channel2.Version = 1 + + manager.Update(1, channel1) + manager.Update(2, channel2) + + channels := manager.GetChannelDist() + assert.Equal(t, int64(1), channels[0].NodeID) + assert.Equal(t, "channel-1", channels[0].ChannelName) + assert.Equal(t, int64(100), channels[0].CollectionID) + + assert.Equal(t, int64(2), channels[1].NodeID) + assert.Equal(t, "channel-2", channels[1].ChannelName) + assert.Equal(t, int64(200), channels[1].CollectionID) +} diff --git a/internal/querycoordv2/meta/dist_manager.go b/internal/querycoordv2/meta/dist_manager.go index 39aca9551abdd..512a926da30be 100644 --- a/internal/querycoordv2/meta/dist_manager.go +++ b/internal/querycoordv2/meta/dist_manager.go @@ -16,6 +16,15 @@ package meta +import ( + "encoding/json" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" +) + type DistributionManager struct { *SegmentDistManager *ChannelDistManager @@ 
-29,3 +38,30 @@ func NewDistributionManager() *DistributionManager { LeaderViewManager: NewLeaderViewManager(), } } + +// GetDistributionJSON returns a JSON representation of the current distribution state. +// It includes segments, DM channels, and leader views. +// If there are no segments, channels, or leader views, it returns an empty string. +// If JSON marshaling fails, it logs a warning and returns an empty string. +func (dm *DistributionManager) GetDistributionJSON() string { + segments := dm.GetSegmentDist() + channels := dm.GetChannelDist() + leaderView := dm.GetLeaderView() + + if len(segments) == 0 && len(channels) == 0 && len(leaderView) == 0 { + return "" + } + + dist := &metricsinfo.QueryCoordCollectionDistribution{ + Segments: segments, + DMChannels: channels, + LeaderViews: leaderView, + } + + v, err := json.Marshal(dist) + if err != nil { + log.Warn("failed to marshal dist", zap.Error(err)) + return "" + } + return string(v) +} diff --git a/internal/querycoordv2/meta/dist_manager_test.go b/internal/querycoordv2/meta/dist_manager_test.go new file mode 100644 index 0000000000000..8c6f45bf98287 --- /dev/null +++ b/internal/querycoordv2/meta/dist_manager_test.go @@ -0,0 +1,100 @@ +package meta + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" +) + +func TestGetDistributionJSON(t *testing.T) { + // Initialize DistributionManager + manager := NewDistributionManager() + + // Add some segments to the SegmentDistManager + segment1 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + InsertChannel: "channel-1", + NumOfRows: 1000, + State: commonpb.SegmentState_Flushed, + }) + segment1.Node = 1 + segment1.Version = 1 + + segment2 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 2, 
+ CollectionID: 200, + PartitionID: 20, + InsertChannel: "channel-2", + NumOfRows: 2000, + State: commonpb.SegmentState_Flushed, + }) + segment2.Node = 2 + segment2.Version = 1 + + manager.SegmentDistManager.Update(1, segment1) + manager.SegmentDistManager.Update(2, segment2) + + // Add some channels to the ChannelDistManager + channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: "channel-1", + }) + channel1.Node = 1 + channel1.Version = 1 + + channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 200, + ChannelName: "channel-2", + }) + channel2.Node = 2 + channel2.Version = 1 + + manager.ChannelDistManager.Update(1, channel1) + manager.ChannelDistManager.Update(2, channel2) + + // Add some leader views to the LeaderViewManager + leaderView1 := &LeaderView{ + ID: 1, + CollectionID: 100, + Channel: "channel-1", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1}}, + } + + leaderView2 := &LeaderView{ + ID: 2, + CollectionID: 200, + Channel: "channel-2", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + } + + manager.LeaderViewManager.Update(1, leaderView1) + manager.LeaderViewManager.Update(2, leaderView2) + + // Call GetDistributionJSON + jsonOutput := manager.GetDistributionJSON() + + // Verify JSON output + var dist metricsinfo.QueryCoordCollectionDistribution + err := json.Unmarshal([]byte(jsonOutput), &dist) + assert.NoError(t, err) + assert.Len(t, dist.Segments, 2) + assert.Len(t, dist.DMChannels, 2) + assert.Len(t, dist.LeaderViews, 2) + + assert.Equal(t, int64(1), dist.Segments[0].SegmentID) + assert.Equal(t, int64(2), dist.Segments[1].SegmentID) + assert.Equal(t, "channel-1", dist.DMChannels[0].ChannelName) + assert.Equal(t, "channel-2", dist.DMChannels[1].ChannelName) + assert.Equal(t, int64(1), dist.LeaderViews[0].LeaderID) + assert.Equal(t, int64(2), dist.LeaderViews[1].LeaderID) +} diff --git a/internal/querycoordv2/meta/leader_view_manager.go 
b/internal/querycoordv2/meta/leader_view_manager.go index 963e115b69506..8e06ccc80ed05 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -22,6 +22,7 @@ import ( "github.com/samber/lo" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -308,3 +309,46 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView return v1.Version > v2.Version }) } + +// GetLeaderView returns a slice of LeaderView objects, each representing the state of a leader node. +// It traverses the views map, converts each LeaderView to a metricsinfo.LeaderView, and collects them into a slice. +// The method locks the views map for reading to ensure thread safety. +func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView { + mgr.rwmutex.RLock() + defer mgr.rwmutex.RUnlock() + + var leaderViews []*metricsinfo.LeaderView + for _, nodeViews := range mgr.views { + for _, lv := range nodeViews.views { + errString := "" + if lv.UnServiceableError != nil { + errString = lv.UnServiceableError.Error() + } + leaderView := &metricsinfo.LeaderView{ + LeaderID: lv.ID, + CollectionID: lv.CollectionID, + Channel: lv.Channel, + Version: lv.Version, + SealedSegments: make([]*metricsinfo.Segment, 0, len(lv.Segments)), + GrowingSegments: make([]*metricsinfo.Segment, 0, len(lv.GrowingSegments)), + TargetVersion: lv.TargetVersion, + NumOfGrowingRows: lv.NumOfGrowingRows, + UnServiceableError: errString, + } + + for segID, seg := range lv.Segments { + leaderView.SealedSegments = append(leaderView.SealedSegments, &metricsinfo.Segment{ + SegmentID: segID, + NodeID: seg.NodeID, + }) + } + + for _, seg := range lv.GrowingSegments { + leaderView.GrowingSegments = append(leaderView.GrowingSegments, newSegmentMetricsFrom(seg)) + } + + leaderViews = append(leaderViews, leaderView) + } + } + return leaderViews 
+} diff --git a/internal/querycoordv2/meta/leader_view_manager_test.go b/internal/querycoordv2/meta/leader_view_manager_test.go index b25e245e20946..56374ecd23ea2 100644 --- a/internal/querycoordv2/meta/leader_view_manager_test.go +++ b/internal/querycoordv2/meta/leader_view_manager_test.go @@ -17,13 +17,19 @@ package meta import ( + "encoding/json" "testing" "github.com/cockroachdb/errors" "github.com/samber/lo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "golang.org/x/exp/slices" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -317,3 +323,69 @@ func (suite *LeaderViewManagerSuite) TestNotifyDelegatorChanges() { func TestLeaderViewManager(t *testing.T) { suite.Run(t, new(LeaderViewManagerSuite)) } + +func TestGetLeaderView(t *testing.T) { + manager := NewLeaderViewManager() + leaderView1 := &LeaderView{ + ID: 1, + CollectionID: 100, + Channel: "channel-1", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1}}, + GrowingSegments: map[int64]*Segment{ + 1: {SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 100, PartitionID: 10, InsertChannel: "channel-1", NumOfRows: 1000, State: commonpb.SegmentState_Growing}, Node: 1}, + }, + TargetVersion: 1, + NumOfGrowingRows: 1000, + UnServiceableError: nil, + } + + leaderView2 := &LeaderView{ + ID: 2, + CollectionID: 200, + Channel: "channel-2", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + GrowingSegments: map[int64]*Segment{ + 2: {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 200, PartitionID: 20, InsertChannel: "channel-2", NumOfRows: 2000, State: commonpb.SegmentState_Growing}, Node: 2}, + }, + TargetVersion: 1, + NumOfGrowingRows: 2000, + UnServiceableError: nil, + } + + manager.Update(1, leaderView1) + 
manager.Update(2, leaderView2) + + // Call GetLeaderView + leaderViews := manager.GetLeaderView() + jsonOutput, err := json.Marshal(leaderViews) + assert.NoError(t, err) + + var result []*metricsinfo.LeaderView + err = json.Unmarshal(jsonOutput, &result) + assert.NoError(t, err) + assert.Len(t, result, 2) + + // sort the decoded slice so the assertions below do not depend on map iteration order + slices.SortFunc(result, func(a, b *metricsinfo.LeaderView) int { + return int(a.LeaderID - b.LeaderID) + }) + assert.Equal(t, int64(1), result[0].LeaderID) + assert.Equal(t, int64(100), result[0].CollectionID) + assert.Equal(t, "channel-1", result[0].Channel) + assert.Equal(t, int64(1), result[0].Version) + assert.Len(t, result[0].SealedSegments, 1) + assert.Len(t, result[0].GrowingSegments, 1) + assert.Equal(t, int64(1), result[0].SealedSegments[0].SegmentID) + assert.Equal(t, int64(1), result[0].GrowingSegments[0].SegmentID) + + assert.Equal(t, int64(2), result[1].LeaderID) + assert.Equal(t, int64(200), result[1].CollectionID) + assert.Equal(t, "channel-2", result[1].Channel) + assert.Equal(t, int64(1), result[1].Version) + assert.Len(t, result[1].SealedSegments, 1) + assert.Len(t, result[1].GrowingSegments, 1) + assert.Equal(t, int64(2), result[1].SealedSegments[0].SegmentID) + assert.Equal(t, int64(2), result[1].GrowingSegments[0].SegmentID) +} diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index b396afb33f2af..9968d495fe3ab 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -565,6 +565,52 @@ func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run return _c } +// GetTargetJSON provides a mock function with given fields: scope +func (_m *MockTargetManager) GetTargetJSON(scope int32) string { + ret := _m.Called(scope) + + if len(ret) == 0 { + panic("no return value specified for GetTargetJSON") + } + + var r0 string + if rf, ok := ret.Get(0).(func(int32) 
string); ok { + r0 = rf(scope) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockTargetManager_GetTargetJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTargetJSON' +type MockTargetManager_GetTargetJSON_Call struct { + *mock.Call +} + +// GetTargetJSON is a helper method to define mock.On call +// - scope int32 +func (_e *MockTargetManager_Expecter) GetTargetJSON(scope interface{}) *MockTargetManager_GetTargetJSON_Call { + return &MockTargetManager_GetTargetJSON_Call{Call: _e.mock.On("GetTargetJSON", scope)} +} + +func (_c *MockTargetManager_GetTargetJSON_Call) Run(run func(scope int32)) *MockTargetManager_GetTargetJSON_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetTargetJSON_Call) Return(_a0 string) *MockTargetManager_GetTargetJSON_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetTargetJSON_Call) RunAndReturn(run func(int32) string) *MockTargetManager_GetTargetJSON_Call { + _c.Call.Return(run) + return _c +} + // IsCurrentTargetExist provides a mock function with given fields: collectionID, partitionID func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool { ret := _m.Called(collectionID, partitionID) diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index f59bc39cf3ed1..e7731ad8ebb0e 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -17,6 +17,7 @@ package meta import ( + "encoding/json" "fmt" "sync" @@ -28,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -466,3 +468,33 @@ 
func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.Unique ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...) return ret } + +// GetReplicasJSON returns a JSON representation of all replicas managed by the ReplicaManager. +// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation, +// marshals them into a JSON string, and returns the result. +// If an error occurs during marshaling, it logs a warning and returns an empty string. +func (m *ReplicaManager) GetReplicasJSON() string { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() + + replicas := lo.MapValues(m.replicas, func(r *Replica, i typeutil.UniqueID) *metricsinfo.Replica { + channelTowRWNodes := make(map[string][]int64) + for k, v := range r.replicaPB.GetChannelNodeInfos() { + channelTowRWNodes[k] = v.GetRwNodes() + } + return &metricsinfo.Replica{ + ID: r.GetID(), + CollectionID: r.GetCollectionID(), + RWNodes: r.GetNodes(), + ResourceGroup: r.GetResourceGroup(), + RONodes: r.GetRONodes(), + ChannelToRWNodes: channelTowRWNodes, + } + }) + ret, err := json.Marshal(replicas) + if err != nil { + log.Warn("failed to marshal replicas", zap.Error(err)) + return "" + } + return string(ret) +} diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index 55d3c471a12c6..fc70c8115533a 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -17,9 +17,12 @@ package meta import ( + "encoding/json" "testing" "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "google.golang.org/protobuf/proto" @@ -27,10 +30,12 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" + 
"github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/querypb" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -494,3 +499,43 @@ func TestReplicaManager(t *testing.T) { suite.Run(t, new(ReplicaManagerSuite)) suite.Run(t, new(ReplicaManagerV2Suite)) } + +func TestGetReplicasJSON(t *testing.T) { + catalog := mocks.NewQueryCoordCatalog(t) + catalog.EXPECT().SaveReplica(mock.Anything).Return(nil) + idAllocator := RandomIncrementIDAllocator() + replicaManager := NewReplicaManager(idAllocator, catalog) + + // Add some replicas to the ReplicaManager + replica1 := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 100, + ResourceGroup: "rg1", + Nodes: []int64{1, 2, 3}, + }) + replica2 := newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 200, + ResourceGroup: "rg2", + Nodes: []int64{4, 5, 6}, + }) + + assert.NoError(t, replicaManager.put(replica1)) + err := replicaManager.put(replica2) + assert.NoError(t, err) + + jsonOutput := replicaManager.GetReplicasJSON() + var replicas map[int64]*metricsinfo.Replica + err = json.Unmarshal([]byte(jsonOutput), &replicas) + assert.NoError(t, err) + assert.Len(t, replicas, 2) + + assert.Equal(t, int64(1), replicas[1].ID) + assert.Equal(t, int64(2), replicas[2].ID) + assert.Equal(t, int64(100), replicas[1].CollectionID) + assert.Equal(t, int64(200), replicas[2].CollectionID) + assert.Equal(t, "rg1", replicas[1].ResourceGroup) + assert.Equal(t, "rg2", replicas[2].ResourceGroup) + assert.ElementsMatch(t, []int64{1, 2, 3}, replicas[1].RWNodes) + assert.ElementsMatch(t, []int64{4, 5, 6}, replicas[2].RWNodes) +} diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 060f287bc1689..7a41379c1a958 100644 
--- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -17,6 +17,7 @@ package meta import ( + "encoding/json" "fmt" "sync" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -918,3 +920,23 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error } return nil } + +func (rm *ResourceManager) GetResourceGroupsJSON() string { + rm.rwmutex.RLock() + defer rm.rwmutex.RUnlock() + + rgs := lo.MapValues(rm.groups, func(r *ResourceGroup, i string) *metricsinfo.ResourceGroup { + return &metricsinfo.ResourceGroup{ + Name: r.GetName(), + Nodes: r.GetNodes(), + Cfg: r.GetConfig(), + } + }) + ret, err := json.Marshal(rgs) + if err != nil { + log.Error("failed to marshal resource groups", zap.Error(err)) + return "" + } + + return string(ret) +} diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index ca58a8e899e23..16de7c76d4e1e 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -16,8 +16,10 @@ package meta import ( + "encoding/json" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -30,7 +32,9 @@ import ( "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ResourceManagerSuite struct { @@ -619,3 +623,24 @@ func (suite *ResourceManagerSuite) 
TestUnassignFail() { suite.manager.HandleNodeDown(1) }) } + +func TestGetResourceGroupsJSON(t *testing.T) { + manager := &ResourceManager{groups: make(map[string]*ResourceGroup)} + rg1 := NewResourceGroup("rg1", newResourceGroupConfig(0, 10)) + rg1.nodes = typeutil.NewUniqueSet(1, 2) + rg2 := NewResourceGroup("rg2", newResourceGroupConfig(0, 20)) + rg2.nodes = typeutil.NewUniqueSet(3, 4) + manager.groups["rg1"] = rg1 + manager.groups["rg2"] = rg2 + + jsonOutput := manager.GetResourceGroupsJSON() + var resourceGroups map[string]*metricsinfo.ResourceGroup + err := json.Unmarshal([]byte(jsonOutput), &resourceGroups) + assert.NoError(t, err) + assert.Len(t, resourceGroups, 2) + + assert.Equal(t, "rg1", resourceGroups["rg1"].Name) + assert.ElementsMatch(t, []int64{1, 2}, resourceGroups["rg1"].Nodes) + assert.Equal(t, "rg2", resourceGroups["rg2"].Name) + assert.ElementsMatch(t, []int64{3, 4}, resourceGroups["rg2"].Nodes) +} diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 51d38fc0fcafe..81b41523b1bce 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -19,11 +19,13 @@ package meta import ( "sync" + "github.com/golang/protobuf/proto" "github.com/samber/lo" - "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -130,6 +132,21 @@ func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { } } +func newSegmentMetricsFrom(segment *Segment) *metricsinfo.Segment { + convertedSegment := metrics.NewSegmentFrom(segment.SegmentInfo) + convertedSegment.NodeID = segment.Node + convertedSegment.LoadedTimestamp = segment.Version + convertedSegment.Index = lo.Map(lo.Values(segment.IndexInfo), func(e 
*querypb.FieldIndexInfo, i int) *metricsinfo.SegmentIndex { + return &metricsinfo.SegmentIndex{ + IndexFieldID: e.FieldID, + IndexID: e.IndexID, + BuildID: e.BuildID, + IndexSize: e.IndexSize, + } + }) + return convertedSegment +} + func (segment *Segment) Clone() *Segment { return &Segment{ SegmentInfo: proto.Clone(segment.SegmentInfo).(*datapb.SegmentInfo), @@ -227,3 +244,19 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen } return ret } + +// GetSegmentDist converts every segment of every node in the distribution to its metrics form. +// The read lock is held until the function returns so the traversal cannot race with writers. +func (m *SegmentDistManager) GetSegmentDist() []*metricsinfo.Segment { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() + + var segments []*metricsinfo.Segment + for _, nodeSeg := range m.segments { + for _, segment := range nodeSeg.segments { + segments = append(segments, newSegmentMetricsFrom(segment)) + } + } + + return segments +} diff --git a/internal/querycoordv2/meta/segment_dist_manager_test.go b/internal/querycoordv2/meta/segment_dist_manager_test.go index 79d5340ba0b21..f8e42b86f1202 100644 --- a/internal/querycoordv2/meta/segment_dist_manager_test.go +++ b/internal/querycoordv2/meta/segment_dist_manager_test.go @@ -19,8 +19,10 @@ package meta import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" ) @@ -188,3 +190,53 @@ func (suite *SegmentDistManagerSuite) AssertShard(segments []*Segment, shard str func TestSegmentDistManager(t *testing.T) { suite.Run(t, new(SegmentDistManagerSuite)) } + +func TestGetSegmentDistJSON(t *testing.T) { + // Initialize SegmentDistManager + manager := NewSegmentDistManager() + + // Add some segments to the SegmentDistManager + segment1 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + InsertChannel: "channel-1", + NumOfRows: 1000, + State: commonpb.SegmentState_Flushed, + }) + segment1.Node = 1 + segment1.Version = 1 + + 
segment2 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 2, + CollectionID: 200, + PartitionID: 20, + InsertChannel: "channel-2", + NumOfRows: 2000, + State: commonpb.SegmentState_Flushed, + }) + segment2.Node = 2 + segment2.Version = 1 + + manager.Update(1, segment1) + manager.Update(2, segment2) + + segments := manager.GetSegmentDist() + assert.Equal(t, int64(1), segments[0].SegmentID) + assert.Equal(t, int64(100), segments[0].CollectionID) + assert.Equal(t, int64(10), segments[0].PartitionID) + assert.Equal(t, "channel-1", segments[0].Channel) + assert.Equal(t, int64(1000), segments[0].NumOfRows) + assert.Equal(t, "Flushed", segments[0].State) + assert.Equal(t, int64(1), segments[0].NodeID) + assert.Equal(t, int64(1), segments[0].LoadedTimestamp) + + assert.Equal(t, int64(2), segments[1].SegmentID) + assert.Equal(t, int64(200), segments[1].CollectionID) + assert.Equal(t, int64(20), segments[1].PartitionID) + assert.Equal(t, "channel-2", segments[1].Channel) + assert.Equal(t, int64(2000), segments[1].NumOfRows) + assert.Equal(t, "Flushed", segments[1].State) + assert.Equal(t, int64(2), segments[1].NodeID) + assert.Equal(t, int64(1), segments[1].LoadedTimestamp) +} diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index f8fcd896942cb..bfee39d4fd0f7 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -23,6 +23,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -159,7 +161,7 @@ func (p *CollectionTarget) IsEmpty() bool { type target struct { // just maintain target at collection level - collectionTargetMap map[int64]*CollectionTarget + collectionTargetMap map[int64]*CollectionTarget `json:"collection_target,omitempty"` } func newTarget() *target { @@ -183,3 +185,21 
@@ func (t *target) removeCollectionTarget(collectionID int64) { func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget { return t.collectionTargetMap[collectionID] } + +func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordCollectionTarget { + return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordCollectionTarget { + segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment { + return metrics.NewSegmentFrom(s) + }) + + dmChannels := lo.MapToSlice(v.GetAllDmChannels(), func(k string, ch *DmChannel) *metricsinfo.DmChannel { + return metrics.NewDMChannelFrom(ch.VchannelInfo) + }) + + return &metricsinfo.QueryCoordCollectionTarget{ + CollectionID: k, + Segments: segments, + DMChannels: dmChannels, + } + }) +} diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 6f05d4c96e004..fd2ef164892a1 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -18,6 +18,7 @@ package meta import ( "context" + "encoding/json" "fmt" "runtime" "sync" @@ -68,6 +69,7 @@ type TargetManagerInterface interface { SaveCurrentTarget(catalog metastore.QueryCoordCatalog) Recover(catalog metastore.QueryCoordCatalog) error CanSegmentBeMoved(collectionID, segmentID int64) bool + GetTargetJSON(scope TargetScope) string } type TargetManager struct { @@ -632,3 +634,28 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool return false } + +func (mgr *TargetManager) GetTargetJSON(scope TargetScope) string { + mgr.rwMutex.RLock() + defer mgr.rwMutex.RUnlock() + + ret := mgr.getTarget(scope) + if ret == nil { + return "" + } + + v, err := json.Marshal(ret.toQueryCoordCollectionTargets()) + if err != nil { + log.Warn("failed to marshal target", zap.Error(err)) + return "" + } + return string(v) +} + +func (mgr *TargetManager) 
getTarget(scope TargetScope) *target { + if scope == CurrentTarget { + return mgr.current + } + + return mgr.next +} diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 960e8697db648..d0e2c1af23146 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -199,8 +199,43 @@ func (s *Server) registerMetricsRequest() { return s.taskScheduler.GetTasksJSON(), nil } + // QueryDistAction serves the segment, channel and leader-view distribution. + QueryDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.dist.GetDistributionJSON(), nil + } + + // QueryTargetAction serves the current target held by the target manager. + QueryTargetAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.targetMgr.GetTargetJSON(meta.CurrentTarget), nil + } + + QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.meta.GetReplicasJSON(), nil + } + + QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.meta.GetResourceGroupsJSON(), nil + } + + QuerySegmentsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.getSegmentsFromQueryNode(ctx, req) + } + + QueryChannelsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return s.getChannelsFromQueryNode(ctx, req) + } + + // register actions that requests are processed in querycoord s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, getSystemInfoAction) s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryCoordAllTasks, QueryTasksAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryDist, QueryDistAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryTarget, QueryTargetAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryReplicas, QueryReplicasAction) + 
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryResourceGroups, QueryResourceGroupsAction) + + // register actions that requests are processed in querynode + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, QuerySegmentsAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, QueryChannelsAction) log.Info("register metrics actions finished") } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 2343607269ac9..2ecbf9e4d2e7d 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -786,7 +786,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() { // StartPosition: &msgpb.MsgPosition{Timestamp: 20000}, // DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000}, // Level: datapb.SegmentLevel_L1, - // InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID), + // Channel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID), // }, // }, // }) diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 44fae7020e06b..8ac7d8130a2b8 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -18,14 +18,17 @@ package querynodev2 import ( "context" + "encoding/json" "fmt" "github.com/samber/lo" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -170,6 +173,54 @@ func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetr return ret, nil } +// getChannelJSON returns the JSON string of 
channels +func getChannelJSON(node *QueryNode) string { + stats := node.pipelineManager.GetChannelStats() + ret, err := json.Marshal(stats) + if err != nil { + log.Warn("failed to marshal channels", zap.Error(err)) + return "" + } + return string(ret) +} + +// getSegmentJSON returns the JSON string of segments +func getSegmentJSON(node *QueryNode) string { + allSegments := node.manager.Segment.GetBy() + var ms []*metricsinfo.Segment + for _, s := range allSegments { + indexes := make([]*metricsinfo.SegmentIndex, 0, len(s.Indexes())) + for _, index := range s.Indexes() { + indexes = append(indexes, &metricsinfo.SegmentIndex{ + IndexFieldID: index.IndexInfo.FieldID, + IndexID: index.IndexInfo.IndexID, + IndexSize: index.IndexInfo.IndexSize, + BuildID: index.IndexInfo.BuildID, + IsLoaded: index.IsLoaded, + }) + } + + ms = append(ms, &metricsinfo.Segment{ + SegmentID: s.ID(), + CollectionID: s.Collection(), + PartitionID: s.Partition(), + MemSize: s.MemSize(), + Index: indexes, + State: s.Type().String(), + ResourceGroup: s.ResourceGroup(), + InsertRowCount: s.InsertCount(), + NodeID: node.GetNodeID(), + }) + } + + ret, err := json.Marshal(ms) + if err != nil { + log.Warn("failed to marshal segments", zap.Error(err)) + return "" + } + return string(ret) +} + // getSystemInfoMetrics returns metrics info of QueryNode func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (string, error) { usedMem := hardware.GetUsedMemoryCount() diff --git a/internal/querynodev2/metrics_info_test.go b/internal/querynodev2/metrics_info_test.go new file mode 100644 index 0000000000000..bf0fad7cfce85 --- /dev/null +++ b/internal/querynodev2/metrics_info_test.go @@ -0,0 +1,130 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querynodev2 + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querynodev2/delegator" + "github.com/milvus-io/milvus/internal/querynodev2/pipeline" + "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/querynodev2/tsafe" + "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +func TestGetPipelineJSON(t *testing.T) { + paramtable.Init() + + ch := "ch" + tSafeManager := tsafe.NewTSafeReplica() + tSafeManager.Add(context.Background(), ch, 0) + delegators := typeutil.NewConcurrentMap[string, delegator.ShardDelegator]() + d := delegator.NewMockShardDelegator(t) + delegators.Insert(ch, d) + msgDispatcher := msgdispatcher.NewMockClient(t) + + collectionManager := segments.NewMockCollectionManager(t) + segmentManager := segments.NewMockSegmentManager(t) + collectionManager.EXPECT().Get(mock.Anything).Return(&segments.Collection{}) + manager := &segments.Manager{ + Collection: collectionManager, + Segment: segmentManager, + } + + pipelineManager := pipeline.NewManager(manager, tSafeManager, msgDispatcher, 
delegators) + + _, err := pipelineManager.Add(1, ch) + assert.NoError(t, err) + assert.Equal(t, 1, pipelineManager.Num()) + + stats := pipelineManager.GetChannelStats() + expectedStats := []*metricsinfo.Channel{ + { + Name: ch, + WatchState: "Healthy", + TimeTick: 0, + }, + } + assert.Equal(t, expectedStats, stats) + + JSONStr := getChannelJSON(&QueryNode{pipelineManager: pipelineManager}) + assert.NotEmpty(t, JSONStr) + + var actualStats []*metricsinfo.Channel + err = json.Unmarshal([]byte(JSONStr), &actualStats) + assert.NoError(t, err) + assert.Equal(t, expectedStats, actualStats) +} + +func TestGetSegmentJSON(t *testing.T) { + segment := segments.NewMockSegment(t) + segment.EXPECT().ID().Return(int64(1)) + segment.EXPECT().Collection().Return(int64(1001)) + segment.EXPECT().Partition().Return(int64(2001)) + segment.EXPECT().MemSize().Return(int64(1024)) + segment.EXPECT().Indexes().Return([]*segments.IndexedFieldInfo{ + { + IndexInfo: &querypb.FieldIndexInfo{ + FieldID: 1, + IndexID: 101, + IndexSize: 512, + BuildID: 10001, + }, + IsLoaded: true, + }, + }) + segment.EXPECT().Type().Return(segments.SegmentTypeGrowing) + segment.EXPECT().ResourceGroup().Return("default") + segment.EXPECT().InsertCount().Return(int64(100)) + + node := &QueryNode{} + mockedSegmentManager := segments.NewMockSegmentManager(t) + mockedSegmentManager.EXPECT().GetBy().Return([]segments.Segment{segment}) + node.manager = &segments.Manager{Segment: mockedSegmentManager} + + jsonStr := getSegmentJSON(node) + assert.NotEmpty(t, jsonStr) + + var segments []*metricsinfo.Segment + err := json.Unmarshal([]byte(jsonStr), &segments) + assert.NoError(t, err) + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + assert.Equal(t, int64(1), segments[0].SegmentID) + assert.Equal(t, int64(1001), segments[0].CollectionID) + assert.Equal(t, int64(2001), segments[0].PartitionID) + assert.Equal(t, int64(1024), segments[0].MemSize) + assert.Equal(t, 1, len(segments[0].Index)) + assert.Equal(t, 
int64(1), segments[0].Index[0].IndexFieldID) + assert.Equal(t, int64(101), segments[0].Index[0].IndexID) + assert.Equal(t, int64(512), segments[0].Index[0].IndexSize) + assert.Equal(t, int64(10001), segments[0].Index[0].BuildID) + assert.True(t, segments[0].Index[0].IsLoaded) + assert.Equal(t, "Growing", segments[0].State) + assert.Equal(t, "default", segments[0].ResourceGroup) + assert.Equal(t, int64(100), segments[0].LoadedInsertRowCount) + assert.Equal(t, node.GetNodeID(), segments[0].NodeID) +} diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 453c9638430f7..a1272a2e426d7 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -40,6 +41,7 @@ type Manager interface { Remove(channels ...string) Start(channels ...string) error Close() + GetChannelStats() []*metricsinfo.Channel } type manager struct { @@ -155,6 +157,27 @@ func (m *manager) Close() { } } +func (m *manager) GetChannelStats() []*metricsinfo.Channel { + m.mu.RLock() + defer m.mu.RUnlock() + + ret := make([]*metricsinfo.Channel, 0, len(m.channel2Pipeline)) + for ch, p := range m.channel2Pipeline { + tt, err := m.tSafeManager.Get(ch) + if err != nil { + log.Warn("get tSafe failed", zap.String("channel", ch), zap.Error(err)) + } + ret = append(ret, &metricsinfo.Channel{ + Name: ch, + WatchState: p.Status(), + TimeTick: int64(tt), + NodeID: paramtable.GetNodeID(), + CollectionID: p.GetCollectionID(), + }) + } + return ret +} + func NewManager(dataManager *DataManager, tSafeManager TSafeManager, dispatcher msgdispatcher.Client, diff --git 
a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index fb8d72f4d9901..17e1ec56500d4 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -26,6 +26,7 @@ import ( // pipeline used for querynode type Pipeline interface { base.StreamPipeline + GetCollectionID() UniqueID } type pipeline struct { @@ -35,6 +36,10 @@ type pipeline struct { embeddingNode embeddingNode } +func (p *pipeline) GetCollectionID() UniqueID { + return p.collectionID +} + func (p *pipeline) Close() { p.StreamPipeline.Close() } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index c24b59bf66fa8..6e6407df4e534 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -283,6 +283,16 @@ func (node *QueryNode) registerMetricsRequest() { func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { return getSystemInfoMetrics(ctx, req, node) }) + + node.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return getSegmentJSON(node), nil + }) + + node.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, + func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) { + return getChannelJSON(node), nil + }) log.Info("register metrics actions finished") } diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 6b67e0a32d11f..91bfeb7989b8c 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/cockroachdb/errors" "go.uber.org/atomic" @@ -123,6 +124,16 @@ func (fg *TimeTickedFlowGraph) Close() { }) } +// Status returns the status of the pipeline, it will return "Healthy" if the input node +// has received any msg in the 
last nodeTtInterval +func (fg *TimeTickedFlowGraph) Status() string { + diff := time.Since(fg.nodeCtxManager.lastAccessTime.Load()) + if diff > nodeCtxTtInterval { + return fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String()) + } + return "Healthy" +} + // NewTimeTickedFlowGraph create timetick flowgraph func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph { flowGraph := TimeTickedFlowGraph{ diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 1c1685efcd27c..40589c3f4c2c0 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" @@ -59,14 +60,17 @@ type nodeCtxManager struct { closeWg *sync.WaitGroup closeOnce sync.Once closeCh chan struct{} // notify nodes to exit + + lastAccessTime *atomic.Time } // NewNodeCtxManager init with the inputNode and fg.closeWg func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager { return &nodeCtxManager{ - inputNodeCtx: nodeCtx, - closeWg: closeWg, - closeCh: make(chan struct{}), + inputNodeCtx: nodeCtx, + closeWg: closeWg, + closeCh: make(chan struct{}), + lastAccessTime: atomic.NewTime(time.Now()), } } @@ -119,6 +123,7 @@ func (nodeCtxManager *nodeCtxManager) workNodeStart() { continue } + nodeCtxManager.lastAccessTime.Store(time.Now()) output = n.Operate(input) curNode.blockMutex.RUnlock() // the output decide whether the node should be closed. 
diff --git a/internal/util/metrics/utils.go b/internal/util/metrics/utils.go index 4c5b1fc230f61..8168fb78de113 100644 --- a/internal/util/metrics/utils.go +++ b/internal/util/metrics/utils.go @@ -1,46 +1,34 @@ package metrics import ( - "github.com/samber/lo" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) -func PruneFieldIndexInfo(f *querypb.FieldIndexInfo) *querypb.FieldIndexInfo { - return &querypb.FieldIndexInfo{ - FieldID: f.FieldID, - IndexID: f.IndexID, - BuildID: f.BuildID, - IndexSize: f.IndexSize, - NumRows: f.NumRows, - } -} - -func PruneSegmentInfo(s *datapb.SegmentInfo) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: s.ID, - NumOfRows: s.NumOfRows, - State: s.State, - Compacted: s.Compacted, - Level: s.Level, +func NewSegmentFrom(segment *datapb.SegmentInfo) *metricsinfo.Segment { + return &metricsinfo.Segment{ + SegmentID: segment.GetID(), + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + Channel: segment.GetInsertChannel(), + NumOfRows: segment.GetNumOfRows(), + State: segment.GetState().String(), + IsImporting: segment.GetIsImporting(), + Compacted: segment.GetCompacted(), + Level: segment.GetLevel().String(), + IsSorted: segment.GetIsSorted(), + IsInvisible: segment.GetIsInvisible(), } } -func PruneVChannelInfo(channel *datapb.VchannelInfo) *datapb.VchannelInfo { - return &datapb.VchannelInfo{ - ChannelName: channel.ChannelName, - UnflushedSegments: lo.Map(channel.UnflushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - FlushedSegments: lo.Map(channel.FlushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - DroppedSegments: lo.Map(channel.DroppedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - IndexedSegments: 
lo.Map(channel.IndexedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), +func NewDMChannelFrom(channel *datapb.VchannelInfo) *metricsinfo.DmChannel { + return &metricsinfo.DmChannel{ + CollectionID: channel.GetCollectionID(), + ChannelName: channel.GetChannelName(), + UnflushedSegmentIds: channel.GetUnflushedSegmentIds(), + FlushedSegmentIds: channel.GetFlushedSegmentIds(), + DroppedSegmentIds: channel.GetDroppedSegmentIds(), + LevelZeroSegmentIds: channel.GetLevelZeroSegmentIds(), + PartitionStatsVersions: channel.GetPartitionStatsVersions(), } } diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 2765e11492605..c18dd196517e6 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -18,9 +18,11 @@ package pipeline import ( "context" + "fmt" "sync" "time" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -39,6 +41,7 @@ import ( type StreamPipeline interface { Pipeline ConsumeMsgStream(position *msgpb.MsgPosition) error + Status() string } type streamPipeline struct { @@ -52,6 +55,8 @@ type streamPipeline struct { closeCh chan struct{} // notify work to exit closeWg sync.WaitGroup closeOnce sync.Once + + lastAccessTime *atomic.Time } func (p *streamPipeline) work() { @@ -62,6 +67,7 @@ func (p *streamPipeline) work() { log.Debug("stream pipeline input closed") return case msg := <-p.input: + p.lastAccessTime.Store(time.Now()) log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs))) p.pipeline.inputChannel <- msg p.pipeline.process() @@ -69,6 +75,16 @@ func (p *streamPipeline) work() { } } +// Status returns the status of the pipeline, it will return "Healthy" if the input node +// has received any msg in the last nodeTtInterval +func (p *streamPipeline) Status() string { + diff := time.Since(p.lastAccessTime.Load()) + if diff > 
p.pipeline.nodeTtInterval { + return fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String()) + } + return "Healthy" +} + func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { var err error if position == nil { @@ -150,10 +166,11 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time. nodeTtInterval: nodeTtInterval, enableTtChecker: enableTtChecker, }, - dispatcher: dispatcher, - vChannel: vChannel, - closeCh: make(chan struct{}), - closeWg: sync.WaitGroup{}, + dispatcher: dispatcher, + vChannel: vChannel, + closeCh: make(chan struct{}), + closeWg: sync.WaitGroup{}, + lastAccessTime: atomic.NewTime(time.Now()), } return pipeline diff --git a/pkg/util/metricsinfo/metric_request.go b/pkg/util/metricsinfo/metric_request.go index 07dd4a6dec034..464cabf63851a 100644 --- a/pkg/util/metricsinfo/metric_request.go +++ b/pkg/util/metricsinfo/metric_request.go @@ -45,10 +45,16 @@ const ( MetricRequestParamsSeparator = "," // QuerySegmentDist request for segment distribution on the query node - QuerySegmentDist = "qc_segment_dist" + QuerySegments = "qn_segments" // QueryChannelDist request for channel distribution on the query node - QueryChannelDist = "qc_channel_dist" + QueryChannels = "qn_channels" + + // QueryDist request for segment/channel/leader view distribution on querycoord + QueryDist = "qc_dist" + + // QueryTarget request for segment/channel target on the querycoord + QueryTarget = "qc_target" // QueryCoordAllTasks request for get tasks on the querycoord QueryCoordAllTasks = "qc_tasks_all" @@ -74,6 +80,12 @@ const ( // SyncTasks request for get sync tasks from the datanode SyncTasks = "dn_sync_tasks" + // DataSegments request for get segments from the datanode + DataSegments = "dn_segments" + + // DataChannels request for get channels from the datanode + DataChannels = "dn_channels" + // MetricRequestParamVerboseKey as a request parameter decide to whether return verbose value 
MetricRequestParamVerboseKey = "verbose" ) diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index a1bb87d2f872f..799c075a7eaaa 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -15,6 +15,7 @@ import ( "encoding/json" "time" + "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -70,6 +71,101 @@ const ( MilvusUsedGoVersion = "MILVUS_USED_GO_VERSION" ) +type DmChannel struct { + NodeID int64 `json:"node_id,omitempty"` + Version int64 `json:"version,omitempty"` + CollectionID int64 `json:"collection_id,omitempty"` + ChannelName string `json:"channel_name,omitempty"` + UnflushedSegmentIds []int64 `json:"unflushed_segment_ids,omitempty"` + FlushedSegmentIds []int64 `json:"flushed_segment_ids,omitempty"` + DroppedSegmentIds []int64 `json:"dropped_segment_ids,omitempty"` + LevelZeroSegmentIds []int64 `json:"level_zero_segment_ids,omitempty"` + PartitionStatsVersions map[int64]int64 `json:"partition_stats_versions,omitempty"` +} + +type Segment struct { + SegmentID int64 `json:"segment_id,omitempty"` + CollectionID int64 `json:"collection_id,omitempty"` + PartitionID int64 `json:"partition_id,omitempty"` + Channel string `json:"channel,omitempty"` + NumOfRows int64 `json:"num_of_rows,omitempty"` + State string `json:"state,omitempty"` + IsImporting bool `json:"is_importing,omitempty"` + Compacted bool `json:"compacted,omitempty"` + Level string `json:"level,omitempty"` + IsSorted bool `json:"is_sorted,omitempty"` + NodeID int64 `json:"node_id,omitempty"` + + // load related + IsInvisible bool `json:"is_invisible,omitempty"` + LoadedTimestamp int64 `json:"loaded_timestamp,omitempty"` + Index []*SegmentIndex `json:"index,omitempty"` + ResourceGroup string `json:"resource_group,omitempty"` + LoadedInsertRowCount int64 `json:"loaded_insert_row_count,omitempty"` // insert row count for growing segment that excludes the deleted row count in
QueryNode + MemSize int64 `json:"mem_size,omitempty"` // memory size of segment in QueryNode + + // flush related + FlushedRows int64 `json:"flushed_rows,omitempty"` + SyncBufferRows int64 `json:"sync_buffer_rows,omitempty"` + SyncingRows int64 `json:"syncing_rows,omitempty"` + // TODO add checkpoints +} + +type SegmentIndex struct { + IndexFieldID int64 `json:"field_id,omitempty"` + IndexID int64 `json:"index_id,omitempty"` + BuildID int64 `json:"build_id,omitempty"` + IndexSize int64 `json:"index_size,omitempty"` + IsLoaded bool `json:"is_loaded,omitempty"` +} + +type QueryCoordCollectionTarget struct { + CollectionID int64 `json:"collection_id,omitempty"` + Segments []*Segment `json:"segments,omitempty"` + DMChannels []*DmChannel `json:"dm_channels,omitempty"` +} + +type LeaderView struct { + LeaderID int64 `json:"leader_id"` + CollectionID int64 `json:"collection_id"` + Channel string `json:"channel"` + Version int64 `json:"version"` + SealedSegments []*Segment `json:"sealed_segments"` + GrowingSegments []*Segment `json:"growing_segments"` + TargetVersion int64 `json:"target_version"` + NumOfGrowingRows int64 `json:"num_of_growing_rows"` + UnServiceableError string `json:"unserviceable_error"` +} + +type QueryCoordCollectionDistribution struct { + Segments []*Segment `json:"segments,omitempty"` + DMChannels []*DmChannel `json:"dm_channels,omitempty"` + LeaderViews []*LeaderView `json:"leader_views,omitempty"` +} + +type ResourceGroup struct { + Name string `json:"name,omitempty"` + Nodes []int64 `json:"nodes,omitempty"` + Cfg *rgpb.ResourceGroupConfig `json:"cfg,omitempty"` +} + +type Replica struct { + ID int64 `json:"ID,omitempty"` + CollectionID int64 `json:"collectionID,omitempty"` + RWNodes []int64 `json:"rw_nodes,omitempty"` + ResourceGroup string `json:"resource_group,omitempty"` + RONodes []int64 `json:"ro_nodes,omitempty"` + ChannelToRWNodes map[string][]int64 `json:"channel_to_rw_nodes,omitempty"` +} + +type Channel struct { + Name string 
`json:"name,omitempty"` + WatchState string `json:"watched_status,omitempty"` + TimeTick int64 `json:"time_tick,omitempty"` + NodeID int64 `json:"node_id,omitempty"` + CollectionID int64 `json:"collection_id,omitempty"` +} + // DeployMetrics records the deploy information of nodes. type DeployMetrics struct { SystemVersion string `json:"system_version"`