Skip to content

Commit

Permalink
fix: Load segment task promote failed (#31431)
Browse files Browse the repository at this point in the history
issue: #30816
pr: #31430

PR #31319 introduced logic requiring the segment checker to load level-zero
segments that exist only in the current target.

This PR fixes the failure to promote a load-segment task when the segment
belongs only to the current target.

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Mar 20, 2024
1 parent 6856ba1 commit 7abebf8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
2 changes: 1 addition & 1 deletion internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
if taskType == TaskTypeMove || taskType == TaskTypeUpdate {
segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget)
} else {
segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst)
}
if segment == nil {
log.Warn("task stale due to the segment to load not exists in targets",
Expand Down
50 changes: 37 additions & 13 deletions tests/integration/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ package balance

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

Expand Down Expand Up @@ -51,7 +55,7 @@ func (s *BalanceTestSuit) SetupSuite() {
s.Require().NoError(s.SetupEmbedEtcd())
}

func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int) {
func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -92,6 +96,26 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha
s.NoError(err)
s.True(merr.Ok(insertResult.Status))

if segmentDeleteNum > 0 {
if segmentDeleteNum > segmentRowNum {
segmentDeleteNum = segmentRowNum
}

pks := insertResult.GetIDs().GetIntId().GetData()
expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ","))
log.Info("========================delete expr==================",
zap.String("expr", expr),
)

deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
CollectionName: collectionName,
Expr: expr,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(deleteResp.GetStatus()))
s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt())
}

// flush
flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
Expand Down Expand Up @@ -137,7 +161,7 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha

func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 1, 2, 2, 2000)
s.initCollection(name, 1, 2, 2, 2000, 500)

ctx := context.Background()
// disable compact
Expand All @@ -161,7 +185,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
return len(resp.Channels) == 1 && len(resp.Segments) == 2
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
}, 30*time.Second, 1*time.Second)

// check total segment number
Expand All @@ -173,7 +197,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
s.True(merr.Ok(resp1.GetStatus()))
count += len(resp1.Segments)
}
return count == 4
return count == 8
}, 10*time.Second, 1*time.Second)
}

Expand All @@ -193,11 +217,11 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
Command: datapb.GcCommand_Resume,
})

// init collection with 2 channel, each channel has 2 segment, each segment has 2000 row
// init collection with 2 channel, each channel has 4 segment, each segment has 2000 row
// and load it with 2 replicas on 2 nodes.
// then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 2, 2, 2, 2000)
s.initCollection(name, 2, 2, 2, 2000, 500)

resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{CollectionName: name})
s.NoError(err)
Expand All @@ -211,13 +235,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
s.Eventually(func() bool {
resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
return len(resp.Channels) == 1 && len(resp.Segments) == 2
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
}, 30*time.Second, 1*time.Second)

s.Eventually(func() bool {
resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
return len(resp.Channels) == 1 && len(resp.Segments) == 2
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
}, 30*time.Second, 1*time.Second)

// check total segment num
Expand All @@ -229,7 +253,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
s.True(merr.Ok(resp1.GetStatus()))
count += len(resp1.Segments)
}
return count == 8
return count == 16
}, 10*time.Second, 1*time.Second)
}

Expand All @@ -256,7 +280,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
// init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
// and load it with 2 replicas on 2 nodes.
name := "test_balance_" + funcutil.GenRandomStr()
s.initCollection(name, 1, 2, 15, 2000)
s.initCollection(name, 1, 2, 15, 2000, 500)

// then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
qn1 := s.Cluster.AddQueryNode()
Expand All @@ -268,15 +292,15 @@ func (s *BalanceTestSuit) TestNodeDown() {
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 0 && len(resp.Segments) == 10
return len(resp.Channels) == 0 && len(resp.Segments) >= 10
}, 30*time.Second, 1*time.Second)

s.Eventually(func() bool {
resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 0 && len(resp.Segments) == 10
return len(resp.Channels) == 0 && len(resp.Segments) >= 10
}, 30*time.Second, 1*time.Second)

// then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2
Expand All @@ -298,7 +322,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
return len(resp.Channels) == 1 && len(resp.Segments) == 15
return len(resp.Channels) == 1 && len(resp.Segments) >= 15
}, 30*time.Second, 1*time.Second)

// expect all delegator will recover to healthy
Expand Down

0 comments on commit 7abebf8

Please sign in to comment.