Skip to content

Commit

Permalink
Add some log and improve TestSessionProcessActiveStandBy test case
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Sep 27, 2023
1 parent a8ce1b6 commit 79d5d29
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
6 changes: 5 additions & 1 deletion internal/datacoord/segment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,11 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
ret := make([]UniqueID, 0, len(s.segments))
for _, id := range s.segments {
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
if info == nil {
continue
}
if info.InsertChannel != channel {
log.Warn("the channel of flushable segments isn't equal", zap.String("insert_channel", info.InsertChannel), zap.String("channel", channel), zap.Int64("segment", id))
continue
}
if s.flushPolicy(info, t) {
Expand Down
15 changes: 13 additions & 2 deletions internal/util/sessionutil/session_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,19 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
log.Debug("Stop session 1, session 2 will take over primary service")
assert.False(t, flag)

s1.Stop()
<-signal
s1.safeCloseLiveCh()
{
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, _ = s1.etcdCli.Revoke(ctx, *s1.leaseID)
}
select {
case <-signal:
log.Debug("receive s1 signal")
case <-time.After(10 * time.Second):
log.Debug("wait to fail Liveness Check timeout")
t.FailNow()
}
assert.True(t, flag)

wg.Wait()
Expand Down
2 changes: 2 additions & 0 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,8 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi
MsgID: msg.ID().Serialize(),
})
ms.chanMsgBuf[consumer] = append(ms.chanMsgBuf[consumer], tsMsg)
} else {
log.Info("skip msg", zap.Any("msg", tsMsg))
}
}
}
Expand Down

0 comments on commit 79d5d29

Please sign in to comment.