From 9014044ddf62803ca836661d516a9eabe3cdc5aa Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Fri, 10 May 2024 14:01:04 -0700 Subject: [PATCH] feat: add zetaclientd-supervisor (#2113) * feat: add zetaclientd-supervisor * pass through args directly * clean shutdown * Add optional autodownload * remove old scripts * workround being blocked by #2135 * keep restart-zetaclientd.sh in orchestrator for e2e * lint fixes * feedback updates * autodownload optional and changelog * feedback --- Dockerfile-upgrade | 6 + Makefile | 1 + changelog.md | 1 + cmd/zetaclientd-supervisor/lib.go | 267 ++++++++++++++++++ cmd/zetaclientd-supervisor/main.go | 95 +++++++ contrib/localnet/orchestrator/Dockerfile | 1 - .../orchestrator/Dockerfile.fastbuild | 2 - .../restart-zetaclientd-at-upgrade.sh | 50 ---- .../orchestrator/restart-zetaclientd.sh | 6 +- .../localnet/orchestrator/start-zetae2e.sh | 10 +- contrib/localnet/scripts/start-zetaclientd.sh | 4 +- 11 files changed, 382 insertions(+), 61 deletions(-) create mode 100644 cmd/zetaclientd-supervisor/lib.go create mode 100644 cmd/zetaclientd-supervisor/main.go delete mode 100644 contrib/localnet/orchestrator/restart-zetaclientd-at-upgrade.sh diff --git a/Dockerfile-upgrade b/Dockerfile-upgrade index 3d1406c1e0..8142f91297 100644 --- a/Dockerfile-upgrade +++ b/Dockerfile-upgrade @@ -44,6 +44,12 @@ RUN --mount=type=cache,target="/root/.cache/go-build" cd /go/delivery/zeta-node/ RUN cp $GOPATH/bin/zetacored $GOPATH/bin/new/ && \ cp $GOPATH/bin/zetaclientd $GOPATH/bin/new/ +COPY --from=oldbuild ${GOPATH}/bin/zetaclientd /root/.zetaclientd/upgrades/genesis/ +RUN mkdir -p /root/.zetaclientd/upgrades/${NEW_VERSION}/ && \ + cp ${GOPATH}/bin/zetaclientd /root/.zetaclientd/upgrades/${NEW_VERSION}/ +RUN ln -s /root/.zetaclientd/upgrades/genesis /root/.zetaclientd/upgrades/current +ENV PATH="/root/.zetaclientd/upgrades/current:${PATH}" + COPY --from=oldbuild $GOPATH/bin/zetacored $GOPATH/bin/zetaclientd $GOPATH/bin/ COPY --from=oldbuild $GOPATH/bin/zetacored $GOPATH/bin/zetaclientd $GOPATH/bin/old diff --git a/Makefile b/Makefile index fe2047e635..2a55c49a76 100644 --- a/Makefile +++ b/Makefile @@ -86,6 +86,7 @@ install: go.sum @echo "--> Installing zetacored & zetaclientd" @go install -mod=readonly $(BUILD_FLAGS) ./cmd/zetacored @go install -mod=readonly $(BUILD_FLAGS) ./cmd/zetaclientd + @go install -mod=readonly $(BUILD_FLAGS) ./cmd/zetaclientd-supervisor install-zetaclient: go.sum @echo "--> Installing zetaclientd" diff --git a/changelog.md b/changelog.md index 8eedfe2ac3..b293e6f464 100644 --- a/changelog.md +++ b/changelog.md @@ -13,6 +13,7 @@ * [2100](https://github.com/zeta-chain/node/pull/2100) - cosmos v0.47 upgrade * [2145](https://github.com/zeta-chain/node/pull/2145) - add `ibc` and `ibc-transfer` modules * [2135](https://github.com/zeta-chain/node/pull/2135) - add develop build version logic +* [2113](https://github.com/zeta-chain/node/pull/2113) - add zetaclientd-supervisor process ### Refactor diff --git a/cmd/zetaclientd-supervisor/lib.go b/cmd/zetaclientd-supervisor/lib.go new file mode 100644 index 0000000000..cb943b4557 --- /dev/null +++ b/cmd/zetaclientd-supervisor/lib.go @@ -0,0 +1,267 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "path" + "runtime" + "strings" + "sync" + "time" + + "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" + upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types" + "github.com/hashicorp/go-getter" + "github.com/rs/zerolog" + "github.com/zeta-chain/zetacore/zetaclient/config" + "google.golang.org/grpc" +) + +const zetaclientdBinaryName = "zetaclientd" + +var defaultUpgradesDir = os.ExpandEnv("$HOME/.zetaclientd/upgrades") + +// serializedWriter wraps an io.Writer and ensures that writes to it from multiple goroutines +// are serialized +type serializedWriter struct { + upstream io.Writer + lock sync.Mutex +} + +func (w *serializedWriter) Write(p []byte) (n int, err error) { + w.lock.Lock() + defer w.lock.Unlock() + + return w.upstream.Write(p) +} + +func getLogger(cfg config.Config, out io.Writer) zerolog.Logger { + var logger zerolog.Logger + switch cfg.LogFormat { + case "json": + logger = zerolog.New(out).Level(zerolog.Level(cfg.LogLevel)).With().Timestamp().Logger() + case "text": + logger = zerolog.New(zerolog.ConsoleWriter{Out: out, TimeFormat: time.RFC3339}).Level(zerolog.Level(cfg.LogLevel)).With().Timestamp().Logger() + default: + logger = zerolog.New(zerolog.ConsoleWriter{Out: out, TimeFormat: time.RFC3339}) + } + + return logger +} + +type zetaclientdSupervisor struct { + zetacoredConn *grpc.ClientConn + reloadSignals chan bool + logger zerolog.Logger + upgradesDir string + upgradePlanName string + enableAutoDownload bool +} + +func newZetaclientdSupervisor(zetaCoreURL string, logger zerolog.Logger, enableAutoDownload bool) (*zetaclientdSupervisor, error) { + logger = logger.With().Str("module", "zetaclientdSupervisor").Logger() + conn, err := grpc.Dial( + fmt.Sprintf("%s:9090", zetaCoreURL), + grpc.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("grpc dial: %w", err) + } + + return &zetaclientdSupervisor{ + zetacoredConn: conn, + logger: logger, + reloadSignals: make(chan bool, 1), + upgradesDir: defaultUpgradesDir, + enableAutoDownload: enableAutoDownload, + }, nil +} + +func (s *zetaclientdSupervisor) Start(ctx context.Context) { + go s.watchForVersionChanges(ctx) + go s.handleCoreUpgradePlan(ctx) +} + +func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) { + select { + case <-s.reloadSignals: + case <-ctx.Done(): + } +} + +func (s *zetaclientdSupervisor) dirForVersion(version string) string { + return path.Join(s.upgradesDir, version) +} + +func atomicSymlink(target, linkName string) error { + linkNameTmp := linkName + ".tmp" + _, err := os.Stat(target) + if err != nil { + return fmt.Errorf("stat target: %w", err) + } + err = os.Remove(linkNameTmp) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("remove old current tmp: %w", err) + } + err = os.Symlink(target, linkNameTmp) + if err != nil { + return fmt.Errorf("new symlink: %w", err) + } + err = os.Rename(linkNameTmp, linkName) + if err != nil { + return fmt.Errorf("rename symlink: %w", err) + } + return nil +} + +func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) { + client := tmservice.NewServiceClient(s.zetacoredConn) + prevVersion := "" + for { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + res, err := client.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{}) + if err != nil { + s.logger.Warn().Err(err).Msg("get node info") + continue + } + newVersion := res.ApplicationVersion.Version + if prevVersion == "" { + prevVersion = newVersion + } + if prevVersion == newVersion { + continue + } + s.logger.Warn().Msgf("core version change (%s -> %s)", prevVersion, newVersion) + + prevVersion = newVersion + + // TODO: just use newVersion when #2135 is merged + // even without #2135, the version will still change and trigger the update + newVersionDir := s.dirForVersion(s.upgradePlanName) + currentLinkPath := s.dirForVersion("current") + + err = atomicSymlink(newVersionDir, currentLinkPath) + if err != nil { + s.logger.Error().Err(err).Msgf("unable to update current symlink (%s -> %s)", newVersionDir, currentLinkPath) + return + } + s.reloadSignals <- true + } +} + +func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) { + client := upgradetypes.NewQueryClient(s.zetacoredConn) + + prevPlanName := "" + for { + // wait for either a second or context cancel + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + + resp, err := client.CurrentPlan(ctx, &upgradetypes.QueryCurrentPlanRequest{}) + if err != nil { + s.logger.Warn().Err(err).Msg("get current upgrade plan") + continue + } + if resp.Plan == nil { + continue + } + plan := resp.Plan + if prevPlanName == plan.Name { + continue + } + s.logger.Warn().Msgf("got new upgrade plan (%s)", plan.Name) + prevPlanName = plan.Name + s.upgradePlanName = plan.Name + + if !s.enableAutoDownload { + s.logger.Warn().Msg("skipping autodownload because of configuration") + continue + } + err = s.downloadZetaclientd(ctx, plan) + if err != nil { + s.logger.Error().Err(err).Msg("downloadZetaclientd failed") + } + } +} + +// UpgradeConfig is expected format for the info field to allow auto-download +// this structure is copied from cosmosvisor +type upgradeConfig struct { + Binaries map[string]string `json:"binaries"` +} + +func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *upgradetypes.Plan) error { + if plan.Info == "" { + return errors.New("upgrade info empty") + } + var config upgradeConfig + err := json.Unmarshal([]byte(plan.Info), &config) + if err != nil { + return fmt.Errorf("unmarshal upgrade config: %w", err) + } + + s.logger.Info().Msg("downloading zetaclientd") + + binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH) + binURL, ok := config.Binaries[binKey] + if !ok { + return fmt.Errorf("no binary found for: %s", binKey) + } + upgradeDir := s.dirForVersion(plan.Name) + err = os.MkdirAll(upgradeDir, 0o750) + if err != nil { + return fmt.Errorf("mkdir %s: %w", upgradeDir, err) + } + upgradePath := path.Join(upgradeDir, zetaclientdBinaryName) + // TODO: retry? + // GetFile should validate checksum so long as it was provided in the url + err = getter.GetFile(upgradePath, binURL, getter.WithContext(ctx), getter.WithUmask(0o750)) + if err != nil { + return fmt.Errorf("get file %s: %w", binURL, err) + } + + // ensure binary is executable + info, err := os.Stat(upgradePath) + if err != nil { + return fmt.Errorf("stat binary: %w", err) + } + newMode := info.Mode().Perm() | 0o111 + err = os.Chmod(upgradePath, newMode) + if err != nil { + return fmt.Errorf("chmod %s: %w", upgradePath, err) + } + return nil +} + +func promptPasswords() (string, string, error) { + reader := bufio.NewReader(os.Stdin) + fmt.Print("HotKey Password: ") + hotKeyPass, err := reader.ReadString('\n') + if err != nil { + return "", "", err + } + fmt.Print("TSS Password: ") + tssKeyPass, err := reader.ReadString('\n') + if err != nil { + return "", "", err + } + + //trim delimiters + hotKeyPass = strings.TrimSuffix(hotKeyPass, "\n") + tssKeyPass = strings.TrimSuffix(tssKeyPass, "\n") + + return hotKeyPass, tssKeyPass, err +} diff --git a/cmd/zetaclientd-supervisor/main.go b/cmd/zetaclientd-supervisor/main.go new file mode 100644 index 0000000000..b8ce42ea5e --- /dev/null +++ b/cmd/zetaclientd-supervisor/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "os/signal" + "syscall" + "time" + + "github.com/zeta-chain/zetacore/app" + "github.com/zeta-chain/zetacore/zetaclient/config" + "golang.org/x/sync/errgroup" +) + +func main() { + cfg, err := config.Load(app.DefaultNodeHome) + if err != nil { + panic(fmt.Errorf("failed to load config: %w", err)) + } + // log outputs must be serialized since we are writing log messages in this process and + // also directly from the zetaclient process + serializedStdout := &serializedWriter{upstream: os.Stdout} + logger := getLogger(cfg, serializedStdout) + logger = logger.With().Str("process", "zetaclientd-supervisor").Logger() + + ctx := context.Background() + + // these signals will result in the supervisor process shutting down + shutdownChan := make(chan os.Signal, 1) + signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM) + + // these signals will result in the supervisor process only restarting zetaclientd + restartChan := make(chan os.Signal, 1) + signal.Notify(restartChan, syscall.SIGHUP) + + hotkeyPassword, tssPassword, err := promptPasswords() + if err != nil { + panic(fmt.Errorf("unable to get passwords: %w", err)) + } + + _, enableAutoDownload := os.LookupEnv("ZETACLIENTD_SUPERVISOR_ENABLE_AUTO_DOWNLOAD") + supervisor, err := newZetaclientdSupervisor(cfg.ZetaCoreURL, logger, enableAutoDownload) + if err != nil { + panic(fmt.Errorf("unable to get supervisor: %w", err)) + } + supervisor.Start(ctx) + + shouldRestart := true + for shouldRestart { + ctx, cancel := context.WithCancel(ctx) + // pass args from supervisor directly to zetaclientd + cmd := exec.CommandContext(ctx, zetaclientdBinaryName, os.Args[1:]...) // #nosec G204 + // by default, CommandContext sends SIGKILL. we want more graceful shutdown. + cmd.Cancel = func() error { + return cmd.Process.Signal(syscall.SIGINT) + } + cmd.Stdout = serializedStdout + cmd.Stderr = os.Stderr + // must reset the passwordInputBuffer every iteration because reads are stateful (seek to end) + passwordInputBuffer := bytes.Buffer{} + passwordInputBuffer.Write([]byte(hotkeyPassword + "\n" + tssPassword + "\n")) + cmd.Stdin = &passwordInputBuffer + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(cmd.Run) + eg.Go(func() error { + supervisor.WaitForReloadSignal(ctx) + cancel() + return nil + }) + eg.Go(func() error { + for { + select { + case <-ctx.Done(): + return nil + case sig := <-restartChan: + logger.Info().Msgf("got signal %d, sending SIGINT to zetaclientd", sig) + case sig := <-shutdownChan: + logger.Info().Msgf("got signal %d, shutting down", sig) + shouldRestart = false + } + cancel() + } + }) + err := eg.Wait() + if err != nil { + logger.Error().Err(err).Msg("error while waiting") + } + // prevent fast spin + time.Sleep(time.Second) + } +} diff --git a/contrib/localnet/orchestrator/Dockerfile b/contrib/localnet/orchestrator/Dockerfile index e268738ef2..e159d8ba84 100644 --- a/contrib/localnet/orchestrator/Dockerfile +++ b/contrib/localnet/orchestrator/Dockerfile @@ -13,7 +13,6 @@ COPY --from=zeta /root/.ssh/localtest.pem /root/.ssh/localtest.pem COPY contrib/localnet/orchestrator/start-zetae2e.sh /work/ COPY contrib/localnet/orchestrator/restart-zetaclientd.sh /work/ -COPY contrib/localnet/orchestrator/restart-zetaclientd-at-upgrade.sh /work/ RUN chmod +x /work/*.sh ENV GOPATH /go diff --git a/contrib/localnet/orchestrator/Dockerfile.fastbuild b/contrib/localnet/orchestrator/Dockerfile.fastbuild index 36cf07b8cc..9be2c48124 100644 --- a/contrib/localnet/orchestrator/Dockerfile.fastbuild +++ b/contrib/localnet/orchestrator/Dockerfile.fastbuild @@ -12,8 +12,6 @@ COPY --from=zeta /root/.ssh/localtest.pem.pub /root/.ssh/authorized_keys COPY --from=zeta /root/.ssh/localtest.pem /root/.ssh/localtest.pem COPY contrib/localnet/orchestrator/start-zetae2e.sh /work/ -COPY contrib/localnet/orchestrator/restart-zetaclientd.sh /work/ -COPY contrib/localnet/orchestrator/restart-zetaclientd-at-upgrade.sh /work/ RUN chmod +x /work/*.sh COPY --from=zeta /usr/local/bin/zetae2e /usr/local/bin/ diff --git a/contrib/localnet/orchestrator/restart-zetaclientd-at-upgrade.sh b/contrib/localnet/orchestrator/restart-zetaclientd-at-upgrade.sh deleted file mode 100644 index 7911deca01..0000000000 --- a/contrib/localnet/orchestrator/restart-zetaclientd-at-upgrade.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -# This script is used to restart zetaclientd after an upgrade -# It waits for the upgrade height to be reached and then restarts the zetaclientd on all nodes in the network -# It interacts with the network using the zetaclientd binary - -clibuilder() -{ - echo "" - echo "Usage: $0 -u UPGRADE_HEIGHT" - echo -e "\t-u Height of upgrade, should match governance proposal" - echo -e "\t-n Number of clients in the network" - exit 1 # Exit script after printing help -} - -while getopts "u:n:" opt -do - case "$opt" in - u ) UPGRADE_HEIGHT="$OPTARG" ;; - n ) NUM_OF_NODES="$OPTARG" ;; - ? ) clibuilder ;; # Print cliBuilder in case parameter is non-existent - esac -done - -# generate client list -START=0 -END=$((NUM_OF_NODES-1)) -CLIENT_LIST=() -for i in $(eval echo "{$START..$END}") -do - CLIENT_LIST+=("zetaclient$i") -done - -echo "$UPGRADE_HEIGHT" - -CURRENT_HEIGHT=0 - -while [[ $CURRENT_HEIGHT -lt $UPGRADE_HEIGHT ]] -do - CURRENT_HEIGHT=$(curl -s zetacore0:26657/status | jq '.result.sync_info.latest_block_height' | tr -d '"') - echo current height is "$CURRENT_HEIGHT", waiting for "$UPGRADE_HEIGHT" - sleep 5 -done - -echo upgrade height reached, restarting zetaclients - -for NODE in "${CLIENT_LIST[@]}"; do - ssh -o "StrictHostKeyChecking no" "$NODE" -i ~/.ssh/localtest.pem killall zetaclientd - ssh -o "StrictHostKeyChecking no" "$NODE" -i ~/.ssh/localtest.pem "$GOPATH/bin/new/zetaclientd start < /root/password.file > $HOME/zetaclient.log 2>&1 &" -done diff --git a/contrib/localnet/orchestrator/restart-zetaclientd.sh b/contrib/localnet/orchestrator/restart-zetaclientd.sh index 6071b07570..b99a11d157 100644 --- a/contrib/localnet/orchestrator/restart-zetaclientd.sh +++ b/contrib/localnet/orchestrator/restart-zetaclientd.sh @@ -1,11 +1,9 @@ #!/bin/bash # This script immediately restarts the zetaclientd on zetaclient0 and zetaclient1 containers in the network +# zetaclientd-supervisor will restart zetaclient automatically echo restarting zetaclients ssh -o "StrictHostKeyChecking no" "zetaclient0" -i ~/.ssh/localtest.pem killall zetaclientd -ssh -o "StrictHostKeyChecking no" "zetaclient1" -i ~/.ssh/localtest.pem killall zetaclientd -ssh -o "StrictHostKeyChecking no" "zetaclient0" -i ~/.ssh/localtest.pem "/usr/local/bin/zetaclientd start < /root/password.file > $HOME/zetaclient.log 2>&1 &" -ssh -o "StrictHostKeyChecking no" "zetaclient1" -i ~/.ssh/localtest.pem "/usr/local/bin/zetaclientd start < /root/password.file > $HOME/zetaclient.log 2>&1 &" - +ssh -o "StrictHostKeyChecking no" "zetaclient1" -i ~/.ssh/localtest.pem killall zetaclientd \ No newline at end of file diff --git a/contrib/localnet/orchestrator/start-zetae2e.sh b/contrib/localnet/orchestrator/start-zetae2e.sh index 9d630950b7..772d7dd496 100644 --- a/contrib/localnet/orchestrator/start-zetae2e.sh +++ b/contrib/localnet/orchestrator/start-zetae2e.sh @@ -73,8 +73,14 @@ if [ "$OPTION" == "upgrade" ]; then echo "E2E setup passed, waiting for upgrade height..." - # Restart zetaclients at upgrade height - /work/restart-zetaclientd-at-upgrade.sh -u "$UPGRADE_HEIGHT" -n 2 + CURRENT_HEIGHT=0 + # wait for upgrade height + while [[ $CURRENT_HEIGHT -lt $UPGRADE_HEIGHT ]] + do + CURRENT_HEIGHT=$(curl -s zetacore0:26657/status | jq '.result.sync_info.latest_block_height' | tr -d '"') + echo current height is "$CURRENT_HEIGHT", waiting for "$UPGRADE_HEIGHT" + sleep 5 + done echo "waiting 10 seconds for node to restart..." diff --git a/contrib/localnet/scripts/start-zetaclientd.sh b/contrib/localnet/scripts/start-zetaclientd.sh index 89c691d52b..98cf207ef5 100755 --- a/contrib/localnet/scripts/start-zetaclientd.sh +++ b/contrib/localnet/scripts/start-zetaclientd.sh @@ -43,7 +43,7 @@ then set_sepolia_endpoint fi - zetaclientd start < /root/password.file + zetaclientd-supervisor start < /root/password.file else num=$(echo $HOSTNAME | tr -dc '0-9') node="zetacore$num" @@ -63,7 +63,7 @@ else set_sepolia_endpoint fi - zetaclientd start < /root/password.file + zetaclientd-supervisor start < /root/password.file fi # check if the option is background