Skip to content

Commit

Permalink
a bit refactor and added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Apr 4, 2024
1 parent 810f9e0 commit 46f5e98
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 79 deletions.
17 changes: 9 additions & 8 deletions zetaclient/bitcoin/bitcoin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,12 @@ func (ob *BTCChainClient) WatchInTx() {

defer ticker.Stop()
ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId)
sampledLogger := ob.logger.InTx.Sample(&zerolog.BasicSampler{N: 10})
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
if !ob.GetChainParams().IsSupported {
if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) {
sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId)
continue
}
err := ob.ObserveInTx()
Expand Down Expand Up @@ -435,6 +434,8 @@ func (ob *BTCChainClient) ObserveInTx() error {
}

// add block header to zetabridge
// TODO: consider having a separate ticker(from TSS scaning) for posting block headers
// https://github.com/zeta-chain/node/issues/1847
flags := ob.coreContext.GetCrossChainFlags()
if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled {
err = ob.postBlockHeader(bn)
Expand Down Expand Up @@ -1129,13 +1130,13 @@ func (ob *BTCChainClient) WatchOutTx() {
}

defer ticker.Stop()
ob.logger.OutTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId)
sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10})
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue
}
if !ob.GetChainParams().IsSupported {
if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) {
sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId)
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
Expand Down
6 changes: 2 additions & 4 deletions zetaclient/bitcoin/inbound_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/zeta-chain/zetacore/pkg/coin"
corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context"
"github.com/zeta-chain/zetacore/zetaclient/types"
"github.com/zeta-chain/zetacore/zetaclient/zetabridge"
)
Expand All @@ -22,10 +23,7 @@ func (ob *BTCChainClient) WatchIntxTracker() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
if !ob.GetChainParams().IsSupported {
if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) {
continue
}
err := ob.ObserveTrackerSuggestions()
Expand Down
12 changes: 12 additions & 0 deletions zetaclient/core_context/zeta_core_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,15 @@ func (c *ZetaCoreContext) Update(
c.currentTssPubkey = tssPubKey
}
}

// IsOutboundObservationEnabled returns true if the chain is supported and outbound flag is enabled
func IsOutboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool {
flags := c.GetCrossChainFlags()
return chainParams.IsSupported && flags.IsOutboundEnabled
}

// IsInboundObservationEnabled returns true if the chain is supported and inbound flag is enabled
func IsInboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool {
flags := c.GetCrossChainFlags()
return chainParams.IsSupported && flags.IsInboundEnabled
}
101 changes: 94 additions & 7 deletions zetaclient/core_context/zeta_core_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package corecontext_test
import (
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/testutil/sample"
Expand All @@ -12,6 +13,44 @@ import (
corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context"
)

func assertPanic(t *testing.T, f func(), errorLog string) {
defer func() {
r := recover()
if r != nil {
require.Contains(t, r, errorLog)
}
}()
f()
}

func getTestCoreContext(
evmChain chains.Chain,
evmChainParams *observertypes.ChainParams,
ccFlags observertypes.CrosschainFlags) *corecontext.ZetaCoreContext {
// create config
cfg := config.NewConfig()
cfg.EVMChainConfigs[evmChain.ChainId] = config.EVMConfig{
Chain: evmChain,
}
// create core context
coreContext := corecontext.NewZetaCoreContext(cfg)
evmChainParamsMap := make(map[int64]*observertypes.ChainParams)
evmChainParamsMap[evmChain.ChainId] = evmChainParams

// feed chain params
coreContext.Update(
&observertypes.Keygen{},
[]chains.Chain{evmChain},
evmChainParamsMap,
nil,
"",
ccFlags,
true,
zerolog.Logger{},
)
return coreContext
}

func TestNewZetaCoreContext(t *testing.T) {
t.Run("should create new zeta core context with empty config", func(t *testing.T) {
testCfg := config.NewConfig()
Expand Down Expand Up @@ -264,12 +303,60 @@ func TestUpdateZetaCoreContext(t *testing.T) {
})
}

func assertPanic(t *testing.T, f func(), errorLog string) {
defer func() {
r := recover()
if r != nil {
require.Contains(t, r, errorLog)
func TestIsOutboundObservationEnabled(t *testing.T) {
// create test chain params and flags
evmChain := chains.EthChain()
ccFlags := *sample.CrosschainFlags()
chainParams := &observertypes.ChainParams{
ChainId: evmChain.ChainId,
IsSupported: true,
}

t.Run("should return true if chain is supported and outbound flag is enabled", func(t *testing.T) {
coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags)
require.True(t, corecontext.IsOutboundObservationEnabled(coreCTX, *chainParams))
})
t.Run("should return false if chain is not supported yet", func(t *testing.T) {
paramsUnsupported := &observertypes.ChainParams{
ChainId: evmChain.ChainId,
IsSupported: false,
}
}()
f()
coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags)
require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported))
})
t.Run("should return false if outbound flag is disabled", func(t *testing.T) {
flagsDisabled := ccFlags
flagsDisabled.IsOutboundEnabled = false
coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled)
require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXDisabled, *chainParams))
})
}

func TestIsInboundObservationEnabled(t *testing.T) {
// create test chain params and flags
evmChain := chains.EthChain()
ccFlags := *sample.CrosschainFlags()
chainParams := &observertypes.ChainParams{
ChainId: evmChain.ChainId,
IsSupported: true,
}

t.Run("should return true if chain is supported and inbound flag is enabled", func(t *testing.T) {
coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags)
require.True(t, corecontext.IsInboundObservationEnabled(coreCTX, *chainParams))
})
t.Run("should return false if chain is not supported yet", func(t *testing.T) {
paramsUnsupported := &observertypes.ChainParams{
ChainId: evmChain.ChainId,
IsSupported: false,
}
coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags)
require.False(t, corecontext.IsInboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported))
})
t.Run("should return false if inbound flag is disabled", func(t *testing.T) {
flagsDisabled := ccFlags
flagsDisabled.IsInboundEnabled = false
coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled)
require.False(t, corecontext.IsInboundObservationEnabled(coreCTXDisabled, *chainParams))
})
}
30 changes: 14 additions & 16 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,14 +619,14 @@ func (ob *ChainClient) WatchOutTx() {
return
}

ob.logger.OutTx.Info().Msgf("WatchOutTx started for chain %d", ob.chain.ChainId)
sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10})
defer ticker.Stop()
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsOutboundEnabled {
continue
}
if !ob.GetChainParams().IsSupported {
if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) {
sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId)
continue
}
trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending)
Expand All @@ -639,22 +639,22 @@ func (ob *ChainClient) WatchOutTx() {
continue
}
txCount := 0
var receipt *ethtypes.Receipt
var transaction *ethtypes.Transaction
var outtxReceipt *ethtypes.Receipt
var outtx *ethtypes.Transaction
for _, txHash := range tracker.HashList {
if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
if receipt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok {
txCount++
receipt = recpt
transaction = tx
outtxReceipt = receipt
outtx = tx
ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt)
if txCount > 1 {
ob.logger.OutTx.Error().Msgf(
"WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction)
"WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, outtxReceipt, outtx)
}
}
}
if txCount == 1 { // should be only one txHash confirmed for each nonce.
ob.SetTxNReceipt(nonceInt, receipt, transaction)
ob.SetTxNReceipt(nonceInt, outtxReceipt, outtx)
} else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint)
ob.logger.OutTx.Error().Msgf("WatchOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt)
}
Expand Down Expand Up @@ -841,11 +841,8 @@ func (ob *ChainClient) WatchInTx() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
if !ob.GetChainParams().IsSupported {
sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId)
if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) {
sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId)
continue
}
err := ob.observeInTX(sampledLogger)
Expand Down Expand Up @@ -1127,6 +1124,7 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 {
for bn := startBlock; bn <= toBlock; bn++ {
// post new block header (if any) to zetabridge and ignore error
// TODO: consider having a independent ticker(from TSS scaning) for posting block headers
// https://github.com/zeta-chain/node/issues/1847
flags := ob.coreContext.GetCrossChainFlags()
if flags.BlockHeaderVerificationFlags != nil &&
flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled &&
Expand Down
6 changes: 2 additions & 4 deletions zetaclient/evm/inbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/zeta-chain/zetacore/x/crosschain/types"
"github.com/zeta-chain/zetacore/zetaclient/compliance"
"github.com/zeta-chain/zetacore/zetaclient/config"
corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context"
clienttypes "github.com/zeta-chain/zetacore/zetaclient/types"
"github.com/zeta-chain/zetacore/zetaclient/zetabridge"
"golang.org/x/net/context"
Expand All @@ -42,10 +43,7 @@ func (ob *ChainClient) WatchIntxTracker() {
for {
select {
case <-ticker.C():
if flags := ob.coreContext.GetCrossChainFlags(); !flags.IsInboundEnabled {
continue
}
if !ob.GetChainParams().IsSupported {
if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) {
continue
}
err := ob.ObserveIntxTrackers()
Expand Down
77 changes: 37 additions & 40 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,46 +135,43 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) {

// schedule keysign for pending cctxs on each chain
coreContext := appContext.ZetaCoreContext()
if flags := coreContext.GetCrossChainFlags(); flags.IsOutboundEnabled {
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}
if !ob.GetChainParams().IsSupported {
co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId)
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
}
supportedChains := coreContext.GetEnabledChains()
for _, c := range supportedChains {
if c.ChainId == co.bridge.ZetaChain().ChainId {
continue
}
// update chain parameters for signer and chain client
signer, err := co.GetUpdatedSigner(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getUpdatedSigner failed for chain %d", c.ChainId)
continue
}
ob, err := co.GetUpdatedChainClient(coreContext, c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId)
continue
}
if !corecontext.IsOutboundObservationEnabled(coreContext, ob.GetChainParams()) {
continue
}

cctxList, totalPending, err := co.bridge.ListPendingCctx(c.ChainId)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: ListPendingCctx failed for chain %d", c.ChainId)
continue
}
// Set Pending transactions prometheus gauge
metrics.PendingTxsPerChain.WithLabelValues(c.ChainName.String()).Set(float64(totalPending))

// #nosec G701 range is verified
zetaHeight := uint64(bn)
if chains.IsEVMChain(c.ChainId) {
co.scheduleCctxEVM(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else if chains.IsBitcoinChain(c.ChainId) {
co.scheduleCctxBTC(outTxMan, zetaHeight, c.ChainId, cctxList, ob, signer)
} else {
co.logger.ZetaChainWatcher.Error().Msgf("startCctxScheduler: unsupported chain %d", c.ChainId)
continue
}
}

Expand Down

0 comments on commit 46f5e98

Please sign in to comment.