Skip to content

Commit

Permalink
fix: State trans error in concurrent Release and Watching
Browse files Browse the repository at this point in the history
See also: #38589

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Dec 19, 2024
1 parent 451deb3 commit 8db49a5
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 54 deletions.
20 changes: 18 additions & 2 deletions internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St
return c
}

func (c *StateChannel) TransitionOnSuccess() {
func (c *StateChannel) TransitionOnSuccess(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on success but opID not match, stay original state ",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
switch c.currentState {
case Standby:
c.setState(ToWatch)
Expand All @@ -208,7 +216,15 @@ func (c *StateChannel) TransitionOnSuccess() {
}
}

func (c *StateChannel) TransitionOnFailure() {
func (c *StateChannel) TransitionOnFailure(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on failure but opID not match, stay original state",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}

Check warning on line 227 in internal/datacoord/channel.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/channel.go#L221-L227

Added lines #L221 - L227 were not covered by tests
switch c.currentState {
case Watching:
c.setState(Standby)
Expand Down
43 changes: 28 additions & 15 deletions internal/datacoord/channel_manager_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,9 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies
continue
}
nodeID := nodeAssign.NodeID

var (
succeededChannels = make([]RWChannel, 0, channelCount)
failedChannels = make([]RWChannel, 0, channelCount)
succeededChannels = 0
failedChannels = 0
futures = make([]*conc.Future[any], 0, channelCount)
)

Expand All @@ -530,31 +529,42 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies

future := getOrCreateIOPool().Submit(func() (any, error) {
err := m.Notify(ctx, nodeID, tmpWatchInfo)
return innerCh, err
return poolResult{
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, err
})
futures = append(futures, future)
}

for _, f := range futures {
ch, err := f.Await()
got, err := f.Await()
res := got.(poolResult)

if err != nil {
failedChannels = append(failedChannels, ch.(RWChannel))
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.String("channel name", res.ch.GetName()),
zap.Error(err),
)
failedChannels++
} else {
succeededChannels = append(succeededChannels, ch.(RWChannel))
succeededChannels++
advanced = true
}

m.mu.Lock()
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
m.mu.Unlock()
}

log.Info("Finish to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.Int("success count", len(succeededChannels)),
zap.Int("failure count", len(failedChannels)),
zap.Int("success count", succeededChannels),
zap.Int("failure count", failedChannels),
)
m.mu.Lock()
m.store.UpdateState(false, failedChannels...)
m.store.UpdateState(true, succeededChannels...)
m.mu.Unlock()
}

return advanced
Expand All @@ -563,6 +573,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies
type poolResult struct {
successful bool
ch RWChannel
opID int64
}

func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
Expand All @@ -583,13 +594,15 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*

for _, ch := range nodeAssign.Channels {
innerCh := ch
tmpWatchInfo := proto.Clone(innerCh.GetWatchInfo()).(*datapb.ChannelWatchInfo)

future := getOrCreateIOPool().Submit(func() (any, error) {
successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo())
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
if got {
return poolResult{
successful: successful,
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, nil
}
return nil, errors.New("Got results with no progress")
Expand All @@ -602,7 +615,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*
if err == nil {
m.mu.Lock()
result := got.(poolResult)
m.store.UpdateState(result.successful, result.ch)
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
m.mu.Unlock()

advanced = true
Expand Down
42 changes: 42 additions & 0 deletions internal/datacoord/channel_manager_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,48 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)
})
s.Run("advance watching channels released during check", func() {
idx := int64(19530)
mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T())
mockAlloc.EXPECT().AllocOne().
RunAndReturn(func() (int64, error) {
idx++
return idx, nil
})
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)

m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)

// Release belfore check return
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) {
if info.GetVchan().GetChannelName() == "ch1" {
m.Release(1, "ch1")
s.checkAssignment(m, 1, "ch1", ToRelease)
rwChannel, found := m.GetChannel(nodeID, "ch1")
s.True(found)
metaInfo := rwChannel.GetWatchInfo()
s.Require().EqualValues(metaInfo.GetOpID(), 19531)
log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo()))
}
log.Info("Trying to check this info", zap.Any("rpc info", info))
return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil
}).Twice()
m.AdvanceChannelState(ctx)

s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", Watched)
})
s.Run("advance watching channels check ErrNodeNotFound", func() {
chNodes := map[string]int64{
"ch1": 1,
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type RWChannelStore interface {
Update(op *ChannelOpSet) error

// UpdateState is used by StateChannelStore only
UpdateState(isSuccessful bool, channels ...RWChannel)
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
// SegLegacyChannelByNode is used by StateChannelStore only
SetLegacyChannelByNode(nodeIDs ...int64)
}
Expand Down Expand Up @@ -565,7 +565,7 @@ func (c *ChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelec
return nil
}

func (c *ChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
func (c *ChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {

Check warning on line 568 in internal/datacoord/channel_store.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/channel_store.go#L568

Added line #L568 was not covered by tests
log.Error("ChannelStore doesn't implement UpdateState")
}

Expand Down
19 changes: 9 additions & 10 deletions internal/datacoord/channel_store_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
}
}

func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
lo.ForEach(channels, func(ch RWChannel, _ int) {
for _, cInfo := range c.channelsInfo {
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess()
} else {
stateChannel.(*StateChannel).TransitionOnFailure()
}
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
channelName := channel.GetName()
if cInfo, ok := c.channelsInfo[nodeID]; ok {
if stateChannel, ok := cInfo.Channels[channelName]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
} else {
stateChannel.(*StateChannel).TransitionOnFailure(opID)
}
}
})
}
}

func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_store_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,14 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
ch := "ch-1"
channel := NewStateChannel(getChannel(ch, 1))
channel.setState(test.inChannelState)
store.channelsInfo[1] = &NodeChannelInfo{
store.channelsInfo[bufferID] = &NodeChannelInfo{
NodeID: bufferID,
Channels: map[string]RWChannel{
ch: channel,
},
}

store.UpdateState(test.inSuccess, channel)
store.UpdateState(test.inSuccess, bufferID, channel, 0)
s.Equal(test.outChannelState, channel.currentState)
})
}
Expand Down
34 changes: 11 additions & 23 deletions internal/datacoord/mock_channel_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8db49a5

Please sign in to comment.