From 8ce1e90ebaac4f4ed71b467f829241867f44e5d4 Mon Sep 17 00:00:00 2001 From: lixinguo Date: Fri, 13 Oct 2023 11:05:50 +0800 Subject: [PATCH] Reduce the goroutine in flowgraph to 2 Signed-off-by: lixinguo --- internal/datanode/data_sync_service.go | 3 +- internal/datanode/event_manager.go | 2 +- internal/datanode/flow_graph_dd_node.go | 4 +- internal/datanode/flow_graph_delete_node.go | 15 ++ internal/util/flowgraph/flow_graph.go | 14 +- internal/util/flowgraph/flow_graph_test.go | 16 +- internal/util/flowgraph/input_node.go | 2 - internal/util/flowgraph/node.go | 226 ++++++++++++++------ internal/util/flowgraph/node_test.go | 27 ++- pkg/util/timerecord/group_checker.go | 4 + 10 files changed, 227 insertions(+), 86 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index c888060cefc95..e0f6bca8e9ff6 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -81,7 +81,7 @@ type nodeConfig struct { serverID UniqueID } -// start the flow graph in datasyncservice +// start the flow graph in dataSyncService func (dsService *dataSyncService) start() { if dsService.fg != nil { log.Info("dataSyncService starting flow graph", zap.Int64("collectionID", dsService.collectionID), @@ -357,6 +357,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb ds.flushManager = flushManager // init flowgraph + fg := flowgraph.NewTimeTickedFlowGraph(node.ctx) dmStreamNode, err := newDmInputNode(initCtx, node.dispClient, info.GetVchan().GetSeekPosition(), config) if err != nil { diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 9db8448076144..51dc8817dab62 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -84,7 +84,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) { } } -// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodeds are watched +// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodes are watched // serves the corner case for etcd connection lost and missing some events func (node *DataNode) checkWatchedList() error { // REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 5487eebe36f5c..6c8aac66c4b6a 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -276,7 +276,9 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool { return false } -func (ddn *ddNode) Close() {} +func (ddn *ddNode) Close() { + log.Info("Flowgraph DD Node closing") +} func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppedSegmentIDs []UniqueID, sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, compactor *compactionExecutor, diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index faccda3a53fdd..0c282f95ab9ca 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -146,11 +146,19 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { dn.flushManager.notifyAllFlushed() log.Info("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName)) dn.clearSignal <- dn.channelName + log.Info("DeleteNode notifies BackgroundGC to release vchannel done", zap.String("vChannelName", dn.channelName)) } for 
_, sp := range spans {
 		sp.End()
 	}
 
+	if isCloseMsg(in) {
+		log.Info("DeleteNode receives close msg", zap.String("vChannelName", dn.channelName))
+	}
 	return in
 }
 
@@ -211,3 +219,10 @@ func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferMan
 		clearSignal:  sig,
 	}, nil
 }
+
+func isCloseMsg(msgs []Msg) bool {
+	if len(msgs) == 1 {
+		return msgs[0].IsClose()
+	}
+	return false
+}
diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go
index 46b6cd79b61e4..f2ad3cd580703 100644
--- a/internal/util/flowgraph/flow_graph.go
+++ b/internal/util/flowgraph/flow_graph.go
@@ -30,20 +30,22 @@ import (
 // TimeTickedFlowGraph flowgraph with input from tt msg stream
 type TimeTickedFlowGraph struct {
 	nodeCtx         map[NodeName]*nodeCtx
+	nodeCtxManager  *nodeCtxManager
 	stopOnce        sync.Once
 	startOnce       sync.Once
 	closeWg         *sync.WaitGroup
 	closeGracefully *atomic.Bool
 }
 
-// AddNode add Node into flowgraph
+// AddNode adds a Node into the flowgraph and, for the input node, initializes the nodeCtxManager
 func (fg *TimeTickedFlowGraph) AddNode(node Node) {
 	nodeCtx := nodeCtx{
-		node:    node,
-		closeCh: make(chan struct{}),
-		closeWg: fg.closeWg,
+		node: node,
 	}
 	fg.nodeCtx[node.Name()] = &nodeCtx
+	if node.IsInputNode() {
+		fg.nodeCtxManager = NewNodeCtxManager(&nodeCtx, fg.closeWg)
+	}
 }
 
 // SetEdges set directed edges from in nodes to out nodes
@@ -79,8 +81,9 @@ func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error {
 func (fg *TimeTickedFlowGraph) Start() {
 	fg.startOnce.Do(func() {
 		for _, v := range fg.nodeCtx {
-			v.Start()
+			v.node.Start()
 		}
+		fg.nodeCtxManager.Start()
 	})
 }
 
@@ -120,6 +123,7 @@ func (fg *TimeTickedFlowGraph) Close() {
 func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
 	flowGraph := TimeTickedFlowGraph{
 		nodeCtx:         make(map[string]*nodeCtx),
+		nodeCtxManager:  &nodeCtxManager{},
 		closeWg:         &sync.WaitGroup{},
 		closeGracefully: atomic.NewBool(CloseImmediately),
 	}
diff --git a/internal/util/flowgraph/flow_graph_test.go b/internal/util/flowgraph/flow_graph_test.go
index dbe87b371740a..a745b72597106 100644
--- a/internal/util/flowgraph/flow_graph_test.go
+++ b/internal/util/flowgraph/flow_graph_test.go
@@ -35,7 +35,7 @@ import (
 // nodeD: count c = b + 2
 
 type nodeA struct {
-	BaseNode
+	InputNode
 	inputChan chan float64
 	a         float64
 }
@@ -47,7 +47,7 @@ type nodeB struct {
 
 type nodeC struct {
 	BaseNode
-	d          float64
+	c          float64
 	outputChan chan float64
 }
 
@@ -117,8 +117,10 @@ func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64,
 	fg := NewTimeTickedFlowGraph(ctx)
 
 	var a Node = &nodeA{
-		BaseNode: BaseNode{
-			maxQueueLength: MaxQueueLength,
+		InputNode: InputNode{
+			BaseNode: BaseNode{
+				maxQueueLength: MaxQueueLength,
+			},
 		},
 		inputChan: inputChan,
 	}
@@ -175,8 +177,10 @@ func TestTimeTickedFlowGraph_AddNode(t *testing.T) {
 	fg := NewTimeTickedFlowGraph(context.TODO())
 
 	var a Node = &nodeA{
-		BaseNode: BaseNode{
-			maxQueueLength: MaxQueueLength,
+		InputNode: InputNode{
+			BaseNode: BaseNode{
+				maxQueueLength: MaxQueueLength,
+			},
 		},
 		inputChan: inputChan,
 	}
diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go
index 6100f7cdf21ca..c42c42b6e9c61 100644
--- a/internal/util/flowgraph/input_node.go
+++ b/internal/util/flowgraph/input_node.go
@@ -19,7 +19,6 @@ package flowgraph
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/trace"
@@ 
-49,7 +48,6 @@ type InputNode struct {
 	collectionID    int64
 	dataType        string
 
-	closeOnce       sync.Once
 	closeGracefully *atomic.Bool
 }
 
diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go
index 9962b5c765e66..0c374607bebd4 100644
--- a/internal/util/flowgraph/node.go
+++ b/internal/util/flowgraph/node.go
@@ -53,58 +53,42 @@ type BaseNode struct {
 	maxParallelism int32
 }
 
-// nodeCtx maintains the running context for a Node in flowgragh
-type nodeCtx struct {
-	node         Node
-	inputChannel chan []Msg
-	downstream   *nodeCtx
-
-	closeCh chan struct{} // notify work to exit
-	closeWg *sync.WaitGroup
+// nodeCtxManager manages the worker goroutines for all nodeCtx of a flowgraph
+type nodeCtxManager struct {
+	inputNodeCtx *nodeCtx
+	closeWg      *sync.WaitGroup
+	checker      *timerecord.GroupChecker
 
-	blockMutex sync.RWMutex
-}
-
-// Start invoke Node `Start` method and start a worker goroutine
-func (nodeCtx *nodeCtx) Start() {
-	nodeCtx.node.Start()
-
-	nodeCtx.closeWg.Add(1)
-	go nodeCtx.work()
-}
+	closeOnce sync.Once
 
-func (nodeCtx *nodeCtx) Block() {
-	// input node operate function will be blocking
-	if !nodeCtx.node.IsInputNode() {
-		startTs := time.Now()
-		nodeCtx.blockMutex.Lock()
-		if time.Since(startTs) >= blockAllWait {
-			log.Warn("flow graph wait for long time",
-				zap.String("name", nodeCtx.node.Name()),
-				zap.Duration("wait time", time.Since(startTs)))
-		}
-	}
+	inputNodeCloseCh chan struct{} // notify input node work to exit
+	ddNodeCloseCh    chan struct{} // notify ddnode and downstream node work to exit
 }
 
-func (nodeCtx *nodeCtx) Unblock() {
-	if !nodeCtx.node.IsInputNode() {
-		nodeCtx.blockMutex.Unlock()
+// NewNodeCtxManager creates a nodeCtxManager from the input nodeCtx and the flowgraph's closeWg
+func NewNodeCtxManager(nodeCtx *nodeCtx, closeWg *sync.WaitGroup) *nodeCtxManager {
+	return &nodeCtxManager{
+		inputNodeCtx:     nodeCtx,
+		closeWg:          closeWg,
+		inputNodeCloseCh: make(chan struct{}),
+		ddNodeCloseCh:    make(chan struct{}),
 	}
 }
 
-func isCloseMsg(msgs []Msg) bool {
-	if len(msgs) == 1 {
-		return msgs[0].IsClose()
-	}
-	return false
+// Start spawns the two worker goroutines of the flowgraph
+func (nodeCtxManager *nodeCtxManager) Start() {
+	// the input node blocks pulling messages from the mq into its channel, so it keeps a dedicated goroutine
+	// all other nodes share a single goroutine to keep the total goroutine number low
+	nodeCtxManager.closeWg.Add(2)
+	go nodeCtxManager.inputNodeWork()
+	go nodeCtxManager.ddNodeWork()
 }
 
-// work handles node work spinning
-// 1. collectMessage from upstream or just produce Msg from InputNode
-// 2. invoke node.Operate
-// 3. deliver the Operate result to downstream nodes
-func (nodeCtx *nodeCtx) work() {
-	name := fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
+func (nodeCtxManager *nodeCtxManager) inputNodeWork() {
+	defer nodeCtxManager.closeWg.Done()
+	inputNode := nodeCtxManager.inputNodeCtx
+	name := fmt.Sprintf("nodeCtxTtChecker-%s", inputNode.node.Name())
+	// tt checker start
 	var checker *timerecord.GroupChecker
 	if enableTtChecker {
 		checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
@@ -116,50 +100,168 @@ func (nodeCtx *nodeCtx) work() {
 
 	for {
 		select {
-		case <-nodeCtx.closeCh:
-			log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name()))
+		case <-nodeCtxManager.inputNodeCloseCh:
+			log.Debug("flow graph input node closed", zap.String("nodeName", inputNode.node.Name()))
 			return
+		// handles node work spinning
+		// 1. collectMessage from upstream or just produce Msg from InputNode
+		// 2. invoke node.Operate
+		// 3. 
deliver the Operate result to downstream nodes
 		default:
 			// inputs from inputsMessages for Operate
 			var input, output []Msg
-			if !nodeCtx.node.IsInputNode() {
-				input = <-nodeCtx.inputChannel
-			}
+			// the input node produces its own messages in Operate; nothing is read from an inputChannel here
 			// the input message decides whether the operate method is executed
-			n := nodeCtx.node
-			nodeCtx.blockMutex.RLock()
+			n := inputNode.node
+			inputNode.blockMutex.RLock()
 			if !n.IsValidInMsg(input) {
-				nodeCtx.blockMutex.RUnlock()
+				inputNode.blockMutex.RUnlock()
 				continue
 			}
 			output = n.Operate(input)
-			nodeCtx.blockMutex.RUnlock()
+			inputNode.blockMutex.RUnlock()
 			// the output decide whether the node should be closed.
 			if isCloseMsg(output) {
-				close(nodeCtx.closeCh)
-				nodeCtx.closeWg.Done()
-				nodeCtx.node.Close()
-				if nodeCtx.inputChannel != nil {
-					close(nodeCtx.inputChannel)
+				log.Info("flow graph input node closing", zap.String("nodeName", inputNode.node.Name()))
+				close(nodeCtxManager.inputNodeCloseCh)
+				if inputNode.inputChannel != nil {
+					close(inputNode.inputChannel)
 				}
 			}
-
+			// deliver to all following flow graph node.
+			inputNode.downstream.inputChannel <- output
 			if enableTtChecker {
 				checker.Check(name)
 			}
+		}
+	}
+}
 
-			// deliver to all following flow graph node.
-			if nodeCtx.downstream != nil {
-				nodeCtx.downstream.inputChannel <- output
+func (nodeCtxManager *nodeCtxManager) ddNodeWork() {
+	defer nodeCtxManager.closeWg.Done()
+	ddNode := nodeCtxManager.inputNodeCtx.downstream
+	curNode := ddNode
+	// tt checker start
+	var checker *timerecord.GroupChecker
+	if enableTtChecker {
+		checker = timerecord.GetGroupChecker("fgNode", nodeCtxTtInterval, func(list []string) {
+			log.Warn("some node(s) haven't received input", zap.Strings("list", list), zap.Duration("duration ", nodeCtxTtInterval))
+		})
+		for curNode != nil {
+			name := fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name())
+			checker.Check(name)
+			curNode = curNode.downstream
+			defer checker.Remove(name)
+		}
+	}
+
+	for {
+		select {
+		case <-nodeCtxManager.ddNodeCloseCh:
+			log.Debug("flow graph downstream nodes closed")
+			return
+		// handles node work spinning
+		// 1. collectMessage from upstream or just produce Msg from InputNode
+		// 2. invoke node.Operate
+		// 3. 
deliver the Operate result to downstream nodes
+		default:
+			// this single goroutine loops over every node except the input node, even while closeCh is notifying it to exit
+			// the input node is responsible for closing all nodes
+			curNode = ddNode
+			for curNode != nil {
+				// inputs from inputsMessages for Operate
+				var input, output []Msg
+				input = <-curNode.inputChannel
+				// the input message decides whether the operate method is executed
+				n := curNode.node
+				curNode.blockMutex.RLock()
+				if !n.IsValidInMsg(input) {
+					curNode.blockMutex.RUnlock()
+					curNode = ddNode
+					continue
+				}
+				output = n.Operate(input)
+				curNode.blockMutex.RUnlock()
+				// the output decides whether the node should be closed.
+				if isCloseMsg(output) {
+					log.Info("flow graph node closing", zap.String("nodeName", curNode.node.Name()))
+					nodeCtxManager.closeOnce.Do(func() {
+						close(nodeCtxManager.ddNodeCloseCh)
+					})
+					if curNode.inputChannel != nil {
+						close(curNode.inputChannel)
+					}
+				}
+				// deliver to all following flow graph node.
+				if curNode.downstream != nil {
+					curNode.downstream.inputChannel <- output
+				}
+				if enableTtChecker {
+					checker.Check(fmt.Sprintf("nodeCtxTtChecker-%s", curNode.node.Name()))
+				}
+				curNode = curNode.downstream
+			}
+		}
+	}
+}
+
+// Close handles cleanup logic and notifies the workers to quit
+func (nodeCtxManager *nodeCtxManager) Close() {
+	nodeCtx := nodeCtxManager.inputNodeCtx
+	nodeCtx.Close()
+}
+
+// nodeCtx maintains the running context for a Node in flowgraph
+type nodeCtx struct {
+	node         Node
+	inputChannel chan []Msg
+	downstream   *nodeCtx
+
+	blockMutex sync.RWMutex
+}
+
+func (nodeCtx *nodeCtx) Block() {
+	// input node operate function will be blocking
+	if !nodeCtx.node.IsInputNode() {
+		startTs := time.Now()
+		nodeCtx.blockMutex.Lock()
+		if time.Since(startTs) >= blockAllWait {
+			log.Warn("flow graph wait for long time",
+				zap.String("name", nodeCtx.node.Name()),
+				zap.Duration("wait time", time.Since(startTs)))
+		}
+	}
+}
+
+func (nodeCtx *nodeCtx) Unblock() {
+	if !nodeCtx.node.IsInputNode() {
+		nodeCtx.blockMutex.Unlock()
+	}
+}
+
+func isCloseMsg(msgs []Msg) bool {
+	if len(msgs) == 1 {
+		return msgs[0].IsClose()
+	}
+	return false
+}
+
 // Close handles cleanup logic and notify worker to quit
 func (nodeCtx *nodeCtx) Close() {
 	if nodeCtx.node.IsInputNode() {
-		nodeCtx.node.Close()
+		for nodeCtx != nil {
+			nodeCtx.node.Close()
+			log.Debug("flow graph node closed", zap.String("nodeName", nodeCtx.node.Name()))
+			nodeCtx = nodeCtx.downstream
+		}
 	}
 }
diff --git a/internal/util/flowgraph/node_test.go b/internal/util/flowgraph/node_test.go
index a88dbcc151f14..b30f679e9c795 100644
--- a/internal/util/flowgraph/node_test.go
+++ b/internal/util/flowgraph/node_test.go
@@ -56,7 +56,7 @@ func generateMsgPack() msgstream.MsgPack {
 	return msgPack
 }
 
-func TestNodeCtx_Start(t *testing.T) {
+func TestNodeManager_Start(t *testing.T) {
 	t.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestNodeStart")
 
 	factory := dependency.NewDefaultFactory(true)
@@ -76,19 +76,30 @@ func TestNodeCtx_Start(t *testing.T) {
 	nodeName := "input_node"
 	inputNode := NewInputNode(msgStream.Chan(), nodeName, 100, 100, "", 0, 0, "")
 
-	node := &nodeCtx{
-		node:    inputNode,
-		
closeCh: make(chan struct{}),
-		closeWg: &sync.WaitGroup{},
+	ddNode := BaseNode{}
+
+	node0 := &nodeCtx{
+		node: inputNode,
+	}
+
+	node1 := &nodeCtx{
+		node: &ddNode,
 	}
 
-	node.inputChannel = make(chan []Msg)
+	node0.downstream = node1
+
+	node0.inputChannel = make(chan []Msg)
+
+	nodeCtxManager := &nodeCtxManager{
+		inputNodeCtx: node0,
+		closeWg:      &sync.WaitGroup{},
+	}
 
 	assert.NotPanics(t, func() {
-		node.Start()
+		nodeCtxManager.Start()
 	})
 
-	node.Close()
+	nodeCtxManager.Close()
 }
 
 func TestBaseNode(t *testing.T) {
diff --git a/pkg/util/timerecord/group_checker.go b/pkg/util/timerecord/group_checker.go
index d8502884d7938..fd8f5ad69ca6b 100644
--- a/pkg/util/timerecord/group_checker.go
+++ b/pkg/util/timerecord/group_checker.go
@@ -20,7 +20,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
+	"go.uber.org/zap"
 )
 
 // groups maintains string to GroupChecker
@@ -77,11 +79,13 @@ func (gc *GroupChecker) work() {
 
 // Check updates the latest timestamp for provided name
 func (gc *GroupChecker) Check(name string) {
+	log.Debug("GroupChecker updates timestamp", zap.String("name", name))
 	gc.lastest.Insert(name, time.Now())
 }
 
 // Remove deletes name from watch list
 func (gc *GroupChecker) Remove(name string) {
+	log.Debug("GroupChecker removes name from watch list", zap.String("name", name))
 	gc.lastest.GetAndRemove(name)
 }
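
The patch above collapses the per-node worker goroutines into two workers per flow graph: one goroutine dedicated to the input node, which blocks pulling batches from the message stream, and one goroutine that walks the dd node and every node downstream of it for each batch. What follows is a minimal, self-contained Go sketch of that layout, not part of the patch: the names (miniNode, chainCtx, inputNodeWork, downstreamNodesWork) and the string-batch messages are illustrative stand-ins for the Milvus Node/Msg types, and close messages, blocking, and the tt checker are omitted.

package main

import (
	"fmt"
	"sync"
)

// miniNode stands in for a flowgraph Node: operate turns one batch of
// messages into the batch handed to the next node.
type miniNode struct {
	name    string
	operate func(in []string) []string
}

// chainCtx loosely mirrors the patch's nodeCtx: a node plus its buffered
// input channel and a pointer to the downstream node.
type chainCtx struct {
	node         miniNode
	inputChannel chan []string
	downstream   *chainCtx
}

// inputNodeWork is the dedicated goroutine for the input node: it produces
// batches on its own (here from a slice, in Milvus from the msgstream),
// hands each result to the first downstream node, then closes the channel.
func inputNodeWork(in *chainCtx, batches [][]string, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, batch := range batches {
		in.downstream.inputChannel <- in.node.operate(batch)
	}
	close(in.downstream.inputChannel)
}

// downstreamNodesWork is the single goroutine shared by every non-input node:
// for each batch it walks the whole chain, so a graph with N nodes still uses
// only two goroutines instead of N.
func downstreamNodesWork(first *chainCtx, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		cur := first
		for cur != nil {
			in, ok := <-cur.inputChannel
			if !ok {
				return // upstream closed: the whole chain is done
			}
			out := cur.node.operate(in)
			if cur.downstream != nil {
				// producer and consumer are the same goroutine here, so the
				// downstream channel must be buffered to avoid deadlock
				cur.downstream.inputChannel <- out
			}
			cur = cur.downstream
		}
	}
}

func main() {
	mark := func(in []string) []string {
		out := make([]string, 0, len(in))
		for _, s := range in {
			out = append(out, s+"*")
		}
		return out
	}

	sink := &chainCtx{
		node:         miniNode{name: "sink", operate: func(in []string) []string { fmt.Println(in); return in }},
		inputChannel: make(chan []string, 1),
	}
	dd := &chainCtx{
		node:         miniNode{name: "dd", operate: mark},
		inputChannel: make(chan []string, 1),
		downstream:   sink,
	}
	input := &chainCtx{
		node:       miniNode{name: "input", operate: mark},
		downstream: dd,
	}

	wg := &sync.WaitGroup{}
	wg.Add(2) // one worker for the input node, one shared by all downstream nodes
	go inputNodeWork(input, [][]string{{"a"}, {"b"}}, wg)
	go downstreamNodesWork(dd, wg)
	wg.Wait()
}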