Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce the goroutine in flowgraph to 2 #27727

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type nodeConfig struct {
serverID UniqueID
}

// start the flow graph in datasyncservice
// start the flow graph in dataSyncService
func (dsService *dataSyncService) start() {
if dsService.fg != nil {
log.Info("dataSyncService starting flow graph", zap.Int64("collectionID", dsService.collectionID),
Expand Down Expand Up @@ -357,6 +357,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
ds.flushManager = flushManager

// init flowgraph

fg := flowgraph.NewTimeTickedFlowGraph(node.ctx)
dmStreamNode, err := newDmInputNode(initCtx, node.dispClient, info.GetVchan().GetSeekPosition(), config)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) {
}
}

// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodeds are watched
// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodes are watched
// serves the corner case for etcd connection lost and missing some events
func (node *DataNode) checkWatchedList() error {
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
Expand Down
4 changes: 3 additions & 1 deletion internal/datanode/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool {
return false
}

func (ddn *ddNode) Close() {}
func (ddn *ddNode) Close() {
log.Info("Flowgraph DD Node closing")
}

func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppedSegmentIDs []UniqueID,
sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, compactor *compactionExecutor,
Expand Down
15 changes: 15 additions & 0 deletions internal/datanode/flow_graph_delete_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,19 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
dn.flushManager.notifyAllFlushed()
log.Info("DeleteNode notifies BackgroundGC to release vchannel", zap.String("vChannelName", dn.channelName))
dn.clearSignal <- dn.channelName
log.Info("DeleteNode notifies BackgroundGC to release vchannel done", zap.String("vChannelName", dn.channelName))
}

for _, sp := range spans {
sp.End()
}
if !dn.IsValidInMsg(in) {
log.Info("lxg input is invalid", zap.String("vChannelName", dn.channelName))
}
if isCloseMsg(in) {
log.Info("lxg debug in is close msg", zap.String("vChannelName", dn.channelName))
}
log.Info("lxg test delete node operate ok")
return in
}

Expand Down Expand Up @@ -211,3 +219,10 @@ func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferMan
clearSignal: sig,
}, nil
}

func isCloseMsg(msgs []Msg) bool {
if len(msgs) == 1 {
return msgs[0].IsClose()
}
return false
}
14 changes: 9 additions & 5 deletions internal/util/flowgraph/flow_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,22 @@ import (
// TimeTickedFlowGraph flowgraph with input from tt msg stream
type TimeTickedFlowGraph struct {
nodeCtx map[NodeName]*nodeCtx
nodeCtxManager *nodeCtxManager
stopOnce sync.Once
startOnce sync.Once
closeWg *sync.WaitGroup
closeGracefully *atomic.Bool
}

// AddNode add Node into flowgraph
// AddNode add Node into flowgraph and fill nodeCtxManager
func (fg *TimeTickedFlowGraph) AddNode(node Node) {
nodeCtx := nodeCtx{
node: node,
closeCh: make(chan struct{}),
closeWg: fg.closeWg,
node: node,
}
fg.nodeCtx[node.Name()] = &nodeCtx
if node.IsInputNode() {
fg.nodeCtxManager = NewNodeCtxManager(&nodeCtx, fg.closeWg)
}
}

// SetEdges set directed edges from in nodes to out nodes
Expand Down Expand Up @@ -79,8 +81,9 @@ func (fg *TimeTickedFlowGraph) SetEdges(nodeName string, out []string) error {
func (fg *TimeTickedFlowGraph) Start() {
fg.startOnce.Do(func() {
for _, v := range fg.nodeCtx {
v.Start()
v.node.Start()
}
fg.nodeCtxManager.Start()
})
}

Expand Down Expand Up @@ -120,6 +123,7 @@ func (fg *TimeTickedFlowGraph) Close() {
func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph {
flowGraph := TimeTickedFlowGraph{
nodeCtx: make(map[string]*nodeCtx),
nodeCtxManager: &nodeCtxManager{},
closeWg: &sync.WaitGroup{},
closeGracefully: atomic.NewBool(CloseImmediately),
}
Expand Down
16 changes: 10 additions & 6 deletions internal/util/flowgraph/flow_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// nodeD: count c = b + 2

type nodeA struct {
BaseNode
InputNode
inputChan chan float64
a float64
}
Expand All @@ -47,7 +47,7 @@ type nodeB struct {

type nodeC struct {
BaseNode
d float64
c float64
outputChan chan float64
}

Expand Down Expand Up @@ -117,8 +117,10 @@ func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64,
fg := NewTimeTickedFlowGraph(ctx)

var a Node = &nodeA{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
InputNode: InputNode{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
},
inputChan: inputChan,
}
Expand Down Expand Up @@ -175,8 +177,10 @@ func TestTimeTickedFlowGraph_AddNode(t *testing.T) {
fg := NewTimeTickedFlowGraph(context.TODO())

var a Node = &nodeA{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
InputNode: InputNode{
BaseNode: BaseNode{
maxQueueLength: MaxQueueLength,
},
},
inputChan: inputChan,
}
Expand Down
2 changes: 0 additions & 2 deletions internal/util/flowgraph/input_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package flowgraph
import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -49,7 +48,6 @@ type InputNode struct {
collectionID int64
dataType string

closeOnce sync.Once
closeGracefully *atomic.Bool
}

Expand Down
Loading
Loading