From 0d4e67dac4f9540cece57478e72102109ae395c8 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Fri, 29 Mar 2024 16:05:17 -0500 Subject: [PATCH 1/5] check IsSupported flag to pause/unpause a specific chain --- cmd/zetaclientd/utils.go | 8 +- zetaclient/bitcoin/bitcoin_client.go | 230 ++++++++++++---------- zetaclient/bitcoin/inbound_tracker.go | 27 +-- zetaclient/evm/evm_client.go | 217 ++++++++++---------- zetaclient/evm/inbounds.go | 59 +++--- zetaclient/interfaces/interfaces.go | 2 +- zetaclient/testutils/stub/chain_client.go | 4 +- zetaclient/zetabridge/zetacore_bridge.go | 14 +- 8 files changed, 293 insertions(+), 268 deletions(-) diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index 1ea6eb8c5b..a9fc63e1f8 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -67,9 +67,6 @@ func CreateSignerMap( loggers.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) continue } - if !evmChainParams.IsSupported { - continue - } mpiAddress := ethcommon.HexToAddress(evmChainParams.ConnectorContractAddress) erc20CustodyAddress := ethcommon.HexToAddress(evmChainParams.Erc20CustodyContractAddress) signer, err := evm.NewEVMSigner( @@ -117,14 +114,11 @@ func CreateChainClientMap( if evmConfig.Chain.IsZetaChain() { continue } - evmChainParams, found := appContext.ZetaCoreContext().GetEVMChainParams(evmConfig.Chain.ChainId) + _, found := appContext.ZetaCoreContext().GetEVMChainParams(evmConfig.Chain.ChainId) if !found { loggers.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) continue } - if !evmChainParams.IsSupported { - continue - } co, err := evm.NewEVMChainClient(appContext, bridge, tss, dbpath, loggers, evmConfig, ts) if err != nil { loggers.Std.Error().Err(err).Msgf("NewEVMChainClient error for chain %s", evmConfig.Chain.String()) diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index 8f9c1f9463..095f805519 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -47,19 +47,22 @@ import ( ) const ( - // The starting height (Bitcoin mainnet) from which dynamic depositor fee will take effect - DynamicDepositorFeeHeight = 834500 + DynamicDepositorFeeHeight = 834500 // The starting height (Bitcoin mainnet) from which dynamic depositor fee will take effect + maxHeightDiff = 10000 // in case the last block is too old when the observer starts + btcBlocksPerDay = 144 // for LRU block cache size + bigValueSats = 200000000 // 2 BTC + bigValueConfirmationCount = 6 // 6 confirmations for value >= 2 BTC ) var _ interfaces.ChainClient = &BTCChainClient{} type BTCLog struct { - ChainLogger zerolog.Logger - WatchInTx zerolog.Logger - ObserveOutTx zerolog.Logger - WatchUTXOS zerolog.Logger - WatchGasPrice zerolog.Logger - Compliance zerolog.Logger + Chain zerolog.Logger // The parent logger for the chain + InTx zerolog.Logger // The logger for incoming transactions + OutTx zerolog.Logger // The logger for outgoing transactions + UTXOS zerolog.Logger // The logger for UTXOs management + GasPrice zerolog.Logger // The logger for gas price + Compliance zerolog.Logger // The logger for compliance checks } // BTCChainClient represents a chain configuration for Bitcoin @@ -91,27 +94,21 @@ type BTCChainClient struct { BlockCache *lru.Cache } -const ( - maxHeightDiff = 10000 // in case the last block is too old when the observer starts - btcBlocksPerDay = 144 // for LRU block cache size - bigValueSats = 200000000 // 2 BTC - bigValueConfirmationCount = 6 // 6 confirmations for value >= 2 BTC -) - func (ob *BTCChainClient) WithZetaClient(bridge *zetabridge.ZetaCoreBridge) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.zetaClient = bridge } + func (ob *BTCChainClient) WithLogger(logger zerolog.Logger) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.logger = BTCLog{ - ChainLogger: logger, - WatchInTx: logger.With().Str("module", "WatchInTx").Logger(), - ObserveOutTx: logger.With().Str("module", "observeOutTx").Logger(), - WatchUTXOS: logger.With().Str("module", "WatchUTXOS").Logger(), - WatchGasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), + Chain: logger, + InTx: logger.With().Str("module", "WatchInTx").Logger(), + OutTx: logger.With().Str("module", "WatchOutTx").Logger(), + UTXOS: logger.With().Str("module", "WatchUTXOS").Logger(), + GasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), } } @@ -163,12 +160,12 @@ func NewBitcoinClient( ob.Mu = &sync.Mutex{} chainLogger := loggers.Std.With().Str("chain", chain.ChainName.String()).Logger() ob.logger = BTCLog{ - ChainLogger: chainLogger, - WatchInTx: chainLogger.With().Str("module", "WatchInTx").Logger(), - ObserveOutTx: chainLogger.With().Str("module", "observeOutTx").Logger(), - WatchUTXOS: chainLogger.With().Str("module", "WatchUTXOS").Logger(), - WatchGasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), - Compliance: loggers.Compliance, + Chain: chainLogger, + InTx: chainLogger.With().Str("module", "WatchInTx").Logger(), + OutTx: chainLogger.With().Str("module", "WatchOutTx").Logger(), + UTXOS: chainLogger.With().Str("module", "WatchUTXOS").Logger(), + GasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), + Compliance: loggers.Compliance, } ob.zetaClient = bridge @@ -183,7 +180,7 @@ func NewBitcoinClient( } ob.params = *chainParams // initialize the Client - ob.logger.ChainLogger.Info().Msgf("Chain %s endpoint %s", ob.chain.String(), btcCfg.RPCHost) + ob.logger.Chain.Info().Msgf("Chain %s endpoint %s", ob.chain.String(), btcCfg.RPCHost) connCfg := &rpcclient.ConnConfig{ Host: btcCfg.RPCHost, User: btcCfg.RPCUsername, @@ -204,7 +201,7 @@ func NewBitcoinClient( ob.BlockCache, err = lru.New(btcBlocksPerDay) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("failed to create bitcoin block cache") + ob.logger.Chain.Error().Err(err).Msg("failed to create bitcoin block cache") return nil, err } @@ -218,55 +215,58 @@ func NewBitcoinClient( } func (ob *BTCChainClient) Start() { - ob.logger.ChainLogger.Info().Msgf("BitcoinChainClient is starting") + ob.logger.Chain.Info().Msgf("BitcoinChainClient is starting") go ob.WatchInTx() - go ob.observeOutTx() + go ob.WatchOutTx() go ob.WatchUTXOS() go ob.WatchGasPrice() - go ob.ExternalChainWatcherForNewInboundTrackerSuggestions() - go ob.RPCStatus() + go ob.WatchIntxTracker() + go ob.WatchRPCStatus() } -func (ob *BTCChainClient) RPCStatus() { - ob.logger.ChainLogger.Info().Msgf("RPCStatus is starting") +// WatchRPCStatus watches the RPC status of the Bitcoin chain +func (ob *BTCChainClient) WatchRPCStatus() { + ob.logger.Chain.Info().Msgf("RPCStatus is starting") ticker := time.NewTicker(60 * time.Second) for { select { case <-ticker.C: - //ob.logger.ChainLogger.Info().Msgf("RPCStatus is running") + if !ob.GetChainParams().IsSupported { + continue + } bn, err := ob.rpcClient.GetBlockCount() if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } hash, err := ob.rpcClient.GetBlockHash(bn) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } header, err := ob.rpcClient.GetBlockHeader(hash) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } blockTime := header.Timestamp elapsedSeconds := time.Since(blockTime).Seconds() if elapsedSeconds > 1200 { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } tssAddr := ob.Tss.BTCAddressWitnessPubkeyHash() res, err := ob.rpcClient.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddr}) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ") continue } if len(res) == 0 { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ") continue } - ob.logger.ChainLogger.Info().Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res)) + ob.logger.Chain.Info().Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res)) case <-ob.stop: return @@ -275,9 +275,9 @@ func (ob *BTCChainClient) RPCStatus() { } func (ob *BTCChainClient) Stop() { - ob.logger.ChainLogger.Info().Msgf("ob %s is stopping", ob.chain.String()) + ob.logger.Chain.Info().Msgf("ob %s is stopping", ob.chain.String()) close(ob.stop) // this notifies all goroutines to stop - ob.logger.ChainLogger.Info().Msgf("%s observer stopped", ob.chain.String()) + ob.logger.Chain.Info().Msgf("%s observer stopped", ob.chain.String()) } func (ob *BTCChainClient) SetLastBlockHeight(height int64) { @@ -324,31 +324,36 @@ func (ob *BTCChainClient) GetBaseGasPrice() *big.Int { return big.NewInt(0) } +// WatchInTx watches Bitcoin chain for incoming txs and post votes to zetacore func (ob *BTCChainClient) WatchInTx() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchInTx", ob.GetChainParams().InTxTicker) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error") + ob.logger.InTx.Error().Err(err).Msg("error creating ticker") return } defer ticker.Stop() + ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.ObserveInTx() if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx") + ob.logger.InTx.Error().Err(err).Msg("WatchInTx error observing in tx") } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.WatchInTx) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.WatchInTx.Info().Msg("WatchInTx stopped") + ob.logger.InTx.Info().Msgf("WatchInTx stopped for chain %d", ob.chain.ChainId) return } } } func (ob *BTCChainClient) postBlockHeader(tip int64) error { - ob.logger.WatchInTx.Info().Msgf("postBlockHeader: tip %d", tip) + ob.logger.InTx.Info().Msgf("postBlockHeader: tip %d", tip) bn := tip res, err := ob.zetaClient.GetBlockHeaderStateByChain(ob.chain.ChainId) if err == nil && res.BlockHeaderState != nil && res.BlockHeaderState.EarliestHeight > 0 { @@ -365,7 +370,7 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { var headerBuf bytes.Buffer err = res2.Header.Serialize(&headerBuf) if err != nil { // should never happen - ob.logger.WatchInTx.Error().Err(err).Msgf("error serializing bitcoin block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("error serializing bitcoin block header: %d", bn) return err } blockHash := res2.Header.BlockHash() @@ -375,9 +380,9 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { res2.Block.Height, proofs.NewBitcoinHeader(headerBuf.Bytes()), ) - ob.logger.WatchInTx.Info().Msgf("posted block header %d: %s", bn, blockHash) + ob.logger.InTx.Info().Msgf("posted block header %d: %s", bn, blockHash) if err != nil { // error shouldn't block the process - ob.logger.WatchInTx.Error().Err(err).Msgf("error posting bitcoin block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("error posting bitcoin block header: %d", bn) } return err } @@ -420,18 +425,18 @@ func (ob *BTCChainClient) ObserveInTx() error { bn := lastScanned + 1 res, err := ob.GetBlockByNumberCached(bn) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error getting bitcoin block %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("observeInTxBTC: error getting bitcoin block %d", bn) return err } - ob.logger.WatchInTx.Info().Msgf("observeInTxBTC: block %d has %d txs, current block %d, last block %d", + ob.logger.InTx.Info().Msgf("observeInTxBTC: block %d has %d txs, current block %d, last block %d", bn, len(res.Block.Tx), cnt, lastScanned) // print some debug information if len(res.Block.Tx) > 1 { for idx, tx := range res.Block.Tx { - ob.logger.WatchInTx.Debug().Msgf("BTC InTX | %d: %s\n", idx, tx.Txid) + ob.logger.InTx.Debug().Msgf("BTC InTX | %d: %s\n", idx, tx.Txid) for vidx, vout := range tx.Vout { - ob.logger.WatchInTx.Debug().Msgf("vout %d \n value: %v\n scriptPubKey: %v\n", vidx, vout.Value, vout.ScriptPubKey.Hex) + ob.logger.InTx.Debug().Msgf("vout %d \n value: %v\n scriptPubKey: %v\n", vidx, vout.Value, vout.ScriptPubKey.Hex) } } } @@ -440,13 +445,13 @@ func (ob *BTCChainClient) ObserveInTx() error { if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { err = ob.postBlockHeader(bn) if err != nil { - ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) + ob.logger.InTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) } } if len(res.Block.Tx) > 1 { // get depositor fee - depositorFee := CalcDepositorFee(res.Block, ob.chain.ChainId, ob.netParams, ob.logger.WatchInTx) + depositorFee := CalcDepositorFee(res.Block, ob.chain.ChainId, ob.netParams, ob.logger.InTx) // filter incoming txs to TSS address tssAddress := ob.Tss.BTCAddress() @@ -455,7 +460,7 @@ func (ob *BTCChainClient) ObserveInTx() error { res.Block.Tx, uint64(res.Block.Height), tssAddress, - &ob.logger.WatchInTx, + &ob.logger.InTx, ob.netParams, depositorFee, ) @@ -466,10 +471,10 @@ func (ob *BTCChainClient) ObserveInTx() error { if msg != nil { zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, zetabridge.PostVoteInboundExecutionGasLimit, msg) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error posting to zeta core for tx %s", inTx.TxHash) + ob.logger.InTx.Error().Err(err).Msgf("observeInTxBTC: error posting to zeta core for tx %s", inTx.TxHash) return err // we have to re-scan this block next time } else if zetaHash != "" { - ob.logger.WatchInTx.Info().Msgf("observeInTxBTC: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", + ob.logger.InTx.Info().Msgf("observeInTxBTC: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", zetaHash, inTx.TxHash, ballot, depositorFee) } } @@ -480,7 +485,7 @@ func (ob *BTCChainClient) ObserveInTx() error { ob.SetLastBlockHeightScanned(bn) // #nosec G701 always positive if err := ob.db.Save(clienttypes.ToLastBlockSQLType(uint64(bn))).Error; err != nil { - ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error writing last scanned block %d to db", bn) + ob.logger.InTx.Error().Err(err).Msgf("observeInTxBTC: error writing last scanned block %d to db", bn) } } @@ -532,7 +537,7 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger if txResult == nil { // check failed, try again next time return false, false, nil } else if inMempool { // still in mempool (should avoid unnecessary Tss keysign) - ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: outTx %s is still in mempool", outTxID) + ob.logger.OutTx.Info().Msgf("IsSendOutTxProcessed: outTx %s is still in mempool", outTxID) return true, false, nil } // included @@ -543,7 +548,7 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger if res == nil { return false, false, nil } - ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: setIncludedTx succeeded for outTx %s", outTxID) + ob.logger.OutTx.Info().Msgf("IsSendOutTxProcessed: setIncludedTx succeeded for outTx %s", outTxID) } // It's safe to use cctx's amount to post confirmation because it has already been verified in observeOutTx() @@ -575,10 +580,11 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger return true, true, nil } +// WatchGasPrice watches Bitcoin gas rate and post to zetacore func (ob *BTCChainClient) WatchGasPrice() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker) if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error") + ob.logger.GasPrice.Error().Err(err).Msg("error creating ticker") return } @@ -586,13 +592,16 @@ func (ob *BTCChainClient) WatchGasPrice() { for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.PostGasPrice() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("PostGasPrice error on " + ob.chain.String()) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } - ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice) + ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice) case <-ob.stop: - ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice stopped") + ob.logger.GasPrice.Info().Msgf("WatchGasPrice stopped for chain %d", ob.chain.ChainId) return } } @@ -607,7 +616,7 @@ func (ob *BTCChainClient) PostGasPrice() error { // #nosec G701 always in range zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, 1, "100", uint64(bn)) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice:") + ob.logger.GasPrice.Err(err).Msg("PostGasPrice:") return err } _ = zetaHash @@ -633,7 +642,7 @@ func (ob *BTCChainClient) PostGasPrice() error { // #nosec G701 always positive zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, feeRatePerByte.Uint64(), "100", uint64(bn)) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice:") + ob.logger.GasPrice.Err(err).Msg("PostGasPrice:") return err } _ = zetaHash @@ -680,7 +689,7 @@ func FilterAndParseIncomingTx( } func (ob *BTCChainClient) GetInboundVoteMessageFromBtcEvent(inTx *BTCInTxEvnet) *types.MsgVoteOnObservedInboundTx { - ob.logger.WatchInTx.Debug().Msgf("Processing inTx: %s", inTx.TxHash) + ob.logger.InTx.Debug().Msgf("Processing inTx: %s", inTx.TxHash) amount := big.NewFloat(inTx.Value) amount = amount.Mul(amount, big.NewFloat(1e8)) amountInt, _ := amount.Int(nil) @@ -717,7 +726,7 @@ func (ob *BTCChainClient) IsInTxRestricted(inTx *BTCInTxEvnet) bool { receiver = parsedAddress.Hex() } if config.ContainRestrictedAddress(inTx.FromAddress, receiver) { - compliance.PrintComplianceLog(ob.logger.WatchInTx, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, inTx.TxHash, inTx.FromAddress, receiver, "BTC") return true } @@ -816,7 +825,7 @@ func GetBtcEvent( func (ob *BTCChainClient) WatchUTXOS() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchUTXOS", ob.GetChainParams().WatchUtxoTicker) if err != nil { - ob.logger.WatchUTXOS.Error().Err(err).Msg("WatchUTXOS error") + ob.logger.UTXOS.Error().Err(err).Msg("error creating ticker") return } @@ -824,13 +833,16 @@ func (ob *BTCChainClient) WatchUTXOS() { for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.FetchUTXOS() if err != nil { - ob.logger.WatchUTXOS.Error().Err(err).Msg("error fetching btc utxos") + ob.logger.UTXOS.Error().Err(err).Msg("error fetching btc utxos") } - ticker.UpdateInterval(ob.GetChainParams().WatchUtxoTicker, ob.logger.WatchUTXOS) + ticker.UpdateInterval(ob.GetChainParams().WatchUtxoTicker, ob.logger.UTXOS) case <-ob.stop: - ob.logger.WatchUTXOS.Info().Msg("WatchUTXOS stopped") + ob.logger.UTXOS.Info().Msgf("WatchUTXOS stopped for chain %d", ob.chain.ChainId) return } } @@ -839,7 +851,7 @@ func (ob *BTCChainClient) WatchUTXOS() { func (ob *BTCChainClient) FetchUTXOS() error { defer func() { if err := recover(); err != nil { - ob.logger.WatchUTXOS.Error().Msgf("BTC fetchUTXOS: caught panic error: %v", err) + ob.logger.UTXOS.Error().Msgf("BTC fetchUTXOS: caught panic error: %v", err) } }() @@ -913,7 +925,7 @@ func (ob *BTCChainClient) refreshPendingNonce() { // get pending nonces from zetabridge p, err := ob.zetaClient.GetPendingNoncesByChain(ob.chain.ChainId) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("refreshPendingNonce: error getting pending nonces") + ob.logger.Chain.Error().Err(err).Msg("refreshPendingNonce: error getting pending nonces") } // increase pending nonce if lagged behind @@ -927,14 +939,14 @@ func (ob *BTCChainClient) refreshPendingNonce() { // get the last included outTx hash txid, err := ob.getOutTxidByNonce(nonceLow-1, false) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("refreshPendingNonce: error getting last outTx txid") + ob.logger.Chain.Error().Err(err).Msg("refreshPendingNonce: error getting last outTx txid") } // set 'NonceLow' as the new pending nonce ob.Mu.Lock() defer ob.Mu.Unlock() ob.pendingNonce = nonceLow - ob.logger.ChainLogger.Info().Msgf("refreshPendingNonce: increase pending nonce to %d with txid %s", ob.pendingNonce, txid) + ob.logger.Chain.Info().Msgf("refreshPendingNonce: increase pending nonce to %d with txid %s", ob.pendingNonce, txid) } } @@ -974,10 +986,10 @@ func (ob *BTCChainClient) findNonceMarkUTXO(nonce uint64, txid string) (int, err for i, utxo := range ob.utxos { sats, err := GetSatoshis(utxo.Amount) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("findNonceMarkUTXO: error getting satoshis for utxo %v", utxo) + ob.logger.OutTx.Error().Err(err).Msgf("findNonceMarkUTXO: error getting satoshis for utxo %v", utxo) } if utxo.Address == tssAddress && sats == amount && utxo.TxID == txid { - ob.logger.ObserveOutTx.Info().Msgf("findNonceMarkUTXO: found nonce-mark utxo with txid %s, amount %d satoshi", utxo.TxID, sats) + ob.logger.OutTx.Info().Msgf("findNonceMarkUTXO: found nonce-mark utxo with txid %s, amount %d satoshi", utxo.TxID, sats) return i, nil } } @@ -1079,15 +1091,16 @@ func (ob *BTCChainClient) SaveBroadcastedTx(txHash string, nonce uint64) { broadcastEntry := clienttypes.ToOutTxHashSQLType(txHash, outTxID) if err := ob.db.Save(&broadcastEntry).Error; err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("SaveBroadcastedTx: error saving broadcasted txHash %s for outTx %s", txHash, outTxID) + ob.logger.OutTx.Error().Err(err).Msgf("SaveBroadcastedTx: error saving broadcasted txHash %s for outTx %s", txHash, outTxID) } - ob.logger.ObserveOutTx.Info().Msgf("SaveBroadcastedTx: saved broadcasted txHash %s for outTx %s", txHash, outTxID) + ob.logger.OutTx.Info().Msgf("SaveBroadcastedTx: saved broadcasted txHash %s for outTx %s", txHash, outTxID) } -func (ob *BTCChainClient) observeOutTx() { - ticker, err := clienttypes.NewDynamicTicker("Bitcoin_observeOutTx", ob.GetChainParams().OutTxTicker) +// WatchOutTx watches Bitcoin chain for outgoing txs status +func (ob *BTCChainClient) WatchOutTx() { + ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchOutTx", ob.GetChainParams().OutTxTicker) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error creating ticker") + ob.logger.OutTx.Error().Err(err).Msg("error creating ticker ") return } @@ -1095,9 +1108,12 @@ func (ob *BTCChainClient) observeOutTx() { for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error GetAllOutTxTrackerByChain") + ob.logger.OutTx.Error().Err(err).Msgf("WatchOutTx: error GetAllOutTxTrackerByChain for chain %d", ob.chain.ChainId) continue } for _, tracker := range trackers { @@ -1105,16 +1121,16 @@ func (ob *BTCChainClient) observeOutTx() { outTxID := ob.GetTxID(tracker.Nonce) cctx, err := ob.zetaClient.GetCctxByNonce(ob.chain.ChainId, tracker.Nonce) if err != nil { - ob.logger.ObserveOutTx.Info().Err(err).Msgf("observeOutTx: can't find cctx for nonce %d", tracker.Nonce) + ob.logger.OutTx.Info().Err(err).Msgf("WatchOutTx: can't find cctx for chain %d nonce %d", ob.chain.ChainId, tracker.Nonce) break } nonce := cctx.GetCurrentOutTxParam().OutboundTxTssNonce if tracker.Nonce != nonce { // Tanmay: it doesn't hurt to check - ob.logger.ObserveOutTx.Error().Msgf("observeOutTx: tracker nonce %d not match cctx nonce %d", tracker.Nonce, nonce) + ob.logger.OutTx.Error().Msgf("WatchOutTx: tracker nonce %d not match cctx nonce %d", tracker.Nonce, nonce) break } if len(tracker.HashList) > 1 { - ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: oops, outTxID %s got multiple (%d) outTx hashes", outTxID, len(tracker.HashList)) + ob.logger.OutTx.Warn().Msgf("WatchOutTx: oops, outTxID %s got multiple (%d) outTx hashes", outTxID, len(tracker.HashList)) } // iterate over all txHashes to find the truly included one. // we do it this (inefficient) way because we don't rely on the first one as it may be a false positive (for unknown reason). @@ -1125,10 +1141,10 @@ func (ob *BTCChainClient) observeOutTx() { if result != nil && !inMempool { // included txCount++ txResult = result - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: included outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, tracker.Nonce) + ob.logger.OutTx.Info().Msgf("WatchOutTx: included outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, tracker.Nonce) if txCount > 1 { - ob.logger.ObserveOutTx.Error().Msgf( - "observeOutTx: checkIncludedTx passed, txCount %d chain %d nonce %d result %v", txCount, ob.chain.ChainId, tracker.Nonce, result) + ob.logger.OutTx.Error().Msgf( + "WatchOutTx: checkIncludedTx passed, txCount %d chain %d nonce %d result %v", txCount, ob.chain.ChainId, tracker.Nonce, result) } } } @@ -1136,12 +1152,12 @@ func (ob *BTCChainClient) observeOutTx() { ob.setIncludedTx(tracker.Nonce, txResult) } else if txCount > 1 { ob.removeIncludedTx(tracker.Nonce) // we can't tell which txHash is true, so we remove all (if any) to be safe - ob.logger.ObserveOutTx.Error().Msgf("observeOutTx: included multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, tracker.Nonce) + ob.logger.OutTx.Error().Msgf("WatchOutTx: included multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, tracker.Nonce) } } - ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.ObserveOutTx) + ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.OutTx) case <-ob.stop: - ob.logger.ObserveOutTx.Info().Msg("observeOutTx stopped") + ob.logger.OutTx.Info().Msgf("WatchOutTx stopped for chain %d", ob.chain.ChainId) return } } @@ -1153,17 +1169,17 @@ func (ob *BTCChainClient) checkIncludedTx(cctx *types.CrossChainTx, txHash strin outTxID := ob.GetTxID(cctx.GetCurrentOutTxParam().OutboundTxTssNonce) hash, getTxResult, err := ob.GetTxResultByHash(txHash) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("checkIncludedTx: error GetTxResultByHash: %s", txHash) + ob.logger.OutTx.Error().Err(err).Msgf("checkIncludedTx: error GetTxResultByHash: %s", txHash) return nil, false } if txHash != getTxResult.TxID { // just in case, we'll use getTxResult.TxID later - ob.logger.ObserveOutTx.Error().Msgf("checkIncludedTx: inconsistent txHash %s and getTxResult.TxID %s", txHash, getTxResult.TxID) + ob.logger.OutTx.Error().Msgf("checkIncludedTx: inconsistent txHash %s and getTxResult.TxID %s", txHash, getTxResult.TxID) return nil, false } if getTxResult.Confirmations >= 0 { // check included tx only err = ob.checkTssOutTxResult(cctx, hash, getTxResult) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("checkIncludedTx: error verify bitcoin outTx %s outTxID %s", txHash, outTxID) + ob.logger.OutTx.Error().Err(err).Msgf("checkIncludedTx: error verify bitcoin outTx %s outTxID %s", txHash, outTxID) return nil, false } return getTxResult, false // included @@ -1186,16 +1202,16 @@ func (ob *BTCChainClient) setIncludedTx(nonce uint64, getTxResult *btcjson.GetTr if nonce >= ob.pendingNonce { // try increasing pending nonce on every newly included outTx ob.pendingNonce = nonce + 1 } - ob.logger.ObserveOutTx.Info().Msgf("setIncludedTx: included new bitcoin outTx %s outTxID %s pending nonce %d", txHash, outTxID, ob.pendingNonce) + ob.logger.OutTx.Info().Msgf("setIncludedTx: included new bitcoin outTx %s outTxID %s pending nonce %d", txHash, outTxID, ob.pendingNonce) } else if txHash == res.TxID { // found same hash. ob.includedTxResults[outTxID] = getTxResult // update tx result as confirmations may increase if getTxResult.Confirmations > res.Confirmations { - ob.logger.ObserveOutTx.Info().Msgf("setIncludedTx: bitcoin outTx %s got confirmations %d", txHash, getTxResult.Confirmations) + ob.logger.OutTx.Info().Msgf("setIncludedTx: bitcoin outTx %s got confirmations %d", txHash, getTxResult.Confirmations) } } else { // found other hash. // be alert for duplicate payment!!! As we got a new hash paying same cctx (for whatever reason). delete(ob.includedTxResults, outTxID) // we can't tell which txHash is true, so we remove all to be safe - ob.logger.ObserveOutTx.Error().Msgf("setIncludedTx: duplicate payment by bitcoin outTx %s outTxID %s, prior outTx %s", txHash, outTxID, res.TxID) + ob.logger.OutTx.Error().Msgf("setIncludedTx: duplicate payment by bitcoin outTx %s outTxID %s, prior outTx %s", txHash, outTxID, res.TxID) } } @@ -1405,7 +1421,7 @@ func (ob *BTCChainClient) checkTSSVoutCancelled(params *types.OutboundTxParams, func (ob *BTCChainClient) BuildBroadcastedTxMap() error { var broadcastedTransactions []clienttypes.OutTxHashSQLType if err := ob.db.Find(&broadcastedTransactions).Error; err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("error iterating over db") + ob.logger.Chain.Error().Err(err).Msg("error iterating over db") return err } for _, entry := range broadcastedTransactions { @@ -1423,7 +1439,7 @@ func (ob *BTCChainClient) LoadLastBlock() error { //Load persisted block number var lastBlockNum clienttypes.LastBlockSQLType if err := ob.db.First(&lastBlockNum, clienttypes.LastBlockNumID).Error; err != nil { - ob.logger.ChainLogger.Info().Msg("LastBlockNum not found in DB, scan from latest") + ob.logger.Chain.Info().Msg("LastBlockNum not found in DB, scan from latest") ob.SetLastBlockHeightScanned(bn) } else { // #nosec G701 always in range @@ -1432,7 +1448,7 @@ func (ob *BTCChainClient) LoadLastBlock() error { //If persisted block number is too low, use the latest height if (bn - lastBN) > maxHeightDiff { - ob.logger.ChainLogger.Info().Msgf("LastBlockNum too low: %d, scan from latest", lastBlockNum.Num) + ob.logger.Chain.Info().Msgf("LastBlockNum too low: %d, scan from latest", lastBlockNum.Num) ob.SetLastBlockHeightScanned(bn) } } @@ -1440,7 +1456,7 @@ func (ob *BTCChainClient) LoadLastBlock() error { if ob.chain.ChainId == 18444 { // bitcoin regtest: start from block 100 ob.SetLastBlockHeightScanned(100) } - ob.logger.ChainLogger.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) + ob.logger.Chain.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) return nil } diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index 2a6b80c124..2fc684f054 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -10,10 +10,10 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/zetabridge" ) -func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { - ticker, err := types.NewDynamicTicker("Bitcoin_WatchInTx_InboundTrackerSuggestions", ob.GetChainParams().InTxTicker) +func (ob *BTCChainClient) WatchIntxTracker() { + ticker, err := types.NewDynamicTicker("Bitcoin_WatchIntxTracker", ob.GetChainParams().InTxTicker) if err != nil { - ob.logger.WatchInTx.Err(err).Msg("error creating ticker") + ob.logger.InTx.Err(err).Msg("error creating ticker") return } @@ -21,13 +21,16 @@ func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.ObserveTrackerSuggestions() if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("error observing in tx") + ob.logger.InTx.Error().Err(err).Msgf("error observing intx tracker for chain %d", ob.chain.ChainId) } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.WatchInTx) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.WatchInTx.Info().Msg("ExternalChainWatcher for BTC inboundTrackerSuggestions stopped") + ob.logger.InTx.Info().Msgf("WatchIntxTracker stopped for chain %d", ob.chain.ChainId) return } } @@ -39,12 +42,12 @@ func (ob *BTCChainClient) ObserveTrackerSuggestions() error { return err } for _, tracker := range trackers { - ob.logger.WatchInTx.Info().Msgf("checking tracker with hash :%s and coin-type :%s ", tracker.TxHash, tracker.CoinType) + ob.logger.InTx.Info().Msgf("checking tracker with hash :%s and coin-type :%s ", tracker.TxHash, tracker.CoinType) ballotIdentifier, err := ob.CheckReceiptForBtcTxHash(tracker.TxHash, true) if err != nil { return err } - ob.logger.WatchInTx.Info().Msgf("Vote submitted for inbound Tracker,Chain : %s,Ballot Identifier : %s, coin-type %s", ob.chain.ChainName, ballotIdentifier, coin.CoinType_Gas.String()) + ob.logger.InTx.Info().Msgf("Vote submitted for inbound Tracker,Chain : %s,Ballot Identifier : %s, coin-type %s", ob.chain.ChainName, ballotIdentifier, coin.CoinType_Gas.String()) } return nil } @@ -69,13 +72,13 @@ func (ob *BTCChainClient) CheckReceiptForBtcTxHash(txHash string, vote bool) (st if len(blockVb.Tx) <= 1 { return "", fmt.Errorf("block %d has no transactions", blockVb.Height) } - depositorFee := CalcDepositorFee(blockVb, ob.chain.ChainId, ob.netParams, ob.logger.WatchInTx) + depositorFee := CalcDepositorFee(blockVb, ob.chain.ChainId, ob.netParams, ob.logger.InTx) tss, err := ob.zetaClient.GetBtcTssAddress(ob.chain.ChainId) if err != nil { return "", err } // #nosec G701 always positive - event, err := GetBtcEvent(*tx, tss, uint64(blockVb.Height), &ob.logger.WatchInTx, ob.netParams, depositorFee) + event, err := GetBtcEvent(*tx, tss, uint64(blockVb.Height), &ob.logger.InTx, ob.netParams, depositorFee) if err != nil { return "", err } @@ -91,10 +94,10 @@ func (ob *BTCChainClient) CheckReceiptForBtcTxHash(txHash string, vote bool) (st } zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, zetabridge.PostVoteInboundExecutionGasLimit, msg) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("error posting to zeta core") + ob.logger.InTx.Error().Err(err).Msg("error posting to zeta core") return "", err } else if zetaHash != "" { - ob.logger.WatchInTx.Info().Msgf("BTC deposit detected and reported: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", + ob.logger.InTx.Info().Msgf("BTC deposit detected and reported: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", zetaHash, txHash, ballot, depositorFee) } return msg.Digest(), nil diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index a70401365e..79ce6fa0f4 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -13,20 +13,7 @@ import ( "sync/atomic" "time" - "github.com/zeta-chain/zetacore/pkg/chains" - "github.com/zeta-chain/zetacore/pkg/coin" - "github.com/zeta-chain/zetacore/pkg/proofs" - appcontext "github.com/zeta-chain/zetacore/zetaclient/app_context" - corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" - - "github.com/zeta-chain/zetacore/zetaclient/interfaces" - "github.com/zeta-chain/zetacore/zetaclient/metrics" - "github.com/zeta-chain/zetacore/zetaclient/zetabridge" - "github.com/ethereum/go-ethereum" - "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zeta.non-eth.sol" - zetaconnectoreth "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.eth.sol" - "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -38,13 +25,23 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol" + "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zeta.non-eth.sol" + zetaconnectoreth "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.eth.sol" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol" + "github.com/zeta-chain/zetacore/pkg/chains" + "github.com/zeta-chain/zetacore/pkg/coin" + "github.com/zeta-chain/zetacore/pkg/proofs" crosschaintypes "github.com/zeta-chain/zetacore/x/crosschain/types" observertypes "github.com/zeta-chain/zetacore/x/observer/types" + appcontext "github.com/zeta-chain/zetacore/zetaclient/app_context" clientcommon "github.com/zeta-chain/zetacore/zetaclient/common" "github.com/zeta-chain/zetacore/zetaclient/compliance" "github.com/zeta-chain/zetacore/zetaclient/config" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" + "github.com/zeta-chain/zetacore/zetaclient/interfaces" + "github.com/zeta-chain/zetacore/zetaclient/metrics" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" + "github.com/zeta-chain/zetacore/zetaclient/zetabridge" "gorm.io/driver/sqlite" "gorm.io/gorm" ) @@ -60,11 +57,11 @@ type OutTx struct { Nonce int64 } type Log struct { - ChainLogger zerolog.Logger // Parent logger - ExternalChainWatcher zerolog.Logger // Observes external Chains for incoming trasnactions - WatchGasPrice zerolog.Logger // Observes external Chains for Gas prices and posts to core - ObserveOutTx zerolog.Logger // Observes external Chains for outgoing transactions - Compliance zerolog.Logger // Compliance logger + Chain zerolog.Logger // The parent logger for the chain + InTx zerolog.Logger // Logger for incoming trasnactions + OutTx zerolog.Logger // Logger for outgoing transactions + GasPrice zerolog.Logger // Logger for gas prices + Compliance zerolog.Logger // Logger for compliance checks } var _ interfaces.ChainClient = &ChainClient{} @@ -111,11 +108,11 @@ func NewEVMChainClient( } chainLogger := loggers.Std.With().Str("chain", evmCfg.Chain.ChainName.String()).Logger() ob.logger = Log{ - ChainLogger: chainLogger, - ExternalChainWatcher: chainLogger.With().Str("module", "ExternalChainWatcher").Logger(), - WatchGasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), - ObserveOutTx: chainLogger.With().Str("module", "ObserveOutTx").Logger(), - Compliance: loggers.Compliance, + Chain: chainLogger, + InTx: chainLogger.With().Str("module", "WatchInTx").Logger(), + OutTx: chainLogger.With().Str("module", "WatchOutTx").Logger(), + GasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), + Compliance: loggers.Compliance, } ob.coreContext = appContext.ZetaCoreContext() chainParams, found := ob.coreContext.GetEVMChainParams(evmCfg.Chain.ChainId) @@ -133,10 +130,10 @@ func NewEVMChainClient( ob.outTXConfirmedReceipts = make(map[string]*ethtypes.Receipt) ob.outTXConfirmedTransactions = make(map[string]*ethtypes.Transaction) - ob.logger.ChainLogger.Info().Msgf("Chain %s endpoint %s", ob.chain.ChainName.String(), evmCfg.Endpoint) + ob.logger.Chain.Info().Msgf("Chain %s endpoint %s", ob.chain.ChainName.String(), evmCfg.Endpoint) client, err := ethclient.Dial(evmCfg.Endpoint) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("eth Client Dial") + ob.logger.Chain.Error().Err(err).Msg("eth Client Dial") return nil, err } ob.evmClient = client @@ -145,12 +142,12 @@ func NewEVMChainClient( // create block header and block caches ob.blockCache, err = lru.New(1000) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache") + ob.logger.Chain.Error().Err(err).Msg("failed to create block cache") return nil, err } ob.headerCache, err = lru.New(1000) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("failed to create header cache") + ob.logger.Chain.Error().Err(err).Msg("failed to create header cache") return nil, err } @@ -159,7 +156,7 @@ func NewEVMChainClient( return nil, err } - ob.logger.ChainLogger.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) + ob.logger.Chain.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) return &ob, nil } @@ -172,10 +169,10 @@ func (ob *ChainClient) WithLogger(logger zerolog.Logger) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.logger = Log{ - ChainLogger: logger, - ExternalChainWatcher: logger.With().Str("module", "ExternalChainWatcher").Logger(), - WatchGasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), - ObserveOutTx: logger.With().Str("module", "ObserveOutTx").Logger(), + Chain: logger, + InTx: logger.With().Str("module", "WatchInTx").Logger(), + OutTx: logger.With().Str("module", "WatchOutTx").Logger(), + GasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), } } @@ -255,43 +252,48 @@ func FetchERC20CustodyContract(addr ethcommon.Address, client interfaces.EVMRPCC return erc20custody.NewERC20Custody(addr, client) } +// Start all observation routines for the external chain func (ob *ChainClient) Start() { - go ob.ExternalChainWatcherForNewInboundTrackerSuggestions() - go ob.ExternalChainWatcher() // Observes external Chains for incoming trasnactions - go ob.WatchGasPrice() // Observes external Chains for Gas prices and posts to core - go ob.observeOutTx() // Populates receipts and confirmed outbound transactions - go ob.ExternalChainRPCStatus() + go ob.WatchIntxTracker() + go ob.WatchInTx() + go ob.WatchGasPrice() + go ob.WatchOutTx() + go ob.WatchRPCStatus() } -func (ob *ChainClient) ExternalChainRPCStatus() { - ob.logger.ChainLogger.Info().Msgf("Starting RPC status check for chain %s", ob.chain.String()) +// WatchRPCStatus watches the RPC status of the external chain +func (ob *ChainClient) WatchRPCStatus() { + ob.logger.Chain.Info().Msgf("Starting RPC status check for chain %s", ob.chain.String()) ticker := time.NewTicker(60 * time.Second) for { select { case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } bn, err := ob.evmClient.BlockNumber(context.Background()) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC Status Check error: RPC down?") + ob.logger.Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") continue } gasPrice, err := ob.evmClient.SuggestGasPrice(context.Background()) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC Status Check error: RPC down?") + ob.logger.Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") continue } header, err := ob.evmClient.HeaderByNumber(context.Background(), new(big.Int).SetUint64(bn)) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC Status Check error: RPC down?") + ob.logger.Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") continue } // #nosec G701 always in range blockTime := time.Unix(int64(header.Time), 0).UTC() elapsedSeconds := time.Since(blockTime).Seconds() if elapsedSeconds > 100 { - ob.logger.ChainLogger.Warn().Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds) + ob.logger.Chain.Warn().Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds) continue } - ob.logger.ChainLogger.Info().Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64()) + ob.logger.Chain.Info().Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64()) case <-ob.stop: return } @@ -299,20 +301,20 @@ func (ob *ChainClient) ExternalChainRPCStatus() { } func (ob *ChainClient) Stop() { - ob.logger.ChainLogger.Info().Msgf("ob %s is stopping", ob.chain.String()) + ob.logger.Chain.Info().Msgf("ob %s is stopping", ob.chain.String()) close(ob.stop) // this notifies all goroutines to stop - ob.logger.ChainLogger.Info().Msg("closing ob.db") + ob.logger.Chain.Info().Msg("closing ob.db") dbInst, err := ob.db.DB() if err != nil { - ob.logger.ChainLogger.Info().Msg("error getting database instance") + ob.logger.Chain.Info().Msg("error getting database instance") } err = dbInst.Close() if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("error closing database") + ob.logger.Chain.Error().Err(err).Msg("error closing database") } - ob.logger.ChainLogger.Info().Msgf("%s observer stopped", ob.chain.String()) + ob.logger.Chain.Info().Msgf("%s observer stopped", ob.chain.String()) } // returns: isIncluded, isConfirmed, Error @@ -609,19 +611,18 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, return false, false, nil } -// FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called -// observeOutTx periodically checks all the txhash in potential outbound txs -func (ob *ChainClient) observeOutTx() { +// WatchOutTx watches external chain for outgoing txs status +func (ob *ChainClient) WatchOutTx() { // read env variables if set timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE")) if err != nil || timeoutNonce <= 0 { timeoutNonce = 100 * 3 // process up to 100 hashes } - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: using timeoutNonce %d seconds", timeoutNonce) + ob.logger.OutTx.Info().Msgf("WatchOutTx: using timeoutNonce %d seconds", timeoutNonce) - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_observeOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) + ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msg("failed to create ticker") + ob.logger.OutTx.Error().Err(err).Msg("error creating ticker") return } @@ -629,6 +630,9 @@ func (ob *ChainClient) observeOutTx() { for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { continue @@ -647,17 +651,17 @@ func (ob *ChainClient) observeOutTx() { for _, txHash := range tracker.HashList { select { case <-outTimeout: - ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt) + ob.logger.OutTx.Warn().Msgf("WatchOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt) break TRACKERLOOP default: if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { txCount++ receipt = recpt transaction = tx - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) + ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) if txCount > 1 { - ob.logger.ObserveOutTx.Error().Msgf( - "observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) + ob.logger.OutTx.Error().Msgf( + "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) } } } @@ -665,12 +669,12 @@ func (ob *ChainClient) observeOutTx() { if txCount == 1 { // should be only one txHash confirmed for each nonce. ob.SetTxNReceipt(nonceInt, receipt, transaction) } else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint) - ob.logger.ObserveOutTx.Error().Msgf("observeOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) + ob.logger.OutTx.Error().Msgf("WatchOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) } } - ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.ObserveOutTx) + ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.OutTx) case <-ob.stop: - ob.logger.ObserveOutTx.Info().Msg("observeOutTx: stopped") + ob.logger.OutTx.Info().Msg("WatchOutTx: stopped") return } } @@ -836,26 +840,31 @@ func (ob *ChainClient) GetLastBlockHeight() uint64 { return height } -func (ob *ChainClient) ExternalChainWatcher() { - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_ExternalChainWatcher_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker) +// WatchInTx watches external chain for incoming txs and post votes to zetacore +func (ob *ChainClient) WatchInTx() { + ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchInTx_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msg("NewDynamicTicker error") + ob.logger.InTx.Error().Err(err).Msg("error creating ticker") return } defer ticker.Stop() - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher started") - sampledLogger := ob.logger.ExternalChainWatcher.Sample(&zerolog.BasicSampler{N: 10}) + ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.InTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId) + continue + } err := ob.observeInTX(sampledLogger) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msg("observeInTX error") + ob.logger.InTx.Err(err).Msg("WatchInTx: observeInTX error") } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.ExternalChainWatcher) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher stopped") + ob.logger.InTx.Info().Msgf("WatchInTx stopped for chain %d", ob.chain.ChainId) return } } @@ -886,12 +895,12 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { header, err := ob.GetBlockHeaderCached(bn) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn) return err } headerRLP, err := rlp.EncodeToBytes(header) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn) return err } @@ -902,7 +911,7 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { proofs.NewEthereumHeader(headerRLP), ) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error posting block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("postBlockHeader: error posting block header: %d", bn) return err } return nil @@ -969,7 +978,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { ob.chain.ChainId, lastScannedZetaSent, lastScannedDeposited, lastScannedTssRecvd) ob.SetLastBlockHeightScanned(lastScannedLowest) if err := ob.db.Save(clienttypes.ToLastBlockSQLType(lastScannedLowest)).Error; err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeInTX: error writing lastScannedLowest %d to db", lastScannedLowest) + ob.logger.InTx.Error().Err(err).Msgf("observeInTX: error writing lastScannedLowest %d to db", lastScannedLowest) } } return nil @@ -981,7 +990,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { // filter ZetaSent logs addrConnector, connector, err := ob.GetConnectorContract() if err != nil { - ob.logger.ChainLogger.Warn().Err(err).Msgf("ObserveZetaSent: GetConnectorContract error:") + ob.logger.Chain.Warn().Err(err).Msgf("ObserveZetaSent: GetConnectorContract error:") return startBlock - 1 // lastScanned } iter, err := connector.FilterZetaSent(&bind.FilterOpts{ @@ -990,7 +999,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { Context: context.TODO(), }, []ethcommon.Address{}, []*big.Int{}) if err != nil { - ob.logger.ChainLogger.Warn().Err(err).Msgf( + ob.logger.Chain.Warn().Err(err).Msgf( "ObserveZetaSent: FilterZetaSent error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId) return startBlock - 1 // lastScanned } @@ -1004,7 +1013,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { events = append(events, iter.Event) continue } - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveZetaSent: invalid ZetaSent event in tx %s on chain %d at height %d", + ob.logger.InTx.Warn().Err(err).Msgf("ObserveZetaSent: invalid ZetaSent event in tx %s on chain %d at height %d", iter.Event.Raw.TxHash.Hex(), ob.chain.ChainId, iter.Event.Raw.BlockNumber) } sort.SliceStable(events, func(i, j int) bool { @@ -1030,7 +1039,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { } // guard against multiple events in the same tx if guard[event.Raw.TxHash.Hex()] { - ob.logger.ExternalChainWatcher.Warn().Msgf("ObserveZetaSent: multiple remote call events detected in tx %s", event.Raw.TxHash) + ob.logger.InTx.Warn().Msgf("ObserveZetaSent: multiple remote call events detected in tx %s", event.Raw.TxHash) continue } guard[event.Raw.TxHash.Hex()] = true @@ -1053,7 +1062,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // filter ERC20CustodyDeposited logs addrCustody, erc20custodyContract, err := ob.GetERC20CustodyContract() if err != nil { - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveERC20Deposited: GetERC20CustodyContract error:") + ob.logger.InTx.Warn().Err(err).Msgf("ObserveERC20Deposited: GetERC20CustodyContract error:") return startBlock - 1 // lastScanned } iter, err := erc20custodyContract.FilterDeposited(&bind.FilterOpts{ @@ -1062,7 +1071,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 Context: context.TODO(), }, []ethcommon.Address{}) if err != nil { - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf( + ob.logger.InTx.Warn().Err(err).Msgf( "ObserveERC20Deposited: FilterDeposited error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId) return startBlock - 1 // lastScanned } @@ -1076,7 +1085,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 events = append(events, iter.Event) continue } - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveERC20Deposited: invalid Deposited event in tx %s on chain %d at height %d", + ob.logger.InTx.Warn().Err(err).Msgf("ObserveERC20Deposited: invalid Deposited event in tx %s on chain %d at height %d", iter.Event.Raw.TxHash.Hex(), ob.chain.ChainId, iter.Event.Raw.BlockNumber) } sort.SliceStable(events, func(i, j int) bool { @@ -1102,7 +1111,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 } tx, _, err := ob.TransactionByHash(event.Raw.TxHash.Hex()) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf( + ob.logger.InTx.Error().Err(err).Msgf( "ObserveERC20Deposited: error getting transaction for intx %s chain %d", event.Raw.TxHash, ob.chain.ChainId) return beingScanned - 1 // we have to re-scan from this block next time } @@ -1110,7 +1119,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // guard against multiple events in the same tx if guard[event.Raw.TxHash.Hex()] { - ob.logger.ExternalChainWatcher.Warn().Msgf("ObserveERC20Deposited: multiple remote call events detected in tx %s", event.Raw.TxHash) + ob.logger.InTx.Warn().Msgf("ObserveERC20Deposited: multiple remote call events detected in tx %s", event.Raw.TxHash) continue } guard[event.Raw.TxHash.Hex()] = true @@ -1130,10 +1139,6 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge // returns the last block successfully scanned func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 { - if !ob.GetChainParams().IsSupported { - return startBlock - 1 // lastScanned - } - // query incoming gas asset for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetabridge and ignore error @@ -1143,14 +1148,14 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains err := ob.postBlockHeader(toBlock) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting block header") + ob.logger.InTx.Error().Err(err).Msg("error posting block header") } } // observe TSS received gas token in block 'bn' err := ob.ObserveTSSReceiveInBlock(bn) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("ObserverTSSReceive: error observing TSS received token in block %d for chain %d", bn, ob.chain.ChainId) + ob.logger.InTx.Error().Err(err).Msgf("ObserverTSSReceive: error observing TSS received token in block %d for chain %d", bn, ob.chain.ChainId) return bn - 1 // we have to re-scan from this block next time } } @@ -1158,41 +1163,45 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse return toBlock } +// WatchGasPrice watches external chain for gas prices and post to zetacore func (ob *ChainClient) WatchGasPrice() { - ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...") + ob.logger.GasPrice.Info().Msg("WatchGasPrice starting...") err := ob.PostGasPrice() if err != nil { height, err := ob.zetaClient.GetBlockHeight() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error") + ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error") } else { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) } } ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker) if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("NewDynamicTicker error") + ob.logger.GasPrice.Error().Err(err).Msg("NewDynamicTicker error") return } - ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker) + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err = ob.PostGasPrice() if err != nil { height, err := ob.zetaClient.GetBlockHeight() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error") + ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error") } else { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) } } - ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice) + ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice) case <-ob.stop: - ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice stopped") + ob.logger.GasPrice.Info().Msg("WatchGasPrice stopped") return } } @@ -1203,12 +1212,12 @@ func (ob *ChainClient) PostGasPrice() error { // GAS PRICE gasPrice, err := ob.evmClient.SuggestGasPrice(context.TODO()) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("Err SuggestGasPrice:") + ob.logger.GasPrice.Err(err).Msg("Err SuggestGasPrice:") return err } blockNum, err := ob.evmClient.BlockNumber(context.TODO()) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("Err Fetching Most recent Block : ") + ob.logger.GasPrice.Err(err).Msg("Err Fetching Most recent Block : ") return err } @@ -1217,7 +1226,7 @@ func (ob *ChainClient) PostGasPrice() error { zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, gasPrice.Uint64(), supply, blockNum) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice to zetabridge failed") + ob.logger.GasPrice.Err(err).Msg("PostGasPrice to zetabridge failed") return err } _ = zetaHash @@ -1226,7 +1235,7 @@ func (ob *ChainClient) PostGasPrice() error { } func (ob *ChainClient) BuildLastBlock() error { - logger := ob.logger.ChainLogger.With().Str("module", "BuildBlockIndex").Logger() + logger := ob.logger.Chain.With().Str("module", "BuildBlockIndex").Logger() envvar := ob.chain.ChainName.String() + "_SCAN_FROM" scanFromBlock := os.Getenv(envvar) if scanFromBlock != "" { @@ -1267,7 +1276,7 @@ func (ob *ChainClient) BuildReceiptsMap() error { logger := ob.logger var receipts []clienttypes.ReceiptSQLType if err := ob.db.Find(&receipts).Error; err != nil { - logger.ChainLogger.Error().Err(err).Msg("error iterating over db") + logger.Chain.Error().Err(err).Msg("error iterating over db") return err } for _, receipt := range receipts { @@ -1300,7 +1309,7 @@ func (ob *ChainClient) LoadDB(dbPath string, chain chains.Chain) error { &clienttypes.TransactionSQLType{}, &clienttypes.LastBlockSQLType{}) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("error migrating db") + ob.logger.Chain.Error().Err(err).Msg("error migrating db") return err } diff --git a/zetaclient/evm/inbounds.go b/zetaclient/evm/inbounds.go index 2daa4fc0c1..d6f6fcb223 100644 --- a/zetaclient/evm/inbounds.go +++ b/zetaclient/evm/inbounds.go @@ -26,30 +26,33 @@ import ( "golang.org/x/net/context" ) -// ExternalChainWatcherForNewInboundTrackerSuggestions At each tick, gets a list of Inbound tracker suggestions from zeta-core and tries to check if the in-tx was confirmed. +// WatchIntxTracker gets a list of Inbound tracker suggestions from zeta-core at each tick and tries to check if the in-tx was confirmed. // If it was, it tries to broadcast the confirmation vote. If this zeta client has previously broadcast the vote, the tx would be rejected -func (ob *ChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { +func (ob *ChainClient) WatchIntxTracker() { ticker, err := clienttypes.NewDynamicTicker( - fmt.Sprintf("EVM_ExternalChainWatcher_InboundTrackerSuggestions_%d", ob.chain.ChainId), + fmt.Sprintf("EVM_WatchIntxTracker_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker, ) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msg("error creating ticker") + ob.logger.InTx.Err(err).Msg("error creating ticker") return } defer ticker.Stop() - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher for inboundTrackerSuggestions started") + ob.logger.InTx.Info().Msgf("Intx tracker watcher started for chain %d", ob.chain.ChainId) for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.ObserveIntxTrackers() if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msg("ObserveTrackerSuggestions error") + ob.logger.InTx.Err(err).Msg("ObserveTrackerSuggestions error") } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.ExternalChainWatcher) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher for inboundTrackerSuggestions stopped") + ob.logger.InTx.Info().Msg("ExternalChainWatcher for inboundTrackerSuggestions stopped") return } } @@ -71,7 +74,7 @@ func (ob *ChainClient) ObserveIntxTrackers() error { if err != nil { return errors.Wrapf(err, "error getting receipt for intx %s chain %d", tracker.TxHash, ob.chain.ChainId) } - ob.logger.ExternalChainWatcher.Info().Msgf("checking tracker for intx %s chain %d", tracker.TxHash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("checking tracker for intx %s chain %d", tracker.TxHash, ob.chain.ChainId) // check and vote on inbound tx switch tracker.CoinType { @@ -113,7 +116,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, rece if err == nil { msg = ob.BuildInboundVoteMsgForZetaSentEvent(event) } else { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", err } break // only one event is allowed per tx @@ -121,7 +124,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, rece } if msg == nil { // no event, restricted tx, etc. - ob.logger.ExternalChainWatcher.Info().Msgf("no ZetaSent event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("no ZetaSent event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", nil } if vote { @@ -154,7 +157,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, rec if err == nil { msg = ob.BuildInboundVoteMsgForDepositedEvent(zetaDeposited, sender) } else { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", err } break // only one event is allowed per tx @@ -162,7 +165,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, rec } if msg == nil { // no event, donation, restricted tx, etc. - ob.logger.ExternalChainWatcher.Info().Msgf("no Deposited event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("no Deposited event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", nil } if vote { @@ -190,7 +193,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenGas(tx *ethrpc.Transaction, recei msg := ob.BuildInboundVoteMsgForTokenSentToTSS(tx, sender, receipt.BlockNumber.Uint64()) if msg == nil { // donation, restricted tx, etc. - ob.logger.ExternalChainWatcher.Info().Msgf("no vote message built for intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("no vote message built for intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", nil } if vote { @@ -205,12 +208,12 @@ func (ob *ChainClient) PostVoteInbound(msg *types.MsgVoteOnObservedInboundTx, co chainID := ob.chain.ChainId zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, retryGasLimit, msg) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf("intx detected: error posting vote for chain %d token %s intx %s", chainID, coinType, txHash) + ob.logger.InTx.Err(err).Msgf("intx detected: error posting vote for chain %d token %s intx %s", chainID, coinType, txHash) return "", err } else if zetaHash != "" { - ob.logger.ExternalChainWatcher.Info().Msgf("intx detected: chain %d token %s intx %s vote %s ballot %s", chainID, coinType, txHash, zetaHash, ballot) + ob.logger.InTx.Info().Msgf("intx detected: chain %d token %s intx %s vote %s ballot %s", chainID, coinType, txHash, zetaHash, ballot) } else { - ob.logger.ExternalChainWatcher.Info().Msgf("intx detected: chain %d token %s intx %s already voted on ballot %s", chainID, coinType, txHash, ballot) + ob.logger.InTx.Info().Msgf("intx detected: chain %d token %s intx %s already voted on ballot %s", chainID, coinType, txHash, ballot) } return ballot, err } @@ -230,18 +233,18 @@ func (ob *ChainClient) BuildInboundVoteMsgForDepositedEvent(event *erc20custody. maybeReceiver = parsedAddress.Hex() } if config.ContainRestrictedAddress(sender.Hex(), clienttypes.BytesToEthHex(event.Recipient), maybeReceiver) { - compliance.PrintComplianceLog(ob.logger.ExternalChainWatcher, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, event.Raw.TxHash.Hex(), sender.Hex(), clienttypes.BytesToEthHex(event.Recipient), "ERC20") return nil } // donation check if bytes.Equal(event.Message, []byte(constant.DonationMessage)) { - ob.logger.ExternalChainWatcher.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", event.Raw.TxHash.Hex(), ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", event.Raw.TxHash.Hex(), ob.chain.ChainId) return nil } message := hex.EncodeToString(event.Message) - ob.logger.ExternalChainWatcher.Info().Msgf("ERC20CustodyDeposited inTx detected on chain %d tx %s block %d from %s value %s message %s", + ob.logger.InTx.Info().Msgf("ERC20CustodyDeposited inTx detected on chain %d tx %s block %d from %s value %s message %s", ob.chain.ChainId, event.Raw.TxHash.Hex(), event.Raw.BlockNumber, sender.Hex(), event.Amount.String(), message) return zetabridge.GetInBoundVoteMessage( @@ -266,7 +269,7 @@ func (ob *ChainClient) BuildInboundVoteMsgForDepositedEvent(event *erc20custody. func (ob *ChainClient) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector.ZetaConnectorNonEthZetaSent) *types.MsgVoteOnObservedInboundTx { destChain := chains.GetChainFromChainID(event.DestinationChainId.Int64()) if destChain == nil { - ob.logger.ExternalChainWatcher.Warn().Msgf("chain id not supported %d", event.DestinationChainId.Int64()) + ob.logger.InTx.Warn().Msgf("chain id not supported %d", event.DestinationChainId.Int64()) return nil } destAddr := clienttypes.BytesToEthHex(event.DestinationAddress) @@ -274,7 +277,7 @@ func (ob *ChainClient) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector. // compliance check sender := event.ZetaTxSenderAddress.Hex() if config.ContainRestrictedAddress(sender, destAddr, event.SourceTxOriginAddress.Hex()) { - compliance.PrintComplianceLog(ob.logger.ExternalChainWatcher, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, event.Raw.TxHash.Hex(), sender, destAddr, "Zeta") return nil } @@ -282,17 +285,17 @@ func (ob *ChainClient) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector. if !destChain.IsZetaChain() { paramsDest, found := ob.coreContext.GetEVMChainParams(destChain.ChainId) if !found { - ob.logger.ExternalChainWatcher.Warn().Msgf("chain id not present in EVMChainParams %d", event.DestinationChainId.Int64()) + ob.logger.InTx.Warn().Msgf("chain id not present in EVMChainParams %d", event.DestinationChainId.Int64()) return nil } if strings.EqualFold(destAddr, paramsDest.ZetaTokenContractAddress) { - ob.logger.ExternalChainWatcher.Warn().Msgf("potential attack attempt: %s destination address is ZETA token contract address %s", destChain, destAddr) + ob.logger.InTx.Warn().Msgf("potential attack attempt: %s destination address is ZETA token contract address %s", destChain, destAddr) return nil } } message := base64.StdEncoding.EncodeToString(event.Message) - ob.logger.ExternalChainWatcher.Info().Msgf("ZetaSent inTx detected on chain %d tx %s block %d from %s value %s message %s", + ob.logger.InTx.Info().Msgf("ZetaSent inTx detected on chain %d tx %s block %d from %s value %s message %s", ob.chain.ChainId, event.Raw.TxHash.Hex(), event.Raw.BlockNumber, sender, event.ZetaValueAndGas.String(), message) return zetabridge.GetInBoundVoteMessage( @@ -324,7 +327,7 @@ func (ob *ChainClient) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transacti maybeReceiver = parsedAddress.Hex() } if config.ContainRestrictedAddress(sender.Hex(), maybeReceiver) { - compliance.PrintComplianceLog(ob.logger.ExternalChainWatcher, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, tx.Hash, sender.Hex(), sender.Hex(), "Gas") return nil } @@ -333,10 +336,10 @@ func (ob *ChainClient) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transacti // #nosec G703 err is already checked data, _ := hex.DecodeString(message) if bytes.Equal(data, []byte(constant.DonationMessage)) { - ob.logger.ExternalChainWatcher.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", tx.Hash, ob.chain.ChainId) return nil } - ob.logger.ExternalChainWatcher.Info().Msgf("TSS inTx detected on chain %d tx %s block %d from %s value %s message %s", + ob.logger.InTx.Info().Msgf("TSS inTx detected on chain %d tx %s block %d from %s value %s message %s", ob.chain.ChainId, tx.Hash, blockNumber, sender.Hex(), tx.Value.String(), message) return zetabridge.GetInBoundVoteMessage( diff --git a/zetaclient/interfaces/interfaces.go b/zetaclient/interfaces/interfaces.go index 75242ab17d..1d48fc149c 100644 --- a/zetaclient/interfaces/interfaces.go +++ b/zetaclient/interfaces/interfaces.go @@ -43,7 +43,7 @@ type ChainClient interface { SetChainParams(observertypes.ChainParams) GetChainParams() observertypes.ChainParams GetTxID(nonce uint64) string - ExternalChainWatcherForNewInboundTrackerSuggestions() + WatchIntxTracker() } // ChainSigner is the interface to sign transactions for a chain diff --git a/zetaclient/testutils/stub/chain_client.go b/zetaclient/testutils/stub/chain_client.go index 642de62792..f5a5368511 100644 --- a/zetaclient/testutils/stub/chain_client.go +++ b/zetaclient/testutils/stub/chain_client.go @@ -45,7 +45,7 @@ func (s *EVMClient) GetTxID(_ uint64) string { return "" } -func (s *EVMClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { +func (s *EVMClient) WatchIntxTracker() { } // ---------------------------------------------------------------------------- @@ -86,5 +86,5 @@ func (s *BTCClient) GetTxID(_ uint64) string { return "" } -func (s *BTCClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { +func (s *BTCClient) WatchIntxTracker() { } diff --git a/zetaclient/zetabridge/zetacore_bridge.go b/zetaclient/zetabridge/zetacore_bridge.go index 684092c744..5627b6b34c 100644 --- a/zetaclient/zetabridge/zetacore_bridge.go +++ b/zetaclient/zetabridge/zetacore_bridge.go @@ -221,15 +221,15 @@ func (b *ZetaCoreBridge) UpdateZetaCoreContext(coreContext *corecontext.ZetaCore // check and update chain params for each chain for _, chainParam := range chainParams { + if !chainParam.GetIsSupported() { + b.logger.Info().Msgf("Chain %d is not supported yet", chainParam.ChainId) + continue + } err := observertypes.ValidateChainParams(chainParam) if err != nil { b.logger.Warn().Err(err).Msgf("Invalid chain params for chain %d", chainParam.ChainId) continue } - if !chainParam.GetIsSupported() { - b.logger.Info().Msgf("Chain %d is not supported yet", chainParam.ChainId) - continue - } if chains.IsBitcoinChain(chainParam.ChainId) { newBTCParams = chainParam } else if chains.IsEVMChain(chainParam.ChainId) { @@ -237,12 +237,12 @@ func (b *ZetaCoreBridge) UpdateZetaCoreContext(coreContext *corecontext.ZetaCore } } - supporteChains, err := b.GetSupportedChains() + supportedChains, err := b.GetSupportedChains() if err != nil { return err } - newChains := make([]chains.Chain, len(supporteChains)) - for i, chain := range supporteChains { + newChains := make([]chains.Chain, len(supportedChains)) + for i, chain := range supportedChains { newChains[i] = *chain } keyGen, err := b.GetKeyGen() From 0d6c82dafc33f7ecba9449024cdc186d0eeb4035 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Mon, 1 Apr 2024 10:52:04 -0500 Subject: [PATCH 2/5] added changelog entry --- changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog.md b/changelog.md index 7ae80ba98c..73e82567af 100644 --- a/changelog.md +++ b/changelog.md @@ -52,6 +52,7 @@ * [1861](https://github.com/zeta-chain/node/pull/1861) - fix `ObserverSlashAmount` invalid read * [1880](https://github.com/zeta-chain/node/issues/1880) - lower the gas price multiplier for EVM chains. +* [1883](https://github.com/zeta-chain/node/issues/1883) - zetaclient should check 'IsSupported' flag to pause/unpause a specific chain * [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses * [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests From d9247777626622207f951459e486cf00208d0328 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Mon, 1 Apr 2024 16:44:33 -0500 Subject: [PATCH 3/5] skip keysign if chain is not supported --- zetaclient/zetacore_observer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index 8ec76013a5..34d0774507 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -154,6 +154,10 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) continue } + if !ob.GetChainParams().IsSupported { + co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId) + continue + } cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId) if err != nil { From 8b540783471f1309881e8b274ba4750626bb3842 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Tue, 2 Apr 2024 11:52:41 -0500 Subject: [PATCH 4/5] make log printing less verbose on unsupported chain or invalid chain params --- zetaclient/zetabridge/zetacore_bridge.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zetaclient/zetabridge/zetacore_bridge.go b/zetaclient/zetabridge/zetacore_bridge.go index 5627b6b34c..d8a29fa665 100644 --- a/zetaclient/zetabridge/zetacore_bridge.go +++ b/zetaclient/zetabridge/zetacore_bridge.go @@ -220,14 +220,15 @@ func (b *ZetaCoreBridge) UpdateZetaCoreContext(coreContext *corecontext.ZetaCore var newBTCParams *observertypes.ChainParams // check and update chain params for each chain + sampledLogger := b.logger.Sample(&zerolog.BasicSampler{N: 10}) for _, chainParam := range chainParams { if !chainParam.GetIsSupported() { - b.logger.Info().Msgf("Chain %d is not supported yet", chainParam.ChainId) + sampledLogger.Info().Msgf("Chain %d is not supported yet", chainParam.ChainId) continue } err := observertypes.ValidateChainParams(chainParam) if err != nil { - b.logger.Warn().Err(err).Msgf("Invalid chain params for chain %d", chainParam.ChainId) + sampledLogger.Warn().Err(err).Msgf("Invalid chain params for chain %d", chainParam.ChainId) continue } if chains.IsBitcoinChain(chainParam.ChainId) { From 73efa77d8021c8fd1c7b344f5ffe5b09b63d8a31 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Wed, 3 Apr 2024 15:10:36 -0500 Subject: [PATCH 5/5] added comments for each observation go routine --- zetaclient/bitcoin/bitcoin_client.go | 15 ++++++++------- zetaclient/bitcoin/inbound_tracker.go | 1 + zetaclient/evm/evm_client.go | 20 ++++++++++---------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index 095f805519..269ef64d4c 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -216,12 +216,12 @@ func NewBitcoinClient( func (ob *BTCChainClient) Start() { ob.logger.Chain.Info().Msgf("BitcoinChainClient is starting") - go ob.WatchInTx() - go ob.WatchOutTx() - go ob.WatchUTXOS() - go ob.WatchGasPrice() - go ob.WatchIntxTracker() - go ob.WatchRPCStatus() + go ob.WatchInTx() // watch bitcoin chain for incoming txs and post votes to zetacore + go ob.WatchOutTx() // watch bitcoin chain for outgoing txs status + go ob.WatchUTXOS() // watch bitcoin chain for UTXOs owned by the TSS address + go ob.WatchGasPrice() // watch bitcoin chain for gas rate and post to zetacore + go ob.WatchIntxTracker() // watch zetacore for bitcoin intx trackers + go ob.WatchRPCStatus() // watch the RPC status of the bitcoin chain } // WatchRPCStatus watches the RPC status of the Bitcoin chain @@ -580,7 +580,7 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger return true, true, nil } -// WatchGasPrice watches Bitcoin gas rate and post to zetacore +// WatchGasPrice watches Bitcoin chain for gas rate and post to zetacore func (ob *BTCChainClient) WatchGasPrice() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker) if err != nil { @@ -822,6 +822,7 @@ func GetBtcEvent( return nil, nil } +// WatchUTXOS watches bitcoin chain for UTXOs owned by the TSS address func (ob *BTCChainClient) WatchUTXOS() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchUTXOS", ob.GetChainParams().WatchUtxoTicker) if err != nil { diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index 2fc684f054..4bf2b5b8ff 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -10,6 +10,7 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/zetabridge" ) +// WatchIntxTracker watches zetacore for bitcoin intx trackers func (ob *BTCChainClient) WatchIntxTracker() { ticker, err := types.NewDynamicTicker("Bitcoin_WatchIntxTracker", ob.GetChainParams().InTxTicker) if err != nil { diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index 889ba30756..f94536be25 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -252,16 +252,16 @@ func FetchERC20CustodyContract(addr ethcommon.Address, client interfaces.EVMRPCC return erc20custody.NewERC20Custody(addr, client) } -// Start all observation routines for the external chain +// Start all observation routines for the evm chain func (ob *ChainClient) Start() { - go ob.WatchIntxTracker() - go ob.WatchInTx() - go ob.WatchGasPrice() - go ob.WatchOutTx() - go ob.WatchRPCStatus() + go ob.WatchInTx() // watch evm chain for incoming txs and post votes to zetacore + go ob.WatchOutTx() // watch evm chain for outgoing txs status + go ob.WatchGasPrice() // watch evm chain for gas prices and post to zetacore + go ob.WatchIntxTracker() // watch zetacore for intx trackers + go ob.WatchRPCStatus() // watch the RPC status of the evm chain } -// WatchRPCStatus watches the RPC status of the external chain +// WatchRPCStatus watches the RPC status of the evm chain func (ob *ChainClient) WatchRPCStatus() { ob.logger.Chain.Info().Msgf("Starting RPC status check for chain %s", ob.chain.String()) ticker := time.NewTicker(60 * time.Second) @@ -611,7 +611,7 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, return false, false, nil } -// WatchOutTx watches external chain for outgoing txs status +// WatchOutTx watches evm chain for outgoing txs status func (ob *ChainClient) WatchOutTx() { // read env variables if set timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE")) @@ -840,7 +840,7 @@ func (ob *ChainClient) GetLastBlockHeight() uint64 { return height } -// WatchInTx watches external chain for incoming txs and post votes to zetacore +// WatchInTx watches evm chain for incoming txs and post votes to zetacore func (ob *ChainClient) WatchInTx() { ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchInTx_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker) if err != nil { @@ -1163,7 +1163,7 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse return toBlock } -// WatchGasPrice watches external chain for gas prices and post to zetacore +// WatchGasPrice watches evm chain for gas prices and post to zetacore func (ob *ChainClient) WatchGasPrice() { ob.logger.GasPrice.Info().Msg("WatchGasPrice starting...") err := ob.PostGasPrice()