Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: integrated base Observer structure into existing EVM/BTC observers #2359

Merged
merged 32 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
61f7f02
save local new files to remote
ws4charlie Jun 4, 2024
695caa7
initiated base observer
ws4charlie Jun 4, 2024
1500138
move base to chains folder
ws4charlie Jun 4, 2024
2cdff62
moved logger to base package
ws4charlie Jun 5, 2024
d2d19f1
added base signer and logger
ws4charlie Jun 6, 2024
7b358d8
Merge branch 'develop' of https://github.com/zeta-chain/node into ref…
ws4charlie Jun 6, 2024
9da97f9
Merge branch 'develop' into refactor-base-signer-observer
ws4charlie Jun 17, 2024
f6784ee
added changelog entry
ws4charlie Jun 17, 2024
bad53bb
Merge branch 'develop' into refactor-base-signer-observer
ws4charlie Jun 17, 2024
13e6251
integrated base signer into evm/bitcoin; integrated base observer int…
ws4charlie Jun 19, 2024
b51ab8a
integrated base observer to evm and bitcoin chain
ws4charlie Jun 20, 2024
5edd452
Merge branch 'develop' of https://github.com/zeta-chain/node into ref…
ws4charlie Jun 20, 2024
7d1650b
added changelog entry
ws4charlie Jun 20, 2024
0cb55e1
cherry pick base Signer structure integration
ws4charlie Jun 20, 2024
2b4b01c
updated PR number in changelog
ws4charlie Jun 20, 2024
d882b05
updated PR number in changelog
ws4charlie Jun 20, 2024
921ca67
Merge branch 'develop' into refactor-integrate-base-signer
ws4charlie Jun 20, 2024
cdbf149
cherry picked the integration of base Observer
ws4charlie Jun 20, 2024
625e633
moved pure RPC methods to rpc package
ws4charlie Jun 20, 2024
df1dcbe
Merge branch 'develop' of https://github.com/zeta-chain/node into ref…
ws4charlie Jun 21, 2024
45d1a9c
moved Mutex to base Observer struct
ws4charlie Jun 21, 2024
642e54e
type check on cached block, header
ws4charlie Jun 21, 2024
72061f3
Merge branch 'develop' into refactor-integrate-base-observer
ws4charlie Jun 21, 2024
0ee1e60
update changelog PR number and added unit test for Stop() method
ws4charlie Jun 21, 2024
e6111eb
Merge branch 'develop' into refactor-integrate-base-observer
ws4charlie Jun 24, 2024
0652bf4
replace magic numbers with constants
ws4charlie Jun 24, 2024
f5914d8
updated method name in logging
ws4charlie Jun 24, 2024
44e99fa
Merge branch 'develop' of https://github.com/zeta-chain/node into ref…
ws4charlie Jun 24, 2024
a96898d
Move sqlite-mem db to testutils
swift1337 Jun 25, 2024
956f681
Merge branch 'develop' into refactor-integrate-base-observer
ws4charlie Jun 25, 2024
ab0c960
Add base signer's Lock & Unlock
swift1337 Jun 25, 2024
5925d10
Merge branch 'refactor-integrate-base-observer' of github.com:zeta-ch…
swift1337 Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* [2340](https://github.com/zeta-chain/node/pull/2340) - add ValidateInbound method for cctx orchestrator
* [2344](https://github.com/zeta-chain/node/pull/2344) - group common data of EVM/Bitcoin signer and observer using base structs
* [2357](https://github.com/zeta-chain/node/pull/2357) - integrate base Signer structure into EVM/Bitcoin Signer
* [2359](https://github.com/zeta-chain/node/pull/2357) - integrate base Observer structure into EVM/Bitcoin Observer
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved

### Tests

Expand Down
58 changes: 52 additions & 6 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package main

import (
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"

"github.com/zeta-chain/zetacore/zetaclient/authz"
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer"
btcrpc "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc"
btcsigner "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/signer"
evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer"
evmsigner "github.com/zeta-chain/zetacore/zetaclient/chains/evm/signer"
Expand Down Expand Up @@ -116,33 +120,75 @@ func CreateChainObserverMap(
logger base.Logger,
ts *metrics.TelemetryServer,
) (map[int64]interfaces.ChainObserver, error) {
zetacoreContext := appContext.ZetacoreContext()
observerMap := make(map[int64]interfaces.ChainObserver)
// EVM observers
for _, evmConfig := range appContext.Config().GetAllEVMConfigs() {
if evmConfig.Chain.IsZetaChain() {
continue
}
_, found := appContext.ZetacoreContext().GetEVMChainParams(evmConfig.Chain.ChainId)
chainParams, found := zetacoreContext.GetEVMChainParams(evmConfig.Chain.ChainId)
if !found {
logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String())
continue
}
co, err := evmobserver.NewObserver(appContext, zetacoreClient, tss, dbpath, logger, evmConfig, ts)

// create EVM client
evmClient, err := ethclient.Dial(evmConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Msgf("error dailing endpoint %s", evmConfig.Endpoint)
continue
}

// create EVM chain observer
co, err := evmobserver.NewObserver(
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
evmConfig,
evmClient,
*chainParams,
zetacoreContext,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for evm chain %s", evmConfig.Chain.String())
continue
}
observerMap[evmConfig.Chain.ChainId] = co
}

// BTC observer
_, chainParams, found := zetacoreContext.GetBTCChainParams()
if !found {
return nil, fmt.Errorf("bitcoin chains params not found")
}

// create BTC chain observer
btcChain, btcConfig, enabled := appContext.GetBTCChainAndConfig()
if enabled {
co, err := btcobserver.NewObserver(appContext, btcChain, zetacoreClient, tss, dbpath, logger, btcConfig, ts)
btcClient, err := btcrpc.NewRPCClient(btcConfig)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for bitcoin chain %s", btcChain.String())

logger.Std.Error().Err(err).Msgf("error creating rpc client for bitcoin chain %s", btcChain.String())
} else {
observerMap[btcChain.ChainId] = co
// create BTC chain observer
co, err := btcobserver.NewObserver(
btcChain,
btcClient,
*chainParams,
zetacoreContext,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for bitcoin chain %s", btcChain.String())
} else {
observerMap[btcChain.ChainId] = co
}
}
}

Expand Down
136 changes: 91 additions & 45 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"github.com/rs/zerolog"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"

"github.com/zeta-chain/zetacore/pkg/chains"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
Expand All @@ -28,9 +29,12 @@
// Cached blocks can be used to get block information and verify transactions
DefaultBlockCacheSize = 1000

// DefaultHeadersCacheSize is the default number of headers that the observer will keep in cache for performance (without RPC calls)
// DefaultHeaderCacheSize is the default number of headers that the observer will keep in cache for performance (without RPC calls)
// Cached headers can be used to get header information
DefaultHeadersCacheSize = 1000
DefaultHeaderCacheSize = 1000
swift1337 marked this conversation as resolved.
Show resolved Hide resolved

// TempSQLiteDBPath is the temporary in-memory SQLite database used for testing
TempSQLiteDBPath = "file::memory:?cache=shared"
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
)

// Observer is the base structure for chain observers, grouping the common logic for each chain observer client.
Expand Down Expand Up @@ -84,8 +88,7 @@
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
headersCacheSize int,
dbPath string,
headerCacheSize int,
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
ts *metrics.TelemetryServer,
logger Logger,
) (*Observer, error) {
Expand All @@ -112,18 +115,25 @@
}

// create header cache
ob.headerCache, err = lru.New(headersCacheSize)
ob.headerCache, err = lru.New(headerCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating header cache")
}

// open database
err = ob.OpenDB(dbPath)
return &ob, nil
}

// Stop notifies all goroutines to stop and closes the database.
func (ob *Observer) Stop() {
ob.logger.Chain.Info().Msgf("observer is stopping for chain %d", ob.Chain().ChainId)
close(ob.stop)

Check warning on line 129 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L127-L129

Added lines #L127 - L129 were not covered by tests

// close database
err := ob.CloseDB()

Check warning on line 132 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L132

Added line #L132 was not covered by tests
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chain.ChainName))
ob.Logger().Chain.Error().Err(err).Msgf("CloseDB failed for chain %d", ob.Chain().ChainId)

Check warning on line 134 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L134

Added line #L134 was not covered by tests
}

return &ob, nil
ob.Logger().Chain.Info().Msgf("observer stopped for chain %d", ob.Chain().ChainId)

Check warning on line 136 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L136

Added line #L136 was not covered by tests
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
}

// Chain returns the chain for the observer.
Expand Down Expand Up @@ -232,9 +242,20 @@
return ob.db
}

// WithTelemetryServer attaches a new telemetry server to the observer.
func (ob *Observer) WithTelemetryServer(ts *metrics.TelemetryServer) *Observer {
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
ob.ts = ts
return ob
}

// TelemetryServer returns the telemetry server for the observer.
func (ob *Observer) TelemetryServer() *metrics.TelemetryServer {
return ob.ts
}

// Logger returns the logger for the observer.
func (ob *Observer) Logger() ObserverLogger {
return ob.logger
func (ob *Observer) Logger() *ObserverLogger {
return &ob.logger
}

// WithLogger attaches a new logger to the observer.
Expand All @@ -251,45 +272,64 @@
return ob
}

// Stop returns the stop channel for the observer.
func (ob *Observer) Stop() chan struct{} {
// StopChannel returns the stop channel for the observer.
func (ob *Observer) StopChannel() chan struct{} {

Check warning on line 276 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L276

Added line #L276 was not covered by tests
return ob.stop
}

// OpenDB open sql database in the given path.
func (ob *Observer) OpenDB(dbPath string) error {
if dbPath != "" {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error creating db path")
}
}

// open db by chain name
chainName := ob.chain.ChainName.String()
path := fmt.Sprintf("%s/%s", dbPath, chainName)
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
func (ob *Observer) OpenDB(dbPath string, dbName string) error {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error opening db")
return errors.Wrapf(err, "error creating db path: %s", dbPath)
}
}

// migrate db
err = db.AutoMigrate(&clienttypes.ReceiptSQLType{},
&clienttypes.TransactionSQLType{},
&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")
}
ob.db = db
// use custom dbName or chain name if not provided
if dbName == "" {
dbName = ob.chain.ChainName.String()
}
path := fmt.Sprintf("%s/%s", dbPath, dbName)

// use memory db if specified
if dbPath == TempSQLiteDBPath {
path = TempSQLiteDBPath
}

// open db
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)})
if err != nil {
return errors.Wrap(err, "error opening db")

Check warning on line 304 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L304

Added line #L304 was not covered by tests
}

// migrate db
err = db.AutoMigrate(&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")

Check warning on line 310 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L310

Added line #L310 was not covered by tests
}
ob.db = db

return nil
}
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved

// CloseDB close the database.
func (ob *Observer) CloseDB() error {
dbInst, err := ob.db.DB()
if err != nil {
return fmt.Errorf("error getting database instance: %w", err)

Check warning on line 321 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L321

Added line #L321 was not covered by tests
}
err = dbInst.Close()
if err != nil {
return fmt.Errorf("error closing database: %w", err)

Check warning on line 325 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L325

Added line #L325 was not covered by tests
}
return nil
}

// LoadLastBlockScanned loads last scanned block from environment variable or from database.
// The last scanned block is the height from which the observer should continue scanning.
func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool, err error) {
func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) error {
// get environment variable
envvar := EnvVarLatestBlockByChain(ob.chain)
scanFromBlock := os.Getenv(envvar)
Expand All @@ -299,27 +339,33 @@
logger.Info().
Msgf("LoadLastBlockScanned: envvar %s is set; scan from block %s", envvar, scanFromBlock)
if scanFromBlock == EnvVarLatestBlock {
return true, nil
return nil
}
blockNumber, err := strconv.ParseUint(scanFromBlock, 10, 64)
if err != nil {
return false, err
return err
}
ob.WithLastBlockScanned(blockNumber)
return false, nil
return nil
}

// load from DB otherwise. If not found, start from latest block
blockNumber, err := ob.ReadLastBlockScannedFromDB()
if err != nil {
logger.Info().Msgf("LoadLastBlockScanned: chain %d starts scanning from latest block", ob.chain.ChainId)
return true, nil
logger.Info().Msgf("LoadLastBlockScanned: last scanned block not found in db for chain %d", ob.chain.ChainId)
return nil
}
ob.WithLastBlockScanned(blockNumber)
logger.Info().
Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.LastBlockScanned())

return false, nil
return nil
}

// SaveLastBlockScanned saves the last scanned block to memory and database.
func (ob *Observer) SaveLastBlockScanned(blockNumber uint64) error {
ob.WithLastBlockScanned(blockNumber)
return ob.WriteLastBlockScannedToDB(blockNumber)
}

// WriteLastBlockScannedToDB saves the last scanned block to the database.
Expand All @@ -339,5 +385,5 @@

// EnvVarLatestBlock returns the environment variable for the latest block by chain.
func EnvVarLatestBlockByChain(chain chains.Chain) string {
return chain.ChainName.String() + "_SCAN_FROM"
return fmt.Sprintf("CHAIN_%d_SCAN_FROM", chain.ChainId)
}
Loading
Loading