Skip to content

Commit

Permalink
enhance: add the param to control whether to include the current msg (m…
Browse files Browse the repository at this point in the history
…ilvus-io#35656)

/kind improvement

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Aug 23, 2024
1 parent 59387f0 commit 9dc1311
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 19 deletions.
12 changes: 5 additions & 7 deletions pkg/mq/msgdispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,12 @@ type Dispatcher struct {
}

func NewDispatcher(ctx context.Context,
factory msgstream.Factory,
isMain bool,
pchannel string,
position *Pos,
subName string,
subPos SubPos,
factory msgstream.Factory, isMain bool,
pchannel string, position *Pos,
subName string, subPos SubPos,
lagNotifyChan chan struct{},
lagTargets *typeutil.ConcurrentMap[string, *target],
includeCurrentMsg bool,
) (*Dispatcher, error) {
log := log.With(zap.String("pchannel", pchannel),
zap.String("subName", subName), zap.Bool("isMain", isMain))
Expand All @@ -106,7 +104,7 @@ func NewDispatcher(ctx context.Context,
return nil, err
}

err = stream.Seek(ctx, []*Pos{position}, false)
err = stream.Seek(ctx, []*Pos{position}, includeCurrentMsg)
if err != nil {
stream.Close()
log.Error("seek failed", zap.Error(err))
Expand Down
12 changes: 4 additions & 8 deletions pkg/mq/msgdispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (
func TestDispatcher(t *testing.T) {
ctx := context.Background()
t.Run("test base", func(t *testing.T) {
d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil,
"mock_subName_0", common.SubscriptionPositionEarliest, nil, nil)
d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false)
assert.NoError(t, err)
assert.NotPanics(t, func() {
d.Handle(start)
Expand All @@ -62,16 +61,14 @@ func TestDispatcher(t *testing.T) {
return ms, nil
},
}
d, err := NewDispatcher(ctx, factory, true, "mock_pchannel_0", nil,
"mock_subName_0", common.SubscriptionPositionEarliest, nil, nil)
d, err := NewDispatcher(ctx, factory, true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false)

assert.Error(t, err)
assert.Nil(t, d)
})

t.Run("test target", func(t *testing.T) {
d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil,
"mock_subName_0", common.SubscriptionPositionEarliest, nil, nil)
d, err := NewDispatcher(ctx, newMockFactory(), true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false)
assert.NoError(t, err)
output := make(chan *msgstream.MsgPack, 1024)
d.AddTarget(&target{
Expand Down Expand Up @@ -136,8 +133,7 @@ func TestDispatcher(t *testing.T) {
}

func BenchmarkDispatcher_handle(b *testing.B) {
d, err := NewDispatcher(context.Background(), newMockFactory(), true, "mock_pchannel_0", nil,
"mock_subName_0", common.SubscriptionPositionEarliest, nil, nil)
d, err := NewDispatcher(context.Background(), newMockFactory(), true, "mock_pchannel_0", nil, "mock_subName_0", common.SubscriptionPositionEarliest, nil, nil, false)
assert.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand Down
6 changes: 2 additions & 4 deletions pkg/mq/msgdispatcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func (c *dispatcherManager) Add(ctx context.Context, vchannel string, pos *Pos,
c.mu.Lock()
defer c.mu.Unlock()
isMain := c.mainDispatcher == nil
d, err := NewDispatcher(ctx, c.factory, isMain, c.pchannel, pos,
c.constructSubName(vchannel, isMain), subPos, c.lagNotifyChan, c.lagTargets)
d, err := NewDispatcher(ctx, c.factory, isMain, c.pchannel, pos, c.constructSubName(vchannel, isMain), subPos, c.lagNotifyChan, c.lagTargets, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -234,8 +233,7 @@ func (c *dispatcherManager) split(t *target) {
var newSolo *Dispatcher
err := retry.Do(context.Background(), func() error {
var err error
newSolo, err = NewDispatcher(context.Background(), c.factory, false, c.pchannel, t.pos,
c.constructSubName(t.vchannel, false), common.SubscriptionPositionUnknown, c.lagNotifyChan, c.lagTargets)
newSolo, err = NewDispatcher(context.Background(), c.factory, false, c.pchannel, t.pos, c.constructSubName(t.vchannel, false), common.SubscriptionPositionUnknown, c.lagNotifyChan, c.lagTargets, true)
return err
}, retry.Attempts(10))
if err != nil {
Expand Down

0 comments on commit 9dc1311

Please sign in to comment.