From 6932b13f189690ecdc913073c082a7a9bf26525a Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 18 Jul 2024 16:05:33 +0200 Subject: [PATCH] Move observer map creation to orchestrator --- cmd/zetaclientd/start.go | 2 +- cmd/zetaclientd/utils.go | 139 ------------------ zetaclient/chains/base/observer.go | 30 +++- .../chains/bitcoin/observer/observer.go | 8 + zetaclient/chains/evm/observer/observer.go | 5 + zetaclient/orchestrator/bootstrap.go | 111 ++++++++++++++ 6 files changed, 150 insertions(+), 145 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index a2973e82fd..176e1b5847 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -282,7 +282,7 @@ func start(_ *cobra.Command, _ []string) error { dbpath := filepath.Join(userDir, ".zetaclient/chainobserver") // Creates a map of all chain observers for each chain. Each chain observer is responsible for observing events on the chain and processing them. - observerMap, err := CreateChainObserverMap(ctx, appContext, zetacoreClient, tss, dbpath, logger, telemetryServer) + observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer) if err != nil { startLogger.Err(err).Msg("CreateChainObserverMap") return err diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index 24c3db3e8f..b25de0a2b5 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -1,33 +1,15 @@ package main import ( - gocontext "context" - "fmt" - - "cosmossdk.io/errors" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/ethereum/go-ethereum/ethclient" "github.com/rs/zerolog" - "github.com/zeta-chain/zetacore/pkg/chains" - observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/authz" - "github.com/zeta-chain/zetacore/zetaclient/chains/base" - btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer" - btcrpc "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc" - evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer" - "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" "github.com/zeta-chain/zetacore/zetaclient/config" - "github.com/zeta-chain/zetacore/zetaclient/context" - "github.com/zeta-chain/zetacore/zetaclient/db" "github.com/zeta-chain/zetacore/zetaclient/keys" - "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/zetacore" ) -// backwards compatibility -const btcDatabaseFilename = "btc_chain_client" - func CreateAuthzSigner(granter string, grantee sdk.AccAddress) { authz.SetupAuthZSignerList(granter, grantee) } @@ -59,124 +41,3 @@ func CreateZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerol return client, nil } - -// CreateChainObserverMap creates a map of ChainObservers for all chains in the config -func CreateChainObserverMap( - ctx gocontext.Context, - appContext *context.AppContext, - zetacoreClient *zetacore.Client, - tss interfaces.TSSSigner, - dbpath string, - logger base.Logger, - ts *metrics.TelemetryServer, -) (map[int64]interfaces.ChainObserver, error) { - observerMap := make(map[int64]interfaces.ChainObserver) - // EVM observers - for _, evmConfig := range appContext.Config().GetAllEVMConfigs() { - if evmConfig.Chain.IsZetaChain() { - continue - } - chainParams, found := appContext.GetEVMChainParams(evmConfig.Chain.ChainId) - if !found { - logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) - continue - } - - // create EVM client - evmClient, err := ethclient.Dial(evmConfig.Endpoint) - if err != nil { - logger.Std.Error().Err(err).Msgf("error dailing endpoint %q", evmConfig.Endpoint) - continue - } - - chainName := evmConfig.Chain.ChainName.String() - - database, err := db.NewFromSqlite(dbpath, chainName, true) - if err != nil { - logger.Std.Error().Err(err).Msgf("Unable to open a database for EVM chain %q", chainName) - } - - // create EVM chain observer - observer, err := evmobserver.NewObserver( - ctx, - evmConfig, - evmClient, - *chainParams, - zetacoreClient, - tss, - database, - logger, - ts, - ) - if err != nil { - logger.Std.Error().Err(err).Msgf("NewObserver error for EVM chain %s", evmConfig.Chain.String()) - continue - } - observerMap[evmConfig.Chain.ChainId] = observer - } - - // create BTC chain observer - btcChain, btcConfig, btcEnabled := appContext.GetBTCChainAndConfig() - if btcEnabled { - _, chainParams, found := appContext.GetBTCChainParams() - if !found { - return nil, fmt.Errorf("BTC is enabled, but chains params not found") - } - - btcObserver, err := createBTCObserver( - dbpath, - btcConfig, - btcChain, - *chainParams, - zetacoreClient, - tss, - logger, - ts, - ) - if err != nil { - logger.Std.Error().Err(err).Msgf("NewObserver error for BTC chain %s", btcChain.ChainName.String()) - } else { - observerMap[btcChain.ChainId] = btcObserver - } - } - - return observerMap, nil -} - -func createBTCObserver( - dbPath string, - cfg config.BTCConfig, - chain chains.Chain, - chainParams observertypes.ChainParams, - client *zetacore.Client, - tss interfaces.TSSSigner, - logger base.Logger, - ts *metrics.TelemetryServer, -) (*btcobserver.Observer, error) { - btcClient, err := btcrpc.NewRPCClient(cfg) - if err != nil { - return nil, errors.Wrap(err, "unable to create rpc client for BTC chain") - } - - database, err := db.NewFromSqlite(dbPath, btcDatabaseFilename, true) - if err != nil { - return nil, errors.Wrap(err, "unable to open a database for BTC chain") - } - - // create BTC chain observer - observer, err := btcobserver.NewObserver( - chain, - btcClient, - chainParams, - client, - tss, - database, - logger, - ts, - ) - if err != nil { - return nil, errors.Wrap(err, "unable to create observer for BTC chain") - } - - return observer, nil -} diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 6fbc735e5d..314185b3c1 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -70,7 +70,8 @@ type Observer struct { // mu protects fields from concurrent access // Note: base observer simply provides the mutex. It's the sub-struct's responsibility to use it to be thread-safe - mu *sync.Mutex + mu *sync.Mutex + started bool // stop is the channel to signal the observer to stop stop chan struct{} @@ -120,17 +121,36 @@ func NewObserver( return &ob, nil } +// Start starts the observer. Returns true if the observer was already started (noop). +func (ob *Observer) Start() bool { + ob.mu.Lock() + defer ob.Mu().Unlock() + + // noop + if ob.started { + return true + } + + ob.started = true + + return false +} + // Stop notifies all goroutines to stop and closes the database. func (ob *Observer) Stop() { ob.logger.Chain.Info().Msgf("observer is stopping for chain %d", ob.Chain().ChainId) close(ob.stop) + ob.mu.Lock() + defer ob.mu.Unlock() + + ob.started = false + // close database - if ob.db != nil { - if err := ob.db.Close(); err != nil { - ob.Logger().Chain.Error().Err(err).Msgf("unable to close db for chain %d", ob.Chain().ChainId) - } + if err := ob.db.Close(); err != nil { + ob.Logger().Chain.Error().Err(err).Msgf("unable to close db for chain %d", ob.Chain().ChainId) } + ob.Logger().Chain.Info().Msgf("observer stopped for chain %d", ob.Chain().ChainId) } diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 831488a76a..9d85d144a9 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -200,6 +200,14 @@ func (ob *Observer) GetChainParams() observertypes.ChainParams { // Start starts the Go routine processes to observe the Bitcoin chain func (ob *Observer) Start(ctx context.Context) { + ob.Mu().Lock() + defer ob.Mu().Unlock() + + if noop := ob.Observer.Start(); noop { + ob.Logger().Chain.Info().Msgf("observer is already started for chain %d", ob.Chain().ChainId) + return + } + ob.Logger().Chain.Info().Msgf("observer is starting for chain %d", ob.Chain().ChainId) // watch bitcoin chain for incoming txs and post votes to zetacore diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index aac28b7ffc..955ece2264 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -167,6 +167,11 @@ func FetchZetaTokenContract( // Start all observation routines for the evm chain func (ob *Observer) Start(ctx context.Context) { + if noop := ob.Observer.Start(); noop { + ob.Logger().Chain.Info().Msgf("observer is already started for chain %d", ob.Chain().ChainId) + return + } + ob.Logger().Chain.Info().Msgf("observer is starting for chain %d", ob.Chain().ChainId) bg.Work(ctx, ob.WatchInbound, bg.WithName("WatchInbound"), bg.WithLogger(ob.Logger().Inbound)) diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index d739492f0a..19af9795ed 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -2,20 +2,30 @@ package orchestrator import ( "context" + "fmt" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/base" + btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer" + "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc" btcsigner "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/signer" + evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer" evmsigner "github.com/zeta-chain/zetacore/zetaclient/chains/evm/signer" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" "github.com/zeta-chain/zetacore/zetaclient/config" zctx "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/db" "github.com/zeta-chain/zetacore/zetaclient/metrics" + "github.com/zeta-chain/zetacore/zetaclient/zetacore" ) +// backwards compatibility +const btcDatabaseFilename = "btc_chain_client" + // CreateSignerMap creates a map of interfaces.ChainSigner (by chainID) for all chains in the config. // Note that signer construction failure for a chain does not prevent the creation of signers for other chains. func CreateSignerMap( @@ -168,3 +178,104 @@ func (m *signerMap) unsetMissing(enabledChains []int64, logger zerolog.Logger) i return removed } + +// CreateChainObserverMap creates a map of ChainObservers for all chains in the config +func CreateChainObserverMap( + ctx context.Context, + client *zetacore.Client, + tss interfaces.TSSSigner, + dbpath string, + logger base.Logger, + ts *metrics.TelemetryServer, +) (map[int64]interfaces.ChainObserver, error) { + observerMap := make(map[int64]interfaces.ChainObserver) + + app, err := zctx.FromContext(ctx) + if err != nil { + return nil, errors.Wrapf(err, "failed to get app context") + } + + // EVM observers + for _, evmConfig := range app.Config().GetAllEVMConfigs() { + if evmConfig.Chain.IsZetaChain() { + continue + } + + chainParams, found := app.GetEVMChainParams(evmConfig.Chain.ChainId) + if !found { + logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) + continue + } + + // create EVM client + evmClient, err := ethclient.Dial(evmConfig.Endpoint) + if err != nil { + logger.Std.Error().Err(err).Msgf("error dailing endpoint %q", evmConfig.Endpoint) + continue + } + + chainName := evmConfig.Chain.ChainName.String() + + database, err := db.NewFromSqlite(dbpath, chainName, true) + if err != nil { + logger.Std.Error().Err(err).Msgf("Unable to open a database for EVM chain %q", chainName) + } + + // create EVM chain observer + observer, err := evmobserver.NewObserver( + ctx, + evmConfig, + evmClient, + *chainParams, + client, + tss, + database, + logger, + ts, + ) + if err != nil { + logger.Std.Error().Err(err).Msgf("NewObserver error for EVM chain %s", evmConfig.Chain.String()) + continue + } + observerMap[evmConfig.Chain.ChainId] = observer + } + + // create BTC chain observer + btcChain, btcConfig, btcEnabled := app.GetBTCChainAndConfig() + if !btcEnabled { + return observerMap, nil + } + + _, btcChainParams, found := app.GetBTCChainParams() + if !found { + return nil, fmt.Errorf("BTC is enabled, but chains params not found") + } + + btcClient, err := rpc.NewRPCClient(btcConfig) + if err != nil { + return nil, errors.Wrap(err, "unable to create rpc client for BTC chain") + } + + btcDatabase, err := db.NewFromSqlite(dbpath, btcDatabaseFilename, true) + if err != nil { + return nil, errors.Wrap(err, "unable to open a database for BTC chain") + } + + btcObserver, err := btcobserver.NewObserver( + btcChain, + btcClient, + *btcChainParams, + client, + tss, + btcDatabase, + logger, + ts, + ) + if err != nil { + logger.Std.Error().Err(err).Msgf("NewObserver error for BTC chain %s", btcChain.ChainName.String()) + } else { + observerMap[btcChain.ChainId] = btcObserver + } + + return observerMap, nil +}