Skip to content

Commit

Permalink
added metrics server to base signer/observer
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Jun 18, 2024
1 parent bad53bb commit 7418103
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 45 deletions.
29 changes: 29 additions & 0 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (

// DefaultBlockCacheSize is the default size of the block cache
DefaultBlockCacheSize = 1000

// DefaultHeadersCacheSize is the default size of the headers cache
DefaultHeadersCacheSize = 1000
)

// Observer is the base observer.
Expand Down Expand Up @@ -54,9 +57,15 @@ type Observer struct {
// blockCache is the cache for blocks
blockCache *lru.Cache

// headerCache is the cache for headers
headerCache *lru.Cache

// db is the database to persist data
db *gorm.DB

// ts is the telemetry server for metrics
ts *metrics.TelemetryServer

// logger contains the loggers used by observer
logger ObserverLogger

Expand All @@ -72,7 +81,9 @@ func NewObserver(
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
blockCacheSize int,
headersCacheSize int,
dbPath string,
ts *metrics.TelemetryServer,
logger Logger,
) (*Observer, error) {
ob := Observer{
Expand All @@ -83,6 +94,7 @@ func NewObserver(
tss: tss,
lastBlock: 0,
lastBlockScanned: 0,
ts: ts,
stop: make(chan struct{}),
}

Expand All @@ -96,6 +108,12 @@ func NewObserver(
return nil, errors.Wrap(err, "error creating block cache")
}

// create header cache
ob.headerCache, err = lru.New(headersCacheSize)
if err != nil {
return nil, errors.Wrap(err, "error creating header cache")
}

// open database
err = ob.OpenDB(dbPath)
if err != nil {
Expand Down Expand Up @@ -195,6 +213,17 @@ func (ob *Observer) WithBlockCache(cache *lru.Cache) *Observer {
return ob
}

// HeaderCache returns the header cache for the observer.
func (ob *Observer) HeaderCache() *lru.Cache {
return ob.headerCache
}

// WithHeaderCache attaches a new header cache to the observer.
func (ob *Observer) WithHeaderCache(cache *lru.Cache) *Observer {
ob.headerCache = cache
return ob
}

// DB returns the database for the observer.
func (ob *Observer) DB() *gorm.DB {
return ob.db
Expand Down
115 changes: 73 additions & 42 deletions zetaclient/chains/base/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func createObserver(t *testing.T, dbPath string) *base.Observer {
zetacoreContext := context.NewZetacoreContext(config.NewConfig())
zetacoreClient := mocks.NewMockZetacoreClient()
tss := mocks.NewTSSMainnet()
blockCacheSize := base.DefaultBlockCacheSize

// create observer
logger := base.DefaultLogger()
Expand All @@ -43,8 +42,10 @@ func createObserver(t *testing.T, dbPath string) *base.Observer {
zetacoreContext,
zetacoreClient,
tss,
blockCacheSize,
base.DefaultBlockCacheSize,
base.DefaultHeadersCacheSize,
dbPath,
nil,
logger,
)
require.NoError(t, err)
Expand All @@ -60,55 +61,73 @@ func TestNewObserver(t *testing.T) {
zetacoreClient := mocks.NewMockZetacoreClient()
tss := mocks.NewTSSMainnet()
blockCacheSize := base.DefaultBlockCacheSize
headersCacheSize := base.DefaultHeadersCacheSize
dbPath := createTempDir(t)

// test cases
tests := []struct {
name string
chain chains.Chain
chainParams observertypes.ChainParams
zetacoreContext *context.ZetacoreContext
zetacoreClient interfaces.ZetacoreClient
tss interfaces.TSSSigner
blockCacheSize int
dbPath string
fail bool
message string
name string
chain chains.Chain
chainParams observertypes.ChainParams
zetacoreContext *context.ZetacoreContext
zetacoreClient interfaces.ZetacoreClient
tss interfaces.TSSSigner
blockCacheSize int
headersCacheSize int
dbPath string
fail bool
message string
}{
{
name: "should be able to create new observer",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
dbPath: dbPath,
fail: false,
name: "should be able to create new observer",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
headersCacheSize: headersCacheSize,
dbPath: dbPath,
fail: false,
},
{
name: "should return error on invalid block cache size",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: 0,
headersCacheSize: headersCacheSize,
dbPath: dbPath,
fail: true,
message: "error creating block cache",
},
{
name: "should return error on invalid block cache size",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: 0,
dbPath: dbPath,
fail: true,
message: "error creating block cache",
name: "should return error on invalid header cache size",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
headersCacheSize: 0,
dbPath: dbPath,
fail: true,
message: "error creating header cache",
},
{
name: "should return error on invalid db path",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
dbPath: "/invalid/123db",
fail: true,
message: "error opening observer db",
name: "should return error on invalid db path",
chain: chain,
chainParams: chainParams,
zetacoreContext: zetacoreContext,
zetacoreClient: zetacoreClient,
tss: tss,
blockCacheSize: blockCacheSize,
headersCacheSize: headersCacheSize,
dbPath: "/invalid/123db",
fail: true,
message: "error opening observer db",
},
}

Expand All @@ -122,7 +141,9 @@ func TestNewObserver(t *testing.T) {
tt.zetacoreClient,
tt.tss,
tt.blockCacheSize,
tt.headersCacheSize,
tt.dbPath,
nil,
base.DefaultLogger(),
)
if tt.fail {
Expand Down Expand Up @@ -196,7 +217,7 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithLastBlockScanned(newLastBlockScanned)
require.Equal(t, newLastBlockScanned, ob.LastBlockScanned())
})
t.Run("should be able to update block cache", func(t *testing.T) {
t.Run("should be able to replace block cache", func(t *testing.T) {
ob := createObserver(t, dbPath)

// update block cache
Expand All @@ -206,6 +227,16 @@ func TestObserverGetterAndSetter(t *testing.T) {
ob = ob.WithBlockCache(newBlockCache)
require.Equal(t, newBlockCache, ob.BlockCache())
})
t.Run("should be able to replace headers cache", func(t *testing.T) {
ob := createObserver(t, dbPath)

// update headers cache
newHeadersCache, err := lru.New(200)
require.NoError(t, err)

ob = ob.WithHeaderCache(newHeadersCache)
require.Equal(t, newHeadersCache, ob.HeaderCache())
})
t.Run("should be able to get database", func(t *testing.T) {
ob := createObserver(t, dbPath)

Expand Down
21 changes: 19 additions & 2 deletions zetaclient/chains/base/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
)

// Signer is the base signer
Expand All @@ -17,6 +18,9 @@ type Signer struct {
// tss is the TSS signer
tss interfaces.TSSSigner

// ts is the telemetry server for metrics
ts *metrics.TelemetryServer

// logger contains the loggers used by signer
logger Logger
}
Expand All @@ -26,12 +30,14 @@ func NewSigner(
chain chains.Chain,
zetacoreContext *context.ZetacoreContext,
tss interfaces.TSSSigner,
ts *metrics.TelemetryServer,
logger Logger,
) *Signer {
return &Signer{
chain: chain,
zetacoreContext: zetacoreContext,
tss: tss,
ts: ts,
logger: Logger{
Std: logger.Std.With().Int64("chain", chain.ChainId).Str("module", "signer").Logger(),
Compliance: logger.Compliance,
Expand Down Expand Up @@ -72,7 +78,18 @@ func (s *Signer) WithTSS(tss interfaces.TSSSigner) *Signer {
return s
}

// TelemetryServer returns the telemetry server for the signer
func (s *Signer) TelemetryServer() *metrics.TelemetryServer {
return s.ts
}

// WithTelemetryServer attaches a new telemetry server to the signer
func (s *Signer) WithTelemetryServer(ts *metrics.TelemetryServer) *Signer {
s.ts = ts
return s
}

// Logger returns the logger for the signer
func (s *Signer) Logger() Logger {
return s.logger
func (s *Signer) Logger() *Logger {
return &s.logger
}
11 changes: 10 additions & 1 deletion zetaclient/chains/base/signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
"github.com/zeta-chain/zetacore/zetaclient/config"
"github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/testutils/mocks"
)

Expand All @@ -21,7 +22,7 @@ func createSigner(_ *testing.T) *base.Signer {
logger := base.DefaultLogger()

// create signer
return base.NewSigner(chain, zetacoreContext, tss, logger)
return base.NewSigner(chain, zetacoreContext, tss, nil, logger)
}

func TestNewSigner(t *testing.T) {
Expand Down Expand Up @@ -54,6 +55,14 @@ func TestSignerGetterAndSetter(t *testing.T) {
signer = signer.WithTSS(newTSS)
require.Equal(t, newTSS, signer.TSS())
})
t.Run("should be able to update telemetry server", func(t *testing.T) {
signer := createSigner(t)

// update telemetry server
newTs := metrics.NewTelemetryServer()
signer = signer.WithTelemetryServer(newTs)
require.Equal(t, newTs, signer.TelemetryServer())
})
t.Run("should be able to get logger", func(t *testing.T) {
ob := createSigner(t)
logger := ob.Logger()
Expand Down

0 comments on commit 7418103

Please sign in to comment.