From e82af487069a79fb53aae9144b5baf1203acfd63 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 19 Dec 2024 21:46:47 +0800 Subject: [PATCH] fix: State trans error in concurrent Release and Watching (#38591) See also: #38589 pr: #38590 Signed-off-by: yangxuan --- internal/datacoord/channel.go | 20 ++++++++- internal/datacoord/channel_manager_v2.go | 43 ++++++++++++------- internal/datacoord/channel_manager_v2_test.go | 42 ++++++++++++++++++ internal/datacoord/channel_store.go | 4 +- internal/datacoord/channel_store_v2.go | 19 ++++---- internal/datacoord/channel_store_v2_test.go | 4 +- internal/datacoord/mock_channel_store.go | 34 +++++---------- 7 files changed, 112 insertions(+), 54 deletions(-) diff --git a/internal/datacoord/channel.go b/internal/datacoord/channel.go index 5f270a4c88471..453a878a6e334 100644 --- a/internal/datacoord/channel.go +++ b/internal/datacoord/channel.go @@ -191,7 +191,15 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St return c } -func (c *StateChannel) TransitionOnSuccess() { +func (c *StateChannel) TransitionOnSuccess(opID int64) { + if opID != c.Info.GetOpID() { + log.Warn("Try to transit on success but opID not match, stay original state ", + zap.Any("currentState", c.currentState), + zap.String("channel", c.Name), + zap.Int64("target opID", opID), + zap.Int64("channel opID", c.Info.GetOpID())) + return + } switch c.currentState { case Standby: c.setState(ToWatch) @@ -208,7 +216,15 @@ func (c *StateChannel) TransitionOnSuccess() { } } -func (c *StateChannel) TransitionOnFailure() { +func (c *StateChannel) TransitionOnFailure(opID int64) { + if opID != c.Info.GetOpID() { + log.Warn("Try to transit on failure but opID not match, stay original state", + zap.Any("currentState", c.currentState), + zap.String("channel", c.Name), + zap.Int64("target opID", opID), + zap.Int64("channel opID", c.Info.GetOpID())) + return + } switch c.currentState { case Watching: c.setState(Standby) diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 301e9e874faf0..fe0e0d462ad3b 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -510,10 +510,9 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies continue } nodeID := nodeAssign.NodeID - var ( - succeededChannels = make([]RWChannel, 0, channelCount) - failedChannels = make([]RWChannel, 0, channelCount) + succeededChannels = 0 + failedChannels = 0 futures = make([]*conc.Future[any], 0, channelCount) ) @@ -530,31 +529,42 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies future := getOrCreateIOPool().Submit(func() (any, error) { err := m.Notify(ctx, nodeID, tmpWatchInfo) - return innerCh, err + return poolResult{ + ch: innerCh, + opID: tmpWatchInfo.GetOpID(), + }, err }) futures = append(futures, future) } for _, f := range futures { - ch, err := f.Await() + got, err := f.Await() + res := got.(poolResult) + if err != nil { - failedChannels = append(failedChannels, ch.(RWChannel)) + log.Ctx(ctx).Warn("Failed to notify channel operations to datanode", + zap.Int64("assignment", nodeAssign.NodeID), + zap.Int("operation count", channelCount), + zap.String("channel name", res.ch.GetName()), + zap.Error(err), + ) + failedChannels++ } else { - succeededChannels = append(succeededChannels, ch.(RWChannel)) + succeededChannels++ advanced = true } + + m.mu.Lock() + m.store.UpdateState(err == nil, nodeID, res.ch, res.opID) + m.mu.Unlock() } log.Info("Finish to notify channel operations to datanode", zap.Int64("assignment", nodeAssign.NodeID), zap.Int("operation count", channelCount), - zap.Int("success count", len(succeededChannels)), - zap.Int("failure count", len(failedChannels)), + zap.Int("success count", succeededChannels), + zap.Int("failure count", failedChannels), ) - m.mu.Lock() - m.store.UpdateState(false, failedChannels...) - m.store.UpdateState(true, succeededChannels...) - m.mu.Unlock() } return advanced @@ -563,6 +573,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies type poolResult struct { successful bool ch RWChannel + opID int64 } func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool { @@ -583,13 +594,15 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []* for _, ch := range nodeAssign.Channels { innerCh := ch + tmpWatchInfo := proto.Clone(innerCh.GetWatchInfo()).(*datapb.ChannelWatchInfo) future := getOrCreateIOPool().Submit(func() (any, error) { - successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo()) + successful, got := m.Check(ctx, nodeID, tmpWatchInfo) if got { return poolResult{ successful: successful, ch: innerCh, + opID: tmpWatchInfo.GetOpID(), }, nil } return nil, errors.New("Got results with no progress") @@ -602,7 +615,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []* if err == nil { m.mu.Lock() result := got.(poolResult) - m.store.UpdateState(result.successful, result.ch) + m.store.UpdateState(result.successful, nodeID, result.ch, result.opID) m.mu.Unlock() advanced = true diff --git a/internal/datacoord/channel_manager_v2_test.go b/internal/datacoord/channel_manager_v2_test.go index 0b92eb80ad5ee..a4c0b5e77bd0f 100644 --- a/internal/datacoord/channel_manager_v2_test.go +++ b/internal/datacoord/channel_manager_v2_test.go @@ -451,6 +451,48 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() { s.checkAssignment(m, 1, "ch1", Watching) s.checkAssignment(m, 1, "ch2", Watching) }) + s.Run("advance watching channels released during check", func() { + idx := int64(19530) + mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T()) + mockAlloc.EXPECT().AllocOne(). + RunAndReturn(func() (int64, error) { + idx++ + return idx, nil + }) + chNodes := map[string]int64{ + "ch1": 1, + "ch2": 1, + } + s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch) + s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() + m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc) + s.Require().NoError(err) + s.checkAssignment(m, 1, "ch1", ToWatch) + s.checkAssignment(m, 1, "ch2", ToWatch) + + m.AdvanceChannelState(ctx) + s.checkAssignment(m, 1, "ch1", Watching) + s.checkAssignment(m, 1, "ch2", Watching) + + // Release belfore check return + s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) { + if info.GetVchan().GetChannelName() == "ch1" { + m.Release(1, "ch1") + s.checkAssignment(m, 1, "ch1", ToRelease) + rwChannel, found := m.GetChannel(nodeID, "ch1") + s.True(found) + metaInfo := rwChannel.GetWatchInfo() + s.Require().EqualValues(metaInfo.GetOpID(), 19531) + log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo())) + } + log.Info("Trying to check this info", zap.Any("rpc info", info)) + return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil + }).Twice() + m.AdvanceChannelState(ctx) + + s.checkAssignment(m, 1, "ch1", ToRelease) + s.checkAssignment(m, 1, "ch2", Watched) + }) s.Run("advance watching channels check ErrNodeNotFound", func() { chNodes := map[string]int64{ "ch1": 1, diff --git a/internal/datacoord/channel_store.go b/internal/datacoord/channel_store.go index 2627a5dc6de71..b3e2b1f70f55b 100644 --- a/internal/datacoord/channel_store.go +++ b/internal/datacoord/channel_store.go @@ -77,7 +77,7 @@ type RWChannelStore interface { Update(op *ChannelOpSet) error // UpdateState is used by StateChannelStore only - UpdateState(isSuccessful bool, channels ...RWChannel) + UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) // SegLegacyChannelByNode is used by StateChannelStore only SetLegacyChannelByNode(nodeIDs ...int64) } @@ -565,7 +565,7 @@ func (c *ChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelec return nil } -func (c *ChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { +func (c *ChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { log.Error("ChannelStore doesn't implement UpdateState") } diff --git a/internal/datacoord/channel_store_v2.go b/internal/datacoord/channel_store_v2.go index e2bed5acd9329..94b9d04fa6838 100644 --- a/internal/datacoord/channel_store_v2.go +++ b/internal/datacoord/channel_store_v2.go @@ -82,18 +82,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) { } } -func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { - lo.ForEach(channels, func(ch RWChannel, _ int) { - for _, cInfo := range c.channelsInfo { - if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok { - if isSuccessful { - stateChannel.(*StateChannel).TransitionOnSuccess() - } else { - stateChannel.(*StateChannel).TransitionOnFailure() - } +func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { + channelName := channel.GetName() + if cInfo, ok := c.channelsInfo[nodeID]; ok { + if stateChannel, ok := cInfo.Channels[channelName]; ok { + if isSuccessful { + stateChannel.(*StateChannel).TransitionOnSuccess(opID) + } else { + stateChannel.(*StateChannel).TransitionOnFailure(opID) } } - }) + } } func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) { diff --git a/internal/datacoord/channel_store_v2_test.go b/internal/datacoord/channel_store_v2_test.go index 191a0e040f024..c9ea41524a5b5 100644 --- a/internal/datacoord/channel_store_v2_test.go +++ b/internal/datacoord/channel_store_v2_test.go @@ -434,14 +434,14 @@ func (s *StateChannelStoreSuite) TestUpdateState() { ch := "ch-1" channel := NewStateChannel(getChannel(ch, 1)) channel.setState(test.inChannelState) - store.channelsInfo[1] = &NodeChannelInfo{ + store.channelsInfo[bufferID] = &NodeChannelInfo{ NodeID: bufferID, Channels: map[string]RWChannel{ ch: channel, }, } - store.UpdateState(test.inSuccess, channel) + store.UpdateState(test.inSuccess, bufferID, channel, 0) s.Equal(test.outChannelState, channel.currentState) }) } diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index fc7cb51ef3e92..b5b0ecd6c4b7a 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -572,16 +572,9 @@ func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(*ChannelOpSet) e return _c } -// UpdateState provides a mock function with given fields: isSuccessful, channels -func (_m *MockRWChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) { - _va := make([]interface{}, len(channels)) - for _i := range channels { - _va[_i] = channels[_i] - } - var _ca []interface{} - _ca = append(_ca, isSuccessful) - _ca = append(_ca, _va...) - _m.Called(_ca...) +// UpdateState provides a mock function with given fields: isSuccessful, nodeID, channel, opID +func (_m *MockRWChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) { + _m.Called(isSuccessful, nodeID, channel, opID) } // MockRWChannelStore_UpdateState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateState' @@ -591,21 +584,16 @@ type MockRWChannelStore_UpdateState_Call struct { // UpdateState is a helper method to define mock.On call // - isSuccessful bool -// - channels ...RWChannel -func (_e *MockRWChannelStore_Expecter) UpdateState(isSuccessful interface{}, channels ...interface{}) *MockRWChannelStore_UpdateState_Call { - return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", - append([]interface{}{isSuccessful}, channels...)...)} +// - nodeID int64 +// - channel RWChannel +// - opID int64 +func (_e *MockRWChannelStore_Expecter) UpdateState(isSuccessful interface{}, nodeID interface{}, channel interface{}, opID interface{}) *MockRWChannelStore_UpdateState_Call { + return &MockRWChannelStore_UpdateState_Call{Call: _e.mock.On("UpdateState", isSuccessful, nodeID, channel, opID)} } -func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(isSuccessful bool, channels ...RWChannel)) *MockRWChannelStore_UpdateState_Call { +func (_c *MockRWChannelStore_UpdateState_Call) Run(run func(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)) *MockRWChannelStore_UpdateState_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]RWChannel, len(args)-1) - for i, a := range args[1:] { - if a != nil { - variadicArgs[i] = a.(RWChannel) - } - } - run(args[0].(bool), variadicArgs...) + run(args[0].(bool), args[1].(int64), args[2].(RWChannel), args[3].(int64)) }) return _c } @@ -615,7 +603,7 @@ func (_c *MockRWChannelStore_UpdateState_Call) Return() *MockRWChannelStore_Upda return _c } -func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(bool, ...RWChannel)) *MockRWChannelStore_UpdateState_Call { +func (_c *MockRWChannelStore_UpdateState_Call) RunAndReturn(run func(bool, int64, RWChannel, int64)) *MockRWChannelStore_UpdateState_Call { _c.Call.Return(run) return _c }