diff --git a/changelog.md b/changelog.md index 51ad1dd040..c80a744c33 100644 --- a/changelog.md +++ b/changelog.md @@ -20,6 +20,8 @@ ### Fixes +* [1554](https://github.com/zeta-chain/node/pull/1554) - Screen out unconfirmed UTXOs that are not created by TSS itself +* [1560](https://github.com/zeta-chain/node/issues/1560) - Zetaclient post evm-chain outtx hashes only when receipt is available * [1516](https://github.com/zeta-chain/node/issues/1516) - Unprivileged outtx tracker removal * [1537](https://github.com/zeta-chain/node/issues/1537) - Sanity check events of ZetaSent/ZetaReceived/ZetaRevertedWithdrawn/Deposited * [1530](https://github.com/zeta-chain/node/pull/1530) - Outbound tx confirmation/inclusion enhancement @@ -37,9 +39,12 @@ * [1522](https://github.com/zeta-chain/node/pull/1522/files) - block `distribution` module account from receiving zeta * [1528](https://github.com/zeta-chain/node/pull/1528) - fix panic caused on decoding malformed BTC addresses * [1536](https://github.com/zeta-chain/node/pull/1536) - add index to check previously finalized inbounds +* [1556](https://github.com/zeta-chain/node/pull/1556) - add emptiness check for topic array in event parsing +* [1555](https://github.com/zeta-chain/node/pull/1555) - Reduce websocket message limit to 10MB ### Refactoring +* [1552](https://github.com/zeta-chain/node/pull/1552) - requires group2 to enable header verification * [1211](https://github.com/zeta-chain/node/issues/1211) - use `grpc` and `msg` for query and message files * refactor cctx scheduler - decouple evm cctx scheduler from btc cctx scheduler * move tss state from crosschain to observer @@ -53,6 +58,7 @@ * Remove chain id from the index for observer mapper and rename it to observer set. * Add logger to smoke tests * [1521](https://github.com/zeta-chain/node/pull/1521) - replace go-tss lib version with one that reverts back to thorchain tss-lib +* [1558](https://github.com/zeta-chain/node/pull/1558) - change log level for gas stability pool iteration error * Update --ledger flag hint ### Chores diff --git a/rpc/websockets.go b/rpc/websockets.go index c3b928dc36..3c64bfdb8a 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -47,7 +47,7 @@ import ( ) const ( - messageSizeLimit = 32 * 1024 * 1024 // 32MB + messageSizeLimit = 10 * 1024 * 1024 // 10MB ) diff --git a/x/crosschain/keeper/evm_hooks.go b/x/crosschain/keeper/evm_hooks.go index e4c3ed5d4b..f2b9f6bc8d 100644 --- a/x/crosschain/keeper/evm_hooks.go +++ b/x/crosschain/keeper/evm_hooks.go @@ -275,6 +275,9 @@ func (k Keeper) ParseZRC20WithdrawalEvent(ctx sdk.Context, log ethtypes.Log) (*z if err != nil { return nil, err } + if len(log.Topics) == 0 { + return nil, fmt.Errorf("ParseZRC20WithdrawalEvent: invalid log - no topics") + } event, err := zrc20ZEVM.ParseWithdrawal(log) if err != nil { return nil, err @@ -306,6 +309,9 @@ func ParseZetaSentEvent(log ethtypes.Log, connectorZEVM ethcommon.Address) (*con if err != nil { return nil, err } + if len(log.Topics) == 0 { + return nil, fmt.Errorf("ParseZetaSentEvent: invalid log - no topics") + } event, err := zetaConnectorZEVM.ParseZetaSent(log) if err != nil { return nil, err diff --git a/x/crosschain/module.go b/x/crosschain/module.go index a0177cd11e..9e2e4ec81b 100644 --- a/x/crosschain/module.go +++ b/x/crosschain/module.go @@ -190,7 +190,7 @@ func (AppModule) ConsensusVersion() uint64 { return 4 } func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) { err := am.keeper.IterateAndUpdateCctxGasPrice(ctx) if err != nil { - ctx.Logger().Error("Error iterating and updating pending cctx gas price", "err", err.Error()) + ctx.Logger().Info("Error iterating and updating pending cctx gas price", "err", err.Error()) } } diff --git a/x/observer/keeper/msg_server_update_crosschain_flags.go b/x/observer/keeper/msg_server_update_crosschain_flags.go index 41904c75be..30325bef64 100644 --- a/x/observer/keeper/msg_server_update_crosschain_flags.go +++ b/x/observer/keeper/msg_server_update_crosschain_flags.go @@ -14,13 +14,8 @@ import ( func (k msgServer) UpdateCrosschainFlags(goCtx context.Context, msg *types.MsgUpdateCrosschainFlags) (*types.MsgUpdateCrosschainFlagsResponse, error) { ctx := sdk.UnwrapSDKContext(goCtx) - requiredGroup := types.Policy_Type_group1 - if msg.IsInboundEnabled || msg.IsOutboundEnabled || msg.GasPriceIncreaseFlags != nil { - requiredGroup = types.Policy_Type_group2 - } - // check permission - if msg.Creator != k.GetParams(ctx).GetAdminPolicyAccount(requiredGroup) { + if msg.Creator != k.GetParams(ctx).GetAdminPolicyAccount(msg.GetRequiredGroup()) { return &types.MsgUpdateCrosschainFlagsResponse{}, types.ErrNotAuthorizedPolicy } diff --git a/x/observer/keeper/msg_server_update_crosschain_flags_test.go b/x/observer/keeper/msg_server_update_crosschain_flags_test.go index 6481b44ee0..555988694c 100644 --- a/x/observer/keeper/msg_server_update_crosschain_flags_test.go +++ b/x/observer/keeper/msg_server_update_crosschain_flags_test.go @@ -57,6 +57,7 @@ func TestMsgServer_UpdateCrosschainFlags(t *testing.T) { require.Equal(t, uint32(42), flags.GasPriceIncreaseFlags.GasPriceIncreasePercent) require.True(t, flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled) require.False(t, flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled) + setAdminCrossChainFlags(ctx, k, admin, types.Policy_Type_group2) // can update flags again @@ -71,7 +72,7 @@ func TestMsgServer_UpdateCrosschainFlags(t *testing.T) { }, BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ IsEthTypeChainEnabled: false, - IsBtcTypeChainEnabled: false, + IsBtcTypeChainEnabled: true, }, }) require.NoError(t, err) @@ -84,7 +85,8 @@ func TestMsgServer_UpdateCrosschainFlags(t *testing.T) { require.Equal(t, time.Minute*43, flags.GasPriceIncreaseFlags.RetryInterval) require.Equal(t, uint32(43), flags.GasPriceIncreaseFlags.GasPriceIncreasePercent) require.False(t, flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled) - require.False(t, flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled) + require.True(t, flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled) + // group 1 should be able to disable inbound and outbound setAdminCrossChainFlags(ctx, k, admin, types.Policy_Type_group1) @@ -103,6 +105,33 @@ func TestMsgServer_UpdateCrosschainFlags(t *testing.T) { require.Equal(t, int64(43), flags.GasPriceIncreaseFlags.EpochLength) require.Equal(t, time.Minute*43, flags.GasPriceIncreaseFlags.RetryInterval) require.Equal(t, uint32(43), flags.GasPriceIncreaseFlags.GasPriceIncreasePercent) + require.False(t, flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled) + require.True(t, flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled) + + // group 1 should be able to disable header verification + setAdminCrossChainFlags(ctx, k, admin, types.Policy_Type_group1) + + // if gas price increase flags is nil, it should not be updated + _, err = srv.UpdateCrosschainFlags(sdk.WrapSDKContext(ctx), &types.MsgUpdateCrosschainFlags{ + Creator: admin, + IsInboundEnabled: false, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: false, + IsBtcTypeChainEnabled: false, + }, + }) + require.NoError(t, err) + + flags, found = k.GetCrosschainFlags(ctx) + require.True(t, found) + require.False(t, flags.IsInboundEnabled) + require.False(t, flags.IsOutboundEnabled) + require.Equal(t, int64(43), flags.GasPriceIncreaseFlags.EpochLength) + require.Equal(t, time.Minute*43, flags.GasPriceIncreaseFlags.RetryInterval) + require.Equal(t, uint32(43), flags.GasPriceIncreaseFlags.GasPriceIncreasePercent) + require.False(t, flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled) + require.False(t, flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled) // if flags are not defined, default should be used k.RemoveCrosschainFlags(ctx) diff --git a/x/observer/types/message_crosschain_flags.go b/x/observer/types/message_crosschain_flags.go index cdca148a94..58d49cd3d3 100644 --- a/x/observer/types/message_crosschain_flags.go +++ b/x/observer/types/message_crosschain_flags.go @@ -67,3 +67,21 @@ func (gpf GasPriceIncreaseFlags) Validate() error { } return nil } + +// GetRequiredGroup returns the required group policy for the message to execute the message +// Group 1 should only be able to stop or disable functiunalities in case of emergency +// this concerns disabling inbound and outbound txs or block header verification +// every other action requires group 2 +func (msg *MsgUpdateCrosschainFlags) GetRequiredGroup() Policy_Type { + if msg.IsInboundEnabled || msg.IsOutboundEnabled { + return Policy_Type_group2 + } + if msg.GasPriceIncreaseFlags != nil { + return Policy_Type_group2 + } + if msg.BlockHeaderVerificationFlags != nil && (msg.BlockHeaderVerificationFlags.IsEthTypeChainEnabled || msg.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled) { + return Policy_Type_group2 + + } + return Policy_Type_group1 +} diff --git a/x/observer/types/message_crosschain_flags_test.go b/x/observer/types/message_crosschain_flags_test.go index f01c33e6ef..199b80c13a 100644 --- a/x/observer/types/message_crosschain_flags_test.go +++ b/x/observer/types/message_crosschain_flags_test.go @@ -116,3 +116,118 @@ func TestGasPriceIncreaseFlags_Validate(t *testing.T) { }) } } + +func TestMsgUpdateCrosschainFlags_GetRequiredGroup(t *testing.T) { + tests := []struct { + name string + msg types.MsgUpdateCrosschainFlags + want types.Policy_Type + }{ + { + name: "disabling outbound and inbound allows group 1", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: false, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: nil, + GasPriceIncreaseFlags: nil, + }, + want: types.Policy_Type_group1, + }, + { + name: "disabling outbound and inbound and block header verification allows group 1", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: false, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: false, + IsBtcTypeChainEnabled: false, + }, + GasPriceIncreaseFlags: nil, + }, + want: types.Policy_Type_group1, + }, + { + name: "updating gas price increase flags requires group 2", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: false, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: false, + IsBtcTypeChainEnabled: false, + }, + GasPriceIncreaseFlags: &types.GasPriceIncreaseFlags{ + EpochLength: 1, + RetryInterval: 1, + GasPriceIncreasePercent: 1, + MaxPendingCctxs: 100, + }, + }, + want: types.Policy_Type_group2, + }, + { + name: "enabling inbound requires group 2", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: true, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: false, + IsBtcTypeChainEnabled: false, + }, + GasPriceIncreaseFlags: nil, + }, + want: types.Policy_Type_group2, + }, + { + name: "enabling outbound requires group 2", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: false, + IsOutboundEnabled: true, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: false, + IsBtcTypeChainEnabled: false, + }, + GasPriceIncreaseFlags: nil, + }, + want: types.Policy_Type_group2, + }, + { + name: "enabling eth header verification requires group 2", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: false, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: true, + IsBtcTypeChainEnabled: false, + }, + GasPriceIncreaseFlags: nil, + }, + want: types.Policy_Type_group2, + }, + { + name: "enabling btc header verification requires group 2", + msg: types.MsgUpdateCrosschainFlags{ + Creator: sample.AccAddress(), + IsInboundEnabled: false, + IsOutboundEnabled: false, + BlockHeaderVerificationFlags: &types.BlockHeaderVerificationFlags{ + IsEthTypeChainEnabled: false, + IsBtcTypeChainEnabled: true, + }, + GasPriceIncreaseFlags: nil, + }, + want: types.Policy_Type_group2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.want, tt.msg.GetRequiredGroup()) + }) + } +} diff --git a/zetaclient/bitcoin_client.go b/zetaclient/bitcoin_client.go index 2b9f242b56..39649a0307 100644 --- a/zetaclient/bitcoin_client.go +++ b/zetaclient/bitcoin_client.go @@ -57,7 +57,7 @@ type BitcoinChainClient struct { Mu *sync.Mutex // lock for all the maps, utxos and core params pendingNonce uint64 - includedTxHashes map[string]uint64 // key: tx hash + includedTxHashes map[string]bool // key: tx hash includedTxResults map[string]*btcjson.GetTransactionResult // key: chain-tss-nonce broadcastedTx map[string]string // key: chain-tss-nonce, value: outTx hash utxos []btcjson.ListUnspentResult @@ -147,7 +147,7 @@ func NewBitcoinClient( ob.zetaClient = bridge ob.Tss = tss - ob.includedTxHashes = make(map[string]uint64) + ob.includedTxHashes = make(map[string]bool) ob.includedTxResults = make(map[string]*btcjson.GetTransactionResult) ob.broadcastedTx = make(map[string]string) ob.params = btcCfg.ChainParams @@ -365,9 +365,11 @@ func (ob *BitcoinChainClient) observeInTx() error { } // add block header to zetacore - err = ob.postBlockHeader(bn) - if err != nil { - ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) + if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { + err = ob.postBlockHeader(bn) + if err != nil { + ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn) + } } tssAddress := ob.Tss.BTCAddress() @@ -755,16 +757,13 @@ func (ob *BitcoinChainClient) FetchUTXOS() error { } maxConfirmations := int(bh) - // List unspent. + // List all unspent UTXOs (160ms) tssAddr := ob.Tss.BTCAddress() address, err := common.DecodeBtcAddress(tssAddr, ob.chain.ChainId) if err != nil { return fmt.Errorf("btc: error decoding wallet address (%s) : %s", tssAddr, err.Error()) } - addresses := []btcutil.Address{address} - - // fetching all TSS utxos takes 160ms - utxos, err := ob.rpcClient.ListUnspentMinMaxAddresses(0, maxConfirmations, addresses) + utxos, err := ob.rpcClient.ListUnspentMinMaxAddresses(0, maxConfirmations, []btcutil.Address{address}) if err != nil { return err } @@ -780,12 +779,20 @@ func (ob *BitcoinChainClient) FetchUTXOS() error { return utxos[i].Amount < utxos[j].Amount }) - // filter UTXOs big enough to cover the cost of spending themselves + // filter UTXOs good to spend for next TSS transaction utxosFiltered := make([]btcjson.ListUnspentResult, 0) for _, utxo := range utxos { - if utxo.Amount >= BtcDepositorFeeMin { - utxosFiltered = append(utxosFiltered, utxo) + // UTXOs big enough to cover the cost of spending themselves + if utxo.Amount < BtcDepositorFeeMin { + continue + } + // we don't want to spend other people's unconfirmed UTXOs as they may not be safe to spend + if utxo.Confirmations == 0 { + if !ob.isTssTransaction(utxo.TxID) { + continue + } } + utxosFiltered = append(utxosFiltered, utxo) } ob.Mu.Lock() @@ -795,6 +802,13 @@ func (ob *BitcoinChainClient) FetchUTXOS() error { return nil } +// isTssTransaction checks if a given transaction was sent by TSS itself. +// An unconfirmed transaction is safe to spend only if it was sent by TSS and verified by ourselves. +func (ob *BitcoinChainClient) isTssTransaction(txid string) bool { + _, found := ob.includedTxHashes[txid] + return found +} + // refreshPendingNonce tries increasing the artificial pending nonce of outTx (if lagged behind). // There could be many (unpredictable) reasons for a pending nonce lagging behind, for example: // 1. The zetaclient gets restarted. @@ -1081,6 +1095,7 @@ func (ob *BitcoinChainClient) setIncludedTx(nonce uint64, getTxResult *btcjson.G res, found := ob.includedTxResults[outTxID] if !found { // not found. + ob.includedTxHashes[txHash] = true ob.includedTxResults[outTxID] = getTxResult // include new outTx and enforce rigid 1-to-1 mapping: nonce <===> txHash if nonce >= ob.pendingNonce { // try increasing pending nonce on every newly included outTx ob.pendingNonce = nonce + 1 @@ -1105,11 +1120,15 @@ func (ob *BitcoinChainClient) getIncludedTx(nonce uint64) *btcjson.GetTransactio return ob.includedTxResults[ob.GetTxID(nonce)] } -// removeIncludedTx removes included tx's result from memory +// removeIncludedTx removes included tx from memory func (ob *BitcoinChainClient) removeIncludedTx(nonce uint64) { ob.Mu.Lock() defer ob.Mu.Unlock() - delete(ob.includedTxResults, ob.GetTxID(nonce)) + txResult, found := ob.includedTxResults[ob.GetTxID(nonce)] + if found { + delete(ob.includedTxHashes, txResult.TxID) + delete(ob.includedTxResults, ob.GetTxID(nonce)) + } } // Basic TSS outTX checks: diff --git a/zetaclient/evm_client.go b/zetaclient/evm_client.go index 86d4444f60..8a68ac121e 100644 --- a/zetaclient/evm_client.go +++ b/zetaclient/evm_client.go @@ -885,7 +885,7 @@ func (ob *EVMChainClient) observeInTX(sampledLogger zerolog.Logger) error { lastScannedDeposited := ob.observeERC20Deposited(startBlock, toBlock) // task 3: query the incoming tx to TSS address (read at most 100 blocks in one go) - lastScannedTssRecvd := ob.observeTssRecvd(startBlock, toBlock) + lastScannedTssRecvd := ob.observeTssRecvd(startBlock, toBlock, flags) // note: using lowest height for all 3 events is not perfect, but it's simple and good enough lastScannedLowest := lastScannedZetaSent @@ -1068,7 +1068,7 @@ func (ob *EVMChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint // observeTssRecvd queries the incoming gas asset to TSS address and posts to zetacore // returns the last block successfully scanned -func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64) uint64 { +func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 { // check TSS address (after keygen, ob.Tss.pubkey will be updated) tssAddress := ob.Tss.EVMAddress() if tssAddress == (ethcommon.Address{}) { @@ -1080,7 +1080,9 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64) uint64 { for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetacore and ignore error // TODO: consider having a independent ticker(from TSS scaning) for posting block headers - if common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains + if flags.BlockHeaderVerificationFlags != nil && + flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && + common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains err := ob.postBlockHeader(toBlock) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting block header") diff --git a/zetaclient/evm_signer.go b/zetaclient/evm_signer.go index 7882104cd5..c961a627e0 100644 --- a/zetaclient/evm_signer.go +++ b/zetaclient/evm_signer.go @@ -9,6 +9,7 @@ import ( "math/rand" "strconv" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi" @@ -24,6 +25,10 @@ import ( observertypes "github.com/zeta-chain/zetacore/x/observer/types" ) +const ( + OutTxTrackerReportTimeout = 10 * time.Minute +) + type EVMSigner struct { client EVMRPCClient chain *common.Chain @@ -36,6 +41,10 @@ type EVMSigner struct { erc20CustodyContractAddress ethcommon.Address logger zerolog.Logger ts *TelemetryServer + + // for outTx tracker report only + mu *sync.Mutex + outTxHashBeingReported map[string]bool } var _ ChainSigner = &EVMSigner{} @@ -83,7 +92,9 @@ func NewEVMSigner( logger: logger.With(). Str("chain", chain.ChainName.String()). Str("module", "EVMSigner").Logger(), - ts: ts, + ts: ts, + mu: &sync.Mutex{}, + outTxHashBeingReported: make(map[string]bool), }, nil } @@ -566,11 +577,7 @@ func (signer *EVMSigner) TryProcessOutTx( log.Warn().Err(err).Msgf("OutTx Broadcast error") retry, report := HandleBroadcastError(err, strconv.FormatUint(send.GetCurrentOutTxParam().OutboundTxTssNonce, 10), toChain.String(), outTxHash) if report { - zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1) - if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - } - logger.Info().Msgf("Broadcast to core successful %s", zetaHash) + signer.reportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger) } if !retry { break @@ -579,15 +586,62 @@ func (signer *EVMSigner) TryProcessOutTx( continue } logger.Info().Msgf("Broadcast success: nonce %d to chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1) - if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - } - logger.Info().Msgf("Broadcast to core successful %s", zetaHash) + signer.reportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger) break // successful broadcast; no need to retry } + } +} +// reportToOutTxTracker reports outTxHash to tracker only when tx receipt is available +func (signer *EVMSigner) reportToOutTxTracker(zetaBridge ZetaCoreBridger, chainID int64, nonce uint64, outTxHash string, logger zerolog.Logger) { + // skip if already being reported + signer.mu.Lock() + defer signer.mu.Unlock() + if _, found := signer.outTxHashBeingReported[outTxHash]; found { + logger.Info().Msgf("reportToOutTxTracker: outTxHash %s for chain %d nonce %d is being reported", outTxHash, chainID, nonce) + return } + signer.outTxHashBeingReported[outTxHash] = true // mark as being reported + + // report to outTx tracker with goroutine + go func() { + defer func() { + signer.mu.Lock() + delete(signer.outTxHashBeingReported, outTxHash) + signer.mu.Unlock() + }() + + // try fetching tx receipt for 10 minutes + tStart := time.Now() + for { + if time.Since(tStart) > OutTxTrackerReportTimeout { // give up after 10 minutes + logger.Info().Msgf("reportToOutTxTracker: outTxHash report timeout for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + return + } + receipt, err := signer.client.TransactionReceipt(context.TODO(), ethcommon.HexToHash(outTxHash)) + if err != nil { + logger.Info().Err(err).Msgf("reportToOutTxTracker: receipt not available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + time.Sleep(10 * time.Second) + continue + } + if receipt != nil { + _, isPending, err := signer.client.TransactionByHash(context.TODO(), ethcommon.HexToHash(outTxHash)) + if err != nil || isPending { + logger.Info().Err(err).Msgf("reportToOutTxTracker: error getting tx or tx is pending for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + time.Sleep(10 * time.Second) + continue + } + break + } + } + + // report to outTxTracker + zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(chainID, nonce, outTxHash, nil, "", -1) + if err != nil { + logger.Err(err).Msgf("reportToOutTxTracker: unable to add to tracker on ZetaCore for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + } + logger.Info().Msgf("reportToOutTxTracker: reported outTxHash to core successful %s, chain %d nonce %d outTxHash %s", zetaHash, chainID, nonce, outTxHash) + }() } // SignERC20WithdrawTx