Skip to content

Commit

Permalink
refactor(zetaclient): remove redundant getters/setters (#3268)
Browse files Browse the repository at this point in the history
* Simplify base signer

* Base observer: remove header cache

* Remove redundant getters/setters from evm/btc/sol/base observers & signers

* Fix inbound cmd
  • Loading branch information
swift1337 authored Dec 11, 2024
1 parent 376b714 commit 7d4d99b
Show file tree
Hide file tree
Showing 30 changed files with 562 additions and 1,029 deletions.
73 changes: 31 additions & 42 deletions cmd/zetaclientd/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ import (
"strings"

"cosmossdk.io/errors"
"github.com/btcsuite/btcd/rpcclient"
sdk "github.com/cosmos/cosmos-sdk/types"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/onrik/ethrpc"
"github.com/rs/zerolog"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
evmobserver "github.com/zeta-chain/node/zetaclient/chains/evm/observer"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/db"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/orchestrator"
"github.com/zeta-chain/node/zetaclient/zetacore"
Expand Down Expand Up @@ -87,27 +86,26 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
return err
}

chainProto := chain.RawChain()
baseLogger := base.Logger{Std: zerolog.Nop(), Compliance: zerolog.Nop()}

observers, err := orchestrator.CreateChainObserverMap(ctx, client, nil, db.SqliteInMemory, baseLogger, nil)
if err != nil {
return errors.Wrap(err, "failed to create chain observer map")
}

// get ballot identifier according to the chain type
if chain.IsEVM() {
evmObserver := evmobserver.Observer{}
evmObserver.WithZetacoreClient(client)
var ethRPC *ethrpc.EthRPC
var client *ethclient.Client
coinType := coin.CoinType_Cmd
for chainIDFromConfig, evmConfig := range cfg.GetAllEVMConfigs() {
if chainIDFromConfig == chainID {
ethRPC = ethrpc.NewEthRPC(evmConfig.Endpoint)
client, err = ethclient.Dial(evmConfig.Endpoint)
if err != nil {
return err
}
evmObserver.WithEvmClient(client)
evmObserver.WithEvmJSONRPC(ethRPC)
evmObserver.WithChain(*chainProto)
}
observer, ok := observers[chainID]
if !ok {
return fmt.Errorf("observer not found for evm chain %d", chain.ID())
}

evmObserver, ok := observer.(*evmobserver.Observer)
if !ok {
return fmt.Errorf("observer is not evm observer for chain %d", chain.ID())
}

coinType := coin.CoinType_Cmd
hash := ethcommon.HexToHash(inboundHash)
tx, isPending, err := evmObserver.TransactionByHash(inboundHash)
if err != nil {
Expand All @@ -118,7 +116,7 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
return fmt.Errorf("tx is still pending")
}

receipt, err := client.TransactionReceipt(context.Background(), hash)
receipt, err := evmObserver.TransactionReceipt(ctx, hash)
if err != nil {
return fmt.Errorf("tx receipt not found on chain %s, %d", err.Error(), chain.ID())
}
Expand Down Expand Up @@ -158,33 +156,23 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
}
fmt.Println("CoinType : ", coinType)
} else if chain.IsBitcoin() {
btcObserver := btcobserver.Observer{}
btcObserver.WithZetacoreClient(client)
btcObserver.WithChain(*chainProto)
btcConfig, found := cfg.GetBTCConfig(chainID)
if !found {
return fmt.Errorf("unable to find config for BTC chain %d", chainID)
}
connCfg := &rpcclient.ConnConfig{
Host: btcConfig.RPCHost,
User: btcConfig.RPCUsername,
Pass: btcConfig.RPCPassword,
HTTPPostMode: true,
DisableTLS: true,
Params: btcConfig.RPCParams,
observer, ok := observers[chainID]
if !ok {
return fmt.Errorf("observer not found for btc chain %d", chainID)
}

btcClient, err := rpcclient.New(connCfg, nil)
if err != nil {
return err
btcObserver, ok := observer.(*btcobserver.Observer)
if !ok {
return fmt.Errorf("observer is not btc observer for chain %d", chainID)
}
btcObserver.WithBtcClient(btcClient)

ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
if err != nil {
return err
}
}
fmt.Println("BallotIdentifier : ", ballotIdentifier)

fmt.Println("BallotIdentifier: ", ballotIdentifier)

// query ballot
ballot, err := client.GetBallot(ctx, ballotIdentifier)
Expand All @@ -193,9 +181,10 @@ func InboundGetBallot(_ *cobra.Command, args []string) error {
}

for _, vote := range ballot.Voters {
fmt.Printf("%s : %s \n", vote.VoterAddress, vote.VoteType)
fmt.Printf("%s: %s\n", vote.VoterAddress, vote.VoteType)
}
fmt.Println("BallotStatus : ", ballot.BallotStatus)

fmt.Println("BallotStatus: ", ballot.BallotStatus)

return nil
}
2 changes: 1 addition & 1 deletion cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func Start(_ *cobra.Command, _ []string) error {

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger, telemetryServer)
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
if err != nil {
log.Error().Err(err).Msg("Unable to create signer map")
return err
Expand Down
120 changes: 30 additions & 90 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ const (
// DefaultBlockCacheSize is the default number of blocks that the observer will keep in cache for performance (without RPC calls)
// Cached blocks can be used to get block information and verify transactions
DefaultBlockCacheSize = 1000

// DefaultHeaderCacheSize is the default number of headers that the observer will keep in cache for performance (without RPC calls)
// Cached headers can be used to get header information
DefaultHeaderCacheSize = 1000
)

// Observer is the base structure for chain observers, grouping the common logic for each chain observer client.
Expand Down Expand Up @@ -64,12 +60,8 @@ type Observer struct {
// rpcAlertLatency is the threshold of RPC latency to trigger an alert
rpcAlertLatency time.Duration

// blockCache is the cache for blocks
blockCache *lru.Cache

// headerCache is the cache for headers
headerCache *lru.Cache

// db is the database to persist data
db *db.DB

Expand All @@ -95,13 +87,17 @@ func NewObserver(
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
headerCacheSize int,
rpcAlertLatency int64,
ts *metrics.TelemetryServer,
database *db.DB,
logger Logger,
) (*Observer, error) {
ob := Observer{
blockCache, err := lru.New(blockCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating block cache")
}

return &Observer{
chain: chain,
chainParams: chainParams,
zetacoreClient: zetacoreClient,
Expand All @@ -112,27 +108,11 @@ func NewObserver(
rpcAlertLatency: time.Duration(rpcAlertLatency) * time.Second,
ts: ts,
db: database,
blockCache: blockCache,
mu: &sync.Mutex{},
logger: newObserverLogger(chain, logger),
stop: make(chan struct{}),
}

// setup loggers
ob.WithLogger(logger)

// create block cache
var err error
ob.blockCache, err = lru.New(blockCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating block cache")
}

// create header cache
ob.headerCache, err = lru.New(headerCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating header cache")
}

return &ob, nil
}, nil
}

// Start starts the observer. Returns false if it's already started (noop).
Expand Down Expand Up @@ -178,12 +158,6 @@ func (ob *Observer) Chain() chains.Chain {
return ob.chain
}

// WithChain attaches a new chain to the observer.
func (ob *Observer) WithChain(chain chains.Chain) *Observer {
ob.chain = chain
return ob
}

// ChainParams returns the chain params for the observer.
func (ob *Observer) ChainParams() observertypes.ChainParams {
ob.mu.Lock()
Expand All @@ -205,23 +179,11 @@ func (ob *Observer) ZetacoreClient() interfaces.ZetacoreClient {
return ob.zetacoreClient
}

// WithZetacoreClient attaches a new zetacore client to the observer.
func (ob *Observer) WithZetacoreClient(client interfaces.ZetacoreClient) *Observer {
ob.zetacoreClient = client
return ob
}

// TSS returns the tss signer for the observer.
func (ob *Observer) TSS() interfaces.TSSSigner {
return ob.tss
}

// WithTSS attaches a new tss signer to the observer.
func (ob *Observer) WithTSS(tss interfaces.TSSSigner) *Observer {
ob.tss = tss
return ob
}

// TSSAddressString returns the TSS address for the chain.
//
// Note: all chains uses TSS EVM address except Bitcoin chain.
Expand Down Expand Up @@ -287,23 +249,6 @@ func (ob *Observer) BlockCache() *lru.Cache {
return ob.blockCache
}

// WithBlockCache attaches a new block cache to the observer.
func (ob *Observer) WithBlockCache(cache *lru.Cache) *Observer {
ob.blockCache = cache
return ob
}

// HeaderCache returns the header cache for the observer.
func (ob *Observer) HeaderCache() *lru.Cache {
return ob.headerCache
}

// WithHeaderCache attaches a new header cache to the observer.
func (ob *Observer) WithHeaderCache(cache *lru.Cache) *Observer {
ob.headerCache = cache
return ob
}

// OutboundID returns a unique identifier for the outbound transaction.
// The identifier is now used as the key for maps that store outbound related data (e.g. transaction, receipt, etc).
func (ob *Observer) OutboundID(nonce uint64) string {
Expand All @@ -316,12 +261,6 @@ func (ob *Observer) DB() *db.DB {
return ob.db
}

// WithTelemetryServer attaches a new telemetry server to the observer.
func (ob *Observer) WithTelemetryServer(ts *metrics.TelemetryServer) *Observer {
ob.ts = ts
return ob
}

// TelemetryServer returns the telemetry server for the observer.
func (ob *Observer) TelemetryServer() *metrics.TelemetryServer {
return ob.ts
Expand All @@ -332,26 +271,6 @@ func (ob *Observer) Logger() *ObserverLogger {
return &ob.logger
}

// WithLogger attaches a new logger to the observer.
func (ob *Observer) WithLogger(logger Logger) *Observer {
chainLogger := logger.Std.
With().
Int64(logs.FieldChain, ob.chain.ChainId).
Str(logs.FieldChainNetwork, ob.chain.Network.String()).
Logger()

ob.logger = ObserverLogger{
Chain: chainLogger,
Inbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameInbound).Logger(),
Outbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameOutbound).Logger(),
GasPrice: chainLogger.With().Str(logs.FieldModule, logs.ModNameGasPrice).Logger(),
Headers: chainLogger.With().Str(logs.FieldModule, logs.ModNameHeaders).Logger(),
Compliance: logger.Compliance,
}

return ob
}

// Mu returns the mutex for the observer.
func (ob *Observer) Mu() *sync.Mutex {
return ob.mu
Expand Down Expand Up @@ -544,3 +463,24 @@ func EnvVarLatestBlockByChain(chain chains.Chain) string {
func EnvVarLatestTxByChain(chain chains.Chain) string {
return fmt.Sprintf("CHAIN_%d_SCAN_FROM_TX", chain.ChainId)
}

func newObserverLogger(chain chains.Chain, logger Logger) ObserverLogger {
withLogFields := func(l zerolog.Logger) zerolog.Logger {
return l.With().
Int64(logs.FieldChain, chain.ChainId).
Str(logs.FieldChainNetwork, chain.Network.String()).
Logger()
}

log := withLogFields(logger.Std)
complianceLog := withLogFields(logger.Compliance)

return ObserverLogger{
Chain: log,
Inbound: log.With().Str(logs.FieldModule, logs.ModNameInbound).Logger(),
Outbound: log.With().Str(logs.FieldModule, logs.ModNameOutbound).Logger(),
GasPrice: log.With().Str(logs.FieldModule, logs.ModNameGasPrice).Logger(),
Headers: log.With().Str(logs.FieldModule, logs.ModNameHeaders).Logger(),
Compliance: complianceLog,
}
}
Loading

0 comments on commit 7d4d99b

Please sign in to comment.