diff --git a/changelog.md b/changelog.md index f04eeaa30a..44f3bde31a 100644 --- a/changelog.md +++ b/changelog.md @@ -16,6 +16,7 @@ ## Refactor * [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient +* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown) ### Fixes diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 9906d735fc..e82baec911 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -5,24 +5,18 @@ import ( "net/http" _ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional "os" - "os/signal" - "path/filepath" - "strings" - "syscall" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "github.com/zeta-chain/node/pkg/authz" "github.com/zeta-chain/node/pkg/chains" "github.com/zeta-chain/node/pkg/constant" + "github.com/zeta-chain/node/pkg/graceful" zetaos "github.com/zeta-chain/node/pkg/os" "github.com/zeta-chain/node/zetaclient/chains/base" "github.com/zeta-chain/node/zetaclient/config" zctx "github.com/zeta-chain/node/zetaclient/context" - "github.com/zeta-chain/node/zetaclient/keys" "github.com/zeta-chain/node/zetaclient/maintenance" "github.com/zeta-chain/node/zetaclient/metrics" "github.com/zeta-chain/node/zetaclient/orchestrator" @@ -36,119 +30,55 @@ const ( envPprofAddr = "PPROF_ADDR" ) -// Start starts zetaclientd process todo revamp -// https://github.com/zeta-chain/node/issues/3112 +// Start starts zetaclientd process func Start(_ *cobra.Command, _ []string) error { - // Prompt for Hotkey, TSS key-share and relayer key passwords - titles := []string{"HotKey", "TSS", "Solana Relayer Key"} - passwords, err := zetaos.PromptPasswords(titles) - if err != nil { - return errors.Wrap(err, "unable to get passwords") - } - hotkeyPass, tssKeyPass, solanaKeyPass := passwords[0], passwords[1], passwords[2] - relayerKeyPasswords := map[string]string{ - chains.Network_solana.String(): solanaKeyPass, - } - // Load Config file given path cfg, err := config.Load(globalOpts.ZetacoreHome) if err != nil { - return err + return errors.Wrap(err, "unable to load config") } - logger, err := base.InitLogger(cfg) + dbPath, err := config.ResolveDBPath() if err != nil { - return errors.Wrap(err, "initLogger failed") + return errors.Wrap(err, "unable to resolve db path") } - masterLogger := logger.Std - startLogger := logger.Std.With().Str("module", "startup").Logger() - - appContext := zctx.New(cfg, relayerKeyPasswords, masterLogger) - ctx := zctx.WithAppContext(context.Background(), appContext) - - // Wait until zetacore is up - waitForZetaCore(cfg, startLogger) - startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer) - - telemetryServer := metrics.NewTelemetryServer() - go func() { - err := telemetryServer.Start() - if err != nil { - startLogger.Error().Err(err).Msg("telemetryServer error") - panic("telemetryServer error") - } - }() - - go runPprof(startLogger) - - // CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to. 
- // Zetacore accumulates votes , and provides a centralized source of truth for all clients - zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger) + // Configure logger (also overrides the default log level) + logger, err := base.NewLogger(cfg) if err != nil { - return errors.Wrap(err, "unable to create zetacore client") + return errors.Wrap(err, "unable to create logger") } - // Wait until zetacore is ready to create blocks - if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil { - startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error") - return err - } - startLogger.Info().Msgf("Zetacore client is ready") - - // Set grantee account number and sequence number - err = zetacoreClient.SetAccountNumber(authz.ZetaClientGranteeKey) + passes, err := promptPasswords() if err != nil { - startLogger.Error().Err(err).Msg("SetAccountNumber error") - return err + return errors.Wrap(err, "unable to prompt for passwords") } - // cross-check chainid - res, err := zetacoreClient.GetNodeInfo(ctx) - if err != nil { - startLogger.Error().Err(err).Msg("GetNodeInfo error") - return err - } + appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std) + ctx := zctx.WithAppContext(context.Background(), appContext) - if strings.Compare(res.GetDefaultNodeInfo().Network, cfg.ChainID) != 0 { - startLogger.Warn(). - Msgf("chain id mismatch, zetacore chain id %s, zetaclient configured chain id %s; reset zetaclient chain id", res.GetDefaultNodeInfo().Network, cfg.ChainID) - cfg.ChainID = res.GetDefaultNodeInfo().Network - err := zetacoreClient.UpdateChainID(cfg.ChainID) - if err != nil { - return err - } + telemetry, err := startTelemetry(ctx, cfg) + if err != nil { + return errors.Wrap(err, "unable to start telemetry") } - // CreateAuthzSigner : which is used to sign all authz messages . All votes broadcast to zetacore are wrapped in authz exec . - // This is to ensure that the user does not need to keep their operator key online , and can use a cold key to sign votes - signerAddress, err := zetacoreClient.GetKeys().GetAddress() + // zetacore client is used for all communication to zeta node. 
+ // it accumulates votes, and provides a source of truth for all clients + // + // This call crated client, ensured block production, then prepares the client + zetacoreClient, err := zetacore.NewFromConfig(ctx, &cfg, passes.hotkey, logger.Std) if err != nil { - return errors.Wrap(err, "error getting signer address") + return errors.Wrap(err, "unable to create zetacore client from config") } - createAuthzSigner(zetacoreClient.GetKeys().GetOperatorAddress().String(), signerAddress) - startLogger.Debug().Msgf("createAuthzSigner is ready") - // Initialize core parameters from zetacore - if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil { + if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, logger.Std); err != nil { return errors.Wrap(err, "unable to update app context") } - startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked()) - - m, err := metrics.NewMetrics() - if err != nil { - return errors.Wrap(err, "unable to create metrics") - } - m.Start() - - metrics.Info.WithLabelValues(constant.Version).Set(1) - metrics.LastStartTime.SetToCurrentTime() - - telemetryServer.SetIPAddress(cfg.PublicIP) + log.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked()) - granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass) + granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, passes.hotkey) if err != nil { return errors.Wrap(err, "unable to resolve observer pub key bech32") } @@ -157,44 +87,34 @@ func Start(_ *cobra.Command, _ []string) error { Config: cfg, Zetacore: zetacoreClient, GranteePubKeyBech32: granteePubKeyBech32, - HotKeyPassword: hotkeyPass, - TSSKeyPassword: tssKeyPass, + HotKeyPassword: passes.hotkey, + TSSKeyPassword: passes.tss, BitcoinChainIDs: btcChainIDsFromContext(appContext), PostBlame: isEnvFlagEnabled(envFlagPostBlame), - Telemetry: telemetryServer, + Telemetry: telemetry, } - tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger) + tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std) if err != nil { return errors.Wrap(err, "unable to setup TSS service") } - // Creating a channel to listen for os signals (or other signals) - signalChannel := make(chan os.Signal, 1) - signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) - - // Starts various background TSS listeners. - // Shuts down zetaclientd if any is triggered. - maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() { - masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.") - signalChannel <- syscall.SIGTERM - }) - - if len(appContext.ListChainIDs()) == 0 { - startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config") - } - isObserver, err := isObserverNode(ctx, zetacoreClient) switch { case err != nil: - startLogger.Error().Msgf("Unable to determine if node is an observer") - return err + return errors.Wrap(err, "unable to check if observer node") case !isObserver: - addr := zetacoreClient.GetKeys().GetOperatorAddress().String() - startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0") + logger.Std.Warn().Msg("This node is not an observer node. Exit 0") return nil } + // Starts various background TSS listeners. + // Shuts down zetaclientd if any is triggered. 
+ maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() { + logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.") + graceful.ShutdownNow() + }) + // CreateSignerMap: This creates a map of all signers for each chain. // Each signer is responsible for signing transactions for a particular chain signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger) @@ -203,16 +123,9 @@ func Start(_ *cobra.Command, _ []string) error { return err } - userDir, err := os.UserHomeDir() - if err != nil { - log.Error().Err(err).Msg("os.UserHomeDir") - return err - } - dbpath := filepath.Join(userDir, ".zetaclient/chainobserver") - // Creates a map of all chain observers for each chain. // Each chain observer is responsible for observing events on the chain and processing them. - observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer) + observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetry) if err != nil { return errors.Wrap(err, "unable to create chain observer map") } @@ -226,84 +139,87 @@ func Start(_ *cobra.Command, _ []string) error { signerMap, observerMap, tss, - dbpath, + dbPath, logger, - telemetryServer, + telemetry, ) if err != nil { return errors.Wrap(err, "unable to create orchestrator") } // Start orchestrator with all observers and signers - if err = maestro.Start(ctx); err != nil { - return errors.Wrap(err, "unable to start orchestrator") - } + graceful.AddService(ctx, maestro) - // start zeta supply checker - // TODO: enable - // https://github.com/zeta-chain/node/issues/1354 - // NOTE: this is disabled for now because we need to determine the frequency on how to handle invalid check - // The method uses GRPC query to the node we might need to improve for performance - //zetaSupplyChecker, err := mc.NewZetaSupplyChecker(cfg, zetacoreClient, masterLogger) - //if err != nil { - // startLogger.Err(err).Msg("NewZetaSupplyChecker") - //} - //if err == nil { - // zetaSupplyChecker.Start() - // defer zetaSupplyChecker.Stop() - //} + // Block current routine until a shutdown signal is received + graceful.WaitForShutdown() - startLogger.Info().Msg("zetaclientd is running") + return nil +} - sig := <-signalChannel - startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig) +type passwords struct { + hotkey string + tss string + solanaRelayerKey string +} - maestro.Stop() +// promptPasswords prompts for Hotkey, TSS key-share and relayer key passwords +func promptPasswords() (passwords, error) { + titles := []string{"HotKey", "TSS", "Solana Relayer Key"} - return nil + res, err := zetaos.PromptPasswords(titles) + if err != nil { + return passwords{}, errors.Wrap(err, "unable to get passwords") + } + + return passwords{ + hotkey: res[0], + tss: res[1], + solanaRelayerKey: res[2], + }, nil } -// isObserverNode checks whether THIS node is an observer node. -func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) { - observers, err := client.GetObserverList(ctx) - if err != nil { - return false, errors.Wrap(err, "unable to get observers list") +func (p passwords) relayerKeys() map[string]string { + return map[string]string{ + chains.Network_solana.String(): p.solanaRelayerKey, } +} - operatorAddress := client.GetKeys().GetOperatorAddress().String() +func startTelemetry(ctx context.Context, cfg config.Config) (*metrics.TelemetryServer, error) { + // 1. 
Init pprof http server + pprofServer := func(_ context.Context) error { + addr := os.Getenv(envPprofAddr) + if addr == "" { + addr = "localhost:6061" + } + + log.Info().Str("addr", addr).Msg("starting pprof http server") - for _, observer := range observers { - if observer == operatorAddress { - return true, nil + // #nosec G114 -- timeouts unneeded + err := http.ListenAndServe(addr, nil) + if err != nil { + log.Error().Err(err).Msg("pprof http server error") } - } - return false, nil -} + return nil + } -func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) { - // Get observer's public key ("grantee pub key") - _, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword) + // 2. Init metrics server + metricsServer, err := metrics.NewMetrics() if err != nil { - return "", errors.Wrap(err, "unable to get keyring key base") + return nil, errors.Wrap(err, "unable to create metrics") } - return granteePubKeyBech32, nil -} + metrics.Info.WithLabelValues(constant.Version).Set(1) + metrics.LastStartTime.SetToCurrentTime() -// runPprof run pprof http server -// zetacored/cometbft is already listening for runPprof on 6060 (by default) -func runPprof(logger zerolog.Logger) { - addr := os.Getenv(envPprofAddr) - if addr == "" { - addr = "localhost:6061" - } + // 3. Init telemetry server + telemetry := metrics.NewTelemetryServer() + telemetry.SetIPAddress(cfg.PublicIP) - logger.Info().Str("addr", addr).Msg("starting pprof http server") + // 4. Add services to the process + graceful.AddStarter(ctx, pprofServer) + graceful.AddService(ctx, metricsServer) + graceful.AddService(ctx, telemetry) - // #nosec G114 -- timeouts unneeded - err := http.ListenAndServe(addr, nil) - if err != nil { - logger.Error().Err(err).Msg("pprof http server error") - } + return telemetry, nil } diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index f7ef2f91bc..60cb7c9361 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -2,104 +2,33 @@ package main import ( "context" - "fmt" "os" "strconv" - "time" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/pkg/errors" - "github.com/rs/zerolog" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "github.com/zeta-chain/node/zetaclient/authz" - "github.com/zeta-chain/node/zetaclient/chains/interfaces" "github.com/zeta-chain/node/zetaclient/config" zctx "github.com/zeta-chain/node/zetaclient/context" "github.com/zeta-chain/node/zetaclient/keys" "github.com/zeta-chain/node/zetaclient/zetacore" ) -func createAuthzSigner(granter string, grantee sdk.AccAddress) { - authz.SetupAuthZSignerList(granter, grantee) -} - -func createZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerolog.Logger) (*zetacore.Client, error) { - hotKey := cfg.AuthzHotkey - - chainIP := cfg.ZetaCoreURL - - kb, _, err := keys.GetKeyringKeybase(cfg, hotkeyPassword) +// isObserverNode checks whether THIS node is an observer node. 
+func isObserverNode(ctx context.Context, zc *zetacore.Client) (bool, error) { + observers, err := zc.GetObserverList(ctx) if err != nil { - return nil, errors.Wrap(err, "failed to get keyring base") + return false, errors.Wrap(err, "unable to get observers list") } - granterAddress, err := sdk.AccAddressFromBech32(cfg.AuthzGranter) - if err != nil { - return nil, errors.Wrap(err, "failed to get granter address") - } - - k := keys.NewKeysWithKeybase(kb, granterAddress, cfg.AuthzHotkey, hotkeyPassword) - - client, err := zetacore.NewClient(k, chainIP, hotKey, cfg.ChainID, logger) - if err != nil { - return nil, errors.Wrap(err, "failed to create zetacore client") - } - - return client, nil -} - -func waitForZetaCore(config config.Config, logger zerolog.Logger) { - const ( - port = 9090 - retry = 5 * time.Second - ) - - var ( - url = fmt.Sprintf("%s:%d", config.ZetaCoreURL, port) - opt = grpc.WithTransportCredentials(insecure.NewCredentials()) - ) - - // wait until zetacore is up - logger.Debug().Msgf("Waiting for zetacore to open %d port...", port) + operatorAddress := zc.GetKeys().GetOperatorAddress().String() - for { - if _, err := grpc.Dial(url, opt); err != nil { - logger.Warn().Err(err).Msg("grpc dial fail") - time.Sleep(retry) - } else { - break + for _, observer := range observers { + if observer == operatorAddress { + return true, nil } } -} - -func waitForZetacoreToCreateBlocks(ctx context.Context, zc interfaces.ZetacoreClient, logger zerolog.Logger) error { - const ( - interval = 5 * time.Second - attempts = 15 - ) - - var ( - retryCount = 0 - start = time.Now() - ) - - for { - blockHeight, err := zc.GetBlockHeight(ctx) - if err == nil && blockHeight > 1 { - logger.Info().Msgf("Zeta block height: %d", blockHeight) - return nil - } - retryCount++ - if retryCount > attempts { - return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String()) - } - - logger.Info().Msgf("Failed to get block number, retry : %d/%d", retryCount, attempts) - time.Sleep(interval) - } + return false, nil } func isEnvFlagEnabled(flag string) bool { @@ -119,3 +48,13 @@ func btcChainIDsFromContext(app *zctx.AppContext) []int64 { return btcChainIDs } + +func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) { + // Get observer's public key ("grantee pub key") + _, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword) + if err != nil { + return "", errors.Wrap(err, "unable to get keyring key base") + } + + return granteePubKeyBech32, nil +} diff --git a/pkg/graceful/graceful.go b/pkg/graceful/graceful.go new file mode 100644 index 0000000000..6f9ffe4cf4 --- /dev/null +++ b/pkg/graceful/graceful.go @@ -0,0 +1,190 @@ +// Package graceful contains tools for graceful shutdown. +// GS refers to the process of shutting down a system in a controlled manner, allowing it to complete ongoing tasks, +// release resources, and perform necessary cleanup operations before terminating. +// This ensures that the system stops functioning without causing data loss, corruption, or other issues. 
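As a rough illustration of how the package described above is meant to be consumed, here is a minimal, self-contained sketch using the package-level helpers added below (`AddService`, `AddStarter`, `AddStopper`, `WaitForShutdown`). The `echoServer` type, address, and log messages are invented for the example and are not part of this change.

```go
package main

import (
	"context"
	"errors"
	"net/http"

	"github.com/rs/zerolog/log"

	"github.com/zeta-chain/node/pkg/graceful"
)

// echoServer is a made-up long-running service used only for illustration.
type echoServer struct {
	s *http.Server
}

// Start blocks until the server is closed or fails to come up.
func (e *echoServer) Start(_ context.Context) error {
	if err := e.s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// Stop releases the listener so Start can return.
func (e *echoServer) Stop() {
	_ = e.s.Close()
}

func main() {
	ctx := context.Background()

	// A full service: Start runs in its own goroutine, Stop is pushed onto the stop stack.
	graceful.AddService(ctx, &echoServer{s: &http.Server{Addr: "localhost:8080"}})

	// A starter with no matching stopper (fire-and-forget background work).
	graceful.AddStarter(ctx, func(ctx context.Context) error {
		log.Info().Msg("background task started")
		return nil
	})

	// An extra cleanup step; stoppers run in reverse registration order during shutdown.
	graceful.AddStopper(func() {
		log.Info().Msg("flushing buffers")
	})

	// Block until SIGINT/SIGTERM arrives (or ShutdownNow is called), then run the stop stack.
	graceful.WaitForShutdown()
}
```

Any goroutine can also call `graceful.ShutdownNow()` directly, which is how the TSS listener in `start.go` now triggers shutdown without the old shared signal channel.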
+package graceful + +import ( + "context" + "fmt" + "os" + "os/signal" + "runtime/debug" + "strings" + "sync" + "syscall" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +// Process represents "virtual" process that contains +// routines that can be started and stopped +// Note that ANY failure in starting a service will cause the process to shutdown +type Process struct { + stop <-chan os.Signal + stopStack []func() + + timeout time.Duration + mu sync.Mutex + stopped bool + + logger zerolog.Logger +} + +// Service represents abstract service. +type Service interface { + Start(ctx context.Context) error + Stop() +} + +// DefaultProcess is a process instance with some sane defaults. +var DefaultProcess = New(15*time.Second, log.Logger, NewSigChan(syscall.SIGINT, syscall.SIGTERM)) + +// New Process constructor. +func New(timeout time.Duration, logger zerolog.Logger, stop <-chan os.Signal) *Process { + return &Process{ + stop: stop, + timeout: timeout, + logger: logger.With().Str("module", "graceful").Logger(), + } +} + +// AddService adds Service to the process. +func (p *Process) AddService(ctx context.Context, s Service) { + p.AddStarter(ctx, s.Start) + p.AddStopper(s.Stop) +} + +// AddStarter runs a function that starts something. +// This is a blocking call for blocking .Start() services +func (p *Process) AddStarter(ctx context.Context, fn func(ctx context.Context) error) { + go func() { + defer func() { + if r := recover(); r != nil { + p.logger.Error().Err(panicToErr(r, 10)).Msg("panic in service") + p.ShutdownNow() + } + }() + + if err := fn(ctx); err != nil { + p.logger.Error().Err(err).Msg("failed to start service") + p.ShutdownNow() + } + }() +} + +// AddStopper adds a function will be executed during shutdown. +func (p *Process) AddStopper(fn func()) { + p.mu.Lock() + defer p.mu.Unlock() + + p.stopStack = append(p.stopStack, fn) +} + +// WaitForShutdown blocks current routine until a shutdown signal is received +func (p *Process) WaitForShutdown() { + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + select { + case sig := <-p.stop: + p.logger.Info().Msgf("Received signal: %q", sig.String()) + p.ShutdownNow() + return + case <-t.C: + // another goroutine already called ShutdownNow + // safe to read w/o mutex + if p.stopped { + return + } + } + } +} + +// ShutdownNow invokes shutdown of all services. +func (p *Process) ShutdownNow() { + p.mu.Lock() + defer p.mu.Unlock() + + // noop + if p.stopped { + return + } + + defer func() { + p.stopped = true + }() + + p.logger.Info().Msg("Shutting down") + + deadline := time.After(p.timeout) + done := make(chan struct{}) + + go func() { + defer func() { + if r := recover(); r != nil { + p.logger.Error().Err(panicToErr(r, 10)).Msg("panic during shutdown") + } + + // complete shutdown + close(done) + }() + + // stop services in the reverse order + for i := len(p.stopStack) - 1; i >= 0; i-- { + p.stopStack[i]() + } + }() + + select { + case <-deadline: + p.logger.Info().Msgf("Shutdown interrupted by timeout (%s)", p.timeout.String()) + case <-done: + p.logger.Info().Msg("Shutdown completed") + } +} + +// panicToErr converts panic to error WITH exact line of panic. +// Note the offset should be determined empirically. 
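Since the stack-line offset used by the helper below is easy to get wrong, here is a small standalone sketch (not part of the patch) showing how one would determine it: dump `debug.Stack()` from inside a recovered panic and note the index of the line that points at the frame that actually panicked. The `boom` function and program layout are purely illustrative; the package tests pin the chosen offset by asserting on the reported `file:line`.

```go
package main

import (
	"fmt"
	"runtime/debug"
	"strings"
)

func boom() {
	panic("oops")
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			// Print each line of the stack with its index; the index of the
			// line pointing at boom()'s file:line is the offset to hard-code.
			for i, line := range strings.Split(string(debug.Stack()), "\n") {
				fmt.Printf("%2d: %s\n", i, strings.TrimSpace(line))
			}
		}
	}()

	boom()
}
```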
+func panicToErr(panic any, offset int) error { + stack := string(debug.Stack()) + lines := strings.Split(stack, "\n") + line := "" + + if len(lines) > offset { + line = strings.TrimSpace(lines[offset]) + } + + return fmt.Errorf("panic: %v at %s", panic, line) +} + +// NewSigChan creates a new signal channel. +func NewSigChan(signals ...os.Signal) chan os.Signal { + out := make(chan os.Signal, 1) + signal.Notify(out, signals...) + + return out +} + +func AddService(ctx context.Context, s Service) { + DefaultProcess.AddService(ctx, s) +} + +func AddStarter(ctx context.Context, fn func(ctx context.Context) error) { + DefaultProcess.AddStarter(ctx, fn) +} + +func AddStopper(fn func()) { + DefaultProcess.AddStopper(fn) +} + +func WaitForShutdown() { + DefaultProcess.WaitForShutdown() +} + +func ShutdownNow() { + DefaultProcess.ShutdownNow() +} diff --git a/pkg/graceful/graceful_test.go b/pkg/graceful/graceful_test.go new file mode 100644 index 0000000000..37368cecea --- /dev/null +++ b/pkg/graceful/graceful_test.go @@ -0,0 +1,298 @@ +package graceful + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +func TestProcess(t *testing.T) { + const defaultTimeout = 2 * time.Second + + ctx := context.Background() + + t.Run("Service sync", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + // ACT + // Run service + ts.process.AddService(ctx, ts.mockService) + + start := time.Now() + + // And after 1 second someone presses ctrl+c + go func() { + time.Sleep(time.Second) + ts.mockSignal <- os.Interrupt + }() + + ts.process.WaitForShutdown() + + // ASSERT + // Check that service was stopped in a timely manner + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "mock is running in blocking mode") + }) + + t.Run("Service async", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, true) + + // Run service + ts.process.AddService(ctx, ts.mockService) + + // ACT + start := time.Now() + + // And after 700ms someone presses ctrl+c + go func() { + time.Sleep(700 * time.Millisecond) + ts.mockSignal <- os.Interrupt + }() + + ts.process.WaitForShutdown() + + // ASSERT + // Check that service was stopped in a timely manner + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "mock is running in non-blocking mode") + }) + + t.Run("Manual starters and stoppers", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + // Given one starter + ts.process.AddStarter(ctx, func(ctx context.Context) error { + ts.logger.Info().Msg("Hello world") + return nil + }) + + // And two stoppers + ts.process.AddStopper(func() { + time.Sleep(200 * time.Millisecond) + ts.logger.Info().Msg("Stopper 1") + }) + + ts.process.AddStopper(func() { + time.Sleep(300 * time.Millisecond) + ts.logger.Info().Msg("Stopper 2") + }) + + // ACT + start := time.Now() + + // And after 1s someone presses ctrl+c + go func() { + time.Sleep(time.Second) + ts.mockSignal <- os.Interrupt + }() + + ts.process.WaitForShutdown() + + // ASSERT + // Check that service was stopped in a timely manner + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + 
assert.Contains(t, ts.logBuffer.String(), "Stopper 1") + assert.Contains(t, ts.logBuffer.String(), "Stopper 2") + }) + + t.Run("Starter error", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + ts.mockService.errStart = fmt.Errorf("failed to start service") + + ts.process.AddService(ctx, ts.mockService) + + // ACT + start := time.Now() + ts.process.WaitForShutdown() + + // ASSERT + // Check that service had errors and was stopped + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "failed to start service") + }) + + t.Run("Panic handling during startup", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + ts.process.AddStarter(ctx, func(ctx context.Context) error { + panic("oopsie") + return nil + }) + + // ACT + ts.process.WaitForShutdown() + + // ASSERT + // Check that service had errors and was stopped + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "panic in service") + + // Check that error contains exact line of panic + assert.Contains(t, ts.logBuffer.String(), "graceful_test.go:145") + }) + + t.Run("Panic handling during shutdown", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + ts.process.AddStopper(func() { + panic("bombarda maxima") + }) + + // ACT + ts.process.ShutdownNow() + + // ASSERT + // Check that service had errors and was stopped + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "panic during shutdown") + + // Check that error contains exact line of panic + assert.Contains(t, ts.logBuffer.String(), "graceful_test.go:168") + }) + + t.Run("WaitForShutdown noop", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + ts.process.AddService(ctx, ts.mockService) + + // ACT + ts.process.ShutdownNow() + ts.process.WaitForShutdown() + + // ASSERT + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + }) + + t.Run("Shutdown timeout", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + // Given some slow stopper + const workDuration = defaultTimeout + 5*time.Second + + ts.process.AddStopper(func() { + ts.logger.Info().Msg("Stopping something") + time.Sleep(workDuration) + ts.logger.Info().Msg("Stopped something") + }) + + // ACT + ts.process.ShutdownNow() + + // ASSERT + assert.Contains(t, ts.logBuffer.String(), "Stopping something") + assert.Contains(t, ts.logBuffer.String(), "Shutdown interrupted by timeout") + + // log doesn't contain this line because it was interrupted + assert.NotContains(t, ts.logBuffer.String(), "Stopped something") + }) +} + +type testSuite struct { + process *Process + mockService *mockService + mockSignal chan os.Signal + + logger zerolog.Logger + logBuffer *bytes.Buffer +} + +func newTestSuite(t *testing.T, timeout time.Duration, async bool) *testSuite { + logBuffer := &bytes.Buffer{} + logger := zerolog.New(io.MultiWriter(zerolog.NewTestWriter(t), logBuffer)) + + stop := NewSigChan(os.Interrupt) + process := New(timeout, logger, stop) + + return &testSuite{ + mockSignal: stop, + process: process, + logger: logger, + logBuffer: logBuffer, + mockService: &mockService{ + async: async, + Logger: logger, + }, + } +} + +type mockService struct { + errStart error + async 
bool + running bool + zerolog.Logger +} + +func (m *mockService) Start(_ context.Context) error { + const interval = 300 * time.Millisecond + + m.running = true + + // emulate async started + if m.async { + go func() { + for { + if m.errStart != nil || !m.running { + return + } + + m.Info().Msg("mock is running in non-blocking mode") + time.Sleep(interval) + } + }() + + return nil + } + + for { + switch { + case m.errStart != nil: + m.running = false + return m.errStart + case !m.running: + return nil + default: + m.Info().Msg("mock is running in blocking mode") + time.Sleep(interval) + } + } +} + +func (m *mockService) Stop() { + m.running = false + m.Info().Msg("Stopping mock service") +} diff --git a/pkg/os/console.go b/pkg/os/console.go index c4a7c505c7..6782bb6b43 100644 --- a/pkg/os/console.go +++ b/pkg/os/console.go @@ -33,11 +33,11 @@ func PromptPasswords(passwordTitles []string) ([]string, error) { // readPassword is a helper function that reads a password from bufio.Reader func readPassword(reader *bufio.Reader, passwordTitle string) (string, error) { - const delimitor = '\n' + const delimiter = '\n' // prompt for password fmt.Printf("%s Password: ", passwordTitle) - password, err := reader.ReadString(delimitor) + password, err := reader.ReadString(delimiter) if err != nil { return "", err } diff --git a/zetaclient/chains/base/logger.go b/zetaclient/chains/base/logger.go index c70c1fd738..d5ff2948af 100644 --- a/zetaclient/chains/base/logger.go +++ b/zetaclient/chains/base/logger.go @@ -1,6 +1,7 @@ package base import ( + "io" "os" "path/filepath" "time" @@ -11,9 +12,7 @@ import ( "github.com/zeta-chain/node/zetaclient/config" ) -const ( - ComplianceLogFile = "compliance.log" -) +const complianceLogFile = "compliance.log" // Logger contains the base loggers type Logger struct { @@ -21,7 +20,7 @@ type Logger struct { Compliance zerolog.Logger } -// DefaultLoggers creates default base loggers for tests +// DefaultLogger creates default base loggers for tests func DefaultLogger() Logger { return Logger{ Std: log.Logger, @@ -50,39 +49,38 @@ type ObserverLogger struct { Compliance zerolog.Logger } -// InitLogger initializes the base loggers -func InitLogger(cfg config.Config) (Logger, error) { +// NewLogger initializes the base loggers +func NewLogger(cfg config.Config) (Logger, error) { // open compliance log file - file, err := openComplianceLogFile(cfg) + complianceFile, err := openComplianceLogFile(cfg) if err != nil { - return DefaultLogger(), err + return Logger{}, err } - level := zerolog.Level(cfg.LogLevel) + augmentLogger := func(logger zerolog.Logger) zerolog.Logger { + level := zerolog.Level(cfg.LogLevel) + + return logger.Level(level).With().Timestamp().Logger() + } // create loggers based on configured level and format - var std zerolog.Logger - var compliance zerolog.Logger - switch cfg.LogFormat { - case "json": - std = zerolog.New(os.Stdout).Level(level).With().Timestamp().Logger() - compliance = zerolog.New(file).Level(level).With().Timestamp().Logger() - case "text": - std = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}). - Level(zerolog.Level(cfg.LogLevel)). - With(). - Timestamp(). 
- Logger() - compliance = zerolog.New(file).Level(level).With().Timestamp().Logger() - default: - std = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}) - compliance = zerolog.New(file).With().Timestamp().Logger() + var stdWriter io.Writer = os.Stdout + if cfg.LogFormat != "json" { + stdWriter = zerolog.ConsoleWriter{ + Out: os.Stdout, + TimeFormat: time.RFC3339, + } } + std := augmentLogger(zerolog.New(stdWriter)) + compliance := augmentLogger(zerolog.New(complianceFile)) + if cfg.LogSampler { std = std.Sample(&zerolog.BasicSampler{N: 5}) } - log.Logger = std // set global logger + + // set global logger + log.Logger = std return Logger{ Std: std, @@ -99,11 +97,12 @@ func openComplianceLogFile(cfg config.Config) (*os.File, error) { } // clean file name - name := filepath.Join(logPath, ComplianceLogFile) + name := filepath.Join(logPath, complianceLogFile) name, err := filepath.Abs(name) if err != nil { return nil, err } + name = filepath.Clean(name) // open (or create) compliance log file diff --git a/zetaclient/chains/base/logger_test.go b/zetaclient/chains/base/logger_test.go index 571a84ee7d..d1b74bf0cb 100644 --- a/zetaclient/chains/base/logger_test.go +++ b/zetaclient/chains/base/logger_test.go @@ -79,7 +79,7 @@ func TestInitLogger(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // init logger - logger, err := base.InitLogger(tt.cfg) + logger, err := base.NewLogger(tt.cfg) // check if error is expected if tt.fail { diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index cd195912bb..4f00b7a2bb 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -19,7 +19,6 @@ import ( "github.com/gagliardetto/solana-go" solrpc "github.com/gagliardetto/solana-go/rpc" "github.com/onrik/ethrpc" - "github.com/rs/zerolog" "gitlab.com/thorchain/tss/go-tss/blame" "github.com/zeta-chain/node/pkg/chains" @@ -102,7 +101,6 @@ type ZetacoreClient interface { ZetacoreVoter Chain() chains.Chain - GetLogger() *zerolog.Logger GetKeys() keyinterfaces.ObserverKeys GetSupportedChains(ctx context.Context) ([]chains.Chain, error) diff --git a/zetaclient/config/config.go b/zetaclient/config/config.go index 82cb3f97f8..8bd3e9eff9 100644 --- a/zetaclient/config/config.go +++ b/zetaclient/config/config.go @@ -7,6 +7,8 @@ import ( "os" "path/filepath" "strings" + + "github.com/pkg/errors" ) // restrictedAddressBook is a map of restricted addresses @@ -114,3 +116,15 @@ func ContainRestrictedAddress(addrs ...string) bool { } return false } + +// ResolveDBPath resolves the path to chain observer database +func ResolveDBPath() (string, error) { + const dbpath = ".zetaclient/chainobserver" + + userDir, err := os.UserHomeDir() + if err != nil { + return "", errors.Wrap(err, "unable to resolve user home directory") + } + + return filepath.Join(userDir, dbpath), nil +} diff --git a/zetaclient/metrics/metrics.go b/zetaclient/metrics/metrics.go index e614cbf676..1c2e03a21f 100644 --- a/zetaclient/metrics/metrics.go +++ b/zetaclient/metrics/metrics.go @@ -7,6 +7,7 @@ import ( "net/url" "time" + "cosmossdk.io/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -186,20 +187,24 @@ func NewMetrics() (*Metrics, error) { } // Start starts the metrics server -func (m *Metrics) Start() { +func (m *Metrics) Start(_ context.Context) error { log.Info().Msg("metrics 
server starting") - go func() { - if err := m.s.ListenAndServe(); err != nil { - log.Error().Err(err).Msg("fail to start metric server") - } - }() + + if err := m.s.ListenAndServe(); err != nil { + return errors.Wrap(err, "fail to start metric server") + } + + return nil } // Stop stops the metrics server -func (m *Metrics) Stop() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) +func (m *Metrics) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return m.s.Shutdown(ctx) + + if err := m.s.Shutdown(ctx); err != nil { + log.Error().Err(err).Msg("failed to shutdown metrics server") + } } // GetInstrumentedHTTPClient sets up a http client that emits prometheus metrics diff --git a/zetaclient/metrics/metrics_test.go b/zetaclient/metrics/metrics_test.go index dff41d94c8..d630b94b79 100644 --- a/zetaclient/metrics/metrics_test.go +++ b/zetaclient/metrics/metrics_test.go @@ -1,6 +1,7 @@ package metrics import ( + "context" "fmt" "io" "net/http" @@ -25,7 +26,7 @@ var _ = Suite(&MetricsSuite{}) func (ms *MetricsSuite) SetUpSuite(c *C) { m, err := NewMetrics() c.Assert(err, IsNil) - m.Start() + go m.Start(context.Background()) ms.m = m } diff --git a/zetaclient/metrics/telemetry.go b/zetaclient/metrics/telemetry.go index c78bcd565d..777d68019e 100644 --- a/zetaclient/metrics/telemetry.go +++ b/zetaclient/metrics/telemetry.go @@ -198,10 +198,11 @@ func (t *TelemetryServer) Handlers() http.Handler { } // Start starts telemetry server -func (t *TelemetryServer) Start() error { +func (t *TelemetryServer) Start(_ context.Context) error { if t.s == nil { return errors.New("invalid http server instance") } + if err := t.s.ListenAndServe(); err != nil { if !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("fail to start http server: %w", err) @@ -212,14 +213,13 @@ func (t *TelemetryServer) Start() error { } // Stop stops telemetry server -func (t *TelemetryServer) Stop() error { - c, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (t *TelemetryServer) Stop() { + c, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - err := t.s.Shutdown(c) - if err != nil { - log.Error().Err(err).Msg("Failed to shutdown the HTTP server gracefully") + + if err := t.s.Shutdown(c); err != nil { + log.Error().Err(err).Msg("Failed to shutdown the TelemetryServer") } - return err } // pingHandler returns a 200 OK response diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index de54435c7e..df5b6dbeb6 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -254,10 +254,6 @@ func (c *Client) Chain() chains.Chain { return c.chain } -func (c *Client) GetLogger() *zerolog.Logger { - return &c.logger -} - func (c *Client) GetKeys() keyinterfaces.ObserverKeys { return c.keys } diff --git a/zetaclient/zetacore/client_start.go b/zetaclient/zetacore/client_start.go new file mode 100644 index 0000000000..af957e41c6 --- /dev/null +++ b/zetaclient/zetacore/client_start.go @@ -0,0 +1,134 @@ +package zetacore + +import ( + "context" + "fmt" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/pkg/errors" + "github.com/rs/zerolog" + + zetaauthz "github.com/zeta-chain/node/pkg/authz" + "github.com/zeta-chain/node/pkg/ticker" + "github.com/zeta-chain/node/zetaclient/authz" + "github.com/zeta-chain/node/zetaclient/config" + "github.com/zeta-chain/node/zetaclient/keys" +) + +// This file contains some high level logic for creating a 
zetacore client +// when starting zetaclientd in cmd/zetaclientd/start.go + +// NewFromConfig creates a new client from the given config. +// It also makes sure that the zetacore (i.e. the node) is ready to be used. +func NewFromConfig( + ctx context.Context, + cfg *config.Config, + hotkeyPassword string, + logger zerolog.Logger, +) (*Client, error) { + hotKey := cfg.AuthzHotkey + + chainIP := cfg.ZetaCoreURL + + kb, _, err := keys.GetKeyringKeybase(*cfg, hotkeyPassword) + if err != nil { + return nil, errors.Wrap(err, "failed to get keyring base") + } + + granterAddress, err := sdk.AccAddressFromBech32(cfg.AuthzGranter) + if err != nil { + return nil, errors.Wrap(err, "failed to get granter address") + } + + k := keys.NewKeysWithKeybase(kb, granterAddress, cfg.AuthzHotkey, hotkeyPassword) + + // All votes broadcasts to zetacore are wrapped in authz. + // This is to ensure that the user does not need to keep their operator key online, + // and can use a cold key to sign votes + signerAddress, err := k.GetAddress() + if err != nil { + return nil, errors.Wrap(err, "failed to get signer address") + } + + authz.SetupAuthZSignerList(k.GetOperatorAddress().String(), signerAddress) + + // Create client + client, err := NewClient(k, chainIP, hotKey, cfg.ChainID, logger) + if err != nil { + return nil, errors.Wrap(err, "failed to create the client") + } + + // Make sure that the node produces blocks + if err = ensureBlocksProduction(ctx, client); err != nil { + return nil, errors.Wrap(err, "zetacore unavailable") + } + + // Prepare the client + if err = prepareZetacoreClient(ctx, client, cfg); err != nil { + return nil, errors.Wrap(err, "failed to prepare the client") + } + + return client, nil +} + +// ensureBlocksProduction waits for zetacore to be ready (i.e. producing blocks) +func ensureBlocksProduction(ctx context.Context, zc *Client) error { + const ( + interval = 5 * time.Second + attempts = 15 + ) + + var ( + retryCount = 0 + start = time.Now() + ) + + task := func(ctx context.Context, t *ticker.Ticker) error { + blockHeight, err := zc.GetBlockHeight(ctx) + + if err == nil && blockHeight > 1 { + zc.logger.Info().Msgf("Zetacore is ready, block height: %d", blockHeight) + t.Stop() + return nil + } + + retryCount++ + if retryCount > attempts { + return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String()) + } + + zc.logger.Info().Msgf("Failed to get block number, retry: %d/%d", retryCount, attempts) + return nil + } + + return ticker.Run(ctx, interval, task) +} + +// prepareZetacoreClient prepares the zetacore client for use. +func prepareZetacoreClient(ctx context.Context, zc *Client, cfg *config.Config) error { + // Set grantee account number and sequence number + if err := zc.SetAccountNumber(zetaauthz.ZetaClientGranteeKey); err != nil { + return errors.Wrap(err, "failed to set account number") + } + + res, err := zc.GetNodeInfo(ctx) + if err != nil { + return errors.Wrap(err, "failed to get node info") + } + + network := res.GetDefaultNodeInfo().Network + if network != cfg.ChainID { + zc.logger.Warn(). + Str("got", cfg.ChainID). + Str("want", network). + Msg("Zetacore chain id config mismatch. Forcing update from the network") + + cfg.ChainID = network + if err = zc.UpdateChainID(cfg.ChainID); err != nil { + return errors.Wrap(err, "failed to update chain id") + } + } + + return nil +}
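To close, a standalone sketch of the polling pattern that `ensureBlocksProduction` relies on, generalized to an arbitrary readiness probe. It assumes `pkg/ticker`'s `Run` and `Ticker.Stop` behave exactly as they are used above (`Run` keeps invoking the task on the interval until the task returns an error or stops the ticker); the `waitReady` helper, the probe callback, and the retry constants are invented for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/zeta-chain/node/pkg/ticker"
)

// waitReady polls a readiness probe on a fixed interval, giving up after a
// bounded number of attempts (mirroring ensureBlocksProduction).
func waitReady(ctx context.Context, probe func(ctx context.Context) (bool, error)) error {
	const (
		interval = 2 * time.Second
		attempts = 5
	)

	tries := 0

	task := func(ctx context.Context, t *ticker.Ticker) error {
		ready, err := probe(ctx)
		if err == nil && ready {
			t.Stop() // success: stop polling so Run can return
			return nil
		}

		tries++
		if tries > attempts {
			// returning an error aborts the ticker and is surfaced by Run
			return errors.New("dependency is not ready")
		}

		return nil // not ready yet; keep polling
	}

	return ticker.Run(ctx, interval, task)
}

func main() {
	probe := func(context.Context) (bool, error) { return true, nil } // always ready
	err := waitReady(context.Background(), probe)
	fmt.Println("ready:", err == nil)
}
```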