Skip to content

Commit

Permalink
refactor and fix evm outbound tracker reporter to avoid invalid hashe…
Browse files Browse the repository at this point in the history
…s; print log when outbound tracker is full of invalid hashes
  • Loading branch information
ws4charlie committed Aug 2, 2024
1 parent b4251e1 commit 5e6f77f
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 163 deletions.
6 changes: 6 additions & 0 deletions pkg/constant/constant.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package constant

import "time"

const (
// ZetaBlockTime is the block time of the ZetaChain network
// It's a rough estimate that can be used in non-critical path to estimate the time of a block
ZetaBlockTime = 6000 * time.Millisecond

// DonationMessage is the message for donation transactions
// Transaction sent to the TSS or ERC20 Custody address containing this message are considered as a donation
DonationMessage = "I am rich!"
Expand Down
11 changes: 6 additions & 5 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/db"
"github.com/zeta-chain/zetacore/zetaclient/logs"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
clienttypes "github.com/zeta-chain/zetacore/zetaclient/types"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
Expand Down Expand Up @@ -295,13 +296,13 @@ func (ob *Observer) Logger() *ObserverLogger {

// WithLogger attaches a new logger to the observer.
func (ob *Observer) WithLogger(logger Logger) *Observer {
chainLogger := logger.Std.With().Int64("chain", ob.chain.ChainId).Logger()
chainLogger := logger.Std.With().Int64(logs.FieldChain, ob.chain.ChainId).Logger()
ob.logger = ObserverLogger{
Chain: chainLogger,
Inbound: chainLogger.With().Str("module", "inbound").Logger(),
Outbound: chainLogger.With().Str("module", "outbound").Logger(),
GasPrice: chainLogger.With().Str("module", "gasprice").Logger(),
Headers: chainLogger.With().Str("module", "headers").Logger(),
Inbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameInbound).Logger(),
Outbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameOutbound).Logger(),
GasPrice: chainLogger.With().Str(logs.FieldModule, logs.ModNameGasPrice).Logger(),
Headers: chainLogger.With().Str(logs.FieldModule, logs.ModNameHeaders).Logger(),
Compliance: logger.Compliance,
}
return ob
Expand Down
7 changes: 4 additions & 3 deletions zetaclient/chains/evm/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package evm
import "time"

const (
// ZetaBlockTime is the block time of the Zeta network
ZetaBlockTime = 6500 * time.Millisecond

// OutboundInclusionTimeout is the timeout for waiting for an outbound to be included in a block
OutboundInclusionTimeout = 20 * time.Minute

// ReorgProtectBlockCount is confirmations count to protect against reorg
// Short 1~2 block reorgs could happen often on Ethereum due to network congestion or block production race conditions
ReorgProtectBlockCount = 2

// OutboundTrackerReportTimeout is the timeout for waiting for an outbound tracker report
OutboundTrackerReportTimeout = 10 * time.Minute

Expand Down
108 changes: 72 additions & 36 deletions zetaclient/chains/evm/observer/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/coin"
crosschainkeeper "github.com/zeta-chain/zetacore/x/crosschain/keeper"
crosschaintypes "github.com/zeta-chain/zetacore/x/crosschain/types"
"github.com/zeta-chain/zetacore/zetaclient/chains/evm"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
Expand All @@ -30,8 +31,15 @@ import (

// WatchOutbound watches evm chain for outgoing txs status
// TODO(revamp): move ticker function to ticker file
// TODO(revamp): move inner logic to a separate function
func (ob *Observer) WatchOutbound(ctx context.Context) error {
// get app context
app, err := zctx.FromContext(ctx)
if err != nil {
return err
}

// create outbound ticker
chainID := ob.Chain().ChainId
ticker, err := clienttypes.NewDynamicTicker(
fmt.Sprintf("EVM_WatchOutbound_%d", ob.Chain().ChainId),
ob.GetChainParams().OutboundTicker,
Expand All @@ -41,11 +49,6 @@ func (ob *Observer) WatchOutbound(ctx context.Context) error {
return err
}

app, err := zctx.FromContext(ctx)
if err != nil {
return err
}

ob.Logger().Outbound.Info().Msgf("WatchOutbound started for chain %d", ob.Chain().ChainId)
sampledLogger := ob.Logger().Outbound.Sample(&zerolog.BasicSampler{N: 10})
defer ticker.Stop()
Expand All @@ -57,38 +60,16 @@ func (ob *Observer) WatchOutbound(ctx context.Context) error {
Msgf("WatchOutbound: outbound observation is disabled for chain %d", ob.Chain().ChainId)
continue
}
trackers, err := ob.ZetacoreClient().
GetAllOutboundTrackerByChain(ctx, ob.Chain().ChainId, interfaces.Ascending)

// process outbound trackers
err := ob.ProcessOutboundTrackers(ctx)
if err != nil {
continue
}
for _, tracker := range trackers {
nonceInt := tracker.Nonce
if ob.IsTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx
continue
}
txCount := 0
var outboundReceipt *ethtypes.Receipt
var outbound *ethtypes.Transaction
for _, txHash := range tracker.HashList {
if receipt, tx, ok := ob.checkConfirmedTx(ctx, txHash.TxHash, nonceInt); ok {
txCount++
outboundReceipt = receipt
outbound = tx
ob.Logger().Outbound.Info().
Msgf("WatchOutbound: confirmed outbound %s for chain %d nonce %d", txHash.TxHash, ob.Chain().ChainId, nonceInt)
if txCount > 1 {
ob.Logger().Outbound.Error().Msgf(
"WatchOutbound: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.Chain().ChainId, nonceInt, outboundReceipt, outbound)
}
}
}
if txCount == 1 { // should be only one txHash confirmed for each nonce.
ob.SetTxNReceipt(nonceInt, outboundReceipt, outbound)
} else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint)
ob.Logger().Outbound.Error().Msgf("WatchOutbound: confirmed multiple (%d) outbound for chain %d nonce %d", txCount, ob.Chain().ChainId, nonceInt)
}
ob.Logger().
Outbound.Error().
Err(err).
Msgf("WatchOutbound: error ProcessOutboundTrackers for chain %d", chainID)
}

ticker.UpdateInterval(ob.GetChainParams().OutboundTicker, ob.Logger().Outbound)
case <-ob.StopChannel():
ob.Logger().Outbound.Info().Msg("WatchOutbound: stopped")
Expand All @@ -97,6 +78,61 @@ func (ob *Observer) WatchOutbound(ctx context.Context) error {
}
}

// ProcessOutboundTrackers processes outbound trackers
func (ob *Observer) ProcessOutboundTrackers(ctx context.Context) error {
chainID := ob.Chain().ChainId
trackers, err := ob.ZetacoreClient().GetAllOutboundTrackerByChain(ctx, ob.Chain().ChainId, interfaces.Ascending)
if err != nil {
return errors.Wrap(err, "GetAllOutboundTrackerByChain error")
}

// prepare logger fields
logger := ob.Logger().Outbound.With().
Str("method", "ProcessOutboundTrackers").
Int64("chain", chainID).
Logger()

// process outbound trackers
for _, tracker := range trackers {
// go to next tracker if this one already has a confirmed tx
nonce := tracker.Nonce
if ob.IsTxConfirmed(nonce) {
continue
}

// check each txHash and save tx and receipt if it's legit and confirmed
txCount := 0
var outboundReceipt *ethtypes.Receipt
var outbound *ethtypes.Transaction
for _, txHash := range tracker.HashList {
if receipt, tx, ok := ob.checkConfirmedTx(ctx, txHash.TxHash, nonce); ok {
txCount++
outboundReceipt = receipt
outbound = tx
logger.Info().Msgf("confirmed outbound %s for chain %d nonce %d", txHash.TxHash, chainID, nonce)
if txCount > 1 {
logger.Error().
Msgf("checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v tx %v", txCount, chainID, nonce, receipt, tx)
}
}
}

// should be only one txHash confirmed for each nonce.
if txCount == 1 {
ob.SetTxNReceipt(nonce, outboundReceipt, outbound)
} else if txCount > 1 {
// should not happen. We can't tell which txHash is true. It might happen (e.g. bug, glitchy/hacked endpoint)
ob.Logger().Outbound.Error().Msgf("WatchOutbound: confirmed multiple (%d) outbound for chain %d nonce %d", txCount, chainID, nonce)
} else {
if len(tracker.HashList) == crosschainkeeper.MaxOutboundTrackerHashes {
ob.Logger().Outbound.Error().Msgf("WatchOutbound: outbound tracker is full of invalid hashes for chain %d nonce %d", chainID, nonce)
}
}
}

return nil
}

// PostVoteOutbound posts vote to zetacore for the confirmed outbound
func (ob *Observer) PostVoteOutbound(
ctx context.Context,
Expand Down
52 changes: 52 additions & 0 deletions zetaclient/chains/evm/rpc/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package rpc

import (
"context"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"

"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
)

// IsTxConfirmed checks if the transaction is confirmed with given confirmations
func IsTxConfirmed(
ctx context.Context,
client interfaces.EVMRPCClient,
txHash string,
confirmations uint64,
) (bool, error) {
// query the tx
_, isPending, err := client.TransactionByHash(ctx, ethcommon.HexToHash(txHash))
if err != nil {
return false, errors.Wrapf(err, "error getting transaction for tx %s", txHash)
}
if isPending {
return false, nil
}

// query receipt
receipt, err := client.TransactionReceipt(ctx, ethcommon.HexToHash(txHash))
if err != nil {
return false, errors.Wrapf(err, "error getting transaction receipt for tx %s", txHash)
}

// should not happen
if receipt == nil {
return false, errors.Errorf("receipt is nil for tx %s", txHash)
}

// query last block height
lastHeight, err := client.BlockNumber(ctx)
if err != nil {
return false, errors.Wrap(err, "error getting block number")
}

// check confirmations
if lastHeight < receipt.BlockNumber.Uint64() {
return false, nil
}
blocks := lastHeight - receipt.BlockNumber.Uint64() + 1

return blocks >= confirmations, nil
}
45 changes: 45 additions & 0 deletions zetaclient/chains/evm/rpc/rpc_live_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package rpc_test

import (
"context"
"math"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/stretchr/testify/require"
"github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc"

"testing"
)

const (
URLEthMainnet = "https://rpc.ankr.com/eth"
URLEthSepolia = "https://rpc.ankr.com/eth_sepolia"
URLBscMainnet = "https://rpc.ankr.com/bsc"
URLPolygonMainnet = "https://rpc.ankr.com/polygon"
)

// Test_EVMRPCLive is a phony test to run each live test individually
func Test_EVMRPCLive(t *testing.T) {
// LiveTest_IsTxConfirmed(t)
}

func LiveTest_IsTxConfirmed(t *testing.T) {
client, err := ethclient.Dial(URLEthMainnet)
require.NoError(t, err)

// check if the transaction is confirmed
ctx := context.Background()
txHash := "0xd2eba7ac3da1b62800165414ea4bcaf69a3b0fb9b13a0fc32f4be11bfef79146"

t.Run("should confirm tx", func(t *testing.T) {
confirmed, err := rpc.IsTxConfirmed(ctx, client, txHash, 12)
require.NoError(t, err)
require.True(t, confirmed)
})

t.Run("should not confirm tx if confirmations is not enough", func(t *testing.T) {
confirmed, err := rpc.IsTxConfirmed(ctx, client, txHash, math.MaxUint64)
require.NoError(t, err)
require.False(t, confirmed)
})
}
86 changes: 86 additions & 0 deletions zetaclient/chains/evm/signer/outbound_tracker_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Package signer implements the ChainSigner interface for EVM chains
package signer

import (
"context"
"time"

"github.com/rs/zerolog"

"github.com/zeta-chain/zetacore/zetaclient/chains/evm"
"github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/logs"
)

// reportToOutboundTracker reports outboundHash to tracker only when tx receipt is available
func (signer *Signer) reportToOutboundTracker(
ctx context.Context,
zetacoreClient interfaces.ZetacoreClient,
chainID int64,
nonce uint64,
outboundHash string,
logger zerolog.Logger,
) {
// prepare logger
logger = logger.With().
Str(logs.FieldMethod, "reportToOutboundTracker").
Int64(logs.FieldChain, chainID).
Uint64(logs.FieldNonce, nonce).
Str(logs.FieldTx, outboundHash).
Logger()

// set being reported flag to avoid duplicate reporting
alreadySet := signer.Signer.SetBeingReportedFlag(outboundHash)
if alreadySet {
logger.Info().
Msgf("outbound %s for chain %d nonce %d is being reported", outboundHash, chainID, nonce)
return
}

// launch a goroutine to monitor tx confirmation status
go func() {
defer func() {
signer.Signer.ClearBeingReportedFlag(outboundHash)
}()

// try monitoring tx inclusion status for 20 minutes
tStart := time.Now()
for {
// take a rest between each check
time.Sleep(10 * time.Second)

// give up (forget about the tx) after 20 minutes of monitoring, there are 2 reasons:
// 1. the gas stability pool should have kicked in and replaced the tx by then.
// 2. even if there is a chance that the tx is included later, most likely it's going to be a false tx hash (either replaced or dropped).
if time.Since(tStart) > evm.OutboundInclusionTimeout {
logger.Info().
Msgf("timeout waiting outbound %s inclusion for chain %d nonce %d", outboundHash, chainID, nonce)
return
}

// check tx confirmation status
confirmed, err := rpc.IsTxConfirmed(ctx, signer.client, outboundHash, evm.ReorgProtectBlockCount)
if err != nil {
logger.Err(err).
Msgf("unable to check confirmation status for chain %d nonce %d outbound %s", chainID, nonce, outboundHash)
}
if !confirmed {
continue
}

// report outbound hash to tracker
zetaHash, err := zetacoreClient.AddOutboundTracker(ctx, chainID, nonce, outboundHash, nil, "", -1)
if err != nil {
logger.Err(err).
Msgf("error adding outbound %s to tracker for chain %d nonce %d", outboundHash, chainID, nonce)
} else if zetaHash != "" {
logger.Info().Msgf("added outbound %s to tracker for chain %d nonce %d; zeta txhash %s", outboundHash, chainID, nonce, zetaHash)
} else {
// exit goroutine until the tracker contains the hash (reported by either this or other signers)
logger.Info().Msgf("outbound %s now exists in tracker for chain %d nonce %d", outboundHash, chainID, nonce)
return
}
}
}()
}
Loading

0 comments on commit 5e6f77f

Please sign in to comment.