From 6b51f0755f9f125ddd71465453d045bf7d9eba71 Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 22 Nov 2023 15:11:10 +0100 Subject: [PATCH] Add metrics for the number of stored events --- service/adapters/prometheus/prometheus.go | 13 ++++++++ service/adapters/sqlite/event_repository.go | 11 +++++++ .../adapters/sqlite/event_repository_test.go | 33 +++++++++++++++++++ service/app/app.go | 3 ++ service/app/handler_update_metrics.go | 6 ++++ 5 files changed, 66 insertions(+) diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index 1e184b0..c26e60c 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -44,6 +44,7 @@ type Prometheus struct { relayConnectionReceivedMessagesCounter *prometheus.CounterVec relayConnectionDisconnectionsCounter *prometheus.CounterVec storedRelayAddressesGauge prometheus.Gauge + storedEventsGauge prometheus.Gauge registry *prometheus.Registry @@ -126,6 +127,12 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { Help: "Number of stored relay addresses.", }, ) + storedEventsGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "stored_events_gauge", + Help: "Number of stored events.", + }, + ) reg := prometheus.NewRegistry() for _, v := range []prometheus.Collector{ @@ -140,6 +147,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { relayConnectionReceivedMessagesCounter, relayConnectionDisconnectionsCounter, storedRelayAddressesGauge, + storedEventsGauge, collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), collectors.NewGoCollector(), } { @@ -173,6 +181,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { relayConnectionReceivedMessagesCounter: relayConnectionReceivedMessagesCounter, relayConnectionDisconnectionsCounter: relayConnectionDisconnectionsCounter, storedRelayAddressesGauge: storedRelayAddressesGauge, + storedEventsGauge: storedEventsGauge, registry: reg, @@ -234,6 +243,10 @@ func (p *Prometheus) ReportNumberOfStoredRelayAddresses(n int) { p.storedRelayAddressesGauge.Set(float64(n)) } +func (p *Prometheus) ReportNumberOfStoredEvents(n int) { + p.storedEventsGauge.Set(float64(n)) +} + func (p *Prometheus) Registry() *prometheus.Registry { return p.registry } diff --git a/service/adapters/sqlite/event_repository.go b/service/adapters/sqlite/event_repository.go index f44c80e..f1af2f2 100644 --- a/service/adapters/sqlite/event_repository.go +++ b/service/adapters/sqlite/event_repository.go @@ -46,6 +46,17 @@ func (r *EventRepository) Get(ctx context.Context, eventID domain.EventId) (doma return r.readEvent(result) } +func (r *EventRepository) Count(ctx context.Context) (int, error) { + row := r.tx.QueryRow(`SELECT COUNT(*) FROM events`) + + var count int + if err := row.Scan(&count); err != nil { + return 0, errors.Wrap(err, "error scanning") + } + + return count, nil +} + func (m *EventRepository) readEvent(result *sql.Row) (domain.Event, error) { var payload []byte diff --git a/service/adapters/sqlite/event_repository_test.go b/service/adapters/sqlite/event_repository_test.go index bdf89ef..e8f320a 100644 --- a/service/adapters/sqlite/event_repository_test.go +++ b/service/adapters/sqlite/event_repository_test.go @@ -69,3 +69,36 @@ func TestEventRepository_ItIsPossibleToSaveAndGetEvents(t *testing.T) { }) require.NoError(t, err) } + +func TestEventRepository_CountCountsSavedEvents(t *testing.T) { + ctx := fixtures.TestContext(t) + adapters := NewTestAdapters(ctx, t) + + err := adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + n, err := adapters.EventRepository.Count(ctx) + require.NoError(t, err) + require.Equal(t, 0, n) + + return nil + }) + require.NoError(t, err) + + for i := 0; i < 5; i++ { + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + err := adapters.EventRepository.Save(ctx, fixtures.SomeEvent()) + require.NoError(t, err) + + return nil + }) + require.NoError(t, err) + + err = adapters.TransactionProvider.Transact(ctx, func(ctx context.Context, adapters sqlite.TestAdapters) error { + n, err := adapters.EventRepository.Count(ctx) + require.NoError(t, err) + require.Equal(t, i+1, n) + + return nil + }) + require.NoError(t, err) + } +} diff --git a/service/app/app.go b/service/app/app.go index fc686eb..de8269d 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -30,6 +30,8 @@ type EventRepository interface { // Get returns ErrEventNotFound. Get(ctx context.Context, eventID domain.EventId) (domain.Event, error) + + Count(ctx context.Context) (int, error) } type RelayRepository interface { @@ -96,6 +98,7 @@ type Metrics interface { ReportReceivedEvent(address domain.RelayAddress) ReportQueueLength(topic string, n int) ReportNumberOfStoredRelayAddresses(n int) + ReportNumberOfStoredEvents(n int) } type ApplicationCall interface { diff --git a/service/app/handler_update_metrics.go b/service/app/handler_update_metrics.go index 7234e0a..afb4459 100644 --- a/service/app/handler_update_metrics.go +++ b/service/app/handler_update_metrics.go @@ -44,6 +44,12 @@ func (h *UpdateMetricsHandler) Handle(ctx context.Context) (err error) { } h.metrics.ReportNumberOfStoredRelayAddresses(n) + n, err = adapters.Events.Count(ctx) + if err != nil { + return errors.Wrap(err, "error counting events") + } + h.metrics.ReportNumberOfStoredEvents(n) + return nil }); err != nil { return errors.Wrap(err, "transaction error")