Skip to content

Commit

Permalink
enhance: [Cherry-pick] Moving etcd client into session (#27069) (#28996)
Browse files Browse the repository at this point in the history
relate: #26694
pr: #27069

Signed-off-by: Filip Haltmayer <[email protected]>
Signed-off-by: MrPresent-Han <[email protected]>
Co-authored-by: Filip Haltmayer <[email protected]>
  • Loading branch information
MrPresent-Han and filip-halt authored Dec 7, 2023
1 parent 8502037 commit 5f4ac43
Show file tree
Hide file tree
Showing 34 changed files with 436 additions and 154 deletions.
4 changes: 4 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/http/healthz"
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -413,6 +414,9 @@ func (mr *MilvusRoles) Run() {
log.Info("proxy stopped!")
}

// close reused etcd client
kvfactory.CloseEtcdClient()

log.Info("Milvus components graceful stop done")
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/tools/migration/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ func (r *Runner) initEtcdCli() {

func (r *Runner) init() {
r.initEtcdCli()

r.session = sessionutil.NewSession(
r.session = sessionutil.NewSessionWithEtcd(
r.ctx,
r.cfg.EtcdCfg.MetaRootPath.GetValue(),
r.etcdCli,
Expand Down
12 changes: 6 additions & 6 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type dataNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (t

type indexNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (types.IndexNodeClient, error)

type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoordClient, error)
type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoordClient, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)
Expand Down Expand Up @@ -236,8 +236,8 @@ func defaultIndexNodeCreatorFunc(ctx context.Context, addr string, nodeID int64)
return indexnodeclient.NewClient(ctx, addr, nodeID, Params.DataCoordCfg.WithCredential.GetAsBool())
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoordClient, error) {
return rootcoordclient.NewClient(ctx, metaRootPath, client)
func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoordClient, error) {
return rootcoordclient.NewClient(ctx)
}

// QuitSignal returns signal when server quits
Expand Down Expand Up @@ -282,14 +282,14 @@ func (s *Server) Register() error {
}

func (s *Server) initSession() error {
s.icSession = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
s.icSession = sessionutil.NewSession(s.ctx)
if s.icSession == nil {
return errors.New("failed to initialize IndexCoord session")
}
s.icSession.Init(typeutil.IndexCoordRole, s.address, true, true)
s.icSession.SetEnableActiveStandBy(s.enableActiveStandBy)

s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
s.session = sessionutil.NewSession(s.ctx)
if s.session == nil {
return errors.New("failed to initialize session")
}
Expand Down Expand Up @@ -1024,7 +1024,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) {
func (s *Server) initRootCoordClient() error {
var err error
if s.rootCoordClient == nil {
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli); err != nil {
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil {
return err
}
}
Expand Down
22 changes: 11 additions & 11 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2485,7 +2485,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2529,7 +2529,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2634,7 +2634,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2713,7 +2713,7 @@ func TestGetRecoveryInfo(t *testing.T) {
Schema: newTestSchema(),
})

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2810,7 +2810,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2852,7 +2852,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2893,7 +2893,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -3166,7 +3166,7 @@ func TestOptions(t *testing.T) {
t.Run("WithRootCoordCreator", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, etcdClient *clientv3.Client) (types.RootCoordClient, error) {
var crt rootCoordCreatorFunc = func(ctx context.Context) (types.RootCoordClient, error) {
return nil, errors.New("dummy")
}
opt := WithRootCoordCreator(crt)
Expand Down Expand Up @@ -4170,7 +4170,7 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -4224,7 +4224,7 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ..
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}
// indexCoord := mocks.NewMockIndexCoord(t)
Expand Down Expand Up @@ -4281,7 +4281,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down
Loading

0 comments on commit 5f4ac43

Please sign in to comment.