Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ChannelManager concurrent Release and Watch bug #38590

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St
return c
}

func (c *StateChannel) TransitionOnSuccess() {
func (c *StateChannel) TransitionOnSuccess(opID int64) {
if opID != c.Info.GetOpID() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approve for now. But we definitely need to refactor here. We need to

  1. Return the Channel status on DN (Watching/Watched/Releasing/Released/...)
  2. Group all info for a given channel on DC to determine the next step

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Warn("Try to transit on success but opID not match, stay original state ",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
switch c.currentState {
case Standby:
c.setState(ToWatch)
Expand All @@ -208,7 +216,15 @@ func (c *StateChannel) TransitionOnSuccess() {
}
}

func (c *StateChannel) TransitionOnFailure() {
func (c *StateChannel) TransitionOnFailure(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on failure but opID not match, stay original state",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
switch c.currentState {
case Watching:
c.setState(Standby)
Expand Down
44 changes: 30 additions & 14 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
nodeID := nodeAssign.NodeID

var (
succeededChannels = make([]RWChannel, 0, channelCount)
failedChannels = make([]RWChannel, 0, channelCount)
succeededChannels = 0
failedChannels = 0
futures = make([]*conc.Future[any], 0, channelCount)
)

Expand All @@ -564,31 +564,42 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [

future := getOrCreateIOPool().Submit(func() (any, error) {
err := m.Notify(ctx, nodeID, tmpWatchInfo)
return innerCh, err
return poolResult{
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, err
})
futures = append(futures, future)
}

for _, f := range futures {
ch, err := f.Await()
got, err := f.Await()
res := got.(poolResult)

if err != nil {
failedChannels = append(failedChannels, ch.(RWChannel))
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.String("channel name", res.ch.GetName()),
zap.Error(err),
)
failedChannels++
} else {
succeededChannels = append(succeededChannels, ch.(RWChannel))
succeededChannels++
advanced = true
}

m.mu.Lock()
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
m.mu.Unlock()
}

log.Ctx(ctx).Info("Finish to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.Int("success count", len(succeededChannels)),
zap.Int("failure count", len(failedChannels)),
zap.Int("success count", succeededChannels),
zap.Int("failure count", failedChannels),
)
m.mu.Lock()
m.store.UpdateState(false, failedChannels...)
m.store.UpdateState(true, succeededChannels...)
m.mu.Unlock()
}

return advanced
Expand All @@ -597,6 +608,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
// poolResult is the value returned from the IO-pool futures submitted by
// advanceToNotifies and advanceToChecks. It carries what the caller needs
// to invoke the store's UpdateState once the future has been awaited.
type poolResult struct {
// successful is the first result of m.Check; advanceToNotifies leaves it
// unset and derives success from the future's error instead.
successful bool
// ch is the channel the notify/check operation was issued for.
ch RWChannel
// opID is the OpID read from the cloned watch info that was sent with the
// operation. It is passed to UpdateState, where the state transition is
// skipped if it no longer matches the channel's current OpID.
opID int64
}

func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
Expand All @@ -617,13 +629,15 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No

for _, ch := range nodeAssign.Channels {
innerCh := ch
tmpWatchInfo := typeutil.Clone(innerCh.GetWatchInfo())

future := getOrCreateIOPool().Submit(func() (any, error) {
successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo())
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
if got {
return poolResult{
successful: successful,
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, nil
}
return nil, errors.New("Got results with no progress")
Expand All @@ -636,7 +650,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
if err == nil {
m.mu.Lock()
result := got.(poolResult)
m.store.UpdateState(result.successful, result.ch)
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
m.mu.Unlock()

advanced = true
Expand All @@ -656,6 +670,7 @@ func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *dat
zap.String("channel", info.GetVchan().GetChannelName()),
zap.Int64("assignment", nodeID),
zap.String("operation", info.GetState().String()),
zap.Int64("opID", info.GetOpID()),
)
log.Info("Notify channel operation")
err := m.subCluster.NotifyChannelOperation(ctx, nodeID, &datapb.ChannelOperationsRequest{Infos: []*datapb.ChannelWatchInfo{info}})
Expand Down Expand Up @@ -706,6 +721,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data
if resp.GetState() == datapb.ChannelWatchState_ReleaseFailure {
return false, true
}

}
return false, false
}
Expand Down
44 changes: 44 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,50 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)
})

s.Run("advance watching channels released during check", func() {
idx := int64(19530)
mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T())
mockAlloc.EXPECT().AllocOne().
RunAndReturn(func() (int64, error) {
idx++
return idx, nil
})
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)

m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)

// Release before check returns
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) {
if info.GetVchan().GetChannelName() == "ch1" {
m.Release(1, "ch1")
s.checkAssignment(m, 1, "ch1", ToRelease)
rwChannel, found := m.GetChannel(nodeID, "ch1")
s.True(found)
metaInfo := rwChannel.GetWatchInfo()
s.Require().EqualValues(metaInfo.GetOpID(), 19531)
log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo()))
}
log.Info("Trying to check this info", zap.Any("rpc info", info))
return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil
}).Twice()
m.AdvanceChannelState(ctx)

s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", Watched)
})

s.Run("advance watching channels check ErrNodeNotFound", func() {
chNodes := map[string]int64{
"ch1": 1,
Expand Down
21 changes: 10 additions & 11 deletions internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type RWChannelStore interface {
Update(op *ChannelOpSet) error

// UpdateState is used by StateChannelStore only
UpdateState(isSuccessful bool, channels ...RWChannel)
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
// SetLegacyChannelByNode is used by StateChannelStore only
SetLegacyChannelByNode(nodeIDs ...int64)

Expand Down Expand Up @@ -375,18 +375,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
}
}

func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
lo.ForEach(channels, func(ch RWChannel, _ int) {
for _, cInfo := range c.channelsInfo {
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess()
} else {
stateChannel.(*StateChannel).TransitionOnFailure()
}
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
channelName := channel.GetName()
if cInfo, ok := c.channelsInfo[nodeID]; ok {
if stateChannel, ok := cInfo.Channels[channelName]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
} else {
stateChannel.(*StateChannel).TransitionOnFailure(opID)
}
}
})
}
}

func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,14 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
ch := "ch-1"
channel := NewStateChannel(getChannel(ch, 1))
channel.setState(test.inChannelState)
store.channelsInfo[1] = &NodeChannelInfo{
store.channelsInfo[bufferID] = &NodeChannelInfo{
NodeID: bufferID,
Channels: map[string]RWChannel{
ch: channel,
},
}

store.UpdateState(test.inSuccess, channel)
store.UpdateState(test.inSuccess, bufferID, channel, 0)
s.Equal(test.outChannelState, channel.currentState)
})
}
Expand Down
34 changes: 11 additions & 23 deletions internal/datacoord/mock_channel_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading