diff --git a/internal/flushcommon/writebuffer/manager.go b/internal/flushcommon/writebuffer/manager.go index 22e25e4567b82..1b998784edf67 100644 --- a/internal/flushcommon/writebuffer/manager.go +++ b/internal/flushcommon/writebuffer/manager.go @@ -77,13 +77,19 @@ func (m *bufferManager) Start() { } func (m *bufferManager) check() { - ticker := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) - defer ticker.Stop() + timer := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) + defer timer.Stop() for { select { - case <-ticker.C: + case <-timer.C: m.memoryCheck() - ticker.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond)) case <-m.ch.CloseCh(): log.Info("buffer manager memory check stopped") return @@ -114,6 +120,13 @@ func (m *bufferManager) memoryCheck() { return mem / 1024 / 1024 } + select { + case <-m.ch.CloseCh(): + log.Info("stop memory check due to manager stop") + return + default: + } + m.buffers.Range(func(chanName string, buf WriteBuffer) bool { size := buf.MemorySize() total += size diff --git a/internal/flushcommon/writebuffer/manager_test.go b/internal/flushcommon/writebuffer/manager_test.go index 3107c12fb7115..af53ba786ecc4 100644 --- a/internal/flushcommon/writebuffer/manager_test.go +++ b/internal/flushcommon/writebuffer/manager_test.go @@ -262,6 +262,39 @@ func (s *ManagerSuite) TestMemoryCheck() { wb.AssertExpectations(s.T()) } +func (s *ManagerSuite) TestStopDuringMemoryCheck() { + manager := s.manager + param := paramtable.Get() + + param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50") + param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "true") + param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.7") + + defer func() { + param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key) + param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key) + param.Reset(param.DataNodeCfg.MemoryForceSyncWatermark.Key) + }() + + wb := NewMockWriteBuffer(s.T()) + + // mock the memory size reach water mark + memoryLimit := hardware.GetMemoryCount() + wb.EXPECT().MemorySize().RunAndReturn(func() int64 { + return int64(float64(memoryLimit) * 0.8) + }).Maybe() + //.Return(int64(float64(memoryLimit) * 0.6)) + wb.EXPECT().EvictBuffer(mock.Anything).Maybe() + manager.buffers.Insert(s.channelName, wb) + manager.Start() + + // wait memory check triggered + time.Sleep(200 * time.Millisecond) + + // expect stop operation won't stuck + manager.Stop() +} + func TestManager(t *testing.T) { suite.Run(t, new(ManagerSuite)) }