Skip to content

Commit

Permalink
start milvus as child process
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Sep 27, 2023
1 parent b80a3e1 commit 69588bf
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 86 deletions.
23 changes: 22 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,32 @@
package main

import (
"fmt"
"log"
"os"
"os/exec"
"path/filepath"

"github.com/milvus-io/milvus/cmd/milvus"
)

func main() {
milvus.RunMilvus(os.Args)
if os.Getppid() == 1 {
filePath, _ := filepath.Abs(os.Args[0])
cmd := exec.Command(filePath, os.Args[1:]...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

fmt.Println("try to run milvus as child progress")
if err := cmd.Run(); err != nil {
fmt.Println("milvus exit code", cmd.ProcessState.ExitCode())
log.Println("milvus fail reason:", err.Error())
}
} else {
milvus.RunMilvus(os.Args)
}

// try to clean component session after milvus core exit
milvus.Clean(os.Args)
}
93 changes: 93 additions & 0 deletions cmd/milvus/clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package milvus

import (
"context"
"flag"
"fmt"
"io"
"os"
"path"

"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
cleanCmd = "clean"
)

type clean struct {
serverType string
svrAlias string
}

func (c *clean) getHelp() string {
return cleanLine + "\n" + serverTypeLine
}

func (c *clean) execute(args []string, flags *flag.FlagSet) {
if len(args) < 3 {
fmt.Fprintln(os.Stderr, c.getHelp())
return
}
flags.Usage = func() {
fmt.Fprintln(os.Stderr, c.getHelp())
}
c.serverType = args[2]
if !typeutil.ServerTypeSet().Contain(c.serverType) {
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n", c.serverType)
os.Exit(-1)
}
c.formatFlags(args, flags)

err := c.cleanSession(context.Background(), flags)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
}

func (c *clean) cleanSession(ctx context.Context, flags *flag.FlagSet) error {
runtimeDir := createRuntimeDir(c.serverType)
filename := getPidFileName(c.serverType, c.svrAlias)

serverID, err := c.getServerID(runtimeDir, filename)
if err != nil {
return err
}

role := roles.NewMilvusRoles()
err = role.Init(c.serverType, flags)
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n%s", err.Error(), c.getHelp())
os.Exit(-1)
}
return role.Clean(ctx, int64(serverID))
}

func (c *clean) formatFlags(args []string, flags *flag.FlagSet) {
flags.StringVar(&(c.svrAlias), "alias", "", "set alias")
if c.serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)
}
hardware.InitMaxprocs(c.serverType, flags)
if err := flags.Parse(args[3:]); err != nil {
os.Exit(-1)
}
}

func (c *clean) getServerID(runtimeDir string, filename string) (int, error) {
var sid int

fd, err := os.OpenFile(path.Join(runtimeDir, filename), os.O_RDONLY, 0o664)
if err != nil {
return 0, err
}
defer closePidFile(fd)

if _, err = fmt.Fscanf(fd, "%d", &sid); err != nil {
return 0, err
}

return sid, nil
}
10 changes: 9 additions & 1 deletion cmd/milvus/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var (
usageLine = fmt.Sprintf("Usage:\n"+
"%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine)
"%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine, cleanLine)

serverTypeLine = `
[server type]
Expand Down Expand Up @@ -63,4 +63,12 @@ milvus mck cleanTrash [flags]
Clean the back inconsistent data
Tips: The flags is the same as its of the 'milvus mck [flags]'
`

cleanLine = `
milvus clean [server type] [flags]
clean resource after milvus exit
[flags]
-alias ''
Set alias
`
)
10 changes: 10 additions & 0 deletions cmd/milvus/milvus.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,13 @@ func RunMilvus(args []string) {

c.execute(args, flags)
}

// clean milvus session after milvus exit
func Clean(args []string) {
flags := flag.NewFlagSet(args[0], flag.ExitOnError)
flags.Usage = func() {
fmt.Fprintln(os.Stderr, usageLine)
}
cmd := &clean{}
cmd.execute(args, flags)
}
64 changes: 5 additions & 59 deletions cmd/milvus/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ const (
type run struct {
serverType string
// flags
svrAlias string
enableRootCoord bool
enableQueryCoord bool
enableDataCoord bool
enableIndexCoord bool
enableQueryNode bool
enableDataNode bool
enableIndexNode bool
enableProxy bool
svrAlias string
}

func (c *run) getHelp() string {
Expand All @@ -55,46 +47,10 @@ func (c *run) execute(args []string, flags *flag.FlagSet) {
signal.Ignore(syscall.SIGPIPE)

role := roles.NewMilvusRoles()
role.Local = false
switch c.serverType {
case typeutil.RootCoordRole:
role.EnableRootCoord = true
case typeutil.ProxyRole:
role.EnableProxy = true
case typeutil.QueryCoordRole:
role.EnableQueryCoord = true
case typeutil.QueryNodeRole:
role.EnableQueryNode = true
case typeutil.DataCoordRole:
role.EnableDataCoord = true
case typeutil.DataNodeRole:
role.EnableDataNode = true
case typeutil.IndexCoordRole:
role.EnableIndexCoord = true
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
role.EnableQueryNode = true
role.EnableDataCoord = true
role.EnableDataNode = true
role.EnableIndexCoord = true
role.EnableIndexNode = true
role.Local = true
role.Embedded = c.serverType == typeutil.EmbeddedRole
case roleMixture:
role.EnableRootCoord = c.enableRootCoord
role.EnableQueryCoord = c.enableQueryCoord
role.EnableDataCoord = c.enableDataCoord
role.EnableIndexCoord = c.enableIndexCoord
role.EnableQueryNode = c.enableQueryNode
role.EnableDataNode = c.enableDataNode
role.EnableIndexNode = c.enableIndexNode
role.EnableProxy = c.enableProxy
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", c.serverType, c.getHelp())
// init roles by serverType and flags
err := role.Init(c.serverType, flags)
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n%s", err.Error(), c.getHelp())
os.Exit(-1)
}

Expand All @@ -114,16 +70,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) {
func (c *run) formatFlags(args []string, flags *flag.FlagSet) {
flags.StringVar(&c.svrAlias, "alias", "", "set alias")

flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator")
flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator")
flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")

flags.BoolVar(&c.enableQueryNode, typeutil.QueryNodeRole, false, "enable query node")
flags.BoolVar(&c.enableDataNode, typeutil.DataNodeRole, false, "enable data node")
flags.BoolVar(&c.enableIndexNode, typeutil.IndexNodeRole, false, "enable index node")
flags.BoolVar(&c.enableProxy, typeutil.ProxyRole, false, "enable proxy node")

if c.serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)
}
Expand Down
104 changes: 104 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package roles

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
Expand All @@ -37,6 +38,7 @@ import (
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/util/dependency"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/tracer"
Expand Down Expand Up @@ -151,6 +153,53 @@ func NewMilvusRoles() *MilvusRoles {
return mr
}

func (mr *MilvusRoles) Init(serverType string, flags *flag.FlagSet) error {
mr.Local = false
switch serverType {
case typeutil.RootCoordRole:
mr.EnableRootCoord = true
case typeutil.ProxyRole:
mr.EnableProxy = true
case typeutil.QueryCoordRole:
mr.EnableQueryCoord = true
case typeutil.QueryNodeRole:
mr.EnableQueryNode = true
case typeutil.DataCoordRole:
mr.EnableDataCoord = true
case typeutil.DataNodeRole:
mr.EnableDataNode = true
case typeutil.IndexCoordRole:
mr.EnableIndexCoord = true
case typeutil.IndexNodeRole:
mr.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
mr.EnableRootCoord = true
mr.EnableProxy = true
mr.EnableQueryCoord = true
mr.EnableQueryNode = true
mr.EnableDataCoord = true
mr.EnableDataNode = true
mr.EnableIndexCoord = true
mr.EnableIndexNode = true
mr.Local = true
mr.Embedded = serverType == typeutil.EmbeddedRole
case typeutil.RoleMixture:
flags.BoolVar(&mr.EnableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator")
flags.BoolVar(&mr.EnableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator")
flags.BoolVar(&mr.EnableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
flags.BoolVar(&mr.EnableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")

flags.BoolVar(&mr.EnableQueryNode, typeutil.QueryNodeRole, false, "enable query node")
flags.BoolVar(&mr.EnableDataNode, typeutil.DataNodeRole, false, "enable data node")
flags.BoolVar(&mr.EnableIndexNode, typeutil.IndexNodeRole, false, "enable index node")
flags.BoolVar(&mr.EnableProxy, typeutil.ProxyRole, false, "enable proxy node")

default:
return fmt.Errorf("Unknown server type = %s", serverType)
}
return nil
}

// EnvValue not used now.
func (mr *MilvusRoles) EnvValue(env string) bool {
env = strings.ToLower(env)
Expand Down Expand Up @@ -424,3 +473,58 @@ func (mr *MilvusRoles) Run(alias string) {

log.Info("Milvus components graceful stop done")
}

// clean Milvus resource after exit, such as session
func (mr *MilvusRoles) Clean(ctx context.Context, serverID int64) error {
params := paramtable.Get()
etcdConfig := &params.EtcdCfg

etcdCli, err := etcd.GetEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
log.Debug("QueryCoord connect to etcd failed", zap.Error(err))
return err
}
defer etcdCli.Close()

if mr.EnableRootCoord {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.RootCoordRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableProxy {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.ProxyRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableQueryCoord {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.QueryCoordRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableQueryNode {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.QueryNodeRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableDataCoord {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.DataCoordRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableDataNode {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.DataNodeRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableIndexCoord {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.IndexCoordRole, serverID, sessionutil.ServerIDMatchFilter)
}

if mr.EnableIndexNode {
err = sessionutil.ForceDeleteSession(ctx, etcdCli, typeutil.IndexNodeRole, serverID, sessionutil.ServerIDMatchFilter)
}

log.Info("Finish to clean milvus occupied resource after exit")
return err
}
Loading

0 comments on commit 69588bf

Please sign in to comment.