Skip to content

Commit

Permalink
Relay downloader factory and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Nov 16, 2023
1 parent b03d209 commit 5647546
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 30 deletions.
1 change: 1 addition & 0 deletions cmd/event-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func buildTestTransactionSqliteAdapters(*sql.DB, *sql.Tx, buildTransactionSqlite
}

var downloaderSet = wire.NewSet(
app.NewRelayDownloaderFactory,
app.NewDownloader,

relays.NewBootstrapRelaySource,
Expand Down
5 changes: 3 additions & 2 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions service/adapters/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const (
labelVcsTime = "vcsTime"
labelGo = "go"

labelConnectionAddress = "address"
labelConnectionState = "state"
labelAddress = "address"
labelConnectionState = "state"

labelResult = "result"
labelResultSuccess = "success"
Expand All @@ -35,6 +35,7 @@ type Prometheus struct {
relayDownloadersGauge prometheus.Gauge
subscriptionQueueLengthGauge *prometheus.GaugeVec
relayConnectionStateGauge *prometheus.GaugeVec
receivedEventsCounter *prometheus.CounterVec

registry *prometheus.Registry

Expand Down Expand Up @@ -81,7 +82,14 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
Name: "relay_connection_state_gauge",
Help: "State of relay connections.",
},
[]string{labelConnectionAddress, labelConnectionState},
[]string{labelAddress, labelConnectionState},
)
receivedEventsCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "received_events_counter",
Help: "Number of received events.",
},
[]string{labelAddress},
)

reg := prometheus.NewRegistry()
Expand All @@ -92,6 +100,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
subscriptionQueueLengthGauge,
versionGague,
relayConnectionStateGauge,
receivedEventsCounter,
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
collectors.NewGoCollector(),
} {
Expand Down Expand Up @@ -120,6 +129,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
relayDownloadersGauge: relayDownloadersGauge,
subscriptionQueueLengthGauge: subscriptionQueueLengthGauge,
relayConnectionStateGauge: relayConnectionStateGauge,
receivedEventsCounter: receivedEventsCounter,

registry: reg,

Expand All @@ -144,13 +154,17 @@ func (p *Prometheus) ReportRelayConnectionsState(m map[domain.RelayAddress]relay
for address, state := range m {
p.relayConnectionStateGauge.With(
prometheus.Labels{
labelConnectionState: state.String(),
labelConnectionAddress: address.String(),
labelConnectionState: state.String(),
labelAddress: address.String(),
},
).Set(1)
}
}

func (p *Prometheus) ReportReceivedEvent(address domain.RelayAddress) {
p.receivedEventsCounter.With(prometheus.Labels{labelAddress: address.String()}).Inc()
}

func (p *Prometheus) Registry() *prometheus.Registry {
return p.registry
}
Expand Down
1 change: 1 addition & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type ReceivedEventSubscriber interface {
type Metrics interface {
StartApplicationCall(handlerName string) ApplicationCall
ReportNumberOfRelayDownloaders(n int)
ReportReceivedEvent(address domain.RelayAddress)
}

type ApplicationCall interface {
Expand Down
76 changes: 53 additions & 23 deletions service/app/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ const (
refreshKnownRelaysEvery = 1 * time.Minute
)

var (
globalEventTypesToDownload = []domain.EventKind{
domain.EventKindMetadata,
domain.EventKindRecommendedRelay,
domain.EventKindContacts,
domain.EventKindRelayListMetadata,
}
)

type ReceivedEventPublisher interface {
Publish(relay domain.RelayAddress, event domain.Event)
}
Expand All @@ -41,29 +50,26 @@ type Downloader struct {

bootstrapRelaySource BootstrapRelaySource
relaySource RelaySource
relayConnections RelayConnections
receivedEventPublisher ReceivedEventPublisher
logger logging.Logger
metrics Metrics
relayDownloaderFactory *RelayDownloaderFactory
}

func NewDownloader(
bootstrapRelaySource BootstrapRelaySource,
relaySource RelaySource,
relayConnections RelayConnections,
receivedEventPublisher ReceivedEventPublisher,
logger logging.Logger,
metrics Metrics,
relayDownloaderFactory *RelayDownloaderFactory,
) *Downloader {
return &Downloader{
relayDownloaders: make(map[domain.RelayAddress]context.CancelFunc),

bootstrapRelaySource: bootstrapRelaySource,
relaySource: relaySource,
relayConnections: relayConnections,
receivedEventPublisher: receivedEventPublisher,
logger: logger.New("downloader"),
metrics: metrics,
relayDownloaderFactory: relayDownloaderFactory,
}
}

Expand Down Expand Up @@ -133,12 +139,10 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error {
WithField("address", relayAddress.String()).
Message("creating a downloader")

downloader := NewRelayDownloader(
d.receivedEventPublisher,
d.relayConnections,
d.logger,
relayAddress,
)
downloader, err := d.relayDownloaderFactory.CreateRelayDownloader(relayAddress)
if err != nil {
return errors.Wrap(err, "error creating a relay downloader")
}

ctx, cancel := context.WithCancel(ctx)
go downloader.Run(ctx)
Expand Down Expand Up @@ -176,28 +180,36 @@ type RelayDownloader struct {
receivedEventPublisher ReceivedEventPublisher
relayConnections RelayConnections
logger logging.Logger
metrics Metrics
}

func NewRelayDownloader(
address domain.RelayAddress,
receivedEventPublisher ReceivedEventPublisher,
relayConnections RelayConnections,
logger logging.Logger,
address domain.RelayAddress,
metrics Metrics,
) *RelayDownloader {
v := &RelayDownloader{
address: address,

receivedEventPublisher: receivedEventPublisher,
relayConnections: relayConnections,
logger: logger.New(fmt.Sprintf("relayDownloader(%s)", address.String())),
metrics: metrics,
}
return v
}

func (d *RelayDownloader) Run(ctx context.Context) {
//go d.storeMetricsLoop(ctx)

go d.downloadMessages(ctx, domain.NewFilter(nil, d.eventKindsToDownloadForEveryone(), nil, d.downloadSince()))
go d.downloadMessages(ctx, domain.NewFilter(
nil,
globalEventTypesToDownload,
nil,
d.downloadSince(),
))

//for {
// if err := d.refreshRelays(ctx); err != nil {
Expand All @@ -215,15 +227,6 @@ func (d *RelayDownloader) Run(ctx context.Context) {
//}
}

func (d *RelayDownloader) eventKindsToDownloadForEveryone() []domain.EventKind {
return []domain.EventKind{
domain.EventKindMetadata,
domain.EventKindRecommendedRelay,
domain.EventKindContacts,
domain.EventKindRelayListMetadata,
}
}

func (d *RelayDownloader) downloadSince() *time.Time {
return internal.Pointer(time.Now().Add(-downloadEventsFromLast))

Expand Down Expand Up @@ -322,6 +325,7 @@ func (d *RelayDownloader) downloadMessagesWithErr(ctx context.Context, filter do

for eventOrEOSE := range ch {
if !eventOrEOSE.EOSE() {
d.metrics.ReportReceivedEvent(d.address)
d.receivedEventPublisher.Publish(d.address, eventOrEOSE.Event())
}
}
Expand Down Expand Up @@ -370,3 +374,29 @@ func (m *DatabaseRelaySource) GetRelays(ctx context.Context) ([]domain.RelayAddr

return result, nil
}

type RelayDownloaderFactory struct {
relayConnections RelayConnections
receivedEventPublisher ReceivedEventPublisher
logger logging.Logger
metrics Metrics
}

func NewRelayDownloaderFactory(relayConnections RelayConnections, receivedEventPublisher ReceivedEventPublisher, logger logging.Logger, metrics Metrics) *RelayDownloaderFactory {
return &RelayDownloaderFactory{
relayConnections: relayConnections,
receivedEventPublisher: receivedEventPublisher,
logger: logger.New("relayDownloaderFactory"),
metrics: metrics,
}
}

func (r *RelayDownloaderFactory) CreateRelayDownloader(address domain.RelayAddress) (*RelayDownloader, error) {
return NewRelayDownloader(
address,
r.receivedEventPublisher,
r.relayConnections,
r.logger,
r.metrics,
), nil
}

0 comments on commit 5647546

Please sign in to comment.