Skip to content

Commit

Permalink
enhance: make TransferChannel/TransferSegment idempotent (#36489)
Browse files Browse the repository at this point in the history
issue: #36488
when call TransferChannel/TransferSegment, querycoord will generate and
submit balance task to scheduler, if segment/channel's task already
exist in scheduler, submit task will failed.

to make TransferChannel/TransferSegment idempotent, we should skip to
submit if task already exist in scheduler.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Sep 26, 2024
1 parent 5dfa1c3 commit 55be814
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/querycoordv2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func (s *Server) balanceSegments(ctx context.Context,
err = s.taskScheduler.Add(t)
if err != nil {
t.Cancel(err)
return err
log.Info("skip balance segment task", zap.Int64("segmentID", plan.Segment.GetID()), zap.Error(err))
continue
}
tasks = append(tasks, t)
}
Expand Down Expand Up @@ -223,7 +224,8 @@ func (s *Server) balanceChannels(ctx context.Context,
err = s.taskScheduler.Add(t)
if err != nil {
t.Cancel(err)
return err
log.Info("skip balance channel task", zap.String("channel", plan.Channel.GetChannelName()), zap.Error(err))
continue
}
tasks = append(tasks, t)
}
Expand Down
56 changes: 56 additions & 0 deletions internal/querycoordv2/ops_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -683,6 +684,33 @@ func (suite *OpsServiceSuite) TestTransferSegment() {
suite.True(merr.Ok(resp))
suite.Equal(counter.Load(), int64(4))
suite.Len(nodeSet.Collect(), 3)

// test transfer segment idempotent
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
taskIDSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
if taskIDSet.Contain(t.ID()) {
return errors.New("duplicate task")
}
return nil
})
resp, err = suite.server.TransferSegment(ctx, &querypb.TransferSegmentRequest{
SourceNodeID: nodes[0],
TransferAll: true,
ToAllNodes: true,
})
suite.NoError(err)
suite.True(merr.Ok(resp))
resp, err = suite.server.TransferSegment(ctx, &querypb.TransferSegmentRequest{
SourceNodeID: nodes[0],
TransferAll: true,
ToAllNodes: true,
})
suite.NoError(err)
suite.True(merr.Ok(resp))
}

func (suite *OpsServiceSuite) TestTransferChannel() {
Expand Down Expand Up @@ -907,6 +935,34 @@ func (suite *OpsServiceSuite) TestTransferChannel() {
suite.True(merr.Ok(resp))
suite.Equal(counter.Load(), int64(4))
suite.Len(nodeSet.Collect(), 3)

// test transfer channel idempotent
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
taskIDSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
if taskIDSet.Contain(t.ID()) {
return errors.New("duplicate task")
}
return nil
})

resp, err = suite.server.TransferChannel(ctx, &querypb.TransferChannelRequest{
SourceNodeID: nodes[0],
TransferAll: true,
ToAllNodes: true,
})
suite.NoError(err)
suite.True(merr.Ok(resp))
resp, err = suite.server.TransferChannel(ctx, &querypb.TransferChannelRequest{
SourceNodeID: nodes[0],
TransferAll: true,
ToAllNodes: true,
})
suite.NoError(err)
suite.True(merr.Ok(resp))
}

func TestOpsService(t *testing.T) {
Expand Down

0 comments on commit 55be814

Please sign in to comment.