Skip to content

Commit

Permalink
Merge pull request #2355 from gobitfly/BIDS-2245/split_notification_p…
Browse files Browse the repository at this point in the history
…rocess

Bids 2245/split notification process
  • Loading branch information
recy21 authored Jun 27, 2023
2 parents 678af5c + 8f4d971 commit e644da8
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 38 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ BUILDDATE=`date -u +"%Y-%m-%dT%H:%M:%S%:z"`
PACKAGE=eth2-exporter
LDFLAGS="-X ${PACKAGE}/version.Version=${VERSION} -X ${PACKAGE}/version.BuildDate=${BUILDDATE} -X ${PACKAGE}/version.GitCommit=${GITCOMMIT} -X ${PACKAGE}/version.GitDate=${GITDATE} -s -w"
all: explorer stats frontend-data-updater eth1indexer ethstore-exporter rewards-exporter node-jobs-processor signatures
all: explorer stats frontend-data-updater eth1indexer ethstore-exporter rewards-exporter node-jobs-processor signatures notification-sender notification-collector
lint:
golint ./...
Expand Down Expand Up @@ -44,6 +44,12 @@ signatures:
misc:
go build --ldflags=${LDFLAGS} -o bin/misc cmd/misc/main.go
notification-sender:
go build --ldflags=${LDFLAGS} -o bin/notification-sender cmd/notification-sender/main.go
notification-collector:
go build --ldflags=${LDFLAGS} -o bin/notification-collector cmd/notification-collector/main.go
playground:
go build --ldflags=${LDFLAGS} -o bin/add_income_stats cmd/playground/add_income_stats/main.go
go build --ldflags=${LDFLAGS} -o bin/re_calculate_stats_totals cmd/playground/re_calculate_stats_totals/main.go
Expand Down
9 changes: 3 additions & 6 deletions cmd/explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,19 @@ func main() {
go func() {
defer wg.Done()
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch())
logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch())

}()
}

wg.Wait()
if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 {
cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID))
logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch())
logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch())
}

if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" {
logrus.Fatalf("No cache provider set. Please set TierdCacheProvider (example redis, bigtable)")
logrus.Fatalf("no cache provider set, please set TierdCacheProvider (example redis, bigtable)")
}

defer db.ReaderDb.Close()
Expand Down Expand Up @@ -728,9 +728,6 @@ func main() {
}
}()
}
if utils.Config.Notifications.Enabled {
services.InitNotifications(utils.Config.Notifications.PubkeyCachePath)
}

if utils.Config.Metrics.Enabled {
go func(addr string) {
Expand Down
161 changes: 161 additions & 0 deletions cmd/notification-collector/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package main

import (
"eth2-exporter/cache"
"eth2-exporter/db"
"eth2-exporter/metrics"
"eth2-exporter/services"
"eth2-exporter/types"
"eth2-exporter/utils"
"eth2-exporter/version"
"flag"
"fmt"
"net/http"
"strings"
"sync"

"github.com/sirupsen/logrus"

_ "eth2-exporter/docs"
_ "net/http/pprof"

_ "github.com/jackc/pgx/v4/stdlib"
)

func main() {
configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used")

flag.Parse()

cfg := &types.Config{}
err := utils.ReadConfig(cfg, *configPath)
if err != nil {
logrus.Fatalf("error reading config file: %v", err)
}
utils.Config = cfg
logrus.WithFields(logrus.Fields{
"config": *configPath,
"version": version.Version,
"chainName": utils.Config.Chain.Config.ConfigName}).Printf("starting")

if utils.Config.Chain.Config.SlotsPerEpoch == 0 || utils.Config.Chain.Config.SecondsPerSlot == 0 {
utils.LogFatal(err, "invalid chain configuration specified, you must specify the slots per epoch, seconds per slot and genesis timestamp in the config file", 0)
}

if utils.Config.Pprof.Enabled {
go func() {
logrus.Infof("starting pprof http server on port %s", utils.Config.Pprof.Port)
logrus.Info(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%s", utils.Config.Pprof.Port), nil))
}()
}

wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
db.MustInitDB(&types.DatabaseConfig{
Username: cfg.WriterDatabase.Username,
Password: cfg.WriterDatabase.Password,
Name: cfg.WriterDatabase.Name,
Host: cfg.WriterDatabase.Host,
Port: cfg.WriterDatabase.Port,
MaxOpenConns: cfg.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.WriterDatabase.MaxIdleConns,
}, &types.DatabaseConfig{
Username: cfg.ReaderDatabase.Username,
Password: cfg.ReaderDatabase.Password,
Name: cfg.ReaderDatabase.Name,
Host: cfg.ReaderDatabase.Host,
Port: cfg.ReaderDatabase.Port,
MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns,
})
}()

wg.Add(1)
go func() {
defer wg.Done()
db.MustInitFrontendDB(&types.DatabaseConfig{
Username: cfg.Frontend.WriterDatabase.Username,
Password: cfg.Frontend.WriterDatabase.Password,
Name: cfg.Frontend.WriterDatabase.Name,
Host: cfg.Frontend.WriterDatabase.Host,
Port: cfg.Frontend.WriterDatabase.Port,
MaxOpenConns: cfg.Frontend.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.Frontend.WriterDatabase.MaxIdleConns,
}, &types.DatabaseConfig{
Username: cfg.Frontend.ReaderDatabase.Username,
Password: cfg.Frontend.ReaderDatabase.Password,
Name: cfg.Frontend.ReaderDatabase.Name,
Host: cfg.Frontend.ReaderDatabase.Host,
Port: cfg.Frontend.ReaderDatabase.Port,
MaxOpenConns: cfg.Frontend.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.Frontend.ReaderDatabase.MaxIdleConns,
})
}()

wg.Add(1)
go func() {
defer wg.Done()
bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID), utils.Config.RedisCacheEndpoint)
if err != nil {
logrus.Fatalf("error connecting to bigtable: %v", err)
}
db.BigtableClient = bt
}()

if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 {
wg.Add(1)
go func() {
defer wg.Done()
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch())
}()
}

wg.Wait()
if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 {
cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID))
logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch())
}

if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" {
logrus.Fatalf("no cache provider set, please set TierdCacheProvider (example redis, bigtable)")
}

defer db.ReaderDb.Close()
defer db.WriterDb.Close()
defer db.FrontendReaderDB.Close()
defer db.FrontendWriterDB.Close()
defer db.BigtableClient.Close()

if utils.Config.Metrics.Enabled {
go metrics.MonitorDB(db.WriterDb)
DBInfo := []string{
cfg.WriterDatabase.Username,
cfg.WriterDatabase.Password,
cfg.WriterDatabase.Host,
cfg.WriterDatabase.Port,
cfg.WriterDatabase.Name}
DBStr := strings.Join(DBInfo, "-")
frontendDBInfo := []string{
cfg.Frontend.WriterDatabase.Username,
cfg.Frontend.WriterDatabase.Password,
cfg.Frontend.WriterDatabase.Host,
cfg.Frontend.WriterDatabase.Port,
cfg.Frontend.WriterDatabase.Name}
frontendDBStr := strings.Join(frontendDBInfo, "-")
if DBStr != frontendDBStr {
go metrics.MonitorDB(db.FrontendWriterDB)
}
}

logrus.Infof("database connection established")

services.InitNotificationCollector(utils.Config.Notifications.PubkeyCachePath)

utils.WaitForCtrlC()

logrus.Println("exiting...")
}
161 changes: 161 additions & 0 deletions cmd/notification-sender/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package main

import (
"eth2-exporter/cache"
"eth2-exporter/db"
"eth2-exporter/metrics"
"eth2-exporter/services"
"eth2-exporter/types"
"eth2-exporter/utils"
"eth2-exporter/version"
"flag"
"fmt"
"net/http"
"strings"
"sync"

"github.com/sirupsen/logrus"

_ "eth2-exporter/docs"
_ "net/http/pprof"

_ "github.com/jackc/pgx/v4/stdlib"
)

func main() {
configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used")

flag.Parse()

cfg := &types.Config{}
err := utils.ReadConfig(cfg, *configPath)
if err != nil {
logrus.Fatalf("error reading config file: %v", err)
}
utils.Config = cfg
logrus.WithFields(logrus.Fields{
"config": *configPath,
"version": version.Version,
"chainName": utils.Config.Chain.Config.ConfigName}).Printf("starting")

if utils.Config.Chain.Config.SlotsPerEpoch == 0 || utils.Config.Chain.Config.SecondsPerSlot == 0 {
utils.LogFatal(err, "invalid chain configuration specified, you must specify the slots per epoch, seconds per slot and genesis timestamp in the config file", 0)
}

if utils.Config.Pprof.Enabled {
go func() {
logrus.Infof("starting pprof http server on port %s", utils.Config.Pprof.Port)
logrus.Info(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%s", utils.Config.Pprof.Port), nil))
}()
}

wg := &sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
db.MustInitDB(&types.DatabaseConfig{
Username: cfg.WriterDatabase.Username,
Password: cfg.WriterDatabase.Password,
Name: cfg.WriterDatabase.Name,
Host: cfg.WriterDatabase.Host,
Port: cfg.WriterDatabase.Port,
MaxOpenConns: cfg.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.WriterDatabase.MaxIdleConns,
}, &types.DatabaseConfig{
Username: cfg.ReaderDatabase.Username,
Password: cfg.ReaderDatabase.Password,
Name: cfg.ReaderDatabase.Name,
Host: cfg.ReaderDatabase.Host,
Port: cfg.ReaderDatabase.Port,
MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns,
})
}()

wg.Add(1)
go func() {
defer wg.Done()
db.MustInitFrontendDB(&types.DatabaseConfig{
Username: cfg.Frontend.WriterDatabase.Username,
Password: cfg.Frontend.WriterDatabase.Password,
Name: cfg.Frontend.WriterDatabase.Name,
Host: cfg.Frontend.WriterDatabase.Host,
Port: cfg.Frontend.WriterDatabase.Port,
MaxOpenConns: cfg.Frontend.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.Frontend.WriterDatabase.MaxIdleConns,
}, &types.DatabaseConfig{
Username: cfg.Frontend.ReaderDatabase.Username,
Password: cfg.Frontend.ReaderDatabase.Password,
Name: cfg.Frontend.ReaderDatabase.Name,
Host: cfg.Frontend.ReaderDatabase.Host,
Port: cfg.Frontend.ReaderDatabase.Port,
MaxOpenConns: cfg.Frontend.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.Frontend.ReaderDatabase.MaxIdleConns,
})
}()

wg.Add(1)
go func() {
defer wg.Done()
bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID), utils.Config.RedisCacheEndpoint)
if err != nil {
logrus.Fatalf("error connecting to bigtable: %v", err)
}
db.BigtableClient = bt
}()

if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 {
wg.Add(1)
go func() {
defer wg.Done()
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch())
}()
}

wg.Wait()
if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 {
cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID))
logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch())
}

if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" {
logrus.Fatalf("no cache provider set, please set TierdCacheProvider (example redis, bigtable)")
}

defer db.ReaderDb.Close()
defer db.WriterDb.Close()
defer db.FrontendReaderDB.Close()
defer db.FrontendWriterDB.Close()
defer db.BigtableClient.Close()

if utils.Config.Metrics.Enabled {
go metrics.MonitorDB(db.WriterDb)
DBInfo := []string{
cfg.WriterDatabase.Username,
cfg.WriterDatabase.Password,
cfg.WriterDatabase.Host,
cfg.WriterDatabase.Port,
cfg.WriterDatabase.Name}
DBStr := strings.Join(DBInfo, "-")
frontendDBInfo := []string{
cfg.Frontend.WriterDatabase.Username,
cfg.Frontend.WriterDatabase.Password,
cfg.Frontend.WriterDatabase.Host,
cfg.Frontend.WriterDatabase.Port,
cfg.Frontend.WriterDatabase.Name}
frontendDBStr := strings.Join(frontendDBInfo, "-")
if DBStr != frontendDBStr {
go metrics.MonitorDB(db.FrontendWriterDB)
}
}

logrus.Infof("database connection established")

services.InitNotificationSender()

utils.WaitForCtrlC()

logrus.Println("exiting...")
}
Loading

0 comments on commit e644da8

Please sign in to comment.