From 4424c9e5e7f4c3c758c8d043281d875119801a29 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 20 Jun 2024 14:42:00 +0800 Subject: [PATCH] fix: [2.4] Remove loopclosure issue in ChannelManagerImplV2 (#33989) (#34004) Cherry-pick from master pr: #33989 See also #33987 Signed-off-by: Congqi Xia --- internal/datacoord/channel_manager_v2.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/datacoord/channel_manager_v2.go b/internal/datacoord/channel_manager_v2.go index 53504e16b020c..cee393b1b7ddc 100644 --- a/internal/datacoord/channel_manager_v2.go +++ b/internal/datacoord/channel_manager_v2.go @@ -529,6 +529,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies if channelCount == 0 { continue } + nodeID := nodeAssign.NodeID var ( succeededChannels = make([]RWChannel, 0, channelCount) @@ -548,7 +549,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies tmpWatchInfo.Vchan = m.h.GetDataVChanPositions(innerCh, allPartitionID) future := getOrCreateIOPool().Submit(func() (any, error) { - err := m.Notify(ctx, nodeAssign.NodeID, tmpWatchInfo) + err := m.Notify(ctx, nodeID, tmpWatchInfo) return innerCh, err }) futures = append(futures, future) @@ -591,6 +592,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []* continue } + nodeID := nodeAssign.NodeID futures := make([]*conc.Future[any], 0, len(nodeAssign.Channels)) chNames := lo.Keys(nodeAssign.Channels) @@ -603,7 +605,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []* innerCh := ch future := getOrCreateIOPool().Submit(func() (any, error) { - successful, got := m.Check(ctx, nodeAssign.NodeID, innerCh.GetWatchInfo()) + successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo()) if got { return poolResult{ successful: successful,