Skip to content

Commit

Permalink
refactor(zetaclient): subscribe to new blocks in scheduler (#3228)
Browse files Browse the repository at this point in the history
* move to zetaclient/zetacore client

* add SubscribeNewBlocks test coverage

* add core_block_time_latency metric

* remove underscores from imports

* skip block notification if latency is too high

* skip precompiles in performance tests
  • Loading branch information
gartnera authored Dec 10, 2024
1 parent 3a8fa1e commit 376b714
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 134 deletions.
1 change: 1 addition & 0 deletions cmd/zetae2e/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func localE2ETest(cmd *cobra.Command, _ []string) {
if testPerformance {
logger.Print("⚠️ performance tests enabled, regular tests will be skipped")
skipRegular = true
skipPrecompiles = true
}

// start timer
Expand Down
6 changes: 3 additions & 3 deletions cmd/zetae2e/local/monitor_block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
cometbft_types "github.com/cometbft/cometbft/types"
cometbfttypes "github.com/cometbft/cometbft/types"

"github.com/zeta-chain/node/e2e/config"
)
Expand Down Expand Up @@ -37,11 +37,11 @@ func monitorBlockProduction(ctx context.Context, conf config.Config) error {
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
latestNewBlockEvent := cometbft_types.EventDataNewBlock{}
latestNewBlockEvent := cometbfttypes.EventDataNewBlock{}
for {
select {
case event := <-blockEventChan:
newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock)
newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock)
if !ok {
return fmt.Errorf("expecting new block event, got %T", event.Data)
}
Expand Down
2 changes: 2 additions & 0 deletions zetaclient/chains/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
cometbfttypes "github.com/cometbft/cometbft/types"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -145,6 +146,7 @@ type ZetacoreClient interface {
GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error)

PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error)
}

// BTCRPCClient is the interface for BTC RPC client
Expand Down
8 changes: 8 additions & 0 deletions zetaclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ var (
Help: "Last core block number",
})

// CoreBlockLatency is a gauge that measures the difference between system time and
// block time from zetacore
CoreBlockLatency = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: ZetaClientNamespace,
Name: "core_block_latency",
Help: "Difference between system time and block time from zetacore",
})

// Info is a gauge that contains information about the zetaclient environment
Info = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ZetaClientNamespace,
Expand Down
205 changes: 102 additions & 103 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,122 +289,121 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error {
return err
}

observeTicker := time.NewTicker(3 * time.Second)
var lastBlockNum int64
newBlockChan, err := oc.zetacoreClient.NewBlockSubscriber(ctx)
if err != nil {
return err
}

for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("runScheduler: stopped")
return nil
case <-observeTicker.C:
{
bn, err := oc.zetacoreClient.GetBlockHeight(ctx)
case <-time.After(time.Second * 10):
// the subscription should automatically reconnect after zetacore
// restart, but we should log this just in case that logic is not
// working
oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds")
case newBlock := <-newBlockChan:
bn := newBlock.Block.Height

blockTimeLatency := time.Since(newBlock.Block.Time)
blockTimeLatencySeconds := blockTimeLatency.Seconds()
metrics.CoreBlockLatency.Set(blockTimeLatencySeconds)

if blockTimeLatencySeconds > 15 {
oc.logger.Warn().
Float64("latency", blockTimeLatencySeconds).
Msgf("runScheduler: core block latency too high")
continue
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail")
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}
if bn < 0 {
oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height")

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}
if lastBlockNum == 0 {
lastBlockNum = bn - 1

if !app.IsOutboundObservationEnabled() {
continue
}
if bn > lastBlockNum { // we have a new block
bn = lastBlockNum + 1
if bn%10 == 0 {
oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn)
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}

if !app.IsOutboundObservationEnabled() {
continue
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
lastBlockNum = bn
oc.ts.SetCoreBlockNumber(lastBlockNum)

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
oc.ts.SetCoreBlockNumber(bn)
}
}
}
Expand Down
32 changes: 28 additions & 4 deletions zetaclient/testutils/mocks/cometbft_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mocks
import (
"context"
"encoding/hex"
"sync"
"testing"

abci "github.com/cometbft/cometbft/abci/types"
Expand All @@ -20,6 +21,9 @@ type CometBFTClient struct {
err error
code uint32
txHash bytes.HexBytes

subscribers map[string]chan<- coretypes.ResultEvent
subscribersLock sync.Mutex
}

func (c *CometBFTClient) BroadcastTxCommit(
Expand Down Expand Up @@ -82,11 +86,31 @@ func (c *CometBFTClient) SetError(err error) *CometBFTClient {
return c
}

// PublishToSubscribers will publish an event to all subscribers (mock only)
func (c *CometBFTClient) PublishToSubscribers(event coretypes.ResultEvent) {
c.subscribersLock.Lock()
defer c.subscribersLock.Unlock()
for _, ch := range c.subscribers {
ch <- event
}
}

func (c *CometBFTClient) Subscribe(
_ context.Context,
subscriber, _ string,
_ ...int,
) (out <-chan coretypes.ResultEvent, err error) {
outChan := make(chan coretypes.ResultEvent)
c.subscribers[subscriber] = outChan
return outChan, nil
}

func NewSDKClientWithErr(t *testing.T, err error, code uint32) *CometBFTClient {
return &CometBFTClient{
t: t,
Client: mock.Client{},
err: err,
code: code,
t: t,
Client: mock.Client{},
err: err,
code: code,
subscribers: make(map[string]chan<- coretypes.ResultEvent),
}
}
27 changes: 27 additions & 0 deletions zetaclient/testutils/mocks/zetacore_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 376b714

Please sign in to comment.