Skip to content

Commit

Permalink
move to zetaclient/zetacore client
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Dec 5, 2024
1 parent d8e4fc7 commit b19de8b
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 124 deletions.
2 changes: 2 additions & 0 deletions zetaclient/chains/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
cometbft_types "github.com/cometbft/cometbft/types"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -145,6 +146,7 @@ type ZetacoreClient interface {
GetUpgradePlan(ctx context.Context) (*upgradetypes.Plan, error)

PostOutboundTracker(ctx context.Context, chainID int64, nonce uint64, txHash string) (string, error)
NewBlockSubscriber(ctx context.Context) (chan cometbft_types.EventDataNewBlock, error)
}

// BTCRPCClient is the interface for BTC RPC client
Expand Down
194 changes: 91 additions & 103 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,122 +289,110 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error {
return err
}

observeTicker := time.NewTicker(3 * time.Second)
var lastBlockNum int64
newBlockChan, err := oc.zetacoreClient.NewBlockSubscriber(ctx)
if err != nil {
return err
}

Check warning on line 295 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L292-L295

Added lines #L292 - L295 were not covered by tests

for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("runScheduler: stopped")
return nil
case <-observeTicker.C:
{
bn, err := oc.zetacoreClient.GetBlockHeight(ctx)
case <-time.After(time.Second * 10):
// the subscription should automatically reconnect after zetacore
// restart, but we should log this just in case that logic is not
// working
oc.logger.Warn().Msg("runScheduler: no blocks after 10 seconds")
case newBlock := <-newBlockChan:
bn := newBlock.Block.Height

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}

Check warning on line 318 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L302-L318

Added lines #L302 - L318 were not covered by tests
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

Check warning on line 327 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L322-L327

Added lines #L322 - L327 were not covered by tests

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

Check warning on line 333 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L330-L333

Added lines #L330 - L333 were not covered by tests

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue

Check warning on line 339 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L336-L339

Added lines #L336 - L339 were not covered by tests
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)

Check warning on line 345 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L342-L345

Added lines #L342 - L345 were not covered by tests
if err != nil {
oc.logger.Error().Err(err).Msg("StartCctxScheduler: GetBlockHeight fail")
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")

Check warning on line 349 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L347-L349

Added lines #L347 - L349 were not covered by tests
continue
}
if bn < 0 {
oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height")

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")

Check warning on line 357 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L353-L357

Added lines #L353 - L357 were not covered by tests
continue
}
if lastBlockNum == 0 {
lastBlockNum = bn - 1

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue

Check warning on line 369 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L362-L369

Added lines #L362 - L369 were not covered by tests
}
if bn > lastBlockNum { // we have a new block
bn = lastBlockNum + 1
if bn%10 == 0 {
oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn)
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
if err != nil {
oc.logger.Error().Err(err).Msgf("couldn't get operator balance")
} else {
diff := oc.lastOperatorBalance.Sub(balance)
if diff.GT(sdkmath.NewInt(0)) && diff.LT(sdkmath.NewInt(math.MaxInt64)) {
oc.ts.AddFeeEntry(bn, diff.Int64())
oc.lastOperatorBalance = balance
}
}

// set current hot key burn rate
metrics.HotKeyBurnRate.Set(float64(oc.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// get chain ids without zeta chain
chainIDs := lo.FilterMap(app.ListChains(), func(c zctx.Chain, _ int) (int64, bool) {
return c.ID(), !c.IsZeta()
})

// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, chainIDs)
if err != nil {
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
for _, chain := range app.ListChains() {
// skip zeta chain
if chain.IsZeta() {
continue
}

chainID := chain.ID()

// update chain parameters for signer and chain observer
signer, err := oc.resolveSigner(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve signer")
continue
}

ob, err := oc.resolveObserver(app, chainID)
if err != nil {
oc.logger.Error().Err(err).
Int64(logs.FieldChain, chainID).
Msg("runScheduler: unable to resolve observer")
continue
}

// get cctxs from map and set pending transactions prometheus gauge
cctxList := cctxMap[chainID]

metrics.PendingTxsPerChain.
WithLabelValues(chain.Name()).
Set(float64(len(cctxList)))

if len(cctxList) == 0 {
continue
}

if !app.IsOutboundObservationEnabled() {
continue
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
}
}

// update last processed block number
lastBlockNum = bn
oc.ts.SetCoreBlockNumber(lastBlockNum)

if !app.IsOutboundObservationEnabled() {
continue

Check warning on line 373 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L372-L373

Added lines #L372 - L373 were not covered by tests
}

// #nosec G115 range is verified
zetaHeight := uint64(bn)

switch {
case chain.IsEVM():
oc.ScheduleCctxEVM(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsBitcoin():
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue

Check warning on line 390 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L377-L390

Added lines #L377 - L390 were not covered by tests
}
}

// update last processed block number
oc.ts.SetCoreBlockNumber(bn)

Check warning on line 395 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L395

Added line #L395 was not covered by tests
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions zetaclient/testutils/mocks/zetacore_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 24 additions & 4 deletions zetaclient/zetacore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"strings"
"sync"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
cometbft_rpc_client "github.com/cometbft/cometbft/rpc/client"
cometbft_http_client "github.com/cometbft/cometbft/rpc/client/http"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -35,6 +36,7 @@ type Client struct {
config config.ClientConfiguration

cosmosClientContext cosmosclient.Context
cometBFTClient cometbft_rpc_client.Client

blockHeight int64
accountNumber map[authz.KeyType]uint64
Expand All @@ -52,7 +54,7 @@ var unsecureGRPC = grpc.WithTransportCredentials(insecure.NewCredentials())

type constructOpts struct {
customTendermint bool
tendermintClient cosmosclient.TendermintRPC
tendermintClient cometbft_rpc_client.Client

customAccountRetriever bool
accountRetriever cosmosclient.AccountRetriever
Expand All @@ -61,7 +63,7 @@ type constructOpts struct {
type Opt func(cfg *constructOpts)

// WithTendermintClient sets custom tendermint client
func WithTendermintClient(client cosmosclient.TendermintRPC) Opt {
func WithTendermintClient(client cometbft_rpc_client.Client) Opt {
return func(c *constructOpts) {
c.customTendermint = true
c.tendermintClient = client
Expand Down Expand Up @@ -123,12 +125,30 @@ func NewClient(
return nil, errors.Wrap(err, "unable to build cosmos client context")
}

cometBFTClientIface := constructOptions.tendermintClient

// create a cometbft client if one was not provided in the constructOptions
if !constructOptions.customTendermint {
cometBFTURL := "http://" + tendermintRPC(chainIP)
cometBFTClient, err := cometbft_http_client.New(cometBFTURL, "/websocket")
if err != nil {
return nil, errors.Wrapf(err, "new cometbft client (%s)", cometBFTURL)
}

Check warning on line 136 in zetaclient/zetacore/client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client.go#L132-L136

Added lines #L132 - L136 were not covered by tests
// start websockets
err = cometBFTClient.WSEvents.Start()
if err != nil {
return nil, errors.Wrap(err, "cometbft start")
}
cometBFTClientIface = cometBFTClient

Check warning on line 142 in zetaclient/zetacore/client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client.go#L138-L142

Added lines #L138 - L142 were not covered by tests
}

return &Client{
Clients: zetacoreClients,
logger: log,
config: cfg,

cosmosClientContext: cosmosContext,
cometBFTClient: cometBFTClientIface,

accountNumber: accountsMap,
seqNumber: seqMap,
Expand Down Expand Up @@ -178,7 +198,7 @@ func buildCosmosClientContext(
remote = fmt.Sprintf("tcp://%s", remote)
}

wsClient, err := rpchttp.New(remote, "/websocket")
wsClient, err := cometbft_http_client.New(remote, "/websocket")

Check warning on line 201 in zetaclient/zetacore/client.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client.go#L201

Added line #L201 was not covered by tests
if err != nil {
return cosmosclient.Context{}, err
}
Expand Down
14 changes: 1 addition & 13 deletions zetaclient/zetacore/client_cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package zetacore

import (
"context"
"fmt"

sdkmath "cosmossdk.io/math"
tmhttp "github.com/cometbft/cometbft/rpc/client/http"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types"
"github.com/pkg/errors"
Expand All @@ -16,17 +14,7 @@ import (
// GetGenesisSupply returns the genesis supply.
// NOTE that this method is brittle as it uses STATEFUL connection
func (c *Client) GetGenesisSupply(ctx context.Context) (sdkmath.Int, error) {
tmURL := fmt.Sprintf("http://%s", c.config.ChainRPC)

s, err := tmhttp.New(tmURL, "/websocket")
if err != nil {
return sdkmath.ZeroInt(), errors.Wrap(err, "failed to create tm client")
}

// nolint:errcheck
defer s.Stop()

res, err := s.Genesis(ctx)
res, err := c.cometBFTClient.Genesis(ctx)

Check warning on line 17 in zetaclient/zetacore/client_cosmos.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_cosmos.go#L17

Added line #L17 was not covered by tests
if err != nil {
return sdkmath.ZeroInt(), errors.Wrap(err, "failed to get genesis")
}
Expand Down
Loading

0 comments on commit b19de8b

Please sign in to comment.