diff --git a/changelog.md b/changelog.md index 221cf5003c..12e9ef07ae 100644 --- a/changelog.md +++ b/changelog.md @@ -20,7 +20,7 @@ * [2097](https://github.com/zeta-chain/node/pull/2097) - refactor lightclient verification flags to account for individual chains * [2071](https://github.com/zeta-chain/node/pull/2071) - Modify chains struct to add all chain related information * [2124](https://github.com/zeta-chain/node/pull/2124) - removed unused variables and method -* [2150](https://github.com/zeta-chain/node/pull/2150) - created `chains` `zetacore` `orchestragor` packages in zetaclient and reorganized source files accordingly. +* [2150](https://github.com/zeta-chain/node/pull/2150) - created `chains` `zetacore` `orchestrator` packages in zetaclient and reorganized source files accordingly. ### Tests diff --git a/cmd/zetaclientd/debug.go b/cmd/zetaclientd/debug.go index ebe00720a7..0c84489844 100644 --- a/cmd/zetaclientd/debug.go +++ b/cmd/zetaclientd/debug.go @@ -97,11 +97,11 @@ func DebugCmd() *cobra.Command { if chains.IsEVMChain(chain.ChainId) { - ob := evm.Client{ + evmObserver := evm.Observer{ Mu: &sync.Mutex{}, } - ob.WithZetacoreClient(client) - ob.WithLogger(chainLogger) + evmObserver.WithZetacoreClient(client) + evmObserver.WithLogger(chainLogger) var ethRPC *ethrpc.EthRPC var client *ethclient.Client coinType := coin.CoinType_Cmd @@ -112,13 +112,13 @@ func DebugCmd() *cobra.Command { if err != nil { return err } - ob.WithEvmClient(client) - ob.WithEvmJSONRPC(ethRPC) - ob.WithChain(*chains.GetChainFromChainID(chainID)) + evmObserver.WithEvmClient(client) + evmObserver.WithEvmJSONRPC(ethRPC) + evmObserver.WithChain(*chains.GetChainFromChainID(chainID)) } } hash := ethcommon.HexToHash(txHash) - tx, isPending, err := ob.TransactionByHash(txHash) + tx, isPending, err := evmObserver.TransactionByHash(txHash) if err != nil { return fmt.Errorf("tx not found on chain %s , %d", err.Error(), chain.ChainId) } @@ -132,7 +132,7 @@ func DebugCmd() *cobra.Command { for _, chainParams := range chainParams { if chainParams.ChainId == chainID { - ob.SetChainParams(observertypes.ChainParams{ + evmObserver.SetChainParams(observertypes.ChainParams{ ChainId: chainID, ConnectorContractAddress: chainParams.ConnectorContractAddress, ZetaTokenContractAddress: chainParams.ZetaTokenContractAddress, @@ -155,19 +155,19 @@ func DebugCmd() *cobra.Command { switch coinType { case coin.CoinType_Zeta: - ballotIdentifier, err = ob.CheckAndVoteInboundTokenZeta(tx, receipt, false) + ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenZeta(tx, receipt, false) if err != nil { return err } case coin.CoinType_ERC20: - ballotIdentifier, err = ob.CheckAndVoteInboundTokenERC20(tx, receipt, false) + ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenERC20(tx, receipt, false) if err != nil { return err } case coin.CoinType_Gas: - ballotIdentifier, err = ob.CheckAndVoteInboundTokenGas(tx, receipt, false) + ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenGas(tx, receipt, false) if err != nil { return err } @@ -176,12 +176,12 @@ func DebugCmd() *cobra.Command { } fmt.Println("CoinType : ", coinType) } else if chains.IsBitcoinChain(chain.ChainId) { - obBtc := bitcoin.Client{ + btcObserver := bitcoin.Observer{ Mu: &sync.Mutex{}, } - obBtc.WithZetaCoreClient(client) - obBtc.WithLogger(chainLogger) - obBtc.WithChain(*chains.GetChainFromChainID(chainID)) + btcObserver.WithZetacoreClient(client) + btcObserver.WithLogger(chainLogger) + btcObserver.WithChain(*chains.GetChainFromChainID(chainID)) connCfg := &rpcclient.ConnConfig{ Host: cfg.BitcoinConfig.RPCHost, User: cfg.BitcoinConfig.RPCUsername, @@ -195,8 +195,8 @@ func DebugCmd() *cobra.Command { if err != nil { return err } - obBtc.WithBtcClient(btcClient) - ballotIdentifier, err = obBtc.CheckReceiptForBtcTxHash(txHash, false) + btcObserver.WithBtcClient(btcClient) + ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(txHash, false) if err != nil { return err } diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index a8d6f4adb2..e6bd9cee8d 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -248,10 +248,10 @@ func start(_ *cobra.Command, _ []string) error { } dbpath := filepath.Join(userDir, ".zetaclient/chainobserver") - // CreateChainClientMap : This creates a map of all chain clients . Each chain client is responsible for listening to events on the chain and processing them - chainClientMap, err := CreateChainClientMap(appContext, coreClient, tss, dbpath, loggers, telemetryServer) + // Creates a map of all chain observers for each chain. Each chain observer is responsible for observing events on the chain and processing them. + observerMap, err := CreateChainObserverMap(appContext, coreClient, tss, dbpath, loggers, telemetryServer) if err != nil { - startLogger.Err(err).Msg("CreateChainClientMap") + startLogger.Err(err).Msg("CreateChainObserverMap") return err } @@ -259,13 +259,13 @@ func start(_ *cobra.Command, _ []string) error { startLogger.Error().Msgf("Node %s is not an active observer external chain observers will not be started", coreClient.GetKeys().GetOperatorAddress().String()) } else { startLogger.Debug().Msgf("Node %s is an active observer starting external chain observers", coreClient.GetKeys().GetOperatorAddress().String()) - for _, v := range chainClientMap { - v.Start() + for _, observer := range observerMap { + observer.Start() } } - // CreateCoreObserver : Core observer wraps the zetacore client and adds the client and signer maps to it . This is the high level object used for CCTX interactions - mo1 := orchestrator.NewOrchestrator(coreClient, signerMap, chainClientMap, masterLogger, telemetryServer) + // Orchestrator wraps the zetacore client and adds the observers and signer maps to it . This is the high level object used for CCTX interactions + mo1 := orchestrator.NewOrchestrator(coreClient, signerMap, observerMap, masterLogger, telemetryServer) mo1.MonitorCore(appContext) // start zeta supply checker @@ -288,9 +288,9 @@ func start(_ *cobra.Command, _ []string) error { sig := <-ch startLogger.Info().Msgf("stop signal received: %s", sig) - // stop zetacore observer - for _, client := range chainClientMap { - client.Stop() + // stop chain observers + for _, observer := range observerMap { + observer.Stop() } coreClient.Stop() diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index a1aebb8969..b6afa1a86f 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -99,16 +99,17 @@ func CreateSignerMap( return signerMap, nil } -func CreateChainClientMap( +// CreateChainObserverMap creates a map of ChainObservers for all chains in the config +func CreateChainObserverMap( appContext *appcontext.AppContext, coreClient *zetacore.Client, tss interfaces.TSSSigner, dbpath string, loggers clientcommon.ClientLogger, ts *metrics.TelemetryServer, -) (map[int64]interfaces.ChainClient, error) { - clientMap := make(map[int64]interfaces.ChainClient) - // EVM clients +) (map[int64]interfaces.ChainObserver, error) { + observerMap := make(map[int64]interfaces.ChainObserver) + // EVM observers for _, evmConfig := range appContext.Config().GetAllEVMConfigs() { if evmConfig.Chain.IsZetaChain() { continue @@ -118,24 +119,24 @@ func CreateChainClientMap( loggers.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) continue } - co, err := evm.NewClient(appContext, coreClient, tss, dbpath, loggers, evmConfig, ts) + co, err := evm.NewObserver(appContext, coreClient, tss, dbpath, loggers, evmConfig, ts) if err != nil { - loggers.Std.Error().Err(err).Msgf("NewEVMChainClient error for chain %s", evmConfig.Chain.String()) + loggers.Std.Error().Err(err).Msgf("NewObserver error for evm chain %s", evmConfig.Chain.String()) continue } - clientMap[evmConfig.Chain.ChainId] = co + observerMap[evmConfig.Chain.ChainId] = co } - // BTC client + // BTC observer btcChain, btcConfig, enabled := appContext.GetBTCChainAndConfig() if enabled { - co, err := bitcoin.NewClient(appContext, btcChain, coreClient, tss, dbpath, loggers, btcConfig, ts) + co, err := bitcoin.NewObserver(appContext, btcChain, coreClient, tss, dbpath, loggers, btcConfig, ts) if err != nil { - loggers.Std.Error().Err(err).Msgf("NewBitcoinClient error for chain %s", btcChain.String()) + loggers.Std.Error().Err(err).Msgf("NewObserver error for bitcoin chain %s", btcChain.String()) } else { - clientMap[btcChain.ChainId] = co + observerMap[btcChain.ChainId] = co } } - return clientMap, nil + return observerMap, nil } diff --git a/zetaclient/chains/bitcoin/client.go b/zetaclient/chains/bitcoin/observer.go similarity index 93% rename from zetaclient/chains/bitcoin/client.go rename to zetaclient/chains/bitcoin/observer.go index 2f1336c2b5..a35400fc40 100644 --- a/zetaclient/chains/bitcoin/client.go +++ b/zetaclient/chains/bitcoin/observer.go @@ -53,9 +53,9 @@ const ( bigValueConfirmationCount = 6 ) -var _ interfaces.ChainClient = &Client{} +var _ interfaces.ChainObserver = &Observer{} -// Logger contains list of loggers used by Bitcoin chain client +// Logger contains list of loggers used by Bitcoin chain observer // TODO: Merge this logger with the one in evm // https://github.com/zeta-chain/node/issues/2022 type Logger struct { @@ -100,9 +100,8 @@ type BTCBlockNHeader struct { Block *btcjson.GetBlockVerboseTxResult } -// Client represents a chain configuration for Bitcoin -// Filled with above constants depending on chain -type Client struct { +// Observer is the Bitcoin chain observer +type Observer struct { BlockCache *lru.Cache // Mu is lock for all the maps, utxos and core params @@ -136,13 +135,13 @@ type Client struct { ts *metrics.TelemetryServer } -func (ob *Client) WithZetaCoreClient(client *zetacore.Client) { +func (ob *Observer) WithZetacoreClient(client *zetacore.Client) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.coreClient = client } -func (ob *Client) WithLogger(logger zerolog.Logger) { +func (ob *Observer) WithLogger(logger zerolog.Logger) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.logger = Logger{ @@ -154,32 +153,32 @@ func (ob *Client) WithLogger(logger zerolog.Logger) { } } -func (ob *Client) WithBtcClient(client *rpcclient.Client) { +func (ob *Observer) WithBtcClient(client *rpcclient.Client) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.rpcClient = client } -func (ob *Client) WithChain(chain chains.Chain) { +func (ob *Observer) WithChain(chain chains.Chain) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.chain = chain } -func (ob *Client) SetChainParams(params observertypes.ChainParams) { +func (ob *Observer) SetChainParams(params observertypes.ChainParams) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.params = params } -func (ob *Client) GetChainParams() observertypes.ChainParams { +func (ob *Observer) GetChainParams() observertypes.ChainParams { ob.Mu.Lock() defer ob.Mu.Unlock() return ob.params } -// NewClient returns a new Bitcoin chain client -func NewClient( +// NewObserver returns a new Bitcoin chain observer +func NewObserver( appcontext *context.AppContext, chain chains.Chain, coreClient interfaces.ZetaCoreClient, @@ -188,9 +187,9 @@ func NewClient( loggers clientcommon.ClientLogger, btcCfg config.BTCConfig, ts *metrics.TelemetryServer, -) (*Client, error) { - // initialize the BTCChainClient - ob := Client{ +) (*Observer, error) { + // initialize the observer + ob := Observer{ ts: ts, } ob.stop = make(chan struct{}) @@ -257,7 +256,7 @@ func NewClient( return nil, err } - // load btc chain client DB + // load btc chain observer DB err = ob.loadDB(dbpath) if err != nil { return nil, err @@ -267,7 +266,7 @@ func NewClient( } // Start starts the Go routine to observe the Bitcoin chain -func (ob *Client) Start() { +func (ob *Observer) Start() { ob.logger.Chain.Info().Msgf("Bitcoin client is starting") go ob.WatchInTx() // watch bitcoin chain for incoming txs and post votes to zetacore go ob.WatchOutTx() // watch bitcoin chain for outgoing txs status @@ -278,7 +277,7 @@ func (ob *Client) Start() { } // WatchRPCStatus watches the RPC status of the Bitcoin chain -func (ob *Client) WatchRPCStatus() { +func (ob *Observer) WatchRPCStatus() { ob.logger.Chain.Info().Msgf("RPCStatus is starting") ticker := time.NewTicker(60 * time.Second) @@ -334,20 +333,20 @@ func (ob *Client) WatchRPCStatus() { } } -func (ob *Client) Stop() { +func (ob *Observer) Stop() { ob.logger.Chain.Info().Msgf("ob %s is stopping", ob.chain.String()) close(ob.stop) // this notifies all goroutines to stop ob.logger.Chain.Info().Msgf("%s observer stopped", ob.chain.String()) } -func (ob *Client) SetLastBlockHeight(height int64) { +func (ob *Observer) SetLastBlockHeight(height int64) { if height < 0 { panic("lastBlock is negative") } atomic.StoreInt64(&ob.lastBlock, height) } -func (ob *Client) GetLastBlockHeight() int64 { +func (ob *Observer) GetLastBlockHeight() int64 { height := atomic.LoadInt64(&ob.lastBlock) if height < 0 { panic("lastBlock is negative") @@ -355,7 +354,7 @@ func (ob *Client) GetLastBlockHeight() int64 { return height } -func (ob *Client) SetLastBlockHeightScanned(height int64) { +func (ob *Observer) SetLastBlockHeightScanned(height int64) { if height < 0 { panic("lastBlockScanned is negative") } @@ -363,7 +362,7 @@ func (ob *Client) SetLastBlockHeightScanned(height int64) { metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height)) } -func (ob *Client) GetLastBlockHeightScanned() int64 { +func (ob *Observer) GetLastBlockHeightScanned() int64 { height := atomic.LoadInt64(&ob.lastBlockScanned) if height < 0 { panic("lastBlockScanned is negative") @@ -371,7 +370,7 @@ func (ob *Client) GetLastBlockHeightScanned() int64 { return height } -func (ob *Client) GetPendingNonce() uint64 { +func (ob *Observer) GetPendingNonce() uint64 { ob.Mu.Lock() defer ob.Mu.Unlock() return ob.pendingNonce @@ -380,12 +379,12 @@ func (ob *Client) GetPendingNonce() uint64 { // GetBaseGasPrice ... // TODO: implement // https://github.com/zeta-chain/node/issues/868 -func (ob *Client) GetBaseGasPrice() *big.Int { +func (ob *Observer) GetBaseGasPrice() *big.Int { return big.NewInt(0) } // ConfirmationsThreshold returns number of required Bitcoin confirmations depending on sent BTC amount. -func (ob *Client) ConfirmationsThreshold(amount *big.Int) int64 { +func (ob *Observer) ConfirmationsThreshold(amount *big.Int) int64 { if amount.Cmp(big.NewInt(bigValueSats)) >= 0 { return bigValueConfirmationCount } @@ -398,7 +397,7 @@ func (ob *Client) ConfirmationsThreshold(amount *big.Int) int64 { } // WatchGasPrice watches Bitcoin chain for gas rate and post to zetacore -func (ob *Client) WatchGasPrice() { +func (ob *Observer) WatchGasPrice() { // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { @@ -433,7 +432,7 @@ func (ob *Client) WatchGasPrice() { } } -func (ob *Client) PostGasPrice() error { +func (ob *Observer) PostGasPrice() error { if ob.chain.ChainId == 18444 { //bitcoin regtest; hardcode here since this RPC is not available on regtest blockNumber, err := ob.rpcClient.GetBlockCount() if err != nil { @@ -520,7 +519,7 @@ func GetSenderAddressByVin(rpcClient interfaces.BTCRPCClient, vin btcjson.Vin, n } // WatchUTXOS watches bitcoin chain for UTXOs owned by the TSS address -func (ob *Client) WatchUTXOS() { +func (ob *Observer) WatchUTXOS() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchUTXOS", ob.GetChainParams().WatchUtxoTicker) if err != nil { ob.logger.UTXOS.Error().Err(err).Msg("error creating ticker") @@ -546,7 +545,7 @@ func (ob *Client) WatchUTXOS() { } } -func (ob *Client) FetchUTXOS() error { +func (ob *Observer) FetchUTXOS() error { defer func() { if err := recover(); err != nil { ob.logger.UTXOS.Error().Msgf("BTC fetchUTXOS: caught panic error: %v", err) @@ -622,7 +621,7 @@ func (ob *Client) FetchUTXOS() error { // - the total value of the selected UTXOs. // - the number of consolidated UTXOs. // - the total value of the consolidated UTXOs. -func (ob *Client) SelectUTXOs( +func (ob *Observer) SelectUTXOs( amount float64, utxosToSpend uint16, nonce uint64, @@ -701,7 +700,7 @@ func (ob *Client) SelectUTXOs( } // SaveBroadcastedTx saves successfully broadcasted transaction -func (ob *Client) SaveBroadcastedTx(txHash string, nonce uint64) { +func (ob *Observer) SaveBroadcastedTx(txHash string, nonce uint64) { outTxID := ob.GetTxID(nonce) ob.Mu.Lock() ob.broadcastedTx[outTxID] = txHash @@ -756,7 +755,7 @@ func GetRawTxResult(rpcClient interfaces.BTCRPCClient, hash *chainhash.Hash, res return btcjson.TxRawResult{}, fmt.Errorf("getRawTxResult: tx %s not included yet", hash) } -func (ob *Client) BuildBroadcastedTxMap() error { +func (ob *Observer) BuildBroadcastedTxMap() error { var broadcastedTransactions []clienttypes.OutTxHashSQLType if err := ob.db.Find(&broadcastedTransactions).Error; err != nil { ob.logger.Chain.Error().Err(err).Msg("error iterating over db") @@ -768,7 +767,7 @@ func (ob *Client) BuildBroadcastedTxMap() error { return nil } -func (ob *Client) LoadLastBlock() error { +func (ob *Observer) LoadLastBlock() error { bn, err := ob.rpcClient.GetBlockCount() if err != nil { return err @@ -799,7 +798,7 @@ func (ob *Client) LoadLastBlock() error { return nil } -func (ob *Client) GetBlockByNumberCached(blockNumber int64) (*BTCBlockNHeader, error) { +func (ob *Observer) GetBlockByNumberCached(blockNumber int64) (*BTCBlockNHeader, error) { if result, ok := ob.BlockCache.Get(blockNumber); ok { return result.(*BTCBlockNHeader), nil } @@ -829,7 +828,7 @@ func (ob *Client) GetBlockByNumberCached(blockNumber int64) (*BTCBlockNHeader, e // isTssTransaction checks if a given transaction was sent by TSS itself. // An unconfirmed transaction is safe to spend only if it was sent by TSS and verified by ourselves. -func (ob *Client) isTssTransaction(txid string) bool { +func (ob *Observer) isTssTransaction(txid string) bool { _, found := ob.includedTxHashes[txid] return found } @@ -838,7 +837,7 @@ func (ob *Client) isTssTransaction(txid string) bool { // There could be many (unpredictable) reasons for a pending nonce lagging behind, for example: // 1. The zetaclient gets restarted. // 2. The tracker is missing in zetacore. -func (ob *Client) refreshPendingNonce() { +func (ob *Observer) refreshPendingNonce() { // get pending nonces from zetacore p, err := ob.coreClient.GetPendingNoncesByChain(ob.chain.ChainId) if err != nil { @@ -867,7 +866,7 @@ func (ob *Client) refreshPendingNonce() { } } -func (ob *Client) getOutTxidByNonce(nonce uint64, test bool) (string, error) { +func (ob *Observer) getOutTxidByNonce(nonce uint64, test bool) (string, error) { // There are 2 types of txids an observer can trust // 1. The ones had been verified and saved by observer self. @@ -897,7 +896,7 @@ func (ob *Client) getOutTxidByNonce(nonce uint64, test bool) (string, error) { return "", fmt.Errorf("getOutTxidByNonce: cannot find outTx txid for nonce %d", nonce) } -func (ob *Client) findNonceMarkUTXO(nonce uint64, txid string) (int, error) { +func (ob *Observer) findNonceMarkUTXO(nonce uint64, txid string) (int, error) { tssAddress := ob.Tss.BTCAddressWitnessPubkeyHash().EncodeAddress() amount := chains.NonceMarkAmount(nonce) for i, utxo := range ob.utxos { @@ -914,7 +913,7 @@ func (ob *Client) findNonceMarkUTXO(nonce uint64, txid string) (int, error) { } // postBlockHeader posts block header to zetacore -func (ob *Client) postBlockHeader(tip int64) error { +func (ob *Observer) postBlockHeader(tip int64) error { ob.logger.InTx.Info().Msgf("postBlockHeader: tip %d", tip) bn := tip res, err := ob.coreClient.GetBlockHeaderChainState(ob.chain.ChainId) @@ -949,7 +948,7 @@ func (ob *Client) postBlockHeader(tip int64) error { return err } -func (ob *Client) loadDB(dbpath string) error { +func (ob *Observer) loadDB(dbpath string) error { if _, err := os.Stat(dbpath); os.IsNotExist(err) { err := os.MkdirAll(dbpath, os.ModePerm) if err != nil { diff --git a/zetaclient/chains/bitcoin/client_db_test.go b/zetaclient/chains/bitcoin/observer_db_test.go similarity index 84% rename from zetaclient/chains/bitcoin/client_db_test.go rename to zetaclient/chains/bitcoin/observer_db_test.go index 751713b4d7..968fd7132f 100644 --- a/zetaclient/chains/bitcoin/client_db_test.go +++ b/zetaclient/chains/bitcoin/observer_db_test.go @@ -14,17 +14,17 @@ import ( const tempSQLiteDbPath = "file::memory:?cache=shared" const numOfEntries = 2 -type BitcoinClientDBTestSuite struct { +type BitcoinObserverDBTestSuite struct { suite.Suite db *gorm.DB submittedTx map[string]btcjson.GetTransactionResult } -func TestBitcoinClientDB(t *testing.T) { - suite.Run(t, new(BitcoinClientDBTestSuite)) +func TestBitcoinObserverDB(t *testing.T) { + suite.Run(t, new(BitcoinObserverDBTestSuite)) } -func (suite *BitcoinClientDBTestSuite) SetupTest() { +func (suite *BitcoinObserverDBTestSuite) SetupTest() { suite.submittedTx = map[string]btcjson.GetTransactionResult{} db, err := gorm.Open(sqlite.Open(tempSQLiteDbPath), &gorm.Config{}) @@ -58,9 +58,9 @@ func (suite *BitcoinClientDBTestSuite) SetupTest() { } } -func (suite *BitcoinClientDBTestSuite) TearDownSuite() {} +func (suite *BitcoinObserverDBTestSuite) TearDownSuite() {} -func (suite *BitcoinClientDBTestSuite) TestSubmittedTx() { +func (suite *BitcoinObserverDBTestSuite) TestSubmittedTx() { var submittedTransactions []clienttypes.TransactionResultSQLType err := suite.db.Find(&submittedTransactions).Error suite.NoError(err) diff --git a/zetaclient/chains/bitcoin/client_inbound.go b/zetaclient/chains/bitcoin/observer_inbound.go similarity index 97% rename from zetaclient/chains/bitcoin/client_inbound.go rename to zetaclient/chains/bitcoin/observer_inbound.go index 539e163873..9920854682 100644 --- a/zetaclient/chains/bitcoin/client_inbound.go +++ b/zetaclient/chains/bitcoin/observer_inbound.go @@ -24,7 +24,7 @@ import ( ) // WatchInTx watches Bitcoin chain for incoming txs and post votes to zetacore -func (ob *Client) WatchInTx() { +func (ob *Observer) WatchInTx() { ticker, err := types.NewDynamicTicker("Bitcoin_WatchInTx", ob.GetChainParams().InTxTicker) if err != nil { ob.logger.InTx.Error().Err(err).Msg("error creating ticker") @@ -54,7 +54,7 @@ func (ob *Client) WatchInTx() { } } -func (ob *Client) ObserveInTx() error { +func (ob *Observer) ObserveInTx() error { // get and update latest block height cnt, err := ob.rpcClient.GetBlockCount() if err != nil { @@ -203,7 +203,7 @@ func (ob *Client) ObserveInTx() error { } // WatchIntxTracker watches zetacore for bitcoin intx trackers -func (ob *Client) WatchIntxTracker() { +func (ob *Observer) WatchIntxTracker() { ticker, err := types.NewDynamicTicker("Bitcoin_WatchIntxTracker", ob.GetChainParams().InTxTicker) if err != nil { ob.logger.InTx.Err(err).Msg("error creating ticker") @@ -230,7 +230,7 @@ func (ob *Client) WatchIntxTracker() { } // ProcessInboundTrackers processes inbound trackers -func (ob *Client) ProcessInboundTrackers() error { +func (ob *Observer) ProcessInboundTrackers() error { trackers, err := ob.coreClient.GetInboundTrackersForChain(ob.chain.ChainId) if err != nil { return err @@ -249,7 +249,7 @@ func (ob *Client) ProcessInboundTrackers() error { } // CheckReceiptForBtcTxHash checks the receipt for a btc tx hash -func (ob *Client) CheckReceiptForBtcTxHash(txHash string, vote bool) (string, error) { +func (ob *Observer) CheckReceiptForBtcTxHash(txHash string, vote bool) (string, error) { hash, err := chainhash.NewHashFromStr(txHash) if err != nil { return "", err @@ -344,7 +344,7 @@ func FilterAndParseIncomingTx( return inTxs, nil } -func (ob *Client) GetInboundVoteMessageFromBtcEvent(inTx *BTCInTxEvent) *crosschaintypes.MsgVoteOnObservedInboundTx { +func (ob *Observer) GetInboundVoteMessageFromBtcEvent(inTx *BTCInTxEvent) *crosschaintypes.MsgVoteOnObservedInboundTx { ob.logger.InTx.Debug().Msgf("Processing inTx: %s", inTx.TxHash) amount := big.NewFloat(inTx.Value) amount = amount.Mul(amount, big.NewFloat(1e8)) @@ -376,7 +376,7 @@ func (ob *Client) GetInboundVoteMessageFromBtcEvent(inTx *BTCInTxEvent) *crossch } // IsInTxRestricted returns true if the inTx contains restricted addresses -func (ob *Client) IsInTxRestricted(inTx *BTCInTxEvent) bool { +func (ob *Observer) IsInTxRestricted(inTx *BTCInTxEvent) bool { receiver := "" parsedAddress, _, err := chains.ParseAddressAndData(hex.EncodeToString(inTx.MemoBytes)) if err == nil && parsedAddress != (ethcommon.Address{}) { diff --git a/zetaclient/chains/bitcoin/client_live_test.go b/zetaclient/chains/bitcoin/observer_live_test.go similarity index 96% rename from zetaclient/chains/bitcoin/client_live_test.go rename to zetaclient/chains/bitcoin/observer_live_test.go index 87dc7815cf..7736b6525f 100644 --- a/zetaclient/chains/bitcoin/client_live_test.go +++ b/zetaclient/chains/bitcoin/observer_live_test.go @@ -26,12 +26,12 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" ) -type BitcoinClientTestSuite struct { +type BitcoinObserverTestSuite struct { suite.Suite rpcClient *rpcclient.Client } -func (suite *BitcoinClientTestSuite) SetupTest() { +func (suite *BitcoinObserverTestSuite) SetupTest() { // test private key with EVM address //// EVM: 0x236C7f53a90493Bb423411fe4117Cb4c2De71DfB // BTC testnet3: muGe9prUBjQwEnX19zG26fVRHNi8z7kSPo @@ -45,7 +45,7 @@ func (suite *BitcoinClientTestSuite) SetupTest() { PrivKey: privateKey, } appContext := clientcontext.NewAppContext(&clientcontext.ZetaCoreContext{}, config.Config{}) - client, err := NewClient(appContext, chains.BtcRegtestChain, nil, tss, tempSQLiteDbPath, + client, err := NewObserver(appContext, chains.BtcRegtestChain, nil, tss, tempSQLiteDbPath, clientcommon.DefaultLoggers(), config.BTCConfig{}, nil) suite.Require().NoError(err) suite.rpcClient, err = getRPCClient(18332) @@ -79,7 +79,7 @@ func (suite *BitcoinClientTestSuite) SetupTest() { } } -func (suite *BitcoinClientTestSuite) TearDownSuite() { +func (suite *BitcoinObserverTestSuite) TearDownSuite() { } func getRPCClient(chainID int64) (*rpcclient.Client, error) { @@ -125,7 +125,7 @@ func getFeeRate(client *rpcclient.Client, confTarget int64, estimateMode *btcjso // All methods that begin with "Test" are run as tests within a // suite. -func (suite *BitcoinClientTestSuite) Test1() { +func (suite *BitcoinObserverTestSuite) Test1() { feeResult, err := suite.rpcClient.EstimateSmartFee(1, nil) suite.Require().NoError(err) suite.T().Logf("fee result: %f", *feeResult.FeeRate) @@ -168,7 +168,7 @@ func (suite *BitcoinClientTestSuite) Test1() { } // a tx with memo around 81B (is this allowed1?) -func (suite *BitcoinClientTestSuite) Test2() { +func (suite *BitcoinObserverTestSuite) Test2() { hashStr := "000000000000002fd8136dbf91708898da9d6ae61d7c354065a052568e2f2888" var hash chainhash.Hash err := chainhash.Decode(&hash, hashStr) @@ -193,7 +193,7 @@ func (suite *BitcoinClientTestSuite) Test2() { suite.Require().Equal(0, len(inTxs)) } -func (suite *BitcoinClientTestSuite) Test3() { +func (suite *BitcoinObserverTestSuite) Test3() { client := suite.rpcClient res, err := client.EstimateSmartFee(1, &btcjson.EstimateModeConservative) suite.Require().NoError(err) @@ -209,8 +209,8 @@ func (suite *BitcoinClientTestSuite) Test3() { suite.T().Logf("block number %d", bn) } -// TestBitcoinClientLive is a phony test to run each live test individually -func TestBitcoinClientLive(t *testing.T) { +// TestBitcoinObserverLive is a phony test to run each live test individually +func TestBitcoinObserverLive(t *testing.T) { // suite.Run(t, new(BitcoinClientTestSuite)) // LiveTestBitcoinFeeRate(t) diff --git a/zetaclient/chains/bitcoin/client_outbound.go b/zetaclient/chains/bitcoin/observer_outbound.go similarity index 93% rename from zetaclient/chains/bitcoin/client_outbound.go rename to zetaclient/chains/bitcoin/observer_outbound.go index 12e4785238..72c4a22b40 100644 --- a/zetaclient/chains/bitcoin/client_outbound.go +++ b/zetaclient/chains/bitcoin/observer_outbound.go @@ -18,7 +18,7 @@ import ( ) // WatchOutTx watches Bitcoin chain for outgoing txs status -func (ob *Client) WatchOutTx() { +func (ob *Observer) WatchOutTx() { ticker, err := types.NewDynamicTicker("Bitcoin_WatchOutTx", ob.GetChainParams().OutTxTicker) if err != nil { ob.logger.OutTx.Error().Err(err).Msg("error creating ticker ") @@ -93,7 +93,7 @@ func (ob *Client) WatchOutTx() { } // IsOutboundProcessed returns isIncluded(or inMempool), isConfirmed, Error -func (ob *Client) IsOutboundProcessed(cctx *crosschaintypes.CrossChainTx, logger zerolog.Logger) (bool, bool, error) { +func (ob *Observer) IsOutboundProcessed(cctx *crosschaintypes.CrossChainTx, logger zerolog.Logger) (bool, bool, error) { params := *cctx.GetCurrentOutTxParam() sendHash := cctx.Index nonce := cctx.GetCurrentOutTxParam().OutboundTxTssNonce @@ -170,14 +170,14 @@ func (ob *Client) IsOutboundProcessed(cctx *crosschaintypes.CrossChainTx, logger } // GetTxID returns a unique id for outbound tx -func (ob *Client) GetTxID(nonce uint64) string { +func (ob *Observer) GetTxID(nonce uint64) string { tssAddr := ob.Tss.BTCAddress() return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddr, nonce) } // checkIncludedTx checks if a txHash is included and returns (txResult, inMempool) // Note: if txResult is nil, then inMempool flag should be ignored. -func (ob *Client) checkIncludedTx(cctx *crosschaintypes.CrossChainTx, txHash string) (*btcjson.GetTransactionResult, bool) { +func (ob *Observer) checkIncludedTx(cctx *crosschaintypes.CrossChainTx, txHash string) (*btcjson.GetTransactionResult, bool) { outTxID := ob.GetTxID(cctx.GetCurrentOutTxParam().OutboundTxTssNonce) hash, getTxResult, err := GetTxResultByHash(ob.rpcClient, txHash) if err != nil { @@ -202,7 +202,7 @@ func (ob *Client) checkIncludedTx(cctx *crosschaintypes.CrossChainTx, txHash str } // setIncludedTx saves included tx result in memory -func (ob *Client) setIncludedTx(nonce uint64, getTxResult *btcjson.GetTransactionResult) { +func (ob *Observer) setIncludedTx(nonce uint64, getTxResult *btcjson.GetTransactionResult) { txHash := getTxResult.TxID outTxID := ob.GetTxID(nonce) @@ -230,14 +230,14 @@ func (ob *Client) setIncludedTx(nonce uint64, getTxResult *btcjson.GetTransactio } // getIncludedTx gets the receipt and transaction from memory -func (ob *Client) getIncludedTx(nonce uint64) *btcjson.GetTransactionResult { +func (ob *Observer) getIncludedTx(nonce uint64) *btcjson.GetTransactionResult { ob.Mu.Lock() defer ob.Mu.Unlock() return ob.includedTxResults[ob.GetTxID(nonce)] } // removeIncludedTx removes included tx from memory -func (ob *Client) removeIncludedTx(nonce uint64) { +func (ob *Observer) removeIncludedTx(nonce uint64) { ob.Mu.Lock() defer ob.Mu.Unlock() txResult, found := ob.includedTxResults[ob.GetTxID(nonce)] @@ -252,7 +252,7 @@ func (ob *Client) removeIncludedTx(nonce uint64) { // - check if all inputs are segwit && TSS inputs // // Returns: true if outTx passes basic checks. -func (ob *Client) checkTssOutTxResult(cctx *crosschaintypes.CrossChainTx, hash *chainhash.Hash, res *btcjson.GetTransactionResult) error { +func (ob *Observer) checkTssOutTxResult(cctx *crosschaintypes.CrossChainTx, hash *chainhash.Hash, res *btcjson.GetTransactionResult) error { params := cctx.GetCurrentOutTxParam() nonce := params.OutboundTxTssNonce rawResult, err := GetRawTxResult(ob.rpcClient, hash, res) @@ -282,7 +282,7 @@ func (ob *Client) checkTssOutTxResult(cctx *crosschaintypes.CrossChainTx, hash * // checkTSSVin checks vin is valid if: // - The first input is the nonce-mark // - All inputs are from TSS address -func (ob *Client) checkTSSVin(vins []btcjson.Vin, nonce uint64) error { +func (ob *Observer) checkTSSVin(vins []btcjson.Vin, nonce uint64) error { // vins: [nonce-mark, UTXO1, UTXO2, ...] if nonce > 0 && len(vins) <= 1 { return fmt.Errorf("checkTSSVin: len(vins) <= 1") @@ -315,7 +315,7 @@ func (ob *Client) checkTSSVin(vins []btcjson.Vin, nonce uint64) error { // - The first output is the nonce-mark // - The second output is the correct payment to recipient // - The third output is the change to TSS (optional) -func (ob *Client) checkTSSVout(params *crosschaintypes.OutboundTxParams, vouts []btcjson.Vout) error { +func (ob *Observer) checkTSSVout(params *crosschaintypes.OutboundTxParams, vouts []btcjson.Vout) error { // vouts: [nonce-mark, payment to recipient, change to TSS (optional)] if !(len(vouts) == 2 || len(vouts) == 3) { return fmt.Errorf("checkTSSVout: invalid number of vouts: %d", len(vouts)) @@ -362,7 +362,7 @@ func (ob *Client) checkTSSVout(params *crosschaintypes.OutboundTxParams, vouts [ // checkTSSVoutCancelled vout is valid if: // - The first output is the nonce-mark // - The second output is the change to TSS (optional) -func (ob *Client) checkTSSVoutCancelled(params *crosschaintypes.OutboundTxParams, vouts []btcjson.Vout) error { +func (ob *Observer) checkTSSVoutCancelled(params *crosschaintypes.OutboundTxParams, vouts []btcjson.Vout) error { // vouts: [nonce-mark, change to TSS (optional)] if !(len(vouts) == 1 || len(vouts) == 2) { return fmt.Errorf("checkTSSVoutCancelled: invalid number of vouts: %d", len(vouts)) diff --git a/zetaclient/chains/bitcoin/client_test.go b/zetaclient/chains/bitcoin/observer_test.go similarity index 97% rename from zetaclient/chains/bitcoin/client_test.go rename to zetaclient/chains/bitcoin/observer_test.go index e975eddaf7..19756d6cf1 100644 --- a/zetaclient/chains/bitcoin/client_test.go +++ b/zetaclient/chains/bitcoin/observer_test.go @@ -31,11 +31,11 @@ import ( // the relative path to the testdata directory var TestDataDir = "../../" -func MockBTCClientMainnet() *Client { +func MockBTCObserverMainnet() *Observer { cfg := config.NewConfig() coreContext := context.NewZetaCoreContext(cfg) - return &Client{ + return &Observer{ chain: chains.BtcMainnetChain, coreClient: mocks.NewMockZetaCoreClient(), Tss: mocks.NewTSSMainnet(), @@ -59,7 +59,7 @@ func createRPCClientAndLoadTx(t *testing.T, chainId int64, txHash string) *mocks return rpcClient } -func TestNewBitcoinClient(t *testing.T) { +func TestNewBitcoinObserver(t *testing.T) { t.Run("should return error because zetacore doesn't update core context", func(t *testing.T) { cfg := config.NewConfig() coreContext := context.NewZetaCoreContext(cfg) @@ -71,27 +71,27 @@ func TestNewBitcoinClient(t *testing.T) { btcCfg := cfg.BitcoinConfig ts := metrics.NewTelemetryServer() - client, err := NewClient(appContext, chain, coreClient, tss, tempSQLiteDbPath, loggers, btcCfg, ts) + client, err := NewObserver(appContext, chain, coreClient, tss, tempSQLiteDbPath, loggers, btcCfg, ts) require.ErrorContains(t, err, "btc chains params not initialized") require.Nil(t, client) }) } func TestConfirmationThreshold(t *testing.T) { - client := &Client{Mu: &sync.Mutex{}} + ob := &Observer{Mu: &sync.Mutex{}} t.Run("should return confirmations in chain param", func(t *testing.T) { - client.SetChainParams(observertypes.ChainParams{ConfirmationCount: 3}) - require.Equal(t, int64(3), client.ConfirmationsThreshold(big.NewInt(1000))) + ob.SetChainParams(observertypes.ChainParams{ConfirmationCount: 3}) + require.Equal(t, int64(3), ob.ConfirmationsThreshold(big.NewInt(1000))) }) t.Run("should return big value confirmations", func(t *testing.T) { - client.SetChainParams(observertypes.ChainParams{ConfirmationCount: 3}) - require.Equal(t, int64(bigValueConfirmationCount), client.ConfirmationsThreshold(big.NewInt(bigValueSats))) + ob.SetChainParams(observertypes.ChainParams{ConfirmationCount: 3}) + require.Equal(t, int64(bigValueConfirmationCount), ob.ConfirmationsThreshold(big.NewInt(bigValueSats))) }) t.Run("big value confirmations is the upper cap", func(t *testing.T) { - client.SetChainParams(observertypes.ChainParams{ConfirmationCount: bigValueConfirmationCount + 1}) - require.Equal(t, int64(bigValueConfirmationCount), client.ConfirmationsThreshold(big.NewInt(1000))) + ob.SetChainParams(observertypes.ChainParams{ConfirmationCount: bigValueConfirmationCount + 1}) + require.Equal(t, int64(bigValueConfirmationCount), ob.ConfirmationsThreshold(big.NewInt(1000))) }) } @@ -233,7 +233,7 @@ func TestCheckTSSVout(t *testing.T) { nonce := uint64(148) // create mainnet mock client - btcClient := MockBTCClientMainnet() + btcClient := MockBTCObserverMainnet() t.Run("valid TSS vout should pass", func(t *testing.T) { rawResult, cctx := testutils.LoadBTCTxRawResultNCctx(t, TestDataDir, chainID, nonce) @@ -315,7 +315,7 @@ func TestCheckTSSVoutCancelled(t *testing.T) { nonce := uint64(148) // create mainnet mock client - btcClient := MockBTCClientMainnet() + btcClient := MockBTCObserverMainnet() t.Run("valid TSS vout should pass", func(t *testing.T) { // remove change vout to simulate cancelled tx diff --git a/zetaclient/chains/bitcoin/signer.go b/zetaclient/chains/bitcoin/signer.go index 699b3619b2..a2252ab2cf 100644 --- a/zetaclient/chains/bitcoin/signer.go +++ b/zetaclient/chains/bitcoin/signer.go @@ -167,7 +167,7 @@ func (signer *Signer) SignWithdrawTx( amount float64, gasPrice *big.Int, sizeLimit uint64, - btcClient *Client, + observer *Observer, height uint64, nonce uint64, chain *chains.Chain, @@ -177,13 +177,13 @@ func (signer *Signer) SignWithdrawTx( nonceMark := chains.NonceMarkAmount(nonce) // refresh unspent UTXOs and continue with keysign regardless of error - err := btcClient.FetchUTXOS() + err := observer.FetchUTXOS() if err != nil { signer.logger.Error().Err(err).Msgf("SignWithdrawTx: FetchUTXOS error: nonce %d chain %d", nonce, chain.ChainId) } // select N UTXOs to cover the total expense - prevOuts, total, consolidatedUtxo, consolidatedValue, err := btcClient.SelectUTXOs(amount+estimateFee+float64(nonceMark)*1e-8, maxNoOfInputsPerTx, nonce, consolidationRank, false) + prevOuts, total, consolidatedUtxo, consolidatedValue, err := observer.SelectUTXOs(amount+estimateFee+float64(nonceMark)*1e-8, maxNoOfInputsPerTx, nonce, consolidationRank, false) if err != nil { return nil, err } @@ -299,7 +299,7 @@ func (signer *Signer) TryProcessOutTx( cctx *types.CrossChainTx, outTxProc *outtxprocessor.Processor, outTxID string, - chainClient interfaces.ChainClient, + chainObserver interfaces.ChainObserver, coreClient interfaces.ZetaCoreClient, height uint64, ) { @@ -323,9 +323,9 @@ func (signer *Signer) TryProcessOutTx( } logger.Info().Msgf("BTC TryProcessOutTx: %s, value %d to %s", cctx.Index, params.Amount.BigInt(), params.Receiver) - btcClient, ok := chainClient.(*Client) + btcObserver, ok := chainObserver.(*Observer) if !ok { - logger.Error().Msgf("chain client is not a bitcoin client") + logger.Error().Msgf("chain observer is not a bitcoin observer") return } flags := signer.coreContext.GetCrossChainFlags() @@ -368,29 +368,29 @@ func (signer *Signer) TryProcessOutTx( cancelTx := compliance.IsCctxRestricted(cctx) if cancelTx { compliance.PrintComplianceLog(logger, signer.loggerCompliance, - true, btcClient.chain.ChainId, cctx.Index, cctx.InboundTxParams.Sender, params.Receiver, "BTC") + true, btcObserver.chain.ChainId, cctx.Index, cctx.InboundTxParams.Sender, params.Receiver, "BTC") amount = 0.0 // zero out the amount to cancel the tx } logger.Info().Msgf("SignWithdrawTx: to %s, value %d sats", to.EncodeAddress(), params.Amount.Uint64()) - logger.Info().Msgf("using utxos: %v", btcClient.utxos) + logger.Info().Msgf("using utxos: %v", btcObserver.utxos) tx, err := signer.SignWithdrawTx( to, amount, gasprice, sizelimit, - btcClient, + btcObserver, height, outboundTxTssNonce, - &btcClient.chain, + &btcObserver.chain, cancelTx, ) if err != nil { logger.Warn().Err(err).Msgf("SignOutboundTx error: nonce %d chain %d", outboundTxTssNonce, params.ReceiverChainId) return } - logger.Info().Msgf("Key-sign success: %d => %s, nonce %d", cctx.InboundTxParams.SenderChainId, btcClient.chain.ChainName, outboundTxTssNonce) + logger.Info().Msgf("Key-sign success: %d => %s, nonce %d", cctx.InboundTxParams.SenderChainId, btcObserver.chain.ChainName, outboundTxTssNonce) // FIXME: add prometheus metrics _, err = coreClient.GetObserverList() @@ -399,7 +399,7 @@ func (signer *Signer) TryProcessOutTx( } if tx != nil { outTxHash := tx.TxHash().String() - logger.Info().Msgf("on chain %s nonce %d, outTxHash %s signer %s", btcClient.chain.ChainName, outboundTxTssNonce, outTxHash, myid) + logger.Info().Msgf("on chain %s nonce %d, outTxHash %s signer %s", btcObserver.chain.ChainName, outboundTxTssNonce, outTxHash, myid) // TODO: pick a few broadcasters. //if len(signers) == 0 || myid == signers[send.OutboundTxParams.Broadcaster] || myid == signers[int(send.OutboundTxParams.Broadcaster+1)%len(signers)] { // retry loop: 1s, 2s, 4s, 8s, 16s in case of RPC error @@ -408,18 +408,18 @@ func (signer *Signer) TryProcessOutTx( time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond) //random delay to avoid sychronized broadcast err := signer.Broadcast(tx) if err != nil { - logger.Warn().Err(err).Msgf("broadcasting tx %s to chain %s: nonce %d, retry %d", outTxHash, btcClient.chain.ChainName, outboundTxTssNonce, i) + logger.Warn().Err(err).Msgf("broadcasting tx %s to chain %s: nonce %d, retry %d", outTxHash, btcObserver.chain.ChainName, outboundTxTssNonce, i) continue } - logger.Info().Msgf("Broadcast success: nonce %d to chain %s outTxHash %s", outboundTxTssNonce, btcClient.chain.String(), outTxHash) - zetaHash, err := coreClient.AddTxHashToOutTxTracker(btcClient.chain.ChainId, outboundTxTssNonce, outTxHash, nil, "", -1) + logger.Info().Msgf("Broadcast success: nonce %d to chain %s outTxHash %s", outboundTxTssNonce, btcObserver.chain.String(), outTxHash) + zetaHash, err := coreClient.AddTxHashToOutTxTracker(btcObserver.chain.ChainId, outboundTxTssNonce, outTxHash, nil, "", -1) if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", outboundTxTssNonce, btcClient.chain.ChainName, outTxHash) + logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", outboundTxTssNonce, btcObserver.chain.ChainName, outTxHash) } logger.Info().Msgf("Broadcast to core successful %s", zetaHash) - // Save successfully broadcasted transaction to btc chain client - btcClient.SaveBroadcastedTx(outTxHash, outboundTxTssNonce) + // Save successfully broadcasted transaction to btc chain observer + btcObserver.SaveBroadcastedTx(outTxHash, outboundTxTssNonce) break // successful broadcast; no need to retry } diff --git a/zetaclient/chains/bitcoin/signer_test.go b/zetaclient/chains/bitcoin/signer_test.go index e4c6653f1d..bc8e0562bc 100644 --- a/zetaclient/chains/bitcoin/signer_test.go +++ b/zetaclient/chains/bitcoin/signer_test.go @@ -214,25 +214,25 @@ func (s *BTCSignerSuite) TestP2WPH(c *C) { fmt.Println("Transaction successfully signed") } -// helper function to create a test BitcoinChainClient -func createTestClient(t *testing.T) *Client { +// helper function to create a test Bitcoin observer +func createTestObserver(t *testing.T) *Observer { skHex := "7b8507ba117e069f4a3f456f505276084f8c92aee86ac78ae37b4d1801d35fa8" privateKey, err := crypto.HexToECDSA(skHex) require.Nil(t, err) tss := &mocks.TSS{ PrivKey: privateKey, } - return &Client{ + return &Observer{ Tss: tss, Mu: &sync.Mutex{}, includedTxResults: make(map[string]*btcjson.GetTransactionResult), } } -// helper function to create a test BitcoinChainClient with UTXOs -func createTestClientWithUTXOs(t *testing.T) *Client { - // Create BitcoinChainClient - client := createTestClient(t) +// helper function to create a test Bitcoin observer with UTXOs +func createTestObserverWithUTXOs(t *testing.T) *Observer { + // Create Bitcoin observer + client := createTestObserver(t) tssAddress := client.Tss.BTCAddressWitnessPubkeyHash().EncodeAddress() // Create 10 dummy UTXOs (22.44 BTC in total) @@ -391,7 +391,7 @@ func TestAddWithdrawTxOutputs(t *testing.T) { } } -func mineTxNSetNonceMark(ob *Client, nonce uint64, txid string, preMarkIndex int) { +func mineTxNSetNonceMark(ob *Observer, nonce uint64, txid string, preMarkIndex int) { // Mine transaction outTxID := ob.GetTxID(nonce) ob.includedTxResults[outTxID] = &btcjson.GetTransactionResult{TxID: txid} @@ -411,7 +411,7 @@ func mineTxNSetNonceMark(ob *Client, nonce uint64, txid string, preMarkIndex int } func TestSelectUTXOs(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) dummyTxID := "6e6f71d281146c1fc5c755b35908ee449f26786c84e2ae18f98b268de40b7ec4" // Case1: nonce = 0, bootstrap @@ -503,7 +503,7 @@ func TestUTXOConsolidation(t *testing.T) { dummyTxID := "6e6f71d281146c1fc5c755b35908ee449f26786c84e2ae18f98b268de40b7ec4" t.Run("should not consolidate", func(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) mineTxNSetNonceMark(ob, 0, dummyTxID, -1) // mine a transaction and set nonce-mark utxo for nonce 0 // input: utxoCap = 10, amount = 0.01, nonce = 1, rank = 10 @@ -517,7 +517,7 @@ func TestUTXOConsolidation(t *testing.T) { }) t.Run("should consolidate 1 utxo", func(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) mineTxNSetNonceMark(ob, 0, dummyTxID, -1) // mine a transaction and set nonce-mark utxo for nonce 0 // input: utxoCap = 9, amount = 0.01, nonce = 1, rank = 9 @@ -531,7 +531,7 @@ func TestUTXOConsolidation(t *testing.T) { }) t.Run("should consolidate 3 utxos", func(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) mineTxNSetNonceMark(ob, 0, dummyTxID, -1) // mine a transaction and set nonce-mark utxo for nonce 0 // input: utxoCap = 5, amount = 0.01, nonce = 0, rank = 5 @@ -550,7 +550,7 @@ func TestUTXOConsolidation(t *testing.T) { }) t.Run("should consolidate all utxos using rank 1", func(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) mineTxNSetNonceMark(ob, 0, dummyTxID, -1) // mine a transaction and set nonce-mark utxo for nonce 0 // input: utxoCap = 12, amount = 0.01, nonce = 0, rank = 1 @@ -569,7 +569,7 @@ func TestUTXOConsolidation(t *testing.T) { }) t.Run("should consolidate 3 utxos sparse", func(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) mineTxNSetNonceMark(ob, 24105431, dummyTxID, -1) // mine a transaction and set nonce-mark utxo for nonce 24105431 // input: utxoCap = 5, amount = 0.13, nonce = 24105432, rank = 5 @@ -587,7 +587,7 @@ func TestUTXOConsolidation(t *testing.T) { }) t.Run("should consolidate all utxos sparse", func(t *testing.T) { - ob := createTestClientWithUTXOs(t) + ob := createTestObserverWithUTXOs(t) mineTxNSetNonceMark(ob, 24105431, dummyTxID, -1) // mine a transaction and set nonce-mark utxo for nonce 24105431 // input: utxoCap = 12, amount = 0.13, nonce = 24105432, rank = 1 diff --git a/zetaclient/chains/evm/client.go b/zetaclient/chains/evm/observer.go similarity index 85% rename from zetaclient/chains/evm/client.go rename to zetaclient/chains/evm/observer.go index 3260fa7302..f23c634491 100644 --- a/zetaclient/chains/evm/client.go +++ b/zetaclient/chains/evm/observer.go @@ -57,11 +57,10 @@ type Logger struct { Compliance zerolog.Logger } -var _ interfaces.ChainClient = &Client{} +var _ interfaces.ChainObserver = &Observer{} -// Client represents the chain configuration for an EVM chain -// Filled with above constants depending on chain -type Client struct { +// Observer is the observer for evm chains +type Observer struct { Tss interfaces.TSSSigner Mu *sync.Mutex @@ -86,8 +85,8 @@ type Client struct { headerCache *lru.Cache } -// NewClient returns a new EVM chain client -func NewClient( +// NewObserver returns a new EVM chain observer +func NewObserver( appContext *clientcontext.AppContext, coreClient interfaces.ZetaCoreClient, tss interfaces.TSSSigner, @@ -95,8 +94,8 @@ func NewClient( loggers clientcommon.ClientLogger, evmCfg config.EVMConfig, ts *metrics.TelemetryServer, -) (*Client, error) { - ob := Client{ +) (*Observer, error) { + ob := Observer{ ts: ts, } @@ -158,15 +157,15 @@ func NewClient( return &ob, nil } -// WithChain attaches a new chain to the chain client -func (ob *Client) WithChain(chain chains.Chain) { +// WithChain attaches a new chain to the observer +func (ob *Observer) WithChain(chain chains.Chain) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.chain = chain } -// WithLogger attaches a new logger to the chain client -func (ob *Client) WithLogger(logger zerolog.Logger) { +// WithLogger attaches a new logger to the observer +func (ob *Observer) WithLogger(logger zerolog.Logger) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.logger = Logger{ @@ -177,67 +176,67 @@ func (ob *Client) WithLogger(logger zerolog.Logger) { } } -// WithEvmClient attaches a new evm client to the chain client -func (ob *Client) WithEvmClient(client interfaces.EVMRPCClient) { +// WithEvmClient attaches a new evm client to the observer +func (ob *Observer) WithEvmClient(client interfaces.EVMRPCClient) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.evmClient = client } -// WithEvmJSONRPC attaches a new evm json rpc client to the chain client -func (ob *Client) WithEvmJSONRPC(client interfaces.EVMJSONRPCClient) { +// WithEvmJSONRPC attaches a new evm json rpc client to the observer +func (ob *Observer) WithEvmJSONRPC(client interfaces.EVMJSONRPCClient) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.evmJSONRPC = client } -// WithZetacoreClient attaches a new client to interact with ZetaCore to the chain client -func (ob *Client) WithZetacoreClient(client interfaces.ZetaCoreClient) { +// WithZetacoreClient attaches a new client to interact with ZetaCore to the observer +func (ob *Observer) WithZetacoreClient(client interfaces.ZetaCoreClient) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.coreClient = client } -// WithBlockCache attaches a new block cache to the chain client -func (ob *Client) WithBlockCache(cache *lru.Cache) { +// WithBlockCache attaches a new block cache to the observer +func (ob *Observer) WithBlockCache(cache *lru.Cache) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.blockCache = cache } -// SetChainParams sets the chain params for the chain client -func (ob *Client) SetChainParams(params observertypes.ChainParams) { +// SetChainParams sets the chain params for the observer +func (ob *Observer) SetChainParams(params observertypes.ChainParams) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.chainParams = params } -// GetChainParams returns the chain params for the chain client -func (ob *Client) GetChainParams() observertypes.ChainParams { +// GetChainParams returns the chain params for the observer +func (ob *Observer) GetChainParams() observertypes.ChainParams { ob.Mu.Lock() defer ob.Mu.Unlock() return ob.chainParams } -func (ob *Client) GetConnectorContract() (ethcommon.Address, *zetaconnector.ZetaConnectorNonEth, error) { +func (ob *Observer) GetConnectorContract() (ethcommon.Address, *zetaconnector.ZetaConnectorNonEth, error) { addr := ethcommon.HexToAddress(ob.GetChainParams().ConnectorContractAddress) contract, err := FetchConnectorContract(addr, ob.evmClient) return addr, contract, err } -func (ob *Client) GetConnectorContractEth() (ethcommon.Address, *zetaconnectoreth.ZetaConnectorEth, error) { +func (ob *Observer) GetConnectorContractEth() (ethcommon.Address, *zetaconnectoreth.ZetaConnectorEth, error) { addr := ethcommon.HexToAddress(ob.GetChainParams().ConnectorContractAddress) contract, err := FetchConnectorContractEth(addr, ob.evmClient) return addr, contract, err } -func (ob *Client) GetZetaTokenNonEthContract() (ethcommon.Address, *zeta.ZetaNonEth, error) { +func (ob *Observer) GetZetaTokenNonEthContract() (ethcommon.Address, *zeta.ZetaNonEth, error) { addr := ethcommon.HexToAddress(ob.GetChainParams().ZetaTokenContractAddress) contract, err := FetchZetaZetaNonEthTokenContract(addr, ob.evmClient) return addr, contract, err } -func (ob *Client) GetERC20CustodyContract() (ethcommon.Address, *erc20custody.ERC20Custody, error) { +func (ob *Observer) GetERC20CustodyContract() (ethcommon.Address, *erc20custody.ERC20Custody, error) { addr := ethcommon.HexToAddress(ob.GetChainParams().Erc20CustodyContractAddress) contract, err := FetchERC20CustodyContract(addr, ob.evmClient) return addr, contract, err @@ -260,7 +259,7 @@ func FetchERC20CustodyContract(addr ethcommon.Address, client interfaces.EVMRPCC } // Start all observation routines for the evm chain -func (ob *Client) Start() { +func (ob *Observer) Start() { // watch evm chain for incoming txs and post votes to zetacore go ob.WatchInTx() @@ -278,7 +277,7 @@ func (ob *Client) Start() { } // WatchRPCStatus watches the RPC status of the evm chain -func (ob *Client) WatchRPCStatus() { +func (ob *Observer) WatchRPCStatus() { ob.logger.Chain.Info().Msgf("Starting RPC status check for chain %s", ob.chain.String()) ticker := time.NewTicker(60 * time.Second) for { @@ -316,7 +315,7 @@ func (ob *Client) WatchRPCStatus() { } } -func (ob *Client) Stop() { +func (ob *Observer) Stop() { ob.logger.Chain.Info().Msgf("ob %s is stopping", ob.chain.String()) close(ob.stop) // this notifies all goroutines to stop @@ -334,21 +333,21 @@ func (ob *Client) Stop() { } // SetPendingTx sets the pending transaction in memory -func (ob *Client) SetPendingTx(nonce uint64, transaction *ethtypes.Transaction) { +func (ob *Observer) SetPendingTx(nonce uint64, transaction *ethtypes.Transaction) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.outTxPendingTransactions[ob.GetTxID(nonce)] = transaction } // GetPendingTx gets the pending transaction from memory -func (ob *Client) GetPendingTx(nonce uint64) *ethtypes.Transaction { +func (ob *Observer) GetPendingTx(nonce uint64) *ethtypes.Transaction { ob.Mu.Lock() defer ob.Mu.Unlock() return ob.outTxPendingTransactions[ob.GetTxID(nonce)] } // SetTxNReceipt sets the receipt and transaction in memory -func (ob *Client) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction) { +func (ob *Observer) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction) { ob.Mu.Lock() defer ob.Mu.Unlock() delete(ob.outTxPendingTransactions, ob.GetTxID(nonce)) // remove pending transaction, if any @@ -357,7 +356,7 @@ func (ob *Client) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transac } // GetTxNReceipt gets the receipt and transaction from memory -func (ob *Client) GetTxNReceipt(nonce uint64) (*ethtypes.Receipt, *ethtypes.Transaction) { +func (ob *Observer) GetTxNReceipt(nonce uint64) (*ethtypes.Receipt, *ethtypes.Transaction) { ob.Mu.Lock() defer ob.Mu.Unlock() receipt := ob.outTXConfirmedReceipts[ob.GetTxID(nonce)] @@ -366,14 +365,14 @@ func (ob *Client) GetTxNReceipt(nonce uint64) (*ethtypes.Receipt, *ethtypes.Tran } // IsTxConfirmed returns true if there is a confirmed tx for 'nonce' -func (ob *Client) IsTxConfirmed(nonce uint64) bool { +func (ob *Observer) IsTxConfirmed(nonce uint64) bool { ob.Mu.Lock() defer ob.Mu.Unlock() return ob.outTXConfirmedReceipts[ob.GetTxID(nonce)] != nil && ob.outTXConfirmedTransactions[ob.GetTxID(nonce)] != nil } // CheckTxInclusion returns nil only if tx is included at the position indicated by the receipt ([block, index]) -func (ob *Client) CheckTxInclusion(tx *ethtypes.Transaction, receipt *ethtypes.Receipt) error { +func (ob *Observer) CheckTxInclusion(tx *ethtypes.Transaction, receipt *ethtypes.Receipt) error { block, err := ob.GetBlockByNumberCached(receipt.BlockNumber.Uint64()) if err != nil { return errors.Wrapf(err, "GetBlockByNumberCached error for block %d txHash %s nonce %d", @@ -397,19 +396,19 @@ func (ob *Client) CheckTxInclusion(tx *ethtypes.Transaction, receipt *ethtypes.R } // SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused) -func (ob *Client) SetLastBlockHeightScanned(height uint64) { +func (ob *Observer) SetLastBlockHeightScanned(height uint64) { atomic.StoreUint64(&ob.lastBlockScanned, height) metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(height)) } // GetLastBlockHeightScanned get last block height scanned (not necessarily caught up with external block; could be slow/paused) -func (ob *Client) GetLastBlockHeightScanned() uint64 { +func (ob *Observer) GetLastBlockHeightScanned() uint64 { height := atomic.LoadUint64(&ob.lastBlockScanned) return height } // SetLastBlockHeight set external last block height -func (ob *Client) SetLastBlockHeight(height uint64) { +func (ob *Observer) SetLastBlockHeight(height uint64) { if height >= math.MaxInt64 { panic("lastBlock is too large") } @@ -417,7 +416,7 @@ func (ob *Client) SetLastBlockHeight(height uint64) { } // GetLastBlockHeight get external last block height -func (ob *Client) GetLastBlockHeight() uint64 { +func (ob *Observer) GetLastBlockHeight() uint64 { height := atomic.LoadUint64(&ob.lastBlock) if height >= math.MaxInt64 { panic("lastBlock is too large") @@ -426,7 +425,7 @@ func (ob *Client) GetLastBlockHeight() uint64 { } // WatchGasPrice watches evm chain for gas prices and post to zetacore -func (ob *Client) WatchGasPrice() { +func (ob *Observer) WatchGasPrice() { // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { @@ -461,7 +460,7 @@ func (ob *Client) WatchGasPrice() { } } -func (ob *Client) PostGasPrice() error { +func (ob *Observer) PostGasPrice() error { // GAS PRICE gasPrice, err := ob.evmClient.SuggestGasPrice(context.TODO()) @@ -489,7 +488,7 @@ func (ob *Client) PostGasPrice() error { } // TransactionByHash query transaction by hash via JSON-RPC -func (ob *Client) TransactionByHash(txHash string) (*ethrpc.Transaction, bool, error) { +func (ob *Observer) TransactionByHash(txHash string) (*ethrpc.Transaction, bool, error) { tx, err := ob.evmJSONRPC.EthGetTransactionByHash(txHash) if err != nil { return nil, false, err @@ -501,7 +500,7 @@ func (ob *Client) TransactionByHash(txHash string) (*ethrpc.Transaction, bool, e return tx, tx.BlockNumber == nil, nil } -func (ob *Client) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, error) { +func (ob *Observer) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, error) { if header, ok := ob.headerCache.Get(blockNumber); ok { return header.(*ethtypes.Header), nil } @@ -515,7 +514,7 @@ func (ob *Client) GetBlockHeaderCached(blockNumber uint64) (*ethtypes.Header, er // GetBlockByNumberCached get block by number from cache // returns block, ethrpc.Block, isFallback, isSkip, error -func (ob *Client) GetBlockByNumberCached(blockNumber uint64) (*ethrpc.Block, error) { +func (ob *Observer) GetBlockByNumberCached(blockNumber uint64) (*ethrpc.Block, error) { if block, ok := ob.blockCache.Get(blockNumber); ok { return block.(*ethrpc.Block), nil } @@ -532,12 +531,12 @@ func (ob *Client) GetBlockByNumberCached(blockNumber uint64) (*ethrpc.Block, err } // RemoveCachedBlock remove block from cache -func (ob *Client) RemoveCachedBlock(blockNumber uint64) { +func (ob *Observer) RemoveCachedBlock(blockNumber uint64) { ob.blockCache.Remove(blockNumber) } // BlockByNumber query block by number via JSON-RPC -func (ob *Client) BlockByNumber(blockNumber int) (*ethrpc.Block, error) { +func (ob *Observer) BlockByNumber(blockNumber int) (*ethrpc.Block, error) { block, err := ob.evmJSONRPC.EthGetBlockByNumber(blockNumber, true) if err != nil { return nil, err @@ -551,7 +550,7 @@ func (ob *Client) BlockByNumber(blockNumber int) (*ethrpc.Block, error) { return block, nil } -func (ob *Client) BuildLastBlock() error { +func (ob *Observer) BuildLastBlock() error { logger := ob.logger.Chain.With().Str("module", "BuildBlockIndex").Logger() envvar := ob.chain.ChainName.String() + "_SCAN_FROM" scanFromBlock := os.Getenv(envvar) @@ -589,8 +588,8 @@ func (ob *Client) BuildLastBlock() error { return nil } -// LoadDB open sql database and load data into EVMChainClient -func (ob *Client) LoadDB(dbPath string, chain chains.Chain) error { +// LoadDB open sql database and load data into EVM observer +func (ob *Observer) LoadDB(dbPath string, chain chains.Chain) error { if dbPath != "" { if _, err := os.Stat(dbPath); os.IsNotExist(err) { err := os.MkdirAll(dbPath, os.ModePerm) @@ -621,12 +620,12 @@ func (ob *Client) LoadDB(dbPath string, chain chains.Chain) error { return nil } -func (ob *Client) GetTxID(nonce uint64) string { +func (ob *Observer) GetTxID(nonce uint64) string { tssAddr := ob.Tss.EVMAddress().String() return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddr, nonce) } -func (ob *Client) postBlockHeader(tip uint64) error { +func (ob *Observer) postBlockHeader(tip uint64) error { bn := tip res, err := ob.coreClient.GetBlockHeaderChainState(ob.chain.ChainId) diff --git a/zetaclient/chains/evm/client_db_test.go b/zetaclient/chains/evm/observer_db_test.go similarity index 91% rename from zetaclient/chains/evm/client_db_test.go rename to zetaclient/chains/evm/observer_db_test.go index c9fbefce73..314a967a1f 100644 --- a/zetaclient/chains/evm/client_db_test.go +++ b/zetaclient/chains/evm/observer_db_test.go @@ -17,18 +17,18 @@ import ( const TempSQLiteDbPath = "file::memory:?cache=shared" const NumOfEntries = 2 -type EVMClientTestSuite struct { +type ObserverDBTestSuite struct { suite.Suite db *gorm.DB outTXConfirmedReceipts map[string]*ethtypes.Receipt outTXConfirmedTransaction map[string]*ethtypes.Transaction } -func TestEVMClient(t *testing.T) { - suite.Run(t, new(EVMClientTestSuite)) +func TestObserverDB(t *testing.T) { + suite.Run(t, new(ObserverDBTestSuite)) } -func (suite *EVMClientTestSuite) SetupTest() { +func (suite *ObserverDBTestSuite) SetupTest() { suite.outTXConfirmedReceipts = map[string]*ethtypes.Receipt{} suite.outTXConfirmedTransaction = map[string]*ethtypes.Transaction{} @@ -74,14 +74,14 @@ func (suite *EVMClientTestSuite) SetupTest() { } } -func (suite *EVMClientTestSuite) TearDownSuite() { +func (suite *ObserverDBTestSuite) TearDownSuite() { dbInst, err := suite.db.DB() suite.NoError(err) err = dbInst.Close() suite.NoError(err) } -func (suite *EVMClientTestSuite) TestEVMReceipts() { +func (suite *ObserverDBTestSuite) TestEVMReceipts() { for key, value := range suite.outTXConfirmedReceipts { var receipt clienttypes.ReceiptSQLType suite.db.Where("Identifier = ?", key).First(&receipt) @@ -91,7 +91,7 @@ func (suite *EVMClientTestSuite) TestEVMReceipts() { } } -func (suite *EVMClientTestSuite) TestEVMTransactions() { +func (suite *ObserverDBTestSuite) TestEVMTransactions() { for key, value := range suite.outTXConfirmedTransaction { var transaction clienttypes.TransactionSQLType suite.db.Where("Identifier = ?", key).First(&transaction) @@ -105,7 +105,7 @@ func (suite *EVMClientTestSuite) TestEVMTransactions() { } } -func (suite *EVMClientTestSuite) TestEVMLastBlock() { +func (suite *ObserverDBTestSuite) TestEVMLastBlock() { lastBlockNum := uint64(12345) dbc := suite.db.Create(clienttypes.ToLastBlockSQLType(lastBlockNum)) suite.NoError(dbc.Error) diff --git a/zetaclient/chains/evm/client_inbound.go b/zetaclient/chains/evm/observer_inbound.go similarity index 93% rename from zetaclient/chains/evm/client_inbound.go rename to zetaclient/chains/evm/observer_inbound.go index f5be870a17..91dc0c2b4a 100644 --- a/zetaclient/chains/evm/client_inbound.go +++ b/zetaclient/chains/evm/observer_inbound.go @@ -32,7 +32,7 @@ import ( ) // WatchInTx watches evm chain for incoming txs and post votes to zetacore -func (ob *Client) WatchInTx() { +func (ob *Observer) WatchInTx() { ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchInTx_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker) if err != nil { ob.logger.InTx.Error().Err(err).Msg("error creating ticker") @@ -64,7 +64,7 @@ func (ob *Client) WatchInTx() { // WatchIntxTracker gets a list of Inbound tracker suggestions from zeta-core at each tick and tries to check if the in-tx was confirmed. // If it was, it tries to broadcast the confirmation vote. If this zeta client has previously broadcast the vote, the tx would be rejected -func (ob *Client) WatchIntxTracker() { +func (ob *Observer) WatchIntxTracker() { ticker, err := clienttypes.NewDynamicTicker( fmt.Sprintf("EVM_WatchIntxTracker_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker, @@ -95,7 +95,7 @@ func (ob *Client) WatchIntxTracker() { } // ProcessInboundTrackers processes inbound trackers from zetacore -func (ob *Client) ProcessInboundTrackers() error { +func (ob *Observer) ProcessInboundTrackers() error { trackers, err := ob.coreClient.GetInboundTrackersForChain(ob.chain.ChainId) if err != nil { return err @@ -131,7 +131,7 @@ func (ob *Client) ProcessInboundTrackers() error { return nil } -func (ob *Client) ObserveInTX(sampledLogger zerolog.Logger) error { +func (ob *Observer) ObserveInTX(sampledLogger zerolog.Logger) error { // get and update latest block height blockNumber, err := ob.evmClient.BlockNumber(context.Background()) if err != nil { @@ -194,7 +194,7 @@ func (ob *Client) ObserveInTX(sampledLogger zerolog.Logger) error { // ObserveZetaSent queries the ZetaSent event from the connector contract and posts to zetacore // returns the last block successfully scanned -func (ob *Client) ObserveZetaSent(startBlock, toBlock uint64) uint64 { +func (ob *Observer) ObserveZetaSent(startBlock, toBlock uint64) uint64 { // filter ZetaSent logs addrConnector, connector, err := ob.GetConnectorContract() if err != nil { @@ -266,7 +266,7 @@ func (ob *Client) ObserveZetaSent(startBlock, toBlock uint64) uint64 { // ObserveERC20Deposited queries the ERC20CustodyDeposited event from the ERC20Custody contract and posts to zetacore // returns the last block successfully scanned -func (ob *Client) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 { +func (ob *Observer) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 { // filter ERC20CustodyDeposited logs addrCustody, erc20custodyContract, err := ob.GetERC20CustodyContract() if err != nil { @@ -347,7 +347,7 @@ func (ob *Client) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 { // ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetacore // returns the last block successfully scanned -func (ob *Client) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { +func (ob *Observer) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { // query incoming gas asset for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetacore and ignore error @@ -374,7 +374,7 @@ func (ob *Client) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { } // CheckAndVoteInboundTokenZeta checks and votes on the given inbound Zeta token -func (ob *Client) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, receipt *ethtypes.Receipt, vote bool) (string, error) { +func (ob *Observer) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, receipt *ethtypes.Receipt, vote bool) (string, error) { // check confirmations if confirmed := ob.HasEnoughConfirmations(receipt, ob.GetLastBlockHeight()); !confirmed { return "", fmt.Errorf("intx %s has not been confirmed yet: receipt block %d", tx.Hash, receipt.BlockNumber.Uint64()) @@ -415,7 +415,7 @@ func (ob *Client) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, receipt * } // CheckAndVoteInboundTokenERC20 checks and votes on the given inbound ERC20 token -func (ob *Client) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, receipt *ethtypes.Receipt, vote bool) (string, error) { +func (ob *Observer) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, receipt *ethtypes.Receipt, vote bool) (string, error) { // check confirmations if confirmed := ob.HasEnoughConfirmations(receipt, ob.GetLastBlockHeight()); !confirmed { return "", fmt.Errorf("intx %s has not been confirmed yet: receipt block %d", tx.Hash, receipt.BlockNumber.Uint64()) @@ -457,7 +457,7 @@ func (ob *Client) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, receipt } // CheckAndVoteInboundTokenGas checks and votes on the given inbound gas token -func (ob *Client) CheckAndVoteInboundTokenGas(tx *ethrpc.Transaction, receipt *ethtypes.Receipt, vote bool) (string, error) { +func (ob *Observer) CheckAndVoteInboundTokenGas(tx *ethrpc.Transaction, receipt *ethtypes.Receipt, vote bool) (string, error) { // check confirmations if confirmed := ob.HasEnoughConfirmations(receipt, ob.GetLastBlockHeight()); !confirmed { return "", fmt.Errorf("intx %s has not been confirmed yet: receipt block %d", tx.Hash, receipt.BlockNumber.Uint64()) @@ -487,7 +487,7 @@ func (ob *Client) CheckAndVoteInboundTokenGas(tx *ethrpc.Transaction, receipt *e } // PostVoteInbound posts a vote for the given vote message -func (ob *Client) PostVoteInbound(msg *types.MsgVoteOnObservedInboundTx, coinType coin.CoinType, retryGasLimit uint64) (string, error) { +func (ob *Observer) PostVoteInbound(msg *types.MsgVoteOnObservedInboundTx, coinType coin.CoinType, retryGasLimit uint64) (string, error) { txHash := msg.InTxHash chainID := ob.chain.ChainId zetaHash, ballot, err := ob.coreClient.PostVoteInbound(zetacore.PostVoteInboundGasLimit, retryGasLimit, msg) @@ -504,13 +504,13 @@ func (ob *Client) PostVoteInbound(msg *types.MsgVoteOnObservedInboundTx, coinTyp } // HasEnoughConfirmations checks if the given receipt has enough confirmations -func (ob *Client) HasEnoughConfirmations(receipt *ethtypes.Receipt, lastHeight uint64) bool { +func (ob *Observer) HasEnoughConfirmations(receipt *ethtypes.Receipt, lastHeight uint64) bool { confHeight := receipt.BlockNumber.Uint64() + ob.GetChainParams().ConfirmationCount return lastHeight >= confHeight } // BuildInboundVoteMsgForDepositedEvent builds a inbound vote message for a Deposited event -func (ob *Client) BuildInboundVoteMsgForDepositedEvent(event *erc20custody.ERC20CustodyDeposited, sender ethcommon.Address) *types.MsgVoteOnObservedInboundTx { +func (ob *Observer) BuildInboundVoteMsgForDepositedEvent(event *erc20custody.ERC20CustodyDeposited, sender ethcommon.Address) *types.MsgVoteOnObservedInboundTx { // compliance check maybeReceiver := "" parsedAddress, _, err := chains.ParseAddressAndData(hex.EncodeToString(event.Message)) @@ -551,7 +551,7 @@ func (ob *Client) BuildInboundVoteMsgForDepositedEvent(event *erc20custody.ERC20 } // BuildInboundVoteMsgForZetaSentEvent builds a inbound vote message for a ZetaSent event -func (ob *Client) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector.ZetaConnectorNonEthZetaSent) *types.MsgVoteOnObservedInboundTx { +func (ob *Observer) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector.ZetaConnectorNonEthZetaSent) *types.MsgVoteOnObservedInboundTx { destChain := chains.GetChainFromChainID(event.DestinationChainId.Int64()) if destChain == nil { ob.logger.InTx.Warn().Msgf("chain id not supported %d", event.DestinationChainId.Int64()) @@ -602,7 +602,7 @@ func (ob *Client) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector.ZetaC } // BuildInboundVoteMsgForTokenSentToTSS builds a inbound vote message for a token sent to TSS -func (ob *Client) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transaction, sender ethcommon.Address, blockNumber uint64) *types.MsgVoteOnObservedInboundTx { +func (ob *Observer) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transaction, sender ethcommon.Address, blockNumber uint64) *types.MsgVoteOnObservedInboundTx { message := tx.Input // compliance check @@ -646,7 +646,7 @@ func (ob *Client) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transaction, s } // ObserveTSSReceiveInBlock queries the incoming gas asset to TSS address in a single block and posts votes -func (ob *Client) ObserveTSSReceiveInBlock(blockNumber uint64) error { +func (ob *Observer) ObserveTSSReceiveInBlock(blockNumber uint64) error { block, err := ob.GetBlockByNumberCached(blockNumber) if err != nil { return errors.Wrapf(err, "error getting block %d for chain %d", blockNumber, ob.chain.ChainId) @@ -670,7 +670,7 @@ func (ob *Client) ObserveTSSReceiveInBlock(blockNumber uint64) error { } // calcBlockRangeToScan calculates the next range of blocks to scan -func (ob *Client) calcBlockRangeToScan(latestConfirmed, lastScanned, batchSize uint64) (uint64, uint64) { +func (ob *Observer) calcBlockRangeToScan(latestConfirmed, lastScanned, batchSize uint64) (uint64, uint64) { startBlock := lastScanned + 1 toBlock := lastScanned + batchSize if toBlock > latestConfirmed { diff --git a/zetaclient/chains/evm/client_inbound_test.go b/zetaclient/chains/evm/observer_inbound_test.go similarity index 88% rename from zetaclient/chains/evm/client_inbound_test.go rename to zetaclient/chains/evm/observer_inbound_test.go index 68221e7e4f..986e076b45 100644 --- a/zetaclient/chains/evm/client_inbound_test.go +++ b/zetaclient/chains/evm/observer_inbound_test.go @@ -18,7 +18,7 @@ import ( clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" ) -func TestEVM_CheckAndVoteInboundTokenZeta(t *testing.T) { +func Test_CheckAndVoteInboundTokenZeta(t *testing.T) { // load archived ZetaSent intx, receipt and cctx // https://etherscan.io/tx/0xf3935200c80f98502d5edc7e871ffc40ca898e134525c42c2ae3cbc5725f9d76 chain := chains.EthChain @@ -32,7 +32,7 @@ func TestEVM_CheckAndVoteInboundTokenZeta(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenZeta(tx, receipt, false) require.NoError(t, err) require.Equal(t, cctx.InboundTxParams.InboundTxBallotIndex, ballot) @@ -42,7 +42,7 @@ func TestEVM_CheckAndVoteInboundTokenZeta(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - 1 - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) _, err := ob.CheckAndVoteInboundTokenZeta(tx, receipt, false) require.ErrorContains(t, err, "not been confirmed") }) @@ -52,7 +52,7 @@ func TestEVM_CheckAndVoteInboundTokenZeta(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenZeta(tx, receipt, true) require.NoError(t, err) require.Equal(t, "", ballot) @@ -63,13 +63,13 @@ func TestEVM_CheckAndVoteInboundTokenZeta(t *testing.T) { lastBlock := receipt.BlockNumber.Uint64() + confirmation chainID = 56 // use BSC chain connector - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, mocks.MockChainParams(chainID, confirmation)) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, mocks.MockChainParams(chainID, confirmation)) _, err := ob.CheckAndVoteInboundTokenZeta(tx, receipt, true) require.ErrorContains(t, err, "emitter address mismatch") }) } -func TestEVM_CheckAndVoteInboundTokenERC20(t *testing.T) { +func Test_CheckAndVoteInboundTokenERC20(t *testing.T) { // load archived ERC20 intx, receipt and cctx // https://etherscan.io/tx/0x4ea69a0e2ff36f7548ab75791c3b990e076e2a4bffeb616035b239b7d33843da chain := chains.EthChain @@ -83,7 +83,7 @@ func TestEVM_CheckAndVoteInboundTokenERC20(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenERC20(tx, receipt, false) require.NoError(t, err) require.Equal(t, cctx.InboundTxParams.InboundTxBallotIndex, ballot) @@ -93,7 +93,7 @@ func TestEVM_CheckAndVoteInboundTokenERC20(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - 1 - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) _, err := ob.CheckAndVoteInboundTokenERC20(tx, receipt, false) require.ErrorContains(t, err, "not been confirmed") }) @@ -103,7 +103,7 @@ func TestEVM_CheckAndVoteInboundTokenERC20(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenERC20(tx, receipt, true) require.NoError(t, err) require.Equal(t, "", ballot) @@ -114,13 +114,13 @@ func TestEVM_CheckAndVoteInboundTokenERC20(t *testing.T) { lastBlock := receipt.BlockNumber.Uint64() + confirmation chainID = 56 // use BSC chain ERC20 custody - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, mocks.MockChainParams(chainID, confirmation)) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, mocks.MockChainParams(chainID, confirmation)) _, err := ob.CheckAndVoteInboundTokenERC20(tx, receipt, true) require.ErrorContains(t, err, "emitter address mismatch") }) } -func TestEVM_CheckAndVoteInboundTokenGas(t *testing.T) { +func Test_CheckAndVoteInboundTokenGas(t *testing.T) { // load archived Gas intx, receipt and cctx // https://etherscan.io/tx/0xeaec67d5dd5d85f27b21bef83e01cbdf59154fd793ea7a22c297f7c3a722c532 chain := chains.EthChain @@ -134,7 +134,7 @@ func TestEVM_CheckAndVoteInboundTokenGas(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenGas(tx, receipt, false) require.NoError(t, err) require.Equal(t, cctx.InboundTxParams.InboundTxBallotIndex, ballot) @@ -144,7 +144,7 @@ func TestEVM_CheckAndVoteInboundTokenGas(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - 1 - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) _, err := ob.CheckAndVoteInboundTokenGas(tx, receipt, false) require.ErrorContains(t, err, "not been confirmed") }) @@ -154,7 +154,7 @@ func TestEVM_CheckAndVoteInboundTokenGas(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenGas(tx, receipt, false) require.ErrorContains(t, err, "not TSS address") require.Equal(t, "", ballot) @@ -165,7 +165,7 @@ func TestEVM_CheckAndVoteInboundTokenGas(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenGas(tx, receipt, false) require.ErrorContains(t, err, "not a successful tx") require.Equal(t, "", ballot) @@ -176,14 +176,14 @@ func TestEVM_CheckAndVoteInboundTokenGas(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(tx)) lastBlock := receipt.BlockNumber.Uint64() + confirmation - ob := MockEVMClient(t, chain, nil, nil, nil, nil, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, lastBlock, chainParam) ballot, err := ob.CheckAndVoteInboundTokenGas(tx, receipt, false) require.NoError(t, err) require.Equal(t, "", ballot) }) } -func TestEVM_BuildInboundVoteMsgForZetaSentEvent(t *testing.T) { +func Test_BuildInboundVoteMsgForZetaSentEvent(t *testing.T) { // load archived ZetaSent receipt // https://etherscan.io/tx/0xf3935200c80f98502d5edc7e871ffc40ca898e134525c42c2ae3cbc5725f9d76 chainID := int64(1) @@ -193,7 +193,7 @@ func TestEVM_BuildInboundVoteMsgForZetaSentEvent(t *testing.T) { cctx := testutils.LoadCctxByIntx(t, chainID, coin.CoinType_Zeta, intxHash) // parse ZetaSent event - ob := MockEVMClient(t, chain, nil, nil, nil, nil, 1, mocks.MockChainParams(1, 1)) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, mocks.MockChainParams(1, 1)) connector := mocks.MockConnectorNonEth(chainID) event := testutils.ParseReceiptZetaSent(receipt, connector) @@ -230,7 +230,7 @@ func TestEVM_BuildInboundVoteMsgForZetaSentEvent(t *testing.T) { }) } -func TestEVM_BuildInboundVoteMsgForDepositedEvent(t *testing.T) { +func Test_BuildInboundVoteMsgForDepositedEvent(t *testing.T) { // load archived Deposited receipt // https://etherscan.io/tx/0x4ea69a0e2ff36f7548ab75791c3b990e076e2a4bffeb616035b239b7d33843da chain := chains.EthChain @@ -240,7 +240,7 @@ func TestEVM_BuildInboundVoteMsgForDepositedEvent(t *testing.T) { cctx := testutils.LoadCctxByIntx(t, chainID, coin.CoinType_ERC20, intxHash) // parse Deposited event - ob := MockEVMClient(t, chain, nil, nil, nil, nil, 1, mocks.MockChainParams(1, 1)) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, mocks.MockChainParams(1, 1)) custody := mocks.MockERC20Custody(chainID) event := testutils.ParseReceiptERC20Deposited(receipt, custody) sender := ethcommon.HexToAddress(tx.From) @@ -275,7 +275,7 @@ func TestEVM_BuildInboundVoteMsgForDepositedEvent(t *testing.T) { }) } -func TestEVM_BuildInboundVoteMsgForTokenSentToTSS(t *testing.T) { +func Test_BuildInboundVoteMsgForTokenSentToTSS(t *testing.T) { // load archived gas token transfer to TSS // https://etherscan.io/tx/0xeaec67d5dd5d85f27b21bef83e01cbdf59154fd793ea7a22c297f7c3a722c532 chain := chains.EthChain @@ -292,7 +292,7 @@ func TestEVM_BuildInboundVoteMsgForTokenSentToTSS(t *testing.T) { require.NoError(t, evm.ValidateEvmTransaction(txDonation)) // create test compliance config - ob := MockEVMClient(t, chain, nil, nil, nil, nil, 1, mocks.MockChainParams(1, 1)) + ob := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, mocks.MockChainParams(1, 1)) cfg := config.Config{ ComplianceConfig: config.ComplianceConfig{}, } @@ -325,7 +325,7 @@ func TestEVM_BuildInboundVoteMsgForTokenSentToTSS(t *testing.T) { }) } -func TestEVM_ObserveTSSReceiveInBlock(t *testing.T) { +func Test_ObserveTSSReceiveInBlock(t *testing.T) { // https://etherscan.io/tx/0xeaec67d5dd5d85f27b21bef83e01cbdf59154fd793ea7a22c297f7c3a722c532 chain := chains.EthChain chainID := chain.ChainId @@ -350,7 +350,7 @@ func TestEVM_ObserveTSSReceiveInBlock(t *testing.T) { lastBlock := receipt.BlockNumber.Uint64() + confirmation t.Run("should observe TSS receive in block", func(t *testing.T) { - ob := MockEVMClient(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) // feed archived block and receipt evmJSONRPC.WithBlock(block) @@ -359,20 +359,20 @@ func TestEVM_ObserveTSSReceiveInBlock(t *testing.T) { require.NoError(t, err) }) t.Run("should not observe on error getting block", func(t *testing.T) { - ob := MockEVMClient(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) err := ob.ObserveTSSReceiveInBlock(blockNumber) // error getting block is expected because the mock JSONRPC contains no block require.ErrorContains(t, err, "error getting block") }) t.Run("should not observe on error getting receipt", func(t *testing.T) { - ob := MockEVMClient(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) evmJSONRPC.WithBlock(block) err := ob.ObserveTSSReceiveInBlock(blockNumber) // error getting block is expected because the mock evmClient contains no receipt require.ErrorContains(t, err, "error getting receipt") }) t.Run("should not observe on error posting vote", func(t *testing.T) { - ob := MockEVMClient(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) + ob := MockEVMObserver(t, chain, evmClient, evmJSONRPC, coreClient, tss, lastBlock, chainParam) // feed archived block and pause zetacore client evmJSONRPC.WithBlock(block) diff --git a/zetaclient/chains/evm/client_outbound.go b/zetaclient/chains/evm/observer_outbound.go similarity index 97% rename from zetaclient/chains/evm/client_outbound.go rename to zetaclient/chains/evm/observer_outbound.go index 95c4b14d77..e45bc734cd 100644 --- a/zetaclient/chains/evm/client_outbound.go +++ b/zetaclient/chains/evm/observer_outbound.go @@ -26,7 +26,7 @@ import ( ) // WatchOutTx watches evm chain for outgoing txs status -func (ob *Client) WatchOutTx() { +func (ob *Observer) WatchOutTx() { ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) if err != nil { ob.logger.OutTx.Error().Err(err).Msg("error creating ticker") @@ -82,7 +82,7 @@ func (ob *Client) WatchOutTx() { } // PostVoteOutbound posts vote to zetacore for the confirmed outtx -func (ob *Client) PostVoteOutbound( +func (ob *Observer) PostVoteOutbound( cctxIndex string, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction, @@ -115,7 +115,7 @@ func (ob *Client) PostVoteOutbound( // IsOutboundProcessed checks outtx status and returns (isIncluded, isConfirmed, error) // It also posts vote to zetacore if the tx is confirmed -func (ob *Client) IsOutboundProcessed(cctx *crosschaintypes.CrossChainTx, logger zerolog.Logger) (bool, bool, error) { +func (ob *Observer) IsOutboundProcessed(cctx *crosschaintypes.CrossChainTx, logger zerolog.Logger) (bool, bool, error) { // skip if outtx is not confirmed nonce := cctx.GetCurrentOutTxParam().OutboundTxTssNonce if !ob.IsTxConfirmed(nonce) { @@ -308,7 +308,7 @@ func ParseOuttxReceivedValue( // checkConfirmedTx checks if a txHash is confirmed // returns (receipt, transaction, true) if confirmed or (nil, nil, false) otherwise -func (ob *Client) checkConfirmedTx(txHash string, nonce uint64) (*ethtypes.Receipt, *ethtypes.Transaction, bool) { +func (ob *Observer) checkConfirmedTx(txHash string, nonce uint64) (*ethtypes.Receipt, *ethtypes.Transaction, bool) { ctxt, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() diff --git a/zetaclient/chains/evm/client_outbound_test.go b/zetaclient/chains/evm/observer_outbound_test.go similarity index 97% rename from zetaclient/chains/evm/client_outbound_test.go rename to zetaclient/chains/evm/observer_outbound_test.go index e439978dd8..9995b5bb47 100644 --- a/zetaclient/chains/evm/client_outbound_test.go +++ b/zetaclient/chains/evm/observer_outbound_test.go @@ -41,7 +41,7 @@ func Test_IsOutboundProcessed(t *testing.T) { t.Run("should post vote and return true if outtx is processed", func(t *testing.T) { // create evm client and set outtx and receipt - client := MockEVMClient(t, chain, nil, nil, nil, nil, 1, chainParam) + client := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, chainParam) client.SetTxNReceipt(nonce, receipt, outtx) // post outbound vote isIncluded, isConfirmed, err := client.IsOutboundProcessed(cctx, zerolog.Logger{}) @@ -57,7 +57,7 @@ func Test_IsOutboundProcessed(t *testing.T) { cctx.InboundTxParams.Sender = sample.EthAddress().Hex() // create evm client and set outtx and receipt - client := MockEVMClient(t, chain, nil, nil, nil, nil, 1, chainParam) + client := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, chainParam) client.SetTxNReceipt(nonce, receipt, outtx) // modify compliance config to restrict sender address @@ -75,7 +75,7 @@ func Test_IsOutboundProcessed(t *testing.T) { }) t.Run("should return false if outtx is not confirmed", func(t *testing.T) { // create evm client and DO NOT set outtx as confirmed - client := MockEVMClient(t, chain, nil, nil, nil, nil, 1, chainParam) + client := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, chainParam) isIncluded, isConfirmed, err := client.IsOutboundProcessed(cctx, zerolog.Logger{}) require.NoError(t, err) require.False(t, isIncluded) @@ -83,7 +83,7 @@ func Test_IsOutboundProcessed(t *testing.T) { }) t.Run("should fail if unable to parse ZetaReceived event", func(t *testing.T) { // create evm client and set outtx and receipt - client := MockEVMClient(t, chain, nil, nil, nil, nil, 1, chainParam) + client := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, chainParam) client.SetTxNReceipt(nonce, receipt, outtx) // set connector contract address to an arbitrary address to make event parsing fail @@ -118,7 +118,7 @@ func Test_IsOutboundProcessed_ContractError(t *testing.T) { t.Run("should fail if unable to get connector/custody contract", func(t *testing.T) { // create evm client and set outtx and receipt - client := MockEVMClient(t, chain, nil, nil, nil, nil, 1, chainParam) + client := MockEVMObserver(t, chain, nil, nil, nil, nil, 1, chainParam) client.SetTxNReceipt(nonce, receipt, outtx) abiConnector := zetaconnector.ZetaConnectorNonEthMetaData.ABI abiCustody := erc20custody.ERC20CustodyMetaData.ABI @@ -157,7 +157,7 @@ func Test_PostVoteOutbound(t *testing.T) { // create evm client using mock zetacore client and post outbound vote coreClient := mocks.NewMockZetaCoreClient() - client := MockEVMClient(t, chain, nil, nil, coreClient, nil, 1, observertypes.ChainParams{}) + client := MockEVMObserver(t, chain, nil, nil, coreClient, nil, 1, observertypes.ChainParams{}) client.PostVoteOutbound(cctx.Index, receipt, outtx, receiveValue, receiveStatus, nonce, coinType, zerolog.Logger{}) // pause the mock zetacore client to simulate error posting vote diff --git a/zetaclient/chains/evm/client_test.go b/zetaclient/chains/evm/observer_test.go similarity index 92% rename from zetaclient/chains/evm/client_test.go rename to zetaclient/chains/evm/observer_test.go index 3d990e4176..a4ea31b72b 100644 --- a/zetaclient/chains/evm/client_test.go +++ b/zetaclient/chains/evm/observer_test.go @@ -58,8 +58,8 @@ func getAppContext(evmChain chains.Chain, evmChainParams *observertypes.ChainPar return appCtx, cfg.EVMChainConfigs[evmChain.ChainId] } -// MockEVMClient creates a mock ChainClient with custom chain, TSS, params etc -func MockEVMClient( +// MockEVMObserver creates a mock ChainObserver with custom chain, TSS, params etc +func MockEVMObserver( t *testing.T, chain chains.Chain, evmClient interfaces.EVMRPCClient, @@ -67,7 +67,7 @@ func MockEVMClient( coreClient interfaces.ZetaCoreClient, tss interfaces.TSSSigner, lastBlock uint64, - params observertypes.ChainParams) *evm.Client { + params observertypes.ChainParams) *evm.Observer { // use default mock zetacore client if not provided if coreClient == nil { coreClient = mocks.NewMockZetaCoreClient() @@ -79,8 +79,8 @@ func MockEVMClient( // create app context appCtx, evmCfg := getAppContext(chain, ¶ms) - // create chain client - client, err := evm.NewClient(appCtx, coreClient, tss, "", common.ClientLogger{}, evmCfg, nil) + // create chain observer + client, err := evm.NewObserver(appCtx, coreClient, tss, "", common.ClientLogger{}, evmCfg, nil) require.NoError(t, err) client.WithEvmClient(evmClient) client.WithEvmJSONRPC(evmJSONRPC) @@ -89,11 +89,11 @@ func MockEVMClient( return client } -func TestEVM_BlockCache(t *testing.T) { +func Test_BlockCache(t *testing.T) { // create client blockCache, err := lru.New(1000) require.NoError(t, err) - ob := &evm.Client{Mu: &sync.Mutex{}} + ob := &evm.Observer{Mu: &sync.Mutex{}} ob.WithBlockCache(blockCache) // delete non-existing block should not panic @@ -116,7 +116,7 @@ func TestEVM_BlockCache(t *testing.T) { ob.RemoveCachedBlock(blockNumber) } -func TestEVM_CheckTxInclusion(t *testing.T) { +func Test_CheckTxInclusion(t *testing.T) { // load archived evm outtx Gas // https://etherscan.io/tx/0xd13b593eb62b5500a00e288cc2fb2c8af1339025c0e6bc6183b8bef2ebbed0d3 chainID := int64(1) @@ -132,7 +132,7 @@ func TestEVM_CheckTxInclusion(t *testing.T) { // create client blockCache, err := lru.New(1000) require.NoError(t, err) - ob := &evm.Client{Mu: &sync.Mutex{}} + ob := &evm.Observer{Mu: &sync.Mutex{}} // save block to cache blockCache.Add(blockNumber, block) @@ -167,7 +167,7 @@ func TestEVM_CheckTxInclusion(t *testing.T) { }) } -func TestEVM_VoteOutboundBallot(t *testing.T) { +func Test_VoteOutboundBallot(t *testing.T) { // load archived evm outtx Gas // https://etherscan.io/tx/0xd13b593eb62b5500a00e288cc2fb2c8af1339025c0e6bc6183b8bef2ebbed0d3 chainID := int64(1) diff --git a/zetaclient/chains/evm/signer.go b/zetaclient/chains/evm/signer.go index 7a2ea50d0b..4fe98fb0fb 100644 --- a/zetaclient/chains/evm/signer.go +++ b/zetaclient/chains/evm/signer.go @@ -329,7 +329,7 @@ func (signer *Signer) TryProcessOutTx( cctx *types.CrossChainTx, outTxProc *outtxprocessor.Processor, outTxID string, - chainClient interfaces.ChainClient, + chainObserver interfaces.ChainObserver, coreClient interfaces.ZetaCoreClient, height uint64, ) { @@ -345,14 +345,14 @@ func (signer *Signer) TryProcessOutTx( }() myID := coreClient.GetKeys().GetOperatorAddress() - evmClient, ok := chainClient.(*Client) + evmObserver, ok := chainObserver.(*Observer) if !ok { - logger.Error().Msg("chain client is not an EVMChainClient") + logger.Error().Msg("chain observer is not an EVM observer") return } // Setup Transaction input - txData, skipTx, err := NewOutBoundTransactionData(cctx, evmClient, signer.client, logger, height) + txData, skipTx, err := NewOutBoundTransactionData(cctx, evmObserver, signer.client, logger, height) if err != nil { logger.Err(err).Msg("error setting up transaction input fields") return @@ -371,7 +371,7 @@ func (signer *Signer) TryProcessOutTx( // compliance check goes first if compliance.IsCctxRestricted(cctx) { compliance.PrintComplianceLog(logger, signer.logger.Compliance, - true, evmClient.chain.ChainId, cctx.Index, cctx.InboundTxParams.Sender, txData.to.Hex(), cctx.GetCurrentOutTxParam().CoinType.String()) + true, evmObserver.chain.ChainId, cctx.Index, cctx.InboundTxParams.Sender, txData.to.Hex(), cctx.GetCurrentOutTxParam().CoinType.String()) tx, err = signer.SignCancelTx(txData.nonce, txData.gasPrice, height) // cancel the tx if err != nil { logger.Warn().Err(err).Msg(SignerErrorMsg(cctx)) diff --git a/zetaclient/chains/evm/client_outbound_tx_data.go b/zetaclient/chains/evm/signer_outbound_tx_data.go similarity index 97% rename from zetaclient/chains/evm/client_outbound_tx_data.go rename to zetaclient/chains/evm/signer_outbound_tx_data.go index 5f806b00a0..c7cb7eb21f 100644 --- a/zetaclient/chains/evm/client_outbound_tx_data.go +++ b/zetaclient/chains/evm/signer_outbound_tx_data.go @@ -109,7 +109,7 @@ func (txData *OutBoundTransactionData) SetupGas( // 3. error func NewOutBoundTransactionData( cctx *types.CrossChainTx, - evmClient *Client, + evmObserver *Observer, evmRPC interfaces.EVMRPCClient, logger zerolog.Logger, height uint64, @@ -135,7 +135,7 @@ func NewOutBoundTransactionData( // Get nonce, Early return if the cctx is already processed nonce := cctx.GetCurrentOutTxParam().OutboundTxTssNonce - included, confirmed, err := evmClient.IsOutboundProcessed(cctx, logger) + included, confirmed, err := evmObserver.IsOutboundProcessed(cctx, logger) if err != nil { return nil, true, errors.New("IsOutboundProcessed failed") } @@ -159,7 +159,7 @@ func NewOutBoundTransactionData( copy(txData.sendHash[:32], sendHash[:32]) // In case there is a pending transaction, make sure this keysign is a transaction replacement - pendingTx := evmClient.GetPendingTx(nonce) + pendingTx := evmObserver.GetPendingTx(nonce) if pendingTx != nil { if txData.gasPrice.Cmp(pendingTx.GasPrice()) > 0 { logger.Info().Msgf("replace pending outTx %s nonce %d using gas price %d", pendingTx.Hash().Hex(), nonce, txData.gasPrice) diff --git a/zetaclient/chains/evm/client_outbound_tx_data_test.go b/zetaclient/chains/evm/signer_outbound_tx_data_test.go similarity index 85% rename from zetaclient/chains/evm/client_outbound_tx_data_test.go rename to zetaclient/chains/evm/signer_outbound_tx_data_test.go index 6effd80f34..3c7b4fdb4d 100644 --- a/zetaclient/chains/evm/client_outbound_tx_data_test.go +++ b/zetaclient/chains/evm/signer_outbound_tx_data_test.go @@ -69,12 +69,12 @@ func TestSigner_NewOutBoundTransactionData(t *testing.T) { evmSigner, err := getNewEvmSigner() require.NoError(t, err) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) t.Run("NewOutBoundTransactionData success", func(t *testing.T) { cctx := getCCTX(t) - _, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + _, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) }) @@ -82,7 +82,7 @@ func TestSigner_NewOutBoundTransactionData(t *testing.T) { t.Run("NewOutBoundTransactionData skip", func(t *testing.T) { cctx := getCCTX(t) cctx.CctxStatus.Status = types.CctxStatus_Aborted - _, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + _, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.NoError(t, err) require.True(t, skip) }) @@ -90,7 +90,7 @@ func TestSigner_NewOutBoundTransactionData(t *testing.T) { t.Run("NewOutBoundTransactionData unknown chain", func(t *testing.T) { cctx := getInvalidCCTX(t) require.NoError(t, err) - _, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + _, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.ErrorContains(t, err, "unknown chain") require.True(t, skip) }) @@ -99,7 +99,7 @@ func TestSigner_NewOutBoundTransactionData(t *testing.T) { cctx := getCCTX(t) require.NoError(t, err) cctx.GetCurrentOutTxParam().OutboundTxGasPrice = "invalidGasPrice" - _, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + _, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.True(t, skip) require.ErrorContains(t, err, "cannot convert gas price") }) diff --git a/zetaclient/chains/evm/signer_test.go b/zetaclient/chains/evm/signer_test.go index b3b1b687aa..aaabab5088 100644 --- a/zetaclient/chains/evm/signer_test.go +++ b/zetaclient/chains/evm/signer_test.go @@ -45,7 +45,7 @@ func getNewEvmSigner() (*Signer, error) { ts) } -func getNewEvmChainClient() (*Client, error) { +func getNewEvmChainObserver() (*Observer, error) { logger := common.ClientLogger{} ts := &metrics.TelemetryServer{} cfg := config.NewConfig() @@ -56,7 +56,7 @@ func getNewEvmChainClient() (*Client, error) { coreCTX := context.NewZetaCoreContext(cfg) appCTX := context.NewAppContext(coreCTX, cfg) - return NewClient(appCTX, mocks.NewMockZetaCoreClient(), tss, "", logger, evmcfg, ts) + return NewObserver(appCTX, mocks.NewMockZetaCoreClient(), tss, "", logger, evmcfg, ts) } func getNewOutTxProcessor() *outtxprocessor.Processor { @@ -104,10 +104,10 @@ func TestSigner_TryProcessOutTx(t *testing.T) { require.NoError(t, err) cctx := getCCTX(t) processorManager := getNewOutTxProcessor() - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - evmSigner.TryProcessOutTx(cctx, processorManager, "123", mockChainClient, mocks.NewMockZetaCoreClient(), 123) + evmSigner.TryProcessOutTx(cctx, processorManager, "123", mockObserver, mocks.NewMockZetaCoreClient(), 123) //Check if cctx was signed and broadcasted list := evmSigner.GetReportedTxList() @@ -122,9 +122,9 @@ func TestSigner_SignOutboundTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) @@ -151,9 +151,9 @@ func TestSigner_SignRevertTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) @@ -180,9 +180,9 @@ func TestSigner_SignWithdrawTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) @@ -209,9 +209,9 @@ func TestSigner_SignCommandTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) @@ -256,9 +256,9 @@ func TestSigner_SignERC20WithdrawTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) @@ -285,9 +285,9 @@ func TestSigner_BroadcastOutTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) @@ -327,9 +327,9 @@ func TestSigner_SignWhitelistERC20Cmd(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockChainClient, err := getNewEvmChainClient() + mockObserver, err := getNewEvmChainObserver() require.NoError(t, err) - txData, skip, err := NewOutBoundTransactionData(cctx, mockChainClient, evmSigner.EvmClient(), zerolog.Logger{}, 123) + txData, skip, err := NewOutBoundTransactionData(cctx, mockObserver, evmSigner.EvmClient(), zerolog.Logger{}, 123) require.False(t, skip) require.NoError(t, err) diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index ab8cbfee7e..12806bdc35 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -34,8 +34,8 @@ const ( Descending Order = "DESC" ) -// ChainClient is the interface for chain clients -type ChainClient interface { +// ChainObserver is the interface for chain observer +type ChainObserver interface { Start() Stop() IsOutboundProcessed(cctx *crosschaintypes.CrossChainTx, logger zerolog.Logger) (bool, bool, error) @@ -51,7 +51,7 @@ type ChainSigner interface { cctx *crosschaintypes.CrossChainTx, outTxProc *outtxprocessor.Processor, outTxID string, - evmClient ChainClient, + observer ChainObserver, coreClient ZetaCoreClient, height uint64, ) diff --git a/zetaclient/common/logger.go b/zetaclient/common/logger.go index 6098b67084..ebe773abfc 100644 --- a/zetaclient/common/logger.go +++ b/zetaclient/common/logger.go @@ -5,13 +5,13 @@ import ( "github.com/rs/zerolog/log" ) -// ClientLogger is a struct that contains the logger for a chain client +// ClientLogger is a struct that contains the logger for a chain observer type ClientLogger struct { Std zerolog.Logger Compliance zerolog.Logger } -// DefaultLoggers returns the default loggers for a chain client +// DefaultLoggers returns the default loggers for a chain observer func DefaultLoggers() ClientLogger { return ClientLogger{ Std: log.Logger, diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index 5c6d0f540a..e903cc30b4 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -36,14 +36,14 @@ type Log struct { Sampled zerolog.Logger } -// Orchestrator wraps the zetacore client, chain clients and signers. This is the high level object used for CCTX scheduling +// Orchestrator wraps the zetacore client, chain observers and signers. This is the high level object used for CCTX scheduling type Orchestrator struct { // zetacore client coreClient interfaces.ZetaCoreClient - // chain signers and clients - signerMap map[int64]interfaces.ChainSigner - clientMap map[int64]interfaces.ChainClient + // chain signers and observers + signerMap map[int64]interfaces.ChainSigner + observerMap map[int64]interfaces.ChainObserver // outtx processor outTxProc *outtxprocessor.Processor @@ -61,7 +61,7 @@ type Orchestrator struct { func NewOrchestrator( coreClient interfaces.ZetaCoreClient, signerMap map[int64]interfaces.ChainSigner, - clientMap map[int64]interfaces.ChainClient, + observerMap map[int64]interfaces.ChainObserver, logger zerolog.Logger, ts *metrics.TelemetryServer, ) *Orchestrator { @@ -76,10 +76,10 @@ func NewOrchestrator( } oc.logger.Sampled = oc.logger.Std.Sample(&zerolog.BasicSampler{N: loggerSamplingRate}) - // set zetacore client, signers and chain clients + // set zetacore client, signers and chain observers oc.coreClient = coreClient oc.signerMap = signerMap - oc.clientMap = clientMap + oc.observerMap = observerMap // create outtx processor manager oc.outTxProc = outtxprocessor.NewProcessor(logger) @@ -103,7 +103,7 @@ func (oc *Orchestrator) MonitorCore(appContext *context.AppContext) { oc.coreClient.Pause() // now stop everything close(oc.stop) // this stops the startSendScheduler() loop - for _, c := range oc.clientMap { + for _, c := range oc.observerMap { c.Stop() } }() @@ -137,18 +137,18 @@ func (oc *Orchestrator) GetUpdatedSigner(coreContext *context.ZetaCoreContext, c return signer, nil } -// GetUpdatedChainClient returns chain client object with updated chain parameters -func (oc *Orchestrator) GetUpdatedChainClient(coreContext *context.ZetaCoreContext, chainID int64) (interfaces.ChainClient, error) { - chainOb, found := oc.clientMap[chainID] +// GetUpdatedChainObserver returns chain observer with updated chain parameters +func (oc *Orchestrator) GetUpdatedChainObserver(coreContext *context.ZetaCoreContext, chainID int64) (interfaces.ChainObserver, error) { + observer, found := oc.observerMap[chainID] if !found { - return nil, fmt.Errorf("chain client not found for chainID %d", chainID) + return nil, fmt.Errorf("chain observer not found for chainID %d", chainID) } - // update chain client chain parameters - curParams := chainOb.GetChainParams() + // update chain observer chain parameters + curParams := observer.GetChainParams() if chains.IsEVMChain(chainID) { evmParams, found := coreContext.GetEVMChainParams(chainID) if found && !observertypes.ChainParamsEqual(curParams, *evmParams) { - chainOb.SetChainParams(*evmParams) + observer.SetChainParams(*evmParams) oc.logger.Std.Info().Msgf( "updated chain params for chainID %d, new params: %v", chainID, *evmParams) } @@ -156,12 +156,12 @@ func (oc *Orchestrator) GetUpdatedChainClient(coreContext *context.ZetaCoreConte _, btcParams, found := coreContext.GetBTCChainParams() if found && !observertypes.ChainParamsEqual(curParams, *btcParams) { - chainOb.SetChainParams(*btcParams) + observer.SetChainParams(*btcParams) oc.logger.Std.Info().Msgf( "updated chain params for Bitcoin, new params: %v", *btcParams) } } - return chainOb, nil + return observer, nil } // GetPendingCctxsWithinRatelimit get pending cctxs across foreign chains within rate limit @@ -274,15 +274,15 @@ func (oc *Orchestrator) StartCctxScheduler(appContext *context.AppContext) { continue } - // update chain parameters for signer and chain client + // update chain parameters for signer and chain observer signer, err := oc.GetUpdatedSigner(coreContext, c.ChainId) if err != nil { oc.logger.Std.Error().Err(err).Msgf("StartCctxScheduler: GetUpdatedSigner failed for chain %d", c.ChainId) continue } - ob, err := oc.GetUpdatedChainClient(coreContext, c.ChainId) + ob, err := oc.GetUpdatedChainObserver(coreContext, c.ChainId) if err != nil { - oc.logger.Std.Error().Err(err).Msgf("StartCctxScheduler: GetUpdatedChainClient failed for chain %d", c.ChainId) + oc.logger.Std.Error().Err(err).Msgf("StartCctxScheduler: GetUpdatedChainObserver failed for chain %d", c.ChainId) continue } if !context.IsOutboundObservationEnabled(coreContext, ob.GetChainParams()) { @@ -315,7 +315,7 @@ func (oc *Orchestrator) ScheduleCctxEVM( zetaHeight uint64, chainID int64, cctxList []*types.CrossChainTx, - ob interfaces.ChainClient, + observer interfaces.ChainObserver, signer interfaces.ChainSigner, ) { res, err := oc.coreClient.GetAllOutTxTrackerByChain(chainID, interfaces.Ascending) @@ -327,11 +327,11 @@ func (oc *Orchestrator) ScheduleCctxEVM( for _, v := range res { trackerMap[v.Nonce] = true } - outboundScheduleLookahead := ob.GetChainParams().OutboundTxScheduleLookahead + outboundScheduleLookahead := observer.GetChainParams().OutboundTxScheduleLookahead // #nosec G701 always in range outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * evmOutboundTxLookbackFactor) // #nosec G701 positive - outboundScheduleInterval := uint64(ob.GetChainParams().OutboundTxScheduleInterval) + outboundScheduleInterval := uint64(observer.GetChainParams().OutboundTxScheduleInterval) for idx, cctx := range cctxList { params := cctx.GetCurrentOutTxParam() @@ -349,7 +349,7 @@ func (oc *Orchestrator) ScheduleCctxEVM( } // try confirming the outtx - included, _, err := ob.IsOutboundProcessed(cctx, oc.logger.Std) + included, _, err := observer.IsOutboundProcessed(cctx, oc.logger.Std) if err != nil { oc.logger.Std.Error().Err(err).Msgf("ScheduleCctxEVM: IsOutboundProcessed faild for chain %d nonce %d", chainID, nonce) continue @@ -384,7 +384,7 @@ func (oc *Orchestrator) ScheduleCctxEVM( if nonce%outboundScheduleInterval == zetaHeight%outboundScheduleInterval && !oc.outTxProc.IsOutTxActive(outTxID) { oc.outTxProc.StartTryProcess(outTxID) oc.logger.Std.Debug().Msgf("ScheduleCctxEVM: sign outtx %s with value %d\n", outTxID, cctx.GetCurrentOutTxParam().Amount) - go signer.TryProcessOutTx(cctx, oc.outTxProc, outTxID, ob, oc.coreClient, zetaHeight) + go signer.TryProcessOutTx(cctx, oc.outTxProc, outTxID, observer, oc.coreClient, zetaHeight) } // #nosec G701 always in range @@ -402,17 +402,17 @@ func (oc *Orchestrator) ScheduleCctxBTC( zetaHeight uint64, chainID int64, cctxList []*types.CrossChainTx, - ob interfaces.ChainClient, + observer interfaces.ChainObserver, signer interfaces.ChainSigner, ) { - btcClient, ok := ob.(*bitcoin.Client) + btcObserver, ok := observer.(*bitcoin.Observer) if !ok { // should never happen - oc.logger.Std.Error().Msgf("ScheduleCctxBTC: chain client is not a bitcoin client") + oc.logger.Std.Error().Msgf("ScheduleCctxBTC: chain observer is not a bitcoin observer") return } // #nosec G701 positive - interval := uint64(ob.GetChainParams().OutboundTxScheduleInterval) - lookahead := ob.GetChainParams().OutboundTxScheduleLookahead + interval := uint64(observer.GetChainParams().OutboundTxScheduleInterval) + lookahead := observer.GetChainParams().OutboundTxScheduleLookahead // schedule at most one keysign per ticker for idx, cctx := range cctxList { @@ -425,7 +425,7 @@ func (oc *Orchestrator) ScheduleCctxBTC( continue } // try confirming the outtx - included, confirmed, err := btcClient.IsOutboundProcessed(cctx, oc.logger.Std) + included, confirmed, err := btcObserver.IsOutboundProcessed(cctx, oc.logger.Std) if err != nil { oc.logger.Std.Error().Err(err).Msgf("ScheduleCctxBTC: IsOutboundProcessed faild for chain %d nonce %d", chainID, nonce) continue @@ -436,7 +436,7 @@ func (oc *Orchestrator) ScheduleCctxBTC( } // stop if the nonce being processed is higher than the pending nonce - if nonce > btcClient.GetPendingNonce() { + if nonce > btcObserver.GetPendingNonce() { break } // stop if lookahead is reached @@ -448,7 +448,7 @@ func (oc *Orchestrator) ScheduleCctxBTC( if nonce%interval == zetaHeight%interval && !oc.outTxProc.IsOutTxActive(outTxID) { oc.outTxProc.StartTryProcess(outTxID) oc.logger.Std.Debug().Msgf("ScheduleCctxBTC: sign outtx %s with value %d\n", outTxID, params.Amount) - go signer.TryProcessOutTx(cctx, oc.outTxProc, outTxID, ob, oc.coreClient, zetaHeight) + go signer.TryProcessOutTx(cctx, oc.outTxProc, outTxID, observer, oc.coreClient, zetaHeight) } } } diff --git a/zetaclient/orchestrator/orchestrator_test.go b/zetaclient/orchestrator/orchestrator_test.go index 1de04a1bb0..a965b854a7 100644 --- a/zetaclient/orchestrator/orchestrator_test.go +++ b/zetaclient/orchestrator/orchestrator_test.go @@ -34,8 +34,8 @@ func MockOrchestrator( ethcommon.HexToAddress(evmChainParams.Erc20CustodyContractAddress), ) btcSigner := mocks.NewBTCSigner() - evmClient := mocks.NewEVMClient(evmChainParams) - btcClient := mocks.NewBTCClient(btcChainParams) + evmObserver := mocks.NewEVMObserver(evmChainParams) + btcObserver := mocks.NewBTCObserver(btcChainParams) // create orchestrator orchestrator := &Orchestrator{ @@ -44,9 +44,9 @@ func MockOrchestrator( evmChain.ChainId: evmSigner, btcChain.ChainId: btcSigner, }, - clientMap: map[int64]interfaces.ChainClient{ - evmChain.ChainId: evmClient, - btcChain.ChainId: btcClient, + observerMap: map[int64]interfaces.ChainObserver{ + evmChain.ChainId: evmObserver, + btcChain.ChainId: btcObserver, }, } return orchestrator @@ -119,7 +119,7 @@ func Test_GetUpdatedSigner(t *testing.T) { }) } -func Test_GetUpdatedChainClient(t *testing.T) { +func Test_GetUpdatedChainObserver(t *testing.T) { // initial parameters for orchestrator creation evmChain := chains.EthChain btcChain := chains.BtcMainnetChain @@ -166,34 +166,34 @@ func Test_GetUpdatedChainClient(t *testing.T) { IsSupported: true, } - t.Run("evm chain client should not be found", func(t *testing.T) { + t.Run("evm chain observer should not be found", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateCoreContext(evmChain, btcChain, evmChainParamsNew, btcChainParams) - // BSC chain client should not be found - _, err := orchestrator.GetUpdatedChainClient(coreContext, chains.BscMainnetChain.ChainId) - require.ErrorContains(t, err, "chain client not found") + // BSC chain observer should not be found + _, err := orchestrator.GetUpdatedChainObserver(coreContext, chains.BscMainnetChain.ChainId) + require.ErrorContains(t, err, "chain observer not found") }) - t.Run("chain params in evm chain client should be updated successfully", func(t *testing.T) { + t.Run("chain params in evm chain observer should be updated successfully", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateCoreContext(evmChain, btcChain, evmChainParamsNew, btcChainParams) - // update evm chain client with new chain params - chainOb, err := orchestrator.GetUpdatedChainClient(coreContext, evmChain.ChainId) + // update evm chain observer with new chain params + chainOb, err := orchestrator.GetUpdatedChainObserver(coreContext, evmChain.ChainId) require.NoError(t, err) require.NotNil(t, chainOb) require.True(t, observertypes.ChainParamsEqual(*evmChainParamsNew, chainOb.GetChainParams())) }) - t.Run("btc chain client should not be found", func(t *testing.T) { + t.Run("btc chain observer should not be found", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateCoreContext(btcChain, btcChain, evmChainParams, btcChainParamsNew) - // BTC testnet chain client should not be found - _, err := orchestrator.GetUpdatedChainClient(coreContext, chains.BtcTestNetChain.ChainId) - require.ErrorContains(t, err, "chain client not found") + // BTC testnet chain observer should not be found + _, err := orchestrator.GetUpdatedChainObserver(coreContext, chains.BtcTestNetChain.ChainId) + require.ErrorContains(t, err, "chain observer not found") }) - t.Run("chain params in btc chain client should be updated successfully", func(t *testing.T) { + t.Run("chain params in btc chain observer should be updated successfully", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateCoreContext(btcChain, btcChain, evmChainParams, btcChainParamsNew) - // update btc chain client with new chain params - chainOb, err := orchestrator.GetUpdatedChainClient(coreContext, btcChain.ChainId) + // update btc chain observer with new chain params + chainOb, err := orchestrator.GetUpdatedChainObserver(coreContext, btcChain.ChainId) require.NoError(t, err) require.NotNil(t, chainOb) require.True(t, observertypes.ChainParamsEqual(*btcChainParamsNew, chainOb.GetChainParams())) diff --git a/zetaclient/testutils/mocks/chain_clients.go b/zetaclient/testutils/mocks/chain_clients.go index 6c0d72f1a5..6f7a6a819c 100644 --- a/zetaclient/testutils/mocks/chain_clients.go +++ b/zetaclient/testutils/mocks/chain_clients.go @@ -8,83 +8,83 @@ import ( ) // ---------------------------------------------------------------------------- -// EVMClient +// EVMObserver // ---------------------------------------------------------------------------- -var _ interfaces.ChainClient = (*EVMClient)(nil) +var _ interfaces.ChainObserver = (*EVMObserver)(nil) -// EVMClient is a mock of evm chain client for testing -type EVMClient struct { +// EVMObserver is a mock of evm chain observer for testing +type EVMObserver struct { ChainParams observertypes.ChainParams } -func NewEVMClient(chainParams *observertypes.ChainParams) *EVMClient { - return &EVMClient{ +func NewEVMObserver(chainParams *observertypes.ChainParams) *EVMObserver { + return &EVMObserver{ ChainParams: *chainParams, } } -func (s *EVMClient) Start() { +func (ob *EVMObserver) Start() { } -func (s *EVMClient) Stop() { +func (ob *EVMObserver) Stop() { } -func (s *EVMClient) IsOutboundProcessed(_ *crosschaintypes.CrossChainTx, _ zerolog.Logger) (bool, bool, error) { +func (ob *EVMObserver) IsOutboundProcessed(_ *crosschaintypes.CrossChainTx, _ zerolog.Logger) (bool, bool, error) { return false, false, nil } -func (s *EVMClient) SetChainParams(chainParams observertypes.ChainParams) { - s.ChainParams = chainParams +func (ob *EVMObserver) SetChainParams(chainParams observertypes.ChainParams) { + ob.ChainParams = chainParams } -func (s *EVMClient) GetChainParams() observertypes.ChainParams { - return s.ChainParams +func (ob *EVMObserver) GetChainParams() observertypes.ChainParams { + return ob.ChainParams } -func (s *EVMClient) GetTxID(_ uint64) string { +func (ob *EVMObserver) GetTxID(_ uint64) string { return "" } -func (s *EVMClient) WatchIntxTracker() { +func (ob *EVMObserver) WatchIntxTracker() { } // ---------------------------------------------------------------------------- -// BTCClient +// BTCObserver // ---------------------------------------------------------------------------- -var _ interfaces.ChainClient = (*BTCClient)(nil) +var _ interfaces.ChainObserver = (*BTCObserver)(nil) -// BTCClient is a mock of btc chain client for testing -type BTCClient struct { +// BTCObserver is a mock of btc chain observer for testing +type BTCObserver struct { ChainParams observertypes.ChainParams } -func NewBTCClient(chainParams *observertypes.ChainParams) *BTCClient { - return &BTCClient{ +func NewBTCObserver(chainParams *observertypes.ChainParams) *BTCObserver { + return &BTCObserver{ ChainParams: *chainParams, } } -func (s *BTCClient) Start() { +func (ob *BTCObserver) Start() { } -func (s *BTCClient) Stop() { +func (ob *BTCObserver) Stop() { } -func (s *BTCClient) IsOutboundProcessed(_ *crosschaintypes.CrossChainTx, _ zerolog.Logger) (bool, bool, error) { +func (ob *BTCObserver) IsOutboundProcessed(_ *crosschaintypes.CrossChainTx, _ zerolog.Logger) (bool, bool, error) { return false, false, nil } -func (s *BTCClient) SetChainParams(chainParams observertypes.ChainParams) { - s.ChainParams = chainParams +func (ob *BTCObserver) SetChainParams(chainParams observertypes.ChainParams) { + ob.ChainParams = chainParams } -func (s *BTCClient) GetChainParams() observertypes.ChainParams { - return s.ChainParams +func (ob *BTCObserver) GetChainParams() observertypes.ChainParams { + return ob.ChainParams } -func (s *BTCClient) GetTxID(_ uint64) string { +func (ob *BTCObserver) GetTxID(_ uint64) string { return "" } -func (s *BTCClient) WatchIntxTracker() { +func (ob *BTCObserver) WatchIntxTracker() { } diff --git a/zetaclient/testutils/mocks/chain_signer.go b/zetaclient/testutils/mocks/chain_signer.go index acfb2d5b3c..cca24dccd1 100644 --- a/zetaclient/testutils/mocks/chain_signer.go +++ b/zetaclient/testutils/mocks/chain_signer.go @@ -36,7 +36,7 @@ func (s *EVMSigner) TryProcessOutTx( _ *crosschaintypes.CrossChainTx, _ *outtxprocessor.Processor, _ string, - _ interfaces.ChainClient, + _ interfaces.ChainObserver, _ interfaces.ZetaCoreClient, _ uint64, ) { @@ -75,7 +75,7 @@ func (s *BTCSigner) TryProcessOutTx( _ *crosschaintypes.CrossChainTx, _ *outtxprocessor.Processor, _ string, - _ interfaces.ChainClient, + _ interfaces.ChainObserver, _ interfaces.ZetaCoreClient, _ uint64, ) { diff --git a/zetaclient/testutils/mocks/cometbft_client.go b/zetaclient/testutils/mocks/cometbft_client.go index 3359ff165b..1058e7980e 100644 --- a/zetaclient/testutils/mocks/cometbft_client.go +++ b/zetaclient/testutils/mocks/cometbft_client.go @@ -10,21 +10,21 @@ import ( tmtypes "github.com/cometbft/cometbft/types" ) -type CometbftClient struct { +type CometBFTClient struct { mock.Client err error code uint32 } -func (c CometbftClient) BroadcastTxCommit(_ context.Context, _ tmtypes.Tx) (*coretypes.ResultBroadcastTxCommit, error) { +func (c CometBFTClient) BroadcastTxCommit(_ context.Context, _ tmtypes.Tx) (*coretypes.ResultBroadcastTxCommit, error) { return nil, c.err } -func (c CometbftClient) BroadcastTxAsync(_ context.Context, _ tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { +func (c CometBFTClient) BroadcastTxAsync(_ context.Context, _ tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { return nil, c.err } -func (c CometbftClient) BroadcastTxSync(_ context.Context, _ tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { +func (c CometBFTClient) BroadcastTxSync(_ context.Context, _ tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { log := "" if c.err != nil { log = c.err.Error() @@ -38,7 +38,7 @@ func (c CometbftClient) BroadcastTxSync(_ context.Context, _ tmtypes.Tx) (*coret }, c.err } -func (c CometbftClient) Tx(_ context.Context, _ []byte, _ bool) (*coretypes.ResultTx, error) { +func (c CometBFTClient) Tx(_ context.Context, _ []byte, _ bool) (*coretypes.ResultTx, error) { return &coretypes.ResultTx{ Hash: bytes.HexBytes{}, Height: 0, @@ -51,7 +51,7 @@ func (c CometbftClient) Tx(_ context.Context, _ []byte, _ bool) (*coretypes.Resu }, c.err } -func (c CometbftClient) Block(_ context.Context, _ *int64) (*coretypes.ResultBlock, error) { +func (c CometBFTClient) Block(_ context.Context, _ *int64) (*coretypes.ResultBlock, error) { return &coretypes.ResultBlock{Block: &tmtypes.Block{ Header: tmtypes.Header{}, Data: tmtypes.Data{}, @@ -59,8 +59,8 @@ func (c CometbftClient) Block(_ context.Context, _ *int64) (*coretypes.ResultBlo }}, c.err } -func NewSDKClientWithErr(err error, code uint32) *CometbftClient { - return &CometbftClient{ +func NewSDKClientWithErr(err error, code uint32) *CometBFTClient { + return &CometBFTClient{ Client: mock.Client{}, err: err, code: code, diff --git a/zetaclient/types/sql_evm.go b/zetaclient/types/sql_evm.go index f86df7cdd7..c551fda503 100644 --- a/zetaclient/types/sql_evm.go +++ b/zetaclient/types/sql_evm.go @@ -9,7 +9,7 @@ import ( "gorm.io/gorm" ) -// EVM Chain client types -----------------------------------> +// EVM Chain observer types -----------------------------------> const LastBlockNumID = 0xBEEF diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index ee8c344a67..b5679b687a 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -195,7 +195,7 @@ func (c *Client) UpdateZetaCoreContext(coreContext *context.ZetaCoreContext, ini if plan != nil && bn == plan.Height-1 { // stop zetaclients; notify operator to upgrade and restart c.logger.Warn().Msgf("Active upgrade plan detected and upgrade height reached: %s at height %d; ZetaClient is stopped;"+ "please kill this process, replace zetaclientd binary with upgraded version, and restart zetaclientd", plan.Name, plan.Height) - c.pause <- struct{}{} // notify CoreObserver to stop ChainClients, Signers, and CoreObserver itself + c.pause <- struct{}{} // notify Orchestrator to stop Observers, Signers, and Orchestrator itself } chainParams, err := c.GetChainParams()