Skip to content

Commit

Permalink
check crosschain flags to stop inbound/outbound; get rid of timeout i…
Browse files Browse the repository at this point in the history
…n outtx processing
  • Loading branch information
ws4charlie committed Apr 2, 2024
1 parent 1e0d859 commit 13b75e3
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 87 deletions.
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
* [1880](https://github.com/zeta-chain/node/issues/1880) - lower the gas price multiplier for EVM chains.
* [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses
* [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests
* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags
* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread

### Chores

Expand Down
24 changes: 17 additions & 7 deletions zetaclient/bitcoin/bitcoin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ func (ob *BTCChainClient) WatchInTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.ObserveInTx()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx")
Expand Down Expand Up @@ -383,12 +386,6 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error {
}

func (ob *BTCChainClient) ObserveInTx() error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
cnt, err := ob.rpcClient.GetBlockCount()
if err != nil {
Expand Down Expand Up @@ -437,6 +434,7 @@ func (ob *BTCChainClient) ObserveInTx() error {
}

// add block header to zetabridge
flags := ob.coreContext.GetCrossChainFlags()
if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled {
err = ob.postBlockHeader(bn)
if err != nil {
Expand Down Expand Up @@ -576,19 +574,28 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger
}

func (ob *BTCChainClient) WatchGasPrice() {
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error")
return
}
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

defer ticker.Stop()
for {
select {
case <-ticker.C():
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("PostGasPrice error on " + ob.chain.String())
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
Expand Down Expand Up @@ -1095,6 +1102,9 @@ func (ob *BTCChainClient) observeOutTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("observeOutTx: error GetAllOutTxTrackerByChain")
Expand Down
3 changes: 3 additions & 0 deletions zetaclient/bitcoin/inbound_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func (ob *BTCChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions()
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.ObserveTrackerSuggestions()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("error observing in tx")
Expand Down
69 changes: 23 additions & 46 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,6 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx,
// FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called
// observeOutTx periodically checks all the txhash in potential outbound txs
func (ob *ChainClient) observeOutTx() {
// read env variables if set
timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE"))
if err != nil || timeoutNonce <= 0 {
timeoutNonce = 100 * 3 // process up to 100 hashes
}
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: using timeoutNonce %d seconds", timeoutNonce)

ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_observeOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("failed to create ticker")
Expand All @@ -629,13 +622,13 @@ func (ob *ChainClient) observeOutTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
if err != nil {
continue
}
//FIXME: remove this timeout here to ensure that all trackers are queried
outTimeout := time.After(time.Duration(timeoutNonce) * time.Second)
TRACKERLOOP:
for _, tracker := range trackers {
nonceInt := tracker.Nonce
if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx
Expand All @@ -645,20 +638,14 @@ func (ob *ChainClient) observeOutTx() {
var receipt *ethtypes.Receipt
var transaction *ethtypes.Transaction
for _, txHash := range tracker.HashList {
select {
case <-outTimeout:
ob.logger.ObserveOutTx.Warn().Msgf("observeOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt)
break TRACKERLOOP
default:
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.ObserveOutTx.Error().Msgf(
"observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
}
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
ob.logger.ObserveOutTx.Info().Msgf("observeOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.ObserveOutTx.Error().Msgf(
"observeOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
}
}
}
Expand Down Expand Up @@ -849,6 +836,9 @@ func (ob *ChainClient) ExternalChainWatcher() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.observeInTX(sampledLogger)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msg("observeInTX error")
Expand Down Expand Up @@ -909,12 +899,6 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error {
}

func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
// make sure inbound TXS / Send is enabled by the protocol
flags := ob.coreContext.GetCrossChainFlags()
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}

// get and update latest block height
blockNumber, err := ob.evmClient.BlockNumber(context.Background())
if err != nil {
Expand Down Expand Up @@ -952,7 +936,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock)

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock)

// note: using lowest height for all 3 events is not perfect, but it's simple and good enough
lastScannedLowest := lastScannedZetaSent
Expand Down Expand Up @@ -1129,7 +1113,7 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64

// ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 {
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 {
if !ob.GetChainParams().IsSupported {
return startBlock - 1 // lastScanned
}
Expand All @@ -1138,6 +1122,7 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse
for bn := startBlock; bn <= toBlock; bn++ {
// post new block header (if any) to zetabridge and ignore error
// TODO: consider having a independent ticker(from TSS scaning) for posting block headers
flags := ob.coreContext.GetCrossChainFlags()
if flags.BlockHeaderVerificationFlags != nil &&
flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled &&
chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains
Expand All @@ -1159,36 +1144,28 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse
}

func (ob *ChainClient) WatchGasPrice() {
ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice starting...")
// report gas price right away as the ticker takes time to kick in
err := ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}

// start gas price ticker
ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("NewDynamicTicker error")
return
}
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker)
ob.logger.WatchGasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d",
ob.chain.ChainId, ob.GetChainParams().GasPriceTicker)

defer ticker.Stop()
for {
select {
case <-ticker.C():
err = ob.PostGasPrice()
if err != nil {
height, err := ob.zetaClient.GetBlockHeight()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("GetBlockHeight error")
} else {
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height)
}
ob.logger.WatchGasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId)
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
Expand Down
3 changes: 3 additions & 0 deletions zetaclient/evm/inbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (ob *ChainClient) ExternalChainWatcherForNewInboundTrackerSuggestions() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
err := ob.ObserveIntxTrackers()
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msg("ObserveTrackerSuggestions error")
Expand Down
71 changes: 37 additions & 34 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,42 +138,45 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) {

// schedule keysign for pending cctxs on each chain
coreContext := appContext.ZetaCoreContext()
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled {
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
}
}
}

// update last processed block number
lastBlockNum = bn
metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))
Expand Down

0 comments on commit 13b75e3

Please sign in to comment.