diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go
index 7814e214daa7b..b50d0c5988f0d 100644
--- a/internal/querycoordv2/handlers.go
+++ b/internal/querycoordv2/handlers.go
@@ -18,10 +18,12 @@ package querycoordv2
 import (
     "context"
+    "encoding/json"
     "sync"
     "time"
 
     "github.com/cockroachdb/errors"
+    "github.com/milvus-io/milvus/pkg/util/merr"
     "github.com/samber/lo"
     "go.uber.org/zap"
 
@@ -242,6 +244,36 @@ func (s *Server) balanceChannels(ctx context.Context,
     return nil
 }
 
+// TODO(dragondriver): add more detail metrics
+func (s *Server) getSegmentsFromQueryNode(
+    ctx context.Context,
+    req *milvuspb.GetMetricsRequest,
+) (string, error) {
+    segments := make([]*metricsinfo.Segment, 0)
+    for _, node := range s.nodeMgr.GetAll() {
+        resp, err := s.cluster.GetMetrics(ctx, node.ID(), req)
+        if err := merr.CheckRPCCall(resp, err); err != nil {
+            log.Warn("failed to get metric from QueryNode",
+                zap.Int64("nodeID", node.ID()))
+            continue
+        }
+
+        infos := make([]*metricsinfo.Segment, 0)
+        err = json.Unmarshal([]byte(resp.Response), &infos)
+        if err != nil {
+            log.Warn("invalid metrics of query node was found",
+                zap.Error(err))
+            continue
+        }
+        segments = append(segments, infos...)
+    }
+    ret, err := json.Marshal(segments)
+    if err != nil {
+        return "", err
+    }
+    return string(ret), nil
+}
+
 // TODO(dragondriver): add more detail metrics
 func (s *Server) getSystemInfoMetrics(
     ctx context.Context,
diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go
index 9ecd29d06efe5..bd9001b82f213 100644
--- a/internal/querycoordv2/meta/channel_dist_manager.go
+++ b/internal/querycoordv2/meta/channel_dist_manager.go
@@ -19,6 +19,8 @@ package meta
 import (
     "sync"
 
+    "github.com/milvus-io/milvus/internal/util/metrics"
+    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
     "github.com/samber/lo"
     "google.golang.org/protobuf/proto"
 
@@ -130,6 +132,13 @@ func (channel *DmChannel) Clone() *DmChannel {
     }
 }
 
+func newDmChannelMetricsFrom(channel *DmChannel) *metricsinfo.DmChannel {
+    dmChannel := metrics.NewDMChannelFrom(channel.VchannelInfo)
+    dmChannel.NodeID = channel.Node
+    dmChannel.Version = channel.Version
+    return dmChannel
+}
+
 type nodeChannels struct {
     channels []*DmChannel
     // collection id => channels
@@ -290,3 +299,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
         }
     }
 }
+
+func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel {
+    m.rwmutex.RLock()
+    defer m.rwmutex.RUnlock()
+
+    var channels []*metricsinfo.DmChannel
+    for _, nodeChannels := range m.channels {
+        for _, channel := range nodeChannels.channels {
+            channels = append(channels, newDmChannelMetricsFrom(channel))
+        }
+    }
+    return channels
+}
diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go
index 4960aae25adeb..18ffe9dccb29f 100644
--- a/internal/querycoordv2/meta/channel_dist_manager_test.go
+++ b/internal/querycoordv2/meta/channel_dist_manager_test.go
@@ -19,6 +19,7 @@ package meta
 import (
     "testing"
 
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/suite"
 
     "github.com/milvus-io/milvus/internal/proto/datapb"
@@ -185,3 +186,32 @@ func (suite *ChannelDistManagerSuite) AssertCollection(channels []*DmChannel, co
 func TestChannelDistManager(t *testing.T) {
     suite.Run(t, new(ChannelDistManagerSuite))
 }
+
+func TestGetChannelDistJSON(t *testing.T) {
+    manager := NewChannelDistManager()
+    channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: 100, + ChannelName: "channel-1", + }) + channel1.Node = 1 + channel1.Version = 1 + + channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 200, + ChannelName: "channel-2", + }) + channel2.Node = 2 + channel2.Version = 1 + + manager.Update(1, channel1) + manager.Update(2, channel2) + + channels := manager.GetChannelDist() + assert.Equal(t, int64(1), channels[0].NodeID) + assert.Equal(t, "channel-1", channels[0].ChannelName) + assert.Equal(t, int64(100), channels[0].CollectionID) + + assert.Equal(t, int64(2), channels[1].NodeID) + assert.Equal(t, "channel-2", channels[1].ChannelName) + assert.Equal(t, int64(200), channels[1].CollectionID) +} diff --git a/internal/querycoordv2/meta/dist_manager.go b/internal/querycoordv2/meta/dist_manager.go index 39aca9551abdd..bd17dd9c0c042 100644 --- a/internal/querycoordv2/meta/dist_manager.go +++ b/internal/querycoordv2/meta/dist_manager.go @@ -16,6 +16,14 @@ package meta +import ( + "encoding/json" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "go.uber.org/zap" +) + type DistributionManager struct { *SegmentDistManager *ChannelDistManager @@ -29,3 +37,30 @@ func NewDistributionManager() *DistributionManager { LeaderViewManager: NewLeaderViewManager(), } } + +// GetDistributionJSON returns a JSON representation of the current distribution state. +// It includes segments, DM channels, and leader views. +// If there are no segments, channels, or leader views, it returns an empty string. +// In case of an error during JSON marshaling, it returns the error. +func (dm *DistributionManager) GetDistributionJSON() string { + segments := dm.GetSegmentDist() + channels := dm.GetChannelDist() + leaderView := dm.GetLeaderView() + + if len(segments) == 0 && len(channels) == 0 && len(leaderView) == 0 { + return "" + } + + dist := &metricsinfo.QueryCoordCollectionDistribution{ + Segments: segments, + DMChannels: channels, + LeaderViews: leaderView, + } + + v, err := json.Marshal(dist) + if err != nil { + log.Warn("failed to marshal dist", zap.Error(err)) + return "" + } + return string(v) +} diff --git a/internal/querycoordv2/meta/dist_manager_test.go b/internal/querycoordv2/meta/dist_manager_test.go new file mode 100644 index 0000000000000..872d5e9f856fe --- /dev/null +++ b/internal/querycoordv2/meta/dist_manager_test.go @@ -0,0 +1,99 @@ +package meta + +import ( + "encoding/json" + "testing" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/stretchr/testify/assert" +) + +func TestGetDistributionJSON(t *testing.T) { + // Initialize DistributionManager + manager := NewDistributionManager() + + // Add some segments to the SegmentDistManager + segment1 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + InsertChannel: "channel-1", + NumOfRows: 1000, + State: commonpb.SegmentState_Flushed, + }) + segment1.Node = 1 + segment1.Version = 1 + + segment2 := SegmentFromInfo(&datapb.SegmentInfo{ + ID: 2, + CollectionID: 200, + PartitionID: 20, + InsertChannel: "channel-2", + NumOfRows: 2000, + State: commonpb.SegmentState_Flushed, + }) + segment2.Node = 2 + segment2.Version = 1 + + manager.SegmentDistManager.Update(1, segment1) + manager.SegmentDistManager.Update(2, segment2) + + // Add some channels to the ChannelDistManager + channel1 := 
DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 100, + ChannelName: "channel-1", + }) + channel1.Node = 1 + channel1.Version = 1 + + channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: 200, + ChannelName: "channel-2", + }) + channel2.Node = 2 + channel2.Version = 1 + + manager.ChannelDistManager.Update(1, channel1) + manager.ChannelDistManager.Update(2, channel2) + + // Add some leader views to the LeaderViewManager + leaderView1 := &LeaderView{ + ID: 1, + CollectionID: 100, + Channel: "channel-1", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1}}, + } + + leaderView2 := &LeaderView{ + ID: 2, + CollectionID: 200, + Channel: "channel-2", + Version: 1, + Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, + } + + manager.LeaderViewManager.Update(1, leaderView1) + manager.LeaderViewManager.Update(2, leaderView2) + + // Call GetDistributionJSON + jsonOutput := manager.GetDistributionJSON() + + // Verify JSON output + var dist metricsinfo.QueryCoordCollectionDistribution + err := json.Unmarshal([]byte(jsonOutput), &dist) + assert.NoError(t, err) + assert.Len(t, dist.Segments, 2) + assert.Len(t, dist.DMChannels, 2) + assert.Len(t, dist.LeaderViews, 2) + + assert.Equal(t, int64(1), dist.Segments[0].SegmentID) + assert.Equal(t, int64(2), dist.Segments[1].SegmentID) + assert.Equal(t, "channel-1", dist.DMChannels[0].ChannelName) + assert.Equal(t, "channel-2", dist.DMChannels[1].ChannelName) + assert.Equal(t, int64(1), dist.LeaderViews[0].LeaderID) + assert.Equal(t, int64(2), dist.LeaderViews[1].LeaderID) +} diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index 963e115b69506..c63ba8cb61dfc 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -19,6 +19,7 @@ package meta import ( "sync" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/samber/lo" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -308,3 +309,46 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView return v1.Version > v2.Version }) } + +// GetLeaderView returns a slice of LeaderView objects, each representing the state of a leader node. +// It traverses the views map, converts each LeaderView to a metricsinfo.LeaderView, and collects them into a slice. +// The method locks the views map for reading to ensure thread safety. 
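+//
+// Illustrative usage sketch (an assumption, not part of this patch): callers such as
+// the QueryCoord distribution handler are expected to marshal the result, e.g.
+//
+//    views := mgr.GetLeaderView()
+//    if data, err := json.Marshal(views); err == nil {
+//        // data holds entries like {"leader_id":1,"collection_id":100,"channel":"channel-1",...}
+//    }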
+func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView {
+    mgr.rwmutex.RLock()
+    defer mgr.rwmutex.RUnlock()
+
+    var leaderViews []*metricsinfo.LeaderView
+    for _, nodeViews := range mgr.views {
+        for _, lv := range nodeViews.views {
+            errString := ""
+            if lv.UnServiceableError != nil {
+                errString = lv.UnServiceableError.Error()
+            }
+            leaderView := &metricsinfo.LeaderView{
+                LeaderID:           lv.ID,
+                CollectionID:       lv.CollectionID,
+                Channel:            lv.Channel,
+                Version:            lv.Version,
+                SealedSegments:     make([]*metricsinfo.Segment, 0, len(lv.Segments)),
+                GrowingSegments:    make([]*metricsinfo.Segment, 0, len(lv.GrowingSegments)),
+                TargetVersion:      lv.TargetVersion,
+                NumOfGrowingRows:   lv.NumOfGrowingRows,
+                UnServiceableError: errString,
+            }
+
+            for segID, seg := range lv.Segments {
+                leaderView.SealedSegments = append(leaderView.SealedSegments, &metricsinfo.Segment{
+                    SegmentID: segID,
+                    Node:      seg.NodeID,
+                })
+            }
+
+            for _, seg := range lv.GrowingSegments {
+                leaderView.GrowingSegments = append(leaderView.GrowingSegments, newSegmentMetricsFrom(seg))
+            }
+
+            leaderViews = append(leaderViews, leaderView)
+        }
+    }
+    return leaderViews
+}
diff --git a/internal/querycoordv2/meta/leader_view_manager_test.go b/internal/querycoordv2/meta/leader_view_manager_test.go
index b25e245e20946..af9a0ab1f506c 100644
--- a/internal/querycoordv2/meta/leader_view_manager_test.go
+++ b/internal/querycoordv2/meta/leader_view_manager_test.go
@@ -17,11 +17,17 @@ package meta
 import (
+    "encoding/json"
     "testing"
 
     "github.com/cockroachdb/errors"
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+    "github.com/milvus-io/milvus/internal/proto/datapb"
+    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
     "github.com/samber/lo"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/suite"
+    "golang.org/x/exp/slices"
 
     "github.com/milvus-io/milvus/internal/proto/querypb"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -317,3 +323,69 @@ func (suite *LeaderViewManagerSuite) TestNotifyDelegatorChanges() {
 func TestLeaderViewManager(t *testing.T) {
     suite.Run(t, new(LeaderViewManagerSuite))
 }
+
+func TestGetLeaderView(t *testing.T) {
+    manager := NewLeaderViewManager()
+    leaderView1 := &LeaderView{
+        ID:           1,
+        CollectionID: 100,
+        Channel:      "channel-1",
+        Version:      1,
+        Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1}},
+        GrowingSegments: map[int64]*Segment{
+            1: {SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 100, PartitionID: 10, InsertChannel: "channel-1", NumOfRows: 1000, State: commonpb.SegmentState_Growing}, Node: 1},
+        },
+        TargetVersion:      1,
+        NumOfGrowingRows:   1000,
+        UnServiceableError: nil,
+    }
+
+    leaderView2 := &LeaderView{
+        ID:           2,
+        CollectionID: 200,
+        Channel:      "channel-2",
+        Version:      1,
+        Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
+        GrowingSegments: map[int64]*Segment{
+            2: {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 200, PartitionID: 20, InsertChannel: "channel-2", NumOfRows: 2000, State: commonpb.SegmentState_Growing}, Node: 2},
+        },
+        TargetVersion:      1,
+        NumOfGrowingRows:   2000,
+        UnServiceableError: nil,
+    }
+
+    manager.Update(1, leaderView1)
+    manager.Update(2, leaderView2)
+
+    // Call GetLeaderView
+    leaderViews := manager.GetLeaderView()
+    jsonOutput, err := json.Marshal(leaderViews)
+    assert.NoError(t, err)
+
+    var result []*metricsinfo.LeaderView
+    err = json.Unmarshal(jsonOutput, &result)
+    assert.NoError(t, err)
+    assert.Len(t, result, 2)
+
+    // sort the unmarshaled slice by leader ID so the assertions below are deterministic
+    slices.SortFunc(result, func(a, b *metricsinfo.LeaderView) int {
+ return int(a.LeaderID - b.LeaderID) + }) + assert.Equal(t, int64(1), result[0].LeaderID) + assert.Equal(t, int64(100), result[0].CollectionID) + assert.Equal(t, "channel-1", result[0].Channel) + assert.Equal(t, int64(1), result[0].Version) + assert.Len(t, result[0].SealedSegments, 1) + assert.Len(t, result[0].GrowingSegments, 1) + assert.Equal(t, int64(1), result[0].SealedSegments[0].SegmentID) + assert.Equal(t, int64(1), result[0].GrowingSegments[0].SegmentID) + + assert.Equal(t, int64(2), result[1].LeaderID) + assert.Equal(t, int64(200), result[1].CollectionID) + assert.Equal(t, "channel-2", result[1].Channel) + assert.Equal(t, int64(1), result[1].Version) + assert.Len(t, result[1].SealedSegments, 1) + assert.Len(t, result[1].GrowingSegments, 1) + assert.Equal(t, int64(2), result[1].SealedSegments[0].SegmentID) + assert.Equal(t, int64(2), result[1].GrowingSegments[0].SegmentID) +} diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index f59bc39cf3ed1..7aa73c51d85e5 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -17,19 +17,20 @@ package meta import ( + "encoding/json" "fmt" "sync" "github.com/cockroachdb/errors" - "github.com/samber/lo" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/samber/lo" + "go.uber.org/zap" ) type ReplicaManager struct { @@ -466,3 +467,33 @@ func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.Unique ret := typeutil.NewSet(lo.Map(replicas, func(r *Replica, _ int) string { return r.GetResourceGroup() })...) return ret } + +// GetReplicasJSON returns a JSON representation of all replicas managed by the ReplicaManager. +// It locks the ReplicaManager for reading, converts the replicas to their protobuf representation, +// marshals them into a JSON string, and returns the result. +// If an error occurs during marshaling, it logs a warning and returns an empty string. 
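+//
+// Illustrative output sketch (an assumption, not taken from this patch); the JSON object
+// is keyed by replica ID and uses the metricsinfo.Replica field tags, e.g.
+//
+//    {"1":{"ID":1,"collectionID":100,"rw_nodes":[1,2,3],"resource_group":"rg1"}}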
+func (m *ReplicaManager) GetReplicasJSON() string { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() + + replicas := lo.MapValues(m.replicas, func(r *Replica, i typeutil.UniqueID) *metricsinfo.Replica { + channelTowRWNodes := make(map[string][]int64) + for k, v := range r.replicaPB.GetChannelNodeInfos() { + channelTowRWNodes[k] = v.GetRwNodes() + } + return &metricsinfo.Replica{ + ID: r.GetID(), + CollectionID: r.GetCollectionID(), + RWNodes: r.GetNodes(), + ResourceGroup: r.GetResourceGroup(), + RONodes: r.GetRONodes(), + ChannelToRWNodes: channelTowRWNodes, + } + }) + ret, err := json.Marshal(replicas) + if err != nil { + log.Warn("failed to marshal replicas", zap.Error(err)) + return "" + } + return string(ret) +} diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index 55d3c471a12c6..d98dffc9984ad 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -17,9 +17,14 @@ package meta import ( + "encoding/json" "testing" + "github.com/milvus-io/milvus/internal/metastore/mocks" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "google.golang.org/protobuf/proto" @@ -494,3 +499,43 @@ func TestReplicaManager(t *testing.T) { suite.Run(t, new(ReplicaManagerSuite)) suite.Run(t, new(ReplicaManagerV2Suite)) } + +func TestGetReplicasJSON(t *testing.T) { + catalog := mocks.NewQueryCoordCatalog(t) + catalog.EXPECT().SaveReplica(mock.Anything).Return(nil) + idAllocator := RandomIncrementIDAllocator() + replicaManager := NewReplicaManager(idAllocator, catalog) + + // Add some replicas to the ReplicaManager + replica1 := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 100, + ResourceGroup: "rg1", + Nodes: []int64{1, 2, 3}, + }) + replica2 := newReplica(&querypb.Replica{ + ID: 2, + CollectionID: 200, + ResourceGroup: "rg2", + Nodes: []int64{4, 5, 6}, + }) + + err := replicaManager.put(replica1) + err = replicaManager.put(replica2) + assert.NoError(t, err) + + jsonOutput := replicaManager.GetReplicasJSON() + var replicas map[int64]*metricsinfo.Replica + err = json.Unmarshal([]byte(jsonOutput), &replicas) + assert.NoError(t, err) + assert.Len(t, replicas, 2) + + assert.Equal(t, int64(1), replicas[1].ID) + assert.Equal(t, int64(2), replicas[2].ID) + assert.Equal(t, int64(100), replicas[1].CollectionID) + assert.Equal(t, int64(200), replicas[2].CollectionID) + assert.Equal(t, "rg1", replicas[1].ResourceGroup) + assert.Equal(t, "rg2", replicas[2].ResourceGroup) + assert.ElementsMatch(t, []int64{1, 2, 3}, replicas[1].RWNodes) + assert.ElementsMatch(t, []int64{4, 5, 6}, replicas[2].RWNodes) +} diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 060f287bc1689..f23253194e3db 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -17,10 +17,12 @@ package meta import ( + "encoding/json" "fmt" "sync" "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/samber/lo" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -918,3 +920,23 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error } return nil } + +func (rm *ResourceManager) GetResourceGroupsJSON() string { + rm.rwmutex.RLock() + defer rm.rwmutex.RUnlock() + + rgs := lo.MapValues(rm.groups, 
func(r *ResourceGroup, i string) *metricsinfo.ResourceGroup { + return &metricsinfo.ResourceGroup{ + Name: r.GetName(), + Nodes: r.GetNodes(), + Cfg: r.GetConfig(), + } + }) + ret, err := json.Marshal(rgs) + if err != nil { + log.Error("failed to marshal resource groups", zap.Error(err)) + return "" + } + + return string(ret) +} diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index ca58a8e899e23..8e083de9a3732 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -16,8 +16,12 @@ package meta import ( + "encoding/json" "testing" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -619,3 +623,24 @@ func (suite *ResourceManagerSuite) TestUnassignFail() { suite.manager.HandleNodeDown(1) }) } + +func TestGetResourceGroupsJSON(t *testing.T) { + manager := &ResourceManager{groups: make(map[string]*ResourceGroup)} + rg1 := NewResourceGroup("rg1", newResourceGroupConfig(0, 10)) + rg1.nodes = typeutil.NewUniqueSet(1, 2) + rg2 := NewResourceGroup("rg2", newResourceGroupConfig(0, 20)) + rg2.nodes = typeutil.NewUniqueSet(3, 4) + manager.groups["rg1"] = rg1 + manager.groups["rg2"] = rg2 + + jsonOutput := manager.GetResourceGroupsJSON() + var resourceGroups map[string]*metricsinfo.ResourceGroup + err := json.Unmarshal([]byte(jsonOutput), &resourceGroups) + assert.NoError(t, err) + assert.Len(t, resourceGroups, 2) + + assert.Equal(t, "rg1", resourceGroups["rg1"].Name) + assert.ElementsMatch(t, []int64{1, 2}, resourceGroups["rg1"].Nodes) + assert.Equal(t, "rg2", resourceGroups["rg2"].Name) + assert.ElementsMatch(t, []int64{3, 4}, resourceGroups["rg2"].Nodes) +} diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 51d38fc0fcafe..903bc8a680959 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -19,12 +19,13 @@ package meta import ( "sync" - "github.com/samber/lo" - "google.golang.org/protobuf/proto" - + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/samber/lo" ) type segDistCriterion struct { @@ -130,6 +131,21 @@ func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { } } +func newSegmentMetricsFrom(segment *Segment) *metricsinfo.Segment { + convertedSegment := metrics.NewSegmentFrom(segment.SegmentInfo) + convertedSegment.Node = segment.Node + convertedSegment.LoadedTimestamp = segment.Version + convertedSegment.Index = lo.Map(lo.Values(segment.IndexInfo), func(e *querypb.FieldIndexInfo, i int) *metricsinfo.SegmentIndex { + return &metricsinfo.SegmentIndex{ + IndexFieldID: e.FieldID, + IndexID: e.IndexID, + BuildID: e.BuildID, + IndexSize: e.IndexSize, + } + }) + return convertedSegment +} + func (segment *Segment) Clone() *Segment { return &Segment{ SegmentInfo: proto.Clone(segment.SegmentInfo).(*datapb.SegmentInfo), @@ -227,3 +243,17 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen } return ret } + +func (m *SegmentDistManager) GetSegmentDist() []*metricsinfo.Segment { + 
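+    // Take a read-locked snapshot of every node's segments and convert each one to its
+    // metricsinfo representation via newSegmentMetricsFrom.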
m.rwmutex.RLock()
+    defer m.rwmutex.RUnlock()
+
+    var segments []*metricsinfo.Segment
+    for _, nodeSeg := range m.segments {
+        for _, segment := range nodeSeg.segments {
+            segments = append(segments, newSegmentMetricsFrom(segment))
+        }
+    }
+
+    return segments
+}
diff --git a/internal/querycoordv2/meta/segment_dist_manager_test.go b/internal/querycoordv2/meta/segment_dist_manager_test.go
index 79d5340ba0b21..9a4b0d9b5b4c6 100644
--- a/internal/querycoordv2/meta/segment_dist_manager_test.go
+++ b/internal/querycoordv2/meta/segment_dist_manager_test.go
@@ -19,6 +19,8 @@ package meta
 import (
     "testing"
 
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/suite"
 
     "github.com/milvus-io/milvus/internal/proto/datapb"
@@ -188,3 +190,53 @@ func (suite *SegmentDistManagerSuite) AssertShard(segments []*Segment, shard str
 func TestSegmentDistManager(t *testing.T) {
     suite.Run(t, new(SegmentDistManagerSuite))
 }
+
+func TestGetSegmentDistJSON(t *testing.T) {
+    // Initialize SegmentDistManager
+    manager := NewSegmentDistManager()
+
+    // Add some segments to the SegmentDistManager
+    segment1 := SegmentFromInfo(&datapb.SegmentInfo{
+        ID:            1,
+        CollectionID:  100,
+        PartitionID:   10,
+        InsertChannel: "channel-1",
+        NumOfRows:     1000,
+        State:         commonpb.SegmentState_Flushed,
+    })
+    segment1.Node = 1
+    segment1.Version = 1
+
+    segment2 := SegmentFromInfo(&datapb.SegmentInfo{
+        ID:            2,
+        CollectionID:  200,
+        PartitionID:   20,
+        InsertChannel: "channel-2",
+        NumOfRows:     2000,
+        State:         commonpb.SegmentState_Flushed,
+    })
+    segment2.Node = 2
+    segment2.Version = 1
+
+    manager.Update(1, segment1)
+    manager.Update(2, segment2)
+
+    segments := manager.GetSegmentDist()
+    assert.Equal(t, int64(1), segments[0].SegmentID)
+    assert.Equal(t, int64(100), segments[0].CollectionID)
+    assert.Equal(t, int64(10), segments[0].PartitionID)
+    assert.Equal(t, "channel-1", segments[0].InsertChannel)
+    assert.Equal(t, int64(1000), segments[0].NumOfRows)
+    assert.Equal(t, "Flushed", segments[0].State)
+    assert.Equal(t, int64(1), segments[0].Node)
+    assert.Equal(t, int64(1), segments[0].LoadedTimestamp)
+
+    assert.Equal(t, int64(2), segments[1].SegmentID)
+    assert.Equal(t, int64(200), segments[1].CollectionID)
+    assert.Equal(t, int64(20), segments[1].PartitionID)
+    assert.Equal(t, "channel-2", segments[1].InsertChannel)
+    assert.Equal(t, int64(2000), segments[1].NumOfRows)
+    assert.Equal(t, "Flushed", segments[1].State)
+    assert.Equal(t, int64(2), segments[1].Node)
+    assert.Equal(t, int64(1), segments[1].LoadedTimestamp)
+}
diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index f8fcd896942cb..48fcc25efb10b 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -19,6 +19,8 @@ package meta
 import (
     "time"
 
+    "github.com/milvus-io/milvus/internal/util/metrics"
+    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
     "github.com/samber/lo"
 
     "github.com/milvus-io/milvus/internal/proto/datapb"
@@ -159,7 +161,7 @@ func (p *CollectionTarget) IsEmpty() bool {
 type target struct {
     // just maintain target at collection level
-    collectionTargetMap map[int64]*CollectionTarget
+    collectionTargetMap map[int64]*CollectionTarget `json:"collection_target,omitempty"`
 }
 
 func newTarget() *target {
@@ -183,3 +185,21 @@ func (t *target) removeCollectionTarget(collectionID int64) {
 func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
     return t.collectionTargetMap[collectionID]
 }
+
+func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordCollectionTarget {
+    return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordCollectionTarget {
+        segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
+            return metrics.NewSegmentFrom(s)
+        })
+
+        dmChannels := lo.MapToSlice(v.GetAllDmChannels(), func(k string, ch *DmChannel) *metricsinfo.DmChannel {
+            return metrics.NewDMChannelFrom(ch.VchannelInfo)
+        })
+
+        return &metricsinfo.QueryCoordCollectionTarget{
+            CollectionID: k,
+            Segments:     segments,
+            DMChannels:   dmChannels,
+        }
+    })
+}
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 6f05d4c96e004..fd2ef164892a1 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -18,6 +18,7 @@ package meta
 import (
     "context"
+    "encoding/json"
     "fmt"
     "runtime"
     "sync"
@@ -68,6 +69,7 @@ type TargetManagerInterface interface {
     SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
     Recover(catalog metastore.QueryCoordCatalog) error
     CanSegmentBeMoved(collectionID, segmentID int64) bool
+    GetTargetJSON(scope TargetScope) string
 }
 
 type TargetManager struct {
@@ -632,3 +634,28 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool
 
     return false
 }
+
+func (mgr *TargetManager) GetTargetJSON(scope TargetScope) string {
+    mgr.rwMutex.RLock()
+    defer mgr.rwMutex.RUnlock()
+
+    ret := mgr.getTarget(scope)
+    if ret == nil {
+        return ""
+    }
+
+    v, err := json.Marshal(ret.toQueryCoordCollectionTargets())
+    if err != nil {
+        log.Warn("failed to marshal target", zap.Error(err))
+        return ""
+    }
+    return string(v)
+}
+
+func (mgr *TargetManager) getTarget(scope TargetScope) *target {
+    if scope == CurrentTarget {
+        return mgr.current
+    }
+
+    return mgr.next
+}
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 960e8697db648..adde42d805bff 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -199,8 +199,41 @@ func (s *Server) registerMetricsRequest() {
         return s.taskScheduler.GetTasksJSON(), nil
     }
 
+    QueryDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+        return s.dist.GetDistributionJSON(), nil
+    }
+
+    QueryTargetAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+        return s.targetMgr.GetTargetJSON(meta.CurrentTarget), nil
+    }
+
+    QueryReplicasAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+        return s.meta.GetReplicasJSON(), nil
+    }
+
+    QueryResourceGroupsAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+        return s.meta.GetResourceGroupsJSON(), nil
+    }
+
+    QuerySegmentDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+        return s.getSegmentsFromQueryNode(ctx, req)
+    }
+
+    QueryChannelDistAction := func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+        return "", nil
+    }
+
+    // register actions that requests are processed in querycoord
     s.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics, getSystemInfoAction)
     s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryCoordAllTasks, QueryTasksAction)
+    s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryDist, QueryDistAction)
s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryTarget, QueryTargetAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryReplicas, QueryReplicasAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryResourceGroups, QueryResourceGroupsAction) + + // register actions that requests are processed in querynode + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments, QuerySegmentDistAction) + s.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels, QueryChannelDistAction) log.Info("register metrics actions finished") } diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 44fae7020e06b..58ea5e1cd1bab 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -18,9 +18,12 @@ package querynodev2 import ( "context" + "encoding/json" "fmt" + "github.com/milvus-io/milvus/pkg/log" "github.com/samber/lo" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/querynodev2/collector" @@ -170,6 +173,54 @@ func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetr return ret, nil } +// getPipelineJSON returns the JSON string of channels +func getPipelineJSON(node *QueryNode) string { + stats := node.pipelineManager.GetPipelineStats() + ret, err := json.Marshal(stats) + if err != nil { + log.Warn("failed to marshal channels", zap.Error(err)) + return "" + } + return string(ret) +} + +// getSegmentJSON returns the JSON string of segments +func getSegmentJSON(node *QueryNode) string { + allSegments := node.manager.Segment.GetBy() + var ms []*metricsinfo.Segment + for _, s := range allSegments { + indexes := make([]*metricsinfo.SegmentIndex, 0, len(s.Indexes())) + for _, index := range s.Indexes() { + indexes = append(indexes, &metricsinfo.SegmentIndex{ + IndexFieldID: index.IndexInfo.FieldID, + IndexID: index.IndexInfo.IndexID, + IndexSize: index.IndexInfo.IndexSize, + BuildID: index.IndexInfo.BuildID, + IsLoaded: index.IsLoaded, + }) + } + + ms = append(ms, &metricsinfo.Segment{ + SegmentID: s.ID(), + CollectionID: s.Collection(), + PartitionID: s.Partition(), + MemSize: s.MemSize(), + Index: indexes, + State: s.Type().String(), + ResourceGroup: s.ResourceGroup(), + InsertRowCount: s.InsertCount(), + NodeID: node.GetNodeID(), + }) + } + + ret, err := json.Marshal(ms) + if err != nil { + log.Warn("failed to marshal segments", zap.Error(err)) + return "" + } + return string(ret) +} + // getSystemInfoMetrics returns metrics info of QueryNode func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, node *QueryNode) (string, error) { usedMem := hardware.GetUsedMemoryCount() diff --git a/internal/querynodev2/metrics_info_test.go b/internal/querynodev2/metrics_info_test.go new file mode 100644 index 0000000000000..cb22f6fe78cb7 --- /dev/null +++ b/internal/querynodev2/metrics_info_test.go @@ -0,0 +1,129 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querynodev2 + +import ( + "context" + "encoding/json" + "testing" + + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querynodev2/delegator" + "github.com/milvus-io/milvus/internal/querynodev2/pipeline" + "github.com/milvus-io/milvus/internal/querynodev2/segments" + "github.com/milvus-io/milvus/internal/querynodev2/tsafe" + "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" + "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestGetPipelineJSON(t *testing.T) { + paramtable.Init() + + ch := "ch" + tSafeManager := tsafe.NewTSafeReplica() + tSafeManager.Add(context.Background(), ch, 0) + delegators := typeutil.NewConcurrentMap[string, delegator.ShardDelegator]() + d := delegator.NewMockShardDelegator(t) + delegators.Insert(ch, d) + msgDispatcher := msgdispatcher.NewMockClient(t) + + collectionManager := segments.NewMockCollectionManager(t) + segmentManager := segments.NewMockSegmentManager(t) + collectionManager.EXPECT().Get(mock.Anything).Return(&segments.Collection{}) + manager := &segments.Manager{ + Collection: collectionManager, + Segment: segmentManager, + } + + pipelineManager := pipeline.NewManager(manager, tSafeManager, msgDispatcher, delegators) + + _, err := pipelineManager.Add(1, ch) + assert.NoError(t, err) + assert.Equal(t, 1, pipelineManager.Num()) + + stats := pipelineManager.GetPipelineStats() + expectedStats := []*metricsinfo.Pipeline{ + { + Channel: ch, + Status: "Healthy", + TimeTick: 0, + }, + } + assert.Equal(t, expectedStats, stats) + + JSONStr := getPipelineJSON(&QueryNode{pipelineManager: pipelineManager}) + assert.NotEmpty(t, JSONStr) + + var actualStats []*metricsinfo.Pipeline + err = json.Unmarshal([]byte(JSONStr), &actualStats) + assert.NoError(t, err) + assert.Equal(t, expectedStats, actualStats) +} + +func TestGetSegmentJSON(t *testing.T) { + segment := segments.NewMockSegment(t) + segment.EXPECT().ID().Return(int64(1)) + segment.EXPECT().Collection().Return(int64(1001)) + segment.EXPECT().Partition().Return(int64(2001)) + segment.EXPECT().MemSize().Return(int64(1024)) + segment.EXPECT().Indexes().Return([]*segments.IndexedFieldInfo{ + { + IndexInfo: &querypb.FieldIndexInfo{ + FieldID: 1, + IndexID: 101, + IndexSize: 512, + BuildID: 10001, + }, + IsLoaded: true, + }, + }) + segment.EXPECT().Type().Return(segments.SegmentTypeGrowing) + segment.EXPECT().ResourceGroup().Return("default") + segment.EXPECT().InsertCount().Return(int64(100)) + + node := &QueryNode{} + mockedSegmentManager := segments.NewMockSegmentManager(t) + mockedSegmentManager.EXPECT().GetBy().Return([]segments.Segment{segment}) + node.manager = &segments.Manager{Segment: mockedSegmentManager} + + jsonStr := getSegmentJSON(node) + assert.NotEmpty(t, jsonStr) + + var segments []*metricsinfo.Segment + err := json.Unmarshal([]byte(jsonStr), &segments) + assert.NoError(t, err) + assert.NotNil(t, segments) + assert.Equal(t, 1, len(segments)) + 
assert.Equal(t, int64(1), segments[0].SegmentID)
+    assert.Equal(t, int64(1001), segments[0].CollectionID)
+    assert.Equal(t, int64(2001), segments[0].PartitionID)
+    assert.Equal(t, int64(1024), segments[0].MemSize)
+    assert.Equal(t, 1, len(segments[0].Index))
+    assert.Equal(t, int64(1), segments[0].Index[0].IndexFieldID)
+    assert.Equal(t, int64(101), segments[0].Index[0].IndexID)
+    assert.Equal(t, int64(512), segments[0].Index[0].IndexSize)
+    assert.Equal(t, int64(10001), segments[0].Index[0].BuildID)
+    assert.True(t, segments[0].Index[0].IsLoaded)
+    assert.Equal(t, "Growing", segments[0].State)
+    assert.Equal(t, "default", segments[0].ResourceGroup)
+    assert.Equal(t, int64(100), segments[0].InsertRowCount)
+    assert.Equal(t, node.GetNodeID(), segments[0].NodeID)
+}
diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go
index 453c9638430f7..d7d1f54507004 100644
--- a/internal/querynodev2/pipeline/manager.go
+++ b/internal/querynodev2/pipeline/manager.go
@@ -20,6 +20,7 @@ import (
     "fmt"
     "sync"
 
+    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
     "go.uber.org/zap"
 
     "github.com/milvus-io/milvus/internal/querynodev2/delegator"
@@ -40,6 +41,7 @@ type Manager interface {
     Remove(channels ...string)
     Start(channels ...string) error
     Close()
+    GetPipelineStats() []*metricsinfo.Pipeline
 }
 
 type manager struct {
@@ -155,6 +157,26 @@ func (m *manager) Close() {
     }
 }
 
+func (m *manager) GetPipelineStats() []*metricsinfo.Pipeline {
+    m.mu.RLock()
+    defer m.mu.RUnlock()
+
+    ret := make([]*metricsinfo.Pipeline, 0, len(m.channel2Pipeline))
+    for ch, p := range m.channel2Pipeline {
+        tt, err := m.tSafeManager.Get(ch)
+        if err != nil {
+            log.Warn("get tSafe failed", zap.String("channel", ch), zap.Error(err))
+        }
+        ret = append(ret, &metricsinfo.Pipeline{
+            Channel:  ch,
+            Status:   p.Status(),
+            TimeTick: int64(tt),
+            NodeID:   paramtable.GetNodeID(),
+        })
+    }
+    return ret
+}
+
 func NewManager(dataManager *DataManager,
     tSafeManager TSafeManager,
     dispatcher msgdispatcher.Client,
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index c24b59bf66fa8..c4e99185d6458 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -283,6 +283,16 @@ func (node *QueryNode) registerMetricsRequest() {
         func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
             return getSystemInfoMetrics(ctx, req, node)
         })
+
+    node.metricsRequest.RegisterMetricsRequest(metricsinfo.QuerySegments,
+        func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+            return getSegmentJSON(node), nil
+        })
+
+    node.metricsRequest.RegisterMetricsRequest(metricsinfo.QueryChannels,
+        func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
+            return getPipelineJSON(node), nil
+        })
     log.Info("register metrics actions finished")
 }
diff --git a/internal/util/metrics/utils.go b/internal/util/metrics/utils.go
index 4c5b1fc230f61..0e965b88540f5 100644
--- a/internal/util/metrics/utils.go
+++ b/internal/util/metrics/utils.go
@@ -1,46 +1,34 @@
 package metrics
 
 import (
-    "github.com/samber/lo"
-
     "github.com/milvus-io/milvus/internal/proto/datapb"
-    "github.com/milvus-io/milvus/internal/proto/querypb"
+    "github.com/milvus-io/milvus/pkg/util/metricsinfo"
 )
 
-func PruneFieldIndexInfo(f *querypb.FieldIndexInfo) *querypb.FieldIndexInfo {
-    return &querypb.FieldIndexInfo{
-        FieldID:   f.FieldID,
-        IndexID:   f.IndexID,
-        BuildID:   f.BuildID,
-        IndexSize: f.IndexSize,
-        NumRows:
f.NumRows, - } -} - -func PruneSegmentInfo(s *datapb.SegmentInfo) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: s.ID, - NumOfRows: s.NumOfRows, - State: s.State, - Compacted: s.Compacted, - Level: s.Level, +func NewSegmentFrom(segment *datapb.SegmentInfo) *metricsinfo.Segment { + return &metricsinfo.Segment{ + SegmentID: segment.GetID(), + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + InsertChannel: segment.GetInsertChannel(), + NumOfRows: segment.GetNumOfRows(), + State: segment.GetState().String(), + IsImporting: segment.GetIsImporting(), + Compacted: segment.GetCompacted(), + Level: segment.GetLevel().String(), + IsSorted: segment.GetIsSorted(), + IsInvisible: segment.GetIsInvisible(), } } -func PruneVChannelInfo(channel *datapb.VchannelInfo) *datapb.VchannelInfo { - return &datapb.VchannelInfo{ - ChannelName: channel.ChannelName, - UnflushedSegments: lo.Map(channel.UnflushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - FlushedSegments: lo.Map(channel.FlushedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - DroppedSegments: lo.Map(channel.DroppedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), - IndexedSegments: lo.Map(channel.IndexedSegments, func(s *datapb.SegmentInfo, i int) *datapb.SegmentInfo { - return PruneSegmentInfo(s) - }), +func NewDMChannelFrom(channel *datapb.VchannelInfo) *metricsinfo.DmChannel { + return &metricsinfo.DmChannel{ + CollectionID: channel.GetCollectionID(), + ChannelName: channel.GetChannelName(), + UnflushedSegmentIds: channel.GetUnflushedSegmentIds(), + FlushedSegmentIds: channel.GetFlushedSegmentIds(), + DroppedSegmentIds: channel.GetDroppedSegmentIds(), + LevelZeroSegmentIds: channel.GetLevelZeroSegmentIds(), + PartitionStatsVersions: channel.GetPartitionStatsVersions(), } } diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 2765e11492605..277e618618cba 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -18,9 +18,11 @@ package pipeline import ( "context" + "fmt" "sync" "time" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -39,6 +41,7 @@ import ( type StreamPipeline interface { Pipeline ConsumeMsgStream(position *msgpb.MsgPosition) error + Status() string } type streamPipeline struct { @@ -52,10 +55,19 @@ type streamPipeline struct { closeCh chan struct{} // notify work to exit closeWg sync.WaitGroup closeOnce sync.Once + + // lastChecked is the last time the input node received a message + lastChecked *atomic.Time + // status is the status of the input node in the pipeline + status *atomic.String } func (p *streamPipeline) work() { defer p.closeWg.Done() + + ticker := time.NewTicker(p.pipeline.nodeTtInterval) + defer ticker.Stop() + for { select { case <-p.closeCh: @@ -65,10 +77,23 @@ func (p *streamPipeline) work() { log.RatedDebug(10, "stream pipeline fetch msg", zap.Int("sum", len(msg.Msgs))) p.pipeline.inputChannel <- msg p.pipeline.process() + p.lastChecked.Store(time.Now()) + case <-ticker.C: + diff := time.Since(p.lastChecked.Load()) + if diff > p.pipeline.nodeTtInterval { + msg := fmt.Sprintf("input node hasn't received any msg in the last %s", diff.String()) + p.status.Store(msg) + } else { + p.status.Store("Healthy") + } } } } +func (p *streamPipeline) Status() string { + return 
p.status.Load()
+}
+
 func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
     var err error
     if position == nil {
@@ -150,10 +175,12 @@ func NewPipelineWithStream(dispatcher msgdispatcher.Client, nodeTtInterval time.
             nodeTtInterval:  nodeTtInterval,
             enableTtChecker: enableTtChecker,
         },
-        dispatcher: dispatcher,
-        vChannel:   vChannel,
-        closeCh:    make(chan struct{}),
-        closeWg:    sync.WaitGroup{},
+        dispatcher:  dispatcher,
+        vChannel:    vChannel,
+        closeCh:     make(chan struct{}),
+        closeWg:     sync.WaitGroup{},
+        lastChecked: atomic.NewTime(time.Now()),
+        status:      atomic.NewString("Healthy"),
     }
 
     return pipeline
diff --git a/pkg/util/metricsinfo/metric_request.go b/pkg/util/metricsinfo/metric_request.go
index 07dd4a6dec034..131bc26dbad3b 100644
--- a/pkg/util/metricsinfo/metric_request.go
+++ b/pkg/util/metricsinfo/metric_request.go
@@ -45,10 +45,16 @@ const (
     MetricRequestParamsSeparator = ","
 
     // QuerySegmentDist request for segment distribution on the query node
-    QuerySegmentDist = "qc_segment_dist"
+    QuerySegments = "qn_segments"
 
     // QueryChannelDist request for channel distribution on the query node
-    QueryChannelDist = "qc_channel_dist"
+    QueryChannels = "qn_channels"
+
+    // QueryDist request for segment/channel/leader view distribution on querycoord
+    QueryDist = "qc_dist"
+
+    // QueryTarget request for segment/channel target on the querycoord
+    QueryTarget = "qc_target"
 
     // QueryCoordAllTasks request for get tasks on the querycoord
     QueryCoordAllTasks = "qc_tasks_all"
diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go
index a1bb87d2f872f..dfba1cd59ce6c 100644
--- a/pkg/util/metricsinfo/metrics_info.go
+++ b/pkg/util/metricsinfo/metrics_info.go
@@ -15,6 +15,7 @@ import (
     "encoding/json"
     "time"
 
+    "github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
     "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -70,6 +71,92 @@ const (
     MilvusUsedGoVersion = "MILVUS_USED_GO_VERSION"
 )
 
+type DmChannel struct {
+    NodeID                 int64           `json:"node_id,omitempty"`
+    Version                int64           `json:"version,omitempty"`
+    CollectionID           int64           `json:"collection_id,omitempty"`
+    ChannelName            string          `json:"channel_name,omitempty"`
+    UnflushedSegmentIds    []int64         `json:"unflushed_segment_ids,omitempty"`
+    FlushedSegmentIds      []int64         `json:"flushed_segment_ids,omitempty"`
+    DroppedSegmentIds      []int64         `json:"dropped_segment_ids,omitempty"`
+    LevelZeroSegmentIds    []int64         `json:"level_zero_segment_ids,omitempty"`
+    PartitionStatsVersions map[int64]int64 `json:"partition_stats_versions,omitempty"`
+}
+
+type Segment struct {
+    SegmentID       int64           `json:"segment_id,omitempty"`
+    CollectionID    int64           `json:"collection_id,omitempty"`
+    PartitionID     int64           `json:"partition_id,omitempty"`
+    InsertChannel   string          `json:"insert_channel,omitempty"`
+    NumOfRows       int64           `json:"num_of_rows,omitempty"`
+    State           string          `json:"state,omitempty"`
+    IsImporting     bool            `json:"is_importing,omitempty"`
+    Compacted       bool            `json:"compacted,omitempty"`
+    Level           string          `json:"level,omitempty"`
+    IsSorted        bool            `json:"is_sorted,omitempty"`
+    IsInvisible     bool            `json:"is_invisible,omitempty"`
+    NodeID          int64           `json:"node_id,omitempty"`
+    LoadedTimestamp int64           `json:"loaded_timestamp,omitempty"`
+    Index           []*SegmentIndex `json:"index,omitempty"`
+    ResourceGroup   string          `json:"resource_group,omitempty"`
+    InsertRowCount  int64           `json:"insert_row_count,omitempty"`
+    MemSize         int64           `json:"mem_size,omitempty"`
+}
+
+type SegmentIndex struct {
+    IndexFieldID int64 `json:"field_id,omitempty"`
+    IndexID      int64 `json:"index_id,omitempty"`
+    BuildID      int64
`json:"build_id,omitempty"` + IndexSize int64 `json:"index_size,omitempty"` + IsLoaded bool `json:"is_loaded,omitempty"` +} + +type QueryCoordCollectionTarget struct { + CollectionID int64 `json:"collection_id,omitempty"` + Segments []*Segment `json:"segments,omitempty"` + DMChannels []*DmChannel `json:"dm_channels,omitempty"` +} + +type LeaderView struct { + LeaderID int64 `json:"leader_id"` + CollectionID int64 `json:"collection_id"` + Channel string `json:"channel"` + Version int64 `json:"version"` + SealedSegments []*Segment `json:"sealed_segments"` + GrowingSegments []*Segment `json:"growing_segments"` + TargetVersion int64 `json:"target_version"` + NumOfGrowingRows int64 `json:"num_of_growing_rows"` + UnServiceableError string `json:"unserviceable_error"` +} + +type QueryCoordCollectionDistribution struct { + Segments []*Segment `json:"segments,omitempty"` + DMChannels []*DmChannel `json:"dm_channels,omitempty"` + LeaderViews []*LeaderView `json:"leader_views,omitempty"` +} + +type ResourceGroup struct { + Name string `json:"name,omitempty"` + Nodes []int64 `json:"nodes,omitempty"` + Cfg *rgpb.ResourceGroupConfig `json:"cfg,omitempty"` +} + +type Replica struct { + ID int64 `json:"ID,omitempty"` + CollectionID int64 `json:"collectionID,omitempty"` + RWNodes []int64 `json:"rw_nodes,omitempty"` + ResourceGroup string `json:"resource_group,omitempty"` + RONodes []int64 `json:"ro_nodes,omitempty"` + ChannelToRWNodes map[string][]int64 `json:"channel_to_rw_nodes,omitempty"` +} + +type Pipeline struct { + Channel string `json:"channel,omitempty"` + Status string `json:"status,omitempty"` + TimeTick int64 `json:"time_tick,omitempty"` + NodeID int64 `json:"node_id,omitempty"` +} + // DeployMetrics records the deploy information of nodes. type DeployMetrics struct { SystemVersion string `json:"system_version"`