diff --git a/cmd/main.go b/cmd/main.go index 875126af32c33..c50ff8613339c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,11 +17,32 @@ package main import ( + "fmt" + "log" "os" + "os/exec" + "path/filepath" "github.com/milvus-io/milvus/cmd/milvus" ) func main() { - milvus.RunMilvus(os.Args) + if os.Getppid() == 1 { + filePath, _ := filepath.Abs(os.Args[0]) + cmd := exec.Command(filePath, os.Args[1:]...) + cmd.Stdin = os.Stdin + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + fmt.Println("try to run milvus as child progress") + if err := cmd.Run(); err != nil { + fmt.Println("milvus exit code", cmd.ProcessState.ExitCode()) + log.Println("milvus fail reason:", err.Error()) + } + } else { + milvus.RunMilvus(os.Args) + } + + // try to clean component session after milvus core exit + milvus.Clean(os.Args) } diff --git a/cmd/milvus/clean.go b/cmd/milvus/clean.go new file mode 100644 index 0000000000000..c4666782faa49 --- /dev/null +++ b/cmd/milvus/clean.go @@ -0,0 +1,93 @@ +package milvus + +import ( + "context" + "flag" + "fmt" + "io" + "os" + "path" + + "github.com/milvus-io/milvus/cmd/roles" + "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +const ( + cleanCmd = "clean" +) + +type clean struct { + serverType string + svrAlias string +} + +func (c *clean) getHelp() string { + return cleanLine + "\n" + serverTypeLine +} + +func (c *clean) execute(args []string, flags *flag.FlagSet) { + if len(args) < 3 { + fmt.Fprintln(os.Stderr, c.getHelp()) + return + } + flags.Usage = func() { + fmt.Fprintln(os.Stderr, c.getHelp()) + } + c.serverType = args[2] + if !typeutil.ServerTypeSet().Contain(c.serverType) { + fmt.Fprintf(os.Stderr, "Unknown server type = %s\n", c.serverType) + os.Exit(-1) + } + c.formatFlags(args, flags) + + err := c.cleanSession(context.Background(), flags) + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + } +} + +func (c *clean) cleanSession(ctx context.Context, flags *flag.FlagSet) error { + runtimeDir := createRuntimeDir(c.serverType) + filename := getPidFileName(c.serverType, c.svrAlias) + + serverID, err := c.getServerID(runtimeDir, filename) + if err != nil { + return err + } + + role := roles.NewMilvusRoles() + err = role.Init(c.serverType, flags) + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n%s", err.Error(), c.getHelp()) + os.Exit(-1) + } + return role.Clean(ctx, int64(serverID)) +} + +func (c *clean) formatFlags(args []string, flags *flag.FlagSet) { + flags.StringVar(&(c.svrAlias), "alias", "", "set alias") + if c.serverType == typeutil.EmbeddedRole { + flags.SetOutput(io.Discard) + } + hardware.InitMaxprocs(c.serverType, flags) + if err := flags.Parse(args[3:]); err != nil { + os.Exit(-1) + } +} + +func (c *clean) getServerID(runtimeDir string, filename string) (int, error) { + var sid int + + fd, err := os.OpenFile(path.Join(runtimeDir, filename), os.O_RDONLY, 0o664) + if err != nil { + return 0, err + } + defer closePidFile(fd) + + if _, err = fmt.Fscanf(fd, "%d", &sid); err != nil { + return 0, err + } + + return sid, nil +} diff --git a/cmd/milvus/help.go b/cmd/milvus/help.go index c967b8228865b..0bc8680e714b4 100644 --- a/cmd/milvus/help.go +++ b/cmd/milvus/help.go @@ -9,7 +9,7 @@ import ( var ( usageLine = fmt.Sprintf("Usage:\n"+ - "%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine) + "%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine, cleanLine) serverTypeLine = ` [server type] @@ -63,4 +63,12 @@ milvus mck cleanTrash [flags] Clean the back inconsistent data Tips: The flags is the same as its of the 'milvus mck [flags]' ` + + cleanLine = ` +milvus clean [server type] [flags] + clean resource after milvus exit +[flags] + -alias '' + Set alias +` ) diff --git a/cmd/milvus/milvus.go b/cmd/milvus/milvus.go index 8d3cfa1d8dc8a..c8aa22d162027 100644 --- a/cmd/milvus/milvus.go +++ b/cmd/milvus/milvus.go @@ -59,3 +59,13 @@ func RunMilvus(args []string) { c.execute(args, flags) } + +// clean milvus session after milvus exit +func Clean(args []string) { + flags := flag.NewFlagSet(args[0], flag.ExitOnError) + flags.Usage = func() { + fmt.Fprintln(os.Stderr, usageLine) + } + cmd := &clean{} + cmd.execute(args, flags) +} diff --git a/cmd/milvus/run.go b/cmd/milvus/run.go index b898e2d1c1a8e..af8b92746b1cb 100644 --- a/cmd/milvus/run.go +++ b/cmd/milvus/run.go @@ -25,15 +25,7 @@ const ( type run struct { serverType string // flags - svrAlias string - enableRootCoord bool - enableQueryCoord bool - enableDataCoord bool - enableIndexCoord bool - enableQueryNode bool - enableDataNode bool - enableIndexNode bool - enableProxy bool + svrAlias string } func (c *run) getHelp() string { @@ -55,46 +47,10 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { signal.Ignore(syscall.SIGPIPE) role := roles.NewMilvusRoles() - role.Local = false - switch c.serverType { - case typeutil.RootCoordRole: - role.EnableRootCoord = true - case typeutil.ProxyRole: - role.EnableProxy = true - case typeutil.QueryCoordRole: - role.EnableQueryCoord = true - case typeutil.QueryNodeRole: - role.EnableQueryNode = true - case typeutil.DataCoordRole: - role.EnableDataCoord = true - case typeutil.DataNodeRole: - role.EnableDataNode = true - case typeutil.IndexCoordRole: - role.EnableIndexCoord = true - case typeutil.IndexNodeRole: - role.EnableIndexNode = true - case typeutil.StandaloneRole, typeutil.EmbeddedRole: - role.EnableRootCoord = true - role.EnableProxy = true - role.EnableQueryCoord = true - role.EnableQueryNode = true - role.EnableDataCoord = true - role.EnableDataNode = true - role.EnableIndexCoord = true - role.EnableIndexNode = true - role.Local = true - role.Embedded = c.serverType == typeutil.EmbeddedRole - case roleMixture: - role.EnableRootCoord = c.enableRootCoord - role.EnableQueryCoord = c.enableQueryCoord - role.EnableDataCoord = c.enableDataCoord - role.EnableIndexCoord = c.enableIndexCoord - role.EnableQueryNode = c.enableQueryNode - role.EnableDataNode = c.enableDataNode - role.EnableIndexNode = c.enableIndexNode - role.EnableProxy = c.enableProxy - default: - fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", c.serverType, c.getHelp()) + // init roles by serverType and flags + err := role.Init(c.serverType, flags) + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n%s", err.Error(), c.getHelp()) os.Exit(-1) } @@ -114,16 +70,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { func (c *run) formatFlags(args []string, flags *flag.FlagSet) { flags.StringVar(&c.svrAlias, "alias", "", "set alias") - flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator") - flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator") - flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator") - flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator") - - flags.BoolVar(&c.enableQueryNode, typeutil.QueryNodeRole, false, "enable query node") - flags.BoolVar(&c.enableDataNode, typeutil.DataNodeRole, false, "enable data node") - flags.BoolVar(&c.enableIndexNode, typeutil.IndexNodeRole, false, "enable index node") - flags.BoolVar(&c.enableProxy, typeutil.ProxyRole, false, "enable proxy node") - if c.serverType == typeutil.EmbeddedRole { flags.SetOutput(io.Discard) } diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 4b550a716930b..79bb62083b04f 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -18,6 +18,7 @@ package roles import ( "context" + "flag" "fmt" "os" "os/signal" @@ -37,6 +38,7 @@ import ( rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/internal/util/dependency" internalmetrics "github.com/milvus-io/milvus/internal/util/metrics" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/tracer" @@ -151,6 +153,53 @@ func NewMilvusRoles() *MilvusRoles { return mr } +func (mr *MilvusRoles) Init(serverType string, flags *flag.FlagSet) error { + mr.Local = false + switch serverType { + case typeutil.RootCoordRole: + mr.EnableRootCoord = true + case typeutil.ProxyRole: + mr.EnableProxy = true + case typeutil.QueryCoordRole: + mr.EnableQueryCoord = true + case typeutil.QueryNodeRole: + mr.EnableQueryNode = true + case typeutil.DataCoordRole: + mr.EnableDataCoord = true + case typeutil.DataNodeRole: + mr.EnableDataNode = true + case typeutil.IndexCoordRole: + mr.EnableIndexCoord = true + case typeutil.IndexNodeRole: + mr.EnableIndexNode = true + case typeutil.StandaloneRole, typeutil.EmbeddedRole: + mr.EnableRootCoord = true + mr.EnableProxy = true + mr.EnableQueryCoord = true + mr.EnableQueryNode = true + mr.EnableDataCoord = true + mr.EnableDataNode = true + mr.EnableIndexCoord = true + mr.EnableIndexNode = true + mr.Local = true + mr.Embedded = serverType == typeutil.EmbeddedRole + case typeutil.RoleMixture: + flags.BoolVar(&mr.EnableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator") + flags.BoolVar(&mr.EnableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator") + flags.BoolVar(&mr.EnableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator") + flags.BoolVar(&mr.EnableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator") + + flags.BoolVar(&mr.EnableQueryNode, typeutil.QueryNodeRole, false, "enable query node") + flags.BoolVar(&mr.EnableDataNode, typeutil.DataNodeRole, false, "enable data node") + flags.BoolVar(&mr.EnableIndexNode, typeutil.IndexNodeRole, false, "enable index node") + flags.BoolVar(&mr.EnableProxy, typeutil.ProxyRole, false, "enable proxy node") + + default: + return fmt.Errorf("Unknown server type = %s", serverType) + } + return nil +} + // EnvValue not used now. func (mr *MilvusRoles) EnvValue(env string) bool { env = strings.ToLower(env) @@ -424,3 +473,58 @@ func (mr *MilvusRoles) Run(alias string) { log.Info("Milvus components graceful stop done") } + +// clean Milvus resource after exit, such as session +func (mr *MilvusRoles) Clean(ctx context.Context, serverID int64) error { + params := paramtable.Get() + etcdConfig := ¶ms.EtcdCfg + + etcdCli, err := etcd.GetEtcdClient( + etcdConfig.UseEmbedEtcd.GetAsBool(), + etcdConfig.EtcdUseSSL.GetAsBool(), + etcdConfig.Endpoints.GetAsStrings(), + etcdConfig.EtcdTLSCert.GetValue(), + etcdConfig.EtcdTLSKey.GetValue(), + etcdConfig.EtcdTLSCACert.GetValue(), + etcdConfig.EtcdTLSMinVersion.GetValue()) + if err != nil { + log.Debug("QueryCoord connect to etcd failed", zap.Error(err)) + return err + } + defer etcdCli.Close() + + if mr.EnableRootCoord { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.RootCoordRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableProxy { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.ProxyRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableQueryCoord { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.QueryCoordRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableQueryNode { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.QueryNodeRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableDataCoord { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.DataCoordRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableDataNode { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.DataNodeRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableIndexCoord { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.IndexCoordRole, serverID, sessionutil.ServerIDMatchFilter) + } + + if mr.EnableIndexNode { + err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.IndexNodeRole, serverID, sessionutil.ServerIDMatchFilter) + } + + log.Info("Finish to clean milvus occupied resource after exit") + return err +} diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 9d83aaa1a954a..4fb7f98c73840 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -574,9 +574,13 @@ func fnWithTimeout(fn func() error, d time.Duration) error { // GetSessions will get all sessions registered in etcd. // Revision is returned for WatchServices to prevent key events from being missed. func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) { - res := make(map[string]*Session) key := path.Join(s.metaRoot, DefaultServiceRoot, prefix) - resp, err := s.etcdCli.Get(s.ctx, key, clientv3.WithPrefix(), + return GetSessions(s.ctx, s.etcdCli, key) +} + +func GetSessions(ctx context.Context, etcdCli *clientv3.Client, sessionKeyPrefix string) (map[string]*Session, int64, error) { + res := make(map[string]*Session) + resp, err := etcdCli.Get(ctx, sessionKeyPrefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { return nil, 0, err @@ -589,7 +593,7 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) } _, mapKey := path.Split(string(kv.Key)) log.Debug("SessionUtil GetSessions", - zap.String("prefix", prefix), + zap.String("prefix", sessionKeyPrefix), zap.String("key", mapKey), zap.String("address", session.Address)) res[mapKey] = session @@ -1078,28 +1082,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { } // try to release old session first - sessions, _, err := s.GetSessions(s.ServerName) - if err != nil { - return err - } - - if len(sessions) != 0 { - activeSess := sessions[s.ServerName] - if activeSess == nil || activeSess.leaseID == nil { - // force delete all old sessions - s.etcdCli.Delete(s.ctx, s.activeKey) - for _, sess := range sessions { - if sess.ServerID != s.ServerID { - sess.getCompleteKey() - key := path.Join(s.metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", sess.ServerName, sess.ServerID)) - s.etcdCli.Delete(s.ctx, key) - } - } - } else { - // force release old active session - _, _ = s.etcdCli.Revoke(s.ctx, *activeSess.leaseID) - } - } + ForceDeleteSession(s.ctx, s.etcdCli, s.ServerName, s.ServerID, ServerIDNotMatchFilter) // then try to register as active resp, err := s.etcdCli.Txn(s.ctx).If( @@ -1131,3 +1114,41 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { } return nil } + +type ServerIDFilter = func(sess *Session, targetServerID int64) bool + +func ServerIDMatchFilter(sess *Session, targetServerID int64) bool { + return sess.ServerID == targetServerID +} + +func ServerIDNotMatchFilter(sess *Session, targetServerID int64) bool { + return sess.ServerID != targetServerID +} + +func ForceDeleteSession(ctx context.Context, etcdCli *clientv3.Client, serverName string, serverID int64, filter ServerIDFilter) error { + metaRoot := paramtable.Get().EtcdCfg.MetaRootPath.GetValue() + sessionKeyPrefix := path.Join(metaRoot, DefaultServiceRoot, serverName) + sessions, _, err := GetSessions(ctx, etcdCli, sessionKeyPrefix) + if err != nil { + return err + } + + if len(sessions) != 0 { + activeSess := sessions[serverName] + if activeSess == nil || activeSess.leaseID == nil { + // try to delete active session if exist, such as querycoord/datacoord/rootcoord + etcdCli.Delete(ctx, sessionKeyPrefix) + for _, sess := range sessions { + // delete all session which match the ServerIDFilter + if filter(sess, serverID) { + key := path.Join(metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", sess.ServerName, sess.ServerID)) + etcdCli.Delete(ctx, key) + } + } + } else { + // force release old active session + _, _ = etcdCli.Revoke(ctx, *activeSess.leaseID) + } + } + return nil +} diff --git a/pkg/util/typeutil/type.go b/pkg/util/typeutil/type.go index a60bb34094bce..52fd19de78111 100644 --- a/pkg/util/typeutil/type.go +++ b/pkg/util/typeutil/type.go @@ -46,6 +46,8 @@ const ( DataNodeRole = "datanode" // IndexNodeRole is a constant represent IndexNode IndexNodeRole = "indexnode" + // IndexNodeRole is for mixture Milvus + RoleMixture = "mixture" ) var (