diff --git a/Makefile b/Makefile index b2545385f117c..ff56a26fdbbc9 100644 --- a/Makefile +++ b/Makefile @@ -438,6 +438,7 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage + $(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/datanode --output=$(PWD)/internal/datanode --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=datanode --inpackage generate-mockery-metastore: getdeps $(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks diff --git a/internal/datanode/channel_manager.go b/internal/datanode/channel_manager.go index 3407c8bb33e69..2caf372429d89 100644 --- a/internal/datanode/channel_manager.go +++ b/internal/datanode/channel_manager.go @@ -37,10 +37,11 @@ type ChannelManager struct { mu sync.RWMutex dn *DataNode - communicateCh chan *opState - runningFlowgraphs *flowgraphManager - opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner - abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel + fgManager FlowgraphManager + + communicateCh chan *opState + opRunners *typeutil.ConcurrentMap[string, *opRunner] // channel -> runner + abnormals *typeutil.ConcurrentMap[int64, string] // OpID -> Channel releaseFunc releaseFunc @@ -52,14 +53,14 @@ type ChannelManager struct { func NewChannelManager(dn *DataNode) *ChannelManager { fm := newFlowgraphManager() cm := ChannelManager{ - dn: dn, + dn: dn, + fgManager: fm, - communicateCh: make(chan *opState, 100), - runningFlowgraphs: fm, - opRunners: typeutil.NewConcurrentMap[string, *opRunner](), - abnormals: typeutil.NewConcurrentMap[int64, string](), + communicateCh: make(chan *opState, 100), + opRunners: typeutil.NewConcurrentMap[string, *opRunner](), + abnormals: typeutil.NewConcurrentMap[int64, string](), - releaseFunc: fm.release, + releaseFunc: fm.RemoveFlowgraph, closeCh: make(chan struct{}), } @@ -84,7 +85,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan channel := info.GetVchan().GetChannelName() switch info.GetState() { case datapb.ChannelWatchState_ToWatch: - if m.runningFlowgraphs.existWithOpID(channel, info.GetOpID()) { + if m.fgManager.HasFlowgraphWithOpID(channel, info.GetOpID()) { resp.State = datapb.ChannelWatchState_WatchSuccess return resp } @@ -101,7 +102,7 @@ func (m *ChannelManager) GetProgress(info *datapb.ChannelWatchInfo) *datapb.Chan return resp case datapb.ChannelWatchState_ToRelease: - if !m.runningFlowgraphs.exist(channel) { + if !m.fgManager.HasFlowgraph(channel) { resp.State = datapb.ChannelWatchState_ReleaseSuccess return resp } @@ -126,16 +127,13 @@ func (m *ChannelManager) Close() { runner.Close() return true }) - m.runningFlowgraphs.close() close(m.closeCh) m.closeWaiter.Wait() }) } func (m *ChannelManager) Start() { - m.closeWaiter.Add(2) - - go m.runningFlowgraphs.start(&m.closeWaiter) + m.closeWaiter.Add(1) go func() { defer m.closeWaiter.Done() log.Info("DataNode ChannelManager start") @@ -162,7 +160,7 @@ func (m *ChannelManager) handleOpState(opState *opState) { switch opState.state { case datapb.ChannelWatchState_WatchSuccess: log.Info("Success to watch") - m.runningFlowgraphs.Add(opState.fg) + m.fgManager.AddFlowgraph(opState.fg) m.finishOp(opState.opID, opState.channel) case datapb.ChannelWatchState_WatchFailure: diff --git a/internal/datanode/channel_manager_test.go b/internal/datanode/channel_manager_test.go index 16281dacc6028..a009f1d454d32 100644 --- a/internal/datanode/channel_manager_test.go +++ b/internal/datanode/channel_manager_test.go @@ -159,7 +159,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() { s.Equal(datapb.ChannelWatchState_ToWatch, resp.GetState()) s.manager.handleOpState(opState) - s.Equal(1, s.manager.runningFlowgraphs.getFlowGraphNum()) + s.Equal(1, s.manager.fgManager.GetFlowgraphCount()) s.True(s.manager.opRunners.Contain(info.GetVchan().GetChannelName())) s.Equal(1, s.manager.opRunners.Len()) @@ -182,7 +182,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() { s.Equal(info.GetOpID(), resp.GetOpID()) s.Equal(datapb.ChannelWatchState_ReleaseSuccess, resp.GetState()) - s.Equal(0, s.manager.runningFlowgraphs.getFlowGraphNum()) + s.Equal(0, s.manager.fgManager.GetFlowgraphCount()) s.False(s.manager.opRunners.Contain(info.GetVchan().GetChannelName())) s.Equal(0, s.manager.opRunners.Len()) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 23e741d5acb49..f88ce9b938f9c 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -85,7 +85,7 @@ type DataNode struct { cancel context.CancelFunc Role string stateCode atomic.Value // commonpb.StateCode_Initializing - flowgraphManager *flowgraphManager + flowgraphManager FlowgraphManager eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager] syncMgr syncmgr.SyncManager @@ -310,7 +310,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { // tryToReleaseFlowgraph tries to release a flowgraph func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { log.Info("try to release flowgraph", zap.String("vChanName", vChanName)) - node.flowgraphManager.release(vChanName) + node.flowgraphManager.RemoveFlowgraph(vChanName) } // BackGroundGC runs in background to release datanode resources @@ -382,9 +382,6 @@ func (node *DataNode) Start() error { // Start node watch node go node.StartWatchChannels(node.ctx) - node.stopWaiter.Add(1) - go node.flowgraphManager.start(&node.stopWaiter) - node.UpdateStateCode(commonpb.StateCode_Healthy) }) return startErr @@ -417,7 +414,6 @@ func (node *DataNode) Stop() error { node.stopOnce.Do(func() { // https://github.com/milvus-io/milvus/issues/12282 node.UpdateStateCode(commonpb.StateCode_Abnormal) - node.flowgraphManager.close() // Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph node.cancel() diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 47bb46deba6e9..cfff0b9d94b7e 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -206,14 +206,14 @@ func TestDataNode(t *testing.T) { } for _, test := range testDataSyncs { - err = node.flowgraphManager.addAndStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler()) + err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler()) assert.NoError(t, err) vchanNameCh <- test.dmChannelName } assert.Eventually(t, func() bool { for _, test := range testDataSyncs { - if node.flowgraphManager.exist(test.dmChannelName) { + if node.flowgraphManager.HasFlowgraph(test.dmChannelName) { return false } } diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 51dc8817dab62..21542c1cb82bc 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -171,7 +171,7 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version switch watchInfo.State { case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: - if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { + if err := node.flowgraphManager.AddandStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err)) watchInfo.State = datapb.ChannelWatchState_WatchFailure } else { diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 4c31a6271566f..282173a23e2cb 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -99,7 +99,7 @@ func TestWatchChannel(t *testing.T) { assert.NoError(t, err) assert.Eventually(t, func() bool { - exist := node.flowgraphManager.exist(ch) + exist := node.flowgraphManager.HasFlowgraph(ch) if !exist { return false } @@ -119,7 +119,7 @@ func TestWatchChannel(t *testing.T) { assert.NoError(t, err) assert.Eventually(t, func() bool { - exist := node.flowgraphManager.exist(ch) + exist := node.flowgraphManager.HasFlowgraph(ch) return !exist }, 3*time.Second, 100*time.Millisecond) }) @@ -170,7 +170,7 @@ func TestWatchChannel(t *testing.T) { // wait for check goroutine received 2 events <-c - exist := node.flowgraphManager.exist(ch) + exist := node.flowgraphManager.HasFlowgraph(ch) assert.False(t, exist) err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.CommonCfg.DataCoordWatchSubPath.GetValue(), paramtable.GetNodeID())) @@ -178,7 +178,7 @@ func TestWatchChannel(t *testing.T) { // TODO there is not way to sync Release done, use sleep for now time.Sleep(100 * time.Millisecond) - exist = node.flowgraphManager.exist(ch) + exist = node.flowgraphManager.HasFlowgraph(ch) assert.False(t, exist) }) @@ -189,7 +189,7 @@ func TestWatchChannel(t *testing.T) { node.handleWatchInfo(e, "test1", []byte{23}) - exist := node.flowgraphManager.exist("test1") + exist := node.flowgraphManager.HasFlowgraph("test1") assert.False(t, exist) info := datapb.ChannelWatchInfo{ @@ -200,7 +200,7 @@ func TestWatchChannel(t *testing.T) { assert.NoError(t, err) node.handleWatchInfo(e, "test2", bs) - exist = node.flowgraphManager.exist("test2") + exist = node.flowgraphManager.HasFlowgraph("test2") assert.False(t, exist) chPut := make(chan struct{}, 1) @@ -238,7 +238,7 @@ func TestWatchChannel(t *testing.T) { node.factory = &FailMessageStreamFactory{} node.handleWatchInfo(e, ch, bs) <-chPut - exist = node.flowgraphManager.exist(ch) + exist = node.flowgraphManager.HasFlowgraph(ch) assert.True(t, exist) }) @@ -275,7 +275,7 @@ func TestWatchChannel(t *testing.T) { node.handleWatchInfo(e, ch, bs) <-chPut - exist := node.flowgraphManager.exist("test3") + exist := node.flowgraphManager.HasFlowgraph("test3") assert.False(t, exist) }) diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 04c49c5f9e385..b1832f3884345 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -19,8 +19,6 @@ package datanode import ( "context" "fmt" - "sync" - "time" "go.uber.org/zap" @@ -28,100 +26,41 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type flowgraphManager struct { - flowgraphs *typeutil.ConcurrentMap[string, *dataSyncService] - - closeCh chan struct{} - closeOnce sync.Once -} - -func newFlowgraphManager() *flowgraphManager { - return &flowgraphManager{ - flowgraphs: typeutil.NewConcurrentMap[string, *dataSyncService](), - closeCh: make(chan struct{}), - } +type FlowgraphManager interface { + AddFlowgraph(ds *dataSyncService) + AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error + RemoveFlowgraph(channel string) + ClearFlowgraphs() + + GetFlowgraphService(channel string) (*dataSyncService, bool) + HasFlowgraph(channel string) bool + HasFlowgraphWithOpID(channel string, opID UniqueID) bool + GetFlowgraphCount() int + GetCollectionIDs() []int64 } -func (fm *flowgraphManager) start(waiter *sync.WaitGroup) { - defer waiter.Done() - ticker := time.NewTicker(3 * time.Second) - defer ticker.Stop() - for { - select { - case <-fm.closeCh: - return - case <-ticker.C: - fm.execute(hardware.GetMemoryCount()) - } - } -} +var _ FlowgraphManager = (*fgManagerImpl)(nil) -func (fm *flowgraphManager) close() { - fm.dropAll() - fm.closeOnce.Do(func() { - close(fm.closeCh) - }) +type fgManagerImpl struct { + flowgraphs *typeutil.ConcurrentMap[string, *dataSyncService] } -func (fm *flowgraphManager) execute(totalMemory uint64) { - if !Params.DataNodeCfg.MemoryForceSyncEnable.GetAsBool() { - return +func newFlowgraphManager() *fgManagerImpl { + return &fgManagerImpl{ + flowgraphs: typeutil.NewConcurrentMap[string, *dataSyncService](), } - // TODO change to buffer manager - - /* - var total int64 - channels := make([]struct { - channel string - bufferSize int64 - }, 0) - fm.flowgraphs.Range(func(key string, value *dataSyncService) bool { - size := value.channel.getTotalMemorySize() - channels = append(channels, struct { - channel string - bufferSize int64 - }{key, size}) - total += size - return true - }) - if len(channels) == 0 { - return - } - - toMB := func(mem float64) float64 { - return mem / 1024 / 1024 - } - - memoryWatermark := float64(totalMemory) * Params.DataNodeCfg.MemoryWatermark.GetAsFloat() - if float64(total) < memoryWatermark { - log.RatedDebug(5, "skip force sync because memory level is not high enough", - zap.Float64("current_total_memory_usage", toMB(float64(total))), - zap.Float64("current_memory_watermark", toMB(memoryWatermark)), - zap.Any("channel_memory_usages", channels)) - return - } - - sort.Slice(channels, func(i, j int) bool { - return channels[i].bufferSize > channels[j].bufferSize - }) - if fg, ok := fm.flowgraphs.Get(channels[0].channel); ok { // sync the first channel with the largest memory usage - fg.channel.setIsHighMemory(true) - log.Info("notify flowgraph to sync", - zap.String("channel", channels[0].channel), zap.Int64("bufferSize", channels[0].bufferSize)) - }*/ } -func (fm *flowgraphManager) Add(ds *dataSyncService) { +func (fm *fgManagerImpl) AddFlowgraph(ds *dataSyncService) { fm.flowgraphs.Insert(ds.vchannelName, ds) metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() } -func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error { +func (fm *fgManagerImpl) AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error { log := log.With(zap.String("channel", vchan.GetChannelName())) if fm.flowgraphs.Contain(vchan.GetChannelName()) { log.Warn("try to add an existed DataSyncService") @@ -143,47 +82,47 @@ func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *data return nil } -func (fm *flowgraphManager) release(vchanName string) { - if fg, loaded := fm.flowgraphs.Get(vchanName); loaded { +func (fm *fgManagerImpl) RemoveFlowgraph(channel string) { + if fg, loaded := fm.flowgraphs.Get(channel); loaded { fg.close() - fm.flowgraphs.Remove(vchanName) + fm.flowgraphs.Remove(channel) metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() - rateCol.removeFlowGraphChannel(vchanName) + rateCol.removeFlowGraphChannel(channel) } } -func (fm *flowgraphManager) getFlowgraphService(vchan string) (*dataSyncService, bool) { - return fm.flowgraphs.Get(vchan) +func (fm *fgManagerImpl) ClearFlowgraphs() { + log.Info("start drop all flowgraph resources in DataNode") + fm.flowgraphs.Range(func(key string, value *dataSyncService) bool { + value.GracefullyClose() + fm.flowgraphs.GetAndRemove(key) + + log.Info("successfully dropped flowgraph", zap.String("vChannelName", key)) + return true + }) +} + +func (fm *fgManagerImpl) GetFlowgraphService(channel string) (*dataSyncService, bool) { + return fm.flowgraphs.Get(channel) } -func (fm *flowgraphManager) exist(vchan string) bool { - _, exist := fm.getFlowgraphService(vchan) +func (fm *fgManagerImpl) HasFlowgraph(channel string) bool { + _, exist := fm.flowgraphs.Get(channel) return exist } -func (fm *flowgraphManager) existWithOpID(vchan string, opID UniqueID) bool { - ds, exist := fm.getFlowgraphService(vchan) +func (fm *fgManagerImpl) HasFlowgraphWithOpID(channel string, opID UniqueID) bool { + ds, exist := fm.flowgraphs.Get(channel) return exist && ds.opID == opID } -// getFlowGraphNum returns number of flow graphs. -func (fm *flowgraphManager) getFlowGraphNum() int { +// GetFlowgraphCount returns number of flow graphs. +func (fm *fgManagerImpl) GetFlowgraphCount() int { return fm.flowgraphs.Len() } -func (fm *flowgraphManager) dropAll() { - log.Info("start drop all flowgraph resources in DataNode") - fm.flowgraphs.Range(func(key string, value *dataSyncService) bool { - value.GracefullyClose() - fm.flowgraphs.GetAndRemove(key) - - log.Info("successfully dropped flowgraph", zap.String("vChannelName", key)) - return true - }) -} - -func (fm *flowgraphManager) collections() []int64 { +func (fm *fgManagerImpl) GetCollectionIDs() []int64 { collectionSet := typeutil.UniqueSet{} fm.flowgraphs.Range(func(key string, value *dataSyncService) bool { collectionSet.Insert(value.metacache.Collection()) diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 9481f60f3418e..a17e2ea9361d7 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -72,7 +72,7 @@ func TestFlowGraphManager(t *testing.T) { fm := newFlowgraphManager() defer func() { - fm.dropAll() + fm.ClearFlowgraphs() }() t.Run("Test addAndStart", func(t *testing.T) { @@ -81,13 +81,13 @@ func TestFlowGraphManager(t *testing.T) { CollectionID: 1, ChannelName: vchanName, } - require.False(t, fm.exist(vchanName)) + require.False(t, fm.HasFlowgraph(vchanName)) - err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) + err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) - assert.True(t, fm.exist(vchanName)) + assert.True(t, fm.HasFlowgraph(vchanName)) - fm.dropAll() + fm.ClearFlowgraphs() }) t.Run("Test Release", func(t *testing.T) { @@ -96,20 +96,20 @@ func TestFlowGraphManager(t *testing.T) { CollectionID: 1, ChannelName: vchanName, } - require.False(t, fm.exist(vchanName)) + require.False(t, fm.HasFlowgraph(vchanName)) - err := fm.addAndStartWithEtcdTickler(node, vchan, nil, genTestTickler()) + err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler()) assert.NoError(t, err) - assert.True(t, fm.exist(vchanName)) + assert.True(t, fm.HasFlowgraph(vchanName)) - fm.release(vchanName) + fm.RemoveFlowgraph(vchanName) - assert.False(t, fm.exist(vchanName)) - fm.dropAll() + assert.False(t, fm.HasFlowgraph(vchanName)) + fm.ClearFlowgraphs() }) t.Run("Test getFlowgraphService", func(t *testing.T) { - fg, ok := fm.getFlowgraphService("channel-not-exist") + fg, ok := fm.GetFlowgraphService("channel-not-exist") assert.False(t, ok) assert.Nil(t, fg) }) diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 6bbafef9dc10c..804a85d3f9720 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -56,11 +56,11 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro Fgm: metricsinfo.FlowGraphMetric{ MinFlowGraphChannel: minFGChannel, MinFlowGraphTt: minFGTt, - NumFlowGraph: node.flowgraphManager.getFlowGraphNum(), + NumFlowGraph: node.flowgraphManager.GetFlowgraphCount(), }, Effect: metricsinfo.NodeEffect{ NodeID: node.GetSession().ServerID, - CollectionIDs: node.flowgraphManager.collections(), + CollectionIDs: node.flowgraphManager.GetCollectionIDs(), }, }, nil } diff --git a/internal/datanode/mock_fgmanager.go b/internal/datanode/mock_fgmanager.go new file mode 100644 index 0000000000000..1dea01e67f6c4 --- /dev/null +++ b/internal/datanode/mock_fgmanager.go @@ -0,0 +1,500 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package datanode + +import ( + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" + + schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + +// MockFlowgraphManager is an autogenerated mock type for the FlowgraphManager type +type MockFlowgraphManager struct { + mock.Mock +} + +type MockFlowgraphManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockFlowgraphManager) EXPECT() *MockFlowgraphManager_Expecter { + return &MockFlowgraphManager_Expecter{mock: &_m.Mock} +} + +// AddFlowgraph provides a mock function with given fields: ds +func (_m *MockFlowgraphManager) AddFlowgraph(ds *dataSyncService) { + _m.Called(ds) +} + +// MockFlowgraphManager_AddFlowgraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddFlowgraph' +type MockFlowgraphManager_AddFlowgraph_Call struct { + *mock.Call +} + +// AddFlowgraph is a helper method to define mock.On call +// - ds *dataSyncService +func (_e *MockFlowgraphManager_Expecter) AddFlowgraph(ds interface{}) *MockFlowgraphManager_AddFlowgraph_Call { + return &MockFlowgraphManager_AddFlowgraph_Call{Call: _e.mock.On("AddFlowgraph", ds)} +} + +func (_c *MockFlowgraphManager_AddFlowgraph_Call) Run(run func(ds *dataSyncService)) *MockFlowgraphManager_AddFlowgraph_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*dataSyncService)) + }) + return _c +} + +func (_c *MockFlowgraphManager_AddFlowgraph_Call) Return() *MockFlowgraphManager_AddFlowgraph_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_AddFlowgraph_Call) RunAndReturn(run func(*dataSyncService)) *MockFlowgraphManager_AddFlowgraph_Call { + _c.Call.Return(run) + return _c +} + +// AddandStartWithEtcdTickler provides a mock function with given fields: dn, vchan, schema, tickler +func (_m *MockFlowgraphManager) AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error { + ret := _m.Called(dn, vchan, schema, tickler) + + var r0 error + if rf, ok := ret.Get(0).(func(*DataNode, *datapb.VchannelInfo, *schemapb.CollectionSchema, *etcdTickler) error); ok { + r0 = rf(dn, vchan, schema, tickler) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockFlowgraphManager_AddandStartWithEtcdTickler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddandStartWithEtcdTickler' +type MockFlowgraphManager_AddandStartWithEtcdTickler_Call struct { + *mock.Call +} + +// AddandStartWithEtcdTickler is a helper method to define mock.On call +// - dn *DataNode +// - vchan *datapb.VchannelInfo +// - schema *schemapb.CollectionSchema +// - tickler *etcdTickler +func (_e *MockFlowgraphManager_Expecter) AddandStartWithEtcdTickler(dn interface{}, vchan interface{}, schema interface{}, tickler interface{}) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call { + return &MockFlowgraphManager_AddandStartWithEtcdTickler_Call{Call: _e.mock.On("AddandStartWithEtcdTickler", dn, vchan, schema, tickler)} +} + +func (_c *MockFlowgraphManager_AddandStartWithEtcdTickler_Call) Run(run func(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler)) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*DataNode), args[1].(*datapb.VchannelInfo), args[2].(*schemapb.CollectionSchema), args[3].(*etcdTickler)) + }) + return _c +} + +func (_c *MockFlowgraphManager_AddandStartWithEtcdTickler_Call) Return(_a0 error) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_AddandStartWithEtcdTickler_Call) RunAndReturn(run func(*DataNode, *datapb.VchannelInfo, *schemapb.CollectionSchema, *etcdTickler) error) *MockFlowgraphManager_AddandStartWithEtcdTickler_Call { + _c.Call.Return(run) + return _c +} + +// ClearFlowgraphs provides a mock function with given fields: +func (_m *MockFlowgraphManager) ClearFlowgraphs() { + _m.Called() +} + +// MockFlowgraphManager_ClearFlowgraphs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearFlowgraphs' +type MockFlowgraphManager_ClearFlowgraphs_Call struct { + *mock.Call +} + +// ClearFlowgraphs is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) ClearFlowgraphs() *MockFlowgraphManager_ClearFlowgraphs_Call { + return &MockFlowgraphManager_ClearFlowgraphs_Call{Call: _e.mock.On("ClearFlowgraphs")} +} + +func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) Run(run func()) *MockFlowgraphManager_ClearFlowgraphs_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) Return() *MockFlowgraphManager_ClearFlowgraphs_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_ClearFlowgraphs_Call) RunAndReturn(run func()) *MockFlowgraphManager_ClearFlowgraphs_Call { + _c.Call.Return(run) + return _c +} + +// GetCollectionIDs provides a mock function with given fields: +func (_m *MockFlowgraphManager) GetCollectionIDs() []int64 { + ret := _m.Called() + + var r0 []int64 + if rf, ok := ret.Get(0).(func() []int64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockFlowgraphManager_GetCollectionIDs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionIDs' +type MockFlowgraphManager_GetCollectionIDs_Call struct { + *mock.Call +} + +// GetCollectionIDs is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) GetCollectionIDs() *MockFlowgraphManager_GetCollectionIDs_Call { + return &MockFlowgraphManager_GetCollectionIDs_Call{Call: _e.mock.On("GetCollectionIDs")} +} + +func (_c *MockFlowgraphManager_GetCollectionIDs_Call) Run(run func()) *MockFlowgraphManager_GetCollectionIDs_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_GetCollectionIDs_Call) Return(_a0 []int64) *MockFlowgraphManager_GetCollectionIDs_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_GetCollectionIDs_Call) RunAndReturn(run func() []int64) *MockFlowgraphManager_GetCollectionIDs_Call { + _c.Call.Return(run) + return _c +} + +// GetFlowgraphCount provides a mock function with given fields: +func (_m *MockFlowgraphManager) GetFlowgraphCount() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockFlowgraphManager_GetFlowgraphCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlowgraphCount' +type MockFlowgraphManager_GetFlowgraphCount_Call struct { + *mock.Call +} + +// GetFlowgraphCount is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) GetFlowgraphCount() *MockFlowgraphManager_GetFlowgraphCount_Call { + return &MockFlowgraphManager_GetFlowgraphCount_Call{Call: _e.mock.On("GetFlowgraphCount")} +} + +func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) Run(run func()) *MockFlowgraphManager_GetFlowgraphCount_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) Return(_a0 int) *MockFlowgraphManager_GetFlowgraphCount_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_GetFlowgraphCount_Call) RunAndReturn(run func() int) *MockFlowgraphManager_GetFlowgraphCount_Call { + _c.Call.Return(run) + return _c +} + +// GetFlowgraphService provides a mock function with given fields: channel +func (_m *MockFlowgraphManager) GetFlowgraphService(channel string) (*dataSyncService, bool) { + ret := _m.Called(channel) + + var r0 *dataSyncService + var r1 bool + if rf, ok := ret.Get(0).(func(string) (*dataSyncService, bool)); ok { + return rf(channel) + } + if rf, ok := ret.Get(0).(func(string) *dataSyncService); ok { + r0 = rf(channel) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*dataSyncService) + } + } + + if rf, ok := ret.Get(1).(func(string) bool); ok { + r1 = rf(channel) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockFlowgraphManager_GetFlowgraphService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlowgraphService' +type MockFlowgraphManager_GetFlowgraphService_Call struct { + *mock.Call +} + +// GetFlowgraphService is a helper method to define mock.On call +// - channel string +func (_e *MockFlowgraphManager_Expecter) GetFlowgraphService(channel interface{}) *MockFlowgraphManager_GetFlowgraphService_Call { + return &MockFlowgraphManager_GetFlowgraphService_Call{Call: _e.mock.On("GetFlowgraphService", channel)} +} + +func (_c *MockFlowgraphManager_GetFlowgraphService_Call) Run(run func(channel string)) *MockFlowgraphManager_GetFlowgraphService_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockFlowgraphManager_GetFlowgraphService_Call) Return(_a0 *dataSyncService, _a1 bool) *MockFlowgraphManager_GetFlowgraphService_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(string) (*dataSyncService, bool)) *MockFlowgraphManager_GetFlowgraphService_Call { + _c.Call.Return(run) + return _c +} + +// HasFlowgraph provides a mock function with given fields: channel +func (_m *MockFlowgraphManager) HasFlowgraph(channel string) bool { + ret := _m.Called(channel) + + var r0 bool + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(channel) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockFlowgraphManager_HasFlowgraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasFlowgraph' +type MockFlowgraphManager_HasFlowgraph_Call struct { + *mock.Call +} + +// HasFlowgraph is a helper method to define mock.On call +// - channel string +func (_e *MockFlowgraphManager_Expecter) HasFlowgraph(channel interface{}) *MockFlowgraphManager_HasFlowgraph_Call { + return &MockFlowgraphManager_HasFlowgraph_Call{Call: _e.mock.On("HasFlowgraph", channel)} +} + +func (_c *MockFlowgraphManager_HasFlowgraph_Call) Run(run func(channel string)) *MockFlowgraphManager_HasFlowgraph_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockFlowgraphManager_HasFlowgraph_Call) Return(_a0 bool) *MockFlowgraphManager_HasFlowgraph_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_HasFlowgraph_Call) RunAndReturn(run func(string) bool) *MockFlowgraphManager_HasFlowgraph_Call { + _c.Call.Return(run) + return _c +} + +// HasFlowgraphWithOpID provides a mock function with given fields: channel, opID +func (_m *MockFlowgraphManager) HasFlowgraphWithOpID(channel string, opID int64) bool { + ret := _m.Called(channel, opID) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, int64) bool); ok { + r0 = rf(channel, opID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockFlowgraphManager_HasFlowgraphWithOpID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasFlowgraphWithOpID' +type MockFlowgraphManager_HasFlowgraphWithOpID_Call struct { + *mock.Call +} + +// HasFlowgraphWithOpID is a helper method to define mock.On call +// - channel string +// - opID int64 +func (_e *MockFlowgraphManager_Expecter) HasFlowgraphWithOpID(channel interface{}, opID interface{}) *MockFlowgraphManager_HasFlowgraphWithOpID_Call { + return &MockFlowgraphManager_HasFlowgraphWithOpID_Call{Call: _e.mock.On("HasFlowgraphWithOpID", channel, opID)} +} + +func (_c *MockFlowgraphManager_HasFlowgraphWithOpID_Call) Run(run func(channel string, opID int64)) *MockFlowgraphManager_HasFlowgraphWithOpID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(int64)) + }) + return _c +} + +func (_c *MockFlowgraphManager_HasFlowgraphWithOpID_Call) Return(_a0 bool) *MockFlowgraphManager_HasFlowgraphWithOpID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockFlowgraphManager_HasFlowgraphWithOpID_Call) RunAndReturn(run func(string, int64) bool) *MockFlowgraphManager_HasFlowgraphWithOpID_Call { + _c.Call.Return(run) + return _c +} + +// RemoveFlowgraph provides a mock function with given fields: channel +func (_m *MockFlowgraphManager) RemoveFlowgraph(channel string) { + _m.Called(channel) +} + +// MockFlowgraphManager_RemoveFlowgraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveFlowgraph' +type MockFlowgraphManager_RemoveFlowgraph_Call struct { + *mock.Call +} + +// RemoveFlowgraph is a helper method to define mock.On call +// - channel string +func (_e *MockFlowgraphManager_Expecter) RemoveFlowgraph(channel interface{}) *MockFlowgraphManager_RemoveFlowgraph_Call { + return &MockFlowgraphManager_RemoveFlowgraph_Call{Call: _e.mock.On("RemoveFlowgraph", channel)} +} + +func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) Run(run func(channel string)) *MockFlowgraphManager_RemoveFlowgraph_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) Return() *MockFlowgraphManager_RemoveFlowgraph_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_RemoveFlowgraph_Call) RunAndReturn(run func(string)) *MockFlowgraphManager_RemoveFlowgraph_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: +func (_m *MockFlowgraphManager) Start() { + _m.Called() +} + +// MockFlowgraphManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockFlowgraphManager_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) Start() *MockFlowgraphManager_Start_Call { + return &MockFlowgraphManager_Start_Call{Call: _e.mock.On("Start")} +} + +func (_c *MockFlowgraphManager_Start_Call) Run(run func()) *MockFlowgraphManager_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_Start_Call) Return() *MockFlowgraphManager_Start_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_Start_Call) RunAndReturn(run func()) *MockFlowgraphManager_Start_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockFlowgraphManager) Stop() { + _m.Called() +} + +// MockFlowgraphManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockFlowgraphManager_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) Stop() *MockFlowgraphManager_Stop_Call { + return &MockFlowgraphManager_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockFlowgraphManager_Stop_Call) Run(run func()) *MockFlowgraphManager_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_Stop_Call) Return() *MockFlowgraphManager_Stop_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_Stop_Call) RunAndReturn(run func()) *MockFlowgraphManager_Stop_Call { + _c.Call.Return(run) + return _c +} + +// controlMemWaterLevel provides a mock function with given fields: totalMemory +func (_m *MockFlowgraphManager) controlMemWaterLevel(totalMemory uint64) { + _m.Called(totalMemory) +} + +// MockFlowgraphManager_controlMemWaterLevel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'controlMemWaterLevel' +type MockFlowgraphManager_controlMemWaterLevel_Call struct { + *mock.Call +} + +// controlMemWaterLevel is a helper method to define mock.On call +// - totalMemory uint64 +func (_e *MockFlowgraphManager_Expecter) controlMemWaterLevel(totalMemory interface{}) *MockFlowgraphManager_controlMemWaterLevel_Call { + return &MockFlowgraphManager_controlMemWaterLevel_Call{Call: _e.mock.On("controlMemWaterLevel", totalMemory)} +} + +func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Run(run func(totalMemory uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) Return() *MockFlowgraphManager_controlMemWaterLevel_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFlowgraphManager_controlMemWaterLevel_Call) RunAndReturn(run func(uint64)) *MockFlowgraphManager_controlMemWaterLevel_Call { + _c.Call.Return(run) + return _c +} + +// NewMockFlowgraphManager creates a new instance of MockFlowgraphManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockFlowgraphManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockFlowgraphManager { + mock := &MockFlowgraphManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/services.go b/internal/datanode/services.go index fbf666eee6a9d..95f8481386389 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -240,7 +240,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan return merr.Status(err), nil } - ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel()) + ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannel()) if !ok { log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channelName", req.GetChannel())) return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil @@ -325,7 +325,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil } - ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannelName()) + ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) if !ok { node.compactionExecutor.clearTasksByChannel(req.GetChannelName()) err := merr.WrapErrChannelNotFound(req.GetChannelName()) @@ -509,7 +509,7 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor // Retry in case the channel hasn't been watched yet. err := retry.Do(ctx, func() error { var ok bool - ds, ok = node.flowgraphManager.getFlowgraphService(req.GetChannelName()) + ds, ok = node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) if !ok { return errors.New("channel not found") } diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 324dea533703c..58bfad043dfa0 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -89,7 +89,7 @@ func (s *DataNodeServicesSuite) SetupTest() { err := s.node.Init() s.Require().NoError(err) - alloc := &allocator.MockAllocator{} + alloc := allocator.NewMockAllocator(s.T()) alloc.EXPECT().Start().Return(nil).Maybe() alloc.EXPECT().Close().Maybe() alloc.EXPECT().GetIDAlloactor().Return(&allocator2.IDAllocator{}).Maybe() @@ -234,10 +234,10 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { FlushedSegmentIds: []int64{}, } - err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, vchan, schema, genTestTickler()) + err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, vchan, schema, genTestTickler()) s.Require().NoError(err) - fgservice, ok := s.node.flowgraphManager.getFlowgraphService(dmChannelName) + fgservice, ok := s.node.flowgraphManager.GetFlowgraphService(dmChannelName) s.Require().True(ok) metaCache := metacache.NewMockMetaCache(s.T()) @@ -422,14 +422,14 @@ func (s *DataNodeServicesSuite) TestImport() { }() chName1 := "fake-by-dev-rootcoord-dml-testimport-1" chName2 := "fake-by-dev-rootcoord-dml-testimport-2" - err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, }, nil, genTestTickler()) s.Require().Nil(err) - err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName2, UnflushedSegmentIds: []int64{}, @@ -437,9 +437,9 @@ func (s *DataNodeServicesSuite) TestImport() { }, nil, genTestTickler()) s.Require().Nil(err) - _, ok := s.node.flowgraphManager.getFlowgraphService(chName1) + _, ok := s.node.flowgraphManager.GetFlowgraphService(chName1) s.Require().True(ok) - _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) + _, ok = s.node.flowgraphManager.GetFlowgraphService(chName2) s.Require().True(ok) req := &datapb.ImportTaskRequest{ @@ -485,14 +485,14 @@ func (s *DataNodeServicesSuite) TestImport() { }() chName1 := "fake-by-dev-rootcoord-dml-testimport-1-badflowgraph" chName2 := "fake-by-dev-rootcoord-dml-testimport-2-badflowgraph" - err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, }, nil, genTestTickler()) s.Require().Nil(err) - err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 999, // wrong collection ID. ChannelName: chName2, UnflushedSegmentIds: []int64{}, @@ -500,9 +500,9 @@ func (s *DataNodeServicesSuite) TestImport() { }, nil, genTestTickler()) s.Require().Nil(err) - _, ok := s.node.flowgraphManager.getFlowgraphService(chName1) + _, ok := s.node.flowgraphManager.GetFlowgraphService(chName1) s.Require().True(ok) - _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) + _, ok = s.node.flowgraphManager.GetFlowgraphService(chName2) s.Require().True(ok) s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil) @@ -612,14 +612,14 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { chName1 := "fake-by-dev-rootcoord-dml-testaddsegment-1" chName2 := "fake-by-dev-rootcoord-dml-testaddsegment-2" - err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, }, schema, genTestTickler()) s.Require().NoError(err) - err = s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName2, UnflushedSegmentIds: []int64{}, @@ -627,9 +627,9 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { }, schema, genTestTickler()) s.Require().NoError(err) - _, ok := s.node.flowgraphManager.getFlowgraphService(chName1) + _, ok := s.node.flowgraphManager.GetFlowgraphService(chName1) s.Assert().True(ok) - _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) + _, ok = s.node.flowgraphManager.GetFlowgraphService(chName2) s.Assert().True(ok) resp, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ @@ -673,14 +673,14 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { }, } - err := s.node.flowgraphManager.addAndStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ + err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 1, ChannelName: chanName, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{100, 200, 300}, }, schema, genTestTickler()) s.Require().NoError(err) - fg, ok := s.node.flowgraphManager.getFlowgraphService(chanName) + fg, ok := s.node.flowgraphManager.GetFlowgraphService(chanName) s.Assert().True(ok) fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 100, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)