Skip to content

Commit

Permalink
enhance: Add FlowgraphManager interface (#28852)
Browse files Browse the repository at this point in the history
- Change flowgraphManager to fgManagerImpl
- Change close to stop
- change execute to controlMemWaterLevel
- Change method name of fgManager for readability
- Add mockery for fgmanager

Issue: #28853

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Nov 30, 2023
1 parent bf633bb commit e62edb9
Show file tree
Hide file tree
Showing 13 changed files with 608 additions and 174 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage
$(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/datanode --output=$(PWD)/internal/datanode --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=datanode --inpackage

generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks
Expand Down
32 changes: 15 additions & 17 deletions internal/datanode/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ type ChannelManager struct {
mu sync.RWMutex
dn *DataNode

communicateCh chan *opState
runningFlowgraphs *flowgraphManager
opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner
abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel
fgManager FlowgraphManager

communicateCh chan *opState
opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner
abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel

releaseFunc releaseFunc

Expand All @@ -52,14 +53,14 @@ type ChannelManager struct {
func NewChannelManager(dn *DataNode) *ChannelManager {
fm := newFlowgraphManager()
cm := ChannelManager{
dn: dn,
dn: dn,
fgManager: fm,

communicateCh: make(chan *opState, 100),
runningFlowgraphs: fm,
opRunners: typeutil.NewConcurrentMap[string, *opRunner](),
abnormals: typeutil.NewConcurrentMap[int64, string](),
communicateCh: make(chan *opState, 100),
opRunners: typeutil.NewConcurrentMap[string, *opRunner](),
abnormals: typeutil.NewConcurrentMap[int64, string](),

releaseFunc: fm.release,
releaseFunc: fm.RemoveFlowgraph,

closeCh: make(chan struct{}),
}
Expand All @@ -84,7 +85,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan
channel := info.GetVchan().GetChannelName()
switch info.GetState() {
case datapb.ChannelWatchState_ToWatch:
if m.runningFlowgraphs.existWithOpID(channel, info.GetOpID()) {
if m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) {
resp.State = datapb.ChannelWatchState_WatchSuccess
return resp
}
Expand All @@ -101,7 +102,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan
return resp

case datapb.ChannelWatchState_ToRelease:
if !m.runningFlowgraphs.exist(channel) {
if !m.fgManager.HasFlowgraph(channel) {
resp.State = datapb.ChannelWatchState_ReleaseSuccess
return resp
}
Expand All @@ -126,16 +127,13 @@ func (m *ChannelManager) Close() {
runner.Close()
return true
})
m.runningFlowgraphs.close()
close(m.closeCh)
m.closeWaiter.Wait()
})
}

func (m *ChannelManager) Start() {
m.closeWaiter.Add(2)

go m.runningFlowgraphs.start(&m.closeWaiter)
m.closeWaiter.Add(1)
go func() {
defer m.closeWaiter.Done()
log.Info("DataNode ChannelManager start")
Expand All @@ -162,7 +160,7 @@ func (m *ChannelManager) handleOpState(opState *opState) {
switch opState.state {
case datapb.ChannelWatchState_WatchSuccess:
log.Info("Success to watch")
m.runningFlowgraphs.Add(opState.fg)
m.fgManager.AddFlowgraph(opState.fg)
m.finishOp(opState.opID, opState.channel)

case datapb.ChannelWatchState_WatchFailure:
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.Equal(datapb.ChannelWatchState_ToWatch, resp.GetState())

s.manager.handleOpState(opState)
s.Equal(1, s.manager.runningFlowgraphs.getFlowGraphNum())
s.Equal(1, s.manager.fgManager.GetFlowgraphCount())
s.True(s.manager.opRunners.Contain(info.GetVchan().GetChannelName()))
s.Equal(1, s.manager.opRunners.Len())

Expand All @@ -182,7 +182,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.Equal(info.GetOpID(), resp.GetOpID())
s.Equal(datapb.ChannelWatchState_ReleaseSuccess, resp.GetState())

s.Equal(0, s.manager.runningFlowgraphs.getFlowGraphNum())
s.Equal(0, s.manager.fgManager.GetFlowgraphCount())
s.False(s.manager.opRunners.Contain(info.GetVchan().GetChannelName()))
s.Equal(0, s.manager.opRunners.Len())
}
8 changes: 2 additions & 6 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type DataNode struct {
cancel context.CancelFunc
Role string
stateCode atomic.Value // commonpb.StateCode_Initializing
flowgraphManager *flowgraphManager
flowgraphManager FlowgraphManager
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]

syncMgr syncmgr.SyncManager
Expand Down Expand Up @@ -310,7 +310,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
// tryToReleaseFlowgraph tries to release a flowgraph
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
node.flowgraphManager.release(vChanName)
node.flowgraphManager.RemoveFlowgraph(vChanName)
}

// BackGroundGC runs in background to release datanode resources
Expand Down Expand Up @@ -382,9 +382,6 @@ func (node *DataNode) Start() error {
// Start node watch node
go node.StartWatchChannels(node.ctx)

node.stopWaiter.Add(1)
go node.flowgraphManager.start(&node.stopWaiter)

node.UpdateStateCode(commonpb.StateCode_Healthy)
})
return startErr
Expand Down Expand Up @@ -417,7 +414,6 @@ func (node *DataNode) Stop() error {
node.stopOnce.Do(func() {
// https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.flowgraphManager.close()
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
node.cancel()

Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/data_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,14 @@ func TestDataNode(t *testing.T) {
}

for _, test := range testDataSyncs {
err = node.flowgraphManager.addAndStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
assert.NoError(t, err)
vchanNameCh <- test.dmChannelName
}

assert.Eventually(t, func() bool {
for _, test := range testDataSyncs {
if node.flowgraphManager.exist(test.dmChannelName) {
if node.flowgraphManager.HasFlowgraph(test.dmChannelName) {
return false
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version

switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
if err := node.flowgraphManager.AddandStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
watchInfo.State = datapb.ChannelWatchState_WatchFailure
} else {
Expand Down
16 changes: 8 additions & 8 deletions internal/datanode/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestWatchChannel(t *testing.T) {
assert.NoError(t, err)

assert.Eventually(t, func() bool {
exist := node.flowgraphManager.exist(ch)
exist := node.flowgraphManager.HasFlowgraph(ch)
if !exist {
return false
}
Expand All @@ -119,7 +119,7 @@ func TestWatchChannel(t *testing.T) {
assert.NoError(t, err)

assert.Eventually(t, func() bool {
exist := node.flowgraphManager.exist(ch)
exist := node.flowgraphManager.HasFlowgraph(ch)
return !exist
}, 3*time.Second, 100*time.Millisecond)
})
Expand Down Expand Up @@ -170,15 +170,15 @@ func TestWatchChannel(t *testing.T) {

// wait for check goroutine received 2 events
<-c
exist := node.flowgraphManager.exist(ch)
exist := node.flowgraphManager.HasFlowgraph(ch)
assert.False(t, exist)

err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID()))
assert.NoError(t, err)
// TODO there is not way to sync Release done, use sleep for now
time.Sleep(100 * time.Millisecond)

exist = node.flowgraphManager.exist(ch)
exist = node.flowgraphManager.HasFlowgraph(ch)
assert.False(t, exist)
})

Expand All @@ -189,7 +189,7 @@ func TestWatchChannel(t *testing.T) {

node.handleWatchInfo(e, "test1", []byte{23})

exist := node.flowgraphManager.exist("test1")
exist := node.flowgraphManager.HasFlowgraph("test1")
assert.False(t, exist)

info := datapb.ChannelWatchInfo{
Expand All @@ -200,7 +200,7 @@ func TestWatchChannel(t *testing.T) {
assert.NoError(t, err)
node.handleWatchInfo(e, "test2", bs)

exist = node.flowgraphManager.exist("test2")
exist = node.flowgraphManager.HasFlowgraph("test2")
assert.False(t, exist)

chPut := make(chan struct{}, 1)
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestWatchChannel(t *testing.T) {
node.factory = &FailMessageStreamFactory{}
node.handleWatchInfo(e, ch, bs)
<-chPut
exist = node.flowgraphManager.exist(ch)
exist = node.flowgraphManager.HasFlowgraph(ch)
assert.True(t, exist)
})

Expand Down Expand Up @@ -275,7 +275,7 @@ func TestWatchChannel(t *testing.T) {

node.handleWatchInfo(e, ch, bs)
<-chPut
exist := node.flowgraphManager.exist("test3")
exist := node.flowgraphManager.HasFlowgraph("test3")
assert.False(t, exist)
})

Expand Down
Loading

0 comments on commit e62edb9

Please sign in to comment.