Skip to content

Commit

Permalink
added base signer and logger
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Jun 6, 2024
1 parent 2cdff62 commit d2d19f1
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 115 deletions.
5 changes: 4 additions & 1 deletion zetaclient/chains/base/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func DefaultLogger() Logger {
}
}

// ObserverLogger is the base logger for chain observers
// ObserverLogger contains the loggers for chain observers
type ObserverLogger struct {
// the parent logger for the chain observer
Chain zerolog.Logger
Expand All @@ -43,6 +43,9 @@ type ObserverLogger struct {
// the logger for the chain's gas price
GasPrice zerolog.Logger

// the logger for block headers
Headers zerolog.Logger

// the logger for the compliance check
Compliance zerolog.Logger
}
Expand Down
181 changes: 111 additions & 70 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,50 @@ import (
)

const (
// EnvVarLatestBlock is the environment variable to indicate latest block height
EnvVarLatestBlock = "latest"

// DefaultBlockCacheSize is the default size of the block cache
DefaultBlockCacheSize = 1000
)

// Observer is the base chain observer for external chains
// Observer is the base observer.
type Observer struct {
// the external chain
// chain contains static information about the observed chain
chain chains.Chain

// the external chain parameters
// chainParams contains the dynamic chain parameters of the observed chain
chainParams observertypes.ChainParams

// zetacore context
// coreContext contains context data of ZetaChain
zetacoreContext *context.ZetacoreContext

// zetacore client
// zetacoreClient is the client to interact with ZetaChain
zetacoreClient interfaces.ZetacoreClient

// tss signer
// tss is the TSS signer
tss interfaces.TSSSigner

// the latest block height of external chain
// lastBlock is the last block height of the observed chain
lastBlock uint64

// the last successfully scanned block height
// lastBlockScanned is the last block height scanned by the observer
lastBlockScanned uint64

// lru cache for chain blocks
// blockCache is the cache for blocks
blockCache *lru.Cache

// observer database for persistency
// db is the database to persist data
db *gorm.DB

// the channel to stop the observer
stop chan struct{}
// logger contains the loggers used by observer
logger ObserverLogger

// telemetry server
ts *metrics.TelemetryServer
// stop is the channel to signal the observer to stop
stop chan struct{}
}

// NewObserver creates a new base observer
// NewObserver creates a new base observer.
func NewObserver(
chain chains.Chain,
chainParams observertypes.ChainParams,
Expand All @@ -70,7 +73,7 @@ func NewObserver(
tss interfaces.TSSSigner,
blockCacheSize int,
dbPath string,
ts *metrics.TelemetryServer,
logger Logger,
) (*Observer, error) {
ob := Observer{
chain: chain,
Expand All @@ -81,9 +84,11 @@ func NewObserver(
lastBlock: 0,
lastBlockScanned: 0,
stop: make(chan struct{}),
ts: ts,
}

// setup loggers
ob.WithLogger(logger)

// create block cache
var err error
ob.blockCache, err = lru.New(blockCacheSize)
Expand All @@ -100,106 +105,168 @@ func NewObserver(
return &ob, nil
}

// Chain returns the chain for the observer
// Chain returns the chain for the observer.
func (ob *Observer) Chain() chains.Chain {
return ob.chain
}

// WithChain attaches a new chain to the observer
// WithChain attaches a new chain to the observer.
func (ob *Observer) WithChain(chain chains.Chain) *Observer {
ob.chain = chain
return ob
}

// ChainParams returns the chain params for the observer
// ChainParams returns the chain params for the observer.
func (ob *Observer) ChainParams() observertypes.ChainParams {
return ob.chainParams
}

// WithChainParams attaches a new chain params to the observer
// WithChainParams attaches a new chain params to the observer.
func (ob *Observer) WithChainParams(params observertypes.ChainParams) *Observer {
ob.chainParams = params
return ob
}

// ZetacoreContext returns the zetacore context for the observer
// ZetacoreContext returns the zetacore context for the observer.
func (ob *Observer) ZetacoreContext() *context.ZetacoreContext {
return ob.zetacoreContext
}

// ZetacoreClient returns the zetacore client for the observer
// WithZetacoreContext attaches a new zetacore context to the observer.
func (ob *Observer) WithZetacoreContext(context *context.ZetacoreContext) *Observer {
ob.zetacoreContext = context
return ob
}

// ZetacoreClient returns the zetacore client for the observer.
func (ob *Observer) ZetacoreClient() interfaces.ZetacoreClient {
return ob.zetacoreClient
}

// WithZetacoreClient attaches a new zetacore client to the observer
// WithZetacoreClient attaches a new zetacore client to the observer.
func (ob *Observer) WithZetacoreClient(client interfaces.ZetacoreClient) *Observer {
ob.zetacoreClient = client
return ob
}

// Tss returns the tss signer for the observer
// Tss returns the tss signer for the observer.
func (ob *Observer) TSS() interfaces.TSSSigner {
return ob.tss
}

// LastBlock get external last block height
// WithTSS attaches a new tss signer to the observer.
func (ob *Observer) WithTSS(tss interfaces.TSSSigner) *Observer {
ob.tss = tss
return ob
}

// LastBlock get external last block height.
func (ob *Observer) LastBlock() uint64 {
return atomic.LoadUint64(&ob.lastBlock)
}

// WithLastBlock set external last block height
// WithLastBlock set external last block height.
func (ob *Observer) WithLastBlock(lastBlock uint64) *Observer {
atomic.StoreUint64(&ob.lastBlock, lastBlock)
return ob
}

// LastBlockScanned get last block scanned (not necessarily caught up with external block; could be slow/paused)
// LastBlockScanned get last block scanned (not necessarily caught up with the chain; could be slow/paused).
func (ob *Observer) LastBlockScanned() uint64 {
height := atomic.LoadUint64(&ob.lastBlockScanned)
return height
}

// WithLastBlockScanned set last block scanned (not necessarily caught up with external block; could be slow/paused)
// WithLastBlockScanned set last block scanned (not necessarily caught up with the chain; could be slow/paused).
func (ob *Observer) WithLastBlockScanned(blockNumber uint64) *Observer {
atomic.StoreUint64(&ob.lastBlockScanned, blockNumber)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(blockNumber))
return ob
}

// BlockCache returns the block cache for the observer
// BlockCache returns the block cache for the observer.
func (ob *Observer) BlockCache() *lru.Cache {
return ob.blockCache
}

// WithBlockCache attaches a new block cache to the observer
// WithBlockCache attaches a new block cache to the observer.
func (ob *Observer) WithBlockCache(cache *lru.Cache) *Observer {
ob.blockCache = cache
return ob
}

// Stop returns the stop channel for the observer
// DB returns the database for the observer.
func (ob *Observer) DB() *gorm.DB {
return ob.db
}

// Logger returns the logger for the observer.
func (ob *Observer) Logger() ObserverLogger {
return ob.logger
}

// WithLogger attaches a new logger to the observer.
func (ob *Observer) WithLogger(logger Logger) *Observer {
chainLogger := logger.Std.With().Int64("chain", ob.chain.ChainId).Logger()
ob.logger = ObserverLogger{
Chain: chainLogger,
Inbound: chainLogger.With().Str("module", "inbound").Logger(),
Outbound: chainLogger.With().Str("module", "outbound").Logger(),
GasPrice: chainLogger.With().Str("module", "gasprice").Logger(),
Headers: chainLogger.With().Str("module", "headers").Logger(),
Compliance: logger.Compliance,
}
return ob
}

// Stop returns the stop channel for the observer.
func (ob *Observer) Stop() chan struct{} {
return ob.stop
}

// TelemetryServer returns the telemetry server for the observer
func (ob *Observer) TelemetryServer() *metrics.TelemetryServer {
return ob.ts
// OpenDB open sql database in the given path.
func (ob *Observer) OpenDB(dbPath string) error {
if dbPath != "" {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error creating db path")
}
}

// open db by chain name
chainName := ob.chain.ChainName.String()
path := fmt.Sprintf("%s/%s", dbPath, chainName)
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
if err != nil {
return errors.Wrap(err, "error opening db")
}

// migrate db
err = db.AutoMigrate(&clienttypes.ReceiptSQLType{},
&clienttypes.TransactionSQLType{},
&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")
}
ob.db = db
}
return nil
}

// LoadLastBlockScanned loads last scanned block from environment variable or from database
// The last scanned block is the height from which the observer should start scanning for inbound transactions
// LoadLastBlockScanned loads last scanned block from environment variable or from database.
// The last scanned block is the height from which the observer should continue scanning.
func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool, err error) {
// get environment variable
envvar := ob.chain.ChainName.String() + "_SCAN_FROM"
envvar := EnvVarLatestBlockByChain(ob.chain)
scanFromBlock := os.Getenv(envvar)

// load from environment variable if set
if scanFromBlock != "" {
logger.Info().
Msgf("LoadLastBlockScanned: envvar %s is set; scan from block %s", envvar, scanFromBlock)
if scanFromBlock == clienttypes.EnvVarLatest {
if scanFromBlock == EnvVarLatestBlock {
return true, nil
}
blockNumber, err := strconv.ParseUint(scanFromBlock, 10, 64)
Expand All @@ -223,12 +290,12 @@ func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool
return false, nil
}

// WriteLastBlockScannedToDB saves the last scanned block to the database
// WriteLastBlockScannedToDB saves the last scanned block to the database.
func (ob *Observer) WriteLastBlockScannedToDB(lastScannedBlock uint64) error {
return ob.db.Save(clienttypes.ToLastBlockSQLType(lastScannedBlock)).Error
}

// ReadLastBlockScannedFromDB reads the last scanned block from the database
// ReadLastBlockScannedFromDB reads the last scanned block from the database.
func (ob *Observer) ReadLastBlockScannedFromDB() (uint64, error) {
var lastBlock clienttypes.LastBlockSQLType
if err := ob.db.First(&lastBlock, clienttypes.LastBlockNumID).Error; err != nil {
Expand All @@ -238,33 +305,7 @@ func (ob *Observer) ReadLastBlockScannedFromDB() (uint64, error) {
return lastBlock.Num, nil
}

// OpenDB open sql database in the given path
func (ob *Observer) OpenDB(dbPath string) error {
if dbPath != "" {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error creating db path")
}
}

// open db by chain name
chainName := ob.chain.ChainName.String()
path := fmt.Sprintf("%s/%s", dbPath, chainName)
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
if err != nil {
return errors.Wrap(err, "error opening db")
}

// migrate db
err = db.AutoMigrate(&clienttypes.ReceiptSQLType{},
&clienttypes.TransactionSQLType{},
&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")
}
ob.db = db
}
return nil
// EnvVarLatestBlock returns the environment variable for the latest block by chain.
func EnvVarLatestBlockByChain(chain chains.Chain) string {
return chain.ChainName.String() + "_SCAN_FROM"
}
Loading

0 comments on commit d2d19f1

Please sign in to comment.