Skip to content

Commit

Permalink
feat: add segment,pipeline, replica and resourcegroup api for WebUI
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Oct 31, 2024
1 parent cdee149 commit f80ffd6
Show file tree
Hide file tree
Showing 24 changed files with 987 additions and 48 deletions.
32 changes: 32 additions & 0 deletions internal/querycoordv2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package querycoordv2

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/samber/lo"
"go.uber.org/zap"

Expand Down Expand Up @@ -242,6 +244,36 @@ func (s *Server) balanceChannels(ctx context.Context,
return nil
}

// TODO(dragondriver): add more detail metrics
func (s *Server) getSegmentsFromQueryNode[T](
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (string, error) {
segments := make([]*metricsinfo.Segment, 0)
for _, node := range s.nodeMgr.GetAll() {
resp, err := s.cluster.GetMetrics(ctx, node.ID(), req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to get metric from QueryNode",
zap.Int64("nodeID", node.ID()))
continue
}

infos := make([]*metricsinfo.Segment, 0)
err = json.Unmarshal([]byte(resp.Response), &infos)
if err != nil {
log.Warn("invalid metrics of query node was found",
zap.Error(err))
continue
}
segments = append(segments, infos...)
}
ret, err := json.Marshal(segments)
if err != nil {
return "", err
}
return string(ret), nil
}

// TODO(dragondriver): add more detail metrics
func (s *Server) getSystemInfoMetrics(
ctx context.Context,
Expand Down
22 changes: 22 additions & 0 deletions internal/querycoordv2/meta/channel_dist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package meta
import (
"sync"

"github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/samber/lo"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -130,6 +132,13 @@ func (channel *DmChannel) Clone() *DmChannel {
}
}

// newDmChannelMetricsFrom builds the metrics representation of channel from its
// VchannelInfo and then stamps it with the channel's node ID and version.
func newDmChannelMetricsFrom(channel *DmChannel) *metricsinfo.DmChannel {
	info := metrics.NewDMChannelFrom(channel.VchannelInfo)
	info.NodeID, info.Version = channel.Node, channel.Version
	return info
}

type nodeChannels struct {
channels []*DmChannel
// collection id => channels
Expand Down Expand Up @@ -290,3 +299,16 @@ func (m *ChannelDistManager) updateCollectionIndex() {
}
}
}

// GetChannelDist returns the metrics representation of every DM channel held
// by the manager, across all nodes. The read lock is held for the duration of
// the traversal. The result is nil when no channels are registered.
func (m *ChannelDistManager) GetChannelDist() []*metricsinfo.DmChannel {
	m.rwmutex.RLock()
	defer m.rwmutex.RUnlock()

	var dist []*metricsinfo.DmChannel
	for _, perNode := range m.channels {
		for i := range perNode.channels {
			dist = append(dist, newDmChannelMetricsFrom(perNode.channels[i]))
		}
	}
	return dist
}
30 changes: 30 additions & 0 deletions internal/querycoordv2/meta/channel_dist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package meta
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -185,3 +186,32 @@ func (suite *ChannelDistManagerSuite) AssertCollection(channels []*DmChannel, co
// TestChannelDistManager runs the ChannelDistManagerSuite test suite.
func TestChannelDistManager(t *testing.T) {
	suite.Run(t, &ChannelDistManagerSuite{})
}

// TestGetChannelDistJSON registers one channel on each of two nodes and checks
// that GetChannelDist reports both with the expected node, name and collection.
//
// The assertions are keyed by node ID instead of slice position:
// GetChannelDist ranges over the manager's per-node collection (presumably a
// map keyed by node ID — verify), and Go does not guarantee map iteration
// order, so positional checks would be flaky.
func TestGetChannelDistJSON(t *testing.T) {
	manager := NewChannelDistManager()
	channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
		CollectionID: 100,
		ChannelName:  "channel-1",
	})
	channel1.Node = 1
	channel1.Version = 1

	channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{
		CollectionID: 200,
		ChannelName:  "channel-2",
	})
	channel2.Node = 2
	channel2.Version = 1

	manager.Update(1, channel1)
	manager.Update(2, channel2)

	type expectation struct {
		channelName  string
		collectionID int64
	}
	// Expected channel per node ID; entries are removed as they are matched.
	want := map[int64]expectation{
		1: {channelName: "channel-1", collectionID: 100},
		2: {channelName: "channel-2", collectionID: 200},
	}

	channels := manager.GetChannelDist()
	assert.Len(t, channels, 2)
	for _, ch := range channels {
		exp, ok := want[ch.NodeID]
		if !assert.Truef(t, ok, "unexpected or duplicate channel for node %d", ch.NodeID) {
			continue
		}
		assert.Equal(t, exp.channelName, ch.ChannelName)
		assert.Equal(t, exp.collectionID, ch.CollectionID)
		delete(want, ch.NodeID)
	}
	assert.Empty(t, want, "some expected channels were not returned")
}
35 changes: 35 additions & 0 deletions internal/querycoordv2/meta/dist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@

package meta

import (
"encoding/json"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"go.uber.org/zap"
)

type DistributionManager struct {
*SegmentDistManager
*ChannelDistManager
Expand All @@ -29,3 +37,30 @@ func NewDistributionManager() *DistributionManager {
LeaderViewManager: NewLeaderViewManager(),
}
}

// GetDistributionJSON returns the current distribution state (segments, DM
// channels and leader views) encoded as a JSON string.
// It returns an empty string when there are no segments, channels, or leader
// views. If JSON marshaling fails, the error is logged as a warning and an
// empty string is returned; the error is not propagated to the caller.
func (dm *DistributionManager) GetDistributionJSON() string {
	dist := &metricsinfo.QueryCoordCollectionDistribution{
		Segments:    dm.GetSegmentDist(),
		DMChannels:  dm.GetChannelDist(),
		LeaderViews: dm.GetLeaderView(),
	}

	// Nothing to report: signal that with an empty string instead of an empty JSON document.
	if len(dist.Segments)+len(dist.DMChannels)+len(dist.LeaderViews) == 0 {
		return ""
	}

	payload, err := json.Marshal(dist)
	if err != nil {
		log.Warn("failed to marshal dist", zap.Error(err))
		return ""
	}
	return string(payload)
}
99 changes: 99 additions & 0 deletions internal/querycoordv2/meta/dist_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package meta

import (
"encoding/json"
"testing"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/stretchr/testify/assert"
)

// TestGetDistributionJSON populates a DistributionManager with two segments,
// two DM channels and two leader views (one of each per node), then checks that
// GetDistributionJSON emits JSON which decodes back to the same set of items.
//
// Contents are compared as unordered sets: the underlying managers presumably
// collect their results by iterating maps keyed by node ID (verify), and Go
// does not guarantee map iteration order, so positional checks would be flaky.
func TestGetDistributionJSON(t *testing.T) {
	// Initialize DistributionManager
	manager := NewDistributionManager()

	// Add some segments to the SegmentDistManager
	segment1 := SegmentFromInfo(&datapb.SegmentInfo{
		ID:            1,
		CollectionID:  100,
		PartitionID:   10,
		InsertChannel: "channel-1",
		NumOfRows:     1000,
		State:         commonpb.SegmentState_Flushed,
	})
	segment1.Node = 1
	segment1.Version = 1

	segment2 := SegmentFromInfo(&datapb.SegmentInfo{
		ID:            2,
		CollectionID:  200,
		PartitionID:   20,
		InsertChannel: "channel-2",
		NumOfRows:     2000,
		State:         commonpb.SegmentState_Flushed,
	})
	segment2.Node = 2
	segment2.Version = 1

	manager.SegmentDistManager.Update(1, segment1)
	manager.SegmentDistManager.Update(2, segment2)

	// Add some channels to the ChannelDistManager
	channel1 := DmChannelFromVChannel(&datapb.VchannelInfo{
		CollectionID: 100,
		ChannelName:  "channel-1",
	})
	channel1.Node = 1
	channel1.Version = 1

	channel2 := DmChannelFromVChannel(&datapb.VchannelInfo{
		CollectionID: 200,
		ChannelName:  "channel-2",
	})
	channel2.Node = 2
	channel2.Version = 1

	manager.ChannelDistManager.Update(1, channel1)
	manager.ChannelDistManager.Update(2, channel2)

	// Add some leader views to the LeaderViewManager
	leaderView1 := &LeaderView{
		ID:           1,
		CollectionID: 100,
		Channel:      "channel-1",
		Version:      1,
		Segments:     map[int64]*querypb.SegmentDist{1: {NodeID: 1}},
	}

	leaderView2 := &LeaderView{
		ID:           2,
		CollectionID: 200,
		Channel:      "channel-2",
		Version:      1,
		Segments:     map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
	}

	manager.LeaderViewManager.Update(1, leaderView1)
	manager.LeaderViewManager.Update(2, leaderView2)

	// Call GetDistributionJSON
	jsonOutput := manager.GetDistributionJSON()

	// Verify JSON output
	var dist metricsinfo.QueryCoordCollectionDistribution
	err := json.Unmarshal([]byte(jsonOutput), &dist)
	assert.NoError(t, err)

	segmentIDs := make([]int64, 0, len(dist.Segments))
	for _, seg := range dist.Segments {
		segmentIDs = append(segmentIDs, seg.SegmentID)
	}
	assert.ElementsMatch(t, []int64{1, 2}, segmentIDs)

	channelNames := make([]string, 0, len(dist.DMChannels))
	for _, ch := range dist.DMChannels {
		channelNames = append(channelNames, ch.ChannelName)
	}
	assert.ElementsMatch(t, []string{"channel-1", "channel-2"}, channelNames)

	leaderIDs := make([]int64, 0, len(dist.LeaderViews))
	for _, lv := range dist.LeaderViews {
		leaderIDs = append(leaderIDs, lv.LeaderID)
	}
	assert.ElementsMatch(t, []int64{1, 2}, leaderIDs)
}
44 changes: 44 additions & 0 deletions internal/querycoordv2/meta/leader_view_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package meta
import (
"sync"

"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/samber/lo"

"github.com/milvus-io/milvus/internal/proto/querypb"
Expand Down Expand Up @@ -308,3 +309,46 @@ func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderView
return v1.Version > v2.Version
})
}

// GetLeaderView converts every LeaderView held by the manager into its
// metricsinfo.LeaderView form and returns them as one slice (nil when the
// manager holds no views). The read lock is held while the views are traversed.
//
// Sealed segments are reported with only their segment ID and node ID; growing
// segments are converted with newSegmentMetricsFrom. UnServiceableError is
// rendered as its message, or as an empty string when the view has no error.
func (mgr *LeaderViewManager) GetLeaderView() []*metricsinfo.LeaderView {
	mgr.rwmutex.RLock()
	defer mgr.rwmutex.RUnlock()

	var result []*metricsinfo.LeaderView
	for _, perNode := range mgr.views {
		for _, view := range perNode.views {
			var unserviceable string
			if cause := view.UnServiceableError; cause != nil {
				unserviceable = cause.Error()
			}

			sealed := make([]*metricsinfo.Segment, 0, len(view.Segments))
			for id, dist := range view.Segments {
				sealed = append(sealed, &metricsinfo.Segment{
					SegmentID: id,
					Node:      dist.NodeID,
				})
			}

			growing := make([]*metricsinfo.Segment, 0, len(view.GrowingSegments))
			for _, g := range view.GrowingSegments {
				growing = append(growing, newSegmentMetricsFrom(g))
			}

			result = append(result, &metricsinfo.LeaderView{
				LeaderID:           view.ID,
				CollectionID:       view.CollectionID,
				Channel:            view.Channel,
				Version:            view.Version,
				SealedSegments:     sealed,
				GrowingSegments:    growing,
				TargetVersion:      view.TargetVersion,
				NumOfGrowingRows:   view.NumOfGrowingRows,
				UnServiceableError: unserviceable,
			})
		}
	}
	return result
}
Loading

0 comments on commit f80ffd6

Please sign in to comment.