From 276e5ad424c9ce9fa0620a5839b04ecfced5580f Mon Sep 17 00:00:00 2001 From: Alex Gartner Date: Wed, 27 Nov 2024 12:20:13 -0800 Subject: [PATCH] refactor(zetaclient): subscribe to new blocks in scheduler --- cmd/zetaclientd/start.go | 14 ++ zetaclient/orchestrator/orchestrator.go | 205 +++++++++---------- zetaclient/orchestrator/orchestrator_test.go | 2 + 3 files changed, 118 insertions(+), 103 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 76a3c0808c..24a813affe 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "net/http" _ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional "os" @@ -10,6 +11,7 @@ import ( "strings" "syscall" + cometbft_client "github.com/cometbft/cometbft/rpc/client/http" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -217,12 +219,24 @@ func Start(_ *cobra.Command, _ []string) error { return errors.Wrap(err, "unable to create chain observer map") } + cometbftURL := fmt.Sprintf("http://%s:%d", cfg.ZetaCoreURL, 26657) + cometbftClient, err := cometbft_client.New(cometbftURL, "/websocket") + if err != nil { + return errors.Wrapf(err, "new cometbft client (%s)", cometbftURL) + } + // start websockets + err = cometbftClient.WSEvents.Start() + if err != nil { + return errors.Wrap(err, "cometbft start") + } + // Orchestrator wraps the zetacore client and adds the observers and signer maps to it. // This is the high level object used for CCTX interactions // It also handles background configuration updates from zetacore maestro, err := orchestrator.New( ctx, zetacoreClient, + cometbftClient, signerMap, observerMap, tss, diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index c29b41466a..a6db903875 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -9,6 +9,8 @@ import ( "time" sdkmath "cosmossdk.io/math" + cometbft_rpc_client "github.com/cometbft/cometbft/rpc/client" + cometbft_types "github.com/cometbft/cometbft/types" eth "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -51,6 +53,9 @@ type Orchestrator struct { // zetacore client zetacoreClient interfaces.ZetacoreClient + // cometbft client + cometbftClient cometbft_rpc_client.Client + // signerMap contains the chain signers indexed by chainID signerMap map[int64]interfaces.ChainSigner @@ -84,6 +89,7 @@ type multiLogger struct { func New( ctx context.Context, client interfaces.ZetacoreClient, + cometbftClient cometbft_rpc_client.Client, signerMap map[int64]interfaces.ChainSigner, observerMap map[int64]interfaces.ChainObserver, tss interfaces.TSSSigner, @@ -105,6 +111,7 @@ func New( return &Orchestrator{ zetacoreClient: client, + cometbftClient: cometbftClient, signerMap: signerMap, observerMap: observerMap, @@ -289,122 +296,114 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error { return err } - observeTicker := time.NewTicker(3 * time.Second) - var lastBlockNum int64 + blockEventChan, err := oc.cometbftClient.Subscribe(ctx, "", "tm.event='NewBlock'") + if err != nil { + return err + } + for { select { case <-oc.stop: oc.logger.Warn().Msg("runScheduler: stopped") return nil - case <-observeTicker.C: - { - bn, err := oc.zetacoreClient.GetBlockHeight(ctx) + case <-time.After(time.Second * 10): + // the subscription should automatically reconnect after zetacore + // restart, but we should log this just in case that logic is not + // working + oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds") + case event := <-blockEventChan: + newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock) + if !ok { + return fmt.Errorf("expecting new block event, got %T", event.Data) + } + bn := newBlockEvent.Block.Height + + balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) + if err != nil { + oc.logger.Error().Err(err).Msgf("couldn't get operator balance") + } else { + diff := oc.lastOperatorBalance.Sub(balance) + if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) { + oc.ts.AddFeeEntry(bn, diff.Int64()) + oc.lastOperatorBalance = balance + } + } + + // set current hot key burn rate + metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64())) + + // get chain ids without zeta chain + chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) { + return c.ID(), !c.IsZeta() + }) + + // query pending cctxs across all external chains within rate limit + cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs) + if err != nil { + oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") + } + + // schedule keysign for pending cctxs on each chain + for _, chain := range app.ListChains() { + // skip zeta chain + if chain.IsZeta() { + continue + } + + chainID := chain.ID() + + // update chain parameters for signer and chain observer + signer, err := oc.resolveSigner(app, chainID) if err != nil { - oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail") + oc.logger.Error().Err(err). + Int64(logs.FieldChain, chainID). + Msg("runScheduler: unable to resolve signer") continue } - if bn < 0 { - oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height") + + ob, err := oc.resolveObserver(app, chainID) + if err != nil { + oc.logger.Error().Err(err). + Int64(logs.FieldChain, chainID). + Msg("runScheduler: unable to resolve observer") + continue + } + + // get cctxs from map and set pending transactions prometheus gauge + cctxList := cctxMap[chainID] + + metrics.PendingTxsPerChain. + WithLabelValues(chain.Name()). + Set(float64(len(cctxList))) + + if len(cctxList) == 0 { continue } - if lastBlockNum == 0 { - lastBlockNum = bn - 1 + + if !app.IsOutboundObservationEnabled() { + continue } - if bn > lastBlockNum { // we have a new block - bn = lastBlockNum + 1 - if bn%10 == 0 { - oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn) - } - - balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) - if err != nil { - oc.logger.Error().Err(err).Msgf("couldn't get operator balance") - } else { - diff := oc.lastOperatorBalance.Sub(balance) - if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) { - oc.ts.AddFeeEntry(bn, diff.Int64()) - oc.lastOperatorBalance = balance - } - } - - // set current hot key burn rate - metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64())) - - // get chain ids without zeta chain - chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) { - return c.ID(), !c.IsZeta() - }) - - // query pending cctxs across all external chains within rate limit - cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs) - if err != nil { - oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") - } - - // schedule keysign for pending cctxs on each chain - for _, chain := range app.ListChains() { - // skip zeta chain - if chain.IsZeta() { - continue - } - - chainID := chain.ID() - - // update chain parameters for signer and chain observer - signer, err := oc.resolveSigner(app, chainID) - if err != nil { - oc.logger.Error().Err(err). - Int64(logs.FieldChain, chainID). - Msg("runScheduler: unable to resolve signer") - continue - } - - ob, err := oc.resolveObserver(app, chainID) - if err != nil { - oc.logger.Error().Err(err). - Int64(logs.FieldChain, chainID). - Msg("runScheduler: unable to resolve observer") - continue - } - - // get cctxs from map and set pending transactions prometheus gauge - cctxList := cctxMap[chainID] - - metrics.PendingTxsPerChain. - WithLabelValues(chain.Name()). - Set(float64(len(cctxList))) - - if len(cctxList) == 0 { - continue - } - - if !app.IsOutboundObservationEnabled() { - continue - } - - // #nosec G115 range is verified - zetaHeight := uint64(bn) - - switch { - case chain.IsEVM(): - oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsBitcoin(): - oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsSolana(): - oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer) - case chain.IsTON(): - oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer) - default: - oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID) - continue - } - } - - // update last processed block number - lastBlockNum = bn - oc.ts.SetCoreBlockNumber(lastBlockNum) + + // #nosec G115 range is verified + zetaHeight := uint64(bn) + + switch { + case chain.IsEVM(): + oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsBitcoin(): + oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsSolana(): + oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer) + case chain.IsTON(): + oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer) + default: + oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID) + continue } } + + // update last processed block number + oc.ts.SetCoreBlockNumber(bn) } } } diff --git a/zetaclient/orchestrator/orchestrator_test.go b/zetaclient/orchestrator/orchestrator_test.go index 8637a47e17..10e99c385e 100644 --- a/zetaclient/orchestrator/orchestrator_test.go +++ b/zetaclient/orchestrator/orchestrator_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" zctx "github.com/zeta-chain/node/zetaclient/context" + cometbft_rpc_client_mock "github.com/cometbft/cometbft/rpc/client/mocks" "github.com/zeta-chain/node/pkg/chains" "github.com/zeta-chain/node/pkg/coin" "github.com/zeta-chain/node/testutil/sample" @@ -587,6 +588,7 @@ func mockOrchestrator(t *testing.T, zetaClient interfaces.ZetacoreClient, chains return &Orchestrator{ zetacoreClient: zetaClient, + cometbftClient: &cometbft_rpc_client_mock.Client{}, signerMap: signers, observerMap: observers, }