Skip to content

Commit

Permalink
fix: check collection health(queryable) fail for releasing collection (
Browse files Browse the repository at this point in the history
  • Loading branch information
jaime0815 authored Aug 2, 2024
1 parent 3641ae6 commit fcec4c2
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 75 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the minimum number of nodes required to enable a channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval for checking the executed flag, used to force pulling the distribution
updateCollectionLoadStatusInterval: 5 # the max interval, in minutes, for updating a collection's loaded status during health checks
cleanExcludeSegmentInterval: 60 # the interval, in seconds, for cleaning the pipeline's excluded segments, which are used to filter out invalid data
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewChannelLevelScoreBalancer(scheduler task.Scheduler,
nodeManager *session.NodeManager,
dist *meta.DistributionManager,
meta *meta.Meta,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
) *ChannelLevelScoreBalancer {
return &ChannelLevelScoreBalancer{
ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/multi_target_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (g *randomPlanGenerator) generatePlans() []SegmentAssignPlan {
type MultiTargetBalancer struct {
*ScoreBasedBalancer
dist *meta.DistributionManager
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
}

func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
Expand Down Expand Up @@ -548,7 +548,7 @@ func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSeg
return plans
}

func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager) *MultiTargetBalancer {
func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr meta.TargetManagerInterface) *MultiTargetBalancer {
return &MultiTargetBalancer{
ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
dist: dist,
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type RowCountBasedBalancer struct {
*RoundRobinBalancer
dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
}

// AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
Expand Down Expand Up @@ -354,7 +354,7 @@ func NewRowCountBasedBalancer(
nodeManager *session.NodeManager,
dist *meta.DistributionManager,
meta *meta.Meta,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
) *RowCountBasedBalancer {
return &RowCountBasedBalancer{
RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager),
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
nodeManager *session.NodeManager,
dist *meta.DistributionManager,
meta *meta.Meta,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
) *ScoreBasedBalancer {
return &ScoreBasedBalancer{
RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/dist/dist_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ControllerImpl struct {
client session.Cluster
nodeManager *session.NodeManager
dist *meta.DistributionManager
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
scheduler task.Scheduler
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func NewDistController(
client session.Cluster,
nodeManager *session.NodeManager,
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
scheduler task.Scheduler,
) *ControllerImpl {
return &ControllerImpl{
Expand Down
8 changes: 4 additions & 4 deletions internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type LoadCollectionJob struct {
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
nodeMgr *session.NodeManager
Expand All @@ -61,7 +61,7 @@ func NewLoadCollectionJob(
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
nodeMgr *session.NodeManager,
Expand Down Expand Up @@ -239,7 +239,7 @@ type LoadPartitionJob struct {
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
nodeMgr *session.NodeManager
Expand All @@ -252,7 +252,7 @@ func NewLoadPartitionJob(
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
nodeMgr *session.NodeManager,
Expand Down
8 changes: 4 additions & 4 deletions internal/querycoordv2/job/job_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ReleaseCollectionJob struct {
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
}
Expand All @@ -50,7 +50,7 @@ func NewReleaseCollectionJob(ctx context.Context,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
) *ReleaseCollectionJob {
Expand Down Expand Up @@ -114,7 +114,7 @@ type ReleasePartitionJob struct {
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
checkerController *checkers.CheckerController
}
Expand All @@ -125,7 +125,7 @@ func NewReleasePartitionJob(ctx context.Context,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
) *ReleasePartitionJob {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/job/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ type UndoList struct {
ctx context.Context
meta *meta.Meta
cluster session.Cluster
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
}

func NewUndoList(ctx context.Context, meta *meta.Meta,
cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver,
cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver,
) *UndoList {
return &UndoList{
ctx: ctx,
Expand Down
43 changes: 43 additions & 0 deletions internal/querycoordv2/meta/mock_target_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type TargetManagerInterface interface {
IsNextTargetExist(collectionID int64) bool
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
Recover(catalog metastore.QueryCoordCatalog) error
CanSegmentBeMoved(collectionID, segmentID int64) bool
}

type TargetManager struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/observers/collection_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type CollectionObserver struct {

dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
targetObserver *TargetObserver
checkerController *checkers.CheckerController
partitionLoadedCount map[int64]int
Expand All @@ -62,7 +62,7 @@ type LoadTask struct {
func NewCollectionObserver(
dist *meta.DistributionManager,
meta *meta.Meta,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
targetObserver *TargetObserver,
checherController *checkers.CheckerController,
) *CollectionObserver {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type TargetObserver struct {
cancel context.CancelFunc
wg sync.WaitGroup
meta *meta.Meta
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
distMgr *meta.DistributionManager
broker meta.Broker
cluster session.Cluster
Expand All @@ -76,7 +76,7 @@ type TargetObserver struct {

func NewTargetObserver(
meta *meta.Meta,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
distMgr *meta.DistributionManager,
broker meta.Broker,
cluster session.Cluster,
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Server struct {
store metastore.QueryCoordCatalog
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
broker meta.Broker

// Session
Expand Down
73 changes: 47 additions & 26 deletions internal/querycoordv2/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,44 +1610,65 @@ func (suite *ServiceSuite) TestGetReplicasWhenNoAvailableNodes() {
}

func (suite *ServiceSuite) TestCheckHealth() {
suite.loadAll()
ctx := context.Background()
server := suite.server

// Test for server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
suite.NoError(err)
suite.Equal(resp.IsHealthy, false)
suite.NotEmpty(resp.Reasons)
assertCheckHealthResult := func(isHealthy bool) {
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
suite.NoError(err)
suite.Equal(resp.IsHealthy, isHealthy)
if !isHealthy {
suite.NotEmpty(resp.Reasons)
} else {
suite.Empty(resp.Reasons)
}
}

// Test for components state fail
for _, node := range suite.nodes {
suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
setNodeSate := func(state commonpb.StateCode) {
// Test for components state fail
suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset()
suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(
&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal},
State: &milvuspb.ComponentInfo{StateCode: state},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
},
nil).Once()
nil).Maybe()
}

// Test for server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
assertCheckHealthResult(false)

// Test for components state fail
setNodeSate(commonpb.StateCode_Abnormal)
server.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
suite.NoError(err)
suite.Equal(resp.IsHealthy, false)
suite.NotEmpty(resp.Reasons)
assertCheckHealthResult(false)

// Test for server is healthy
// Test for check load percentage fail
setNodeSate(commonpb.StateCode_Healthy)
assertCheckHealthResult(true)

// Test for check channel ok
for _, collection := range suite.collections {
suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
suite.updateChannelDist(collection)
}
assertCheckHealthResult(true)

// Test for check channel fail
tm := meta.NewMockTargetManager(suite.T())
tm.EXPECT().GetDmChannelsByCollection(mock.Anything, mock.Anything).Return(nil).Maybe()
otm := server.targetMgr
server.targetMgr = tm
assertCheckHealthResult(true)

// Test for get shard leader fail
server.targetMgr = otm
for _, node := range suite.nodes {
suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
},
nil).Once()
suite.nodeMgr.Suspend(node)
}
resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
suite.NoError(err)
suite.Equal(resp.IsHealthy, true)
suite.Empty(resp.Reasons)
assertCheckHealthResult(true)
}

func (suite *ServiceSuite) TestGetShardLeaders() {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Executor struct {
meta *meta.Meta
dist *meta.DistributionManager
broker meta.Broker
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
cluster session.Cluster
nodeMgr *session.NodeManager

Expand All @@ -69,7 +69,7 @@ type Executor struct {
func NewExecutor(meta *meta.Meta,
dist *meta.DistributionManager,
broker meta.Broker,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
cluster session.Cluster,
nodeMgr *session.NodeManager,
) *Executor {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type taskScheduler struct {

distMgr *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
targetMgr meta.TargetManagerInterface
broker meta.Broker
cluster session.Cluster
nodeMgr *session.NodeManager
Expand All @@ -177,7 +177,7 @@ type taskScheduler struct {
func NewScheduler(ctx context.Context,
meta *meta.Meta,
distMgr *meta.DistributionManager,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
broker meta.Broker,
cluster session.Cluster,
nodeMgr *session.NodeManager,
Expand Down
Loading

0 comments on commit fcec4c2

Please sign in to comment.