Skip to content

Commit

Permalink
fix: port listen racing in mix or standalone mode (#36442)
Browse files Browse the repository at this point in the history
issue: #36441

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Sep 26, 2024
1 parent 7ff4169 commit d29e01e
Show file tree
Hide file tree
Showing 33 changed files with 906 additions and 485 deletions.
10 changes: 9 additions & 1 deletion cmd/components/data_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,22 @@ type DataCoord struct {

// NewDataCoord creates a new DataCoord
func NewDataCoord(ctx context.Context, factory dependency.Factory) (*DataCoord, error) {
s := grpcdatacoordclient.NewServer(ctx, factory)
s, err := grpcdatacoordclient.NewServer(ctx, factory)
if err != nil {
return nil, err
}

return &DataCoord{
ctx: ctx,
svr: s,
}, nil
}

// Prepare prepares service
func (s *DataCoord) Prepare() error {
return s.svr.Prepare()
}

// Run starts service
func (s *DataCoord) Run() error {
if err := s.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) (*DataNode, er
}, nil
}

func (d *DataNode) Prepare() error {
return d.svr.Prepare()
}

// Run starts service
func (d *DataNode) Run() error {
if err := d.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/index_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord
return &IndexCoord{}, nil
}

func (s *IndexCoord) Prepare() error {
return nil
}

// Run starts service
func (s *IndexCoord) Run() error {
log.Info("IndexCoord running ...")
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/index_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func NewIndexNode(ctx context.Context, factory dependency.Factory) (*IndexNode,
return n, nil
}

func (n *IndexNode) Prepare() error {
return n.svr.Prepare()
}

// Run starts service
func (n *IndexNode) Run() error {
if err := n.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
return n, nil
}

func (n *Proxy) Prepare() error {
return n.svr.Prepare()
}

// Run starts service
func (n *Proxy) Run() error {
if err := n.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/query_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func NewQueryCoord(ctx context.Context, factory dependency.Factory) (*QueryCoord
}, nil
}

func (qs *QueryCoord) Prepare() error {
return qs.svr.Prepare()
}

// Run starts service
func (qs *QueryCoord) Run() error {
if err := qs.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/query_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) (*QueryNode,
}, nil
}

func (q *QueryNode) Prepare() error {
return q.svr.Prepare()
}

// Run starts service
func (q *QueryNode) Run() error {
if err := q.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/components/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func NewRootCoord(ctx context.Context, factory dependency.Factory) (*RootCoord,
}, nil
}

func (rc *RootCoord) Prepare() error {
return rc.svr.Prepare()
}

// Run starts service
func (rc *RootCoord) Run() error {
if err := rc.svr.Run(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func stopRocksmq() {

type component interface {
healthz.Indicator
Prepare() error
Run() error
Stop() error
}
Expand Down Expand Up @@ -121,6 +122,9 @@ func runComponent[T component](ctx context.Context,
if err != nil {
panic(err)
}
if err := role.Prepare(); err != nil {
panic(err)
}
close(sign)
if err := role.Run(); err != nil {
panic(err)
Expand Down
51 changes: 31 additions & 20 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package grpcdatacoord

import (
"context"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -51,6 +49,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/netutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tikv"
)
Expand All @@ -70,23 +69,37 @@ type Server struct {

grpcErrChan chan error
grpcServer *grpc.Server
listener *netutil.NetListener
}

// NewServer new data service grpc server
func NewServer(ctx context.Context, factory dependency.Factory, opts ...datacoord.Option) *Server {
func NewServer(ctx context.Context, factory dependency.Factory, opts ...datacoord.Option) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)

s := &Server{
ctx: ctx1,
cancel: cancel,
grpcErrChan: make(chan error),
}
s.dataCoord = datacoord.CreateServer(s.ctx, factory, opts...)
return s
return s, nil
}

var getTiKVClient = tikv.GetTiKVClient

func (s *Server) Prepare() error {
listener, err := netutil.NewListener(
netutil.OptIP(paramtable.Get().DataCoordGrpcServerCfg.IP),
netutil.OptPort(paramtable.Get().DataCoordGrpcServerCfg.Port.GetAsInt()),
)
if err != nil {
log.Warn("DataCoord fail to create net listener", zap.Error(err))
return err
}
log.Info("DataCoord listen on", zap.String("address", listener.Addr().String()), zap.Int("port", listener.Port()))
s.listener = listener
return nil
}

func (s *Server) init() error {
params := paramtable.Get()
etcdConfig := &params.EtcdCfg
Expand All @@ -108,7 +121,7 @@ func (s *Server) init() error {
}
s.etcdCli = etcdCli
s.dataCoord.SetEtcdClient(etcdCli)
s.dataCoord.SetAddress(params.DataCoordGrpcServerCfg.GetAddress())
s.dataCoord.SetAddress(s.listener.Address())

if params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
log.Info("Connecting to tikv metadata storage.")
Expand All @@ -135,26 +148,18 @@ func (s *Server) init() error {
}

func (s *Server) startGrpc() error {
Params := &paramtable.Get().DataCoordGrpcServerCfg
s.grpcWG.Add(1)
go s.startGrpcLoop(Params.Port.GetAsInt())
go s.startGrpcLoop()
// wait for grpc server loop start
err := <-s.grpcErrChan
return err
}

func (s *Server) startGrpcLoop(grpcPort int) {
func (s *Server) startGrpcLoop() {
defer logutil.LogPanic()
defer s.grpcWG.Done()

Params := &paramtable.Get().DataCoordGrpcServerCfg
log.Debug("network port", zap.Int("port", grpcPort))
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Error("grpc server failed to listen error", zap.Error(err))
s.grpcErrChan <- err
return
}

ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
Expand Down Expand Up @@ -204,7 +209,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer)
}
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
if err := s.grpcServer.Serve(s.listener); err != nil {
s.grpcErrChan <- err
}
}
Expand All @@ -227,8 +232,10 @@ func (s *Server) start() error {
// Stop stops the DataCoord server gracefully.
// Need to call the GracefulStop interface of grpc server and call the stop method of the inner DataCoord object.
func (s *Server) Stop() (err error) {
Params := &paramtable.Get().DataCoordGrpcServerCfg
logger := log.With(zap.String("address", Params.GetAddress()))
logger := log.With()
if s.listener != nil {
logger = log.With(zap.String("address", s.listener.Address()))
}
logger.Info("Datacoord stopping")
defer func() {
logger.Info("Datacoord stopped", zap.Error(err))
Expand All @@ -251,8 +258,12 @@ func (s *Server) Stop() (err error) {
log.Error("failed to close dataCoord", zap.Error(err))
return err
}

s.cancel()

// release the listener
if s.listener != nil {
s.listener.Close()
}
return nil
}

Expand Down
38 changes: 27 additions & 11 deletions internal/distributed/datacoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func Test_NewServer(t *testing.T) {

ctx := context.Background()
mockDataCoord := mocks.NewMockDataCoord(t)
server := NewServer(ctx, nil)
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
assert.NotNil(t, server)
server.dataCoord = mockDataCoord

Expand Down Expand Up @@ -342,7 +343,8 @@ func Test_Run(t *testing.T) {
defer func() {
getTiKVClient = tikv.GetTiKVClient
}()
server := NewServer(ctx, nil)
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
assert.NotNil(t, server)

mockDataCoord := mocks.NewMockDataCoord(t)
Expand All @@ -354,7 +356,9 @@ func Test_Run(t *testing.T) {
mockDataCoord.EXPECT().Init().Return(nil)
mockDataCoord.EXPECT().Start().Return(nil)
mockDataCoord.EXPECT().Register().Return(nil)
err := server.Run()
err = server.Prepare()
assert.NoError(t, err)
err = server.Run()
assert.NoError(t, err)

mockDataCoord.EXPECT().Stop().Return(nil)
Expand All @@ -367,15 +371,18 @@ func Test_Run(t *testing.T) {

t.Run("test init error", func(t *testing.T) {
ctx := context.Background()
server := NewServer(ctx, nil)
server, err := NewServer(ctx, nil)
assert.NotNil(t, server)
assert.NoError(t, err)
mockDataCoord := mocks.NewMockDataCoord(t)
mockDataCoord.EXPECT().SetEtcdClient(mock.Anything)
mockDataCoord.EXPECT().SetAddress(mock.Anything)
mockDataCoord.EXPECT().Init().Return(errors.New("error"))
server.dataCoord = mockDataCoord

err := server.Run()
err = server.Prepare()
assert.NoError(t, err)
err = server.Run()
assert.Error(t, err)

mockDataCoord.EXPECT().Stop().Return(nil)
Expand All @@ -384,7 +391,8 @@ func Test_Run(t *testing.T) {

t.Run("test register error", func(t *testing.T) {
ctx := context.Background()
server := NewServer(ctx, nil)
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
assert.NotNil(t, server)
mockDataCoord := mocks.NewMockDataCoord(t)
mockDataCoord.EXPECT().SetEtcdClient(mock.Anything)
Expand All @@ -393,7 +401,9 @@ func Test_Run(t *testing.T) {
mockDataCoord.EXPECT().Register().Return(errors.New("error"))
server.dataCoord = mockDataCoord

err := server.Run()
err = server.Prepare()
assert.NoError(t, err)
err = server.Run()
assert.Error(t, err)

mockDataCoord.EXPECT().Stop().Return(nil)
Expand All @@ -402,7 +412,8 @@ func Test_Run(t *testing.T) {

t.Run("test start error", func(t *testing.T) {
ctx := context.Background()
server := NewServer(ctx, nil)
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
assert.NotNil(t, server)
mockDataCoord := mocks.NewMockDataCoord(t)
mockDataCoord.EXPECT().SetEtcdClient(mock.Anything)
Expand All @@ -412,7 +423,9 @@ func Test_Run(t *testing.T) {
mockDataCoord.EXPECT().Start().Return(errors.New("error"))
server.dataCoord = mockDataCoord

err := server.Run()
err = server.Prepare()
assert.NoError(t, err)
err = server.Run()
assert.Error(t, err)

mockDataCoord.EXPECT().Stop().Return(nil)
Expand All @@ -421,7 +434,8 @@ func Test_Run(t *testing.T) {

t.Run("test stop error", func(t *testing.T) {
ctx := context.Background()
server := NewServer(ctx, nil)
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
assert.NotNil(t, server)
mockDataCoord := mocks.NewMockDataCoord(t)
mockDataCoord.EXPECT().SetEtcdClient(mock.Anything)
Expand All @@ -431,7 +445,9 @@ func Test_Run(t *testing.T) {
mockDataCoord.EXPECT().Start().Return(nil)
server.dataCoord = mockDataCoord

err := server.Run()
err = server.Prepare()
assert.NoError(t, err)
err = server.Run()
assert.NoError(t, err)

mockDataCoord.EXPECT().Stop().Return(errors.New("error"))
Expand Down
Loading

0 comments on commit d29e01e

Please sign in to comment.