From 74181036c8ccf32520db082d9c6ffa37d334c0fb Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Mon, 17 Jun 2024 23:54:29 -0500 Subject: [PATCH] added metrics server to base signer/observer --- zetaclient/chains/base/observer.go | 29 ++++++ zetaclient/chains/base/observer_test.go | 115 +++++++++++++++--------- zetaclient/chains/base/signer.go | 21 ++++- zetaclient/chains/base/signer_test.go | 11 ++- 4 files changed, 131 insertions(+), 45 deletions(-) diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 0f148b6a87..07603a8517 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -26,6 +26,9 @@ const ( // DefaultBlockCacheSize is the default size of the block cache DefaultBlockCacheSize = 1000 + + // DefaultHeadersCacheSize is the default size of the headers cache + DefaultHeadersCacheSize = 1000 ) // Observer is the base observer. @@ -54,9 +57,15 @@ type Observer struct { // blockCache is the cache for blocks blockCache *lru.Cache + // headerCache is the cache for headers + headerCache *lru.Cache + // db is the database to persist data db *gorm.DB + // ts is the telemetry server for metrics + ts *metrics.TelemetryServer + // logger contains the loggers used by observer logger ObserverLogger @@ -72,7 +81,9 @@ func NewObserver( zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, blockCacheSize int, + headersCacheSize int, dbPath string, + ts *metrics.TelemetryServer, logger Logger, ) (*Observer, error) { ob := Observer{ @@ -83,6 +94,7 @@ func NewObserver( tss: tss, lastBlock: 0, lastBlockScanned: 0, + ts: ts, stop: make(chan struct{}), } @@ -96,6 +108,12 @@ func NewObserver( return nil, errors.Wrap(err, "error creating block cache") } + // create header cache + ob.headerCache, err = lru.New(headersCacheSize) + if err != nil { + return nil, errors.Wrap(err, "error creating header cache") + } + // open database err = ob.OpenDB(dbPath) if err != nil { @@ -195,6 +213,17 @@ func (ob *Observer) WithBlockCache(cache *lru.Cache) *Observer { return ob } +// HeaderCache returns the header cache for the observer. +func (ob *Observer) HeaderCache() *lru.Cache { + return ob.headerCache +} + +// WithHeaderCache attaches a new header cache to the observer. +func (ob *Observer) WithHeaderCache(cache *lru.Cache) *Observer { + ob.headerCache = cache + return ob +} + // DB returns the database for the observer. func (ob *Observer) DB() *gorm.DB { return ob.db diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index 13d724d497..7dd2f18081 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -33,7 +33,6 @@ func createObserver(t *testing.T, dbPath string) *base.Observer { zetacoreContext := context.NewZetacoreContext(config.NewConfig()) zetacoreClient := mocks.NewMockZetacoreClient() tss := mocks.NewTSSMainnet() - blockCacheSize := base.DefaultBlockCacheSize // create observer logger := base.DefaultLogger() @@ -43,8 +42,10 @@ func createObserver(t *testing.T, dbPath string) *base.Observer { zetacoreContext, zetacoreClient, tss, - blockCacheSize, + base.DefaultBlockCacheSize, + base.DefaultHeadersCacheSize, dbPath, + nil, logger, ) require.NoError(t, err) @@ -60,55 +61,73 @@ func TestNewObserver(t *testing.T) { zetacoreClient := mocks.NewMockZetacoreClient() tss := mocks.NewTSSMainnet() blockCacheSize := base.DefaultBlockCacheSize + headersCacheSize := base.DefaultHeadersCacheSize dbPath := createTempDir(t) // test cases tests := []struct { - name string - chain chains.Chain - chainParams observertypes.ChainParams - zetacoreContext *context.ZetacoreContext - zetacoreClient interfaces.ZetacoreClient - tss interfaces.TSSSigner - blockCacheSize int - dbPath string - fail bool - message string + name string + chain chains.Chain + chainParams observertypes.ChainParams + zetacoreContext *context.ZetacoreContext + zetacoreClient interfaces.ZetacoreClient + tss interfaces.TSSSigner + blockCacheSize int + headersCacheSize int + dbPath string + fail bool + message string }{ { - name: "should be able to create new observer", - chain: chain, - chainParams: chainParams, - zetacoreContext: zetacoreContext, - zetacoreClient: zetacoreClient, - tss: tss, - blockCacheSize: blockCacheSize, - dbPath: dbPath, - fail: false, + name: "should be able to create new observer", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: blockCacheSize, + headersCacheSize: headersCacheSize, + dbPath: dbPath, + fail: false, + }, + { + name: "should return error on invalid block cache size", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: 0, + headersCacheSize: headersCacheSize, + dbPath: dbPath, + fail: true, + message: "error creating block cache", }, { - name: "should return error on invalid block cache size", - chain: chain, - chainParams: chainParams, - zetacoreContext: zetacoreContext, - zetacoreClient: zetacoreClient, - tss: tss, - blockCacheSize: 0, - dbPath: dbPath, - fail: true, - message: "error creating block cache", + name: "should return error on invalid header cache size", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: blockCacheSize, + headersCacheSize: 0, + dbPath: dbPath, + fail: true, + message: "error creating header cache", }, { - name: "should return error on invalid db path", - chain: chain, - chainParams: chainParams, - zetacoreContext: zetacoreContext, - zetacoreClient: zetacoreClient, - tss: tss, - blockCacheSize: blockCacheSize, - dbPath: "/invalid/123db", - fail: true, - message: "error opening observer db", + name: "should return error on invalid db path", + chain: chain, + chainParams: chainParams, + zetacoreContext: zetacoreContext, + zetacoreClient: zetacoreClient, + tss: tss, + blockCacheSize: blockCacheSize, + headersCacheSize: headersCacheSize, + dbPath: "/invalid/123db", + fail: true, + message: "error opening observer db", }, } @@ -122,7 +141,9 @@ func TestNewObserver(t *testing.T) { tt.zetacoreClient, tt.tss, tt.blockCacheSize, + tt.headersCacheSize, tt.dbPath, + nil, base.DefaultLogger(), ) if tt.fail { @@ -196,7 +217,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithLastBlockScanned(newLastBlockScanned) require.Equal(t, newLastBlockScanned, ob.LastBlockScanned()) }) - t.Run("should be able to update block cache", func(t *testing.T) { + t.Run("should be able to replace block cache", func(t *testing.T) { ob := createObserver(t, dbPath) // update block cache @@ -206,6 +227,16 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithBlockCache(newBlockCache) require.Equal(t, newBlockCache, ob.BlockCache()) }) + t.Run("should be able to replace headers cache", func(t *testing.T) { + ob := createObserver(t, dbPath) + + // update headers cache + newHeadersCache, err := lru.New(200) + require.NoError(t, err) + + ob = ob.WithHeaderCache(newHeadersCache) + require.Equal(t, newHeadersCache, ob.HeaderCache()) + }) t.Run("should be able to get database", func(t *testing.T) { ob := createObserver(t, dbPath) diff --git a/zetaclient/chains/base/signer.go b/zetaclient/chains/base/signer.go index 622e1a47a8..534cdb0bc3 100644 --- a/zetaclient/chains/base/signer.go +++ b/zetaclient/chains/base/signer.go @@ -4,6 +4,7 @@ import ( "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/metrics" ) // Signer is the base signer @@ -17,6 +18,9 @@ type Signer struct { // tss is the TSS signer tss interfaces.TSSSigner + // ts is the telemetry server for metrics + ts *metrics.TelemetryServer + // logger contains the loggers used by signer logger Logger } @@ -26,12 +30,14 @@ func NewSigner( chain chains.Chain, zetacoreContext *context.ZetacoreContext, tss interfaces.TSSSigner, + ts *metrics.TelemetryServer, logger Logger, ) *Signer { return &Signer{ chain: chain, zetacoreContext: zetacoreContext, tss: tss, + ts: ts, logger: Logger{ Std: logger.Std.With().Int64("chain", chain.ChainId).Str("module", "signer").Logger(), Compliance: logger.Compliance, @@ -72,7 +78,18 @@ func (s *Signer) WithTSS(tss interfaces.TSSSigner) *Signer { return s } +// TelemetryServer returns the telemetry server for the signer +func (s *Signer) TelemetryServer() *metrics.TelemetryServer { + return s.ts +} + +// WithTelemetryServer attaches a new telemetry server to the signer +func (s *Signer) WithTelemetryServer(ts *metrics.TelemetryServer) *Signer { + s.ts = ts + return s +} + // Logger returns the logger for the signer -func (s *Signer) Logger() Logger { - return s.logger +func (s *Signer) Logger() *Logger { + return &s.logger } diff --git a/zetaclient/chains/base/signer_test.go b/zetaclient/chains/base/signer_test.go index 7c3628637b..960c508d6e 100644 --- a/zetaclient/chains/base/signer_test.go +++ b/zetaclient/chains/base/signer_test.go @@ -9,6 +9,7 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/chains/base" "github.com/zeta-chain/zetacore/zetaclient/config" "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" ) @@ -21,7 +22,7 @@ func createSigner(_ *testing.T) *base.Signer { logger := base.DefaultLogger() // create signer - return base.NewSigner(chain, zetacoreContext, tss, logger) + return base.NewSigner(chain, zetacoreContext, tss, nil, logger) } func TestNewSigner(t *testing.T) { @@ -54,6 +55,14 @@ func TestSignerGetterAndSetter(t *testing.T) { signer = signer.WithTSS(newTSS) require.Equal(t, newTSS, signer.TSS()) }) + t.Run("should be able to update telemetry server", func(t *testing.T) { + signer := createSigner(t) + + // update telemetry server + newTs := metrics.NewTelemetryServer() + signer = signer.WithTelemetryServer(newTs) + require.Equal(t, newTs, signer.TelemetryServer()) + }) t.Run("should be able to get logger", func(t *testing.T) { ob := createSigner(t) logger := ob.Logger()