diff --git a/zetaclient/chains/evm/base/observer.go b/zetaclient/chains/evm/base/observer.go index 29fc185256..1865560fd5 100644 --- a/zetaclient/chains/evm/base/observer.go +++ b/zetaclient/chains/evm/base/observer.go @@ -20,6 +20,11 @@ import ( clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" ) +const ( + // DefaultBlockCacheSize is the default size of the block cache + DefaultBlockCacheSize = 1000 +) + // Observer is the base chain observer type Observer struct { // the external chain @@ -63,6 +68,7 @@ func NewObserver( zetacoreContext *context.ZetacoreContext, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + blockCacheSize int, dbPath string, ts *metrics.TelemetryServer, ) (*Observer, error) { @@ -80,15 +86,15 @@ func NewObserver( // create block cache var err error - ob.blockCache, err = lru.New(1000) + ob.blockCache, err = lru.New(blockCacheSize) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error creating block cache") } // open database err = ob.OpenDB(dbPath) if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chain.ChainName)) } return &ob, nil @@ -121,32 +127,44 @@ func (ob *Observer) ZetacoreContext() *context.ZetacoreContext { return ob.zetacoreContext } +// ZetacoreClient returns the zetacore client for the observer +func (ob *Observer) ZetacoreClient() interfaces.ZetacoreClient { + return ob.zetacoreClient +} + // WithZetacoreClient attaches a new zetacore client to the observer func (ob *Observer) WithZetacoreClient(client interfaces.ZetacoreClient) *Observer { ob.zetacoreClient = client return ob } -// GetLastBlock get external last block height -func (ob *Observer) GetLastBlock() uint64 { +// Tss returns the tss signer for the observer +func (ob *Observer) TSS() interfaces.TSSSigner { + return ob.tss +} + +// LastBlock get external last block height +func (ob *Observer) LastBlock() uint64 { return atomic.LoadUint64(&ob.lastBlock) } -// SetLastBlock set external last block height -func (ob *Observer) SetLastBlock(height uint64) { - atomic.StoreUint64(&ob.lastBlock, height) +// WithLastBlock set external last block height +func (ob *Observer) WithLastBlock(lastBlock uint64) *Observer { + atomic.StoreUint64(&ob.lastBlock, lastBlock) + return ob } -// GetLastBlockScanned get last block scanned (not necessarily caught up with external block; could be slow/paused) -func (ob *Observer) GetLastBlockScanned() uint64 { +// LastBlockScanned get last block scanned (not necessarily caught up with external block; could be slow/paused) +func (ob *Observer) LastBlockScanned() uint64 { height := atomic.LoadUint64(&ob.lastBlockScanned) return height } -// SetLastBlockScanned set last block scanned (not necessarily caught up with external block; could be slow/paused) -func (ob *Observer) SetLastBlockScanned(blockNumber uint64) { +// WithLastBlockScanned set last block scanned (not necessarily caught up with external block; could be slow/paused) +func (ob *Observer) WithLastBlockScanned(blockNumber uint64) *Observer { atomic.StoreUint64(&ob.lastBlockScanned, blockNumber) metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(blockNumber)) + return ob } // BlockCache returns the block cache for the observer @@ -170,37 +188,6 @@ func (ob *Observer) TelemetryServer() *metrics.TelemetryServer { return ob.ts } -// OpenDB open sql database in the given path -func (ob *Observer) OpenDB(dbPath string) error { - if dbPath != "" { - // create db path if not exist - if _, err := os.Stat(dbPath); os.IsNotExist(err) { - err := os.MkdirAll(dbPath, os.ModePerm) - if err != nil { - return errors.Wrap(err, "error creating db path") - } - } - - // open db by chain name - chainName := ob.chain.ChainName.String() - path := fmt.Sprintf("%s/%s", dbPath, chainName) - db, err := gorm.Open(sqlite.Open(path), &gorm.Config{}) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chainName)) - } - - // migrate db - err = db.AutoMigrate(&clienttypes.ReceiptSQLType{}, - &clienttypes.TransactionSQLType{}, - &clienttypes.LastBlockSQLType{}) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error migrating observer db for chain: %s", chainName)) - } - ob.db = db - } - return nil -} - // LoadLastBlockScanned loads last scanned block from environment variable or from database // The last scanned block is the height from which the observer should start scanning for inbound transactions func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool, err error) { @@ -219,7 +206,7 @@ func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool if err != nil { return false, err } - ob.SetLastBlockScanned(blockNumber) + ob.WithLastBlockScanned(blockNumber) return false, nil } @@ -229,9 +216,9 @@ func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool logger.Info().Msgf("LoadLastBlockScanned: chain %d starts scanning from latest block", ob.chain.ChainId) return true, nil } - ob.SetLastBlockScanned(blockNumber) + ob.WithLastBlockScanned(blockNumber) logger.Info(). - Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.GetLastBlockScanned()) + Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.LastBlockScanned()) return false, nil } @@ -245,8 +232,39 @@ func (ob *Observer) WriteLastBlockScannedToDB(lastScannedBlock uint64) error { func (ob *Observer) ReadLastBlockScannedFromDB() (uint64, error) { var lastBlock clienttypes.LastBlockSQLType if err := ob.db.First(&lastBlock, clienttypes.LastBlockNumID).Error; err != nil { - // not found + // record not found return 0, err } return lastBlock.Num, nil } + +// OpenDB open sql database in the given path +func (ob *Observer) OpenDB(dbPath string) error { + if dbPath != "" { + // create db path if not exist + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + err := os.MkdirAll(dbPath, os.ModePerm) + if err != nil { + return errors.Wrap(err, "error creating db path") + } + } + + // open db by chain name + chainName := ob.chain.ChainName.String() + path := fmt.Sprintf("%s/%s", dbPath, chainName) + db, err := gorm.Open(sqlite.Open(path), &gorm.Config{}) + if err != nil { + return errors.Wrap(err, "error opening db") + } + + // migrate db + err = db.AutoMigrate(&clienttypes.ReceiptSQLType{}, + &clienttypes.TransactionSQLType{}, + &clienttypes.LastBlockSQLType{}) + if err != nil { + return errors.Wrap(err, "error migrating db") + } + ob.db = db + } + return nil +} diff --git a/zetaclient/chains/evm/base/observer_test.go b/zetaclient/chains/evm/base/observer_test.go index 49e0890e7c..402995367f 100644 --- a/zetaclient/chains/evm/base/observer_test.go +++ b/zetaclient/chains/evm/base/observer_test.go @@ -1,11 +1,204 @@ package base_test import ( + "os" "testing" + + lru "github.com/hashicorp/golang-lru" + "github.com/stretchr/testify/require" + "github.com/zeta-chain/zetacore/pkg/chains" + "github.com/zeta-chain/zetacore/testutil/sample" + observertypes "github.com/zeta-chain/zetacore/x/observer/types" + "github.com/zeta-chain/zetacore/zetaclient/chains/evm/base" + "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" + "github.com/zeta-chain/zetacore/zetaclient/config" + "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" ) +// create a temporary path for database +func tempDbPath(t *testing.T) string { + // Create a temporary file to get a unique name + tempPath, err := os.MkdirTemp("", "tempdb-") + require.NoError(t, err) + return tempPath +} + +// createObserver creates a new observer for testing +func createObserver(t *testing.T) *base.Observer { + // constructor parameters + chain := chains.Ethereum + chainParams := *sample.ChainParams(chain.ChainId) + zetacoreContext := context.NewZetacoreContext(config.NewConfig()) + zetacoreClient := mocks.NewMockZetacoreClient() + tss := mocks.NewTSSMainnet() + blockCacheSize := base.DefaultBlockCacheSize + dbPath := tempDbPath(t) + + // create observer + ob, err := base.NewObserver(chain, chainParams, zetacoreContext, zetacoreClient, tss, blockCacheSize, dbPath, nil) + require.NoError(t, err) + + return ob +} + func TestNewObserver(t *testing.T) { + // constructor parameters + chain := chains.Ethereum + chainParams := *sample.ChainParams(chain.ChainId) + zetacoreContext := context.NewZetacoreContext(config.NewConfig()) + zetacoreClient := mocks.NewMockZetacoreClient() + tss := mocks.NewTSSMainnet() + blockCacheSize := base.DefaultBlockCacheSize + dbPath := tempDbPath(t) + + // test cases + tests := []struct { + name string + chain chains.Chain + chainParams observertypes.ChainParams + zetacoreContext *context.ZetacoreContext + zetacoreClient interfaces.ZetacoreClient + tss interfaces.TSSSigner + blockCacheSize int + dbPath string + fail bool + message string + }{ + { + name: "should be able to create new observer", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: blockCacheSize, + dbPath: dbPath, + fail: false, + }, + { + name: "should return error on invalid block cache size", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: 0, + dbPath: dbPath, + fail: true, + message: "error creating block cache", + }, + { + name: "should return error on invalid db path", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: blockCacheSize, + dbPath: "/invalid/123db", + fail: true, + message: "error opening observer db", + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ob, err := base.NewObserver(tt.chain, tt.chainParams, tt.zetacoreContext, tt.zetacoreClient, tt.tss, tt.blockCacheSize, tt.dbPath, nil) + if tt.fail { + require.ErrorContains(t, err, tt.message) + require.Nil(t, ob) + return + } + + require.NoError(t, err) + require.NotNil(t, ob) + }) + } } -func TestChain(t *testing.T) { +func TestObserverSetters(t *testing.T) { + t.Run("should be able to update chain", func(t *testing.T) { + ob := createObserver(t) + + // update chain + newChain := chains.BscMainnet + ob = ob.WithChain(chains.BscMainnet) + require.Equal(t, newChain, ob.Chain()) + }) + t.Run("should be able to update chain params", func(t *testing.T) { + ob := createObserver(t) + + // update chain params + newChainParams := *sample.ChainParams(chains.BscMainnet.ChainId) + ob = ob.WithChainParams(newChainParams) + require.True(t, observertypes.ChainParamsEqual(newChainParams, ob.ChainParams())) + }) + t.Run("should be able to update zetacore client", func(t *testing.T) { + ob := createObserver(t) + + // update zetacore client + newZetacoreClient := mocks.NewMockZetacoreClient() + ob = ob.WithZetacoreClient(newZetacoreClient) + require.Equal(t, newZetacoreClient, ob.ZetacoreClient()) + }) + t.Run("should be able to update last block", func(t *testing.T) { + ob := createObserver(t) + + // update last block + newLastBlock := uint64(100) + ob = ob.WithLastBlock(newLastBlock) + require.Equal(t, newLastBlock, ob.LastBlock()) + }) + t.Run("should be able to update last block scanned", func(t *testing.T) { + ob := createObserver(t) + + // update last block scanned + newLastBlockScanned := uint64(100) + ob = ob.WithLastBlockScanned(newLastBlockScanned) + require.Equal(t, newLastBlockScanned, ob.LastBlockScanned()) + }) + t.Run("should be able to update block cache", func(t *testing.T) { + ob := createObserver(t) + + // update block cache + newBlockCache, err := lru.New(200) + require.NoError(t, err) + + ob = ob.WithBlockCache(newBlockCache) + require.Equal(t, newBlockCache, ob.BlockCache()) + }) +} + +func TestOpenDB(t *testing.T) { + ob := createObserver(t) + dbPath := tempDbPath(t) + + t.Run("should be able to open db", func(t *testing.T) { + err := ob.OpenDB(dbPath) + require.NoError(t, err) + }) + t.Run("should return error on invalid db path", func(t *testing.T) { + err := ob.OpenDB("/invalid/123db") + require.ErrorContains(t, err, "error creating db path") + }) +} + +func TestReadWriteLastBlockScannedToDB(t *testing.T) { + t.Run("should be able to write and read last block scanned to db", func(t *testing.T) { + ob := createObserver(t) + err := ob.WriteLastBlockScannedToDB(100) + require.NoError(t, err) + + lastBlockScanned, err := ob.ReadLastBlockScannedFromDB() + require.NoError(t, err) + require.EqualValues(t, 100, lastBlockScanned) + }) + t.Run("should return error when last block scanned not found in db", func(t *testing.T) { + ob := createObserver(t) + lastScannedBlock, err := ob.ReadLastBlockScannedFromDB() + require.Error(t, err) + require.Zero(t, lastScannedBlock) + }) }