Skip to content

Commit

Permalink
feat: add segment, pipeline, replica and resourcegroup api for WebUI
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Oct 31, 2024
1 parent cdee149 commit fb350cd
Show file tree
Hide file tree
Showing 42 changed files with 1,550 additions and 91 deletions.
9 changes: 5 additions & 4 deletions internal/datacoord/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor
return ret
}

// GetSyncTaskMetrics retrieves and aggregates the sync task metrics of the datanode.
func (s *Server) GetSyncTaskMetrics(
// getSliceMetrics retrieves and aggregates the metrics of the datanode to a slice and complete marshal.
func getSliceMetrics[T any](
s *Server,
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (string, error) {
Expand All @@ -92,10 +93,10 @@ func (s *Server) GetSyncTaskMetrics(
return "", err
}

tasks := make(map[string][]*metricsinfo.SyncTask, resp.Len())
tasks := make(map[string][]T, resp.Len())
resp.Range(func(key string, value *milvuspb.GetMetricsResponse) bool {
if value.Response != "" {
var sts []*metricsinfo.SyncTask
var sts []T
if err1 := json.Unmarshal([]byte(value.Response), &sts); err1 != nil {
log.Warn("failed to unmarshal sync task metrics")
err = err1
Expand Down
15 changes: 14 additions & 1 deletion internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,8 +1154,21 @@ func (s *Server) registerMetricsRequest() {

s.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return s.GetSyncTaskMetrics(ctx, req)
return getSliceMetrics[*metricsinfo.SyncTask](s, ctx, req)
})

s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getSliceMetrics[*metricsinfo.Segment](s, ctx, req)
})

// TODO merge channel manager information into return value
s.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return getSliceMetrics[*metricsinfo.Channel](s, ctx, req)
})

// TODO add checkpoint metrics
log.Info("register metrics actions finished")
}

Expand Down
11 changes: 11 additions & 0 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,21 @@ func (node *DataNode) registerMetricsRequest() {
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.getSystemInfoMetrics(ctx, req)
})

node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTasks,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.syncMgr.TaskStatsJSON(), nil
})

node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataSegments,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetSegmentsJSON(), nil
})

node.metricsRequest.RegisterMetricsRequest(metricsinfo.DataChannels,
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
return node.flowgraphManager.GetChannelsJSON(), nil
})
log.Info("register metrics actions finished")
}

Expand Down
3 changes: 2 additions & 1 deletion internal/datanode/importv2/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
)

func TestResizePools(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions internal/flushcommon/metacache/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ func (s *SegmentInfo) Level() datapb.SegmentLevel {
return s.level
}

// BufferRows returns the segment's buffered row count (s.bufferRows),
// i.e. rows accepted but not yet handed to a sync task — TODO confirm
// against the writer that maintains bufferRows.
func (s *SegmentInfo) BufferRows() int64 {
	return s.bufferRows
}

// SyncingRows returns the segment's in-flight syncing row count
// (s.syncingRows) — presumably rows currently owned by sync tasks;
// verify against the code that updates syncingRows.
func (s *SegmentInfo) SyncingRows() int64 {
	return s.syncingRows
}

func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,
Expand Down
57 changes: 57 additions & 0 deletions internal/flushcommon/pipeline/flow_graph_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package pipeline

import (
"context"
"encoding/json"
"fmt"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -40,6 +42,8 @@ type FlowgraphManager interface {
GetFlowgraphCount() int
GetCollectionIDs() []int64

GetChannelsJSON() string
GetSegmentsJSON() string
Close()
}

Expand Down Expand Up @@ -115,6 +119,59 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
return collectionSet.Collect()
}

// GetChannelsJSON returns all channels managed by this flowgraph manager in
// JSON format, one metricsinfo.Channel per registered flowgraph.
// On marshal failure it logs a warning and returns the empty string.
func (fm *fgManagerImpl) GetChannelsJSON() string {
	var channels []*metricsinfo.Channel
	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
		latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
		channels = append(channels, &metricsinfo.Channel{
			Name:         ch,
			WatchState:   ds.fg.Status(),
			TimeTick:     int64(latestTimeTick),
			NodeID:       paramtable.GetNodeID(),
			CollectionID: ds.metacache.Collection(),
		})
		return true
	})

	ret, err := json.Marshal(channels)
	if err != nil {
		// Degrade to an empty payload rather than failing the metrics request.
		log.Warn("failed to marshal channels", zap.Error(err))
		return ""
	}
	return string(ret)
}

// GetSegmentsJSON returns all segments tracked by the flowgraphs' metacaches
// in JSON format. On marshal failure it logs a warning and returns the empty
// string, mirroring GetChannelsJSON.
func (fm *fgManagerImpl) GetSegmentsJSON() string {
	var segments []*metricsinfo.Segment
	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
		meta := ds.metacache
		// GetSegmentsBy() with no filter arguments — presumably yields every
		// segment in the metacache; confirm against the MetaCache interface.
		for _, segment := range meta.GetSegmentsBy() {
			segments = append(segments, &metricsinfo.Segment{
				SegmentID: segment.SegmentID(),
				// Collection ID comes from the metacache, not the segment.
				CollectionID:   meta.Collection(),
				PartitionID:    segment.PartitionID(),
				State:          segment.State().String(),
				Level:          segment.Level().String(),
				NodeID:         paramtable.GetNodeID(),
				NumOfRows:      segment.NumOfRows(),
				FlushedRows:    segment.FlushedRows(),
				SyncBufferRows: segment.BufferRows(),
				SyncingRows:    segment.SyncingRows(),
			})
		}
		return true
	})

	ret, err := json.Marshal(segments)
	if err != nil {
		// Degrade to an empty payload rather than failing the metrics request.
		log.Warn("failed to marshal segments", zap.Error(err))
		return ""
	}
	return string(ret)
}

// Close shuts down the manager by invoking its cancel function; the
// flowgraphs it manages observe the cancellation (cancelFunc wiring is
// outside this view).
func (fm *fgManagerImpl) Close() {
	fm.cancelFunc()
}
82 changes: 80 additions & 2 deletions internal/flushcommon/pipeline/flow_graph_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package pipeline
import (
"context"
"fmt"
"math/rand"
"os"
"testing"

Expand All @@ -30,6 +29,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestMain(t *testing.M) {
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestFlowGraphManager(t *testing.T) {
}

func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
collectionID := int64(rand.Uint32())
collectionID := int64(1)
dmChannelName := fmt.Sprintf("%s_%d", "fake-ch-", collectionID)
schema := &schemapb.CollectionSchema{
Name: fmt.Sprintf("%s_%d", "collection_", collectionID),
Expand All @@ -124,3 +126,79 @@ func generateChannelWatchInfo() *datapb.ChannelWatchInfo {
Schema: schema,
}
}

// mockTimeSender is a stub time-tick sender for tests: Update must not be
// called, and GetLatestTimestamp always reports 0 so channels carry a zero
// time tick.
type mockTimeSender struct{}

// Update is not expected to be exercised by these tests; panic if it is.
func (m *mockTimeSender) Update(channel string, ts typeutil.Timestamp, stats []*commonpb.SegmentStats) {
	panic("implement me")
}

// GetLatestTimestamp always returns 0.
func (m *mockTimeSender) GetLatestTimestamp(channel string) typeutil.Timestamp {
	return 0
}

// newFlowGraphManager builds a FlowgraphManager containing a single
// DataSyncService wired entirely to mocks (broker, write-buffer manager,
// dispatcher client, stub time-tick sender) and returns the service's
// vchannel name together with the manager.
func newFlowGraphManager(t *testing.T) (string, FlowgraphManager) {
	// Broker mock: allow — but do not require — the calls the pipeline may make.
	mockBroker := broker.NewMockBroker(t)
	mockBroker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
	mockBroker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
	mockBroker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()

	wbm := writebuffer.NewMockBufferManager(t)
	wbm.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

	// Dispatcher returns an open (never-written) message channel.
	dispClient := msgdispatcher.NewMockClient(t)
	dispClient.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(make(chan *msgstream.MsgPack), nil)

	pipelineParams := &util.PipelineParams{
		Ctx:                context.TODO(),
		Session:            &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}},
		Broker:             mockBroker,
		TimeTickSender:     &mockTimeSender{}, // time ticks stubbed to 0
		DispClient:         dispClient,
		WriteBufferManager: wbm,
	}

	chanWatchInfo := generateChannelWatchInfo()
	ds, err := NewDataSyncService(
		context.TODO(),
		pipelineParams,
		chanWatchInfo,
		util.NewTickler(),
	)
	assert.NoError(t, err)

	fm := NewFlowgraphManager()
	fm.AddFlowgraph(ds)
	return ds.vchannelName, fm
}

// TestGetChannelsJSON verifies that a freshly added flowgraph is reported by
// GetChannelsJSON with its channel name, watch state, node ID and collection ID.
func TestGetChannelsJSON(t *testing.T) {
	paramtable.SetNodeID(1)
	_, fm := newFlowGraphManager(t)

	want := `[{"name":"fake-ch-_1","watched_status":"Healthy","node_id":1,"collection_id":1}]`
	got := fm.GetChannelsJSON()
	assert.JSONEq(t, want, got)
}

func TestGetSegmentJSON(t *testing.T) {
ch, fm := newFlowGraphManager(t)
ds, ok := fm.GetFlowgraphService(ch)
assert.True(t, ok)
pkStatsFactory := func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
}
segment := &datapb.SegmentInfo{
ID: 1,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 10240,
CollectionID: 100,
InsertChannel: "ch",
}
ds.metacache.AddSegment(segment, pkStatsFactory, metacache.NoneBm25StatsFactory)
jsonResult := fm.GetSegmentsJSON()
fmt.Println(jsonResult)
expectedJSON := `[{"segment_id":1,"collection_id":1,"partition_id":10,"num_of_rows":10240,"state":"Flushed","level":"L1","node_id":1,"flushed_rows":10240}]`
assert.JSONEq(t, expectedJSON, jsonResult)
}
Loading

0 comments on commit fb350cd

Please sign in to comment.