diff --git a/zetaclient/chains/evm/base/logger.go b/zetaclient/chains/evm/base/logger.go new file mode 100644 index 0000000000..c675450d2d --- /dev/null +++ b/zetaclient/chains/evm/base/logger.go @@ -0,0 +1,21 @@ +package base + +import "github.com/rs/zerolog" + +// ObserverLogger is the base logger for chain observers +type ObserverLogger struct { + // the parent logger for the chain observer + Chain zerolog.Logger + + // the logger for inbound transactions + Inbound zerolog.Logger + + // the logger for outbound transactions + Outbound zerolog.Logger + + // the logger for the chain's gas price + GasPrice zerolog.Logger + + // the logger for the compliance check + Compliance zerolog.Logger +} diff --git a/zetaclient/chains/evm/base/observer.go b/zetaclient/chains/evm/base/observer.go new file mode 100644 index 0000000000..29fc185256 --- /dev/null +++ b/zetaclient/chains/evm/base/observer.go @@ -0,0 +1,252 @@ +package base + +import ( + "fmt" + "os" + "strconv" + "sync/atomic" + + lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/zeta-chain/zetacore/pkg/chains" + "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/metrics" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + + observertypes "github.com/zeta-chain/zetacore/x/observer/types" + "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" + clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" +) + +// Observer is the base chain observer +type Observer struct { + // the external chain + chain chains.Chain + + // the external chain parameters + chainParams observertypes.ChainParams + + // zetacore context + zetacoreContext *context.ZetacoreContext + + // zetacore client + zetacoreClient interfaces.ZetacoreClient + + // tss signer + tss interfaces.TSSSigner + + // the latest block height of external chain + lastBlock uint64 + + // the last successfully scanned block height + lastBlockScanned uint64 + + // lru cache for chain blocks + blockCache *lru.Cache + + // observer database for persistency + db *gorm.DB + + // the channel to stop the observer + stop chan struct{} + + // telemetry server + ts *metrics.TelemetryServer +} + +// NewObserver creates a new base observer +func NewObserver( + chain chains.Chain, + chainParams observertypes.ChainParams, + zetacoreContext *context.ZetacoreContext, + zetacoreClient interfaces.ZetacoreClient, + tss interfaces.TSSSigner, + dbPath string, + ts *metrics.TelemetryServer, +) (*Observer, error) { + ob := Observer{ + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + lastBlock: 0, + lastBlockScanned: 0, + stop: make(chan struct{}), + ts: ts, + } + + // create block cache + var err error + ob.blockCache, err = lru.New(1000) + if err != nil { + return nil, err + } + + // open database + err = ob.OpenDB(dbPath) + if err != nil { + return nil, err + } + + return &ob, nil +} + +// Chain returns the chain for the observer +func (ob *Observer) Chain() chains.Chain { + return ob.chain +} + +// WithChain attaches a new chain to the observer +func (ob *Observer) WithChain(chain chains.Chain) *Observer { + ob.chain = chain + return ob +} + +// ChainParams returns the chain params for the observer +func (ob *Observer) ChainParams() observertypes.ChainParams { + return ob.chainParams +} + +// WithChainParams attaches a new chain params to the observer +func (ob *Observer) WithChainParams(params observertypes.ChainParams) *Observer { + ob.chainParams = params + return ob +} + +// ZetacoreContext returns the zetacore context for the observer +func (ob *Observer) ZetacoreContext() *context.ZetacoreContext { + return ob.zetacoreContext +} + +// WithZetacoreClient attaches a new zetacore client to the observer +func (ob *Observer) WithZetacoreClient(client interfaces.ZetacoreClient) *Observer { + ob.zetacoreClient = client + return ob +} + +// GetLastBlock get external last block height +func (ob *Observer) GetLastBlock() uint64 { + return atomic.LoadUint64(&ob.lastBlock) +} + +// SetLastBlock set external last block height +func (ob *Observer) SetLastBlock(height uint64) { + atomic.StoreUint64(&ob.lastBlock, height) +} + +// GetLastBlockScanned get last block scanned (not necessarily caught up with external block; could be slow/paused) +func (ob *Observer) GetLastBlockScanned() uint64 { + height := atomic.LoadUint64(&ob.lastBlockScanned) + return height +} + +// SetLastBlockScanned set last block scanned (not necessarily caught up with external block; could be slow/paused) +func (ob *Observer) SetLastBlockScanned(blockNumber uint64) { + atomic.StoreUint64(&ob.lastBlockScanned, blockNumber) + metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(blockNumber)) +} + +// BlockCache returns the block cache for the observer +func (ob *Observer) BlockCache() *lru.Cache { + return ob.blockCache +} + +// WithBlockCache attaches a new block cache to the observer +func (ob *Observer) WithBlockCache(cache *lru.Cache) *Observer { + ob.blockCache = cache + return ob +} + +// Stop returns the stop channel for the observer +func (ob *Observer) Stop() chan struct{} { + return ob.stop +} + +// TelemetryServer returns the telemetry server for the observer +func (ob *Observer) TelemetryServer() *metrics.TelemetryServer { + return ob.ts +} + +// OpenDB open sql database in the given path +func (ob *Observer) OpenDB(dbPath string) error { + if dbPath != "" { + // create db path if not exist + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + err := os.MkdirAll(dbPath, os.ModePerm) + if err != nil { + return errors.Wrap(err, "error creating db path") + } + } + + // open db by chain name + chainName := ob.chain.ChainName.String() + path := fmt.Sprintf("%s/%s", dbPath, chainName) + db, err := gorm.Open(sqlite.Open(path), &gorm.Config{}) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chainName)) + } + + // migrate db + err = db.AutoMigrate(&clienttypes.ReceiptSQLType{}, + &clienttypes.TransactionSQLType{}, + &clienttypes.LastBlockSQLType{}) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("error migrating observer db for chain: %s", chainName)) + } + ob.db = db + } + return nil +} + +// LoadLastBlockScanned loads last scanned block from environment variable or from database +// The last scanned block is the height from which the observer should start scanning for inbound transactions +func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool, err error) { + // get environment variable + envvar := ob.chain.ChainName.String() + "_SCAN_FROM" + scanFromBlock := os.Getenv(envvar) + + // load from environment variable if set + if scanFromBlock != "" { + logger.Info(). + Msgf("LoadLastBlockScanned: envvar %s is set; scan from block %s", envvar, scanFromBlock) + if scanFromBlock == clienttypes.EnvVarLatest { + return true, nil + } + blockNumber, err := strconv.ParseUint(scanFromBlock, 10, 64) + if err != nil { + return false, err + } + ob.SetLastBlockScanned(blockNumber) + return false, nil + } + + // load from DB otherwise. If not found, start from latest block + blockNumber, err := ob.ReadLastBlockScannedFromDB() + if err != nil { + logger.Info().Msgf("LoadLastBlockScanned: chain %d starts scanning from latest block", ob.chain.ChainId) + return true, nil + } + ob.SetLastBlockScanned(blockNumber) + logger.Info(). + Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.GetLastBlockScanned()) + + return false, nil +} + +// WriteLastBlockScannedToDB saves the last scanned block to the database +func (ob *Observer) WriteLastBlockScannedToDB(lastScannedBlock uint64) error { + return ob.db.Save(clienttypes.ToLastBlockSQLType(lastScannedBlock)).Error +} + +// ReadLastBlockScannedFromDB reads the last scanned block from the database +func (ob *Observer) ReadLastBlockScannedFromDB() (uint64, error) { + var lastBlock clienttypes.LastBlockSQLType + if err := ob.db.First(&lastBlock, clienttypes.LastBlockNumID).Error; err != nil { + // not found + return 0, err + } + return lastBlock.Num, nil +} diff --git a/zetaclient/chains/evm/base/observer_test.go b/zetaclient/chains/evm/base/observer_test.go new file mode 100644 index 0000000000..49e0890e7c --- /dev/null +++ b/zetaclient/chains/evm/base/observer_test.go @@ -0,0 +1,11 @@ +package base_test + +import ( + "testing" +) + +func TestNewObserver(t *testing.T) { +} + +func TestChain(t *testing.T) { +} diff --git a/zetaclient/chains/evm/base/signer.go b/zetaclient/chains/evm/base/signer.go new file mode 100644 index 0000000000..22f1111448 --- /dev/null +++ b/zetaclient/chains/evm/base/signer.go @@ -0,0 +1 @@ +package base