Skip to content

Commit

Permalink
refactor(zetaclient): move app context update from zetacore client (#…
Browse files Browse the repository at this point in the history
…3131)

* Refactor context update worker

* Drop zetaclient.GetBlockHeaderChainState

* Move waitForZetacoreToCreateBlocks from zetacore client

* Update changelog

* Rollback upgrade plan return values semantics

* Rearrange start() sequence to fix TSS

* Address PR comments
  • Loading branch information
swift1337 authored Nov 12, 2024
1 parent f8df512 commit 47b0323
Show file tree
Hide file tree
Showing 17 changed files with 510 additions and 437 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [3118](https://github.com/zeta-chain/node/pull/3118) - zetaclient: remove hsm signer
* [3122](https://github.com/zeta-chain/node/pull/3122) - improve & refactor zetaclientd cli
* [3125](https://github.com/zeta-chain/node/pull/3125) - drop support for header proofs
* [3131](https://github.com/zeta-chain/node/pull/3131) - move app context update from zetacore client
* [3137](https://github.com/zeta-chain/node/pull/3137) - remove chain.Chain from zetaclientd config

### Fixes
Expand Down
4 changes: 3 additions & 1 deletion cmd/zetaclientd/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/orchestrator"
"github.com/zeta-chain/node/zetaclient/zetacore"
)

Expand Down Expand Up @@ -69,7 +70,8 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
appContext := zctx.New(cfg, nil, zerolog.Nop())
ctx := zctx.WithAppContext(context.Background(), appContext)

if err := client.UpdateAppContext(ctx, appContext, zerolog.Nop()); err != nil {
err = orchestrator.UpdateAppContext(ctx, appContext, client, zerolog.Nop())
if err != nil {
return errors.Wrap(err, "failed to update app context")
}

Expand Down
82 changes: 46 additions & 36 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/pkg/ticker"
observerTypes "github.com/zeta-chain/node/x/observer/types"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
Expand Down Expand Up @@ -100,7 +101,7 @@ func Start(_ *cobra.Command, _ []string) error {
}

// Wait until zetacore is ready to create blocks
if err = zetacoreClient.WaitForZetacoreToCreateBlocks(ctx); err != nil {
if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
}
Expand Down Expand Up @@ -141,14 +142,12 @@ func Start(_ *cobra.Command, _ []string) error {
startLogger.Debug().Msgf("createAuthzSigner is ready")

// Initialize core parameters from zetacore
if err = zetacoreClient.UpdateAppContext(ctx, appContext, startLogger); err != nil {
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil {
return errors.Wrap(err, "unable to update app context")
}

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

go zetacoreClient.UpdateAppContextWorker(ctx, appContext)

// Generate TSS address . The Tss address is generated through Keygen ceremony. The TSS key is used to sign all outbound transactions .
// The hotkeyPk is private key for the Hotkey. The Hotkey is used to sign all inbound transactions
// Each node processes a portion of the key stored in ~/.tss by default . Custom location can be specified in config file during init.
Expand Down Expand Up @@ -201,33 +200,36 @@ func Start(_ *cobra.Command, _ []string) error {
}

// Create TSS server
server, err := mc.SetupTSSServer(peers, priKey, preParams, appContext.Config(), tssKeyPass, true, whitelistedPeers)
tssServer, err := mc.SetupTSSServer(
peers,
priKey,
preParams,
appContext.Config(),
tssKeyPass,
true,
whitelistedPeers,
)
if err != nil {
return fmt.Errorf("SetupTSSServer error: %w", err)
}

// Set P2P ID for telemetry
telemetryServer.SetP2PID(server.GetLocalPeerID())
telemetryServer.SetP2PID(tssServer.GetLocalPeerID())

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Maintenance workers ============
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

go func() {
for {
time.Sleep(30 * time.Second)
ps := server.GetKnownPeers()
ps := tssServer.GetKnownPeers()
metrics.NumConnectedPeers.Set(float64(len(ps)))
telemetryServer.SetConnectedPeers(ps)
}
}()
go func() {
host := server.GetP2PHost()
host := tssServer.GetP2PHost()
pingRTT := make(map[peer.ID]int64)
for {
var wg sync.WaitGroup
Expand All @@ -254,7 +256,7 @@ func Start(_ *cobra.Command, _ []string) error {

// Generate a new TSS if keygen is set and add it into the tss server
// If TSS has already been generated, and keygen was successful ; we use the existing TSS
err = mc.Generate(ctx, masterLogger, zetacoreClient, server)
err = mc.Generate(ctx, zetacoreClient, tssServer, masterLogger)
if err != nil {
return err
}
Expand All @@ -264,7 +266,7 @@ func Start(_ *cobra.Command, _ []string) error {
zetacoreClient,
tssHistoricalList,
hotkeyPass,
server,
tssServer,
)
if err != nil {
startLogger.Error().Err(err).Msg("NewTSS error")
Expand All @@ -279,23 +281,26 @@ func Start(_ *cobra.Command, _ []string) error {

// Wait for TSS keygen to be successful before proceeding, This is a blocking thread only for a new keygen.
// For existing keygen, this should directly proceed to the next step
ticker := time.NewTicker(time.Second * 1)
for range ticker.C {
keyGen := appContext.GetKeygen()
if keyGen.Status != observerTypes.KeygenStatus_KeyGenSuccess {
startLogger.Info().Msgf("Waiting for TSS Keygen to be a success, current status %s", keyGen.Status)
continue
_ = ticker.Run(ctx, time.Second, func(ctx context.Context, t *ticker.Ticker) error {
keygen, err = zetacoreClient.GetKeyGen(ctx)
switch {
case err != nil:
startLogger.Warn().Err(err).Msg("Waiting for TSS Keygen to be a success, got error")
case keygen.Status != observerTypes.KeygenStatus_KeyGenSuccess:
startLogger.Warn().Msgf("Waiting for TSS Keygen to be a success, current status %s", keygen.Status)
default:
t.Stop()
}
break
}

return nil
})

// Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core
// Returns err if the RPC call fails as zeta client needs the current TSS address to be set
// This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast
currentTss, err := zetacoreClient.GetTSS(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetCurrentTSS error")
return err
return errors.Wrap(err, "unable to get current TSS")
}

// Filter supported BTC chain IDs
Expand All @@ -314,6 +319,13 @@ func Start(_ *cobra.Command, _ []string) error {
return err
}

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

if len(appContext.ListChainIDs()) == 0 {
startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config")
}
Expand Down Expand Up @@ -348,12 +360,12 @@ func Start(_ *cobra.Command, _ []string) error {
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
if err != nil {
startLogger.Err(err).Msg("CreateChainObserverMap")
return err
return errors.Wrap(err, "unable to create chain observer map")
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it.
// This is the high level object used for CCTX interactions
// It also handles background configuration updates from zetacore
maestro, err := orchestrator.New(
ctx,
zetacoreClient,
Expand All @@ -365,14 +377,12 @@ func Start(_ *cobra.Command, _ []string) error {
telemetryServer,
)
if err != nil {
startLogger.Error().Err(err).Msg("Unable to create orchestrator")
return err
return errors.Wrap(err, "unable to create orchestrator")
}

// Start orchestrator with all observers and signers
if err := maestro.Start(ctx); err != nil {
startLogger.Error().Err(err).Msg("Unable to start orchestrator")
return err
if err = maestro.Start(ctx); err != nil {
return errors.Wrap(err, "unable to start orchestrator")
}

// start zeta supply checker
Expand All @@ -389,12 +399,12 @@ func Start(_ *cobra.Command, _ []string) error {
// defer zetaSupplyChecker.Stop()
//}

startLogger.Info().Msgf("Zetaclientd is running")
startLogger.Info().Msg("zetaclientd is running")

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q", sig)
startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)

zetacoreClient.Stop()
maestro.Stop()

return nil
}
Expand Down
30 changes: 30 additions & 0 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net"
"strings"
Expand All @@ -13,6 +14,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/zeta-chain/node/zetaclient/authz"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/config"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/zetacore"
Expand Down Expand Up @@ -71,6 +73,34 @@ func waitForZetaCore(config config.Config, logger zerolog.Logger) {
}
}

func waitForZetacoreToCreateBlocks(ctx context.Context, zc interfaces.ZetacoreClient, logger zerolog.Logger) error {
const (
interval = 5 * time.Second
attempts = 15
)

var (
retryCount = 0
start = time.Now()
)

for {
blockHeight, err := zc.GetBlockHeight(ctx)
if err == nil && blockHeight > 1 {
logger.Info().Msgf("Zeta block height: %d", blockHeight)
return nil
}

retryCount++
if retryCount > attempts {
return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String())
}

logger.Info().Msgf("Failed to get block number, retry : %d/%d", retryCount, attempts)
time.Sleep(interval)
}
}

func validatePeer(seedPeer string) error {
parsedPeer := strings.Split(seedPeer, "/")

Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewCometBFTClients(url string) (Clients, error) {
return newClients(clientCtx)
}

// NewGRPCClient creates a Clients which uses gRPC as the transport
// NewGRPCClients creates a Clients which uses gRPC as the transport
func NewGRPCClients(url string, opts ...grpc.DialOption) (Clients, error) {
grpcConn, err := grpc.Dial(url, opts...)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/rpc/clients_cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"github.com/zeta-chain/node/cmd/zetacored/config"
)

// GetUpgradePlan returns the current upgrade plan.
// if there is no active upgrade plan, plan will be nil, err will be nil as well.
// GetUpgradePlan returns the current upgrade plan or nil if there is no plan.
func (c *Clients) GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error) {
in := &upgradetypes.QueryCurrentPlanRequest{}

Expand Down
10 changes: 7 additions & 3 deletions zetaclient/chains/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -102,6 +103,10 @@ type ZetacoreClient interface {
GetLogger() *zerolog.Logger
GetKeys() keyinterfaces.ObserverKeys

GetSupportedChains(ctx context.Context) ([]chains.Chain, error)
GetAdditionalChains(ctx context.Context) ([]chains.Chain, error)
GetChainParams(ctx context.Context) ([]*observertypes.ChainParams, error)

GetKeyGen(ctx context.Context) (observertypes.Keygen, error)
GetTSS(ctx context.Context) (observertypes.TSS, error)
GetTSSHistory(ctx context.Context) ([]observertypes.TSS, error)
Expand Down Expand Up @@ -130,10 +135,9 @@ type ZetacoreClient interface {
GetZetaHotKeyBalance(ctx context.Context) (sdkmath.Int, error)
GetInboundTrackersForChain(ctx context.Context, chainID int64) ([]crosschaintypes.InboundTracker, error)

PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error)

Stop()
OnBeforeStop(callback func())
PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
}

// BTCRPCClient is the interface for BTC RPC client
Expand Down
Loading

0 comments on commit 47b0323

Please sign in to comment.