Skip to content

Commit

Permalink
fix component not exit when liveness check failed (#27236)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Sep 22, 2023
1 parent 4b12cb8 commit 9433a24
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 23 deletions.
22 changes: 16 additions & 6 deletions internal/util/sessionutil/session_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,10 @@ func (s *Session) initWatchSessionCh(ctx context.Context) error {

err = retry.Do(ctx, func() error {
getResp, err = s.etcdCli.Get(ctx, s.getSessionKey())
log.Warn("fail to get the session key from the etcd", zap.Error(err))
return err
}, retry.Attempts(uint(s.sessionRetryTimes)))
if err != nil {
log.Warn("fail to get the session key from the etcd", zap.Error(err))
return err
}
s.watchSessionKeyCh = s.etcdCli.Watch(ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
Expand Down Expand Up @@ -785,18 +785,31 @@ func (w *sessionWatcher) handleWatchErr(err error) error {
// LivenessCheck performs liveness check with provided context and channel
// ctx controls the liveness check loop
// ch is the liveness signal channel, ch is closed only when the session is expired
// callback is the function to call when ch is closed, note that callback will not be invoked when loop exits due to context
// callback must be called before liveness check exit, to close the session's owner component
func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
err := s.initWatchSessionCh(ctx)
if err != nil {
log.Error("failed to get session for liveness check", zap.Error(err))
s.cancelKeepAlive()
if callback != nil {
go callback()
}
return
}

s.wg.Add(1)
go func() {
defer s.wg.Done()
if callback != nil {
// before exit liveness check, callback to exit the session owner
defer func() {
if ctx.Err() == nil {
go callback()
}
}()
}
defer s.SetDisconnected(true)
for {
defer s.SetDisconnected(true)
select {
case _, ok := <-s.liveCh:
// ok, still alive
Expand All @@ -805,9 +818,6 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
}
// not ok, connection lost
log.Warn("connection lost detected, shuting down")
if callback != nil {
go callback()
}
return
case <-ctx.Done():
log.Warn("liveness exits due to context done")
Expand Down
51 changes: 34 additions & 17 deletions internal/util/sessionutil/session_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.uber.org/atomic"
"go.uber.org/zap"

etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
Expand Down Expand Up @@ -194,39 +195,55 @@ func TestSessionLivenessCheck(t *testing.T) {
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
require.NoError(t, err)
s := NewSession(context.Background(), metaRoot, etcdCli)
ctx := context.Background()
s.Register()
ch := make(chan struct{})
s.liveCh = ch
signal := make(chan struct{}, 1)

flag := false

s.LivenessCheck(ctx, func() {
flag = true
flag := atomic.NewBool(false)
s.LivenessCheck(context.Background(), func() {
flag.Store(true)
signal <- struct{}{}
})
assert.False(t, flag.Load())

assert.False(t, flag)
// test liveCh receive event, liveness won't exit, callback won't trigger
ch <- struct{}{}
assert.False(t, flag.Load())

assert.False(t, flag)
// test close liveCh, liveness exit, callback should trigger
close(ch)

<-signal
assert.True(t, flag)
assert.True(t, flag.Load())

ctx, cancel := context.WithCancel(ctx)
cancel()
ch = make(chan struct{})
s.liveCh = ch
flag = false
// test context done, liveness exit, callback shouldn't trigger
metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
s1 := NewSession(context.Background(), metaRoot, etcdCli)
s1.Register()
ctx, cancel := context.WithCancel(context.Background())
flag.Store(false)

s.LivenessCheck(ctx, func() {
flag = true
s1.LivenessCheck(ctx, func() {
flag.Store(true)
signal <- struct{}{}
})
cancel()
assert.False(t, flag.Load())

assert.False(t, flag)
// test context done, liveness start failed, callback should trigger
metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
s2 := NewSession(context.Background(), metaRoot, etcdCli)
s2.Register()
ctx, cancel = context.WithCancel(context.Background())
signal = make(chan struct{}, 1)
flag.Store(false)
cancel()
s2.LivenessCheck(ctx, func() {
flag.Store(true)
signal <- struct{}{}
})
<-signal
assert.True(t, flag.Load())
}

func TestWatcherHandleWatchResp(t *testing.T) {
Expand Down

0 comments on commit 9433a24

Please sign in to comment.