Skip to content

Commit

Permalink
refactor(zetaclient): subscribe to new blocks in scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Nov 27, 2024
1 parent 08ff881 commit 276e5ad
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 103 deletions.
14 changes: 14 additions & 0 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
Expand All @@ -10,6 +11,7 @@ import (
"strings"
"syscall"

cometbft_client "github.com/cometbft/cometbft/rpc/client/http"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -217,12 +219,24 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "unable to create chain observer map")
}

cometbftURL := fmt.Sprintf("http://%s:%d", cfg.ZetaCoreURL, 26657)
cometbftClient, err := cometbft_client.New(cometbftURL, "/websocket")
if err != nil {
return errors.Wrapf(err, "new cometbft client (%s)", cometbftURL)
}
// start websockets
err = cometbftClient.WSEvents.Start()
if err != nil {
return errors.Wrap(err, "cometbft start")
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it.
// This is the high level object used for CCTX interactions
// It also handles background configuration updates from zetacore
maestro, err := orchestrator.New(
ctx,
zetacoreClient,
cometbftClient,
signerMap,
observerMap,
tss,
Expand Down
205 changes: 102 additions & 103 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

sdkmath "cosmossdk.io/math"
cometbft_rpc_client "github.com/cometbft/cometbft/rpc/client"
cometbft_types "github.com/cometbft/cometbft/types"
eth "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -51,6 +53,9 @@ type Orchestrator struct {
// zetacore client
zetacoreClient interfaces.ZetacoreClient

// cometbft client
cometbftClient cometbft_rpc_client.Client

// signerMap contains the chain signers indexed by chainID
signerMap map[int64]interfaces.ChainSigner

Expand Down Expand Up @@ -84,6 +89,7 @@ type multiLogger struct {
func New(
ctx context.Context,
client interfaces.ZetacoreClient,
cometbftClient cometbft_rpc_client.Client,
signerMap map[int64]interfaces.ChainSigner,
observerMap map[int64]interfaces.ChainObserver,
tss interfaces.TSSSigner,
Expand All @@ -105,6 +111,7 @@ func New(

return &Orchestrator{
zetacoreClient: client,
cometbftClient: cometbftClient,

Check warning on line 114 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L114

Added line #L114 was not covered by tests

signerMap: signerMap,
observerMap: observerMap,
Expand Down Expand Up @@ -289,122 +296,114 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error {
return err
}

observeTicker := time.NewTicker(3 * time.Second)
var lastBlockNum int64
blockEventChan, err := oc.cometbftClient.Subscribe(ctx, "", "tm.event='NewBlock'")
if err != nil {
return err
}

Check warning on line 302 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L299-L302

Added lines #L299 - L302 were not covered by tests

for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("runScheduler: stopped")
return nil
case <-observeTicker.C:
{
bn, err := oc.zetacoreClient.GetBlockHeight(ctx)
case <-time.After(time.Second * 10):
// the subscription should automatically reconnect after zetacore
// restart, but we should log this just in case that logic is not
// working
oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds")
case event := <-blockEventChan:
newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock)
if !ok {
return fmt.Errorf("expecting new block event, got %T", event.Data)
}
bn := newBlockEvent.Block.Height

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}

Check warning on line 329 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L309-L329

Added lines #L309 - L329 were not covered by tests
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

Check warning on line 338 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L333-L338

Added lines #L333 - L338 were not covered by tests

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

Check warning on line 344 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L341-L344

Added lines #L341 - L344 were not covered by tests

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue

Check warning on line 350 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L347-L350

Added lines #L347 - L350 were not covered by tests
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)

Check warning on line 356 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L353-L356

Added lines #L353 - L356 were not covered by tests
if err != nil {
oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail")
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")

Check warning on line 360 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L358-L360

Added lines #L358 - L360 were not covered by tests
continue
}
if bn < 0 {
oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height")

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue

Check warning on line 369 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L364-L369

Added lines #L364 - L369 were not covered by tests
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {

Check warning on line 379 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L373-L379

Added lines #L373 - L379 were not covered by tests
continue
}
if lastBlockNum == 0 {
lastBlockNum = bn - 1

if !app.IsOutboundObservationEnabled() {
continue

Check warning on line 384 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L383-L384

Added lines #L383 - L384 were not covered by tests
}
if bn > lastBlockNum { // we have a new block
bn = lastBlockNum + 1
if bn%10 == 0 {
oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn)
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}

if !app.IsOutboundObservationEnabled() {
continue
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
lastBlockNum = bn
oc.ts.SetCoreBlockNumber(lastBlockNum)

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue

Check warning on line 401 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L388-L401

Added lines #L388 - L401 were not covered by tests
}
}

// update last processed block number
oc.ts.SetCoreBlockNumber(bn)

Check warning on line 406 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L406

Added line #L406 was not covered by tests
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions zetaclient/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
zctx "github.com/zeta-chain/node/zetaclient/context"

cometbft_rpc_client_mock "github.com/cometbft/cometbft/rpc/client/mocks"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/testutil/sample"
Expand Down Expand Up @@ -587,6 +588,7 @@ func mockOrchestrator(t *testing.T, zetaClient interfaces.ZetacoreClient, chains

return &Orchestrator{
zetacoreClient: zetaClient,
cometbftClient: &cometbft_rpc_client_mock.Client{},
signerMap: signers,
observerMap: observers,
}
Expand Down

0 comments on commit 276e5ad

Please sign in to comment.