Skip to content

Commit

Permalink
Merge branch 'develop' into solana-stress-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
skosito authored Dec 16, 2024
2 parents 61f8aac + 8710921 commit cb0bb04
Show file tree
Hide file tree
Showing 15 changed files with 803 additions and 312 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
## Refactor

* [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient
* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown)

### Fixes

Expand Down
274 changes: 95 additions & 179 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,18 @@ import (
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/graceful"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/maintenance"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/orchestrator"
Expand All @@ -36,119 +30,55 @@ const (
envPprofAddr = "PPROF_ADDR"
)

// Start starts zetaclientd process todo revamp
// https://github.com/zeta-chain/node/issues/3112
// Start starts zetaclientd process
func Start(_ *cobra.Command, _ []string) error {
// Prompt for Hotkey, TSS key-share and relayer key passwords
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}
passwords, err := zetaos.PromptPasswords(titles)
if err != nil {
return errors.Wrap(err, "unable to get passwords")
}
hotkeyPass, tssKeyPass, solanaKeyPass := passwords[0], passwords[1], passwords[2]
relayerKeyPasswords := map[string]string{
chains.Network_solana.String(): solanaKeyPass,
}

// Load Config file given path
cfg, err := config.Load(globalOpts.ZetacoreHome)
if err != nil {
return err
return errors.Wrap(err, "unable to load config")
}

logger, err := base.InitLogger(cfg)
dbPath, err := config.ResolveDBPath()
if err != nil {
return errors.Wrap(err, "initLogger failed")
return errors.Wrap(err, "unable to resolve db path")
}

masterLogger := logger.Std
startLogger := logger.Std.With().Str("module", "startup").Logger()

appContext := zctx.New(cfg, relayerKeyPasswords, masterLogger)
ctx := zctx.WithAppContext(context.Background(), appContext)

// Wait until zetacore is up
waitForZetaCore(cfg, startLogger)
startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer)

telemetryServer := metrics.NewTelemetryServer()
go func() {
err := telemetryServer.Start()
if err != nil {
startLogger.Error().Err(err).Msg("telemetryServer error")
panic("telemetryServer error")
}
}()

go runPprof(startLogger)

// CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to.
// Zetacore accumulates votes , and provides a centralized source of truth for all clients
zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger)
// Configure logger (also overrides the default log level)
logger, err := base.NewLogger(cfg)
if err != nil {
return errors.Wrap(err, "unable to create zetacore client")
return errors.Wrap(err, "unable to create logger")
}

// Wait until zetacore is ready to create blocks
if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
}
startLogger.Info().Msgf("Zetacore client is ready")

// Set grantee account number and sequence number
err = zetacoreClient.SetAccountNumber(authz.ZetaClientGranteeKey)
passes, err := promptPasswords()
if err != nil {
startLogger.Error().Err(err).Msg("SetAccountNumber error")
return err
return errors.Wrap(err, "unable to prompt for passwords")
}

// cross-check chainid
res, err := zetacoreClient.GetNodeInfo(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetNodeInfo error")
return err
}
appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std)
ctx := zctx.WithAppContext(context.Background(), appContext)

if strings.Compare(res.GetDefaultNodeInfo().Network, cfg.ChainID) != 0 {
startLogger.Warn().
Msgf("chain id mismatch, zetacore chain id %s, zetaclient configured chain id %s; reset zetaclient chain id", res.GetDefaultNodeInfo().Network, cfg.ChainID)
cfg.ChainID = res.GetDefaultNodeInfo().Network
err := zetacoreClient.UpdateChainID(cfg.ChainID)
if err != nil {
return err
}
telemetry, err := startTelemetry(ctx, cfg)
if err != nil {
return errors.Wrap(err, "unable to start telemetry")
}

// CreateAuthzSigner : which is used to sign all authz messages . All votes broadcast to zetacore are wrapped in authz exec .
// This is to ensure that the user does not need to keep their operator key online , and can use a cold key to sign votes
signerAddress, err := zetacoreClient.GetKeys().GetAddress()
// zetacore client is used for all communication to zeta node.
// it accumulates votes, and provides a source of truth for all clients
//
// This call crated client, ensured block production, then prepares the client
zetacoreClient, err := zetacore.NewFromConfig(ctx, &cfg, passes.hotkey, logger.Std)
if err != nil {
return errors.Wrap(err, "error getting signer address")
return errors.Wrap(err, "unable to create zetacore client from config")
}

createAuthzSigner(zetacoreClient.GetKeys().GetOperatorAddress().String(), signerAddress)
startLogger.Debug().Msgf("createAuthzSigner is ready")

// Initialize core parameters from zetacore
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil {
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, logger.Std); err != nil {
return errors.Wrap(err, "unable to update app context")
}

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

m, err := metrics.NewMetrics()
if err != nil {
return errors.Wrap(err, "unable to create metrics")
}
m.Start()

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

telemetryServer.SetIPAddress(cfg.PublicIP)
log.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass)
granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, passes.hotkey)
if err != nil {
return errors.Wrap(err, "unable to resolve observer pub key bech32")
}
Expand All @@ -157,44 +87,34 @@ func Start(_ *cobra.Command, _ []string) error {
Config: cfg,
Zetacore: zetacoreClient,
GranteePubKeyBech32: granteePubKeyBech32,
HotKeyPassword: hotkeyPass,
TSSKeyPassword: tssKeyPass,
HotKeyPassword: passes.hotkey,
TSSKeyPassword: passes.tss,
BitcoinChainIDs: btcChainIDsFromContext(appContext),
PostBlame: isEnvFlagEnabled(envFlagPostBlame),
Telemetry: telemetryServer,
Telemetry: telemetry,
}

tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger)
tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std)
if err != nil {
return errors.Wrap(err, "unable to setup TSS service")
}

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

if len(appContext.ListChainIDs()) == 0 {
startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config")
}

isObserver, err := isObserverNode(ctx, zetacoreClient)
switch {
case err != nil:
startLogger.Error().Msgf("Unable to determine if node is an observer")
return err
return errors.Wrap(err, "unable to check if observer node")
case !isObserver:
addr := zetacoreClient.GetKeys().GetOperatorAddress().String()
startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0")
logger.Std.Warn().Msg("This node is not an observer node. Exit 0")
return nil
}

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() {
logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
graceful.ShutdownNow()
})

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
Expand All @@ -203,16 +123,9 @@ func Start(_ *cobra.Command, _ []string) error {
return err
}

userDir, err := os.UserHomeDir()
if err != nil {
log.Error().Err(err).Msg("os.UserHomeDir")
return err
}
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetry)
if err != nil {
return errors.Wrap(err, "unable to create chain observer map")
}
Expand All @@ -226,84 +139,87 @@ func Start(_ *cobra.Command, _ []string) error {
signerMap,
observerMap,
tss,
dbpath,
dbPath,
logger,
telemetryServer,
telemetry,
)
if err != nil {
return errors.Wrap(err, "unable to create orchestrator")
}

// Start orchestrator with all observers and signers
if err = maestro.Start(ctx); err != nil {
return errors.Wrap(err, "unable to start orchestrator")
}
graceful.AddService(ctx, maestro)

// start zeta supply checker
// TODO: enable
// https://github.com/zeta-chain/node/issues/1354
// NOTE: this is disabled for now because we need to determine the frequency on how to handle invalid check
// The method uses GRPC query to the node we might need to improve for performance
//zetaSupplyChecker, err := mc.NewZetaSupplyChecker(cfg, zetacoreClient, masterLogger)
//if err != nil {
// startLogger.Err(err).Msg("NewZetaSupplyChecker")
//}
//if err == nil {
// zetaSupplyChecker.Start()
// defer zetaSupplyChecker.Stop()
//}
// Block current routine until a shutdown signal is received
graceful.WaitForShutdown()

startLogger.Info().Msg("zetaclientd is running")
return nil
}

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)
type passwords struct {
hotkey string
tss string
solanaRelayerKey string
}

maestro.Stop()
// promptPasswords prompts for Hotkey, TSS key-share and relayer key passwords
func promptPasswords() (passwords, error) {
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}

return nil
res, err := zetaos.PromptPasswords(titles)
if err != nil {
return passwords{}, errors.Wrap(err, "unable to get passwords")
}

return passwords{
hotkey: res[0],
tss: res[1],
solanaRelayerKey: res[2],
}, nil
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
if err != nil {
return false, errors.Wrap(err, "unable to get observers list")
func (p passwords) relayerKeys() map[string]string {
return map[string]string{
chains.Network_solana.String(): p.solanaRelayerKey,
}
}

operatorAddress := client.GetKeys().GetOperatorAddress().String()
func startTelemetry(ctx context.Context, cfg config.Config) (*metrics.TelemetryServer, error) {
// 1. Init pprof http server
pprofServer := func(_ context.Context) error {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}

log.Info().Str("addr", addr).Msg("starting pprof http server")

for _, observer := range observers {
if observer == operatorAddress {
return true, nil
// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Error().Err(err).Msg("pprof http server error")
}
}

return false, nil
}
return nil
}

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
// 2. Init metrics server
metricsServer, err := metrics.NewMetrics()
if err != nil {
return "", errors.Wrap(err, "unable to get keyring key base")
return nil, errors.Wrap(err, "unable to create metrics")
}

return granteePubKeyBech32, nil
}
metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

// runPprof run pprof http server
// zetacored/cometbft is already listening for runPprof on 6060 (by default)
func runPprof(logger zerolog.Logger) {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}
// 3. Init telemetry server
telemetry := metrics.NewTelemetryServer()
telemetry.SetIPAddress(cfg.PublicIP)

logger.Info().Str("addr", addr).Msg("starting pprof http server")
// 4. Add services to the process
graceful.AddStarter(ctx, pprofServer)
graceful.AddService(ctx, metricsServer)
graceful.AddService(ctx, telemetry)

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
logger.Error().Err(err).Msg("pprof http server error")
}
return telemetry, nil
}
Loading

0 comments on commit cb0bb04

Please sign in to comment.