Skip to content

Commit

Permalink
fix: outtx tracker report timeout (#1616)
Browse files Browse the repository at this point in the history
* fix outtx tracker report timeout

* make generate

* remove err when print Info logs

* update changelog

---------

Co-authored-by: Lucas Bertrand <[email protected]>
  • Loading branch information
ws4charlie and lumtis authored Jan 26, 2024
1 parent 72f6407 commit 64ed9b1
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 20 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

### Fixes
* [1610](https://github.com/zeta-chain/node/issues/1610) - add pending outtx hash to tracker after monitoring for 10 minutes

## Version: v12.1.0

### Tests
Expand Down
83 changes: 64 additions & 19 deletions zetaclient/evm_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"github.com/rs/zerolog/log"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol"
"github.com/zeta-chain/zetacore/common"
crosschainkeeper "github.com/zeta-chain/zetacore/x/crosschain/keeper"
"github.com/zeta-chain/zetacore/x/crosschain/types"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
)

const (
OutTxInclusionTimeout = 10 * time.Minute
OutTxTrackerReportTimeout = 10 * time.Minute
ZetaBlockTime = 6500 * time.Millisecond
)

type EVMSigner struct {
Expand Down Expand Up @@ -592,7 +595,7 @@ func (signer *EVMSigner) TryProcessOutTx(
}
}

// reportToOutTxTracker reports outTxHash to tracker only when tx receipt is available
// reportToOutTxTracker reports outTxHash to tracker only when tx is included in a block
func (signer *EVMSigner) reportToOutTxTracker(zetaBridge ZetaCoreBridger, chainID int64, nonce uint64, outTxHash string, logger zerolog.Logger) {
// skip if already being reported
signer.mu.Lock()
Expand All @@ -611,36 +614,78 @@ func (signer *EVMSigner) reportToOutTxTracker(zetaBridge ZetaCoreBridger, chainI
signer.mu.Unlock()
}()

// try fetching tx receipt for 10 minutes
// try monitoring tx inclusion status for 10 minutes
var err error
report := false
isPending := false
blockNumber := uint64(0)
tStart := time.Now()
for {
if time.Since(tStart) > OutTxTrackerReportTimeout { // give up after 10 minutes
logger.Info().Msgf("reportToOutTxTracker: outTxHash report timeout for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
return
// give up after 10 minutes of monitoring
time.Sleep(10 * time.Second)
if time.Since(tStart) > OutTxInclusionTimeout {
// if tx is still pending after timeout, report to outTxTracker anyway as we cannot monitor forever
if isPending {
report = true // probably will be included later
}
logger.Info().Msgf("reportToOutTxTracker: timeout waiting tx inclusion for chain %d nonce %d outTxHash %s report %v", chainID, nonce, outTxHash, report)
break
}
receipt, err := signer.client.TransactionReceipt(context.TODO(), ethcommon.HexToHash(outTxHash))
// try getting the tx
_, isPending, err = signer.client.TransactionByHash(context.TODO(), ethcommon.HexToHash(outTxHash))
if err != nil {
logger.Info().Err(err).Msgf("reportToOutTxTracker: receipt not available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
time.Sleep(10 * time.Second)
logger.Info().Err(err).Msgf("reportToOutTxTracker: error getting tx for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
continue
}
if receipt != nil {
_, isPending, err := signer.client.TransactionByHash(context.TODO(), ethcommon.HexToHash(outTxHash))
if err != nil || isPending {
logger.Info().Err(err).Msgf("reportToOutTxTracker: error getting tx or tx is pending for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
time.Sleep(10 * time.Second)
continue
// if tx is include in a block, try getting receipt
if !isPending {
report = true // included
receipt, err := signer.client.TransactionReceipt(context.TODO(), ethcommon.HexToHash(outTxHash))
if err != nil {
logger.Info().Err(err).Msgf("reportToOutTxTracker: error getting receipt for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
}
if receipt != nil {
blockNumber = receipt.BlockNumber.Uint64()
}
break
}
// keep monitoring pending tx
logger.Info().Msgf("reportToOutTxTracker: tx has not been included yet for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
}

// report to outTxTracker
zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(chainID, nonce, outTxHash, nil, "", -1)
if err != nil {
logger.Err(err).Msgf("reportToOutTxTracker: unable to add to tracker on ZetaCore for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
// try adding to outTx tracker for 10 minutes
if report {
tStart := time.Now()
for {
// give up after 10 minutes of retrying
if time.Since(tStart) > OutTxTrackerReportTimeout {
logger.Info().Msgf("reportToOutTxTracker: timeout adding outtx tracker for chain %d nonce %d outTxHash %s, please add manually", chainID, nonce, outTxHash)
break
}
// stop if the cctx is already finalized
cctx, err := zetaBridge.GetCctxByNonce(chainID, nonce)
if err != nil {
logger.Err(err).Msgf("reportToOutTxTracker: error getting cctx for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
} else if !crosschainkeeper.IsPending(*cctx) {
logger.Info().Msgf("reportToOutTxTracker: cctx already finalized for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
break
}
// report to outTx tracker
zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(chainID, nonce, outTxHash, nil, "", -1)
if err != nil {
logger.Err(err).Msgf("reportToOutTxTracker: error adding to outtx tracker for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash)
} else if zetaHash != "" {
logger.Info().Msgf("reportToOutTxTracker: added outTxHash to core successful %s, chain %d nonce %d outTxHash %s block %d",
zetaHash, chainID, nonce, outTxHash, blockNumber)
} else {
// stop if the tracker contains the outTxHash
logger.Info().Msgf("reportToOutTxTracker: outtx tracker contains outTxHash %s for chain %d nonce %d", outTxHash, chainID, nonce)
break
}
// retry otherwise
time.Sleep(ZetaBlockTime * 3)
}
}
logger.Info().Msgf("reportToOutTxTracker: reported outTxHash to core successful %s, chain %d nonce %d outTxHash %s", zetaHash, chainID, nonce, outTxHash)
}()
}

Expand Down
1 change: 1 addition & 0 deletions zetaclient/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ZetaCoreBridger interface {
ListPendingCctx(chainID int64) ([]*crosschaintypes.CrossChainTx, uint64, error)
GetPendingNoncesByChain(chainID int64) (observertypes.PendingNonces, error)
GetCctxByNonce(chainID int64, nonce uint64) (*crosschaintypes.CrossChainTx, error)
GetOutTxTracker(chain common.Chain, nonce uint64) (*crosschaintypes.OutTxTracker, error)
GetAllOutTxTrackerByChain(chainID int64, order Order) ([]crosschaintypes.OutTxTracker, error)
GetCrosschainFlags() (observertypes.CrosschainFlags, error)
GetObserverList() ([]string, error)
Expand Down
10 changes: 10 additions & 0 deletions zetaclient/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zetaclient

import (
"fmt"
"strings"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -78,6 +79,15 @@ func (b *ZetaCoreBridge) AddTxHashToOutTxTracker(
blockHash string,
txIndex int64,
) (string, error) {
// don't report if the tracker already contains the txHash
tracker, err := b.GetOutTxTracker(common.Chain{ChainId: chainID}, nonce)
if err == nil {
for _, hash := range tracker.HashList {
if strings.EqualFold(hash.TxHash, txHash) {
return "", nil
}
}
}
signerAddress := b.keys.GetOperatorAddress().String()
msg := types.NewMsgAddToOutTxTracker(signerAddress, chainID, nonce, txHash, proof, blockHash, txIndex)

Expand Down
3 changes: 2 additions & 1 deletion zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ func (co *CoreObserver) scheduleCctxEVM(
continue
}
if params.OutboundTxTssNonce > cctxList[0].GetCurrentOutTxParam().OutboundTxTssNonce+MaxLookaheadNonce {
co.logger.ZetaChainWatcher.Error().Msgf("scheduleCctxEVM: nonce too high: signing %d, earliest pending %d", params.OutboundTxTssNonce, cctxList[0].GetCurrentOutTxParam().OutboundTxTssNonce)
co.logger.ZetaChainWatcher.Error().Msgf("scheduleCctxEVM: nonce too high: signing %d, earliest pending %d, chain %d",
params.OutboundTxTssNonce, cctxList[0].GetCurrentOutTxParam().OutboundTxTssNonce, chainID)
break
}

Expand Down

0 comments on commit 64ed9b1

Please sign in to comment.