Skip to content

Commit

Permalink
Moving etcd client into session
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Haltmayer <[email protected]>
  • Loading branch information
Filip Haltmayer committed Oct 26, 2023
1 parent 852be15 commit c8d7108
Show file tree
Hide file tree
Showing 34 changed files with 248 additions and 155 deletions.
4 changes: 4 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/http/healthz"
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -425,6 +426,9 @@ func (mr *MilvusRoles) Run() {
log.Info("proxy stopped")
}

// close reused etcd client
kvfactory.CloseEtcdClient()

log.Info("Milvus components graceful stop done")
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/tools/migration/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ func (r *Runner) initEtcdCli() {

func (r *Runner) init() {
r.initEtcdCli()

r.session = sessionutil.NewSession(
r.session = sessionutil.NewSessionWithEtcd(
r.ctx,
r.cfg.EtcdCfg.MetaRootPath.GetValue(),
r.etcdCli,
Expand Down
12 changes: 6 additions & 6 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type dataNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (t

type indexNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (types.IndexNodeClient, error)

type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoordClient, error)
type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoordClient, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)
Expand Down Expand Up @@ -235,8 +235,8 @@ func defaultIndexNodeCreatorFunc(ctx context.Context, addr string, nodeID int64)
return indexnodeclient.NewClient(ctx, addr, nodeID, Params.DataCoordCfg.WithCredential.GetAsBool())
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoordClient, error) {
return rootcoordclient.NewClient(ctx, metaRootPath, client)
func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoordClient, error) {
return rootcoordclient.NewClient(ctx)
}

// QuitSignal returns signal when server quits
Expand Down Expand Up @@ -281,14 +281,14 @@ func (s *Server) Register() error {
}

func (s *Server) initSession() error {
s.icSession = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
s.icSession = sessionutil.NewSession(s.ctx)
if s.icSession == nil {
return errors.New("failed to initialize IndexCoord session")
}
s.icSession.Init(typeutil.IndexCoordRole, s.address, true, true)
s.icSession.SetEnableActiveStandBy(s.enableActiveStandBy)

s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
s.session = sessionutil.NewSession(s.ctx)
if s.session == nil {
return errors.New("failed to initialize session")
}
Expand Down Expand Up @@ -1005,7 +1005,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) {
func (s *Server) initRootCoordClient() error {
var err error
if s.rootCoordClient == nil {
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli); err != nil {
if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil {
return err
}
}
Expand Down
22 changes: 11 additions & 11 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2485,7 +2485,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2529,7 +2529,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2634,7 +2634,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2713,7 +2713,7 @@ func TestGetRecoveryInfo(t *testing.T) {
Schema: newTestSchema(),
})

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2810,7 +2810,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2852,7 +2852,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -2893,7 +2893,7 @@ func TestGetRecoveryInfo(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -3166,7 +3166,7 @@ func TestOptions(t *testing.T) {
t.Run("WithRootCoordCreator", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, etcdClient *clientv3.Client) (types.RootCoordClient, error) {
var crt rootCoordCreatorFunc = func(ctx context.Context) (types.RootCoordClient, error) {
return nil, errors.New("dummy")
}
opt := WithRootCoordCreator(crt)
Expand Down Expand Up @@ -4170,7 +4170,7 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -4224,7 +4224,7 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ..
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}
// indexCoord := mocks.NewMockIndexCoord(t)
Expand Down Expand Up @@ -4281,7 +4281,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(0, receiveCh)
}
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down
17 changes: 8 additions & 9 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
Expand Down Expand Up @@ -102,7 +101,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -144,7 +143,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -252,7 +251,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -333,7 +332,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Schema: newTestSchema(),
})

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -440,7 +439,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -485,7 +484,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -529,7 +528,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down Expand Up @@ -720,7 +719,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)

svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) {
svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) {
return newMockRootCoordClient(), nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (node *DataNode) Register() error {
}

func (node *DataNode) initSession() error {
node.session = sessionutil.NewSession(node.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), node.etcdCli)
node.session = sessionutil.NewSession(node.ctx)
if node.session == nil {
return errors.New("failed to initialize session")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/connection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,6 @@ func initSession(ctx context.Context) *sessionutil.Session {
if err != nil {
panic(err)
}
session := sessionutil.NewSession(ctx, metaRootPath, etcdCli)
session := sessionutil.NewSessionWithEtcd(ctx, metaRootPath, etcdCli)
return session
}
5 changes: 2 additions & 3 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -51,8 +50,8 @@ type Client struct {
}

// NewClient creates a new client instance
func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (*Client, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdCli)
func NewClient(ctx context.Context) (*Client, error) {
sess := sessionutil.NewSession(ctx)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("DataCoordClient NewClient failed", zap.Error(err))
Expand Down
13 changes: 2 additions & 11 deletions internal/distributed/datacoord/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/internal/util/mock"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
Expand All @@ -57,16 +56,8 @@ func TestMain(m *testing.M) {

func Test_NewClient(t *testing.T) {
ctx := context.Background()
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.NoError(t, err)
client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli)

client, err := NewClient(ctx)
assert.NoError(t, err)
assert.NotNil(t, client)

Expand Down
16 changes: 8 additions & 8 deletions internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type Server struct {
rootCoord types.RootCoord
dataCoord types.DataCoord

newRootCoordClient func(string, *clientv3.Client) (types.RootCoordClient, error)
newDataCoordClient func(string, *clientv3.Client) (types.DataCoordClient, error)
newRootCoordClient func() (types.RootCoordClient, error)
newDataCoordClient func() (types.DataCoordClient, error)
}

// NewServer new DataNode grpc server
Expand All @@ -82,11 +82,11 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
cancel: cancel,
factory: factory,
grpcErrChan: make(chan error),
newRootCoordClient: func(etcdMetaRoot string, client *clientv3.Client) (types.RootCoordClient, error) {
return rcc.NewClient(ctx1, etcdMetaRoot, client)
newRootCoordClient: func() (types.RootCoordClient, error) {
return rcc.NewClient(ctx1)
},
newDataCoordClient: func(etcdMetaRoot string, client *clientv3.Client) (types.DataCoordClient, error) {
return dcc.NewClient(ctx1, etcdMetaRoot, client)
newDataCoordClient: func() (types.DataCoordClient, error) {
return dcc.NewClient(ctx1)
},
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func (s *Server) init() error {
// --- RootCoord Client ---
if s.newRootCoordClient != nil {
log.Info("initializing RootCoord client for DataNode")
rootCoordClient, err := s.newRootCoordClient(dn.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
rootCoordClient, err := s.newRootCoordClient()
if err != nil {
log.Error("failed to create new RootCoord client", zap.Error(err))
panic(err)
Expand All @@ -272,7 +272,7 @@ func (s *Server) init() error {
// --- DataCoord Client ---
if s.newDataCoordClient != nil {
log.Debug("starting DataCoord client for DataNode")
dataCoordClient, err := s.newDataCoordClient(dn.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
dataCoordClient, err := s.newDataCoordClient()
if err != nil {
log.Error("failed to create new DataCoord client", zap.Error(err))
panic(err)
Expand Down
8 changes: 4 additions & 4 deletions internal/distributed/datanode/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ func Test_NewServer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, server)

server.newRootCoordClient = func(string, *clientv3.Client) (types.RootCoordClient, error) {
server.newRootCoordClient = func() (types.RootCoordClient, error) {
return &mockRootCoord{}, nil
}

server.newDataCoordClient = func(string, *clientv3.Client) (types.DataCoordClient, error) {
server.newDataCoordClient = func() (types.DataCoordClient, error) {
return &mockDataCoord{}, nil
}

Expand Down Expand Up @@ -355,11 +355,11 @@ func Test_Run(t *testing.T) {
regErr: errors.New("error"),
}

server.newRootCoordClient = func(string, *clientv3.Client) (types.RootCoordClient, error) {
server.newRootCoordClient = func() (types.RootCoordClient, error) {
return &mockRootCoord{}, nil
}

server.newDataCoordClient = func(string, *clientv3.Client) (types.DataCoordClient, error) {
server.newDataCoordClient = func() (types.DataCoordClient, error) {
return &mockDataCoord{}, nil
}

Expand Down
Loading

0 comments on commit c8d7108

Please sign in to comment.