From 46f5e98e22654149331e12aa924094f5d9a31bff Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Thu, 4 Apr 2024 12:55:10 -0500 Subject: [PATCH] a bit refactor and added unit tests --- zetaclient/bitcoin/bitcoin_client.go | 17 +-- zetaclient/bitcoin/inbound_tracker.go | 6 +- zetaclient/core_context/zeta_core_context.go | 12 +++ .../core_context/zeta_core_context_test.go | 101 ++++++++++++++++-- zetaclient/evm/evm_client.go | 30 +++--- zetaclient/evm/inbounds.go | 6 +- zetaclient/zetacore_observer.go | 77 +++++++------ 7 files changed, 170 insertions(+), 79 deletions(-) diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index d1f6c0cc47..110c89c474 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -330,13 +330,12 @@ func (ob *BTCChainClient) WatchInTx() { defer ticker.Stop() ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.InTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): - if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { - continue - } - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId) continue } err := ob.ObserveInTx() @@ -435,6 +434,8 @@ func (ob *BTCChainClient) ObserveInTx() error { } // add block header to zetabridge + // TODO: consider having a separate ticker(from TSS scaning) for posting block headers + // https://github.com/zeta-chain/node/issues/1847 flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { err = ob.postBlockHeader(bn) @@ -1129,13 +1130,13 @@ func (ob *BTCChainClient) WatchOutTx() { } defer ticker.Stop() + ob.logger.OutTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): - if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled { - continue - } - if !ob.GetChainParams().IsSupported { + if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId) continue } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index 1e8763cc32..e0dbd595cb 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -6,6 +6,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/zeta-chain/zetacore/pkg/coin" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetabridge" ) @@ -22,10 +23,7 @@ func (ob *BTCChainClient) WatchIntxTracker() { for { select { case <-ticker.C(): - if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { - continue - } - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { continue } err := ob.ObserveTrackerSuggestions() diff --git a/zetaclient/core_context/zeta_core_context.go b/zetaclient/core_context/zeta_core_context.go index d35c502c86..346563be04 100644 --- a/zetaclient/core_context/zeta_core_context.go +++ b/zetaclient/core_context/zeta_core_context.go @@ -179,3 +179,15 @@ func (c *ZetaCoreContext) Update( c.currentTssPubkey = tssPubKey } } + +// IsOutboundObservationEnabled returns true if the chain is supported and outbound flag is enabled +func IsOutboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool { + flags := c.GetCrossChainFlags() + return chainParams.IsSupported && flags.IsOutboundEnabled +} + +// IsInboundObservationEnabled returns true if the chain is supported and inbound flag is enabled +func IsInboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool { + flags := c.GetCrossChainFlags() + return chainParams.IsSupported && flags.IsInboundEnabled +} diff --git a/zetaclient/core_context/zeta_core_context_test.go b/zetaclient/core_context/zeta_core_context_test.go index 14f36a1cc2..8745fdc95c 100644 --- a/zetaclient/core_context/zeta_core_context_test.go +++ b/zetaclient/core_context/zeta_core_context_test.go @@ -3,6 +3,7 @@ package corecontext_test import ( "testing" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/testutil/sample" @@ -12,6 +13,44 @@ import ( corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" ) +func assertPanic(t *testing.T, f func(), errorLog string) { + defer func() { + r := recover() + if r != nil { + require.Contains(t, r, errorLog) + } + }() + f() +} + +func getTestCoreContext( + evmChain chains.Chain, + evmChainParams *observertypes.ChainParams, + ccFlags observertypes.CrosschainFlags) *corecontext.ZetaCoreContext { + // create config + cfg := config.NewConfig() + cfg.EVMChainConfigs[evmChain.ChainId] = config.EVMConfig{ + Chain: evmChain, + } + // create core context + coreContext := corecontext.NewZetaCoreContext(cfg) + evmChainParamsMap := make(map[int64]*observertypes.ChainParams) + evmChainParamsMap[evmChain.ChainId] = evmChainParams + + // feed chain params + coreContext.Update( + &observertypes.Keygen{}, + []chains.Chain{evmChain}, + evmChainParamsMap, + nil, + "", + ccFlags, + true, + zerolog.Logger{}, + ) + return coreContext +} + func TestNewZetaCoreContext(t *testing.T) { t.Run("should create new zeta core context with empty config", func(t *testing.T) { testCfg := config.NewConfig() @@ -264,12 +303,60 @@ func TestUpdateZetaCoreContext(t *testing.T) { }) } -func assertPanic(t *testing.T, f func(), errorLog string) { - defer func() { - r := recover() - if r != nil { - require.Contains(t, r, errorLog) +func TestIsOutboundObservationEnabled(t *testing.T) { + // create test chain params and flags + evmChain := chains.EthChain() + ccFlags := *sample.CrosschainFlags() + chainParams := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: true, + } + + t.Run("should return true if chain is supported and outbound flag is enabled", func(t *testing.T) { + coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags) + require.True(t, corecontext.IsOutboundObservationEnabled(coreCTX, *chainParams)) + }) + t.Run("should return false if chain is not supported yet", func(t *testing.T) { + paramsUnsupported := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: false, } - }() - f() + coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags) + require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported)) + }) + t.Run("should return false if outbound flag is disabled", func(t *testing.T) { + flagsDisabled := ccFlags + flagsDisabled.IsOutboundEnabled = false + coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled) + require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXDisabled, *chainParams)) + }) +} + +func TestIsInboundObservationEnabled(t *testing.T) { + // create test chain params and flags + evmChain := chains.EthChain() + ccFlags := *sample.CrosschainFlags() + chainParams := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: true, + } + + t.Run("should return true if chain is supported and inbound flag is enabled", func(t *testing.T) { + coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags) + require.True(t, corecontext.IsInboundObservationEnabled(coreCTX, *chainParams)) + }) + t.Run("should return false if chain is not supported yet", func(t *testing.T) { + paramsUnsupported := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: false, + } + coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags) + require.False(t, corecontext.IsInboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported)) + }) + t.Run("should return false if inbound flag is disabled", func(t *testing.T) { + flagsDisabled := ccFlags + flagsDisabled.IsInboundEnabled = false + coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled) + require.False(t, corecontext.IsInboundObservationEnabled(coreCTXDisabled, *chainParams)) + }) } diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index 817ee66754..450176df90 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -619,14 +619,14 @@ func (ob *ChainClient) WatchOutTx() { return } + ob.logger.OutTx.Info().Msgf("WatchOutTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10}) defer ticker.Stop() for { select { case <-ticker.C(): - if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled { - continue - } - if !ob.GetChainParams().IsSupported { + if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId) continue } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) @@ -639,22 +639,22 @@ func (ob *ChainClient) WatchOutTx() { continue } txCount := 0 - var receipt *ethtypes.Receipt - var transaction *ethtypes.Transaction + var outtxReceipt *ethtypes.Receipt + var outtx *ethtypes.Transaction for _, txHash := range tracker.HashList { - if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { + if receipt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { txCount++ - receipt = recpt - transaction = tx + outtxReceipt = receipt + outtx = tx ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) if txCount > 1 { ob.logger.OutTx.Error().Msgf( - "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) + "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, outtxReceipt, outtx) } } } if txCount == 1 { // should be only one txHash confirmed for each nonce. - ob.SetTxNReceipt(nonceInt, receipt, transaction) + ob.SetTxNReceipt(nonceInt, outtxReceipt, outtx) } else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint) ob.logger.OutTx.Error().Msgf("WatchOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) } @@ -841,11 +841,8 @@ func (ob *ChainClient) WatchInTx() { for { select { case <-ticker.C(): - if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { - continue - } - if !ob.GetChainParams().IsSupported { - sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId) + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId) continue } err := ob.observeInTX(sampledLogger) @@ -1127,6 +1124,7 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetabridge and ignore error // TODO: consider having a independent ticker(from TSS scaning) for posting block headers + // https://github.com/zeta-chain/node/issues/1847 flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && diff --git a/zetaclient/evm/inbounds.go b/zetaclient/evm/inbounds.go index 3dbed486e9..819ab09f77 100644 --- a/zetaclient/evm/inbounds.go +++ b/zetaclient/evm/inbounds.go @@ -20,6 +20,7 @@ import ( "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/compliance" "github.com/zeta-chain/zetacore/zetaclient/config" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetabridge" "golang.org/x/net/context" @@ -42,10 +43,7 @@ func (ob *ChainClient) WatchIntxTracker() { for { select { case <-ticker.C(): - if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled { - continue - } - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { continue } err := ob.ObserveIntxTrackers() diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index 3d9006110f..415b5d5d21 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -135,46 +135,43 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { // schedule keysign for pending cctxs on each chain coreContext := appContext.ZetaCoreContext() - if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled { - supportedChains := coreContext.GetEnabledChains() - for _, c := range supportedChains { - if c.ChainId == co.bridge.ZetaChain().ChainId { - continue - } - // update chain parameters for signer and chain client - signer, err := co.GetUpdatedSigner(coreContext, c.ChainId) - if err != nil { - co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId) - continue - } - ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId) - if err != nil { - co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) - continue - } - if !ob.GetChainParams().IsSupported { - co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId) - continue - } - - cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId) - if err != nil { - co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId) - continue - } - // Set Pending transactions prometheus gauge - metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending)) - - // #nosec G701 range is verified - zetaHeight := uint64(bn) - if chains.IsEVMChain(c.ChainId) { - co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) - } else if chains.IsBitcoinChain(c.ChainId) { - co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) - } else { - co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId) - continue - } + supportedChains := coreContext.GetEnabledChains() + for _, c := range supportedChains { + if c.ChainId == co.bridge.ZetaChain().ChainId { + continue + } + // update chain parameters for signer and chain client + signer, err := co.GetUpdatedSigner(coreContext, c.ChainId) + if err != nil { + co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId) + continue + } + ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId) + if err != nil { + co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) + continue + } + if !corecontext.IsOutboundObservationEnabled(coreContext, ob.GetChainParams()) { + continue + } + + cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId) + if err != nil { + co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId) + continue + } + // Set Pending transactions prometheus gauge + metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending)) + + // #nosec G701 range is verified + zetaHeight := uint64(bn) + if chains.IsEVMChain(c.ChainId) { + co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) + } else if chains.IsBitcoinChain(c.ChainId) { + co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer) + } else { + co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId) + continue } }