Skip to content

Commit

Permalink
feat: add segment,pipeline, replica and resourcegroup api for WebUI
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Nov 4, 2024
1 parent 0c4321c commit 02eacab
Show file tree
Hide file tree
Showing 57 changed files with 2,728 additions and 246 deletions.
19 changes: 19 additions & 0 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -53,6 +54,8 @@ type ChannelManager interface {
GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string
GetChannelsByCollectionID(collectionID int64) []RWChannel
GetChannelNamesByCollectionID(collectionID int64) []string

GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo
}

// An interface sessionManager implments
Expand Down Expand Up @@ -730,6 +733,22 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
return nil
}

func (m *ChannelManagerImpl) GetChannelWatchInfos() map[int64]map[string]*datapb.ChannelWatchInfo {
m.mu.RLock()
defer m.mu.RUnlock()
infos := make(map[int64]map[string]*datapb.ChannelWatchInfo)
for _, nc := range m.store.GetNodesChannels() {
for _, ch := range nc.Channels {
watchInfo := proto.Clone(ch.GetWatchInfo()).(*datapb.ChannelWatchInfo)
if _, ok := infos[nc.NodeID]; !ok {
infos[nc.NodeID] = make(map[string]*datapb.ChannelWatchInfo)
}
infos[nc.NodeID][watchInfo.Vchan.ChannelName] = watchInfo
}
}
return infos
}

func inferStateByOpType(opType ChannelOpType) datapb.ChannelWatchState {
switch opType {
case Watch:
Expand Down
48 changes: 48 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,3 +805,51 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {

func (s *ChannelManagerSuite) TestCheckLoop() {}
func (s *ChannelManagerSuite) TestGet() {}

func (s *ChannelManagerSuite) TestGetChannelWatchInfos() {
store := NewMockRWChannelStore(s.T())
store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
{
NodeID: 1,
Channels: map[string]RWChannel{
"ch1": &channelMeta{
WatchInfo: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
ChannelName: "ch1",
},
StartTs: 100,
State: datapb.ChannelWatchState_ToWatch,
OpID: 1,
},
},
},
},
{
NodeID: 2,
Channels: map[string]RWChannel{
"ch2": &channelMeta{
WatchInfo: &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
ChannelName: "ch2",
},
StartTs: 10,
State: datapb.ChannelWatchState_WatchSuccess,
OpID: 1,
},
},
},
},
})

cm := &ChannelManagerImpl{store: store}
infos := cm.GetChannelWatchInfos()
s.Equal(2, len(infos))
s.Equal("ch1", infos[1]["ch1"].GetVchan().ChannelName)
s.Equal("ch2", infos[2]["ch2"].GetVchan().ChannelName)

// test empty value
store.EXPECT().GetNodesChannels().Unset()
store.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{})
infos = cm.GetChannelWatchInfos()
s.Equal(0, len(infos))
}
2 changes: 1 addition & 1 deletion internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand Down
25 changes: 25 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,3 +2025,28 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats

return metricMutation, nil
}

func (m *meta) getSegmentsMetrics() []*metricsinfo.Segment {
m.RLock()
defer m.RUnlock()

segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
for _, s := range m.segments.segments {
segments = append(segments, &metricsinfo.Segment{
SegmentID: s.ID,
CollectionID: s.CollectionID,
PartitionID: s.PartitionID,
Channel: s.InsertChannel,
NumOfRows: s.NumOfRows,
State: s.State.String(),
MemSize: s.size.Load(),
Level: s.Level.String(),
IsImporting: s.IsImporting,
Compacted: s.Compacted,
IsSorted: s.IsSorted,
NodeID: paramtable.GetNodeID(),
})
}

return segments
}
61 changes: 61 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/testutils"
)
Expand Down Expand Up @@ -1246,3 +1248,62 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
assert.NotNil(t, c)
})
}

func TestMeta_GetSegmentsJSON(t *testing.T) {
// Create a mock meta object
m := &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "channel1",
NumOfRows: 100,
State: commonpb.SegmentState_Growing,
MaxRowNum: 1000,
Compacted: false,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 2,
PartitionID: 2,
InsertChannel: "channel2",
NumOfRows: 200,
State: commonpb.SegmentState_Sealed,
MaxRowNum: 2000,
Compacted: true,
},
},
},
},
}

segments := m.getSegmentsMetrics()

// Check the length of the segments
assert.Equal(t, 2, len(segments))

slices.SortFunc(segments, func(i, j *metricsinfo.Segment) int { return int(i.SegmentID - j.SegmentID) })

// Check the first segment
assert.Equal(t, int64(1), segments[0].SegmentID)
assert.Equal(t, int64(1), segments[0].CollectionID)
assert.Equal(t, int64(1), segments[0].PartitionID)
assert.Equal(t, "channel1", segments[0].Channel)
assert.Equal(t, int64(100), segments[0].NumOfRows)
assert.Equal(t, "Growing", segments[0].State)
assert.False(t, segments[0].Compacted)

// Check the second segment
assert.Equal(t, int64(2), segments[1].SegmentID)
assert.Equal(t, int64(2), segments[1].CollectionID)
assert.Equal(t, int64(2), segments[1].PartitionID)
assert.Equal(t, "channel2", segments[1].Channel)
assert.Equal(t, int64(200), segments[1].NumOfRows)
assert.Equal(t, "Sealed", segments[1].State)
assert.True(t, segments[1].Compacted)
}
Loading

0 comments on commit 02eacab

Please sign in to comment.