diff --git a/cmd/event-service/di/wire.go b/cmd/event-service/di/wire.go index 5332923..6a17b93 100644 --- a/cmd/event-service/di/wire.go +++ b/cmd/event-service/di/wire.go @@ -91,6 +91,7 @@ func buildTestTransactionSqliteAdapters(*sql.DB, *sql.Tx, buildTransactionSqlite } var downloaderSet = wire.NewSet( + app.NewRelayDownloaderFactory, app.NewDownloader, relays.NewBootstrapRelaySource, diff --git a/cmd/event-service/di/wire_gen.go b/cmd/event-service/di/wire_gen.go index 39dc0e0..73f6093 100644 --- a/cmd/event-service/di/wire_gen.go +++ b/cmd/event-service/di/wire_gen.go @@ -63,7 +63,8 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S databaseRelaySource := app.NewDatabaseRelaySource(genericTransactionProvider, logger) relayConnections := relays.NewRelayConnections(contextContext, logger, prometheusPrometheus) receivedEventPubSub := memorypubsub.NewReceivedEventPubSub() - downloader := app.NewDownloader(bootstrapRelaySource, databaseRelaySource, relayConnections, receivedEventPubSub, logger, prometheusPrometheus) + relayDownloaderFactory := app.NewRelayDownloaderFactory(relayConnections, receivedEventPubSub, logger, prometheusPrometheus) + downloader := app.NewDownloader(bootstrapRelaySource, databaseRelaySource, logger, prometheusPrometheus, relayDownloaderFactory) receivedEventSubscriber := memorypubsub2.NewReceivedEventSubscriber(receivedEventPubSub, saveReceivedEventHandler, logger) pubSub := sqlite.NewPubSub(db, logger) subscriber := sqlite.NewSubscriber(pubSub, db) @@ -178,6 +179,6 @@ type buildTransactionSqliteAdaptersDependencies struct { Logger logging.Logger } -var downloaderSet = wire.NewSet(app.NewDownloader, relays.NewBootstrapRelaySource, wire.Bind(new(app.BootstrapRelaySource), new(*relays.BootstrapRelaySource)), app.NewDatabaseRelaySource, wire.Bind(new(app.RelaySource), new(*app.DatabaseRelaySource)), relays.NewRelayConnections, wire.Bind(new(app.RelayConnections), new(*relays.RelayConnections))) +var downloaderSet = wire.NewSet(app.NewRelayDownloaderFactory, app.NewDownloader, relays.NewBootstrapRelaySource, wire.Bind(new(app.BootstrapRelaySource), new(*relays.BootstrapRelaySource)), app.NewDatabaseRelaySource, wire.Bind(new(app.RelaySource), new(*app.DatabaseRelaySource)), relays.NewRelayConnections, wire.Bind(new(app.RelayConnections), new(*relays.RelayConnections))) var domainSet = wire.NewSet(domain.NewRelaysExtractor, wire.Bind(new(app.RelaysExtractor), new(*domain.RelaysExtractor))) diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index 3b2985e..8c8a4fc 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -20,8 +20,8 @@ const ( labelVcsTime = "vcsTime" labelGo = "go" - labelConnectionAddress = "address" - labelConnectionState = "state" + labelAddress = "address" + labelConnectionState = "state" labelResult = "result" labelResultSuccess = "success" @@ -35,6 +35,7 @@ type Prometheus struct { relayDownloadersGauge prometheus.Gauge subscriptionQueueLengthGauge *prometheus.GaugeVec relayConnectionStateGauge *prometheus.GaugeVec + receivedEventsCounter *prometheus.CounterVec registry *prometheus.Registry @@ -81,7 +82,14 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { Name: "relay_connection_state_gauge", Help: "State of relay connections.", }, - []string{labelConnectionAddress, labelConnectionState}, + []string{labelAddress, labelConnectionState}, + ) + receivedEventsCounter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "received_events_counter", + Help: "Number of received events.", + }, + []string{labelAddress}, ) reg := prometheus.NewRegistry() @@ -92,6 +100,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { subscriptionQueueLengthGauge, versionGague, relayConnectionStateGauge, + receivedEventsCounter, collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), collectors.NewGoCollector(), } { @@ -120,6 +129,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { relayDownloadersGauge: relayDownloadersGauge, subscriptionQueueLengthGauge: subscriptionQueueLengthGauge, relayConnectionStateGauge: relayConnectionStateGauge, + receivedEventsCounter: receivedEventsCounter, registry: reg, @@ -144,13 +154,17 @@ func (p *Prometheus) ReportRelayConnectionsState(m map[domain.RelayAddress]relay for address, state := range m { p.relayConnectionStateGauge.With( prometheus.Labels{ - labelConnectionState: state.String(), - labelConnectionAddress: address.String(), + labelConnectionState: state.String(), + labelAddress: address.String(), }, ).Set(1) } } +func (p *Prometheus) ReportReceivedEvent(address domain.RelayAddress) { + p.receivedEventsCounter.With(prometheus.Labels{labelAddress: address.String()}).Inc() +} + func (p *Prometheus) Registry() *prometheus.Registry { return p.registry } diff --git a/service/app/app.go b/service/app/app.go index 299513c..50c7896 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -63,6 +63,7 @@ type ReceivedEventSubscriber interface { type Metrics interface { StartApplicationCall(handlerName string) ApplicationCall ReportNumberOfRelayDownloaders(n int) + ReportReceivedEvent(address domain.RelayAddress) } type ApplicationCall interface { diff --git a/service/app/downloader.go b/service/app/downloader.go index 0ec582c..eb22ec0 100644 --- a/service/app/downloader.go +++ b/service/app/downloader.go @@ -19,6 +19,15 @@ const ( refreshKnownRelaysEvery = 1 * time.Minute ) +var ( + globalEventTypesToDownload = []domain.EventKind{ + domain.EventKindMetadata, + domain.EventKindRecommendedRelay, + domain.EventKindContacts, + domain.EventKindRelayListMetadata, + } +) + type ReceivedEventPublisher interface { Publish(relay domain.RelayAddress, event domain.Event) } @@ -41,29 +50,26 @@ type Downloader struct { bootstrapRelaySource BootstrapRelaySource relaySource RelaySource - relayConnections RelayConnections - receivedEventPublisher ReceivedEventPublisher logger logging.Logger metrics Metrics + relayDownloaderFactory *RelayDownloaderFactory } func NewDownloader( bootstrapRelaySource BootstrapRelaySource, relaySource RelaySource, - relayConnections RelayConnections, - receivedEventPublisher ReceivedEventPublisher, logger logging.Logger, metrics Metrics, + relayDownloaderFactory *RelayDownloaderFactory, ) *Downloader { return &Downloader{ relayDownloaders: make(map[domain.RelayAddress]context.CancelFunc), bootstrapRelaySource: bootstrapRelaySource, relaySource: relaySource, - relayConnections: relayConnections, - receivedEventPublisher: receivedEventPublisher, logger: logger.New("downloader"), metrics: metrics, + relayDownloaderFactory: relayDownloaderFactory, } } @@ -133,12 +139,10 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error { WithField("address", relayAddress.String()). Message("creating a downloader") - downloader := NewRelayDownloader( - d.receivedEventPublisher, - d.relayConnections, - d.logger, - relayAddress, - ) + downloader, err := d.relayDownloaderFactory.CreateRelayDownloader(relayAddress) + if err != nil { + return errors.Wrap(err, "error creating a relay downloader") + } ctx, cancel := context.WithCancel(ctx) go downloader.Run(ctx) @@ -176,13 +180,15 @@ type RelayDownloader struct { receivedEventPublisher ReceivedEventPublisher relayConnections RelayConnections logger logging.Logger + metrics Metrics } func NewRelayDownloader( + address domain.RelayAddress, receivedEventPublisher ReceivedEventPublisher, relayConnections RelayConnections, logger logging.Logger, - address domain.RelayAddress, + metrics Metrics, ) *RelayDownloader { v := &RelayDownloader{ address: address, @@ -190,6 +196,7 @@ func NewRelayDownloader( receivedEventPublisher: receivedEventPublisher, relayConnections: relayConnections, logger: logger.New(fmt.Sprintf("relayDownloader(%s)", address.String())), + metrics: metrics, } return v } @@ -197,7 +204,12 @@ func NewRelayDownloader( func (d *RelayDownloader) Run(ctx context.Context) { //go d.storeMetricsLoop(ctx) - go d.downloadMessages(ctx, domain.NewFilter(nil, d.eventKindsToDownloadForEveryone(), nil, d.downloadSince())) + go d.downloadMessages(ctx, domain.NewFilter( + nil, + globalEventTypesToDownload, + nil, + d.downloadSince(), + )) //for { // if err := d.refreshRelays(ctx); err != nil { @@ -215,15 +227,6 @@ func (d *RelayDownloader) Run(ctx context.Context) { //} } -func (d *RelayDownloader) eventKindsToDownloadForEveryone() []domain.EventKind { - return []domain.EventKind{ - domain.EventKindMetadata, - domain.EventKindRecommendedRelay, - domain.EventKindContacts, - domain.EventKindRelayListMetadata, - } -} - func (d *RelayDownloader) downloadSince() *time.Time { return internal.Pointer(time.Now().Add(-downloadEventsFromLast)) @@ -322,6 +325,7 @@ func (d *RelayDownloader) downloadMessagesWithErr(ctx context.Context, filter do for eventOrEOSE := range ch { if !eventOrEOSE.EOSE() { + d.metrics.ReportReceivedEvent(d.address) d.receivedEventPublisher.Publish(d.address, eventOrEOSE.Event()) } } @@ -370,3 +374,29 @@ func (m *DatabaseRelaySource) GetRelays(ctx context.Context) ([]domain.RelayAddr return result, nil } + +type RelayDownloaderFactory struct { + relayConnections RelayConnections + receivedEventPublisher ReceivedEventPublisher + logger logging.Logger + metrics Metrics +} + +func NewRelayDownloaderFactory(relayConnections RelayConnections, receivedEventPublisher ReceivedEventPublisher, logger logging.Logger, metrics Metrics) *RelayDownloaderFactory { + return &RelayDownloaderFactory{ + relayConnections: relayConnections, + receivedEventPublisher: receivedEventPublisher, + logger: logger.New("relayDownloaderFactory"), + metrics: metrics, + } +} + +func (r *RelayDownloaderFactory) CreateRelayDownloader(address domain.RelayAddress) (*RelayDownloader, error) { + return NewRelayDownloader( + address, + r.receivedEventPublisher, + r.relayConnections, + r.logger, + r.metrics, + ), nil +}