Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check crosschain flags to stop inbound/outbound; get rid of outtx tracker iteration timeout #1978

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
* [1880](https://github.com/zeta-chain/node/issues/1880) - lower the gas price multiplier for EVM chains.
* [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses
* [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests
* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags
* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread

### Chores

Expand Down
24 changes: 17 additions & 7 deletions zetaclient/bitcoin/bitcoin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ func (ob *BTCChainClient) WatchInTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.ObserveInTx()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx")
Expand Down Expand Up @@ -383,12 +386,6 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error {
}

func (ob *BTCChainClient) ObserveInTx() error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
cnt, err := ob.rpcClient.GetBlockCount()
if err != nil {
Expand Down Expand Up @@ -437,6 +434,7 @@ func (ob *BTCChainClient) ObserveInTx() error {
}

// add block header to zetabridge
flags := ob.coreContext.GetCrossChainFlags()
if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled {
err = ob.postBlockHeader(bn)
if err != nil {
Expand Down Expand Up @@ -576,19 +574,28 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger
}

func (ob *BTCChainClient) WatchGasPrice() {
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error")
return
}
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

defer ticker.Stop()
for {
select {
case <-ticker.C():
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("PostGasPrice error on " + ob.chain.String())
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
Expand Down Expand Up @@ -1095,6 +1102,9 @@ func (ob *BTCChainClient) observeOutTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error GetAllOutTxTrackerByChain")
Expand Down
3 changes: 3 additions & 0 deletions zetaclient/bitcoin/inbound_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions()
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.ObserveTrackerSuggestions()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("error observing in tx")
Expand Down
69 changes: 23 additions & 46 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,6 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx,
// FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called
// observeOutTx periodically checks all the txhash in potential outbound txs
func (ob *ChainClient) observeOutTx() {
// read env variables if set
timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE"))
if err != nil || timeoutNonce <= 0 {
timeoutNonce = 100 * 3 // process up to 100 hashes
}
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: using timeoutNonce %d seconds", timeoutNonce)

ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_observeOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("failed to create ticker")
Expand All @@ -629,13 +622,13 @@ func (ob *ChainClient) observeOutTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
continue
}
//FIXME: remove this timeout here to ensure that all trackers are queried
outTimeout := time.After(time.Duration(timeoutNonce) * time.Second)
TRACKERLOOP:
for _, tracker := range trackers {
nonceInt := tracker.Nonce
if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx
Expand All @@ -645,20 +638,14 @@ func (ob *ChainClient) observeOutTx() {
var receipt *ethtypes.Receipt
var transaction *ethtypes.Transaction
for _, txHash := range tracker.HashList {
select {
case <-outTimeout:
ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt)
break TRACKERLOOP
default:
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.ObserveOutTx.Error().Msgf(
"observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
}
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.ObserveOutTx.Error().Msgf(
"observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
}
}
}
Expand Down Expand Up @@ -849,6 +836,9 @@ func (ob *ChainClient) ExternalChainWatcher() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.observeInTX(sampledLogger)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msg("observeInTX error")
Expand Down Expand Up @@ -909,12 +899,6 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error {
}

func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
blockNumber, err := ob.evmClient.BlockNumber(context.Background())
if err != nil {
Expand Down Expand Up @@ -952,7 +936,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock)

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock)

// note: using lowest height for all 3 events is not perfect, but it's simple and good enough
lastScannedLowest := lastScannedZetaSent
Expand Down Expand Up @@ -1129,7 +1113,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64

// ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 {
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 {
if !ob.GetChainParams().IsSupported {
return startBlock - 1 // lastScanned
}
Expand All @@ -1138,6 +1122,7 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse
for bn := startBlock; bn <= toBlock; bn++ {
// post new block header (if any) to zetabridge and ignore error
// TODO: consider having a independent ticker(from TSS scaning) for posting block headers
flags := ob.coreContext.GetCrossChainFlags()
if flags.BlockHeaderVerificationFlags != nil &&
flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled &&
chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains
Expand All @@ -1159,36 +1144,28 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse
}

func (ob *ChainClient) WatchGasPrice() {
ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...")
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("NewDynamicTicker error")
return
}
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker)
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

defer ticker.Stop()
for {
select {
case <-ticker.C():
err = ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
Expand Down
3 changes: 3 additions & 0 deletions zetaclient/evm/inbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (ob *ChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.ObserveIntxTrackers()
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msg("ObserveTrackerSuggestions error")
Expand Down
71 changes: 37 additions & 34 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,42 +138,45 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) {

// schedule keysign for pending cctxs on each chain
coreContext := appContext.ZetaCoreContext()
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled {
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
}
}
}

// update last processed block number
lastBlockNum = bn
metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))
Expand Down
Loading