From 77e123762fff791c2d86f21a61413c9089045b61 Mon Sep 17 00:00:00 2001 From: chyezh Date: Sat, 27 Jan 2024 08:45:02 +0800 Subject: [PATCH] enhance: add graceful stop timeout to avoid node stop hang under extreme cases (#30320) 1. add coordinator and proxy graceful stop timeout to 5s. 3. add other work node graceful stop timeout to 900s, and we should potentially change this to 600s when graceful stop is smooth 4. change the order of datacoord component while stop. 5. `LivenessCheck` do not perform graceful shutdown now. issue: https://github.com/milvus-io/milvus/issues/30310 pr: #30317 also see: https://github.com/milvus-io/milvus/pull/30306 --------- Signed-off-by: chyezh --- cmd/components/data_coord.go | 8 +-- cmd/components/data_node.go | 8 +-- cmd/components/index_node.go | 8 +-- cmd/components/proxy.go | 8 +-- cmd/components/query_coord.go | 8 +-- cmd/components/query_node.go | 8 +-- cmd/components/root_coord.go | 8 +-- cmd/components/util.go | 38 +++++++++++++ cmd/components/util_test.go | 38 +++++++++++++ internal/datacoord/server.go | 24 ++++----- internal/datanode/data_node.go | 12 +---- internal/distributed/datacoord/service.go | 10 ++-- internal/distributed/datanode/service.go | 11 ++-- internal/distributed/indexnode/service.go | 9 +++- internal/distributed/proxy/service.go | 10 ++-- internal/distributed/querycoord/service.go | 13 +++-- internal/distributed/querynode/service.go | 11 ++-- internal/distributed/rootcoord/service.go | 9 +++- internal/indexnode/indexnode.go | 12 +---- internal/proxy/proxy.go | 11 +--- internal/querycoordv2/server.go | 11 +--- internal/querynodev2/server.go | 45 ++++++---------- internal/querynodev2/server_test.go | 18 +++++-- internal/rootcoord/root_coord.go | 12 +---- pkg/util/paramtable/component_param.go | 60 ++++++++++++++++++++- pkg/util/paramtable/component_param_test.go | 21 ++++++++ 26 files changed, 282 insertions(+), 149 deletions(-) create mode 100644 cmd/components/util.go create mode 100644 cmd/components/util_test.go diff --git a/cmd/components/data_coord.go b/cmd/components/data_coord.go index f7878314739f3..977a52a42dece 100644 --- a/cmd/components/data_coord.go +++ b/cmd/components/data_coord.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -57,10 +59,8 @@ func (s *DataCoord) Run() error { // Stop terminates service func (s *DataCoord) Stop() error { - if err := s.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().DataCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(s.svr.Stop, timeout) } // GetComponentStates returns DataCoord's states diff --git a/cmd/components/data_node.go b/cmd/components/data_node.go index 25a7b9a91c37c..8fbba83a0800d 100644 --- a/cmd/components/data_node.go +++ b/cmd/components/data_node.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -60,10 +62,8 @@ func (d *DataNode) Run() error { // Stop terminates service func (d *DataNode) Stop() error { - if err := d.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().DataNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(d.svr.Stop, timeout) } // GetComponentStates returns DataNode's states diff --git a/cmd/components/index_node.go b/cmd/components/index_node.go index 4f947d35f4158..edf72384d4d2d 100644 --- a/cmd/components/index_node.go +++ b/cmd/components/index_node.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -58,10 +60,8 @@ func (n *IndexNode) Run() error { // Stop terminates service func (n *IndexNode) Stop() error { - if err := n.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().IndexNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(n.svr.Stop, timeout) } // GetComponentStates returns IndexNode's states diff --git a/cmd/components/proxy.go b/cmd/components/proxy.go index 61a62df495538..cb74b36680a90 100644 --- a/cmd/components/proxy.go +++ b/cmd/components/proxy.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -59,10 +61,8 @@ func (n *Proxy) Run() error { // Stop terminates service func (n *Proxy) Stop() error { - if err := n.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().ProxyCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(n.svr.Stop, timeout) } // GetComponentStates returns Proxy's states diff --git a/cmd/components/query_coord.go b/cmd/components/query_coord.go index 3c893ad697631..c98812d86ef62 100644 --- a/cmd/components/query_coord.go +++ b/cmd/components/query_coord.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -60,10 +62,8 @@ func (qs *QueryCoord) Run() error { // Stop terminates service func (qs *QueryCoord) Stop() error { - if err := qs.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().QueryCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(qs.svr.Stop, timeout) } // GetComponentStates returns QueryCoord's states diff --git a/cmd/components/query_node.go b/cmd/components/query_node.go index 50570ec152fe4..3857f81bafa49 100644 --- a/cmd/components/query_node.go +++ b/cmd/components/query_node.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -60,10 +62,8 @@ func (q *QueryNode) Run() error { // Stop terminates service func (q *QueryNode) Stop() error { - if err := q.svr.Stop(); err != nil { - return err - } - return nil + timeout := paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(q.svr.Stop, timeout) } // GetComponentStates returns QueryNode's states diff --git a/cmd/components/root_coord.go b/cmd/components/root_coord.go index 720511902a911..e130516ac8d16 100644 --- a/cmd/components/root_coord.go +++ b/cmd/components/root_coord.go @@ -18,6 +18,7 @@ package components import ( "context" + "time" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( rc "github.com/milvus-io/milvus/internal/distributed/rootcoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -59,10 +61,8 @@ func (rc *RootCoord) Run() error { // Stop terminates service func (rc *RootCoord) Stop() error { - if rc.svr != nil { - return rc.svr.Stop() - } - return nil + timeout := paramtable.Get().RootCoordCfg.GracefulStopTimeout.GetAsDuration(time.Second) + return exitWhenStopTimeout(rc.svr.Stop, timeout) } // GetComponentStates returns RootCoord's states diff --git a/cmd/components/util.go b/cmd/components/util.go new file mode 100644 index 0000000000000..d731bb6e86f5f --- /dev/null +++ b/cmd/components/util.go @@ -0,0 +1,38 @@ +package components + +import ( + "context" + "os" + "time" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/util/conc" +) + +var errStopTimeout = errors.New("stop timeout") + +// exitWhenStopTimeout stops a component with timeout and exit progress when timeout. +func exitWhenStopTimeout(stop func() error, timeout time.Duration) error { + err := stopWithTimeout(stop, timeout) + if errors.Is(err, errStopTimeout) { + os.Exit(1) + } + return err +} + +// stopWithTimeout stops a component with timeout. +func stopWithTimeout(stop func() error, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + future := conc.Go(func() (struct{}, error) { + return struct{}{}, stop() + }) + select { + case <-future.Inner(): + return errors.Wrap(future.Err(), "failed to stop component") + case <-ctx.Done(): + return errStopTimeout + } +} diff --git a/cmd/components/util_test.go b/cmd/components/util_test.go new file mode 100644 index 0000000000000..4490b20c8d94e --- /dev/null +++ b/cmd/components/util_test.go @@ -0,0 +1,38 @@ +package components + +import ( + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" +) + +func TestExitWithTimeout(t *testing.T) { + // only normal path can be tested. + targetErr := errors.New("stop error") + err := exitWhenStopTimeout(func() error { + time.Sleep(1 * time.Second) + return targetErr + }, 5*time.Second) + assert.ErrorIs(t, err, targetErr) +} + +func TestStopWithTimeout(t *testing.T) { + ch := make(chan struct{}) + stop := func() error { + <-ch + return nil + } + + err := stopWithTimeout(stop, 1*time.Second) + assert.ErrorIs(t, err, errStopTimeout) + + targetErr := errors.New("stop error") + stop = func() error { + return targetErr + } + + err = stopWithTimeout(stop, 1*time.Second) + assert.ErrorIs(t, err, targetErr) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 79831d918a9aa..d73356db92456 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -269,16 +269,7 @@ func (s *Server) Register() error { s.session.LivenessCheck(s.serverLoopCtx, func() { logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID())) - if err := s.Stop(); err != nil { - logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Dec() - // manually send signal to starter goroutine - if s.session.IsTriggerKill() { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } @@ -1050,15 +1041,23 @@ func (s *Server) Stop() error { return nil } logutil.Logger(s.ctx).Info("server shutdown") - s.cluster.Close() s.garbageCollector.close() - s.stopServerLoop() + logutil.Logger(s.ctx).Info("datacoord garbage collector stopped") if Params.DataCoordCfg.EnableCompaction.GetAsBool() { s.stopCompactionTrigger() s.stopCompactionHandler() } + logutil.Logger(s.ctx).Info("datacoord compaction stopped") + s.indexBuilder.Stop() + logutil.Logger(s.ctx).Info("datacoord index builder stopped") + + s.cluster.Close() + logutil.Logger(s.ctx).Info("index builder stopped") + + s.stopServerLoop() + logutil.Logger(s.ctx).Info("serverloop stopped") if s.session != nil { s.session.Stop() @@ -1067,6 +1066,7 @@ func (s *Server) Stop() error { if s.icSession != nil { s.icSession.Stop() } + logutil.Logger(s.ctx).Warn("datacoord stop successful") return nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ede9742d81685..20e1e4640d163 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -27,7 +27,6 @@ import ( "os" "sync" "sync/atomic" - "syscall" "time" "github.com/cockroachdb/errors" @@ -188,16 +187,7 @@ func (node *DataNode) Register() error { // Start liveness check node.session.LivenessCheck(node.ctx, func() { log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID)) - if err := node.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Dec() - // manually send signal to starter goroutine - if node.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index fc8d21ecd094f..b94c81f291165 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -218,10 +218,14 @@ func (s *Server) start() error { // Stop stops the DataCoord server gracefully. // Need to call the GracefulStop interface of grpc server and call the stop method of the inner DataCoord object. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().DataCoordGrpcServerCfg - log.Debug("Datacoord stop", zap.String("Address", Params.GetAddress())) - var err error + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("Datacoord stopping") + defer func() { + logger.Info("Datacoord stopped", zap.Error(err)) + }() + s.cancel() if s.etcdCli != nil { diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index a7e549e7f09ca..c2c39116780da 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -198,9 +198,14 @@ func (s *Server) Run() error { } // Stop stops Datanode's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().DataNodeGrpcServerCfg - log.Debug("Datanode stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("Datanode stopping") + defer func() { + logger.Info("Datanode stopped", zap.Error(err)) + }() + s.cancel() if s.etcdCli != nil { defer s.etcdCli.Close() @@ -209,7 +214,7 @@ func (s *Server) Stop() error { utils.GracefulStopGRPCServer(s.grpcServer) } - err := s.datanode.Stop() + err = s.datanode.Stop() if err != nil { return err } diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 10b3b8ac02a09..af1b200cbd567 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -208,9 +208,14 @@ func (s *Server) start() error { } // Stop stops IndexNode's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().IndexNodeGrpcServerCfg - log.Debug("IndexNode stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("IndexNode stopping") + defer func() { + logger.Info("IndexNode stopped", zap.Error(err)) + }() + if s.indexnode != nil { s.indexnode.Stop() } diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index e0a2ab1a00b34..0059e684c99e8 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -662,9 +662,13 @@ func (s *Server) start() error { } // Stop stop the Proxy Server -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().ProxyGrpcServerCfg - log.Debug("Proxy stop", zap.String("internal address", Params.GetInternalAddress()), zap.String("external address", Params.GetInternalAddress())) + logger := log.With(zap.String("internal address", Params.GetInternalAddress()), zap.String("external address", Params.GetInternalAddress())) + logger.Info("Proxy stopping") + defer func() { + logger.Info("Proxy stopped", zap.Error(err)) + }() if s.etcdCli != nil { defer s.etcdCli.Close() @@ -708,7 +712,7 @@ func (s *Server) Stop() error { s.wg.Wait() - err := s.proxy.Stop() + err = s.proxy.Stop() if err != nil { return err } diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 30271af103d6b..bf65d80a283b3 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -269,9 +269,14 @@ func (s *Server) start() error { } // Stop stops QueryCoord's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().QueryCoordGrpcServerCfg - log.Debug("QueryCoord stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("QueryCoord stopping") + defer func() { + logger.Info("QueryCoord stopped", zap.Error(err)) + }() + if s.etcdCli != nil { defer s.etcdCli.Close() } @@ -279,9 +284,7 @@ func (s *Server) Stop() error { if s.grpcServer != nil { utils.GracefulStopGRPCServer(s.grpcServer) } - err := s.queryCoord.Stop() - - return err + return s.queryCoord.Stop() } // SetRootCoord sets root coordinator's client diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index a94c68d221943..2ad14f15d2907 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -233,10 +233,15 @@ func (s *Server) Run() error { } // Stop stops QueryNode's grpc service. -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().QueryNodeGrpcServerCfg - log.Debug("QueryNode stop", zap.String("Address", Params.GetAddress())) - err := s.querynode.Stop() + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("QueryNode stopping") + defer func() { + logger.Info("QueryNode stopped", zap.Error(err)) + }() + + err = s.querynode.Stop() if err != nil { return err } diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 7d8bb0af49c41..49d12e227fe79 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -312,9 +312,14 @@ func (s *Server) start() error { return nil } -func (s *Server) Stop() error { +func (s *Server) Stop() (err error) { Params := ¶mtable.Get().RootCoordGrpcServerCfg - log.Debug("Rootcoord stop", zap.String("Address", Params.GetAddress())) + logger := log.With(zap.String("address", Params.GetAddress())) + logger.Info("Rootcoord stopping") + defer func() { + logger.Info("Rootcoord stopped", zap.Error(err)) + }() + if s.etcdCli != nil { defer s.etcdCli.Close() } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index dcf9804a8cea6..504f1ce137174 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -35,7 +35,6 @@ import ( "path" "path/filepath" "sync" - "syscall" "time" "unsafe" @@ -139,16 +138,7 @@ func (i *IndexNode) Register() error { // start liveness check i.session.LivenessCheck(i.loopCtx, func() { log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) - if err := i.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.IndexNodeRole).Dec() - // manually send signal to starter goroutine - if i.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 77ee7d8368d26..afd8bd5fe4c7b 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -23,7 +23,6 @@ import ( "os" "strconv" "sync" - "syscall" "time" "github.com/cockroachdb/errors" @@ -166,15 +165,7 @@ func (node *Proxy) Register() error { log.Info("Proxy Register Finished") node.session.LivenessCheck(node.ctx, func() { log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) - if err := node.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.ProxyRole).Dec() - if node.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) // TODO Reset the logger // Params.initLogCfg() diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 8f9259c2aa47f..a8ab9f136c5e3 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -151,16 +151,7 @@ func (s *Server) Register() error { metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc() s.session.LivenessCheck(s.ctx, func() { log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID())) - if err := s.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Dec() - // manually send signal to starter goroutine - if s.session.IsTriggerKill() { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index c7fbfe56a2746..f7ce1e578cb8a 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -37,7 +37,6 @@ import ( "runtime/debug" "strings" "sync" - "syscall" "time" "unsafe" @@ -168,16 +167,7 @@ func (node *QueryNode) Register() error { metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Inc() node.session.LivenessCheck(node.ctx, func() { log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID())) - if err := node.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryNodeRole).Dec() - // manually send signal to starter goroutine - if node.session.IsTriggerKill() { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil } @@ -418,9 +408,7 @@ func (node *QueryNode) Stop() error { log.Warn("session fail to go stopping state", zap.Error(err)) } else { metrics.StoppingBalanceNodeNum.WithLabelValues().Set(1) - timeoutCh := time.After(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.GetAsDuration(time.Second)) - outer: for (node.manager != nil && !node.manager.Segment.Empty()) || (node.pipelineManager != nil && node.pipelineManager.Num() != 0) { var ( @@ -436,25 +424,22 @@ func (node *QueryNode) Stop() error { channelNum = node.pipelineManager.Num() } - select { - case <-timeoutCh: - log.Warn("migrate data timed out", zap.Int64("ServerID", paramtable.GetNodeID()), - zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 { - return s.ID() - })), - zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 { - return t.ID() - })), - zap.Int("channelNum", channelNum), - ) - break outer - - case <-time.After(time.Second): - metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments))) - metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum)) - } + log.Info("migrate data...", zap.Int64("ServerID", paramtable.GetNodeID()), + zap.Int64s("sealedSegments", lo.Map(sealedSegments, func(s segments.Segment, i int) int64 { + return s.ID() + })), + zap.Int64s("growingSegments", lo.Map(growingSegments, func(t segments.Segment, i int) int64 { + return t.ID() + })), + zap.Int("channelNum", channelNum), + ) + metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(len(sealedSegments))) + metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(channelNum)) + // Metrics is collected every 15 seconds or more. + <-time.After(5 * time.Second) } + log.Info("query node is empty, ready to stop", zap.Int64("ServerID", paramtable.GetNodeID())) metrics.StoppingBalanceNodeNum.WithLabelValues().Set(0) metrics.StoppingBalanceSegmentNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0) metrics.StoppingBalanceChannelNum.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(0) diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index b6c8ed0409863..c7e589ab0d743 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -214,8 +215,6 @@ func (suite *QueryNodeSuite) TestInit_QueryHook() { } func (suite *QueryNodeSuite) TestStop() { - paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "2") - suite.node.manager = segments.NewManager() mockSession := sessionutil.NewMockSession(suite.T()) mockSession.EXPECT().GoingStop().Return(nil) @@ -227,7 +226,20 @@ func (suite *QueryNodeSuite) TestStop() { segment, err := segments.NewSegment(context.Background(), collection, 100, 10, 1, "test_stop_channel", segments.SegmentTypeSealed, 1, nil, nil) suite.NoError(err) suite.node.manager.Segment.Put(segments.SegmentTypeSealed, segment) - err = suite.node.Stop() + future := conc.Go(func() (struct{}, error) { + return struct{}{}, suite.node.Stop() + }) + // Graceful stop, should wait for all segments to be released + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + select { + case <-future.Inner(): + suite.FailNow("stop should be blocked") + case <-ctx.Done(): + } + suite.node.manager.Segment.Clear() + + _, err = future.Await() suite.NoError(err) suite.True(suite.node.manager.Segment.Empty()) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index d9fc66aa6a566..b0c0c58b5ed2e 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -22,7 +22,6 @@ import ( "math/rand" "os" "sync" - "syscall" "time" "github.com/cockroachdb/errors" @@ -280,16 +279,7 @@ func (c *Core) Register() error { log.Info("RootCoord Register Finished") c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) - if err := c.Stop(); err != nil { - log.Fatal("failed to stop server", zap.Error(err)) - } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Dec() - // manually send signal to starter goroutine - if c.session.TriggerKill { - if p, err := os.FindProcess(os.Getpid()); err == nil { - p.Signal(syscall.SIGINT) - } - } + os.Exit(1) }) return nil diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index be093f9e8a371..66f311b269ee6 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -34,7 +34,9 @@ const ( DefaultIndexSliceSize = 16 DefaultConsistencyLevelUsedInDelete = commonpb.ConsistencyLevel_Bounded DefaultGracefulTime = 5000 // ms - DefaultGracefulStopTimeout = 1800 // s + DefaultGracefulStopTimeout = 1800 // s, for node + DefaultProxyGracefulStopTimeout = 30 // s,for proxy + DefaultCoordGracefulStopTimeout = 5 // s,for coord DefaultHighPriorityThreadCoreCoefficient = 10 DefaultMiddlePriorityThreadCoreCoefficient = 5 DefaultLowPriorityThreadCoreCoefficient = 1 @@ -819,6 +821,7 @@ type rootCoordConfig struct { EnableActiveStandby ParamItem `refreshable:"false"` MaxDatabaseNum ParamItem `refreshable:"false"` MaxGeneralCapacity ParamItem `refreshable:"true"` + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *rootCoordConfig) init(base *BaseTable) { @@ -913,6 +916,15 @@ func (p *rootCoordConfig) init(base *BaseTable) { }, } p.MaxGeneralCapacity.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "rootCoord.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultCoordGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -959,6 +971,8 @@ type proxyConfig struct { PartitionNameRegexp ParamItem `refreshable:"true"` AccessLog AccessLogConfig + + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *proxyConfig) init(base *BaseTable) { @@ -1251,6 +1265,15 @@ please adjust in embedded Milvus: false`, Doc: "switch for whether proxy shall use partition name as regexp when searching", } p.PartitionNameRegexp.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "proxy.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultProxyGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -1314,6 +1337,7 @@ type queryCoordConfig struct { ObserverTaskParallel ParamItem `refreshable:"false"` CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` CheckNodeSessionInterval ParamItem `refreshable:"false"` + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1702,6 +1726,15 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: true, } p.HeartBeatWarningLag.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "queryCoord.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultCoordGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2243,6 +2276,8 @@ type dataCoordConfig struct { // auto balance channel on datanode AutoBalance ParamItem `refreshable:"true"` CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` + + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -2637,6 +2672,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: true, } p.AutoUpgradeSegmentIndex.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "dataCoord.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultCoordGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2687,6 +2731,8 @@ type dataNodeConfig struct { ChannelWorkPoolSize ParamItem `refreshable:"true"` UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"` + + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *dataNodeConfig) init(base *BaseTable) { @@ -2903,6 +2949,15 @@ func (p *dataNodeConfig) init(base *BaseTable) { DefaultValue: "1000", } p.UpdateChannelCheckpointMaxParallel.Init(base.mgr) + + p.GracefulStopTimeout = ParamItem{ + Key: "datanode.gracefulStopTimeout", + Version: "2.3.7", + DefaultValue: strconv.Itoa(DefaultGracefulStopTimeout), + Doc: "seconds. force stop node without graceful stop", + Export: true, + } + p.GracefulStopTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2914,7 +2969,7 @@ type indexNodeConfig struct { DiskCapacityLimit ParamItem `refreshable:"true"` MaxDiskUsagePercentage ParamItem `refreshable:"true"` - GracefulStopTimeout ParamItem `refreshable:"false"` + GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *indexNodeConfig) init(base *BaseTable) { @@ -2969,6 +3024,7 @@ func (p *indexNodeConfig) init(base *BaseTable) { Key: "indexNode.gracefulStopTimeout", Version: "2.2.1", FallbackKeys: []string{"common.gracefulStopTimeout"}, + Doc: "seconds. force stop node without graceful stop", Export: true, } p.GracefulStopTimeout.Init(base.mgr) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index e5eb4fe0d6783..e6506624e96ad 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -125,6 +125,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false) t.Logf("rootCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool()) + params.Save("rootCoord.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) + SetCreateTime(time.Now()) SetUpdateTime(time.Now()) }) @@ -167,6 +170,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.CostMetricsExpireTime.GetAsInt(), 1000) assert.Equal(t, Params.RetryTimesOnReplica.GetAsInt(), 2) assert.EqualValues(t, Params.HealthCheckTimeout.GetAsInt64(), 3000) + + params.Save("proxy.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) // t.Run("test proxyConfig panic", func(t *testing.T) { @@ -285,6 +291,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, true, Params.AutoBalanceChannel.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) + + params.Save("queryCoord.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test queryNodeConfig", func(t *testing.T) { @@ -350,6 +359,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(100), gracefulStopTimeout.GetAsInt64()) assert.Equal(t, false, Params.EnableWorkerSQCostMetrics.GetAsBool()) + + params.Save("querynode.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test dataCoordConfig", func(t *testing.T) { @@ -362,6 +374,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) assert.Equal(t, false, Params.AutoUpgradeSegmentIndex.GetAsBool()) + + params.Save("datacoord.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test dataNodeConfig", func(t *testing.T) { @@ -408,12 +423,18 @@ func TestComponentParam(t *testing.T) { updateChannelCheckpointMaxParallel := Params.UpdateChannelCheckpointMaxParallel.GetAsInt() t.Logf("updateChannelCheckpointMaxParallel: %d", updateChannelCheckpointMaxParallel) assert.Equal(t, 1000, Params.UpdateChannelCheckpointMaxParallel.GetAsInt()) + + params.Save("datanode.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("test indexNodeConfig", func(t *testing.T) { Params := ¶ms.IndexNodeCfg params.Save(Params.GracefulStopTimeout.Key, "50") assert.Equal(t, Params.GracefulStopTimeout.GetAsInt64(), int64(50)) + + params.Save("indexnode.gracefulStopTimeout", "100") + assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) t.Run("channel config priority", func(t *testing.T) {