Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check crosschain flags to stop inbound/outbound; get rid of outtx tracker iteration timeout #1978

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
* [1880](https://github.com/zeta-chain/node/issues/1880) - lower the gas price multiplier for EVM chains.
* [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses
* [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests
* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags
* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread

### Chores

Expand Down
24 changes: 17 additions & 7 deletions zetaclient/bitcoin/bitcoin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 337 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L336-L337

Added lines #L336 - L337 were not covered by tests
}
err := ob.ObserveInTx()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx")
Expand Down Expand Up @@ -381,12 +384,6 @@
}

func (ob *BTCChainClient) ObserveInTx() error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
cnt, err := ob.rpcClient.GetBlockCount()
if err != nil {
Expand Down Expand Up @@ -435,6 +432,7 @@
}

// add block header to zetabridge
flags := ob.coreContext.GetCrossChainFlags()

Check warning on line 435 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L435

Added line #L435 was not covered by tests
if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled {
err = ob.postBlockHeader(bn)
if err != nil {
Expand Down Expand Up @@ -574,19 +572,28 @@
}

func (ob *BTCChainClient) WatchGasPrice() {
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 578 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L576-L578

Added lines #L576 - L578 were not covered by tests
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error")
return
}
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

Check warning on line 588 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L587-L588

Added lines #L587 - L588 were not covered by tests

defer ticker.Stop()
for {
select {
case <-ticker.C():
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("PostGasPrice error on " + ob.chain.String())
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 596 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L596

Added line #L596 was not covered by tests
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
Expand Down Expand Up @@ -1093,6 +1100,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue

Check warning on line 1104 in zetaclient/bitcoin/bitcoin_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/bitcoin_client.go#L1103-L1104

Added lines #L1103 - L1104 were not covered by tests
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error GetAllOutTxTrackerByChain")
Expand Down
9 changes: 0 additions & 9 deletions zetaclient/bitcoin/bitcoin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,3 @@ func TestCheckTSSVoutCancelled(t *testing.T) {
require.ErrorContains(t, err, "not match TSS address")
})
}

func TestBTCChainClient_ObserveInTx(t *testing.T) {
t.Run("should return error", func(t *testing.T) {
// create mainnet mock client
btcClient := MockBTCClientMainnet()
err := btcClient.ObserveInTx()
require.ErrorContains(t, err, "inbound TXS / Send has been disabled by the protocol")
})
}
3 changes: 3 additions & 0 deletions zetaclient/bitcoin/inbound_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 25 in zetaclient/bitcoin/inbound_tracker.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/bitcoin/inbound_tracker.go#L24-L25

Added lines #L24 - L25 were not covered by tests
}
err := ob.ObserveTrackerSuggestions()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("error observing in tx")
Expand Down
69 changes: 23 additions & 46 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,6 @@
// FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called
// observeOutTx periodically checks all the txhash in potential outbound txs
func (ob *ChainClient) observeOutTx() {
// read env variables if set
timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE"))
if err != nil || timeoutNonce <= 0 {
timeoutNonce = 100 * 3 // process up to 100 hashes
}
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: using timeoutNonce %d seconds", timeoutNonce)

ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_observeOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("failed to create ticker")
Expand All @@ -626,13 +619,13 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue

Check warning on line 623 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L622-L623

Added lines #L622 - L623 were not covered by tests
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
continue
}
//FIXME: remove this timeout here to ensure that all trackers are queried
outTimeout := time.After(time.Duration(timeoutNonce) * time.Second)
TRACKERLOOP:
for _, tracker := range trackers {
nonceInt := tracker.Nonce
if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx
Expand All @@ -642,20 +635,14 @@
var receipt *ethtypes.Receipt
var transaction *ethtypes.Transaction
for _, txHash := range tracker.HashList {
select {
case <-outTimeout:
ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt)
break TRACKERLOOP
default:
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.ObserveOutTx.Error().Msgf(
"observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
}
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.ObserveOutTx.Error().Msgf(
"observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)

Check warning on line 645 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L638-L645

Added lines #L638 - L645 were not covered by tests
}
}
}
Expand Down Expand Up @@ -846,6 +833,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 837 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L836-L837

Added lines #L836 - L837 were not covered by tests
}
err := ob.observeInTX(sampledLogger)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msg("observeInTX error")
Expand Down Expand Up @@ -906,12 +896,6 @@
}

func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
blockNumber, err := ob.evmClient.BlockNumber(context.Background())
if err != nil {
Expand Down Expand Up @@ -949,7 +933,7 @@
lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock)

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock)

Check warning on line 936 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L936

Added line #L936 was not covered by tests

// note: using lowest height for all 3 events is not perfect, but it's simple and good enough
lastScannedLowest := lastScannedZetaSent
Expand Down Expand Up @@ -1126,7 +1110,7 @@

// ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 {
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 {

Check warning on line 1113 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1113

Added line #L1113 was not covered by tests
if !ob.GetChainParams().IsSupported {
return startBlock - 1 // lastScanned
}
Expand All @@ -1135,6 +1119,7 @@
for bn := startBlock; bn <= toBlock; bn++ {
// post new block header (if any) to zetabridge and ignore error
// TODO: consider having a independent ticker(from TSS scaning) for posting block headers
flags := ob.coreContext.GetCrossChainFlags()

Check warning on line 1122 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1122

Added line #L1122 was not covered by tests
if flags.BlockHeaderVerificationFlags != nil &&
flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled &&
chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains
Expand All @@ -1156,36 +1141,28 @@
}

func (ob *ChainClient) WatchGasPrice() {
ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...")
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 1147 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1147

Added line #L1147 was not covered by tests
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("NewDynamicTicker error")
return
}
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker)
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

Check warning on line 1157 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1156-L1157

Added lines #L1156 - L1157 were not covered by tests

defer ticker.Stop()
for {
select {
case <-ticker.C():
err = ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)

Check warning on line 1165 in zetaclient/evm/evm_client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/evm_client.go#L1165

Added line #L1165 was not covered by tests
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
Expand Down
3 changes: 3 additions & 0 deletions zetaclient/evm/inbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue

Check warning on line 47 in zetaclient/evm/inbounds.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/evm/inbounds.go#L46-L47

Added lines #L46 - L47 were not covered by tests
}
err := ob.ObserveIntxTrackers()
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msg("ObserveTrackerSuggestions error")
Expand Down
71 changes: 37 additions & 34 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,42 +138,45 @@

// schedule keysign for pending cctxs on each chain
coreContext := appContext.ZetaCoreContext()
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled {
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue

Check warning on line 145 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L141-L145

Added lines #L141 - L145 were not covered by tests
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue

Check warning on line 151 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L148-L151

Added lines #L148 - L151 were not covered by tests
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue

Check warning on line 156 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L153-L156

Added lines #L153 - L156 were not covered by tests
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue

Check warning on line 162 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L159-L162

Added lines #L159 - L162 were not covered by tests
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

Check warning on line 165 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L165

Added line #L165 was not covered by tests

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue

Check warning on line 175 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L168-L175

Added lines #L168 - L175 were not covered by tests
}
}
}

// update last processed block number
lastBlockNum = bn
metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))
Expand Down
Loading