Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(zetaclient): subscribe to new blocks in scheduler #3228

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/zetae2e/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func localE2ETest(cmd *cobra.Command, _ []string) {
if testPerformance {
logger.Print("⚠️ performance tests enabled, regular tests will be skipped")
skipRegular = true
skipPrecompiles = true
}

// start timer
Expand Down
6 changes: 3 additions & 3 deletions cmd/zetae2e/local/monitor_block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
cometbft_types "github.com/cometbft/cometbft/types"
cometbfttypes "github.com/cometbft/cometbft/types"

"github.com/zeta-chain/node/e2e/config"
)
Expand Down Expand Up @@ -37,11 +37,11 @@ func monitorBlockProduction(ctx context.Context, conf config.Config) error {
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
latestNewBlockEvent := cometbft_types.EventDataNewBlock{}
latestNewBlockEvent := cometbfttypes.EventDataNewBlock{}
for {
select {
case event := <-blockEventChan:
newBlockEvent, ok := event.Data.(cometbft_types.EventDataNewBlock)
newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock)
if !ok {
return fmt.Errorf("expecting new block event, got %T", event.Data)
}
Expand Down
2 changes: 2 additions & 0 deletions zetaclient/chains/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
cometbfttypes "github.com/cometbft/cometbft/types"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -145,6 +146,7 @@ type ZetacoreClient interface {
GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error)

PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error)
}

// BTCRPCClient is the interface for BTC RPC client
Expand Down
8 changes: 8 additions & 0 deletions zetaclient/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ var (
Help: "Last core block number",
})

// CoreBlockLatency is a gauge that measures the difference between system time and
// block time from zetacore
CoreBlockLatency = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: ZetaClientNamespace,
Name: "core_block_latency",
Help: "Difference between system time and block time from zetacore",
})

// Info is a gauge that contains information about the zetaclient environment
Info = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ZetaClientNamespace,
Expand Down
205 changes: 102 additions & 103 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,122 +289,121 @@
return err
}

observeTicker := time.NewTicker(3 * time.Second)
gartnera marked this conversation as resolved.
Show resolved Hide resolved
var lastBlockNum int64
newBlockChan, err := oc.zetacoreClient.NewBlockSubscriber(ctx)
if err != nil {
return err
}

Check warning on line 295 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L292-L295

Added lines #L292 - L295 were not covered by tests

for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("runScheduler: stopped")
return nil
case <-observeTicker.C:
{
bn, err := oc.zetacoreClient.GetBlockHeight(ctx)
case <-time.After(time.Second * 10):
// the subscription should automatically reconnect after zetacore
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
// restart, but we should log this just in case that logic is not
// working
oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds")
case newBlock := <-newBlockChan:
gartnera marked this conversation as resolved.
Show resolved Hide resolved
bn := newBlock.Block.Height

blockTimeLatency := time.Since(newBlock.Block.Time)
blockTimeLatencySeconds := blockTimeLatency.Seconds()
metrics.CoreBlockLatency.Set(blockTimeLatencySeconds)

if blockTimeLatencySeconds > 15 {
oc.logger.Warn().
Float64("latency", blockTimeLatencySeconds).
Msgf("runScheduler: core block latency too high")
continue

Check warning on line 318 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L302-L318

Added lines #L302 - L318 were not covered by tests
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}

Check warning on line 329 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L321-L329

Added lines #L321 - L329 were not covered by tests
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
gartnera marked this conversation as resolved.
Show resolved Hide resolved
return c.ID(), !c.IsZeta()
})

Check warning on line 338 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L333-L338

Added lines #L333 - L338 were not covered by tests

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

Check warning on line 344 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L341-L344

Added lines #L341 - L344 were not covered by tests

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue

Check warning on line 350 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L347-L350

Added lines #L347 - L350 were not covered by tests
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)

Check warning on line 356 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L353-L356

Added lines #L353 - L356 were not covered by tests
if err != nil {
oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail")
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")

Check warning on line 360 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L358-L360

Added lines #L358 - L360 were not covered by tests
continue
}
if bn < 0 {
oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height")

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue

Check warning on line 369 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L364-L369

Added lines #L364 - L369 were not covered by tests
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {

Check warning on line 379 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L373-L379

Added lines #L373 - L379 were not covered by tests
continue
}
if lastBlockNum == 0 {
lastBlockNum = bn - 1

if !app.IsOutboundObservationEnabled() {
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 384 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L383-L384

Added lines #L383 - L384 were not covered by tests
}
if bn > lastBlockNum { // we have a new block
bn = lastBlockNum + 1
if bn%10 == 0 {
oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn)
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}

if !app.IsOutboundObservationEnabled() {
continue
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
lastBlockNum = bn
oc.ts.SetCoreBlockNumber(lastBlockNum)

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue

Check warning on line 401 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L388-L401

Added lines #L388 - L401 were not covered by tests
gartnera marked this conversation as resolved.
Show resolved Hide resolved
}
}

// update last processed block number
oc.ts.SetCoreBlockNumber(bn)

Check warning on line 406 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L406

Added line #L406 was not covered by tests
}
}
}
Expand Down
32 changes: 28 additions & 4 deletions zetaclient/testutils/mocks/cometbft_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mocks
import (
"context"
"encoding/hex"
"sync"
"testing"

abci "github.com/cometbft/cometbft/abci/types"
Expand All @@ -20,6 +21,9 @@ type CometBFTClient struct {
err error
code uint32
txHash bytes.HexBytes

subscribers map[string]chan<- coretypes.ResultEvent
subscribersLock sync.Mutex
}

func (c *CometBFTClient) BroadcastTxCommit(
Expand Down Expand Up @@ -82,11 +86,31 @@ func (c *CometBFTClient) SetError(err error) *CometBFTClient {
return c
}

// PublishToSubscribers will publish an event to all subscribers (mock only)
func (c *CometBFTClient) PublishToSubscribers(event coretypes.ResultEvent) {
c.subscribersLock.Lock()
defer c.subscribersLock.Unlock()
for _, ch := range c.subscribers {
ch <- event
}
}

func (c *CometBFTClient) Subscribe(
_ context.Context,
subscriber, _ string,
_ ...int,
) (out <-chan coretypes.ResultEvent, err error) {
outChan := make(chan coretypes.ResultEvent)
c.subscribers[subscriber] = outChan
return outChan, nil
}

func NewSDKClientWithErr(t *testing.T, err error, code uint32) *CometBFTClient {
return &CometBFTClient{
t: t,
Client: mock.Client{},
err: err,
code: code,
t: t,
Client: mock.Client{},
err: err,
code: code,
subscribers: make(map[string]chan<- coretypes.ResultEvent),
}
}
27 changes: 27 additions & 0 deletions zetaclient/testutils/mocks/zetacore_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading