Skip to content

Commit

Permalink
feat: add tasks page into management WebUI (#37002)
Browse files Browse the repository at this point in the history
issue: #36621

1. Add API to access task runtime metrics, including:
  - build index task
  - compaction task
  - import task
- balance (including load/release of segments/channels and some leader
tasks on querycoord)
  - sync task
2. Add a debug model to the webpage by using debug=true or debug=false
in the URL query parameters to enable or disable debug mode.

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Oct 28, 2024
1 parent d7b2ffe commit 9d16b97
Show file tree
Hide file tree
Showing 84 changed files with 2,881 additions and 1,153 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/bytedance/sonic v1.12.2
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cockroachdb/redact v1.1.3
github.com/goccy/go-json v0.10.3
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jolestar/go-commons-pool/v2 v2.1.2
Expand Down Expand Up @@ -131,7 +132,6 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_l0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *L0CompactionTaskSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.mockAlloc = allocator.NewMockAllocator(s.T())
//s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
// s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
}

func (s *L0CompactionTaskSuite) SetupSubTest() {
Expand Down
41 changes: 38 additions & 3 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,43 @@ package datacoord

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/expirable"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.CompactionTask {
return &metricsinfo.CompactionTask{
PlanID: task.PlanID,
CollectionID: task.CollectionID,
Type: task.Type.String(),
State: task.State.String(),
FailReason: task.FailReason,
StartTime: task.StartTime,
EndTime: task.EndTime,
TotalRows: task.TotalRows,
InputSegments: task.InputSegments,
ResultSegments: task.ResultSegments,
}
}

type compactionTaskMeta struct {
sync.RWMutex
ctx context.Context
catalog metastore.DataCoordCatalog
// currently only clustering compaction task is stored in persist meta
compactionTasks map[int64]map[int64]*datapb.CompactionTask // triggerID -> planID
taskStats *expirable.LRU[UniqueID, *metricsinfo.CompactionTask]
}

func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) {
Expand All @@ -43,6 +63,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal
ctx: ctx,
catalog: catalog,
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](1024, nil, time.Minute*60),
}
if err := csm.reloadFromKV(); err != nil {
return nil, err
Expand Down Expand Up @@ -125,16 +146,17 @@ func (csm *compactionTaskMeta) SaveCompactionTask(task *datapb.CompactionTask) e
log.Error("meta update: update compaction task fail", zap.Error(err))
return err
}
return csm.saveCompactionTaskMemory(task)
csm.saveCompactionTaskMemory(task)
return nil
}

func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) error {
func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) {
_, triggerIDExist := csm.compactionTasks[task.TriggerID]
if !triggerIDExist {
csm.compactionTasks[task.TriggerID] = make(map[int64]*datapb.CompactionTask, 0)
}
csm.compactionTasks[task.TriggerID][task.PlanID] = task
return nil
csm.taskStats.Add(task.PlanID, newCompactionTaskStats(task))
}

func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) error {
Expand All @@ -153,3 +175,16 @@ func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) e
}
return nil
}

func (csm *compactionTaskMeta) TaskStatsJSON() string {
tasks := csm.taskStats.Values()
if len(tasks) == 0 {
return ""
}

ret, err := json.Marshal(tasks)
if err != nil {
return ""
}
return string(ret)
}
49 changes: 49 additions & 0 deletions internal/datacoord/compaction_task_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package datacoord

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
)

func TestCompactionTaskMetaSuite(t *testing.T) {
Expand Down Expand Up @@ -79,3 +82,49 @@ func (suite *CompactionTaskMetaSuite) TestGetCompactionTasksByCollectionAbnormal
res := suite.meta.GetCompactionTasksByCollection(101)
suite.Equal(1, len(res))
}

func (suite *CompactionTaskMetaSuite) TestTaskStatsJSON() {
task1 := &datapb.CompactionTask{
PlanID: 1,
CollectionID: 100,
Type: datapb.CompactionType_MergeCompaction,
State: datapb.CompactionTaskState_completed,
FailReason: "",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(time.Hour).Unix(),
TotalRows: 1000,
InputSegments: []int64{1, 2},
ResultSegments: []int64{3},
}
task2 := &datapb.CompactionTask{
PlanID: 2,
CollectionID: 101,
Type: datapb.CompactionType_MergeCompaction,
State: datapb.CompactionTaskState_completed,
FailReason: "",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(time.Hour).Unix(),
TotalRows: 2000,
InputSegments: []int64{4, 5},
ResultSegments: []int64{6},
}

// testing return empty string
actualJSON := suite.meta.TaskStatsJSON()
suite.Equal("", actualJSON)

err := suite.meta.SaveCompactionTask(task1)
suite.NoError(err)
err = suite.meta.SaveCompactionTask(task2)
suite.NoError(err)

expectedTasks := []*metricsinfo.CompactionTask{
newCompactionTaskStats(task1),
newCompactionTaskStats(task2),
}
expectedJSON, err := json.Marshal(expectedTasks)
suite.NoError(err)

actualJSON = suite.meta.TaskStatsJSON()
suite.JSONEq(string(expectedJSON), actualJSON)
}
147 changes: 80 additions & 67 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
plans []*datapb.CompactionPlan
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel",
{
"with L0 tasks diff channel",
[]CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Expand All @@ -156,7 +157,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
},
[]UniqueID{10, 11},
},
{"with L0 tasks same channel",
{
"with L0 tasks same channel",
[]CompactionTask{
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 11,
Expand All @@ -179,7 +181,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
},
[]UniqueID{10},
},
{"without L0 tasks",
{
"without L0 tasks",
[]CompactionTask{
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 14,
Expand All @@ -202,10 +205,12 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
},
[]UniqueID{13, 14},
},
{"empty tasks",
{
"empty tasks",
[]CompactionTask{},
[]*datapb.CompactionPlan{},
[]UniqueID{}},
[]UniqueID{},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -235,7 +240,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
plans []*datapb.CompactionPlan
expectedOut []UniqueID // planID
}{
{"with L0 tasks diff channel",
{
"with L0 tasks diff channel",
[]CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Expand All @@ -255,85 +261,92 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
[]*datapb.CompactionPlan{{}, {}},
[]UniqueID{10, 11},
},
{"with L0 tasks same channel", []CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 11,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 13,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-3",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
},
{
"with L0 tasks same channel",
[]CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 11,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 13,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-3",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
},
[]*datapb.CompactionPlan{
{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction},
},
[]UniqueID{10, 13},
},
{"with multiple L0 tasks same channel", []CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 11,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 12,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
},
{
"with multiple L0 tasks same channel",
[]CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 11,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 12,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
},
[]*datapb.CompactionPlan{
{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
{PlanID: 11, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
{PlanID: 12, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
},
[]UniqueID{10, 11, 12},
},
{"without L0 tasks", []CompactionTask{
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 14,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-3",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 13,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
},
{
"without L0 tasks",
[]CompactionTask{
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 14,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-3",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 13,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-11",
NodeID: 102,
}, nil, s.mockMeta, s.mockSessMgr),
},
[]*datapb.CompactionPlan{
{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction},
{},
},
[]UniqueID{13, 14}},
[]UniqueID{13, 14},
},
{"empty tasks", []CompactionTask{}, []*datapb.CompactionPlan{}, []UniqueID{}},
}

Expand Down
Loading

0 comments on commit 9d16b97

Please sign in to comment.