Skip to content

Commit

Permalink
enhance: Refine flusher channel management (#35870)
Browse files Browse the repository at this point in the history
Change the ChannelTask to ChannelLifetime, only removing it upon
unregistering.

issue: #33285

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Sep 1, 2024
1 parent ef451f5 commit 2e090b2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,41 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
)

type TaskState int
type LifetimeState int

const (
Pending TaskState = iota
Pending LifetimeState = iota
Cancel
Done
)

type ChannelTask interface {
type ChannelLifetime interface {
Run() error
Cancel()
}

type channelTask struct {
type channelLifetime struct {
mu sync.Mutex
state TaskState
f *flusherImpl
state LifetimeState
vchannel string
wal wal.WAL
scanner wal.Scanner
f *flusherImpl
}

func NewChannelTask(f *flusherImpl, vchannel string, wal wal.WAL) ChannelTask {
return &channelTask{
func NewChannelLifetime(f *flusherImpl, vchannel string, wal wal.WAL) ChannelLifetime {
return &channelLifetime{
state: Pending,
f: f,
vchannel: vchannel,
wal: wal,
}
}

func (c *channelTask) Run() error {
func (c *channelLifetime) Run() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.state == Cancel {
return nil
}
if c.f.fgMgr.HasFlowgraph(c.vchannel) {
if c.state == Cancel || c.state == Done {
return nil
}
log.Info("start to build pipeline", zap.String("vchannel", c.vchannel))
Expand Down Expand Up @@ -110,14 +108,14 @@ func (c *channelTask) Run() error {
}
ds.Start()
c.f.fgMgr.AddFlowgraph(ds)
c.f.scanners.Insert(c.vchannel, scanner)
c.scanner = scanner
c.state = Done

log.Info("build pipeline done", zap.String("vchannel", c.vchannel))
return nil
}

func (c *channelTask) Cancel() {
func (c *channelLifetime) Cancel() {
c.mu.Lock()
defer c.mu.Unlock()
switch c.state {
Expand All @@ -126,11 +124,9 @@ func (c *channelTask) Cancel() {
case Cancel:
return
case Done:
if scanner, ok := c.f.scanners.GetAndRemove(c.vchannel); ok {
err := scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", c.vchannel), zap.Error(err))
}
err := c.scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", c.vchannel), zap.Error(err))
}
c.f.fgMgr.RemoveFlowgraph(c.vchannel)
c.f.wbMgr.RemoveChannel(c.vchannel)
Expand Down
65 changes: 21 additions & 44 deletions internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ type flusherImpl struct {
wbMgr writebuffer.BufferManager
cpUpdater *util.ChannelCheckpointUpdater

tasks *typeutil.ConcurrentMap[string, ChannelTask]
scanners *typeutil.ConcurrentMap[string, wal.Scanner] // watched scanners
channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime]

notifyCh chan struct{}
stopChan lifetime.SafeChan
Expand All @@ -67,16 +66,15 @@ func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher {
func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher {
fgMgr := pipeline.NewFlowgraphManager()
return &flusherImpl{
broker: params.Broker,
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
cpUpdater: params.CheckpointUpdater,
tasks: typeutil.NewConcurrentMap[string, ChannelTask](),
scanners: typeutil.NewConcurrentMap[string, wal.Scanner](),
notifyCh: make(chan struct{}, 1),
stopChan: lifetime.NewSafeChan(),
pipelineParams: params,
broker: params.Broker,
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
cpUpdater: params.CheckpointUpdater,
channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](),
notifyCh: make(chan struct{}, 1),
stopChan: lifetime.NewSafeChan(),
pipelineParams: params,
}
}

Expand All @@ -96,22 +94,15 @@ func (f *flusherImpl) RegisterPChannel(pchannel string, wal wal.WAL) error {
}

func (f *flusherImpl) RegisterVChannel(vchannel string, wal wal.WAL) {
if f.scanners.Contain(vchannel) {
return
_, ok := f.channelLifetimes.GetOrInsert(vchannel, NewChannelLifetime(f, vchannel, wal))
if !ok {
log.Info("flusher register vchannel done", zap.String("vchannel", vchannel))
}
f.tasks.GetOrInsert(vchannel, NewChannelTask(f, vchannel, wal))
f.notify()
log.Info("flusher register vchannel done", zap.String("vchannel", vchannel))
}

func (f *flusherImpl) UnregisterPChannel(pchannel string) {
f.tasks.Range(func(vchannel string, task ChannelTask) bool {
if funcutil.ToPhysicalChannel(vchannel) == pchannel {
f.UnregisterVChannel(vchannel)
}
return true
})
f.scanners.Range(func(vchannel string, scanner wal.Scanner) bool {
f.channelLifetimes.Range(func(vchannel string, _ ChannelLifetime) bool {
if funcutil.ToPhysicalChannel(vchannel) == pchannel {
f.UnregisterVChannel(vchannel)
}
Expand All @@ -120,18 +111,9 @@ func (f *flusherImpl) UnregisterPChannel(pchannel string) {
}

func (f *flusherImpl) UnregisterVChannel(vchannel string) {
if task, ok := f.tasks.Get(vchannel); ok {
task.Cancel()
}
if scanner, ok := f.scanners.GetAndRemove(vchannel); ok {
err := scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", vchannel), zap.Error(err))
}
if clt, ok := f.channelLifetimes.GetAndRemove(vchannel); ok {
clt.Cancel()
}
f.fgMgr.RemoveFlowgraph(vchannel)
f.wbMgr.RemoveChannel(vchannel)
log.Info("flusher unregister vchannel done", zap.String("vchannel", vchannel))
}

func (f *flusherImpl) notify() {
Expand All @@ -154,16 +136,14 @@ func (f *flusherImpl) Start() {
return
case <-f.notifyCh:
futures := make([]*conc.Future[any], 0)
f.tasks.Range(func(vchannel string, task ChannelTask) bool {
f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool {
future := GetExecPool().Submit(func() (any, error) {
err := task.Run()
err := lifetime.Run()
if err != nil {
log.Warn("build pipeline failed", zap.String("vchannel", vchannel), zap.Error(err))
// Notify to trigger retry.
f.notify()
f.notify() // Notify to trigger retry.
return nil, err
}
f.tasks.Remove(vchannel)
return nil, nil
})
futures = append(futures, future)
Expand All @@ -178,11 +158,8 @@ func (f *flusherImpl) Start() {
func (f *flusherImpl) Stop() {
f.stopChan.Close()
f.stopWg.Wait()
f.scanners.Range(func(vchannel string, scanner wal.Scanner) bool {
err := scanner.Close()
if err != nil {
log.Warn("scanner error", zap.String("vchannel", vchannel), zap.Error(err))
}
f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool {
lifetime.Cancel()
return true
})
f.fgMgr.ClearFlowgraphs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ func TestFlusher_RegisterPChannel(t *testing.T) {

f.UnregisterPChannel(pchannel)
assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount())
assert.Equal(t, 0, f.(*flusherImpl).scanners.Len())
assert.Equal(t, 0, f.(*flusherImpl).tasks.Len())
assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len())
}

func TestFlusher_RegisterVChannel(t *testing.T) {
Expand Down Expand Up @@ -199,8 +198,7 @@ func TestFlusher_RegisterVChannel(t *testing.T) {
f.UnregisterVChannel(vchannel)
}
assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount())
assert.Equal(t, 0, f.(*flusherImpl).scanners.Len())
assert.Equal(t, 0, f.(*flusherImpl).tasks.Len())
assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len())
}

func TestFlusher_Concurrency(t *testing.T) {
Expand Down Expand Up @@ -248,6 +246,5 @@ func TestFlusher_Concurrency(t *testing.T) {
}

assert.Equal(t, 0, f.(*flusherImpl).fgMgr.GetFlowgraphCount())
assert.Equal(t, 0, f.(*flusherImpl).scanners.Len())
assert.Equal(t, 0, f.(*flusherImpl).tasks.Len())
assert.Equal(t, 0, f.(*flusherImpl).channelLifetimes.Len())
}

0 comments on commit 2e090b2

Please sign in to comment.