From b19de8b3e16d7131859809b7d57e57b9daf6b5ba Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Thu, 5 Dec 2024 12:22:52 -0800 Subject: [PATCH] move to zetaclient/zetacore client --- zetaclient/chains/interfaces/interfaces.go | 2 + zetaclient/orchestrator/orchestrator.go | 194 ++++++++---------- zetaclient/testutils/mocks/zetacore_client.go | 27 +++ zetaclient/zetacore/client.go | 28 ++- zetaclient/zetacore/client_cosmos.go | 14 +- zetaclient/zetacore/client_subscriptions.go | 39 ++++ zetaclient/zetacore/client_test.go | 16 +- 7 files changed, 196 insertions(+), 124 deletions(-) create mode 100644 zetaclient/zetacore/client_subscriptions.go diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index 8e2b8e2a3a..1cc0f1d11d 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -11,6 +11,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" + cometbft_types "github.com/cometbft/cometbft/types" upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types" "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcommon "github.com/ethereum/go-ethereum/common" @@ -145,6 +146,7 @@ type ZetacoreClient interface { GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error) PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error) + NewBlockSubscriber(ctx context.Context) (chan cometbft_types.EventDataNewBlock, error) } // BTCRPCClient is the interface for BTC RPC client diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index c29b41466a..6f0a45543c 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -289,122 +289,110 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error { return err } - observeTicker := time.NewTicker(3 * time.Second) - var lastBlockNum int64 + newBlockChan, err := oc.zetacoreClient.NewBlockSubscriber(ctx) + if err != nil { + return err + } + for { select { case <-oc.stop: oc.logger.Warn().Msg("runScheduler: stopped") return nil - case <-observeTicker.C: - { - bn, err := oc.zetacoreClient.GetBlockHeight(ctx) + case <-time.After(time.Second * 10): + // the subscription should automatically reconnect after zetacore + // restart, but we should log this just in case that logic is not + // working + oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds") + case newBlock := <-newBlockChan: + bn := newBlock.Block.Height + + balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) + if err != nil { + oc.logger.Error().Err(err).Msgf("couldn't get operator balance") + } else { + diff := oc.lastOperatorBalance.Sub(balance) + if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) { + oc.ts.AddFeeEntry(bn, diff.Int64()) + oc.lastOperatorBalance = balance + } + } + + // set current hot key burn rate + metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64())) + + // get chain ids without zeta chain + chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) { + return c.ID(), !c.IsZeta() + }) + + // query pending cctxs across all external chains within rate limit + cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs) + if err != nil { + oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") + } + + // schedule keysign for pending cctxs on each chain + for _, chain := range app.ListChains() { + // skip zeta chain + if chain.IsZeta() { + continue + } + + chainID := chain.ID() + + // update chain parameters for signer and chain observer + signer, err := oc.resolveSigner(app, chainID) if err != nil { - oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail") + oc.logger.Error().Err(err). + Int64(logs.FieldChain, chainID). + Msg("runScheduler: unable to resolve signer") continue } - if bn < 0 { - oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height") + + ob, err := oc.resolveObserver(app, chainID) + if err != nil { + oc.logger.Error().Err(err). + Int64(logs.FieldChain, chainID). + Msg("runScheduler: unable to resolve observer") continue } - if lastBlockNum == 0 { - lastBlockNum = bn - 1 + + // get cctxs from map and set pending transactions prometheus gauge + cctxList := cctxMap[chainID] + + metrics.PendingTxsPerChain. + WithLabelValues(chain.Name()). + Set(float64(len(cctxList))) + + if len(cctxList) == 0 { + continue } - if bn > lastBlockNum { // we have a new block - bn = lastBlockNum + 1 - if bn%10 == 0 { - oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn) - } - - balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) - if err != nil { - oc.logger.Error().Err(err).Msgf("couldn't get operator balance") - } else { - diff := oc.lastOperatorBalance.Sub(balance) - if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) { - oc.ts.AddFeeEntry(bn, diff.Int64()) - oc.lastOperatorBalance = balance - } - } - - // set current hot key burn rate - metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64())) - - // get chain ids without zeta chain - chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) { - return c.ID(), !c.IsZeta() - }) - - // query pending cctxs across all external chains within rate limit - cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs) - if err != nil { - oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") - } - - // schedule keysign for pending cctxs on each chain - for _, chain := range app.ListChains() { - // skip zeta chain - if chain.IsZeta() { - continue - } - - chainID := chain.ID() - - // update chain parameters for signer and chain observer - signer, err := oc.resolveSigner(app, chainID) - if err != nil { - oc.logger.Error().Err(err). - Int64(logs.FieldChain, chainID). - Msg("runScheduler: unable to resolve signer") - continue - } - - ob, err := oc.resolveObserver(app, chainID) - if err != nil { - oc.logger.Error().Err(err). - Int64(logs.FieldChain, chainID). - Msg("runScheduler: unable to resolve observer") - continue - } - - // get cctxs from map and set pending transactions prometheus gauge - cctxList := cctxMap[chainID] - - metrics.PendingTxsPerChain. - WithLabelValues(chain.Name()). - Set(float64(len(cctxList))) - - if len(cctxList) == 0 { - continue - } - - if !app.IsOutboundObservationEnabled() { - continue - } - - // #nosec G115 range is verified - zetaHeight := uint64(bn) - - switch { - case chain.IsEVM(): - oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsBitcoin(): - oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsSolana(): - oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsTON(): - oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer) - default: - oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID) - continue - } - } - - // update last processed block number - lastBlockNum = bn - oc.ts.SetCoreBlockNumber(lastBlockNum) + + if !app.IsOutboundObservationEnabled() { + continue + } + + // #nosec G115 range is verified + zetaHeight := uint64(bn) + + switch { + case chain.IsEVM(): + oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsBitcoin(): + oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsSolana(): + oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsTON(): + oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer) + default: + oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID) + continue } } + + // update last processed block number + oc.ts.SetCoreBlockNumber(bn) } } } diff --git a/zetaclient/testutils/mocks/zetacore_client.go b/zetaclient/testutils/mocks/zetacore_client.go index dd507ad5b5..0ca1681c85 100644 --- a/zetaclient/testutils/mocks/zetacore_client.go +++ b/zetaclient/testutils/mocks/zetacore_client.go @@ -23,6 +23,7 @@ import ( upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types" zerolog "github.com/rs/zerolog" + cometbft_types "github.com/cometbft/cometbft/types" ) // ZetacoreClient is an autogenerated mock type for the ZetacoreClient type @@ -891,6 +892,32 @@ func (_m *ZetacoreClient) PostVoteTSS(ctx context.Context, tssPubKey string, key return r0, r1 } +// NewBlockSubscriber provides a mock function with given fields: ctx +func (_m *ZetacoreClient) NewBlockSubscriber(ctx context.Context) (chan cometbft_types.EventDataNewBlock, error) { + ret := _m.Called(ctx) + + var r0 chan cometbft_types.EventDataNewBlock + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (chan cometbft_types.EventDataNewBlock, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) chan cometbft_types.EventDataNewBlock); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(chan cometbft_types.EventDataNewBlock) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewZetacoreClient creates a new instance of ZetacoreClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewZetacoreClient(t interface { diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index 65078a72cb..454c02fbf6 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -6,7 +6,8 @@ import ( "strings" "sync" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" + cometbft_rpc_client "github.com/cometbft/cometbft/rpc/client" + cometbft_http_client "github.com/cometbft/cometbft/rpc/client/http" cosmosclient "github.com/cosmos/cosmos-sdk/client" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/pkg/errors" @@ -35,6 +36,7 @@ type Client struct { config config.ClientConfiguration cosmosClientContext cosmosclient.Context + cometBFTClient cometbft_rpc_client.Client blockHeight int64 accountNumber map[authz.KeyType]uint64 @@ -52,7 +54,7 @@ var unsecureGRPC = grpc.WithTransportCredentials(insecure.NewCredentials()) type constructOpts struct { customTendermint bool - tendermintClient cosmosclient.TendermintRPC + tendermintClient cometbft_rpc_client.Client customAccountRetriever bool accountRetriever cosmosclient.AccountRetriever @@ -61,7 +63,7 @@ type constructOpts struct { type Opt func(cfg *constructOpts) // WithTendermintClient sets custom tendermint client -func WithTendermintClient(client cosmosclient.TendermintRPC) Opt { +func WithTendermintClient(client cometbft_rpc_client.Client) Opt { return func(c *constructOpts) { c.customTendermint = true c.tendermintClient = client @@ -123,12 +125,30 @@ func NewClient( return nil, errors.Wrap(err, "unable to build cosmos client context") } + cometBFTClientIface := constructOptions.tendermintClient + + // create a cometbft client if one was not provided in the constructOptions + if !constructOptions.customTendermint { + cometBFTURL := "http://" + tendermintRPC(chainIP) + cometBFTClient, err := cometbft_http_client.New(cometBFTURL, "/websocket") + if err != nil { + return nil, errors.Wrapf(err, "new cometbft client (%s)", cometBFTURL) + } + // start websockets + err = cometBFTClient.WSEvents.Start() + if err != nil { + return nil, errors.Wrap(err, "cometbft start") + } + cometBFTClientIface = cometBFTClient + } + return &Client{ Clients: zetacoreClients, logger: log, config: cfg, cosmosClientContext: cosmosContext, + cometBFTClient: cometBFTClientIface, accountNumber: accountsMap, seqNumber: seqMap, @@ -178,7 +198,7 @@ func buildCosmosClientContext( remote = fmt.Sprintf("tcp://%s", remote) } - wsClient, err := rpchttp.New(remote, "/websocket") + wsClient, err := cometbft_http_client.New(remote, "/websocket") if err != nil { return cosmosclient.Context{}, err } diff --git a/zetaclient/zetacore/client_cosmos.go b/zetaclient/zetacore/client_cosmos.go index fe5c3dbf1d..5dce0d1c12 100644 --- a/zetaclient/zetacore/client_cosmos.go +++ b/zetaclient/zetacore/client_cosmos.go @@ -2,10 +2,8 @@ package zetacore import ( "context" - "fmt" sdkmath "cosmossdk.io/math" - tmhttp "github.com/cometbft/cometbft/rpc/client/http" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" "github.com/pkg/errors" @@ -16,17 +14,7 @@ import ( // GetGenesisSupply returns the genesis supply. // NOTE that this method is brittle as it uses STATEFUL connection func (c *Client) GetGenesisSupply(ctx context.Context) (sdkmath.Int, error) { - tmURL := fmt.Sprintf("http://%s", c.config.ChainRPC) - - s, err := tmhttp.New(tmURL, "/websocket") - if err != nil { - return sdkmath.ZeroInt(), errors.Wrap(err, "failed to create tm client") - } - - // nolint:errcheck - defer s.Stop() - - res, err := s.Genesis(ctx) + res, err := c.cometBFTClient.Genesis(ctx) if err != nil { return sdkmath.ZeroInt(), errors.Wrap(err, "failed to get genesis") } diff --git a/zetaclient/zetacore/client_subscriptions.go b/zetaclient/zetacore/client_subscriptions.go new file mode 100644 index 0000000000..36f2aee2fa --- /dev/null +++ b/zetaclient/zetacore/client_subscriptions.go @@ -0,0 +1,39 @@ +package zetacore + +import ( + "context" + + cometbft_types "github.com/cometbft/cometbft/types" +) + +const ( + newBlockSubscriptionFilter = "tm.event='NewBlock'" +) + +// NewBlockSubscriber subscribes to cometbft new block events +func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbft_types.EventDataNewBlock, error) { + rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", newBlockSubscriptionFilter) + if err != nil { + return nil, err + } + + blockEventChan := make(chan cometbft_types.EventDataNewBlock) + + go func() { + for { + select { + case <-ctx.Done(): + return + case event := <-rawBlockEventChan: + newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock) + if !ok { + c.logger.Error().Msgf("expecting new block event, got %T", event.Data) + continue + } + blockEventChan <- newBlockEvent + } + } + }() + + return blockEventChan, nil +} diff --git a/zetaclient/zetacore/client_test.go b/zetaclient/zetacore/client_test.go index 6392fd9dba..940df90c0b 100644 --- a/zetaclient/zetacore/client_test.go +++ b/zetaclient/zetacore/client_test.go @@ -6,7 +6,6 @@ import ( "testing" abci "github.com/cometbft/cometbft/abci/types" - cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/testutil/mock" "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/query" @@ -22,6 +21,7 @@ import ( "go.nhat.io/grpcmock" "go.nhat.io/grpcmock/planner" + cometbft_rpc_client "github.com/cometbft/cometbft/rpc/client" "github.com/zeta-chain/node/cmd/zetacored/config" crosschaintypes "github.com/zeta-chain/node/x/crosschain/types" "github.com/zeta-chain/node/zetaclient/keys" @@ -111,7 +111,7 @@ func withDefaultObserverKeys() clientTestOpt { return withObserverKeys(keys.NewKeysWithKeybase(keyRing, address, testSigner, "")) } -func withTendermint(client cosmosclient.TendermintRPC) clientTestOpt { +func withTendermint(client cometbft_rpc_client.Client) clientTestOpt { return func(cfg *clientTestConfig) { cfg.opts = append(cfg.opts, WithTendermintClient(client)) } } @@ -177,7 +177,11 @@ func TestZetacore_GetZetaHotKeyBalance(t *testing.T) { method := "/cosmos.bank.v1beta1.Query/Balance" setupMockServer(t, banktypes.RegisterQueryServer, method, input, expectedOutput) - client := setupZetacoreClient(t, withDefaultObserverKeys()) + client := setupZetacoreClient( + t, + withDefaultObserverKeys(), + withTendermint(mocks.NewSDKClientWithErr(t, nil, 0)), + ) // should be able to get balance of signer client.keys = keys.NewKeysWithKeybase(mocks.NewKeyring(), types.AccAddress{}, "bob", "") @@ -219,7 +223,11 @@ func TestZetacore_GetAllOutboundTrackerByChain(t *testing.T) { method := "/zetachain.zetacore.crosschain.Query/OutboundTrackerAllByChain" setupMockServer(t, crosschaintypes.RegisterQueryServer, method, input, expectedOutput) - client := setupZetacoreClient(t, withDefaultObserverKeys()) + client := setupZetacoreClient( + t, + withDefaultObserverKeys(), + withTendermint(mocks.NewSDKClientWithErr(t, nil, 0)), + ) resp, err := client.GetAllOutboundTrackerByChain(ctx, chain.ChainId, interfaces.Ascending) require.NoError(t, err)