Skip to content

Commit

Permalink
fix: Datanode stop progress stuck at writer buffer memory check (milv…
Browse files Browse the repository at this point in the history
…us-io#38274)

issue: milvus-io#38273

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 6, 2024
1 parent a1e14d6 commit 2035575
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
21 changes: 17 additions & 4 deletions internal/flushcommon/writebuffer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,19 @@ func (m *bufferManager) Start() {
}

func (m *bufferManager) check() {
ticker := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
defer ticker.Stop()
timer := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
defer timer.Stop()
for {
select {
case <-ticker.C:
case <-timer.C:
m.memoryCheck()
ticker.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
case <-m.ch.CloseCh():
log.Info("buffer manager memory check stopped")
return
Expand Down Expand Up @@ -114,6 +120,13 @@ func (m *bufferManager) memoryCheck() {
return mem / 1024 / 1024
}

select {
case <-m.ch.CloseCh():
log.Info("stop memory check due to manager stop")
return
default:
}

m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
size := buf.MemorySize()
total += size
Expand Down
33 changes: 33 additions & 0 deletions internal/flushcommon/writebuffer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,39 @@ func (s *ManagerSuite) TestMemoryCheck() {
wb.AssertExpectations(s.T())
}

// TestStopDuringMemoryCheck verifies that Stop returns while the periodic
// memory check is running against a write buffer whose reported size is above
// the configured force-sync watermark.
func (s *ManagerSuite) TestStopDuringMemoryCheck() {
	manager := s.manager
	param := paramtable.Get()

	// Use a short check interval so the memory check fires several times
	// before Stop is invoked, and enable force sync with a 0.7 watermark.
	param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50")
	param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "true")
	param.Save(param.DataNodeCfg.MemoryForceSyncWatermark.Key, "0.7")

	// Restore the overridden settings so later tests are not affected.
	defer func() {
		param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key)
		param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key)
		param.Reset(param.DataNodeCfg.MemoryForceSyncWatermark.Key)
	}()

	wb := NewMockWriteBuffer(s.T())

	// Mock a buffer that reports 80% of the memory count, i.e. above the
	// 0.7 watermark configured above.
	memoryLimit := hardware.GetMemoryCount()
	wb.EXPECT().MemorySize().RunAndReturn(func() int64 {
		return int64(float64(memoryLimit) * 0.8)
	}).Maybe()
	wb.EXPECT().EvictBuffer(mock.Anything).Maybe()
	manager.buffers.Insert(s.channelName, wb)
	manager.Start()

	// Wait for the memory check to be triggered.
	time.Sleep(200 * time.Millisecond)

	// Run Stop in its own goroutine and bound the wait, so that a stuck Stop
	// fails this test with a clear message instead of hanging until the
	// global test timeout.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		manager.Stop()
	}()

	select {
	case <-stopped:
	case <-time.After(10 * time.Second):
		s.T().Fatal("buffer manager Stop did not return within 10s")
	}
}

func TestManager(t *testing.T) {
suite.Run(t, new(ManagerSuite))
}

0 comments on commit 2035575

Please sign in to comment.