From ac25bb2976a095c8d905e2a3f6c5fb2c8f0539ab Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 26 Jun 2023 10:07:01 +0200 Subject: [PATCH 1/8] (BIDS-2245) split notification collector and sender in separate binaries --- Makefile | 8 +- cmd/explorer/main.go | 3 - cmd/notification-collector/main.go | 162 +++++++++++++++++++++++++++++ cmd/notification-sender/main.go | 162 +++++++++++++++++++++++++++++ services/services.go | 11 +- 5 files changed, 336 insertions(+), 10 deletions(-) create mode 100644 cmd/notification-collector/main.go create mode 100644 cmd/notification-sender/main.go diff --git a/Makefile b/Makefile index e52878a4a6..b110581b4e 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ BUILDDATE=`date -u +"%Y-%m-%dT%H:%M:%S%:z"` PACKAGE=eth2-exporter LDFLAGS="-X ${PACKAGE}/version.Version=${VERSION} -X ${PACKAGE}/version.BuildDate=${BUILDDATE} -X ${PACKAGE}/version.GitCommit=${GITCOMMIT} -X ${PACKAGE}/version.GitDate=${GITDATE} -s -w" -all: explorer stats frontend-data-updater eth1indexer ethstore-exporter rewards-exporter node-jobs-processor signatures +all: explorer stats frontend-data-updater eth1indexer ethstore-exporter rewards-exporter node-jobs-processor signatures notification-sender notification-collector lint: golint ./... @@ -44,6 +44,12 @@ signatures: misc: go build --ldflags=${LDFLAGS} -o bin/misc cmd/misc/main.go +notification-sender: + go build --ldflags=${LDFLAGS} -o bin/notification-sender cmd/notification-sender/main.go + +notification-collector: + go build --ldflags=${LDFLAGS} -o bin/notification-collector cmd/notification-collector/main.go + playground: go build --ldflags=${LDFLAGS} -o bin/add_income_stats cmd/playground/add_income_stats/main.go go build --ldflags=${LDFLAGS} -o bin/re_calculate_stats_totals cmd/playground/re_calculate_stats_totals/main.go diff --git a/cmd/explorer/main.go b/cmd/explorer/main.go index b3bec70e2d..d2338ce6dc 100644 --- a/cmd/explorer/main.go +++ b/cmd/explorer/main.go @@ -723,9 +723,6 @@ func main() { } }() } - if utils.Config.Notifications.Enabled { - services.InitNotifications(utils.Config.Notifications.PubkeyCachePath) - } if utils.Config.Metrics.Enabled { go func(addr string) { diff --git a/cmd/notification-collector/main.go b/cmd/notification-collector/main.go new file mode 100644 index 0000000000..f1927bb07c --- /dev/null +++ b/cmd/notification-collector/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "eth2-exporter/cache" + "eth2-exporter/db" + "eth2-exporter/metrics" + "eth2-exporter/services" + "eth2-exporter/types" + "eth2-exporter/utils" + "eth2-exporter/version" + "flag" + "fmt" + "net/http" + "strings" + "sync" + + "github.com/sirupsen/logrus" + + _ "eth2-exporter/docs" + _ "net/http/pprof" + + _ "github.com/jackc/pgx/v4/stdlib" +) + +func main() { + configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used") + + flag.Parse() + + cfg := &types.Config{} + err := utils.ReadConfig(cfg, *configPath) + if err != nil { + logrus.Fatalf("error reading config file: %v", err) + } + utils.Config = cfg + logrus.WithFields(logrus.Fields{ + "config": *configPath, + "version": version.Version, + "chainName": utils.Config.Chain.Config.ConfigName}).Printf("starting") + + if utils.Config.Chain.Config.SlotsPerEpoch == 0 || utils.Config.Chain.Config.SecondsPerSlot == 0 { + utils.LogFatal(err, "invalid chain configuration specified, you must specify the slots per epoch, seconds per slot and genesis timestamp in the config file", 0) + } + + if utils.Config.Pprof.Enabled { + go func() { + logrus.Infof("starting pprof http server on port %s", utils.Config.Pprof.Port) + logrus.Info(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%s", utils.Config.Pprof.Port), nil)) + }() + } + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + db.MustInitDB(&types.DatabaseConfig{ + Username: cfg.WriterDatabase.Username, + Password: cfg.WriterDatabase.Password, + Name: cfg.WriterDatabase.Name, + Host: cfg.WriterDatabase.Host, + Port: cfg.WriterDatabase.Port, + MaxOpenConns: cfg.WriterDatabase.MaxOpenConns, + MaxIdleConns: cfg.WriterDatabase.MaxIdleConns, + }, &types.DatabaseConfig{ + Username: cfg.ReaderDatabase.Username, + Password: cfg.ReaderDatabase.Password, + Name: cfg.ReaderDatabase.Name, + Host: cfg.ReaderDatabase.Host, + Port: cfg.ReaderDatabase.Port, + MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns, + MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns, + }) + }() + + wg.Add(1) + go func() { + defer wg.Done() + db.MustInitFrontendDB(&types.DatabaseConfig{ + Username: cfg.Frontend.WriterDatabase.Username, + Password: cfg.Frontend.WriterDatabase.Password, + Name: cfg.Frontend.WriterDatabase.Name, + Host: cfg.Frontend.WriterDatabase.Host, + Port: cfg.Frontend.WriterDatabase.Port, + MaxOpenConns: cfg.Frontend.WriterDatabase.MaxOpenConns, + MaxIdleConns: cfg.Frontend.WriterDatabase.MaxIdleConns, + }, &types.DatabaseConfig{ + Username: cfg.Frontend.ReaderDatabase.Username, + Password: cfg.Frontend.ReaderDatabase.Password, + Name: cfg.Frontend.ReaderDatabase.Name, + Host: cfg.Frontend.ReaderDatabase.Host, + Port: cfg.Frontend.ReaderDatabase.Port, + MaxOpenConns: cfg.Frontend.ReaderDatabase.MaxOpenConns, + MaxIdleConns: cfg.Frontend.ReaderDatabase.MaxIdleConns, + }) + }() + + wg.Add(1) + go func() { + defer wg.Done() + bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID), utils.Config.RedisCacheEndpoint) + if err != nil { + logrus.Fatalf("error connecting to bigtable: %v", err) + } + db.BigtableClient = bt + }() + + if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 { + wg.Add(1) + go func() { + defer wg.Done() + cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint) + logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + + }() + } + + wg.Wait() + if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 { + cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID)) + logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + } + + if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" { + logrus.Fatalf("No cache provider set. Please set TierdCacheProvider (example redis, bigtable)") + } + + defer db.ReaderDb.Close() + defer db.WriterDb.Close() + defer db.FrontendReaderDB.Close() + defer db.FrontendWriterDB.Close() + defer db.BigtableClient.Close() + + if utils.Config.Metrics.Enabled { + go metrics.MonitorDB(db.WriterDb) + DBInfo := []string{ + cfg.WriterDatabase.Username, + cfg.WriterDatabase.Password, + cfg.WriterDatabase.Host, + cfg.WriterDatabase.Port, + cfg.WriterDatabase.Name} + DBStr := strings.Join(DBInfo, "-") + frontendDBInfo := []string{ + cfg.Frontend.WriterDatabase.Username, + cfg.Frontend.WriterDatabase.Password, + cfg.Frontend.WriterDatabase.Host, + cfg.Frontend.WriterDatabase.Port, + cfg.Frontend.WriterDatabase.Name} + frontendDBStr := strings.Join(frontendDBInfo, "-") + if DBStr != frontendDBStr { + go metrics.MonitorDB(db.FrontendWriterDB) + } + } + + logrus.Infof("database connection established") + + services.InitNotificationCollector(utils.Config.Notifications.PubkeyCachePath) + + utils.WaitForCtrlC() + + logrus.Println("exiting...") +} diff --git a/cmd/notification-sender/main.go b/cmd/notification-sender/main.go new file mode 100644 index 0000000000..2e0720beb4 --- /dev/null +++ b/cmd/notification-sender/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "eth2-exporter/cache" + "eth2-exporter/db" + "eth2-exporter/metrics" + "eth2-exporter/services" + "eth2-exporter/types" + "eth2-exporter/utils" + "eth2-exporter/version" + "flag" + "fmt" + "net/http" + "strings" + "sync" + + "github.com/sirupsen/logrus" + + _ "eth2-exporter/docs" + _ "net/http/pprof" + + _ "github.com/jackc/pgx/v4/stdlib" +) + +func main() { + configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used") + + flag.Parse() + + cfg := &types.Config{} + err := utils.ReadConfig(cfg, *configPath) + if err != nil { + logrus.Fatalf("error reading config file: %v", err) + } + utils.Config = cfg + logrus.WithFields(logrus.Fields{ + "config": *configPath, + "version": version.Version, + "chainName": utils.Config.Chain.Config.ConfigName}).Printf("starting") + + if utils.Config.Chain.Config.SlotsPerEpoch == 0 || utils.Config.Chain.Config.SecondsPerSlot == 0 { + utils.LogFatal(err, "invalid chain configuration specified, you must specify the slots per epoch, seconds per slot and genesis timestamp in the config file", 0) + } + + if utils.Config.Pprof.Enabled { + go func() { + logrus.Infof("starting pprof http server on port %s", utils.Config.Pprof.Port) + logrus.Info(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%s", utils.Config.Pprof.Port), nil)) + }() + } + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + db.MustInitDB(&types.DatabaseConfig{ + Username: cfg.WriterDatabase.Username, + Password: cfg.WriterDatabase.Password, + Name: cfg.WriterDatabase.Name, + Host: cfg.WriterDatabase.Host, + Port: cfg.WriterDatabase.Port, + MaxOpenConns: cfg.WriterDatabase.MaxOpenConns, + MaxIdleConns: cfg.WriterDatabase.MaxIdleConns, + }, &types.DatabaseConfig{ + Username: cfg.ReaderDatabase.Username, + Password: cfg.ReaderDatabase.Password, + Name: cfg.ReaderDatabase.Name, + Host: cfg.ReaderDatabase.Host, + Port: cfg.ReaderDatabase.Port, + MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns, + MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns, + }) + }() + + wg.Add(1) + go func() { + defer wg.Done() + db.MustInitFrontendDB(&types.DatabaseConfig{ + Username: cfg.Frontend.WriterDatabase.Username, + Password: cfg.Frontend.WriterDatabase.Password, + Name: cfg.Frontend.WriterDatabase.Name, + Host: cfg.Frontend.WriterDatabase.Host, + Port: cfg.Frontend.WriterDatabase.Port, + MaxOpenConns: cfg.Frontend.WriterDatabase.MaxOpenConns, + MaxIdleConns: cfg.Frontend.WriterDatabase.MaxIdleConns, + }, &types.DatabaseConfig{ + Username: cfg.Frontend.ReaderDatabase.Username, + Password: cfg.Frontend.ReaderDatabase.Password, + Name: cfg.Frontend.ReaderDatabase.Name, + Host: cfg.Frontend.ReaderDatabase.Host, + Port: cfg.Frontend.ReaderDatabase.Port, + MaxOpenConns: cfg.Frontend.ReaderDatabase.MaxOpenConns, + MaxIdleConns: cfg.Frontend.ReaderDatabase.MaxIdleConns, + }) + }() + + wg.Add(1) + go func() { + defer wg.Done() + bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID), utils.Config.RedisCacheEndpoint) + if err != nil { + logrus.Fatalf("error connecting to bigtable: %v", err) + } + db.BigtableClient = bt + }() + + if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 { + wg.Add(1) + go func() { + defer wg.Done() + cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint) + logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + + }() + } + + wg.Wait() + if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 { + cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID)) + logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + } + + if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" { + logrus.Fatalf("No cache provider set. Please set TierdCacheProvider (example redis, bigtable)") + } + + defer db.ReaderDb.Close() + defer db.WriterDb.Close() + defer db.FrontendReaderDB.Close() + defer db.FrontendWriterDB.Close() + defer db.BigtableClient.Close() + + if utils.Config.Metrics.Enabled { + go metrics.MonitorDB(db.WriterDb) + DBInfo := []string{ + cfg.WriterDatabase.Username, + cfg.WriterDatabase.Password, + cfg.WriterDatabase.Host, + cfg.WriterDatabase.Port, + cfg.WriterDatabase.Name} + DBStr := strings.Join(DBInfo, "-") + frontendDBInfo := []string{ + cfg.Frontend.WriterDatabase.Username, + cfg.Frontend.WriterDatabase.Password, + cfg.Frontend.WriterDatabase.Host, + cfg.Frontend.WriterDatabase.Port, + cfg.Frontend.WriterDatabase.Name} + frontendDBStr := strings.Join(frontendDBInfo, "-") + if DBStr != frontendDBStr { + go metrics.MonitorDB(db.FrontendWriterDB) + } + } + + logrus.Infof("database connection established") + + services.InitNotificationSender() + + utils.WaitForCtrlC() + + logrus.Println("exiting...") +} diff --git a/services/services.go b/services/services.go index c0b41b42d3..88a38a1a0f 100644 --- a/services/services.go +++ b/services/services.go @@ -84,18 +84,17 @@ func Init() { ready.Wait() } -func InitNotifications(pubkeyCachePath string) { +func InitNotificationSender() { + logger.Infof("starting notifications-sender") + go notificationSender() +} +func InitNotificationCollector(pubkeyCachePath string) { err := initPubkeyCache(pubkeyCachePath) if err != nil { logger.Fatalf("error initializing pubkey cache path for notifications: %v", err) } - if utils.Config.Notifications.Sender { - logger.Infof("starting notifications-sender") - go notificationSender() - } - go notificationCollector() } From 823e776fea8d1ed267674961d6f90ce2f2c4a7b0 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 26 Jun 2023 10:18:01 +0200 Subject: [PATCH 2/8] (BIDS-2245) add loging to indicate that user db notifications are enabled --- services/notifications.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/notifications.go b/services/notifications.go index 4b4b50822c..a353d51d72 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -110,6 +110,7 @@ func notificationCollector() { // Network DB Notifications (user related, must only run on one instance ever!!!!) if utils.Config.Notifications.UserDBNotifications { + logger.Infof("collecting user db notifications") userNotifications, err := collectUserDbNotifications(epoch) if err != nil { logger.Errorf("error collection user db notifications: %v", err) From ccf6b82aa04003054eaccab65016537910c5ab06 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 26 Jun 2023 10:23:42 +0200 Subject: [PATCH 3/8] (BIDS-2245) improve logging --- services/notifications.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/services/notifications.go b/services/notifications.go index a353d51d72..6cd72cea0c 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -238,42 +238,42 @@ func collectNotifications(epoch uint64) (map[uint64]map[types.EventName][]types. metrics.Errors.WithLabelValues("notifications_collect_missed_attestation").Inc() return nil, fmt.Errorf("error collecting validator_attestation_missed notifications: %v", err) } - logger.Infof("collecting attestation & offline notifications took: %v\n", time.Since(start)) + logger.Infof("collecting attestation & offline notifications took: %v", time.Since(start)) err = collectBlockProposalNotifications(notificationsByUserID, 1, types.ValidatorExecutedProposalEventName, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_executed_block_proposal").Inc() return nil, fmt.Errorf("error collecting validator_proposal_submitted notifications: %v", err) } - logger.Infof("collecting block proposal proposed notifications took: %v\n", time.Since(start)) + logger.Infof("collecting block proposal proposed notifications took: %v", time.Since(start)) err = collectBlockProposalNotifications(notificationsByUserID, 2, types.ValidatorMissedProposalEventName, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_missed_block_proposal").Inc() return nil, fmt.Errorf("error collecting validator_proposal_missed notifications: %v", err) } - logger.Infof("collecting block proposal missed notifications took: %v\n", time.Since(start)) + logger.Infof("collecting block proposal missed notifications took: %v", time.Since(start)) err = collectValidatorGotSlashedNotifications(notificationsByUserID, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_validator_got_slashed").Inc() return nil, fmt.Errorf("error collecting validator_got_slashed notifications: %v", err) } - logger.Infof("collecting validator got slashed notifications took: %v\n", time.Since(start)) + logger.Infof("collecting validator got slashed notifications took: %v", time.Since(start)) err = collectWithdrawalNotifications(notificationsByUserID, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_validator_withdrawal").Inc() return nil, fmt.Errorf("error collecting withdrawal notifications: %v", err) } - logger.Infof("collecting withdrawal notifications took: %v\n", time.Since(start)) + logger.Infof("collecting withdrawal notifications took: %v", time.Since(start)) err = collectNetworkNotifications(notificationsByUserID, types.NetworkLivenessIncreasedEventName) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_network").Inc() return nil, fmt.Errorf("error collecting network notifications: %v", err) } - logger.Infof("collecting network notifications took: %v\n", time.Since(start)) + logger.Infof("collecting network notifications took: %v", time.Since(start)) // Rocketpool { @@ -293,28 +293,28 @@ func collectNotifications(epoch uint64) (map[uint64]map[types.EventName][]types. metrics.Errors.WithLabelValues("notifications_collect_rocketpool_comission").Inc() return nil, fmt.Errorf("error collecting rocketpool commission: %v", err) } - logger.Infof("collecting rocketpool commissions took: %v\n", time.Since(start)) + logger.Infof("collecting rocketpool commissions took: %v", time.Since(start)) err = collectRocketpoolRewardClaimRoundNotifications(notificationsByUserID, types.RocketpoolNewClaimRoundStartedEventName) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_rocketpool_reward_claim").Inc() return nil, fmt.Errorf("error collecting new rocketpool claim round: %v", err) } - logger.Infof("collecting rocketpool claim round took: %v\n", time.Since(start)) + logger.Infof("collecting rocketpool claim round took: %v", time.Since(start)) err = collectRocketpoolRPLCollateralNotifications(notificationsByUserID, types.RocketpoolCollateralMaxReached, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_rocketpool_rpl_collateral_max_reached").Inc() return nil, fmt.Errorf("error collecting rocketpool max collateral: %v", err) } - logger.Infof("collecting rocketpool max collateral took: %v\n", time.Since(start)) + logger.Infof("collecting rocketpool max collateral took: %v", time.Since(start)) err = collectRocketpoolRPLCollateralNotifications(notificationsByUserID, types.RocketpoolCollateralMinReached, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_rocketpool_rpl_collateral_min_reached").Inc() return nil, fmt.Errorf("error collecting rocketpool min collateral: %v", err) } - logger.Infof("collecting rocketpool min collateral took: %v\n", time.Since(start)) + logger.Infof("collecting rocketpool min collateral took: %v", time.Since(start)) } } @@ -323,7 +323,7 @@ func collectNotifications(epoch uint64) (map[uint64]map[types.EventName][]types. metrics.Errors.WithLabelValues("notifications_collect_sync_committee").Inc() return nil, fmt.Errorf("error collecting sync committee: %v", err) } - logger.Infof("collecting sync committee took: %v\n", time.Since(start)) + logger.Infof("collecting sync committee took: %v", time.Since(start)) return notificationsByUserID, nil } @@ -855,7 +855,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN continue } } else if w.Retries > 5 && !w.LastSent.Valid { - logger.Warn("error webhook has more than 5 retries and does not have a valid last_sent timestamp") + logger.Warnf("error webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) continue } From 11ccd3cd738899a84b800a5009ec7d6849df3a19 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 26 Jun 2023 11:34:34 +0200 Subject: [PATCH 4/8] (BIDS-2245) updated logging --- cmd/explorer/main.go | 6 +++--- cmd/notification-collector/main.go | 7 +++---- cmd/notification-sender/main.go | 7 +++---- db/db.go | 8 ++++++++ services/notifications.go | 7 +++---- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/cmd/explorer/main.go b/cmd/explorer/main.go index d2338ce6dc..57f9eb3ac6 100644 --- a/cmd/explorer/main.go +++ b/cmd/explorer/main.go @@ -179,7 +179,7 @@ func main() { go func() { defer wg.Done() cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint) - logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch()) }() } @@ -187,11 +187,11 @@ func main() { wg.Wait() if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 { cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID)) - logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch()) } if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" { - logrus.Fatalf("No cache provider set. Please set TierdCacheProvider (example redis, bigtable)") + logrus.Fatalf("no cache provider set, please set TierdCacheProvider (example redis, bigtable)") } defer db.ReaderDb.Close() diff --git a/cmd/notification-collector/main.go b/cmd/notification-collector/main.go index f1927bb07c..3d84e29f3d 100644 --- a/cmd/notification-collector/main.go +++ b/cmd/notification-collector/main.go @@ -110,19 +110,18 @@ func main() { go func() { defer wg.Done() cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint) - logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) - + logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch()) }() } wg.Wait() if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 { cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID)) - logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch()) } if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" { - logrus.Fatalf("No cache provider set. Please set TierdCacheProvider (example redis, bigtable)") + logrus.Fatalf("no cache provider set, please set TierdCacheProvider (example redis, bigtable)") } defer db.ReaderDb.Close() diff --git a/cmd/notification-sender/main.go b/cmd/notification-sender/main.go index 2e0720beb4..a096199468 100644 --- a/cmd/notification-sender/main.go +++ b/cmd/notification-sender/main.go @@ -110,19 +110,18 @@ func main() { go func() { defer wg.Done() cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint) - logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) - + logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch()) }() } wg.Wait() if utils.Config.TieredCacheProvider == "bigtable" && len(utils.Config.RedisCacheEndpoint) == 0 { cache.MustInitTieredCacheBigtable(db.BigtableClient.GetClient(), fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID)) - logrus.Infof("Tiered Cache initialized. Latest finalized epoch: %v", services.LatestFinalizedEpoch()) + logrus.Infof("tiered Cache initialized, latest finalized epoch: %v", services.LatestFinalizedEpoch()) } if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" { - logrus.Fatalf("No cache provider set. Please set TierdCacheProvider (example redis, bigtable)") + logrus.Fatalf("no cache provider set, pease set TierdCacheProvider (example redis, bigtable)") } defer db.ReaderDb.Close() diff --git a/db/db.go b/db/db.go index f1490c782b..85d8cadd3d 100644 --- a/db/db.go +++ b/db/db.go @@ -75,6 +75,9 @@ func mustInitDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig) (*sq if writer.MaxIdleConns == 0 { writer.MaxIdleConns = 10 } + if writer.MaxOpenConns < writer.MaxIdleConns { + writer.MaxIdleConns = writer.MaxOpenConns + } if reader.MaxOpenConns == 0 { reader.MaxOpenConns = 50 @@ -82,7 +85,11 @@ func mustInitDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig) (*sq if reader.MaxIdleConns == 0 { reader.MaxIdleConns = 10 } + if reader.MaxOpenConns < reader.MaxIdleConns { + reader.MaxIdleConns = reader.MaxOpenConns + } + logger.Infof("initializing writer db connection to %v with %v/%v conn limit", writer.Host, writer.MaxIdleConns, writer.MaxOpenConns) dbConnWriter, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", writer.Username, writer.Password, writer.Host, writer.Port, writer.Name)) if err != nil { utils.LogFatal(err, "error getting Connection Writer database", 0) @@ -98,6 +105,7 @@ func mustInitDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig) (*sq return dbConnWriter, dbConnWriter } + logger.Infof("initializing reader db connection to %v with %v/%v conn limit", writer.Host, reader.MaxIdleConns, reader.MaxOpenConns) dbConnReader, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", reader.Username, reader.Password, reader.Host, reader.Port, reader.Name)) if err != nil { utils.LogFatal(err, "error getting Connection Reader database", 0) diff --git a/services/notifications.go b/services/notifications.go index 6cd72cea0c..b7f4612a46 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -141,7 +141,7 @@ func notificationSender() { start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) - conn, err := db.FrontendReaderDB.Conn(ctx) + conn, err := db.FrontendWriterDB.Conn(ctx) if err != nil { logger.WithError(err).Error("error creating connection") cancel() @@ -502,7 +502,7 @@ func garbageCollectNotificationQueue(useDB *sqlx.DB) error { rowsAffected, _ := rows.RowsAffected() - logger.Infof("Deleted %v rows from the notification_queue", rowsAffected) + logger.Infof("deleted %v rows from the notification_queue", rowsAffected) return nil } @@ -855,7 +855,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN continue } } else if w.Retries > 5 && !w.LastSent.Valid { - logger.Warnf("error webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) + logger.Warnf("webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) continue } @@ -1124,7 +1124,6 @@ func sendDiscordNotifications(useDB *sqlx.DB) error { continue // skip } - logger.Infof("discord request webhook body: %s", reqBody.String()) resp, err := client.Post(webhook.Url, "application/json", reqBody) if err != nil { logger.Errorf("error sending discord webhook request: %v", err) From a8e1d31250e68238f8fa7ab33c0183cce0de822c Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 26 Jun 2023 13:32:49 +0200 Subject: [PATCH 5/8] (BIDS-2245) update logging --- notify/firebase.go | 2 +- services/notifications.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/notify/firebase.go b/notify/firebase.go index d6a5fbc9c6..bf1a642671 100644 --- a/notify/firebase.go +++ b/notify/firebase.go @@ -90,6 +90,6 @@ func SendPushBatch(messages []*messaging.Message) error { } } - logger.Infof("Sent %d firebase notifications in %d of %d tries. Successful: %d | Failed: %d", len(messages), tries, len(waitBeforeTryInSeconds), resultSuccessCount, resultFailureCount) + logger.Infof("sent %d firebase notifications in %d of %d tries. successful: %d | failed: %d", len(messages), tries, len(waitBeforeTryInSeconds), resultSuccessCount, resultFailureCount) return nil } diff --git a/services/notifications.go b/services/notifications.go index b7f4612a46..5108fbe156 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -231,7 +231,7 @@ func collectNotifications(epoch uint64) (map[uint64]map[types.EventName][]types. return nil, fmt.Errorf("epochs coherence check failed, aborting") } - logger.Infof("Started collecting notifications") + logger.Infof("started collecting notifications") err = collectAttestationAndOfflineValidatorNotifications(notificationsByUserID, 0, epoch) if err != nil { @@ -1012,13 +1012,13 @@ func sendWebhookNotifications(useDB *sqlx.DB) error { metrics.NotificationsSent.WithLabelValues("webhook", resp.Status).Inc() } - if resp != nil && resp.StatusCode < 400 { - _, err := useDB.Exec(`UPDATE notification_queue SET sent = now();`) - if err != nil { - logger.WithError(err).Errorf("error updating notification_queue table") - return - } + _, err = useDB.Exec(`UPDATE notification_queue SET sent = now() where id = $1`, n.Id) + if err != nil { + logger.WithError(err).Errorf("error updating notification_queue table") + return + } + if resp != nil && resp.StatusCode < 400 { _, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID) if err != nil { logger.WithError(err).Errorf("error updating users_webhooks table; setting retries to zero") From c0c2f7aecc43ab3d811bf22fce90d3ad750a5dd7 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Tue, 27 Jun 2023 10:57:42 +0200 Subject: [PATCH 6/8] (BIDS-2245) fix typo --- cmd/notification-sender/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/notification-sender/main.go b/cmd/notification-sender/main.go index a096199468..6a9e950dfd 100644 --- a/cmd/notification-sender/main.go +++ b/cmd/notification-sender/main.go @@ -121,7 +121,7 @@ func main() { } if utils.Config.TieredCacheProvider != "bigtable" && utils.Config.TieredCacheProvider != "redis" { - logrus.Fatalf("no cache provider set, pease set TierdCacheProvider (example redis, bigtable)") + logrus.Fatalf("no cache provider set, please set TierdCacheProvider (example redis, bigtable)") } defer db.ReaderDb.Close() From 5a771bc7ba1eb35b46535f9228e289f597d8e360 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Tue, 27 Jun 2023 10:58:18 +0200 Subject: [PATCH 7/8] (BIDS-2245) remove notifications flag --- types/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/types/config.go b/types/config.go index ff56b83181..36d757e93f 100644 --- a/types/config.go +++ b/types/config.go @@ -169,7 +169,6 @@ type Config struct { Pprof bool `yaml:"pprof" envconfig:"METRICS_PPROF"` } `yaml:"metrics"` Notifications struct { - Enabled bool `yaml:"enabled" envconfig:"NOTIFICATIONS_ENABLED"` Sender bool `yaml:"sender" envconfig:"NOTIFICATIONS_SENDER"` UserDBNotifications bool `yaml:"userDbNotifications" envconfig:"USERDB_NOTIFICATIONS_ENABLED"` FirebaseCredentialsPath string `yaml:"firebaseCredentialsPath" envconfig:"NOTIFICATIONS_FIREBASE_CRED_PATH"` From 8f4d97187e8f4f1f68b67e22194cb2b068033df2 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Tue, 27 Jun 2023 12:06:23 +0200 Subject: [PATCH 8/8] (BIDS-2245) remove obsolete notification sender config --- types/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/types/config.go b/types/config.go index 36d757e93f..587daa009a 100644 --- a/types/config.go +++ b/types/config.go @@ -169,7 +169,6 @@ type Config struct { Pprof bool `yaml:"pprof" envconfig:"METRICS_PPROF"` } `yaml:"metrics"` Notifications struct { - Sender bool `yaml:"sender" envconfig:"NOTIFICATIONS_SENDER"` UserDBNotifications bool `yaml:"userDbNotifications" envconfig:"USERDB_NOTIFICATIONS_ENABLED"` FirebaseCredentialsPath string `yaml:"firebaseCredentialsPath" envconfig:"NOTIFICATIONS_FIREBASE_CRED_PATH"` ValidatorBalanceDecreasedNotificationsEnabled bool `yaml:"validatorBalanceDecreasedNotificationsEnabled" envconfig:"VALIDATOR_BALANCE_DECREASED_NOTIFICATIONS_ENABLED"`