diff --git a/changelog.md b/changelog.md index 9a2ecdddfe..fe9d1d9cbd 100644 --- a/changelog.md +++ b/changelog.md @@ -59,6 +59,7 @@ * [1861](https://github.com/zeta-chain/node/pull/1861) - fix `ObserverSlashAmount` invalid read * [1880](https://github.com/zeta-chain/node/issues/1880) - lower the gas price multiplier for EVM chains. +* [1883](https://github.com/zeta-chain/node/issues/1883) - zetaclient should check 'IsSupported' flag to pause/unpause a specific chain * [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses * [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests * [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index 1ea6eb8c5b..a9fc63e1f8 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -67,9 +67,6 @@ func CreateSignerMap( loggers.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) continue } - if !evmChainParams.IsSupported { - continue - } mpiAddress := ethcommon.HexToAddress(evmChainParams.ConnectorContractAddress) erc20CustodyAddress := ethcommon.HexToAddress(evmChainParams.Erc20CustodyContractAddress) signer, err := evm.NewEVMSigner( @@ -117,14 +114,11 @@ func CreateChainClientMap( if evmConfig.Chain.IsZetaChain() { continue } - evmChainParams, found := appContext.ZetaCoreContext().GetEVMChainParams(evmConfig.Chain.ChainId) + _, found := appContext.ZetaCoreContext().GetEVMChainParams(evmConfig.Chain.ChainId) if !found { loggers.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String()) continue } - if !evmChainParams.IsSupported { - continue - } co, err := evm.NewEVMChainClient(appContext, bridge, tss, dbpath, loggers, evmConfig, ts) if err != nil { loggers.Std.Error().Err(err).Msgf("NewEVMChainClient error for chain %s", evmConfig.Chain.String()) diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index 26acc1239c..859f5e7c49 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -45,19 +45,22 @@ import ( ) const ( - // The starting height (Bitcoin mainnet) from which dynamic depositor fee will take effect - DynamicDepositorFeeHeight = 834500 + DynamicDepositorFeeHeight = 834500 // The starting height (Bitcoin mainnet) from which dynamic depositor fee will take effect + maxHeightDiff = 10000 // in case the last block is too old when the observer starts + btcBlocksPerDay = 144 // for LRU block cache size + bigValueSats = 200000000 // 2 BTC + bigValueConfirmationCount = 6 // 6 confirmations for value >= 2 BTC ) var _ interfaces.ChainClient = &BTCChainClient{} type BTCLog struct { - ChainLogger zerolog.Logger - WatchInTx zerolog.Logger - ObserveOutTx zerolog.Logger - WatchUTXOS zerolog.Logger - WatchGasPrice zerolog.Logger - Compliance zerolog.Logger + Chain zerolog.Logger // The parent logger for the chain + InTx zerolog.Logger // The logger for incoming transactions + OutTx zerolog.Logger // The logger for outgoing transactions + UTXOS zerolog.Logger // The logger for UTXOs management + GasPrice zerolog.Logger // The logger for gas price + Compliance zerolog.Logger // The logger for compliance checks } // BTCChainClient represents a chain configuration for Bitcoin @@ -89,27 +92,21 @@ type BTCChainClient struct { BlockCache *lru.Cache } -const ( - maxHeightDiff = 10000 // in case the last block is too old when the observer starts - btcBlocksPerDay = 144 // for LRU block cache size - bigValueSats = 200000000 // 2 BTC - bigValueConfirmationCount = 6 // 6 confirmations for value >= 2 BTC -) - func (ob *BTCChainClient) WithZetaClient(bridge *zetabridge.ZetaCoreBridge) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.zetaClient = bridge } + func (ob *BTCChainClient) WithLogger(logger zerolog.Logger) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.logger = BTCLog{ - ChainLogger: logger, - WatchInTx: logger.With().Str("module", "WatchInTx").Logger(), - ObserveOutTx: logger.With().Str("module", "observeOutTx").Logger(), - WatchUTXOS: logger.With().Str("module", "WatchUTXOS").Logger(), - WatchGasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), + Chain: logger, + InTx: logger.With().Str("module", "WatchInTx").Logger(), + OutTx: logger.With().Str("module", "WatchOutTx").Logger(), + UTXOS: logger.With().Str("module", "WatchUTXOS").Logger(), + GasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), } } @@ -161,12 +158,12 @@ func NewBitcoinClient( ob.Mu = &sync.Mutex{} chainLogger := loggers.Std.With().Str("chain", chain.ChainName.String()).Logger() ob.logger = BTCLog{ - ChainLogger: chainLogger, - WatchInTx: chainLogger.With().Str("module", "WatchInTx").Logger(), - ObserveOutTx: chainLogger.With().Str("module", "observeOutTx").Logger(), - WatchUTXOS: chainLogger.With().Str("module", "WatchUTXOS").Logger(), - WatchGasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), - Compliance: loggers.Compliance, + Chain: chainLogger, + InTx: chainLogger.With().Str("module", "WatchInTx").Logger(), + OutTx: chainLogger.With().Str("module", "WatchOutTx").Logger(), + UTXOS: chainLogger.With().Str("module", "WatchUTXOS").Logger(), + GasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), + Compliance: loggers.Compliance, } ob.zetaClient = bridge @@ -181,7 +178,7 @@ func NewBitcoinClient( } ob.params = *chainParams // initialize the Client - ob.logger.ChainLogger.Info().Msgf("Chain %s endpoint %s", ob.chain.String(), btcCfg.RPCHost) + ob.logger.Chain.Info().Msgf("Chain %s endpoint %s", ob.chain.String(), btcCfg.RPCHost) connCfg := &rpcclient.ConnConfig{ Host: btcCfg.RPCHost, User: btcCfg.RPCUsername, @@ -202,7 +199,7 @@ func NewBitcoinClient( ob.BlockCache, err = lru.New(btcBlocksPerDay) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("failed to create bitcoin block cache") + ob.logger.Chain.Error().Err(err).Msg("failed to create bitcoin block cache") return nil, err } @@ -216,55 +213,58 @@ func NewBitcoinClient( } func (ob *BTCChainClient) Start() { - ob.logger.ChainLogger.Info().Msgf("BitcoinChainClient is starting") - go ob.WatchInTx() - go ob.observeOutTx() - go ob.WatchUTXOS() - go ob.WatchGasPrice() - go ob.ExternalChainWatcherForNewInboundTrackerSuggestions() - go ob.RPCStatus() + ob.logger.Chain.Info().Msgf("BitcoinChainClient is starting") + go ob.WatchInTx() // watch bitcoin chain for incoming txs and post votes to zetacore + go ob.WatchOutTx() // watch bitcoin chain for outgoing txs status + go ob.WatchUTXOS() // watch bitcoin chain for UTXOs owned by the TSS address + go ob.WatchGasPrice() // watch bitcoin chain for gas rate and post to zetacore + go ob.WatchIntxTracker() // watch zetacore for bitcoin intx trackers + go ob.WatchRPCStatus() // watch the RPC status of the bitcoin chain } -func (ob *BTCChainClient) RPCStatus() { - ob.logger.ChainLogger.Info().Msgf("RPCStatus is starting") +// WatchRPCStatus watches the RPC status of the Bitcoin chain +func (ob *BTCChainClient) WatchRPCStatus() { + ob.logger.Chain.Info().Msgf("RPCStatus is starting") ticker := time.NewTicker(60 * time.Second) for { select { case <-ticker.C: - //ob.logger.ChainLogger.Info().Msgf("RPCStatus is running") + if !ob.GetChainParams().IsSupported { + continue + } bn, err := ob.rpcClient.GetBlockCount() if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } hash, err := ob.rpcClient.GetBlockHash(bn) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } header, err := ob.rpcClient.GetBlockHeader(hash) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } blockTime := header.Timestamp elapsedSeconds := time.Since(blockTime).Seconds() if elapsedSeconds > 1200 { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: RPC down? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") continue } tssAddr := ob.Tss.BTCAddressWitnessPubkeyHash() res, err := ob.rpcClient.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddr}) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ") continue } if len(res) == 0 { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ") + ob.logger.Chain.Error().Err(err).Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ") continue } - ob.logger.ChainLogger.Info().Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res)) + ob.logger.Chain.Info().Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res)) case <-ob.stop: return @@ -273,9 +273,9 @@ func (ob *BTCChainClient) RPCStatus() { } func (ob *BTCChainClient) Stop() { - ob.logger.ChainLogger.Info().Msgf("ob %s is stopping", ob.chain.String()) + ob.logger.Chain.Info().Msgf("ob %s is stopping", ob.chain.String()) close(ob.stop) // this notifies all goroutines to stop - ob.logger.ChainLogger.Info().Msgf("%s observer stopped", ob.chain.String()) + ob.logger.Chain.Info().Msgf("%s observer stopped", ob.chain.String()) } func (ob *BTCChainClient) SetLastBlockHeight(height int64) { @@ -322,34 +322,39 @@ func (ob *BTCChainClient) GetBaseGasPrice() *big.Int { return big.NewInt(0) } +// WatchInTx watches Bitcoin chain for incoming txs and post votes to zetacore func (ob *BTCChainClient) WatchInTx() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchInTx", ob.GetChainParams().InTxTicker) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error") + ob.logger.InTx.Error().Err(err).Msg("error creating ticker") return } defer ticker.Stop() + ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) for { select { case <-ticker.C(): if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { continue } + if !ob.GetChainParams().IsSupported { + continue + } err := ob.ObserveInTx() if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx") + ob.logger.InTx.Error().Err(err).Msg("WatchInTx error observing in tx") } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.WatchInTx) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.WatchInTx.Info().Msg("WatchInTx stopped") + ob.logger.InTx.Info().Msgf("WatchInTx stopped for chain %d", ob.chain.ChainId) return } } } func (ob *BTCChainClient) postBlockHeader(tip int64) error { - ob.logger.WatchInTx.Info().Msgf("postBlockHeader: tip %d", tip) + ob.logger.InTx.Info().Msgf("postBlockHeader: tip %d", tip) bn := tip res, err := ob.zetaClient.GetBlockHeaderStateByChain(ob.chain.ChainId) if err == nil && res.BlockHeaderState != nil && res.BlockHeaderState.EarliestHeight > 0 { @@ -366,7 +371,7 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { var headerBuf bytes.Buffer err = res2.Header.Serialize(&headerBuf) if err != nil { // should never happen - ob.logger.WatchInTx.Error().Err(err).Msgf("error serializing bitcoin block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("error serializing bitcoin block header: %d", bn) return err } blockHash := res2.Header.BlockHash() @@ -376,9 +381,9 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { res2.Block.Height, proofs.NewBitcoinHeader(headerBuf.Bytes()), ) - ob.logger.WatchInTx.Info().Msgf("posted block header %d: %s", bn, blockHash) + ob.logger.InTx.Info().Msgf("posted block header %d: %s", bn, blockHash) if err != nil { // error shouldn't block the process - ob.logger.WatchInTx.Error().Err(err).Msgf("error posting bitcoin block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("error posting bitcoin block header: %d", bn) } return err } @@ -415,18 +420,18 @@ func (ob *BTCChainClient) ObserveInTx() error { bn := lastScanned + 1 res, err := ob.GetBlockByNumberCached(bn) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error getting bitcoin block %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("observeInTxBTC: error getting bitcoin block %d", bn) return err } - ob.logger.WatchInTx.Info().Msgf("observeInTxBTC: block %d has %d txs, current block %d, last block %d", + ob.logger.InTx.Info().Msgf("observeInTxBTC: block %d has %d txs, current block %d, last block %d", bn, len(res.Block.Tx), cnt, lastScanned) // print some debug information if len(res.Block.Tx) > 1 { for idx, tx := range res.Block.Tx { - ob.logger.WatchInTx.Debug().Msgf("BTC InTX | %d: %s\n", idx, tx.Txid) + ob.logger.InTx.Debug().Msgf("BTC InTX | %d: %s\n", idx, tx.Txid) for vidx, vout := range tx.Vout { - ob.logger.WatchInTx.Debug().Msgf("vout %d \n value: %v\n scriptPubKey: %v\n", vidx, vout.Value, vout.ScriptPubKey.Hex) + ob.logger.InTx.Debug().Msgf("vout %d \n value: %v\n scriptPubKey: %v\n", vidx, vout.Value, vout.ScriptPubKey.Hex) } } } @@ -436,13 +441,13 @@ func (ob *BTCChainClient) ObserveInTx() error { if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { err = ob.postBlockHeader(bn) if err != nil { - ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) + ob.logger.InTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) } } if len(res.Block.Tx) > 1 { // get depositor fee - depositorFee := CalcDepositorFee(res.Block, ob.chain.ChainId, ob.netParams, ob.logger.WatchInTx) + depositorFee := CalcDepositorFee(res.Block, ob.chain.ChainId, ob.netParams, ob.logger.InTx) // filter incoming txs to TSS address tssAddress := ob.Tss.BTCAddress() @@ -451,7 +456,7 @@ func (ob *BTCChainClient) ObserveInTx() error { res.Block.Tx, uint64(res.Block.Height), tssAddress, - &ob.logger.WatchInTx, + &ob.logger.InTx, ob.netParams, depositorFee, ) @@ -462,10 +467,10 @@ func (ob *BTCChainClient) ObserveInTx() error { if msg != nil { zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, zetabridge.PostVoteInboundExecutionGasLimit, msg) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error posting to zeta core for tx %s", inTx.TxHash) + ob.logger.InTx.Error().Err(err).Msgf("observeInTxBTC: error posting to zeta core for tx %s", inTx.TxHash) return err // we have to re-scan this block next time } else if zetaHash != "" { - ob.logger.WatchInTx.Info().Msgf("observeInTxBTC: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", + ob.logger.InTx.Info().Msgf("observeInTxBTC: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", zetaHash, inTx.TxHash, ballot, depositorFee) } } @@ -476,7 +481,7 @@ func (ob *BTCChainClient) ObserveInTx() error { ob.SetLastBlockHeightScanned(bn) // #nosec G701 always positive if err := ob.db.Save(clienttypes.ToLastBlockSQLType(uint64(bn))).Error; err != nil { - ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error writing last scanned block %d to db", bn) + ob.logger.InTx.Error().Err(err).Msgf("observeInTxBTC: error writing last scanned block %d to db", bn) } } @@ -528,7 +533,7 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger if txResult == nil { // check failed, try again next time return false, false, nil } else if inMempool { // still in mempool (should avoid unnecessary Tss keysign) - ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: outTx %s is still in mempool", outTxID) + ob.logger.OutTx.Info().Msgf("IsSendOutTxProcessed: outTx %s is still in mempool", outTxID) return true, false, nil } // included @@ -539,7 +544,7 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger if res == nil { return false, false, nil } - ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: setIncludedTx succeeded for outTx %s", outTxID) + ob.logger.OutTx.Info().Msgf("IsSendOutTxProcessed: setIncludedTx succeeded for outTx %s", outTxID) } // It's safe to use cctx's amount to post confirmation because it has already been verified in observeOutTx() @@ -571,33 +576,37 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger return true, true, nil } +// WatchGasPrice watches Bitcoin chain for gas rate and post to zetacore func (ob *BTCChainClient) WatchGasPrice() { // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } // start gas price ticker ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker) if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error") + ob.logger.GasPrice.Error().Err(err).Msg("error creating ticker") return } - ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.PostGasPrice() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } - ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice) + ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice) case <-ob.stop: - ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice stopped") + ob.logger.GasPrice.Info().Msgf("WatchGasPrice stopped for chain %d", ob.chain.ChainId) return } } @@ -612,7 +621,7 @@ func (ob *BTCChainClient) PostGasPrice() error { // #nosec G701 always in range zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, 1, "100", uint64(bn)) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice:") + ob.logger.GasPrice.Err(err).Msg("PostGasPrice:") return err } _ = zetaHash @@ -638,7 +647,7 @@ func (ob *BTCChainClient) PostGasPrice() error { // #nosec G701 always positive zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, feeRatePerByte.Uint64(), "100", uint64(bn)) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice:") + ob.logger.GasPrice.Err(err).Msg("PostGasPrice:") return err } _ = zetaHash @@ -685,7 +694,7 @@ func FilterAndParseIncomingTx( } func (ob *BTCChainClient) GetInboundVoteMessageFromBtcEvent(inTx *BTCInTxEvnet) *types.MsgVoteOnObservedInboundTx { - ob.logger.WatchInTx.Debug().Msgf("Processing inTx: %s", inTx.TxHash) + ob.logger.InTx.Debug().Msgf("Processing inTx: %s", inTx.TxHash) amount := big.NewFloat(inTx.Value) amount = amount.Mul(amount, big.NewFloat(1e8)) amountInt, _ := amount.Int(nil) @@ -722,7 +731,7 @@ func (ob *BTCChainClient) IsInTxRestricted(inTx *BTCInTxEvnet) bool { receiver = parsedAddress.Hex() } if config.ContainRestrictedAddress(inTx.FromAddress, receiver) { - compliance.PrintComplianceLog(ob.logger.WatchInTx, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, inTx.TxHash, inTx.FromAddress, receiver, "BTC") return true } @@ -818,10 +827,11 @@ func GetBtcEvent( return nil, nil } +// WatchUTXOS watches bitcoin chain for UTXOs owned by the TSS address func (ob *BTCChainClient) WatchUTXOS() { ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchUTXOS", ob.GetChainParams().WatchUtxoTicker) if err != nil { - ob.logger.WatchUTXOS.Error().Err(err).Msg("WatchUTXOS error") + ob.logger.UTXOS.Error().Err(err).Msg("error creating ticker") return } @@ -829,13 +839,16 @@ func (ob *BTCChainClient) WatchUTXOS() { for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err := ob.FetchUTXOS() if err != nil { - ob.logger.WatchUTXOS.Error().Err(err).Msg("error fetching btc utxos") + ob.logger.UTXOS.Error().Err(err).Msg("error fetching btc utxos") } - ticker.UpdateInterval(ob.GetChainParams().WatchUtxoTicker, ob.logger.WatchUTXOS) + ticker.UpdateInterval(ob.GetChainParams().WatchUtxoTicker, ob.logger.UTXOS) case <-ob.stop: - ob.logger.WatchUTXOS.Info().Msg("WatchUTXOS stopped") + ob.logger.UTXOS.Info().Msgf("WatchUTXOS stopped for chain %d", ob.chain.ChainId) return } } @@ -844,7 +857,7 @@ func (ob *BTCChainClient) WatchUTXOS() { func (ob *BTCChainClient) FetchUTXOS() error { defer func() { if err := recover(); err != nil { - ob.logger.WatchUTXOS.Error().Msgf("BTC fetchUTXOS: caught panic error: %v", err) + ob.logger.UTXOS.Error().Msgf("BTC fetchUTXOS: caught panic error: %v", err) } }() @@ -918,7 +931,7 @@ func (ob *BTCChainClient) refreshPendingNonce() { // get pending nonces from zetabridge p, err := ob.zetaClient.GetPendingNoncesByChain(ob.chain.ChainId) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("refreshPendingNonce: error getting pending nonces") + ob.logger.Chain.Error().Err(err).Msg("refreshPendingNonce: error getting pending nonces") } // increase pending nonce if lagged behind @@ -932,14 +945,14 @@ func (ob *BTCChainClient) refreshPendingNonce() { // get the last included outTx hash txid, err := ob.getOutTxidByNonce(nonceLow-1, false) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("refreshPendingNonce: error getting last outTx txid") + ob.logger.Chain.Error().Err(err).Msg("refreshPendingNonce: error getting last outTx txid") } // set 'NonceLow' as the new pending nonce ob.Mu.Lock() defer ob.Mu.Unlock() ob.pendingNonce = nonceLow - ob.logger.ChainLogger.Info().Msgf("refreshPendingNonce: increase pending nonce to %d with txid %s", ob.pendingNonce, txid) + ob.logger.Chain.Info().Msgf("refreshPendingNonce: increase pending nonce to %d with txid %s", ob.pendingNonce, txid) } } @@ -979,10 +992,10 @@ func (ob *BTCChainClient) findNonceMarkUTXO(nonce uint64, txid string) (int, err for i, utxo := range ob.utxos { sats, err := GetSatoshis(utxo.Amount) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("findNonceMarkUTXO: error getting satoshis for utxo %v", utxo) + ob.logger.OutTx.Error().Err(err).Msgf("findNonceMarkUTXO: error getting satoshis for utxo %v", utxo) } if utxo.Address == tssAddress && sats == amount && utxo.TxID == txid { - ob.logger.ObserveOutTx.Info().Msgf("findNonceMarkUTXO: found nonce-mark utxo with txid %s, amount %d satoshi", utxo.TxID, sats) + ob.logger.OutTx.Info().Msgf("findNonceMarkUTXO: found nonce-mark utxo with txid %s, amount %d satoshi", utxo.TxID, sats) return i, nil } } @@ -1084,15 +1097,16 @@ func (ob *BTCChainClient) SaveBroadcastedTx(txHash string, nonce uint64) { broadcastEntry := clienttypes.ToOutTxHashSQLType(txHash, outTxID) if err := ob.db.Save(&broadcastEntry).Error; err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("SaveBroadcastedTx: error saving broadcasted txHash %s for outTx %s", txHash, outTxID) + ob.logger.OutTx.Error().Err(err).Msgf("SaveBroadcastedTx: error saving broadcasted txHash %s for outTx %s", txHash, outTxID) } - ob.logger.ObserveOutTx.Info().Msgf("SaveBroadcastedTx: saved broadcasted txHash %s for outTx %s", txHash, outTxID) + ob.logger.OutTx.Info().Msgf("SaveBroadcastedTx: saved broadcasted txHash %s for outTx %s", txHash, outTxID) } -func (ob *BTCChainClient) observeOutTx() { - ticker, err := clienttypes.NewDynamicTicker("Bitcoin_observeOutTx", ob.GetChainParams().OutTxTicker) +// WatchOutTx watches Bitcoin chain for outgoing txs status +func (ob *BTCChainClient) WatchOutTx() { + ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchOutTx", ob.GetChainParams().OutTxTicker) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error creating ticker") + ob.logger.OutTx.Error().Err(err).Msg("error creating ticker ") return } @@ -1103,9 +1117,12 @@ func (ob *BTCChainClient) observeOutTx() { if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled { continue } + if !ob.GetChainParams().IsSupported { + continue + } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error GetAllOutTxTrackerByChain") + ob.logger.OutTx.Error().Err(err).Msgf("WatchOutTx: error GetAllOutTxTrackerByChain for chain %d", ob.chain.ChainId) continue } for _, tracker := range trackers { @@ -1113,16 +1130,16 @@ func (ob *BTCChainClient) observeOutTx() { outTxID := ob.GetTxID(tracker.Nonce) cctx, err := ob.zetaClient.GetCctxByNonce(ob.chain.ChainId, tracker.Nonce) if err != nil { - ob.logger.ObserveOutTx.Info().Err(err).Msgf("observeOutTx: can't find cctx for nonce %d", tracker.Nonce) + ob.logger.OutTx.Info().Err(err).Msgf("WatchOutTx: can't find cctx for chain %d nonce %d", ob.chain.ChainId, tracker.Nonce) break } nonce := cctx.GetCurrentOutTxParam().OutboundTxTssNonce if tracker.Nonce != nonce { // Tanmay: it doesn't hurt to check - ob.logger.ObserveOutTx.Error().Msgf("observeOutTx: tracker nonce %d not match cctx nonce %d", tracker.Nonce, nonce) + ob.logger.OutTx.Error().Msgf("WatchOutTx: tracker nonce %d not match cctx nonce %d", tracker.Nonce, nonce) break } if len(tracker.HashList) > 1 { - ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: oops, outTxID %s got multiple (%d) outTx hashes", outTxID, len(tracker.HashList)) + ob.logger.OutTx.Warn().Msgf("WatchOutTx: oops, outTxID %s got multiple (%d) outTx hashes", outTxID, len(tracker.HashList)) } // iterate over all txHashes to find the truly included one. // we do it this (inefficient) way because we don't rely on the first one as it may be a false positive (for unknown reason). @@ -1133,10 +1150,10 @@ func (ob *BTCChainClient) observeOutTx() { if result != nil && !inMempool { // included txCount++ txResult = result - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: included outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, tracker.Nonce) + ob.logger.OutTx.Info().Msgf("WatchOutTx: included outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, tracker.Nonce) if txCount > 1 { - ob.logger.ObserveOutTx.Error().Msgf( - "observeOutTx: checkIncludedTx passed, txCount %d chain %d nonce %d result %v", txCount, ob.chain.ChainId, tracker.Nonce, result) + ob.logger.OutTx.Error().Msgf( + "WatchOutTx: checkIncludedTx passed, txCount %d chain %d nonce %d result %v", txCount, ob.chain.ChainId, tracker.Nonce, result) } } } @@ -1144,12 +1161,12 @@ func (ob *BTCChainClient) observeOutTx() { ob.setIncludedTx(tracker.Nonce, txResult) } else if txCount > 1 { ob.removeIncludedTx(tracker.Nonce) // we can't tell which txHash is true, so we remove all (if any) to be safe - ob.logger.ObserveOutTx.Error().Msgf("observeOutTx: included multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, tracker.Nonce) + ob.logger.OutTx.Error().Msgf("WatchOutTx: included multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, tracker.Nonce) } } - ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.ObserveOutTx) + ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.OutTx) case <-ob.stop: - ob.logger.ObserveOutTx.Info().Msg("observeOutTx stopped") + ob.logger.OutTx.Info().Msgf("WatchOutTx stopped for chain %d", ob.chain.ChainId) return } } @@ -1161,17 +1178,17 @@ func (ob *BTCChainClient) checkIncludedTx(cctx *types.CrossChainTx, txHash strin outTxID := ob.GetTxID(cctx.GetCurrentOutTxParam().OutboundTxTssNonce) hash, getTxResult, err := ob.GetTxResultByHash(txHash) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("checkIncludedTx: error GetTxResultByHash: %s", txHash) + ob.logger.OutTx.Error().Err(err).Msgf("checkIncludedTx: error GetTxResultByHash: %s", txHash) return nil, false } if txHash != getTxResult.TxID { // just in case, we'll use getTxResult.TxID later - ob.logger.ObserveOutTx.Error().Msgf("checkIncludedTx: inconsistent txHash %s and getTxResult.TxID %s", txHash, getTxResult.TxID) + ob.logger.OutTx.Error().Msgf("checkIncludedTx: inconsistent txHash %s and getTxResult.TxID %s", txHash, getTxResult.TxID) return nil, false } if getTxResult.Confirmations >= 0 { // check included tx only err = ob.checkTssOutTxResult(cctx, hash, getTxResult) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msgf("checkIncludedTx: error verify bitcoin outTx %s outTxID %s", txHash, outTxID) + ob.logger.OutTx.Error().Err(err).Msgf("checkIncludedTx: error verify bitcoin outTx %s outTxID %s", txHash, outTxID) return nil, false } return getTxResult, false // included @@ -1194,16 +1211,16 @@ func (ob *BTCChainClient) setIncludedTx(nonce uint64, getTxResult *btcjson.GetTr if nonce >= ob.pendingNonce { // try increasing pending nonce on every newly included outTx ob.pendingNonce = nonce + 1 } - ob.logger.ObserveOutTx.Info().Msgf("setIncludedTx: included new bitcoin outTx %s outTxID %s pending nonce %d", txHash, outTxID, ob.pendingNonce) + ob.logger.OutTx.Info().Msgf("setIncludedTx: included new bitcoin outTx %s outTxID %s pending nonce %d", txHash, outTxID, ob.pendingNonce) } else if txHash == res.TxID { // found same hash. ob.includedTxResults[outTxID] = getTxResult // update tx result as confirmations may increase if getTxResult.Confirmations > res.Confirmations { - ob.logger.ObserveOutTx.Info().Msgf("setIncludedTx: bitcoin outTx %s got confirmations %d", txHash, getTxResult.Confirmations) + ob.logger.OutTx.Info().Msgf("setIncludedTx: bitcoin outTx %s got confirmations %d", txHash, getTxResult.Confirmations) } } else { // found other hash. // be alert for duplicate payment!!! As we got a new hash paying same cctx (for whatever reason). delete(ob.includedTxResults, outTxID) // we can't tell which txHash is true, so we remove all to be safe - ob.logger.ObserveOutTx.Error().Msgf("setIncludedTx: duplicate payment by bitcoin outTx %s outTxID %s, prior outTx %s", txHash, outTxID, res.TxID) + ob.logger.OutTx.Error().Msgf("setIncludedTx: duplicate payment by bitcoin outTx %s outTxID %s, prior outTx %s", txHash, outTxID, res.TxID) } } @@ -1413,7 +1430,7 @@ func (ob *BTCChainClient) checkTSSVoutCancelled(params *types.OutboundTxParams, func (ob *BTCChainClient) BuildBroadcastedTxMap() error { var broadcastedTransactions []clienttypes.OutTxHashSQLType if err := ob.db.Find(&broadcastedTransactions).Error; err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("error iterating over db") + ob.logger.Chain.Error().Err(err).Msg("error iterating over db") return err } for _, entry := range broadcastedTransactions { @@ -1431,7 +1448,7 @@ func (ob *BTCChainClient) LoadLastBlock() error { //Load persisted block number var lastBlockNum clienttypes.LastBlockSQLType if err := ob.db.First(&lastBlockNum, clienttypes.LastBlockNumID).Error; err != nil { - ob.logger.ChainLogger.Info().Msg("LastBlockNum not found in DB, scan from latest") + ob.logger.Chain.Info().Msg("LastBlockNum not found in DB, scan from latest") ob.SetLastBlockHeightScanned(bn) } else { // #nosec G701 always in range @@ -1440,7 +1457,7 @@ func (ob *BTCChainClient) LoadLastBlock() error { //If persisted block number is too low, use the latest height if (bn - lastBN) > maxHeightDiff { - ob.logger.ChainLogger.Info().Msgf("LastBlockNum too low: %d, scan from latest", lastBlockNum.Num) + ob.logger.Chain.Info().Msgf("LastBlockNum too low: %d, scan from latest", lastBlockNum.Num) ob.SetLastBlockHeightScanned(bn) } } @@ -1448,7 +1465,7 @@ func (ob *BTCChainClient) LoadLastBlock() error { if ob.chain.ChainId == 18444 { // bitcoin regtest: start from block 100 ob.SetLastBlockHeightScanned(100) } - ob.logger.ChainLogger.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) + ob.logger.Chain.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) return nil } diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index bcabf0c1a8..95a15a0bbe 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -10,10 +10,11 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/zetabridge" ) -func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { - ticker, err := types.NewDynamicTicker("Bitcoin_WatchInTx_InboundTrackerSuggestions", ob.GetChainParams().InTxTicker) +// WatchIntxTracker watches zetacore for bitcoin intx trackers +func (ob *BTCChainClient) WatchIntxTracker() { + ticker, err := types.NewDynamicTicker("Bitcoin_WatchIntxTracker", ob.GetChainParams().InTxTicker) if err != nil { - ob.logger.WatchInTx.Err(err).Msg("error creating ticker") + ob.logger.InTx.Err(err).Msg("error creating ticker") return } @@ -24,13 +25,16 @@ func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { continue } + if !ob.GetChainParams().IsSupported { + continue + } err := ob.ObserveTrackerSuggestions() if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("error observing in tx") + ob.logger.InTx.Error().Err(err).Msgf("error observing intx tracker for chain %d", ob.chain.ChainId) } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.WatchInTx) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.WatchInTx.Info().Msg("ExternalChainWatcher for BTC inboundTrackerSuggestions stopped") + ob.logger.InTx.Info().Msgf("WatchIntxTracker stopped for chain %d", ob.chain.ChainId) return } } @@ -42,12 +46,12 @@ func (ob *BTCChainClient) ObserveTrackerSuggestions() error { return err } for _, tracker := range trackers { - ob.logger.WatchInTx.Info().Msgf("checking tracker with hash :%s and coin-type :%s ", tracker.TxHash, tracker.CoinType) + ob.logger.InTx.Info().Msgf("checking tracker with hash :%s and coin-type :%s ", tracker.TxHash, tracker.CoinType) ballotIdentifier, err := ob.CheckReceiptForBtcTxHash(tracker.TxHash, true) if err != nil { return err } - ob.logger.WatchInTx.Info().Msgf("Vote submitted for inbound Tracker,Chain : %s,Ballot Identifier : %s, coin-type %s", ob.chain.ChainName, ballotIdentifier, coin.CoinType_Gas.String()) + ob.logger.InTx.Info().Msgf("Vote submitted for inbound Tracker,Chain : %s,Ballot Identifier : %s, coin-type %s", ob.chain.ChainName, ballotIdentifier, coin.CoinType_Gas.String()) } return nil } @@ -72,13 +76,13 @@ func (ob *BTCChainClient) CheckReceiptForBtcTxHash(txHash string, vote bool) (st if len(blockVb.Tx) <= 1 { return "", fmt.Errorf("block %d has no transactions", blockVb.Height) } - depositorFee := CalcDepositorFee(blockVb, ob.chain.ChainId, ob.netParams, ob.logger.WatchInTx) + depositorFee := CalcDepositorFee(blockVb, ob.chain.ChainId, ob.netParams, ob.logger.InTx) tss, err := ob.zetaClient.GetBtcTssAddress(ob.chain.ChainId) if err != nil { return "", err } // #nosec G701 always positive - event, err := GetBtcEvent(*tx, tss, uint64(blockVb.Height), &ob.logger.WatchInTx, ob.netParams, depositorFee) + event, err := GetBtcEvent(*tx, tss, uint64(blockVb.Height), &ob.logger.InTx, ob.netParams, depositorFee) if err != nil { return "", err } @@ -94,10 +98,10 @@ func (ob *BTCChainClient) CheckReceiptForBtcTxHash(txHash string, vote bool) (st } zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, zetabridge.PostVoteInboundExecutionGasLimit, msg) if err != nil { - ob.logger.WatchInTx.Error().Err(err).Msg("error posting to zeta core") + ob.logger.InTx.Error().Err(err).Msg("error posting to zeta core") return "", err } else if zetaHash != "" { - ob.logger.WatchInTx.Info().Msgf("BTC deposit detected and reported: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", + ob.logger.InTx.Info().Msgf("BTC deposit detected and reported: PostVoteInbound zeta tx hash: %s inTx %s ballot %s fee %v", zetaHash, txHash, ballot, depositorFee) } return msg.Digest(), nil diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index 069545541e..71741317fb 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -57,11 +57,11 @@ type OutTx struct { Nonce int64 } type Log struct { - ChainLogger zerolog.Logger // Parent logger - ExternalChainWatcher zerolog.Logger // Observes external Chains for incoming trasnactions - WatchGasPrice zerolog.Logger // Observes external Chains for Gas prices and posts to core - ObserveOutTx zerolog.Logger // Observes external Chains for outgoing transactions - Compliance zerolog.Logger // Compliance logger + Chain zerolog.Logger // The parent logger for the chain + InTx zerolog.Logger // Logger for incoming trasnactions + OutTx zerolog.Logger // Logger for outgoing transactions + GasPrice zerolog.Logger // Logger for gas prices + Compliance zerolog.Logger // Logger for compliance checks } var _ interfaces.ChainClient = &ChainClient{} @@ -108,11 +108,11 @@ func NewEVMChainClient( } chainLogger := loggers.Std.With().Str("chain", evmCfg.Chain.ChainName.String()).Logger() ob.logger = Log{ - ChainLogger: chainLogger, - ExternalChainWatcher: chainLogger.With().Str("module", "ExternalChainWatcher").Logger(), - WatchGasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), - ObserveOutTx: chainLogger.With().Str("module", "ObserveOutTx").Logger(), - Compliance: loggers.Compliance, + Chain: chainLogger, + InTx: chainLogger.With().Str("module", "WatchInTx").Logger(), + OutTx: chainLogger.With().Str("module", "WatchOutTx").Logger(), + GasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(), + Compliance: loggers.Compliance, } ob.coreContext = appContext.ZetaCoreContext() chainParams, found := ob.coreContext.GetEVMChainParams(evmCfg.Chain.ChainId) @@ -130,10 +130,10 @@ func NewEVMChainClient( ob.outTXConfirmedReceipts = make(map[string]*ethtypes.Receipt) ob.outTXConfirmedTransactions = make(map[string]*ethtypes.Transaction) - ob.logger.ChainLogger.Info().Msgf("Chain %s endpoint %s", ob.chain.ChainName.String(), evmCfg.Endpoint) + ob.logger.Chain.Info().Msgf("Chain %s endpoint %s", ob.chain.ChainName.String(), evmCfg.Endpoint) client, err := ethclient.Dial(evmCfg.Endpoint) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("eth Client Dial") + ob.logger.Chain.Error().Err(err).Msg("eth Client Dial") return nil, err } ob.evmClient = client @@ -142,12 +142,12 @@ func NewEVMChainClient( // create block header and block caches ob.blockCache, err = lru.New(1000) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("failed to create block cache") + ob.logger.Chain.Error().Err(err).Msg("failed to create block cache") return nil, err } ob.headerCache, err = lru.New(1000) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("failed to create header cache") + ob.logger.Chain.Error().Err(err).Msg("failed to create header cache") return nil, err } @@ -156,7 +156,7 @@ func NewEVMChainClient( return nil, err } - ob.logger.ChainLogger.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) + ob.logger.Chain.Info().Msgf("%s: start scanning from block %d", ob.chain.String(), ob.GetLastBlockHeightScanned()) return &ob, nil } @@ -169,10 +169,10 @@ func (ob *ChainClient) WithLogger(logger zerolog.Logger) { ob.Mu.Lock() defer ob.Mu.Unlock() ob.logger = Log{ - ChainLogger: logger, - ExternalChainWatcher: logger.With().Str("module", "ExternalChainWatcher").Logger(), - WatchGasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), - ObserveOutTx: logger.With().Str("module", "ObserveOutTx").Logger(), + Chain: logger, + InTx: logger.With().Str("module", "WatchInTx").Logger(), + OutTx: logger.With().Str("module", "WatchOutTx").Logger(), + GasPrice: logger.With().Str("module", "WatchGasPrice").Logger(), } } @@ -252,43 +252,48 @@ func FetchERC20CustodyContract(addr ethcommon.Address, client interfaces.EVMRPCC return erc20custody.NewERC20Custody(addr, client) } +// Start all observation routines for the evm chain func (ob *ChainClient) Start() { - go ob.ExternalChainWatcherForNewInboundTrackerSuggestions() - go ob.ExternalChainWatcher() // Observes external Chains for incoming trasnactions - go ob.WatchGasPrice() // Observes external Chains for Gas prices and posts to core - go ob.observeOutTx() // Populates receipts and confirmed outbound transactions - go ob.ExternalChainRPCStatus() + go ob.WatchInTx() // watch evm chain for incoming txs and post votes to zetacore + go ob.WatchOutTx() // watch evm chain for outgoing txs status + go ob.WatchGasPrice() // watch evm chain for gas prices and post to zetacore + go ob.WatchIntxTracker() // watch zetacore for intx trackers + go ob.WatchRPCStatus() // watch the RPC status of the evm chain } -func (ob *ChainClient) ExternalChainRPCStatus() { - ob.logger.ChainLogger.Info().Msgf("Starting RPC status check for chain %s", ob.chain.String()) +// WatchRPCStatus watches the RPC status of the evm chain +func (ob *ChainClient) WatchRPCStatus() { + ob.logger.Chain.Info().Msgf("Starting RPC status check for chain %s", ob.chain.String()) ticker := time.NewTicker(60 * time.Second) for { select { case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } bn, err := ob.evmClient.BlockNumber(context.Background()) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC Status Check error: RPC down?") + ob.logger.Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") continue } gasPrice, err := ob.evmClient.SuggestGasPrice(context.Background()) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC Status Check error: RPC down?") + ob.logger.Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") continue } header, err := ob.evmClient.HeaderByNumber(context.Background(), new(big.Int).SetUint64(bn)) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("RPC Status Check error: RPC down?") + ob.logger.Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") continue } // #nosec G701 always in range blockTime := time.Unix(int64(header.Time), 0).UTC() elapsedSeconds := time.Since(blockTime).Seconds() if elapsedSeconds > 100 { - ob.logger.ChainLogger.Warn().Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds) + ob.logger.Chain.Warn().Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds) continue } - ob.logger.ChainLogger.Info().Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64()) + ob.logger.Chain.Info().Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64()) case <-ob.stop: return } @@ -296,20 +301,20 @@ func (ob *ChainClient) ExternalChainRPCStatus() { } func (ob *ChainClient) Stop() { - ob.logger.ChainLogger.Info().Msgf("ob %s is stopping", ob.chain.String()) + ob.logger.Chain.Info().Msgf("ob %s is stopping", ob.chain.String()) close(ob.stop) // this notifies all goroutines to stop - ob.logger.ChainLogger.Info().Msg("closing ob.db") + ob.logger.Chain.Info().Msg("closing ob.db") dbInst, err := ob.db.DB() if err != nil { - ob.logger.ChainLogger.Info().Msg("error getting database instance") + ob.logger.Chain.Info().Msg("error getting database instance") } err = dbInst.Close() if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("error closing database") + ob.logger.Chain.Error().Err(err).Msg("error closing database") } - ob.logger.ChainLogger.Info().Msgf("%s observer stopped", ob.chain.String()) + ob.logger.Chain.Info().Msgf("%s observer stopped", ob.chain.String()) } // returns: isIncluded, isConfirmed, Error @@ -347,7 +352,7 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, recvStatus, ob.chain, nonce, - coin.CoinType_Cmd, + cointype, ) if err != nil { logger.Error().Err(err).Msgf("error posting confirmation to meta core for cctx %s nonce %d", sendHash, nonce) @@ -606,12 +611,11 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, return false, false, nil } -// FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called -// observeOutTx periodically checks all the txhash in potential outbound txs -func (ob *ChainClient) observeOutTx() { - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_observeOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) +// WatchOutTx watches evm chain for outgoing txs status +func (ob *ChainClient) WatchOutTx() { + ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) if err != nil { - ob.logger.ObserveOutTx.Error().Err(err).Msg("failed to create ticker") + ob.logger.OutTx.Error().Err(err).Msg("error creating ticker") return } @@ -622,6 +626,9 @@ func (ob *ChainClient) observeOutTx() { if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled { continue } + if !ob.GetChainParams().IsSupported { + continue + } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { continue @@ -639,22 +646,22 @@ func (ob *ChainClient) observeOutTx() { txCount++ receipt = recpt transaction = tx - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) + ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) if txCount > 1 { - ob.logger.ObserveOutTx.Error().Msgf( - "observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) + ob.logger.OutTx.Error().Msgf( + "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) } } } if txCount == 1 { // should be only one txHash confirmed for each nonce. ob.SetTxNReceipt(nonceInt, receipt, transaction) } else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint) - ob.logger.ObserveOutTx.Error().Msgf("observeOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) + ob.logger.OutTx.Error().Msgf("WatchOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) } } - ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.ObserveOutTx) + ticker.UpdateInterval(ob.GetChainParams().OutTxTicker, ob.logger.OutTx) case <-ob.stop: - ob.logger.ObserveOutTx.Info().Msg("observeOutTx: stopped") + ob.logger.OutTx.Info().Msg("WatchOutTx: stopped") return } } @@ -820,29 +827,34 @@ func (ob *ChainClient) GetLastBlockHeight() uint64 { return height } -func (ob *ChainClient) ExternalChainWatcher() { - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_ExternalChainWatcher_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker) +// WatchInTx watches evm chain for incoming txs and post votes to zetacore +func (ob *ChainClient) WatchInTx() { + ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchInTx_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msg("NewDynamicTicker error") + ob.logger.InTx.Error().Err(err).Msg("error creating ticker") return } defer ticker.Stop() - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher started") - sampledLogger := ob.logger.ExternalChainWatcher.Sample(&zerolog.BasicSampler{N: 10}) + ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.InTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { continue } + if !ob.GetChainParams().IsSupported { + sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId) + continue + } err := ob.observeInTX(sampledLogger) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msg("observeInTX error") + ob.logger.InTx.Err(err).Msg("WatchInTx: observeInTX error") } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.ExternalChainWatcher) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher stopped") + ob.logger.InTx.Info().Msgf("WatchInTx stopped for chain %d", ob.chain.ChainId) return } } @@ -873,12 +885,12 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { header, err := ob.GetBlockHeaderCached(bn) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("postBlockHeader: error getting block: %d", bn) return err } headerRLP, err := rlp.EncodeToBytes(header) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("postBlockHeader: error encoding block header: %d", bn) return err } @@ -889,7 +901,7 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { proofs.NewEthereumHeader(headerRLP), ) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("postBlockHeader: error posting block header: %d", bn) + ob.logger.InTx.Error().Err(err).Msgf("postBlockHeader: error posting block header: %d", bn) return err } return nil @@ -950,7 +962,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { ob.chain.ChainId, lastScannedZetaSent, lastScannedDeposited, lastScannedTssRecvd) ob.SetLastBlockHeightScanned(lastScannedLowest) if err := ob.db.Save(clienttypes.ToLastBlockSQLType(lastScannedLowest)).Error; err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observeInTX: error writing lastScannedLowest %d to db", lastScannedLowest) + ob.logger.InTx.Error().Err(err).Msgf("observeInTX: error writing lastScannedLowest %d to db", lastScannedLowest) } } return nil @@ -962,7 +974,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { // filter ZetaSent logs addrConnector, connector, err := ob.GetConnectorContract() if err != nil { - ob.logger.ChainLogger.Warn().Err(err).Msgf("ObserveZetaSent: GetConnectorContract error:") + ob.logger.Chain.Warn().Err(err).Msgf("ObserveZetaSent: GetConnectorContract error:") return startBlock - 1 // lastScanned } iter, err := connector.FilterZetaSent(&bind.FilterOpts{ @@ -971,7 +983,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { Context: context.TODO(), }, []ethcommon.Address{}, []*big.Int{}) if err != nil { - ob.logger.ChainLogger.Warn().Err(err).Msgf( + ob.logger.Chain.Warn().Err(err).Msgf( "ObserveZetaSent: FilterZetaSent error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId) return startBlock - 1 // lastScanned } @@ -985,7 +997,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { events = append(events, iter.Event) continue } - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveZetaSent: invalid ZetaSent event in tx %s on chain %d at height %d", + ob.logger.InTx.Warn().Err(err).Msgf("ObserveZetaSent: invalid ZetaSent event in tx %s on chain %d at height %d", iter.Event.Raw.TxHash.Hex(), ob.chain.ChainId, iter.Event.Raw.BlockNumber) } sort.SliceStable(events, func(i, j int) bool { @@ -1011,7 +1023,7 @@ func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 { } // guard against multiple events in the same tx if guard[event.Raw.TxHash.Hex()] { - ob.logger.ExternalChainWatcher.Warn().Msgf("ObserveZetaSent: multiple remote call events detected in tx %s", event.Raw.TxHash) + ob.logger.InTx.Warn().Msgf("ObserveZetaSent: multiple remote call events detected in tx %s", event.Raw.TxHash) continue } guard[event.Raw.TxHash.Hex()] = true @@ -1034,7 +1046,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // filter ERC20CustodyDeposited logs addrCustody, erc20custodyContract, err := ob.GetERC20CustodyContract() if err != nil { - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveERC20Deposited: GetERC20CustodyContract error:") + ob.logger.InTx.Warn().Err(err).Msgf("ObserveERC20Deposited: GetERC20CustodyContract error:") return startBlock - 1 // lastScanned } iter, err := erc20custodyContract.FilterDeposited(&bind.FilterOpts{ @@ -1043,7 +1055,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 Context: context.TODO(), }, []ethcommon.Address{}) if err != nil { - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf( + ob.logger.InTx.Warn().Err(err).Msgf( "ObserveERC20Deposited: FilterDeposited error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId) return startBlock - 1 // lastScanned } @@ -1057,7 +1069,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 events = append(events, iter.Event) continue } - ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveERC20Deposited: invalid Deposited event in tx %s on chain %d at height %d", + ob.logger.InTx.Warn().Err(err).Msgf("ObserveERC20Deposited: invalid Deposited event in tx %s on chain %d at height %d", iter.Event.Raw.TxHash.Hex(), ob.chain.ChainId, iter.Event.Raw.BlockNumber) } sort.SliceStable(events, func(i, j int) bool { @@ -1083,7 +1095,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 } tx, _, err := ob.TransactionByHash(event.Raw.TxHash.Hex()) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf( + ob.logger.InTx.Error().Err(err).Msgf( "ObserveERC20Deposited: error getting transaction for intx %s chain %d", event.Raw.TxHash, ob.chain.ChainId) return beingScanned - 1 // we have to re-scan from this block next time } @@ -1091,7 +1103,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // guard against multiple events in the same tx if guard[event.Raw.TxHash.Hex()] { - ob.logger.ExternalChainWatcher.Warn().Msgf("ObserveERC20Deposited: multiple remote call events detected in tx %s", event.Raw.TxHash) + ob.logger.InTx.Warn().Msgf("ObserveERC20Deposited: multiple remote call events detected in tx %s", event.Raw.TxHash) continue } guard[event.Raw.TxHash.Hex()] = true @@ -1111,10 +1123,6 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge // returns the last block successfully scanned func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { - if !ob.GetChainParams().IsSupported { - return startBlock - 1 // lastScanned - } - // query incoming gas asset for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetabridge and ignore error @@ -1125,14 +1133,14 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains err := ob.postBlockHeader(toBlock) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting block header") + ob.logger.InTx.Error().Err(err).Msg("error posting block header") } } // observe TSS received gas token in block 'bn' err := ob.ObserveTSSReceiveInBlock(bn) if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("ObserverTSSReceive: error observing TSS received token in block %d for chain %d", bn, ob.chain.ChainId) + ob.logger.InTx.Error().Err(err).Msgf("ObserverTSSReceive: error observing TSS received token in block %d for chain %d", bn, ob.chain.ChainId) return bn - 1 // we have to re-scan from this block next time } } @@ -1140,33 +1148,37 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { return toBlock } +// WatchGasPrice watches evm chain for gas prices and post to zetacore func (ob *ChainClient) WatchGasPrice() { // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } // start gas price ticker ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker) if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("NewDynamicTicker error") + ob.logger.GasPrice.Error().Err(err).Msg("NewDynamicTicker error") return } - ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { select { case <-ticker.C(): + if !ob.GetChainParams().IsSupported { + continue + } err = ob.PostGasPrice() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } - ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice) + ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice) case <-ob.stop: - ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice stopped") + ob.logger.GasPrice.Info().Msg("WatchGasPrice stopped") return } } @@ -1177,12 +1189,12 @@ func (ob *ChainClient) PostGasPrice() error { // GAS PRICE gasPrice, err := ob.evmClient.SuggestGasPrice(context.TODO()) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("Err SuggestGasPrice:") + ob.logger.GasPrice.Err(err).Msg("Err SuggestGasPrice:") return err } blockNum, err := ob.evmClient.BlockNumber(context.TODO()) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("Err Fetching Most recent Block : ") + ob.logger.GasPrice.Err(err).Msg("Err Fetching Most recent Block : ") return err } @@ -1191,7 +1203,7 @@ func (ob *ChainClient) PostGasPrice() error { zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, gasPrice.Uint64(), supply, blockNum) if err != nil { - ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice to zetabridge failed") + ob.logger.GasPrice.Err(err).Msg("PostGasPrice to zetabridge failed") return err } _ = zetaHash @@ -1200,7 +1212,7 @@ func (ob *ChainClient) PostGasPrice() error { } func (ob *ChainClient) BuildLastBlock() error { - logger := ob.logger.ChainLogger.With().Str("module", "BuildBlockIndex").Logger() + logger := ob.logger.Chain.With().Str("module", "BuildBlockIndex").Logger() envvar := ob.chain.ChainName.String() + "_SCAN_FROM" scanFromBlock := os.Getenv(envvar) if scanFromBlock != "" { @@ -1241,7 +1253,7 @@ func (ob *ChainClient) BuildReceiptsMap() error { logger := ob.logger var receipts []clienttypes.ReceiptSQLType if err := ob.db.Find(&receipts).Error; err != nil { - logger.ChainLogger.Error().Err(err).Msg("error iterating over db") + logger.Chain.Error().Err(err).Msg("error iterating over db") return err } for _, receipt := range receipts { @@ -1274,7 +1286,7 @@ func (ob *ChainClient) LoadDB(dbPath string, chain chains.Chain) error { &clienttypes.TransactionSQLType{}, &clienttypes.LastBlockSQLType{}) if err != nil { - ob.logger.ChainLogger.Error().Err(err).Msg("error migrating db") + ob.logger.Chain.Error().Err(err).Msg("error migrating db") return err } diff --git a/zetaclient/evm/inbounds.go b/zetaclient/evm/inbounds.go index 4459f361f5..6d21467d09 100644 --- a/zetaclient/evm/inbounds.go +++ b/zetaclient/evm/inbounds.go @@ -26,33 +26,36 @@ import ( "golang.org/x/net/context" ) -// ExternalChainWatcherForNewInboundTrackerSuggestions At each tick, gets a list of Inbound tracker suggestions from zeta-core and tries to check if the in-tx was confirmed. +// WatchIntxTracker gets a list of Inbound tracker suggestions from zeta-core at each tick and tries to check if the in-tx was confirmed. // If it was, it tries to broadcast the confirmation vote. If this zeta client has previously broadcast the vote, the tx would be rejected -func (ob *ChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { +func (ob *ChainClient) WatchIntxTracker() { ticker, err := clienttypes.NewDynamicTicker( - fmt.Sprintf("EVM_ExternalChainWatcher_InboundTrackerSuggestions_%d", ob.chain.ChainId), + fmt.Sprintf("EVM_WatchIntxTracker_%d", ob.chain.ChainId), ob.GetChainParams().InTxTicker, ) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msg("error creating ticker") + ob.logger.InTx.Err(err).Msg("error creating ticker") return } defer ticker.Stop() - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher for inboundTrackerSuggestions started") + ob.logger.InTx.Info().Msgf("Intx tracker watcher started for chain %d", ob.chain.ChainId) for { select { case <-ticker.C(): if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { continue } + if !ob.GetChainParams().IsSupported { + continue + } err := ob.ObserveIntxTrackers() if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msg("ObserveTrackerSuggestions error") + ob.logger.InTx.Err(err).Msg("ObserveTrackerSuggestions error") } - ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.ExternalChainWatcher) + ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.InTx) case <-ob.stop: - ob.logger.ExternalChainWatcher.Info().Msg("ExternalChainWatcher for inboundTrackerSuggestions stopped") + ob.logger.InTx.Info().Msg("ExternalChainWatcher for inboundTrackerSuggestions stopped") return } } @@ -74,7 +77,7 @@ func (ob *ChainClient) ObserveIntxTrackers() error { if err != nil { return errors.Wrapf(err, "error getting receipt for intx %s chain %d", tracker.TxHash, ob.chain.ChainId) } - ob.logger.ExternalChainWatcher.Info().Msgf("checking tracker for intx %s chain %d", tracker.TxHash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("checking tracker for intx %s chain %d", tracker.TxHash, ob.chain.ChainId) // check and vote on inbound tx switch tracker.CoinType { @@ -116,7 +119,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, rece if err == nil { msg = ob.BuildInboundVoteMsgForZetaSentEvent(event) } else { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", err } break // only one event is allowed per tx @@ -124,7 +127,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenZeta(tx *ethrpc.Transaction, rece } if msg == nil { // no event, restricted tx, etc. - ob.logger.ExternalChainWatcher.Info().Msgf("no ZetaSent event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("no ZetaSent event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", nil } if vote { @@ -157,7 +160,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, rec if err == nil { msg = ob.BuildInboundVoteMsgForDepositedEvent(zetaDeposited, sender) } else { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Error().Err(err).Msgf("CheckEvmTxLog error on intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", err } break // only one event is allowed per tx @@ -165,7 +168,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenERC20(tx *ethrpc.Transaction, rec } if msg == nil { // no event, donation, restricted tx, etc. - ob.logger.ExternalChainWatcher.Info().Msgf("no Deposited event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("no Deposited event found for intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", nil } if vote { @@ -193,7 +196,7 @@ func (ob *ChainClient) CheckAndVoteInboundTokenGas(tx *ethrpc.Transaction, recei msg := ob.BuildInboundVoteMsgForTokenSentToTSS(tx, sender, receipt.BlockNumber.Uint64()) if msg == nil { // donation, restricted tx, etc. - ob.logger.ExternalChainWatcher.Info().Msgf("no vote message built for intx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("no vote message built for intx %s chain %d", tx.Hash, ob.chain.ChainId) return "", nil } if vote { @@ -208,12 +211,12 @@ func (ob *ChainClient) PostVoteInbound(msg *types.MsgVoteOnObservedInboundTx, co chainID := ob.chain.ChainId zetaHash, ballot, err := ob.zetaClient.PostVoteInbound(zetabridge.PostVoteInboundGasLimit, retryGasLimit, msg) if err != nil { - ob.logger.ExternalChainWatcher.Err(err).Msgf("intx detected: error posting vote for chain %d token %s intx %s", chainID, coinType, txHash) + ob.logger.InTx.Err(err).Msgf("intx detected: error posting vote for chain %d token %s intx %s", chainID, coinType, txHash) return "", err } else if zetaHash != "" { - ob.logger.ExternalChainWatcher.Info().Msgf("intx detected: chain %d token %s intx %s vote %s ballot %s", chainID, coinType, txHash, zetaHash, ballot) + ob.logger.InTx.Info().Msgf("intx detected: chain %d token %s intx %s vote %s ballot %s", chainID, coinType, txHash, zetaHash, ballot) } else { - ob.logger.ExternalChainWatcher.Info().Msgf("intx detected: chain %d token %s intx %s already voted on ballot %s", chainID, coinType, txHash, ballot) + ob.logger.InTx.Info().Msgf("intx detected: chain %d token %s intx %s already voted on ballot %s", chainID, coinType, txHash, ballot) } return ballot, err } @@ -233,18 +236,18 @@ func (ob *ChainClient) BuildInboundVoteMsgForDepositedEvent(event *erc20custody. maybeReceiver = parsedAddress.Hex() } if config.ContainRestrictedAddress(sender.Hex(), clienttypes.BytesToEthHex(event.Recipient), maybeReceiver) { - compliance.PrintComplianceLog(ob.logger.ExternalChainWatcher, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, event.Raw.TxHash.Hex(), sender.Hex(), clienttypes.BytesToEthHex(event.Recipient), "ERC20") return nil } // donation check if bytes.Equal(event.Message, []byte(constant.DonationMessage)) { - ob.logger.ExternalChainWatcher.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", event.Raw.TxHash.Hex(), ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", event.Raw.TxHash.Hex(), ob.chain.ChainId) return nil } message := hex.EncodeToString(event.Message) - ob.logger.ExternalChainWatcher.Info().Msgf("ERC20CustodyDeposited inTx detected on chain %d tx %s block %d from %s value %s message %s", + ob.logger.InTx.Info().Msgf("ERC20CustodyDeposited inTx detected on chain %d tx %s block %d from %s value %s message %s", ob.chain.ChainId, event.Raw.TxHash.Hex(), event.Raw.BlockNumber, sender.Hex(), event.Amount.String(), message) return zetabridge.GetInBoundVoteMessage( @@ -269,7 +272,7 @@ func (ob *ChainClient) BuildInboundVoteMsgForDepositedEvent(event *erc20custody. func (ob *ChainClient) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector.ZetaConnectorNonEthZetaSent) *types.MsgVoteOnObservedInboundTx { destChain := chains.GetChainFromChainID(event.DestinationChainId.Int64()) if destChain == nil { - ob.logger.ExternalChainWatcher.Warn().Msgf("chain id not supported %d", event.DestinationChainId.Int64()) + ob.logger.InTx.Warn().Msgf("chain id not supported %d", event.DestinationChainId.Int64()) return nil } destAddr := clienttypes.BytesToEthHex(event.DestinationAddress) @@ -277,7 +280,7 @@ func (ob *ChainClient) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector. // compliance check sender := event.ZetaTxSenderAddress.Hex() if config.ContainRestrictedAddress(sender, destAddr, event.SourceTxOriginAddress.Hex()) { - compliance.PrintComplianceLog(ob.logger.ExternalChainWatcher, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, event.Raw.TxHash.Hex(), sender, destAddr, "Zeta") return nil } @@ -285,17 +288,17 @@ func (ob *ChainClient) BuildInboundVoteMsgForZetaSentEvent(event *zetaconnector. if !destChain.IsZetaChain() { paramsDest, found := ob.coreContext.GetEVMChainParams(destChain.ChainId) if !found { - ob.logger.ExternalChainWatcher.Warn().Msgf("chain id not present in EVMChainParams %d", event.DestinationChainId.Int64()) + ob.logger.InTx.Warn().Msgf("chain id not present in EVMChainParams %d", event.DestinationChainId.Int64()) return nil } if strings.EqualFold(destAddr, paramsDest.ZetaTokenContractAddress) { - ob.logger.ExternalChainWatcher.Warn().Msgf("potential attack attempt: %s destination address is ZETA token contract address %s", destChain, destAddr) + ob.logger.InTx.Warn().Msgf("potential attack attempt: %s destination address is ZETA token contract address %s", destChain, destAddr) return nil } } message := base64.StdEncoding.EncodeToString(event.Message) - ob.logger.ExternalChainWatcher.Info().Msgf("ZetaSent inTx detected on chain %d tx %s block %d from %s value %s message %s", + ob.logger.InTx.Info().Msgf("ZetaSent inTx detected on chain %d tx %s block %d from %s value %s message %s", ob.chain.ChainId, event.Raw.TxHash.Hex(), event.Raw.BlockNumber, sender, event.ZetaValueAndGas.String(), message) return zetabridge.GetInBoundVoteMessage( @@ -327,7 +330,7 @@ func (ob *ChainClient) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transacti maybeReceiver = parsedAddress.Hex() } if config.ContainRestrictedAddress(sender.Hex(), maybeReceiver) { - compliance.PrintComplianceLog(ob.logger.ExternalChainWatcher, ob.logger.Compliance, + compliance.PrintComplianceLog(ob.logger.InTx, ob.logger.Compliance, false, ob.chain.ChainId, tx.Hash, sender.Hex(), sender.Hex(), "Gas") return nil } @@ -336,10 +339,10 @@ func (ob *ChainClient) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transacti // #nosec G703 err is already checked data, _ := hex.DecodeString(message) if bytes.Equal(data, []byte(constant.DonationMessage)) { - ob.logger.ExternalChainWatcher.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", tx.Hash, ob.chain.ChainId) + ob.logger.InTx.Info().Msgf("thank you rich folk for your donation! tx %s chain %d", tx.Hash, ob.chain.ChainId) return nil } - ob.logger.ExternalChainWatcher.Info().Msgf("TSS inTx detected on chain %d tx %s block %d from %s value %s message %s", + ob.logger.InTx.Info().Msgf("TSS inTx detected on chain %d tx %s block %d from %s value %s message %s", ob.chain.ChainId, tx.Hash, blockNumber, sender.Hex(), tx.Value.String(), message) return zetabridge.GetInBoundVoteMessage( diff --git a/zetaclient/interfaces/interfaces.go b/zetaclient/interfaces/interfaces.go index 75242ab17d..1d48fc149c 100644 --- a/zetaclient/interfaces/interfaces.go +++ b/zetaclient/interfaces/interfaces.go @@ -43,7 +43,7 @@ type ChainClient interface { SetChainParams(observertypes.ChainParams) GetChainParams() observertypes.ChainParams GetTxID(nonce uint64) string - ExternalChainWatcherForNewInboundTrackerSuggestions() + WatchIntxTracker() } // ChainSigner is the interface to sign transactions for a chain diff --git a/zetaclient/testutils/stub/chain_client.go b/zetaclient/testutils/stub/chain_client.go index 642de62792..f5a5368511 100644 --- a/zetaclient/testutils/stub/chain_client.go +++ b/zetaclient/testutils/stub/chain_client.go @@ -45,7 +45,7 @@ func (s *EVMClient) GetTxID(_ uint64) string { return "" } -func (s *EVMClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { +func (s *EVMClient) WatchIntxTracker() { } // ---------------------------------------------------------------------------- @@ -86,5 +86,5 @@ func (s *BTCClient) GetTxID(_ uint64) string { return "" } -func (s *BTCClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { +func (s *BTCClient) WatchIntxTracker() { } diff --git a/zetaclient/zetabridge/zetacore_bridge.go b/zetaclient/zetabridge/zetacore_bridge.go index 684092c744..d8a29fa665 100644 --- a/zetaclient/zetabridge/zetacore_bridge.go +++ b/zetaclient/zetabridge/zetacore_bridge.go @@ -220,14 +220,15 @@ func (b *ZetaCoreBridge) UpdateZetaCoreContext(coreContext *corecontext.ZetaCore var newBTCParams *observertypes.ChainParams // check and update chain params for each chain + sampledLogger := b.logger.Sample(&zerolog.BasicSampler{N: 10}) for _, chainParam := range chainParams { - err := observertypes.ValidateChainParams(chainParam) - if err != nil { - b.logger.Warn().Err(err).Msgf("Invalid chain params for chain %d", chainParam.ChainId) + if !chainParam.GetIsSupported() { + sampledLogger.Info().Msgf("Chain %d is not supported yet", chainParam.ChainId) continue } - if !chainParam.GetIsSupported() { - b.logger.Info().Msgf("Chain %d is not supported yet", chainParam.ChainId) + err := observertypes.ValidateChainParams(chainParam) + if err != nil { + sampledLogger.Warn().Err(err).Msgf("Invalid chain params for chain %d", chainParam.ChainId) continue } if chains.IsBitcoinChain(chainParam.ChainId) { @@ -237,12 +238,12 @@ func (b *ZetaCoreBridge) UpdateZetaCoreContext(coreContext *corecontext.ZetaCore } } - supporteChains, err := b.GetSupportedChains() + supportedChains, err := b.GetSupportedChains() if err != nil { return err } - newChains := make([]chains.Chain, len(supporteChains)) - for i, chain := range supporteChains { + newChains := make([]chains.Chain, len(supportedChains)) + for i, chain := range supportedChains { newChains[i] = *chain } keyGen, err := b.GetKeyGen() diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index e31766b3be..b48609188e 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -155,6 +155,10 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) continue } + if !ob.GetChainParams().IsSupported { + co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId) + continue + } cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId) if err != nil {