Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check crosschain flags to stop inbound/outbound; get rid of outtx tracker iteration timeout #1984

Merged
merged 7 commits into from
Apr 8, 2024
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
* [1883](https://github.com/zeta-chain/node/issues/1883) - zetaclient should check 'IsSupported' flag to pause/unpause a specific chain
* [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses
* [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests
* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags
* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread

### Chores

Expand Down
22 changes: 16 additions & 6 deletions zetaclient/bitcoin/bitcoin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 337 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L336-L337

Added lines #L336 - L337 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
continue
}
Expand Down Expand Up @@ -384,12 +387,6 @@
}

func (ob *BTCChainClient) ObserveInTx() error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
cnt, err := ob.rpcClient.GetBlockCount()
if err != nil {
Expand Down Expand Up @@ -438,6 +435,7 @@
}

// add block header to zetabridge
flags := ob.coreContext.GetCrossChainFlags()

Check warning on line 438 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L438

Added line #L438 was not covered by tests
if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled {
err = ob.postBlockHeader(bn)
if err != nil {
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -583,11 +581,20 @@

// WatchGasPrice watches Bitcoin chain for gas rate and post to zetacore
func (ob *BTCChainClient) WatchGasPrice() {
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 587 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L585-L587

Added lines #L585 - L587 were not covered by tests
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.GasPrice.Error().Err(err).Msg("error creating ticker")
return
}
ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

Check warning on line 597 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L596-L597

Added lines #L596 - L597 were not covered by tests

defer ticker.Stop()
for {
Expand Down Expand Up @@ -1125,6 +1132,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue

Check warning on line 1136 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L1135-L1136

Added lines #L1135 - L1136 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
continue
}
Expand Down
9 changes: 0 additions & 9 deletions zetaclient/bitcoin/bitcoin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,12 +712,3 @@ func TestGetBtcEventErrors(t *testing.T) {
require.Nil(t, event)
})
}

func TestBTCChainClient_ObserveInTx(t *testing.T) {
t.Run("should return error", func(t *testing.T) {
// create mainnet mock client
btcClient := MockBTCClientMainnet()
err := btcClient.ObserveInTx()
require.ErrorContains(t, err, "inbound TXS / Send has been disabled by the protocol")
})
}
3 changes: 3 additions & 0 deletions zetaclient/bitcoin/inbound_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 26 in zetaclient/bitcoin/inbound_tracker.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/inbound_tracker.go#L25-L26

Added lines #L25 - L26 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
continue
}
Expand Down
69 changes: 23 additions & 46 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,6 @@

// WatchOutTx watches evm chain for outgoing txs status
func (ob *ChainClient) WatchOutTx() {
// read env variables if set
timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE"))
if err != nil || timeoutNonce <= 0 {
timeoutNonce = 100 * 3 // process up to 100 hashes
}
ob.logger.OutTx.Info().Msgf("WatchOutTx: using timeoutNonce %d seconds", timeoutNonce)

ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker)
if err != nil {
ob.logger.OutTx.Error().Err(err).Msg("error creating ticker")
Expand All @@ -630,16 +623,16 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 627 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L626-L627

Added lines #L626 - L627 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
continue
}
//FIXME: remove this timeout here to ensure that all trackers are queried
outTimeout := time.After(time.Duration(timeoutNonce) * time.Second)
TRACKERLOOP:
for _, tracker := range trackers {
nonceInt := tracker.Nonce
if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx
Expand All @@ -649,20 +642,14 @@
var receipt *ethtypes.Receipt
var transaction *ethtypes.Transaction
for _, txHash := range tracker.HashList {
select {
case <-outTimeout:
ob.logger.OutTx.Warn().Msgf("WatchOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt)
break TRACKERLOOP
default:
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.OutTx.Error().Msgf(
"WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
}
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
txCount++
receipt = recpt
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
transaction = tx
ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.OutTx.Error().Msgf(
"WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)

Check warning on line 652 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L645-L652

Added lines #L645 - L652 were not covered by tests
}
}
}
Expand Down Expand Up @@ -854,6 +841,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 845 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L844-L845

Added lines #L844 - L845 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId)
continue
Expand Down Expand Up @@ -918,12 +908,6 @@
}

func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
blockNumber, err := ob.evmClient.BlockNumber(context.Background())
if err != nil {
Expand Down Expand Up @@ -961,7 +945,7 @@
lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock)

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock)

Check warning on line 948 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L948

Added line #L948 was not covered by tests

// note: using lowest height for all 3 events is not perfect, but it's simple and good enough
lastScannedLowest := lastScannedZetaSent
Expand Down Expand Up @@ -1138,11 +1122,12 @@

// ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 {
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 {

Check warning on line 1125 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1125

Added line #L1125 was not covered by tests
// query incoming gas asset
for bn := startBlock; bn <= toBlock; bn++ {
// post new block header (if any) to zetabridge and ignore error
// TODO: consider having a independent ticker(from TSS scaning) for posting block headers
flags := ob.coreContext.GetCrossChainFlags()

Check warning on line 1130 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1130

Added line #L1130 was not covered by tests
if flags.BlockHeaderVerificationFlags != nil &&
flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled &&
chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains
Expand All @@ -1165,23 +1150,20 @@

// WatchGasPrice watches evm chain for gas prices and post to zetacore
func (ob *ChainClient) WatchGasPrice() {
ob.logger.GasPrice.Info().Msg("WatchGasPrice starting...")
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 1156 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1156

Added line #L1156 was not covered by tests
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.GasPrice.Error().Err(err).Msg("NewDynamicTicker error")
return
}
ob.logger.GasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker)
ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

Check warning on line 1166 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1165-L1166

Added lines #L1165 - L1166 were not covered by tests

defer ticker.Stop()
for {
Expand All @@ -1192,12 +1174,7 @@
}
err = ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 1177 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1177

Added line #L1177 was not covered by tests
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice)
case <-ob.stop:
Expand Down
10 changes: 6 additions & 4 deletions zetaclient/evm/inbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@
"strings"

sdkmath "cosmossdk.io/math"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/onrik/ethrpc"
"github.com/pkg/errors"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol"
"github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/coin"
"github.com/zeta-chain/zetacore/pkg/constant"
"github.com/zeta-chain/zetacore/x/crosschain/types"
"github.com/zeta-chain/zetacore/zetaclient/compliance"
"github.com/zeta-chain/zetacore/zetaclient/config"
clienttypes "github.com/zeta-chain/zetacore/zetaclient/types"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/zeta-chain/zetacore/pkg/constant"
"github.com/zeta-chain/zetacore/x/crosschain/types"
"github.com/zeta-chain/zetacore/zetaclient/zetabridge"
"golang.org/x/net/context"
)
Expand All @@ -43,6 +42,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 46 in zetaclient/evm/inbounds.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/inbounds.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
continue
}
Expand Down
92 changes: 46 additions & 46 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@
"math"
"time"

sdkmath "cosmossdk.io/math"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/rs/zerolog"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/x/crosschain/types"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
appcontext "github.com/zeta-chain/zetacore/zetaclient/app_context"
"github.com/zeta-chain/zetacore/zetaclient/bitcoin"
corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context"
"github.com/zeta-chain/zetacore/zetaclient/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/outtxprocessor"

observertypes "github.com/zeta-chain/zetacore/x/observer/types"

sdkmath "cosmossdk.io/math"

"github.com/rs/zerolog"
"github.com/zeta-chain/zetacore/x/crosschain/types"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/outtxprocessor"
)

const (
Expand Down Expand Up @@ -138,46 +135,49 @@

// schedule keysign for pending cctxs on each chain
coreContext := appContext.ZetaCoreContext()
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}
if !ob.GetChainParams().IsSupported {
co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled {
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue

Check warning on line 142 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L138-L142

Added lines #L138 - L142 were not covered by tests
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue

Check warning on line 148 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L145-L148

Added lines #L145 - L148 were not covered by tests
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue

Check warning on line 153 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L150-L153

Added lines #L150 - L153 were not covered by tests
}
if !ob.GetChainParams().IsSupported {
co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId)
continue

Check warning on line 157 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L155-L157

Added lines #L155 - L157 were not covered by tests
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue

Check warning on line 163 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L160-L163

Added lines #L160 - L163 were not covered by tests
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

Check warning on line 166 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L166

Added line #L166 was not covered by tests

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue

Check warning on line 176 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L169-L176

Added lines #L169 - L176 were not covered by tests
}
}
}

// update last processed block number
lastBlockNum = bn
metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))
Expand Down
Loading