From 13b75e34429114c9d6643127effd2f5e07d0cb2d Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Tue, 2 Apr 2024 15:25:25 -0500 Subject: [PATCH] check crosschain flags to stop inbound/outbound; get rid of timeout in outtx processing --- changelog.md | 2 + zetaclient/bitcoin/bitcoin_client.go | 24 ++++++--- zetaclient/bitcoin/inbound_tracker.go | 3 ++ zetaclient/evm/evm_client.go | 69 +++++++++----------------- zetaclient/evm/inbounds.go | 3 ++ zetaclient/zetacore_observer.go | 71 ++++++++++++++------------- 6 files changed, 85 insertions(+), 87 deletions(-) diff --git a/changelog.md b/changelog.md index 7ae80ba98c..8cceff3e79 100644 --- a/changelog.md +++ b/changelog.md @@ -54,6 +54,8 @@ * [1880](https://github.com/zeta-chain/node/issues/1880) - lower the gas price multiplier for EVM chains. * [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses * [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests +* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags +* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread ### Chores diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index 8f9c1f9463..c06abd0876 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -335,6 +335,9 @@ func (ob *BTCChainClient) WatchInTx() { for { select { case <-ticker.C(): + if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { + continue + } err := ob.ObserveInTx() if err != nil { ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx") @@ -383,12 +386,6 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { } func (ob *BTCChainClient) ObserveInTx() error { - // make sure inbound TXS / Send is enabled by the protocol - flags := ob.coreContext.GetCrossChainFlags() - if !flags.IsInboundEnabled { - return errors.New("inbound TXS / Send has been disabled by the protocol") - } - // get and update latest block height cnt, err := ob.rpcClient.GetBlockCount() if err != nil { @@ -437,6 +434,7 @@ func (ob *BTCChainClient) ObserveInTx() error { } // add block header to zetabridge + flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { err = ob.postBlockHeader(bn) if err != nil { @@ -576,11 +574,20 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger } func (ob *BTCChainClient) WatchGasPrice() { + // report gas price right away as the ticker takes time to kick in + err := ob.PostGasPrice() + if err != nil { + ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + } + + // start gas price ticker ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker) if err != nil { ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error") return } + ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { @@ -588,7 +595,7 @@ func (ob *BTCChainClient) WatchGasPrice() { case <-ticker.C(): err := ob.PostGasPrice() if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("PostGasPrice error on " + ob.chain.String()) + ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice) case <-ob.stop: @@ -1095,6 +1102,9 @@ func (ob *BTCChainClient) observeOutTx() { for { select { case <-ticker.C(): + if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled { + continue + } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error GetAllOutTxTrackerByChain") diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index 2a6b80c124..bcabf0c1a8 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -21,6 +21,9 @@ func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() for { select { case <-ticker.C(): + if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { + continue + } err := ob.ObserveTrackerSuggestions() if err != nil { ob.logger.WatchInTx.Error().Err(err).Msg("error observing in tx") diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index a70401365e..f6a70b1fe6 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -612,13 +612,6 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, // FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called // observeOutTx periodically checks all the txhash in potential outbound txs func (ob *ChainClient) observeOutTx() { - // read env variables if set - timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE")) - if err != nil || timeoutNonce <= 0 { - timeoutNonce = 100 * 3 // process up to 100 hashes - } - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: using timeoutNonce %d seconds", timeoutNonce) - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_observeOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) if err != nil { ob.logger.ObserveOutTx.Error().Err(err).Msg("failed to create ticker") @@ -629,13 +622,13 @@ func (ob *ChainClient) observeOutTx() { for { select { case <-ticker.C(): + if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled { + continue + } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { continue } - //FIXME: remove this timeout here to ensure that all trackers are queried - outTimeout := time.After(time.Duration(timeoutNonce) * time.Second) - TRACKERLOOP: for _, tracker := range trackers { nonceInt := tracker.Nonce if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx @@ -645,20 +638,14 @@ func (ob *ChainClient) observeOutTx() { var receipt *ethtypes.Receipt var transaction *ethtypes.Transaction for _, txHash := range tracker.HashList { - select { - case <-outTimeout: - ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt) - break TRACKERLOOP - default: - if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { - txCount++ - receipt = recpt - transaction = tx - ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) - if txCount > 1 { - ob.logger.ObserveOutTx.Error().Msgf( - "observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) - } + if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { + txCount++ + receipt = recpt + transaction = tx + ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) + if txCount > 1 { + ob.logger.ObserveOutTx.Error().Msgf( + "observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) } } } @@ -849,6 +836,9 @@ func (ob *ChainClient) ExternalChainWatcher() { for { select { case <-ticker.C(): + if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { + continue + } err := ob.observeInTX(sampledLogger) if err != nil { ob.logger.ExternalChainWatcher.Err(err).Msg("observeInTX error") @@ -909,12 +899,6 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { } func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { - // make sure inbound TXS / Send is enabled by the protocol - flags := ob.coreContext.GetCrossChainFlags() - if !flags.IsInboundEnabled { - return errors.New("inbound TXS / Send has been disabled by the protocol") - } - // get and update latest block height blockNumber, err := ob.evmClient.BlockNumber(context.Background()) if err != nil { @@ -952,7 +936,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock) // task 3: query the incoming tx to TSS address (read at most 100 blocks in one go) - lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags) + lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock) // note: using lowest height for all 3 events is not perfect, but it's simple and good enough lastScannedLowest := lastScannedZetaSent @@ -1129,7 +1113,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge // returns the last block successfully scanned -func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 { +func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { if !ob.GetChainParams().IsSupported { return startBlock - 1 // lastScanned } @@ -1138,6 +1122,7 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetabridge and ignore error // TODO: consider having a independent ticker(from TSS scaning) for posting block headers + flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains @@ -1159,23 +1144,20 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse } func (ob *ChainClient) WatchGasPrice() { - ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...") + // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { - height, err := ob.zetaClient.GetBlockHeight() - if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error") - } else { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) - } + ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } + // start gas price ticker ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker) if err != nil { ob.logger.WatchGasPrice.Error().Err(err).Msg("NewDynamicTicker error") return } - ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker) + ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { @@ -1183,12 +1165,7 @@ func (ob *ChainClient) WatchGasPrice() { case <-ticker.C(): err = ob.PostGasPrice() if err != nil { - height, err := ob.zetaClient.GetBlockHeight() - if err != nil { - ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error") - } else { - ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) - } + ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice) case <-ob.stop: diff --git a/zetaclient/evm/inbounds.go b/zetaclient/evm/inbounds.go index 2daa4fc0c1..4459f361f5 100644 --- a/zetaclient/evm/inbounds.go +++ b/zetaclient/evm/inbounds.go @@ -43,6 +43,9 @@ func (ob *ChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() { for { select { case <-ticker.C(): + if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { + continue + } err := ob.ObserveIntxTrackers() if err != nil { ob.logger.ExternalChainWatcher.Err(err).Msg("ObserveTrackerSuggestions error") diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index 8ec76013a5..e31766b3be 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -138,42 +138,45 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { // schedule keysign for pending cctxs on each chain coreContext := appContext.ZetaCoreContext() - supportedChains := coreContext.GetEnabledChains() - for _, c := range supportedChains { - if c.ChainId == co.bridge.ZetaChain().ChainId { - continue - } - // update chain parameters for signer and chain client - signer, err := co.GetUpdatedSigner(coreContext, c.ChainId) - if err != nil { - co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId) - continue - } - ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId) - if err != nil { - co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) - continue - } - - cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId) - if err != nil { - co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId) - continue - } - // Set Pending transactions prometheus gauge - metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending)) - - // #nosec G701 range is verified - zetaHeight := uint64(bn) - if chains.IsEVMChain(c.ChainId) { - co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) - } else if chains.IsBitcoinChain(c.ChainId) { - co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) - } else { - co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId) - continue + if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled { + supportedChains := coreContext.GetEnabledChains() + for _, c := range supportedChains { + if c.ChainId == co.bridge.ZetaChain().ChainId { + continue + } + // update chain parameters for signer and chain client + signer, err := co.GetUpdatedSigner(coreContext, c.ChainId) + if err != nil { + co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId) + continue + } + ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId) + if err != nil { + co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) + continue + } + + cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId) + if err != nil { + co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId) + continue + } + // Set Pending transactions prometheus gauge + metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending)) + + // #nosec G701 range is verified + zetaHeight := uint64(bn) + if chains.IsEVMChain(c.ChainId) { + co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) + } else if chains.IsBitcoinChain(c.ChainId) { + co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) + } else { + co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId) + continue + } } } + // update last processed block number lastBlockNum = bn metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))