diff --git a/changelog.md b/changelog.md index 59ec309b3c..3324692fcc 100644 --- a/changelog.md +++ b/changelog.md @@ -20,6 +20,7 @@ ### Fixes +* [1560](https://github.com/zeta-chain/node/issues/1560) - Zetaclient post evm-chain outtx hashes only when receipt is available * [1516](https://github.com/zeta-chain/node/issues/1516) - Unprivileged outtx tracker removal * [1537](https://github.com/zeta-chain/node/issues/1537) - Sanity check events of ZetaSent/ZetaReceived/ZetaRevertedWithdrawn/Deposited * [1530](https://github.com/zeta-chain/node/pull/1530) - Outbound tx confirmation/inclusion enhancement diff --git a/zetaclient/bitcoin_client.go b/zetaclient/bitcoin_client.go index 2b9f242b56..43a0cc422e 100644 --- a/zetaclient/bitcoin_client.go +++ b/zetaclient/bitcoin_client.go @@ -365,9 +365,11 @@ func (ob *BitcoinChainClient) observeInTx() error { } // add block header to zetacore - err = ob.postBlockHeader(bn) - if err != nil { - ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) + if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { + err = ob.postBlockHeader(bn) + if err != nil { + ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) + } } tssAddress := ob.Tss.BTCAddress() diff --git a/zetaclient/evm_client.go b/zetaclient/evm_client.go index 86d4444f60..8a68ac121e 100644 --- a/zetaclient/evm_client.go +++ b/zetaclient/evm_client.go @@ -885,7 +885,7 @@ func (ob *EVMChainClient) observeInTX(sampledLogger zerolog.Logger) error { lastScannedDeposited := ob.observeERC20Deposited(startBlock, toBlock) // task 3: query the incoming tx to TSS address (read at most 100 blocks in one go) - lastScannedTssRecvd := ob.observeTssRecvd(startBlock, toBlock) + lastScannedTssRecvd := ob.observeTssRecvd(startBlock, toBlock, flags) // note: using lowest height for all 3 events is not perfect, but it's simple and good enough lastScannedLowest := lastScannedZetaSent @@ -1068,7 +1068,7 @@ func (ob *EVMChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint // observeTssRecvd queries the incoming gas asset to TSS address and posts to zetacore // returns the last block successfully scanned -func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64) uint64 { +func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 { // check TSS address (after keygen, ob.Tss.pubkey will be updated) tssAddress := ob.Tss.EVMAddress() if tssAddress == (ethcommon.Address{}) { @@ -1080,7 +1080,9 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64) uint64 { for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetacore and ignore error // TODO: consider having a independent ticker(from TSS scaning) for posting block headers - if common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains + if flags.BlockHeaderVerificationFlags != nil && + flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && + common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains err := ob.postBlockHeader(toBlock) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting block header") diff --git a/zetaclient/evm_signer.go b/zetaclient/evm_signer.go index 7882104cd5..c961a627e0 100644 --- a/zetaclient/evm_signer.go +++ b/zetaclient/evm_signer.go @@ -9,6 +9,7 @@ import ( "math/rand" "strconv" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi" @@ -24,6 +25,10 @@ import ( observertypes "github.com/zeta-chain/zetacore/x/observer/types" ) +const ( + OutTxTrackerReportTimeout = 10 * time.Minute +) + type EVMSigner struct { client EVMRPCClient chain *common.Chain @@ -36,6 +41,10 @@ type EVMSigner struct { erc20CustodyContractAddress ethcommon.Address logger zerolog.Logger ts *TelemetryServer + + // for outTx tracker report only + mu *sync.Mutex + outTxHashBeingReported map[string]bool } var _ ChainSigner = &EVMSigner{} @@ -83,7 +92,9 @@ func NewEVMSigner( logger: logger.With(). Str("chain", chain.ChainName.String()). Str("module", "EVMSigner").Logger(), - ts: ts, + ts: ts, + mu: &sync.Mutex{}, + outTxHashBeingReported: make(map[string]bool), }, nil } @@ -566,11 +577,7 @@ func (signer *EVMSigner) TryProcessOutTx( log.Warn().Err(err).Msgf("OutTx Broadcast error") retry, report := HandleBroadcastError(err, strconv.FormatUint(send.GetCurrentOutTxParam().OutboundTxTssNonce, 10), toChain.String(), outTxHash) if report { - zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1) - if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - } - logger.Info().Msgf("Broadcast to core successful %s", zetaHash) + signer.reportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger) } if !retry { break @@ -579,15 +586,62 @@ func (signer *EVMSigner) TryProcessOutTx( continue } logger.Info().Msgf("Broadcast success: nonce %d to chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1) - if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - } - logger.Info().Msgf("Broadcast to core successful %s", zetaHash) + signer.reportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger) break // successful broadcast; no need to retry } + } +} +// reportToOutTxTracker reports outTxHash to tracker only when tx receipt is available +func (signer *EVMSigner) reportToOutTxTracker(zetaBridge ZetaCoreBridger, chainID int64, nonce uint64, outTxHash string, logger zerolog.Logger) { + // skip if already being reported + signer.mu.Lock() + defer signer.mu.Unlock() + if _, found := signer.outTxHashBeingReported[outTxHash]; found { + logger.Info().Msgf("reportToOutTxTracker: outTxHash %s for chain %d nonce %d is being reported", outTxHash, chainID, nonce) + return } + signer.outTxHashBeingReported[outTxHash] = true // mark as being reported + + // report to outTx tracker with goroutine + go func() { + defer func() { + signer.mu.Lock() + delete(signer.outTxHashBeingReported, outTxHash) + signer.mu.Unlock() + }() + + // try fetching tx receipt for 10 minutes + tStart := time.Now() + for { + if time.Since(tStart) > OutTxTrackerReportTimeout { // give up after 10 minutes + logger.Info().Msgf("reportToOutTxTracker: outTxHash report timeout for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + return + } + receipt, err := signer.client.TransactionReceipt(context.TODO(), ethcommon.HexToHash(outTxHash)) + if err != nil { + logger.Info().Err(err).Msgf("reportToOutTxTracker: receipt not available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + time.Sleep(10 * time.Second) + continue + } + if receipt != nil { + _, isPending, err := signer.client.TransactionByHash(context.TODO(), ethcommon.HexToHash(outTxHash)) + if err != nil || isPending { + logger.Info().Err(err).Msgf("reportToOutTxTracker: error getting tx or tx is pending for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + time.Sleep(10 * time.Second) + continue + } + break + } + } + + // report to outTxTracker + zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(chainID, nonce, outTxHash, nil, "", -1) + if err != nil { + logger.Err(err).Msgf("reportToOutTxTracker: unable to add to tracker on ZetaCore for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + } + logger.Info().Msgf("reportToOutTxTracker: reported outTxHash to core successful %s, chain %d nonce %d outTxHash %s", zetaHash, chainID, nonce, outTxHash) + }() } // SignERC20WithdrawTx