diff --git a/changelog.md b/changelog.md index 73e8e04e28..1fa1f6d681 100644 --- a/changelog.md +++ b/changelog.md @@ -58,6 +58,7 @@ * [1955](https://github.com/zeta-chain/node/pull/1955) - improve emissions module coverage * [1941](https://github.com/zeta-chain/node/pull/1941) - add unit tests for zetabridge package * [1985](https://github.com/zeta-chain/node/pull/1985) - improve fungible module coverage +* [1992](https://github.com/zeta-chain/node/pull/1992) - remove setupKeeper from crosschain module ### Fixes @@ -66,6 +67,8 @@ * [1883](https://github.com/zeta-chain/node/issues/1883) - zetaclient should check 'IsSupported' flag to pause/unpause a specific chain * [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses * [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests +* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags +* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread ### Chores diff --git a/x/crosschain/keeper/gas_price_test.go b/x/crosschain/keeper/gas_price_test.go index 67aeb95cd6..bdd867081e 100644 --- a/x/crosschain/keeper/gas_price_test.go +++ b/x/crosschain/keeper/gas_price_test.go @@ -1,4 +1,4 @@ -package keeper +package keeper_test import ( "strconv" @@ -6,11 +6,13 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" + keepertest "github.com/zeta-chain/zetacore/testutil/keeper" + "github.com/zeta-chain/zetacore/x/crosschain/keeper" "github.com/zeta-chain/zetacore/x/crosschain/types" ) // Keeper Tests -func createNGasPrice(keeper *Keeper, ctx sdk.Context, n int) []types.GasPrice { +func createNGasPrice(keeper *keeper.Keeper, ctx sdk.Context, n int) []types.GasPrice { items := make([]types.GasPrice, n) for i := range items { items[i].Creator = "any" @@ -22,26 +24,27 @@ func createNGasPrice(keeper *Keeper, ctx sdk.Context, n int) []types.GasPrice { } func TestGasPriceGet(t *testing.T) { - keeper, ctx := setupKeeper(t) - items := createNGasPrice(keeper, ctx, 10) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) + items := createNGasPrice(k, ctx, 10) for _, item := range items { - rst, found := keeper.GetGasPrice(ctx, item.ChainId) + rst, found := k.GetGasPrice(ctx, item.ChainId) require.True(t, found) require.Equal(t, item, rst) } } + func TestGasPriceRemove(t *testing.T) { - keeper, ctx := setupKeeper(t) - items := createNGasPrice(keeper, ctx, 10) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) + items := createNGasPrice(k, ctx, 10) for _, item := range items { - keeper.RemoveGasPrice(ctx, item.Index) - _, found := keeper.GetGasPrice(ctx, item.ChainId) + k.RemoveGasPrice(ctx, item.Index) + _, found := k.GetGasPrice(ctx, item.ChainId) require.False(t, found) } } func TestGasPriceGetAll(t *testing.T) { - keeper, ctx := setupKeeper(t) - items := createNGasPrice(keeper, ctx, 10) - require.Equal(t, items, keeper.GetAllGasPrice(ctx)) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) + items := createNGasPrice(k, ctx, 10) + require.Equal(t, items, k.GetAllGasPrice(ctx)) } diff --git a/x/crosschain/keeper/grpc_query_gas_price_test.go b/x/crosschain/keeper/grpc_query_gas_price_test.go index b8e9f0606e..bba8f9ae74 100644 --- a/x/crosschain/keeper/grpc_query_gas_price_test.go +++ b/x/crosschain/keeper/grpc_query_gas_price_test.go @@ -1,4 +1,4 @@ -package keeper +package keeper_test import ( "fmt" @@ -7,15 +7,16 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/query" "github.com/stretchr/testify/require" + keepertest "github.com/zeta-chain/zetacore/testutil/keeper" "github.com/zeta-chain/zetacore/x/crosschain/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func TestGasPriceQuerySingle(t *testing.T) { - keeper, ctx := setupKeeper(t) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) wctx := sdk.WrapSDKContext(ctx) - msgs := createNGasPrice(keeper, ctx, 2) + msgs := createNGasPrice(k, ctx, 2) for _, tc := range []struct { desc string request *types.QueryGetGasPriceRequest @@ -49,7 +50,7 @@ func TestGasPriceQuerySingle(t *testing.T) { } { tc := tc t.Run(tc.desc, func(t *testing.T) { - response, err := keeper.GasPrice(wctx, tc.request) + response, err := k.GasPrice(wctx, tc.request) if tc.err != nil { require.Error(t, err) } else { @@ -60,9 +61,9 @@ func TestGasPriceQuerySingle(t *testing.T) { } func TestGasPriceQueryPaginated(t *testing.T) { - keeper, ctx := setupKeeper(t) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) wctx := sdk.WrapSDKContext(ctx) - msgs := createNGasPrice(keeper, ctx, 5) + msgs := createNGasPrice(k, ctx, 5) request := func(next []byte, offset, limit uint64, total bool) *types.QueryAllGasPriceRequest { return &types.QueryAllGasPriceRequest{ @@ -77,7 +78,7 @@ func TestGasPriceQueryPaginated(t *testing.T) { t.Run("ByOffset", func(t *testing.T) { step := 2 for i := 0; i < len(msgs); i += step { - resp, err := keeper.GasPriceAll(wctx, request(nil, uint64(i), uint64(step), false)) + resp, err := k.GasPriceAll(wctx, request(nil, uint64(i), uint64(step), false)) require.NoError(t, err) for j := i; j < len(msgs) && j < i+step; j++ { require.Equal(t, &msgs[j], resp.GasPrice[j-i]) @@ -88,7 +89,7 @@ func TestGasPriceQueryPaginated(t *testing.T) { step := 2 var next []byte for i := 0; i < len(msgs); i += step { - resp, err := keeper.GasPriceAll(wctx, request(next, 0, uint64(step), false)) + resp, err := k.GasPriceAll(wctx, request(next, 0, uint64(step), false)) require.NoError(t, err) for j := i; j < len(msgs) && j < i+step; j++ { require.Equal(t, &msgs[j], resp.GasPrice[j-i]) @@ -97,12 +98,12 @@ func TestGasPriceQueryPaginated(t *testing.T) { } }) t.Run("Total", func(t *testing.T) { - resp, err := keeper.GasPriceAll(wctx, request(nil, 0, 0, true)) + resp, err := k.GasPriceAll(wctx, request(nil, 0, 0, true)) require.NoError(t, err) require.Equal(t, len(msgs), int(resp.Pagination.Total)) }) t.Run("InvalidRequest", func(t *testing.T) { - _, err := keeper.GasPriceAll(wctx, nil) + _, err := k.GasPriceAll(wctx, nil) require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "invalid request")) }) } diff --git a/x/crosschain/keeper/grpc_query_last_block_height_test.go b/x/crosschain/keeper/grpc_query_last_block_height_test.go index 057fe120b9..81e3c7d2d9 100644 --- a/x/crosschain/keeper/grpc_query_last_block_height_test.go +++ b/x/crosschain/keeper/grpc_query_last_block_height_test.go @@ -1,4 +1,4 @@ -package keeper +package keeper_test import ( "math" @@ -7,15 +7,16 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/query" "github.com/stretchr/testify/require" + keepertest "github.com/zeta-chain/zetacore/testutil/keeper" "github.com/zeta-chain/zetacore/x/crosschain/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func TestLastBlockHeightQuerySingle(t *testing.T) { - keeper, ctx := setupKeeper(t) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) wctx := sdk.WrapSDKContext(ctx) - msgs := createNLastBlockHeight(keeper, ctx, 2) + msgs := createNLastBlockHeight(k, ctx, 2) for _, tc := range []struct { desc string request *types.QueryGetLastBlockHeightRequest @@ -44,7 +45,7 @@ func TestLastBlockHeightQuerySingle(t *testing.T) { } { tc := tc t.Run(tc.desc, func(t *testing.T) { - response, err := keeper.LastBlockHeight(wctx, tc.request) + response, err := k.LastBlockHeight(wctx, tc.request) if tc.err != nil { require.ErrorIs(t, err, tc.err) } else { @@ -56,14 +57,14 @@ func TestLastBlockHeightQuerySingle(t *testing.T) { func TestLastBlockHeightLimits(t *testing.T) { t.Run("should err if last send height is max int", func(t *testing.T) { - keeper, ctx := setupKeeper(t) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) wctx := sdk.WrapSDKContext(ctx) - keeper.SetLastBlockHeight(ctx, types.LastBlockHeight{ + k.SetLastBlockHeight(ctx, types.LastBlockHeight{ Index: "index", LastSendHeight: math.MaxInt64, }) - res, err := keeper.LastBlockHeight(wctx, &types.QueryGetLastBlockHeightRequest{ + res, err := k.LastBlockHeight(wctx, &types.QueryGetLastBlockHeightRequest{ Index: "index", }) require.Nil(t, res) @@ -71,15 +72,15 @@ func TestLastBlockHeightLimits(t *testing.T) { }) t.Run("should err if last receive height is max int", func(t *testing.T) { - keeper, ctx := setupKeeper(t) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) wctx := sdk.WrapSDKContext(ctx) - keeper.SetLastBlockHeight(ctx, types.LastBlockHeight{ + k.SetLastBlockHeight(ctx, types.LastBlockHeight{ Index: "index", LastSendHeight: 10, LastReceiveHeight: math.MaxInt64, }) - res, err := keeper.LastBlockHeight(wctx, &types.QueryGetLastBlockHeightRequest{ + res, err := k.LastBlockHeight(wctx, &types.QueryGetLastBlockHeightRequest{ Index: "index", }) require.Nil(t, res) @@ -88,9 +89,9 @@ func TestLastBlockHeightLimits(t *testing.T) { } func TestLastBlockHeightQueryPaginated(t *testing.T) { - keeper, ctx := setupKeeper(t) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) wctx := sdk.WrapSDKContext(ctx) - msgs := createNLastBlockHeight(keeper, ctx, 5) + msgs := createNLastBlockHeight(k, ctx, 5) request := func(next []byte, offset, limit uint64, total bool) *types.QueryAllLastBlockHeightRequest { return &types.QueryAllLastBlockHeightRequest{ @@ -105,7 +106,7 @@ func TestLastBlockHeightQueryPaginated(t *testing.T) { t.Run("ByOffset", func(t *testing.T) { step := 2 for i := 0; i < len(msgs); i += step { - resp, err := keeper.LastBlockHeightAll(wctx, request(nil, uint64(i), uint64(step), false)) + resp, err := k.LastBlockHeightAll(wctx, request(nil, uint64(i), uint64(step), false)) require.NoError(t, err) for j := i; j < len(msgs) && j < i+step; j++ { require.Equal(t, &msgs[j], resp.LastBlockHeight[j-i]) @@ -116,7 +117,7 @@ func TestLastBlockHeightQueryPaginated(t *testing.T) { step := 2 var next []byte for i := 0; i < len(msgs); i += step { - resp, err := keeper.LastBlockHeightAll(wctx, request(next, 0, uint64(step), false)) + resp, err := k.LastBlockHeightAll(wctx, request(next, 0, uint64(step), false)) require.NoError(t, err) for j := i; j < len(msgs) && j < i+step; j++ { require.Equal(t, &msgs[j], resp.LastBlockHeight[j-i]) @@ -125,12 +126,12 @@ func TestLastBlockHeightQueryPaginated(t *testing.T) { } }) t.Run("Total", func(t *testing.T) { - resp, err := keeper.LastBlockHeightAll(wctx, request(nil, 0, 0, true)) + resp, err := k.LastBlockHeightAll(wctx, request(nil, 0, 0, true)) require.NoError(t, err) require.Equal(t, len(msgs), int(resp.Pagination.Total)) }) t.Run("InvalidRequest", func(t *testing.T) { - _, err := keeper.LastBlockHeightAll(wctx, nil) + _, err := k.LastBlockHeightAll(wctx, nil) require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "invalid request")) }) } diff --git a/x/crosschain/keeper/keeper_test.go b/x/crosschain/keeper/keeper_test.go deleted file mode 100644 index 58cc10896e..0000000000 --- a/x/crosschain/keeper/keeper_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package keeper - -import ( - "testing" - - "github.com/cosmos/cosmos-sdk/codec" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" - "github.com/cosmos/cosmos-sdk/store" - storetypes "github.com/cosmos/cosmos-sdk/store/types" - sdk "github.com/cosmos/cosmos-sdk/types" - authkeeper "github.com/cosmos/cosmos-sdk/x/auth/keeper" - bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper" - typesparams "github.com/cosmos/cosmos-sdk/x/params/types" - stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper" - "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/libs/log" - tmproto "github.com/tendermint/tendermint/proto/tendermint/types" - tmdb "github.com/tendermint/tm-db" - authoritykeeper "github.com/zeta-chain/zetacore/x/authority/keeper" - "github.com/zeta-chain/zetacore/x/crosschain/types" - fungiblekeeper "github.com/zeta-chain/zetacore/x/fungible/keeper" - "github.com/zeta-chain/zetacore/x/observer/keeper" -) - -func setupKeeper(t testing.TB) (*Keeper, sdk.Context) { - storeKey := sdk.NewKVStoreKey(types.StoreKey) - memStoreKey := storetypes.NewMemoryStoreKey(types.MemStoreKey) - - db := tmdb.NewMemDB() - stateStore := store.NewCommitMultiStore(db) - stateStore.MountStoreWithDB(storeKey, storetypes.StoreTypeIAVL, db) - stateStore.MountStoreWithDB(memStoreKey, storetypes.StoreTypeMemory, nil) - require.NoError(t, stateStore.LoadLatestVersion()) - - registry := codectypes.NewInterfaceRegistry() - cdc := codec.NewProtoCodec(registry) - - paramsSubspace := typesparams.NewSubspace(cdc, - types.Amino, - storeKey, - memStoreKey, - "ZetacoreParams", - ) - bankKeeper := bankkeeper.BaseKeeper{} - authKeeper := authkeeper.AccountKeeper{} - observerKeeper := keeper.Keeper{} - fungibleKeeper := fungiblekeeper.Keeper{} - authorityKeeper := authoritykeeper.Keeper{} - - k := NewKeeper( - codec.NewProtoCodec(registry), - storeKey, - memStoreKey, - stakingkeeper.Keeper{}, // custom - paramsSubspace, - authKeeper, - bankKeeper, - observerKeeper, - &fungibleKeeper, - authorityKeeper, - ) - - ctx := sdk.NewContext(stateStore, tmproto.Header{}, false, log.NewNopLogger()) - return k, ctx -} diff --git a/x/crosschain/keeper/last_block_height_test.go b/x/crosschain/keeper/last_block_height_test.go index 8a284f6f97..707b89ff02 100644 --- a/x/crosschain/keeper/last_block_height_test.go +++ b/x/crosschain/keeper/last_block_height_test.go @@ -1,4 +1,4 @@ -package keeper +package keeper_test import ( "fmt" @@ -6,10 +6,12 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" + keepertest "github.com/zeta-chain/zetacore/testutil/keeper" + "github.com/zeta-chain/zetacore/x/crosschain/keeper" "github.com/zeta-chain/zetacore/x/crosschain/types" ) -func createNLastBlockHeight(keeper *Keeper, ctx sdk.Context, n int) []types.LastBlockHeight { +func createNLastBlockHeight(keeper *keeper.Keeper, ctx sdk.Context, n int) []types.LastBlockHeight { items := make([]types.LastBlockHeight, n) for i := range items { items[i].Creator = "any" @@ -20,26 +22,26 @@ func createNLastBlockHeight(keeper *Keeper, ctx sdk.Context, n int) []types.Last } func TestLastBlockHeightGet(t *testing.T) { - keeper, ctx := setupKeeper(t) - items := createNLastBlockHeight(keeper, ctx, 10) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) + items := createNLastBlockHeight(k, ctx, 10) for _, item := range items { - rst, found := keeper.GetLastBlockHeight(ctx, item.Index) + rst, found := k.GetLastBlockHeight(ctx, item.Index) require.True(t, found) require.Equal(t, item, rst) } } func TestLastBlockHeightRemove(t *testing.T) { - keeper, ctx := setupKeeper(t) - items := createNLastBlockHeight(keeper, ctx, 10) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) + items := createNLastBlockHeight(k, ctx, 10) for _, item := range items { - keeper.RemoveLastBlockHeight(ctx, item.Index) - _, found := keeper.GetLastBlockHeight(ctx, item.Index) + k.RemoveLastBlockHeight(ctx, item.Index) + _, found := k.GetLastBlockHeight(ctx, item.Index) require.False(t, found) } } func TestLastBlockHeightGetAll(t *testing.T) { - keeper, ctx := setupKeeper(t) - items := createNLastBlockHeight(keeper, ctx, 10) - require.Equal(t, items, keeper.GetAllLastBlockHeight(ctx)) + k, ctx, _, _ := keepertest.CrosschainKeeper(t) + items := createNLastBlockHeight(k, ctx, 10) + require.Equal(t, items, k.GetAllLastBlockHeight(ctx)) } diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index 51bc1ab758..110c89c474 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -330,10 +330,12 @@ func (ob *BTCChainClient) WatchInTx() { defer ticker.Stop() ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.InTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId) continue } err := ob.ObserveInTx() @@ -384,12 +386,6 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { } func (ob *BTCChainClient) ObserveInTx() error { - // make sure inbound TXS / Send is enabled by the protocol - flags := ob.coreContext.GetCrossChainFlags() - if !flags.IsInboundEnabled { - return errors.New("inbound TXS / Send has been disabled by the protocol") - } - // get and update latest block height cnt, err := ob.rpcClient.GetBlockCount() if err != nil { @@ -438,6 +434,9 @@ func (ob *BTCChainClient) ObserveInTx() error { } // add block header to zetabridge + // TODO: consider having a separate ticker(from TSS scaning) for posting block headers + // https://github.com/zeta-chain/node/issues/1847 + flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { err = ob.postBlockHeader(bn) if err != nil { @@ -583,11 +582,20 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger // WatchGasPrice watches Bitcoin chain for gas rate and post to zetacore func (ob *BTCChainClient) WatchGasPrice() { + // report gas price right away as the ticker takes time to kick in + err := ob.PostGasPrice() + if err != nil { + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + } + + // start gas price ticker ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker) if err != nil { ob.logger.GasPrice.Error().Err(err).Msg("error creating ticker") return } + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { @@ -1122,10 +1130,13 @@ func (ob *BTCChainClient) WatchOutTx() { } defer ticker.Stop() + ob.logger.OutTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId) continue } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) diff --git a/zetaclient/bitcoin/bitcoin_client_test.go b/zetaclient/bitcoin/bitcoin_client_test.go index 58377cdc65..dfa576c9c1 100644 --- a/zetaclient/bitcoin/bitcoin_client_test.go +++ b/zetaclient/bitcoin/bitcoin_client_test.go @@ -712,12 +712,3 @@ func TestGetBtcEventErrors(t *testing.T) { require.Nil(t, event) }) } - -func TestBTCChainClient_ObserveInTx(t *testing.T) { - t.Run("should return error", func(t *testing.T) { - // create mainnet mock client - btcClient := MockBTCClientMainnet() - err := btcClient.ObserveInTx() - require.ErrorContains(t, err, "inbound TXS / Send has been disabled by the protocol") - }) -} diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index b948a6c979..e0dbd595cb 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -6,6 +6,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/zeta-chain/zetacore/pkg/coin" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetabridge" ) @@ -22,7 +23,7 @@ func (ob *BTCChainClient) WatchIntxTracker() { for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { continue } err := ob.ObserveTrackerSuggestions() diff --git a/zetaclient/core_context/zeta_core_context.go b/zetaclient/core_context/zeta_core_context.go index d35c502c86..346563be04 100644 --- a/zetaclient/core_context/zeta_core_context.go +++ b/zetaclient/core_context/zeta_core_context.go @@ -179,3 +179,15 @@ func (c *ZetaCoreContext) Update( c.currentTssPubkey = tssPubKey } } + +// IsOutboundObservationEnabled returns true if the chain is supported and outbound flag is enabled +func IsOutboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool { + flags := c.GetCrossChainFlags() + return chainParams.IsSupported && flags.IsOutboundEnabled +} + +// IsInboundObservationEnabled returns true if the chain is supported and inbound flag is enabled +func IsInboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool { + flags := c.GetCrossChainFlags() + return chainParams.IsSupported && flags.IsInboundEnabled +} diff --git a/zetaclient/core_context/zeta_core_context_test.go b/zetaclient/core_context/zeta_core_context_test.go index 14f36a1cc2..8745fdc95c 100644 --- a/zetaclient/core_context/zeta_core_context_test.go +++ b/zetaclient/core_context/zeta_core_context_test.go @@ -3,6 +3,7 @@ package corecontext_test import ( "testing" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/testutil/sample" @@ -12,6 +13,44 @@ import ( corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" ) +func assertPanic(t *testing.T, f func(), errorLog string) { + defer func() { + r := recover() + if r != nil { + require.Contains(t, r, errorLog) + } + }() + f() +} + +func getTestCoreContext( + evmChain chains.Chain, + evmChainParams *observertypes.ChainParams, + ccFlags observertypes.CrosschainFlags) *corecontext.ZetaCoreContext { + // create config + cfg := config.NewConfig() + cfg.EVMChainConfigs[evmChain.ChainId] = config.EVMConfig{ + Chain: evmChain, + } + // create core context + coreContext := corecontext.NewZetaCoreContext(cfg) + evmChainParamsMap := make(map[int64]*observertypes.ChainParams) + evmChainParamsMap[evmChain.ChainId] = evmChainParams + + // feed chain params + coreContext.Update( + &observertypes.Keygen{}, + []chains.Chain{evmChain}, + evmChainParamsMap, + nil, + "", + ccFlags, + true, + zerolog.Logger{}, + ) + return coreContext +} + func TestNewZetaCoreContext(t *testing.T) { t.Run("should create new zeta core context with empty config", func(t *testing.T) { testCfg := config.NewConfig() @@ -264,12 +303,60 @@ func TestUpdateZetaCoreContext(t *testing.T) { }) } -func assertPanic(t *testing.T, f func(), errorLog string) { - defer func() { - r := recover() - if r != nil { - require.Contains(t, r, errorLog) +func TestIsOutboundObservationEnabled(t *testing.T) { + // create test chain params and flags + evmChain := chains.EthChain() + ccFlags := *sample.CrosschainFlags() + chainParams := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: true, + } + + t.Run("should return true if chain is supported and outbound flag is enabled", func(t *testing.T) { + coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags) + require.True(t, corecontext.IsOutboundObservationEnabled(coreCTX, *chainParams)) + }) + t.Run("should return false if chain is not supported yet", func(t *testing.T) { + paramsUnsupported := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: false, } - }() - f() + coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags) + require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported)) + }) + t.Run("should return false if outbound flag is disabled", func(t *testing.T) { + flagsDisabled := ccFlags + flagsDisabled.IsOutboundEnabled = false + coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled) + require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXDisabled, *chainParams)) + }) +} + +func TestIsInboundObservationEnabled(t *testing.T) { + // create test chain params and flags + evmChain := chains.EthChain() + ccFlags := *sample.CrosschainFlags() + chainParams := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: true, + } + + t.Run("should return true if chain is supported and inbound flag is enabled", func(t *testing.T) { + coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags) + require.True(t, corecontext.IsInboundObservationEnabled(coreCTX, *chainParams)) + }) + t.Run("should return false if chain is not supported yet", func(t *testing.T) { + paramsUnsupported := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: false, + } + coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags) + require.False(t, corecontext.IsInboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported)) + }) + t.Run("should return false if inbound flag is disabled", func(t *testing.T) { + flagsDisabled := ccFlags + flagsDisabled.IsInboundEnabled = false + coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled) + require.False(t, corecontext.IsInboundObservationEnabled(coreCTXDisabled, *chainParams)) + }) } diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index 4125e4a999..ba452bb305 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -603,61 +603,48 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, // WatchOutTx watches evm chain for outgoing txs status func (ob *ChainClient) WatchOutTx() { - // read env variables if set - timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE")) - if err != nil || timeoutNonce <= 0 { - timeoutNonce = 100 * 3 // process up to 100 hashes - } - ob.logger.OutTx.Info().Msgf("WatchOutTx: using timeoutNonce %d seconds", timeoutNonce) - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) if err != nil { ob.logger.OutTx.Error().Err(err).Msg("error creating ticker") return } + ob.logger.OutTx.Info().Msgf("WatchOutTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10}) defer ticker.Stop() for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId) continue } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { continue } - //FIXME: remove this timeout here to ensure that all trackers are queried - outTimeout := time.After(time.Duration(timeoutNonce) * time.Second) - TRACKERLOOP: for _, tracker := range trackers { nonceInt := tracker.Nonce if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx continue } txCount := 0 - var receipt *ethtypes.Receipt - var transaction *ethtypes.Transaction + var outtxReceipt *ethtypes.Receipt + var outtx *ethtypes.Transaction for _, txHash := range tracker.HashList { - select { - case <-outTimeout: - ob.logger.OutTx.Warn().Msgf("WatchOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt) - break TRACKERLOOP - default: - if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { - txCount++ - receipt = recpt - transaction = tx - ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) - if txCount > 1 { - ob.logger.OutTx.Error().Msgf( - "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) - } + if receipt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { + txCount++ + outtxReceipt = receipt + outtx = tx + ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) + if txCount > 1 { + ob.logger.OutTx.Error().Msgf( + "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, outtxReceipt, outtx) } } } if txCount == 1 { // should be only one txHash confirmed for each nonce. - ob.SetTxNReceipt(nonceInt, receipt, transaction) + ob.SetTxNReceipt(nonceInt, outtxReceipt, outtx) } else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint) ob.logger.OutTx.Error().Msgf("WatchOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) } @@ -844,8 +831,8 @@ func (ob *ChainClient) WatchInTx() { for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { - sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId) + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId) continue } err := ob.observeInTX(sampledLogger) @@ -908,12 +895,6 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { } func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { - // make sure inbound TXS / Send is enabled by the protocol - flags := ob.coreContext.GetCrossChainFlags() - if !flags.IsInboundEnabled { - return errors.New("inbound TXS / Send has been disabled by the protocol") - } - // get and update latest block height blockNumber, err := ob.evmClient.BlockNumber(context.Background()) if err != nil { @@ -951,7 +932,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock) // task 3: query the incoming tx to TSS address (read at most 100 blocks in one go) - lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags) + lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock) // note: using lowest height for all 3 events is not perfect, but it's simple and good enough lastScannedLowest := lastScannedZetaSent @@ -1128,11 +1109,13 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge // returns the last block successfully scanned -func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 { +func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { // query incoming gas asset for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetabridge and ignore error // TODO: consider having a independent ticker(from TSS scaning) for posting block headers + // https://github.com/zeta-chain/node/issues/1847 + flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains @@ -1155,23 +1138,20 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse // WatchGasPrice watches evm chain for gas prices and post to zetacore func (ob *ChainClient) WatchGasPrice() { - ob.logger.GasPrice.Info().Msg("WatchGasPrice starting...") + // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { - height, err := ob.zetaClient.GetBlockHeight() - if err != nil { - ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error") - } else { - ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) - } + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } + // start gas price ticker ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker) if err != nil { ob.logger.GasPrice.Error().Err(err).Msg("NewDynamicTicker error") return } - ob.logger.GasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker) + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { @@ -1182,12 +1162,7 @@ func (ob *ChainClient) WatchGasPrice() { } err = ob.PostGasPrice() if err != nil { - height, err := ob.zetaClient.GetBlockHeight() - if err != nil { - ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error") - } else { - ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) - } + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice) case <-ob.stop: diff --git a/zetaclient/evm/inbounds.go b/zetaclient/evm/inbounds.go index d6f6fcb223..819ab09f77 100644 --- a/zetaclient/evm/inbounds.go +++ b/zetaclient/evm/inbounds.go @@ -8,6 +8,7 @@ import ( "strings" sdkmath "cosmossdk.io/math" + ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/onrik/ethrpc" "github.com/pkg/errors" @@ -15,13 +16,12 @@ import ( "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/pkg/coin" + "github.com/zeta-chain/zetacore/pkg/constant" + "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/compliance" "github.com/zeta-chain/zetacore/zetaclient/config" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" - - ethcommon "github.com/ethereum/go-ethereum/common" - "github.com/zeta-chain/zetacore/pkg/constant" - "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/zetabridge" "golang.org/x/net/context" ) @@ -43,7 +43,7 @@ func (ob *ChainClient) WatchIntxTracker() { for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { continue } err := ob.ObserveIntxTrackers() diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index 34d0774507..415b5d5d21 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -5,21 +5,18 @@ import ( "math" "time" + sdkmath "cosmossdk.io/math" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/pkg/chains" + "github.com/zeta-chain/zetacore/x/crosschain/types" + observertypes "github.com/zeta-chain/zetacore/x/observer/types" appcontext "github.com/zeta-chain/zetacore/zetaclient/app_context" "github.com/zeta-chain/zetacore/zetaclient/bitcoin" corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" "github.com/zeta-chain/zetacore/zetaclient/interfaces" - "github.com/zeta-chain/zetacore/zetaclient/outtxprocessor" - - observertypes "github.com/zeta-chain/zetacore/x/observer/types" - - sdkmath "cosmossdk.io/math" - - "github.com/rs/zerolog" - "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/metrics" + "github.com/zeta-chain/zetacore/zetaclient/outtxprocessor" ) const ( @@ -154,8 +151,7 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) continue } - if !ob.GetChainParams().IsSupported { - co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId) + if !corecontext.IsOutboundObservationEnabled(coreContext, ob.GetChainParams()) { continue } @@ -178,6 +174,7 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { continue } } + // update last processed block number lastBlockNum = bn metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))