Skip to content

Commit

Permalink
enhance: avoid to create many timer object in the target (#36570)
Browse files Browse the repository at this point in the history
/kind improvement

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Sep 29, 2024
1 parent a47abb2 commit 9c1772f
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 24 deletions.
30 changes: 11 additions & 19 deletions pkg/mq/msgdispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/lifetime"
)

func TestDispatcher(t *testing.T) {
Expand Down Expand Up @@ -71,18 +70,15 @@ func TestDispatcher(t *testing.T) {
d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false)
assert.NoError(t, err)
output := make(chan *msgstream.MsgPack, 1024)
d.AddTarget(&target{
vchannel: "mock_vchannel_0",
pos: nil,
ch: output,
cancelCh: lifetime.NewSafeChan(),
})
d.AddTarget(&target{
vchannel: "mock_vchannel_1",
pos: nil,
ch: nil,
cancelCh: lifetime.NewSafeChan(),
})

getTarget := func(vchannel string, pos *Pos, ch chan *msgstream.MsgPack) *target {
target := newTarget(vchannel, pos)
target.ch = ch
return target
}

d.AddTarget(getTarget("mock_vchannel_0", nil, output))
d.AddTarget(getTarget("mock_vchannel_1", nil, nil))
num := d.TargetNum()
assert.Equal(t, 2, num)

Expand All @@ -106,12 +102,8 @@ func TestDispatcher(t *testing.T) {
t.Run("test concurrent send and close", func(t *testing.T) {
for i := 0; i < 100; i++ {
output := make(chan *msgstream.MsgPack, 1024)
target := &target{
vchannel: "mock_vchannel_0",
pos: nil,
ch: output,
cancelCh: lifetime.NewSafeChan(),
}
target := newTarget("mock_vchannel_0", nil)
target.ch = output
assert.Equal(t, cap(output), cap(target.ch))
wg := &sync.WaitGroup{}
for j := 0; j < 100; j++ {
Expand Down
12 changes: 9 additions & 3 deletions pkg/mq/msgdispatcher/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ type target struct {
closeMu sync.Mutex
closeOnce sync.Once
closed bool
maxLag time.Duration
timer *time.Timer

cancelCh lifetime.SafeChan
}

func newTarget(vchannel string, pos *Pos) *target {
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
t := &target{
vchannel: vchannel,
ch: make(chan *MsgPack, paramtable.Get().MQCfg.TargetBufSize.GetAsInt()),
pos: pos,
cancelCh: lifetime.NewSafeChan(),
maxLag: maxTolerantLag,
timer: time.NewTimer(maxTolerantLag),
}
t.closed = false
return t
Expand All @@ -57,6 +62,7 @@ func (t *target) close() {
defer t.closeMu.Unlock()
t.closeOnce.Do(func() {
t.closed = true
t.timer.Stop()
close(t.ch)
})
}
Expand All @@ -67,13 +73,13 @@ func (t *target) send(pack *MsgPack) error {
if t.closed {
return nil
}
maxTolerantLag := paramtable.Get().MQCfg.MaxTolerantLag.GetAsDuration(time.Second)
t.timer.Reset(t.maxLag)
select {
case <-t.cancelCh.CloseCh():
log.Info("target closed", zap.String("vchannel", t.vchannel))
return nil
case <-time.After(maxTolerantLag):
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, maxTolerantLag)
case <-t.timer.C:
return fmt.Errorf("send target timeout, vchannel=%s, timeout=%s", t.vchannel, t.maxLag)
case t.ch <- pack:
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/paramtable/base_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func SkipRemote(skip bool) Option {
}
}

func skipEnv(skip bool) Option {
func SkipEnv(skip bool) Option {
return func(bt *baseTableConfig) {
bt.skipEnv = skip
}
Expand All @@ -112,7 +112,7 @@ func skipEnv(skip bool) Option {
// NewBaseTableFromYamlOnly only used in migration tool.
// Maybe we shouldn't limit the configDir internally.
func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
return NewBaseTable(Files([]string{yaml}), SkipRemote(true), skipEnv(true))
return NewBaseTable(Files([]string{yaml}), SkipRemote(true), SkipEnv(true))
}

func NewBaseTable(opts ...Option) *BaseTable {
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/typeutil/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,12 @@ func (m *ConcurrentMap[K, V]) Values() []V {
})
return ret
}

func (m *ConcurrentMap[K, V]) Keys() []K {
ret := make([]K, m.Len())
m.inner.Range(func(key, value any) bool {
ret = append(ret, key.(K))
return true
})
return ret
}

0 comments on commit 9c1772f

Please sign in to comment.