Skip to content

Commit

Permalink
initiated base observer
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Jun 4, 2024
1 parent 61f7f02 commit 695caa7
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 48 deletions.
112 changes: 65 additions & 47 deletions zetaclient/chains/evm/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
clienttypes "github.com/zeta-chain/zetacore/zetaclient/types"
)

const (
// DefaultBlockCacheSize is the default size of the block cache
DefaultBlockCacheSize = 1000
)

// Observer is the base chain observer
type Observer struct {
// the external chain
Expand Down Expand Up @@ -63,6 +68,7 @@ func NewObserver(
zetacoreContext *context.ZetacoreContext,
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
dbPath string,
ts *metrics.TelemetryServer,
) (*Observer, error) {
Expand All @@ -80,15 +86,15 @@ func NewObserver(

// create block cache
var err error
ob.blockCache, err = lru.New(1000)
ob.blockCache, err = lru.New(blockCacheSize)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "error creating block cache")
}

// open database
err = ob.OpenDB(dbPath)
if err != nil {
return nil, err
return nil, errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chain.ChainName))
}

return &ob, nil
Expand Down Expand Up @@ -121,32 +127,44 @@ func (ob *Observer) ZetacoreContext() *context.ZetacoreContext {
return ob.zetacoreContext
}

// ZetacoreClient returns the zetacore client for the observer
func (ob *Observer) ZetacoreClient() interfaces.ZetacoreClient {
return ob.zetacoreClient
}

// WithZetacoreClient attaches a new zetacore client to the observer
func (ob *Observer) WithZetacoreClient(client interfaces.ZetacoreClient) *Observer {
ob.zetacoreClient = client
return ob
}

// GetLastBlock get external last block height
func (ob *Observer) GetLastBlock() uint64 {
// Tss returns the tss signer for the observer
func (ob *Observer) TSS() interfaces.TSSSigner {
return ob.tss
}

// LastBlock get external last block height
func (ob *Observer) LastBlock() uint64 {
return atomic.LoadUint64(&ob.lastBlock)
}

// SetLastBlock set external last block height
func (ob *Observer) SetLastBlock(height uint64) {
atomic.StoreUint64(&ob.lastBlock, height)
// WithLastBlock set external last block height
func (ob *Observer) WithLastBlock(lastBlock uint64) *Observer {
atomic.StoreUint64(&ob.lastBlock, lastBlock)
return ob
}

// GetLastBlockScanned get last block scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *Observer) GetLastBlockScanned() uint64 {
// LastBlockScanned get last block scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *Observer) LastBlockScanned() uint64 {
height := atomic.LoadUint64(&ob.lastBlockScanned)
return height
}

// SetLastBlockScanned set last block scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *Observer) SetLastBlockScanned(blockNumber uint64) {
// WithLastBlockScanned set last block scanned (not necessarily caught up with external block; could be slow/paused)
func (ob *Observer) WithLastBlockScanned(blockNumber uint64) *Observer {
atomic.StoreUint64(&ob.lastBlockScanned, blockNumber)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.ChainName.String()).Set(float64(blockNumber))
return ob
}

// BlockCache returns the block cache for the observer
Expand All @@ -170,37 +188,6 @@ func (ob *Observer) TelemetryServer() *metrics.TelemetryServer {
return ob.ts
}

// OpenDB open sql database in the given path
func (ob *Observer) OpenDB(dbPath string) error {
if dbPath != "" {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error creating db path")
}
}

// open db by chain name
chainName := ob.chain.ChainName.String()
path := fmt.Sprintf("%s/%s", dbPath, chainName)
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error opening observer db for chain: %s", chainName))
}

// migrate db
err = db.AutoMigrate(&clienttypes.ReceiptSQLType{},
&clienttypes.TransactionSQLType{},
&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error migrating observer db for chain: %s", chainName))
}
ob.db = db
}
return nil
}

// LoadLastBlockScanned loads last scanned block from environment variable or from database
// The last scanned block is the height from which the observer should start scanning for inbound transactions
func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool, err error) {
Expand All @@ -219,7 +206,7 @@ func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool
if err != nil {
return false, err
}
ob.SetLastBlockScanned(blockNumber)
ob.WithLastBlockScanned(blockNumber)
return false, nil
}

Expand All @@ -229,9 +216,9 @@ func (ob *Observer) LoadLastBlockScanned(logger zerolog.Logger) (fromLatest bool
logger.Info().Msgf("LoadLastBlockScanned: chain %d starts scanning from latest block", ob.chain.ChainId)
return true, nil
}
ob.SetLastBlockScanned(blockNumber)
ob.WithLastBlockScanned(blockNumber)
logger.Info().
Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.GetLastBlockScanned())
Msgf("LoadLastBlockScanned: chain %d starts scanning from block %d", ob.chain.ChainId, ob.LastBlockScanned())

return false, nil
}
Expand All @@ -245,8 +232,39 @@ func (ob *Observer) WriteLastBlockScannedToDB(lastScannedBlock uint64) error {
func (ob *Observer) ReadLastBlockScannedFromDB() (uint64, error) {
var lastBlock clienttypes.LastBlockSQLType
if err := ob.db.First(&lastBlock, clienttypes.LastBlockNumID).Error; err != nil {
// not found
// record not found
return 0, err
}
return lastBlock.Num, nil
}

// OpenDB open sql database in the given path
func (ob *Observer) OpenDB(dbPath string) error {
if dbPath != "" {
// create db path if not exist
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
err := os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return errors.Wrap(err, "error creating db path")
}
}

// open db by chain name
chainName := ob.chain.ChainName.String()
path := fmt.Sprintf("%s/%s", dbPath, chainName)
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
if err != nil {
return errors.Wrap(err, "error opening db")
}

// migrate db
err = db.AutoMigrate(&clienttypes.ReceiptSQLType{},
&clienttypes.TransactionSQLType{},
&clienttypes.LastBlockSQLType{})
if err != nil {
return errors.Wrap(err, "error migrating db")
}
ob.db = db
}
return nil
}
195 changes: 194 additions & 1 deletion zetaclient/chains/evm/base/observer_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,204 @@
package base_test

import (
"os"
"testing"

lru "github.com/hashicorp/golang-lru"
"github.com/stretchr/testify/require"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/testutil/sample"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/chains/evm/base"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/config"
"github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/testutils/mocks"
)

// create a temporary path for database
func tempDbPath(t *testing.T) string {
// Create a temporary file to get a unique name
tempPath, err := os.MkdirTemp("", "tempdb-")
require.NoError(t, err)
return tempPath
}

// createObserver creates a new observer for testing
func createObserver(t *testing.T) *base.Observer {
// constructor parameters
chain := chains.Ethereum
chainParams := *sample.ChainParams(chain.ChainId)
zetacoreContext := context.NewZetacoreContext(config.NewConfig())
zetacoreClient := mocks.NewMockZetacoreClient()
tss := mocks.NewTSSMainnet()
blockCacheSize := base.DefaultBlockCacheSize
dbPath := tempDbPath(t)

// create observer
ob, err := base.NewObserver(chain, chainParams, zetacoreContext, zetacoreClient, tss, blockCacheSize, dbPath, nil)
require.NoError(t, err)

return ob
}

func TestNewObserver(t *testing.T) {
// constructor parameters
chain := chains.Ethereum
chainParams := *sample.ChainParams(chain.ChainId)
zetacoreContext := context.NewZetacoreContext(config.NewConfig())
zetacoreClient := mocks.NewMockZetacoreClient()
tss := mocks.NewTSSMainnet()
blockCacheSize := base.DefaultBlockCacheSize
dbPath := tempDbPath(t)

// test cases
tests := []struct {
name string
chain chains.Chain
chainParams observertypes.ChainParams
zetacoreContext *context.ZetacoreContext
zetacoreClient interfaces.ZetacoreClient
tss interfaces.TSSSigner
blockCacheSize int
dbPath string
fail bool
message string
}{
{
name: "should be able to create new observer",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
dbPath: dbPath,
fail: false,
},
{
name: "should return error on invalid block cache size",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: 0,
dbPath: dbPath,
fail: true,
message: "error creating block cache",
},
{
name: "should return error on invalid db path",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
dbPath: "/invalid/123db",
fail: true,
message: "error opening observer db",
},
}

// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ob, err := base.NewObserver(tt.chain, tt.chainParams, tt.zetacoreContext, tt.zetacoreClient, tt.tss, tt.blockCacheSize, tt.dbPath, nil)
if tt.fail {
require.ErrorContains(t, err, tt.message)
require.Nil(t, ob)
return
}

require.NoError(t, err)
require.NotNil(t, ob)
})
}
}

func TestChain(t *testing.T) {
func TestObserverSetters(t *testing.T) {
t.Run("should be able to update chain", func(t *testing.T) {
ob := createObserver(t)

// update chain
newChain := chains.BscMainnet
ob = ob.WithChain(chains.BscMainnet)
require.Equal(t, newChain, ob.Chain())
})
t.Run("should be able to update chain params", func(t *testing.T) {
ob := createObserver(t)

// update chain params
newChainParams := *sample.ChainParams(chains.BscMainnet.ChainId)
ob = ob.WithChainParams(newChainParams)
require.True(t, observertypes.ChainParamsEqual(newChainParams, ob.ChainParams()))
})
t.Run("should be able to update zetacore client", func(t *testing.T) {
ob := createObserver(t)

// update zetacore client
newZetacoreClient := mocks.NewMockZetacoreClient()
ob = ob.WithZetacoreClient(newZetacoreClient)
require.Equal(t, newZetacoreClient, ob.ZetacoreClient())
})
t.Run("should be able to update last block", func(t *testing.T) {
ob := createObserver(t)

// update last block
newLastBlock := uint64(100)
ob = ob.WithLastBlock(newLastBlock)
require.Equal(t, newLastBlock, ob.LastBlock())
})
t.Run("should be able to update last block scanned", func(t *testing.T) {
ob := createObserver(t)

// update last block scanned
newLastBlockScanned := uint64(100)
ob = ob.WithLastBlockScanned(newLastBlockScanned)
require.Equal(t, newLastBlockScanned, ob.LastBlockScanned())
})
t.Run("should be able to update block cache", func(t *testing.T) {
ob := createObserver(t)

// update block cache
newBlockCache, err := lru.New(200)
require.NoError(t, err)

ob = ob.WithBlockCache(newBlockCache)
require.Equal(t, newBlockCache, ob.BlockCache())
})
}

func TestOpenDB(t *testing.T) {
ob := createObserver(t)
dbPath := tempDbPath(t)

t.Run("should be able to open db", func(t *testing.T) {
err := ob.OpenDB(dbPath)
require.NoError(t, err)
})
t.Run("should return error on invalid db path", func(t *testing.T) {
err := ob.OpenDB("/invalid/123db")
require.ErrorContains(t, err, "error creating db path")
})
}

func TestReadWriteLastBlockScannedToDB(t *testing.T) {
t.Run("should be able to write and read last block scanned to db", func(t *testing.T) {
ob := createObserver(t)
err := ob.WriteLastBlockScannedToDB(100)
require.NoError(t, err)

lastBlockScanned, err := ob.ReadLastBlockScannedFromDB()
require.NoError(t, err)
require.EqualValues(t, 100, lastBlockScanned)
})
t.Run("should return error when last block scanned not found in db", func(t *testing.T) {
ob := createObserver(t)
lastScannedBlock, err := ob.ReadLastBlockScannedFromDB()
require.Error(t, err)
require.Zero(t, lastScannedBlock)
})
}

0 comments on commit 695caa7

Please sign in to comment.