Skip to content

Commit

Permalink
feat(zetaclient)!: support for runtime chain (de)provisioning (#2497)
Browse files Browse the repository at this point in the history
* Simplify orchestrator construction

* Implement signers provision / deprovision based on AppContext state

* Add db package

* Fix observers so they can use *db.DB

* Move observer map creation to orchestrator

* Move observer.Start() to orchestrator. Shutdown zetaclient if not an observer

* Implement BTC & EVM RPCs as httptest wrapper

* Implement observer map sync based on chainParams

* Implement observer & signer sync worker

* Respect chainParam.IsSupported

* Update readme

* Fix conflicts

* Fix SOL observer

* Address PR comments [1]

* Address PR comments [2]

* Address PR comments [3]

* Update orchestrator sync cadence

* Address PR comments [4]
  • Loading branch information
swift1337 authored Jul 22, 2024
1 parent e730d3a commit b2bc8a9
Show file tree
Hide file tree
Showing 34 changed files with 2,036 additions and 934 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* [2372](https://github.com/zeta-chain/node/pull/2372) - add queries for tss fund migration info
* [2416](https://github.com/zeta-chain/node/pull/2416) - add Solana chain information
* [2465](https://github.com/zeta-chain/node/pull/2465) - add Solana inbound SOL token observation
* [2497](https://github.com/zeta-chain/node/pull/2416) - support for runtime chain (de)provisioning
* [2518](https://github.com/zeta-chain/node/pull/2518) - add support for Solana address in zetacore

### Refactor
Expand Down
83 changes: 48 additions & 35 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/orchestrator"
mc "github.com/zeta-chain/zetacore/zetaclient/tss"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

var StartCmd = &cobra.Command{
Expand Down Expand Up @@ -74,7 +75,7 @@ func start(_ *cobra.Command, _ []string) error {
}

masterLogger := logger.Std
startLogger := masterLogger.With().Str("module", "startup").Logger()
startLogger := logger.Std.With().Str("module", "startup").Logger()

appContext := zctx.New(cfg, masterLogger)
ctx := zctx.WithAppContext(context.Background(), appContext)
Expand Down Expand Up @@ -269,23 +270,22 @@ func start(_ *cobra.Command, _ []string) error {
startLogger.Error().Msgf("No chains enabled in updated config %s ", cfg.String())
}

observerList, err := zetacoreClient.GetObserverList(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetObserverList error")
isObserver, err := isObserverNode(ctx, zetacoreClient)
switch {
case err != nil:
startLogger.Error().Msgf("Unable to determine if node is an observer")
return err
}
isNodeActive := false
for _, observer := range observerList {
if observer == zetacoreClient.GetKeys().GetOperatorAddress().String() {
isNodeActive = true
break
}
case !isObserver:
addr := zetacoreClient.GetKeys().GetOperatorAddress().String()
startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0")
return nil
}

// CreateSignerMap: This creates a map of all signers for each chain . Each signer is responsible for signing transactions for a particular chain
signerMap, err := CreateSignerMap(ctx, appContext, tss, logger, telemetryServer)
// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger, telemetryServer)
if err != nil {
log.Error().Err(err).Msg("CreateSignerMap")
log.Error().Err(err).Msg("Unable to create signer map")
return err
}

Expand All @@ -296,35 +296,34 @@ func start(_ *cobra.Command, _ []string) error {
}
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain. Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := CreateChainObserverMap(ctx, appContext, zetacoreClient, tss, dbpath, logger, telemetryServer)
// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
if err != nil {
startLogger.Err(err).Msg("CreateChainObserverMap")
return err
}

if !isNodeActive {
startLogger.Error().
Msgf("Node %s is not an active observer external chain observers will not be started", zetacoreClient.GetKeys().GetOperatorAddress().String())
} else {
startLogger.Debug().Msgf("Node %s is an active observer starting external chain observers", zetacoreClient.GetKeys().GetOperatorAddress().String())
for _, observer := range observerMap {
observer.Start(ctx)
}
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it . This is the high level object used for CCTX interactions
orchestrator := orchestrator.NewOrchestrator(
// Orchestrator wraps the zetacore client and adds the observers and signer maps to it.
// This is the high level object used for CCTX interactions
maestro, err := orchestrator.New(
ctx,
zetacoreClient,
signerMap,
observerMap,
masterLogger,
tss,
dbpath,
logger,
telemetryServer,
)
err = orchestrator.MonitorCore(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("Orchestrator failed to start")
startLogger.Error().Err(err).Msg("Unable to create orchestrator")
return err
}

// Start orchestrator with all observers and signers
if err := maestro.Start(ctx); err != nil {
startLogger.Error().Err(err).Msg("Unable to start orchestrator")
return err
}

Expand All @@ -348,10 +347,6 @@ func start(_ *cobra.Command, _ []string) error {
sig := <-ch
startLogger.Info().Msgf("stop signal received: %s", sig)

// stop chain observers
for _, observer := range observerMap {
observer.Stop()
}
zetacoreClient.Stop()

return nil
Expand Down Expand Up @@ -415,3 +410,21 @@ func promptPasswords() (string, string, error) {

return hotKeyPass, TSSKeyPass, err
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
if err != nil {
return false, errors.Wrap(err, "unable to get observers list")
}

operatorAddress := client.GetKeys().GetOperatorAddress().String()

for _, observer := range observers {
if observer == operatorAddress {
return true, nil
}
}

return false, nil
}
196 changes: 1 addition & 195 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,12 @@
package main

import (
gocontext "context"
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
solrpc "github.com/gagliardetto/solana-go/rpc"
"github.com/rs/zerolog"

"github.com/zeta-chain/zetacore/zetaclient/authz"
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer"
btcrpc "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc"
btcsigner "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/signer"
evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer"
evmsigner "github.com/zeta-chain/zetacore/zetaclient/chains/evm/signer"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
solanaobserver "github.com/zeta-chain/zetacore/zetaclient/chains/solana/observer"
"github.com/zeta-chain/zetacore/zetaclient/config"
"github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/keys"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

Expand Down Expand Up @@ -58,182 +42,4 @@ func CreateZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerol
return client, nil
}

// CreateSignerMap creates a map of ChainSigners for all chains in the config
func CreateSignerMap(
ctx gocontext.Context,
appContext *context.AppContext,
tss interfaces.TSSSigner,
logger base.Logger,
ts *metrics.TelemetryServer,
) (map[int64]interfaces.ChainSigner, error) {
signerMap := make(map[int64]interfaces.ChainSigner)

// EVM signers
for _, evmConfig := range appContext.Config().GetAllEVMConfigs() {
if evmConfig.Chain.IsZetaChain() {
continue
}
evmChainParams, found := appContext.GetEVMChainParams(evmConfig.Chain.ChainId)
if !found {
logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String())
continue
}

chainName := evmConfig.Chain.ChainName.String()
mpiAddress := ethcommon.HexToAddress(evmChainParams.ConnectorContractAddress)
erc20CustodyAddress := ethcommon.HexToAddress(evmChainParams.Erc20CustodyContractAddress)

signer, err := evmsigner.NewSigner(
ctx,
evmConfig.Chain,
tss,
ts,
logger,
evmConfig.Endpoint,
config.GetConnectorABI(),
config.GetERC20CustodyABI(),
mpiAddress,
erc20CustodyAddress,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewSigner error for EVM chain %q", chainName)
continue
}

signerMap[evmConfig.Chain.ChainId] = signer
logger.Std.Info().Msgf("NewSigner succeeded for EVM chain %q", chainName)
}

// BTC signer
btcChain, btcConfig, btcEnabled := appContext.GetBTCChainAndConfig()
if btcEnabled {
chainName := btcChain.ChainName.String()

signer, err := btcsigner.NewSigner(btcChain, tss, ts, logger, btcConfig)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewSigner error for BTC chain %q", chainName)
} else {
signerMap[btcChain.ChainId] = signer
logger.Std.Info().Msgf("NewSigner succeeded for BTC chain %q", chainName)
}
}

return signerMap, nil
}

// CreateChainObserverMap creates a map of ChainObservers for all chains in the config
func CreateChainObserverMap(
ctx gocontext.Context,
appContext *context.AppContext,
zetacoreClient *zetacore.Client,
tss interfaces.TSSSigner,
dbpath string,
logger base.Logger,
ts *metrics.TelemetryServer,
) (map[int64]interfaces.ChainObserver, error) {
observerMap := make(map[int64]interfaces.ChainObserver)
// EVM observers
for _, evmConfig := range appContext.Config().GetAllEVMConfigs() {
if evmConfig.Chain.IsZetaChain() {
continue
}
chainParams, found := appContext.GetEVMChainParams(evmConfig.Chain.ChainId)
if !found {
logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String())
continue
}

// create EVM client
evmClient, err := ethclient.Dial(evmConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Msgf("error dailing endpoint %q", evmConfig.Endpoint)
continue
}

// create EVM chain observer
observer, err := evmobserver.NewObserver(
ctx,
evmConfig,
evmClient,
*chainParams,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for evm chain %s", evmConfig.Chain.String())
continue
}
observerMap[evmConfig.Chain.ChainId] = observer
}

// BTC observer
_, btcChainParams, found := appContext.GetBTCChainParams()
if !found {
return nil, fmt.Errorf("bitcoin chains params not found")
}

// create BTC chain observer
btcChain, btcConfig, enabled := appContext.GetBTCChainAndConfig()
if enabled {
btcClient, err := btcrpc.NewRPCClient(btcConfig)
if err != nil {
logger.Std.Error().Err(err).Msgf("error creating rpc client for bitcoin chain %s", btcChain.String())
} else {
// create BTC chain observer
observer, err := btcobserver.NewObserver(
btcChain,
btcClient,
*btcChainParams,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for bitcoin chain %s", btcChain.String())
} else {
observerMap[btcChain.ChainId] = observer
}
}
}

// Solana chain params
_, solChainParams, found := appContext.GetSolanaChainParams()
if !found {
logger.Std.Error().Msg("solana chain params not found")
return observerMap, nil
}

// create Solana chain observer
solChain, solConfig, enabled := appContext.GetSolanaChainAndConfig()
if enabled {
rpcClient := solrpc.New(solConfig.Endpoint)
if rpcClient == nil {
// should never happen
logger.Std.Error().Msg("solana create Solana client error")
return observerMap, nil
}

observer, err := solanaobserver.NewObserver(
solChain,
rpcClient,
*solChainParams,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msg("NewObserver error for solana chain")
} else {
observerMap[solChainParams.ChainId] = observer
}
}

return observerMap, nil
}
// TODO
17 changes: 17 additions & 0 deletions pkg/ptr/ptr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Package ptr provides helper functions for working with pointers.
package ptr

// Ptr returns a pointer to the value passed in.
func Ptr[T any](value T) *T {
return &value
}

// Deref returns the value of the pointer passed in, or the zero value of the type if the pointer is nil.
func Deref[T any](value *T) T {
var out T
if value != nil {
out = *value
}

return out
}
Loading

0 comments on commit b2bc8a9

Please sign in to comment.