From 376b7143ffb38c6df4d0efade5438bfd45675f9e Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Tue, 10 Dec 2024 11:28:16 -0800 Subject: [PATCH] refactor(zetaclient): subscribe to new blocks in scheduler (#3228) * move to zetaclient/zetacore client * add SubscribeNewBlocks test coverage * add core_block_time_latency metric * remove underscores from imports * skip block notification if latency is too high * skip precompiles in performance tests --- cmd/zetae2e/local/local.go | 1 + cmd/zetae2e/local/monitor_block_production.go | 6 +- zetaclient/chains/interfaces/interfaces.go | 2 + zetaclient/metrics/metrics.go | 8 + zetaclient/orchestrator/orchestrator.go | 205 +++++++++--------- zetaclient/testutils/mocks/cometbft_client.go | 32 ++- zetaclient/testutils/mocks/zetacore_client.go | 27 +++ zetaclient/zetacore/client.go | 34 ++- zetaclient/zetacore/client_cosmos.go | 14 +- zetaclient/zetacore/client_subscriptions.go | 35 +++ zetaclient/zetacore/client_test.go | 46 +++- 11 files changed, 276 insertions(+), 134 deletions(-) create mode 100644 zetaclient/zetacore/client_subscriptions.go diff --git a/cmd/zetae2e/local/local.go b/cmd/zetae2e/local/local.go index b8e11048cb..0ebb6244f9 100644 --- a/cmd/zetae2e/local/local.go +++ b/cmd/zetae2e/local/local.go @@ -125,6 +125,7 @@ func localE2ETest(cmd *cobra.Command, _ []string) { if testPerformance { logger.Print("⚠️ performance tests enabled, regular tests will be skipped") skipRegular = true + skipPrecompiles = true } // start timer diff --git a/cmd/zetae2e/local/monitor_block_production.go b/cmd/zetae2e/local/monitor_block_production.go index 89de03012e..801dfc921b 100644 --- a/cmd/zetae2e/local/monitor_block_production.go +++ b/cmd/zetae2e/local/monitor_block_production.go @@ -7,7 +7,7 @@ import ( "time" rpchttp "github.com/cometbft/cometbft/rpc/client/http" - cometbft_types "github.com/cometbft/cometbft/types" + cometbfttypes "github.com/cometbft/cometbft/types" "github.com/zeta-chain/node/e2e/config" ) @@ -37,11 +37,11 @@ func monitorBlockProduction(ctx context.Context, conf config.Config) error { if err != nil { return fmt.Errorf("subscribe: %w", err) } - latestNewBlockEvent := cometbft_types.EventDataNewBlock{} + latestNewBlockEvent := cometbfttypes.EventDataNewBlock{} for { select { case event := <-blockEventChan: - newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock) + newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock) if !ok { return fmt.Errorf("expecting new block event, got %T", event.Data) } diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index 8e2b8e2a3a..cd195912bb 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -11,6 +11,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" + cometbfttypes "github.com/cometbft/cometbft/types" upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types" "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcommon "github.com/ethereum/go-ethereum/common" @@ -145,6 +146,7 @@ type ZetacoreClient interface { GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error) PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error) + NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) } // BTCRPCClient is the interface for BTC RPC client diff --git a/zetaclient/metrics/metrics.go b/zetaclient/metrics/metrics.go index ddd8b0aa3f..e614cbf676 100644 --- a/zetaclient/metrics/metrics.go +++ b/zetaclient/metrics/metrics.go @@ -85,6 +85,14 @@ var ( Help: "Last core block number", }) + // CoreBlockLatency is a gauge that measures the difference between system time and + // block time from zetacore + CoreBlockLatency = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: ZetaClientNamespace, + Name: "core_block_latency", + Help: "Difference between system time and block time from zetacore", + }) + // Info is a gauge that contains information about the zetaclient environment Info = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ZetaClientNamespace, diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index c29b41466a..2e73131357 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -289,122 +289,121 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error { return err } - observeTicker := time.NewTicker(3 * time.Second) - var lastBlockNum int64 + newBlockChan, err := oc.zetacoreClient.NewBlockSubscriber(ctx) + if err != nil { + return err + } + for { select { case <-oc.stop: oc.logger.Warn().Msg("runScheduler: stopped") return nil - case <-observeTicker.C: - { - bn, err := oc.zetacoreClient.GetBlockHeight(ctx) + case <-time.After(time.Second * 10): + // the subscription should automatically reconnect after zetacore + // restart, but we should log this just in case that logic is not + // working + oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds") + case newBlock := <-newBlockChan: + bn := newBlock.Block.Height + + blockTimeLatency := time.Since(newBlock.Block.Time) + blockTimeLatencySeconds := blockTimeLatency.Seconds() + metrics.CoreBlockLatency.Set(blockTimeLatencySeconds) + + if blockTimeLatencySeconds > 15 { + oc.logger.Warn(). + Float64("latency", blockTimeLatencySeconds). + Msgf("runScheduler: core block latency too high") + continue + } + + balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) + if err != nil { + oc.logger.Error().Err(err).Msgf("couldn't get operator balance") + } else { + diff := oc.lastOperatorBalance.Sub(balance) + if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) { + oc.ts.AddFeeEntry(bn, diff.Int64()) + oc.lastOperatorBalance = balance + } + } + + // set current hot key burn rate + metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64())) + + // get chain ids without zeta chain + chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) { + return c.ID(), !c.IsZeta() + }) + + // query pending cctxs across all external chains within rate limit + cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs) + if err != nil { + oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") + } + + // schedule keysign for pending cctxs on each chain + for _, chain := range app.ListChains() { + // skip zeta chain + if chain.IsZeta() { + continue + } + + chainID := chain.ID() + + // update chain parameters for signer and chain observer + signer, err := oc.resolveSigner(app, chainID) if err != nil { - oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail") + oc.logger.Error().Err(err). + Int64(logs.FieldChain, chainID). + Msg("runScheduler: unable to resolve signer") continue } - if bn < 0 { - oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height") + + ob, err := oc.resolveObserver(app, chainID) + if err != nil { + oc.logger.Error().Err(err). + Int64(logs.FieldChain, chainID). + Msg("runScheduler: unable to resolve observer") + continue + } + + // get cctxs from map and set pending transactions prometheus gauge + cctxList := cctxMap[chainID] + + metrics.PendingTxsPerChain. + WithLabelValues(chain.Name()). + Set(float64(len(cctxList))) + + if len(cctxList) == 0 { continue } - if lastBlockNum == 0 { - lastBlockNum = bn - 1 + + if !app.IsOutboundObservationEnabled() { + continue } - if bn > lastBlockNum { // we have a new block - bn = lastBlockNum + 1 - if bn%10 == 0 { - oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn) - } - - balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) - if err != nil { - oc.logger.Error().Err(err).Msgf("couldn't get operator balance") - } else { - diff := oc.lastOperatorBalance.Sub(balance) - if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) { - oc.ts.AddFeeEntry(bn, diff.Int64()) - oc.lastOperatorBalance = balance - } - } - - // set current hot key burn rate - metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64())) - - // get chain ids without zeta chain - chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) { - return c.ID(), !c.IsZeta() - }) - - // query pending cctxs across all external chains within rate limit - cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs) - if err != nil { - oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") - } - - // schedule keysign for pending cctxs on each chain - for _, chain := range app.ListChains() { - // skip zeta chain - if chain.IsZeta() { - continue - } - - chainID := chain.ID() - - // update chain parameters for signer and chain observer - signer, err := oc.resolveSigner(app, chainID) - if err != nil { - oc.logger.Error().Err(err). - Int64(logs.FieldChain, chainID). - Msg("runScheduler: unable to resolve signer") - continue - } - - ob, err := oc.resolveObserver(app, chainID) - if err != nil { - oc.logger.Error().Err(err). - Int64(logs.FieldChain, chainID). - Msg("runScheduler: unable to resolve observer") - continue - } - - // get cctxs from map and set pending transactions prometheus gauge - cctxList := cctxMap[chainID] - - metrics.PendingTxsPerChain. - WithLabelValues(chain.Name()). - Set(float64(len(cctxList))) - - if len(cctxList) == 0 { - continue - } - - if !app.IsOutboundObservationEnabled() { - continue - } - - // #nosec G115 range is verified - zetaHeight := uint64(bn) - - switch { - case chain.IsEVM(): - oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsBitcoin(): - oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsSolana(): - oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsTON(): - oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer) - default: - oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID) - continue - } - } - - // update last processed block number - lastBlockNum = bn - oc.ts.SetCoreBlockNumber(lastBlockNum) + + // #nosec G115 range is verified + zetaHeight := uint64(bn) + + switch { + case chain.IsEVM(): + oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsBitcoin(): + oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsSolana(): + oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsTON(): + oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer) + default: + oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID) + continue } } + + // update last processed block number + oc.ts.SetCoreBlockNumber(bn) } } } diff --git a/zetaclient/testutils/mocks/cometbft_client.go b/zetaclient/testutils/mocks/cometbft_client.go index dcb452621d..daa37a9467 100644 --- a/zetaclient/testutils/mocks/cometbft_client.go +++ b/zetaclient/testutils/mocks/cometbft_client.go @@ -3,6 +3,7 @@ package mocks import ( "context" "encoding/hex" + "sync" "testing" abci "github.com/cometbft/cometbft/abci/types" @@ -20,6 +21,9 @@ type CometBFTClient struct { err error code uint32 txHash bytes.HexBytes + + subscribers map[string]chan<- coretypes.ResultEvent + subscribersLock sync.Mutex } func (c *CometBFTClient) BroadcastTxCommit( @@ -82,11 +86,31 @@ func (c *CometBFTClient) SetError(err error) *CometBFTClient { return c } +// PublishToSubscribers will publish an event to all subscribers (mock only) +func (c *CometBFTClient) PublishToSubscribers(event coretypes.ResultEvent) { + c.subscribersLock.Lock() + defer c.subscribersLock.Unlock() + for _, ch := range c.subscribers { + ch <- event + } +} + +func (c *CometBFTClient) Subscribe( + _ context.Context, + subscriber, _ string, + _ ...int, +) (out <-chan coretypes.ResultEvent, err error) { + outChan := make(chan coretypes.ResultEvent) + c.subscribers[subscriber] = outChan + return outChan, nil +} + func NewSDKClientWithErr(t *testing.T, err error, code uint32) *CometBFTClient { return &CometBFTClient{ - t: t, - Client: mock.Client{}, - err: err, - code: code, + t: t, + Client: mock.Client{}, + err: err, + code: code, + subscribers: make(map[string]chan<- coretypes.ResultEvent), } } diff --git a/zetaclient/testutils/mocks/zetacore_client.go b/zetaclient/testutils/mocks/zetacore_client.go index dd507ad5b5..fa5b34486b 100644 --- a/zetaclient/testutils/mocks/zetacore_client.go +++ b/zetaclient/testutils/mocks/zetacore_client.go @@ -23,6 +23,7 @@ import ( upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types" zerolog "github.com/rs/zerolog" + cometbfttypes "github.com/cometbft/cometbft/types" ) // ZetacoreClient is an autogenerated mock type for the ZetacoreClient type @@ -891,6 +892,32 @@ func (_m *ZetacoreClient) PostVoteTSS(ctx context.Context, tssPubKey string, key return r0, r1 } +// NewBlockSubscriber provides a mock function with given fields: ctx +func (_m *ZetacoreClient) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) { + ret := _m.Called(ctx) + + var r0 chan cometbfttypes.EventDataNewBlock + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (chan cometbfttypes.EventDataNewBlock, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) chan cometbfttypes.EventDataNewBlock); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(chan cometbfttypes.EventDataNewBlock) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewZetacoreClient creates a new instance of ZetacoreClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewZetacoreClient(t interface { diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index 65078a72cb..de54435c7e 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -6,7 +6,8 @@ import ( "strings" "sync" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" + cometbftrpc "github.com/cometbft/cometbft/rpc/client" + cometbfthttp "github.com/cometbft/cometbft/rpc/client/http" cosmosclient "github.com/cosmos/cosmos-sdk/client" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/pkg/errors" @@ -18,7 +19,7 @@ import ( "github.com/zeta-chain/node/app" "github.com/zeta-chain/node/pkg/authz" "github.com/zeta-chain/node/pkg/chains" - zetacore_rpc "github.com/zeta-chain/node/pkg/rpc" + zetacorerpc "github.com/zeta-chain/node/pkg/rpc" "github.com/zeta-chain/node/zetaclient/chains/interfaces" "github.com/zeta-chain/node/zetaclient/config" keyinterfaces "github.com/zeta-chain/node/zetaclient/keys/interfaces" @@ -29,12 +30,13 @@ var _ interfaces.ZetacoreClient = &Client{} // Client is the client to send tx to zetacore type Client struct { - zetacore_rpc.Clients + zetacorerpc.Clients logger zerolog.Logger config config.ClientConfiguration cosmosClientContext cosmosclient.Context + cometBFTClient cometbftrpc.Client blockHeight int64 accountNumber map[authz.KeyType]uint64 @@ -52,7 +54,7 @@ var unsecureGRPC = grpc.WithTransportCredentials(insecure.NewCredentials()) type constructOpts struct { customTendermint bool - tendermintClient cosmosclient.TendermintRPC + tendermintClient cometbftrpc.Client customAccountRetriever bool accountRetriever cosmosclient.AccountRetriever @@ -61,7 +63,7 @@ type constructOpts struct { type Opt func(cfg *constructOpts) // WithTendermintClient sets custom tendermint client -func WithTendermintClient(client cosmosclient.TendermintRPC) Opt { +func WithTendermintClient(client cometbftrpc.Client) Opt { return func(c *constructOpts) { c.customTendermint = true c.tendermintClient = client @@ -106,7 +108,7 @@ func NewClient( encodingCfg := app.MakeEncodingConfig() - zetacoreClients, err := zetacore_rpc.NewGRPCClients(cosmosGRPC(chainIP), unsecureGRPC) + zetacoreClients, err := zetacorerpc.NewGRPCClients(cosmosGRPC(chainIP), unsecureGRPC) if err != nil { return nil, errors.Wrap(err, "grpc dial fail") } @@ -123,12 +125,30 @@ func NewClient( return nil, errors.Wrap(err, "unable to build cosmos client context") } + cometBFTClientIface := constructOptions.tendermintClient + + // create a cometbft client if one was not provided in the constructOptions + if !constructOptions.customTendermint { + cometBFTURL := "http://" + tendermintRPC(chainIP) + cometBFTClient, err := cometbfthttp.New(cometBFTURL, "/websocket") + if err != nil { + return nil, errors.Wrapf(err, "new cometbft client (%s)", cometBFTURL) + } + // start websockets + err = cometBFTClient.WSEvents.Start() + if err != nil { + return nil, errors.Wrap(err, "cometbft start") + } + cometBFTClientIface = cometBFTClient + } + return &Client{ Clients: zetacoreClients, logger: log, config: cfg, cosmosClientContext: cosmosContext, + cometBFTClient: cometBFTClientIface, accountNumber: accountsMap, seqNumber: seqMap, @@ -178,7 +198,7 @@ func buildCosmosClientContext( remote = fmt.Sprintf("tcp://%s", remote) } - wsClient, err := rpchttp.New(remote, "/websocket") + wsClient, err := cometbfthttp.New(remote, "/websocket") if err != nil { return cosmosclient.Context{}, err } diff --git a/zetaclient/zetacore/client_cosmos.go b/zetaclient/zetacore/client_cosmos.go index fe5c3dbf1d..5dce0d1c12 100644 --- a/zetaclient/zetacore/client_cosmos.go +++ b/zetaclient/zetacore/client_cosmos.go @@ -2,10 +2,8 @@ package zetacore import ( "context" - "fmt" sdkmath "cosmossdk.io/math" - tmhttp "github.com/cometbft/cometbft/rpc/client/http" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" "github.com/pkg/errors" @@ -16,17 +14,7 @@ import ( // GetGenesisSupply returns the genesis supply. // NOTE that this method is brittle as it uses STATEFUL connection func (c *Client) GetGenesisSupply(ctx context.Context) (sdkmath.Int, error) { - tmURL := fmt.Sprintf("http://%s", c.config.ChainRPC) - - s, err := tmhttp.New(tmURL, "/websocket") - if err != nil { - return sdkmath.ZeroInt(), errors.Wrap(err, "failed to create tm client") - } - - // nolint:errcheck - defer s.Stop() - - res, err := s.Genesis(ctx) + res, err := c.cometBFTClient.Genesis(ctx) if err != nil { return sdkmath.ZeroInt(), errors.Wrap(err, "failed to get genesis") } diff --git a/zetaclient/zetacore/client_subscriptions.go b/zetaclient/zetacore/client_subscriptions.go new file mode 100644 index 0000000000..cb4229b31b --- /dev/null +++ b/zetaclient/zetacore/client_subscriptions.go @@ -0,0 +1,35 @@ +package zetacore + +import ( + "context" + + cometbfttypes "github.com/cometbft/cometbft/types" +) + +// NewBlockSubscriber subscribes to cometbft new block events +func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) { + rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", cometbfttypes.EventQueryNewBlock.String()) + if err != nil { + return nil, err + } + + blockEventChan := make(chan cometbfttypes.EventDataNewBlock) + + go func() { + for { + select { + case <-ctx.Done(): + return + case event := <-rawBlockEventChan: + newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock) + if !ok { + c.logger.Error().Msgf("expecting new block event, got %T", event.Data) + continue + } + blockEventChan <- newBlockEvent + } + } + }() + + return blockEventChan, nil +} diff --git a/zetaclient/zetacore/client_test.go b/zetaclient/zetacore/client_test.go index 6392fd9dba..f1fe5a2526 100644 --- a/zetaclient/zetacore/client_test.go +++ b/zetaclient/zetacore/client_test.go @@ -6,7 +6,6 @@ import ( "testing" abci "github.com/cometbft/cometbft/abci/types" - cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/testutil/mock" "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/query" @@ -22,6 +21,9 @@ import ( "go.nhat.io/grpcmock" "go.nhat.io/grpcmock/planner" + cometbftrpc "github.com/cometbft/cometbft/rpc/client" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + cometbfttypes "github.com/cometbft/cometbft/types" "github.com/zeta-chain/node/cmd/zetacored/config" crosschaintypes "github.com/zeta-chain/node/x/crosschain/types" "github.com/zeta-chain/node/zetaclient/keys" @@ -111,7 +113,7 @@ func withDefaultObserverKeys() clientTestOpt { return withObserverKeys(keys.NewKeysWithKeybase(keyRing, address, testSigner, "")) } -func withTendermint(client cosmosclient.TendermintRPC) clientTestOpt { +func withTendermint(client cometbftrpc.Client) clientTestOpt { return func(cfg *clientTestConfig) { cfg.opts = append(cfg.opts, WithTendermintClient(client)) } } @@ -177,7 +179,11 @@ func TestZetacore_GetZetaHotKeyBalance(t *testing.T) { method := "/cosmos.bank.v1beta1.Query/Balance" setupMockServer(t, banktypes.RegisterQueryServer, method, input, expectedOutput) - client := setupZetacoreClient(t, withDefaultObserverKeys()) + client := setupZetacoreClient( + t, + withDefaultObserverKeys(), + withTendermint(mocks.NewSDKClientWithErr(t, nil, 0)), + ) // should be able to get balance of signer client.keys = keys.NewKeysWithKeybase(mocks.NewKeyring(), types.AccAddress{}, "bob", "") @@ -219,7 +225,11 @@ func TestZetacore_GetAllOutboundTrackerByChain(t *testing.T) { method := "/zetachain.zetacore.crosschain.Query/OutboundTrackerAllByChain" setupMockServer(t, crosschaintypes.RegisterQueryServer, method, input, expectedOutput) - client := setupZetacoreClient(t, withDefaultObserverKeys()) + client := setupZetacoreClient( + t, + withDefaultObserverKeys(), + withTendermint(mocks.NewSDKClientWithErr(t, nil, 0)), + ) resp, err := client.GetAllOutboundTrackerByChain(ctx, chain.ChainId, interfaces.Ascending) require.NoError(t, err) @@ -229,3 +239,31 @@ func TestZetacore_GetAllOutboundTrackerByChain(t *testing.T) { require.NoError(t, err) require.Equal(t, expectedOutput.OutboundTracker, resp) } + +func TestZetacore_SubscribeNewBlocks(t *testing.T) { + ctx := context.Background() + cometBFTClient := mocks.NewSDKClientWithErr(t, nil, 0) + client := setupZetacoreClient( + t, + withDefaultObserverKeys(), + withTendermint(cometBFTClient), + ) + + newBlockChan, err := client.NewBlockSubscriber(ctx) + require.NoError(t, err) + + height := int64(10) + + cometBFTClient.PublishToSubscribers(coretypes.ResultEvent{ + Data: cometbfttypes.EventDataNewBlock{ + Block: &cometbfttypes.Block{ + Header: cometbfttypes.Header{ + Height: height, + }, + }, + }, + }) + + newBlockEvent := <-newBlockChan + require.Equal(t, height, newBlockEvent.Block.Header.Height) +}