Skip to content

Commit

Permalink
Fix datanode ttNode goroutine leak (#27878)
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Oct 24, 2023
1 parent 4faba61 commit b9d5ef3
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion internal/datanode/flow_graph_time_tick_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type ttNode struct {
updateCPLock sync.Mutex
notifyChannel chan checkPoint
closeChannel chan struct{}
closeOnce sync.Once
closeWg sync.WaitGroup
}

type checkPoint struct {
Expand All @@ -76,13 +78,19 @@ func (ttn *ttNode) IsValidInMsg(in []Msg) bool {
return true
}

func (ttn *ttNode) Close() {
ttn.closeOnce.Do(func() {
close(ttn.closeChannel)
ttn.closeWg.Wait()
})
}

// Operate handles input messages, implementing flowgraph.Node
func (ttn *ttNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax)
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) > 0 {
close(ttn.closeChannel)
channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0])
log.Info("flowgraph is closing, force update channel CP",
zap.Time("cpTs", tsoutil.PhysicalTime(channelPos.GetTimestamp())),
Expand Down Expand Up @@ -151,13 +159,17 @@ func newTTNode(config *nodeConfig, broker broker.Broker) (*ttNode, error) {
broker: broker,
notifyChannel: make(chan checkPoint, 1),
closeChannel: make(chan struct{}),
closeWg: sync.WaitGroup{},
}

// check point updater
tt.closeWg.Add(1)
go func() {
defer tt.closeWg.Done()
for {
select {
case <-tt.closeChannel:
log.Info("ttNode updater exited", zap.String("channel", tt.vChannelName))
return
case cp := <-tt.notifyChannel:
tt.updateChannelCP(cp.pos, cp.curTs)
Expand Down

0 comments on commit b9d5ef3

Please sign in to comment.