From 95bd4b67012c7b4f079ac6da201d97e7cee422f8 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 27 Oct 2022 16:50:27 +0200 Subject: [PATCH] node: improve pprof lifecycle (#9628) --- config/config.go | 9 + node/id.go | 35 -- node/node.go | 813 +++++----------------------------------------- node/node_test.go | 24 ++ node/setup.go | 711 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 817 insertions(+), 775 deletions(-) delete mode 100644 node/id.go create mode 100644 node/setup.go diff --git a/config/config.go b/config/config.go index 76a89f9532f..f072cad2d2a 100644 --- a/config/config.go +++ b/config/config.go @@ -423,6 +423,7 @@ type RPCConfig struct { TLSKeyFile string `mapstructure:"tls_key_file"` // pprof listen address (https://golang.org/pkg/net/http/pprof) + // FIXME: This should be moved under the instrumentation section PprofListenAddress string `mapstructure:"pprof_laddr"` } @@ -506,6 +507,10 @@ func (cfg *RPCConfig) IsCorsEnabled() bool { return len(cfg.CORSAllowedOrigins) != 0 } +func (cfg *RPCConfig) IsPprofEnabled() bool { + return len(cfg.PprofListenAddress) != 0 +} + func (cfg RPCConfig) KeyFile() string { path := cfg.TLSKeyFile if filepath.IsAbs(path) { @@ -1201,6 +1206,10 @@ func (cfg *InstrumentationConfig) ValidateBasic() error { return nil } +func (cfg *InstrumentationConfig) IsPrometheusEnabled() bool { + return cfg.Prometheus && cfg.PrometheusListenAddr != "" +} + //----------------------------------------------------------------------------- // Utils diff --git a/node/id.go b/node/id.go deleted file mode 100644 index ffa162f8157..00000000000 --- a/node/id.go +++ /dev/null @@ -1,35 +0,0 @@ -package node - -import ( - "time" - - "github.com/tendermint/tendermint/crypto" -) - -type ID struct { - Name string - PubKey crypto.PubKey -} - -type PrivNodeID struct { - ID - PrivKey crypto.PrivKey -} - -type Greeting struct { - ID - Version string - ChainID string - Message string - Time time.Time -} - -type SignedNodeGreeting struct { - Greeting - Signature []byte -} - -func (pnid *PrivNodeID) SignGreeting() *SignedNodeGreeting { - // greeting := NodeGreeting{} - return nil -} diff --git a/node/node.go b/node/node.go index 5857461a95b..18f3f1e84cd 100644 --- a/node/node.go +++ b/node/node.go @@ -1,49 +1,34 @@ package node import ( - "bytes" "context" - "errors" "fmt" "net" "net/http" - "strings" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" - dbm "github.com/tendermint/tm-db" - abci "github.com/tendermint/tendermint/abci/types" bc "github.com/tendermint/tendermint/blocksync" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/evidence" - tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/service" - "github.com/tendermint/tendermint/light" mempl "github.com/tendermint/tendermint/mempool" - mempoolv0 "github.com/tendermint/tendermint/mempool/v0" - mempoolv1 "github.com/tendermint/tendermint/mempool/v1" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/pex" - "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" grpccore "github.com/tendermint/tendermint/rpc/grpc" rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" - blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" - "github.com/tendermint/tendermint/state/indexer/sink/psql" "github.com/tendermint/tendermint/state/txindex" - "github.com/tendermint/tendermint/state/txindex/kv" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/statesync" "github.com/tendermint/tendermint/store" @@ -51,94 +36,54 @@ import ( tmtime "github.com/tendermint/tendermint/types/time" "github.com/tendermint/tendermint/version" - _ "net/http/pprof" //nolint: gosec // securely exposed on separate, optional port - - _ "github.com/lib/pq" // provide the psql db driver + _ "net/http/pprof" //nolint: gosec ) -//------------------------------------------------------------------------------ - -// DBContext specifies config information for loading a new DB. -type DBContext struct { - ID string - Config *cfg.Config -} - -// DBProvider takes a DBContext and returns an instantiated DB. -type DBProvider func(*DBContext) (dbm.DB, error) - -const readHeaderTimeout = 10 * time.Second - -// DefaultDBProvider returns a database using the DBBackend and DBDir -// specified in the ctx.Config. -func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { - dbType := dbm.BackendType(ctx.Config.DBBackend) - return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()) -} - -// GenesisDocProvider returns a GenesisDoc. -// It allows the GenesisDoc to be pulled from sources other than the -// filesystem, for instance from a distributed key-value store cluster. -type GenesisDocProvider func() (*types.GenesisDoc, error) - -// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads -// the GenesisDoc from the config.GenesisFile() on the filesystem. -func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { - return func() (*types.GenesisDoc, error) { - return types.GenesisDocFromFile(config.GenesisFile()) - } -} - -// Provider takes a config and a logger and returns a ready to go Node. -type Provider func(*cfg.Config, log.Logger) (*Node, error) +// Node is the highest level interface to a full Tendermint node. +// It includes all configuration information and running services. +type Node struct { + service.BaseService -// DefaultNewNode returns a Tendermint node with default settings for the -// PrivValidator, ClientCreator, GenesisDoc, and DBProvider. -// It implements NodeProvider. -func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { - nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) - if err != nil { - return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err) - } + // config + config *cfg.Config + genesisDoc *types.GenesisDoc // initial validator set + privValidator types.PrivValidator // local node's validator key - return NewNode(config, - privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()), - nodeKey, - proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), - DefaultGenesisDocProviderFunc(config), - DefaultDBProvider, - DefaultMetricsProvider(config.Instrumentation), - logger, - ) -} + // network + transport *p2p.MultiplexTransport + sw *p2p.Switch // p2p connections + addrBook pex.AddrBook // known peers + nodeInfo p2p.NodeInfo + nodeKey *p2p.NodeKey // our node privkey + isListening bool -// MetricsProvider returns a consensus, p2p and mempool Metrics. -type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) - -// DefaultMetricsProvider returns Metrics build using Prometheus client library -// if Prometheus is enabled. Otherwise, it returns no-op Metrics. -func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { - return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) { - if config.Prometheus { - return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), - p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), - mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), - sm.PrometheusMetrics(config.Namespace, "chain_id", chainID), - proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID) - } - return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics() - } + // services + eventBus *types.EventBus // pub/sub for services + stateStore sm.Store + blockStore *store.BlockStore // store the blockchain to disk + bcReactor p2p.Reactor // for block-syncing + mempoolReactor p2p.Reactor // for gossipping transactions + mempool mempl.Mempool + stateSync bool // whether the node should state sync on startup + stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots + stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node + stateSyncGenesis sm.State // provides the genesis state for state sync + consensusState *cs.State // latest consensus state + consensusReactor *cs.Reactor // for participating in the consensus + pexReactor *pex.Reactor // for exchanging peer addresses + evidencePool *evidence.Pool // tracking evidence + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers + txIndexer txindex.TxIndexer + blockIndexer indexer.BlockIndexer + indexerService *txindex.IndexerService + prometheusSrv *http.Server + pprofSrv *http.Server } // Option sets a parameter for the node. type Option func(*Node) -// Temporary interface for switching to block sync, we should get rid of v0 and v1 reactors. -// See: https://github.com/tendermint/tendermint/issues/4595 -type blockSyncReactor interface { - SwitchToBlockSync(sm.State) error -} - // CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to // the node's Switch. // @@ -190,517 +135,6 @@ func StateProvider(stateProvider statesync.StateProvider) Option { //------------------------------------------------------------------------------ -// Node is the highest level interface to a full Tendermint node. -// It includes all configuration information and running services. -type Node struct { - service.BaseService - - // config - config *cfg.Config - genesisDoc *types.GenesisDoc // initial validator set - privValidator types.PrivValidator // local node's validator key - - // network - transport *p2p.MultiplexTransport - sw *p2p.Switch // p2p connections - addrBook pex.AddrBook // known peers - nodeInfo p2p.NodeInfo - nodeKey *p2p.NodeKey // our node privkey - isListening bool - - // services - eventBus *types.EventBus // pub/sub for services - stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for block-syncing - mempoolReactor p2p.Reactor // for gossipping transactions - mempool mempl.Mempool - stateSync bool // whether the node should state sync on startup - stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node - stateSyncGenesis sm.State // provides the genesis state for state sync - consensusState *cs.State // latest consensus state - consensusReactor *cs.Reactor // for participating in the consensus - pexReactor *pex.Reactor // for exchanging peer addresses - evidencePool *evidence.Pool // tracking evidence - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers - txIndexer txindex.TxIndexer - blockIndexer indexer.BlockIndexer - indexerService *txindex.IndexerService - prometheusSrv *http.Server -} - -func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { - var blockStoreDB dbm.DB - blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) - if err != nil { - return - } - blockStore = store.NewBlockStore(blockStoreDB) - - stateDB, err = dbProvider(&DBContext{"state", config}) - if err != nil { - return - } - - return -} - -func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) { - proxyApp := proxy.NewAppConns(clientCreator, metrics) - proxyApp.SetLogger(logger.With("module", "proxy")) - if err := proxyApp.Start(); err != nil { - return nil, fmt.Errorf("error starting proxy app connections: %v", err) - } - return proxyApp, nil -} - -func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { - eventBus := types.NewEventBus() - eventBus.SetLogger(logger.With("module", "events")) - if err := eventBus.Start(); err != nil { - return nil, err - } - return eventBus, nil -} - -func createAndStartIndexerService( - config *cfg.Config, - chainID string, - dbProvider DBProvider, - eventBus *types.EventBus, - logger log.Logger, -) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { - var ( - txIndexer txindex.TxIndexer - blockIndexer indexer.BlockIndexer - ) - - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, nil, nil, err - } - - txIndexer = kv.NewTxIndex(store) - blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) - - case "psql": - if config.TxIndex.PsqlConn == "" { - return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`) - } - es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID) - if err != nil { - return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err) - } - txIndexer = es.TxIndexer() - blockIndexer = es.BlockIndexer() - - default: - txIndexer = &null.TxIndex{} - blockIndexer = &blockidxnull.BlockerIndexer{} - } - - indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) - indexerService.SetLogger(logger.With("module", "txindex")) - - if err := indexerService.Start(); err != nil { - return nil, nil, nil, err - } - - return indexerService, txIndexer, blockIndexer, nil -} - -func doHandshake( - stateStore sm.Store, - state sm.State, - blockStore sm.BlockStore, - genDoc *types.GenesisDoc, - eventBus types.BlockEventPublisher, - proxyApp proxy.AppConns, - consensusLogger log.Logger, -) error { - handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) - handshaker.SetLogger(consensusLogger) - handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { - return fmt.Errorf("error during handshake: %v", err) - } - return nil -} - -func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) { - // Log the version info. - logger.Info("Version info", - "tendermint_version", version.TMCoreSemVer, - "abci", version.ABCISemVer, - "block", version.BlockProtocol, - "p2p", version.P2PProtocol, - "commit_hash", version.TMGitCommitHash, - ) - - // If the state and software differ in block version, at least log it. - if state.Version.Consensus.Block != version.BlockProtocol { - logger.Info("Software and state have different block protocols", - "software", version.BlockProtocol, - "state", state.Version.Consensus.Block, - ) - } - - addr := pubKey.Address() - // Log whether this node is a validator or an observer - if state.Validators.HasAddress(addr) { - consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey) - } else { - consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) - } -} - -func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { - if state.Validators.Size() > 1 { - return false - } - addr, _ := state.Validators.GetByIndex(0) - return bytes.Equal(pubKey.Address(), addr) -} - -func createMempoolAndMempoolReactor( - config *cfg.Config, - proxyApp proxy.AppConns, - state sm.State, - memplMetrics *mempl.Metrics, - logger log.Logger, -) (mempl.Mempool, p2p.Reactor) { - switch config.Mempool.Version { - case cfg.MempoolV1: - mp := mempoolv1.NewTxMempool( - logger, - config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempoolv1.WithMetrics(memplMetrics), - mempoolv1.WithPreCheck(sm.TxPreCheck(state)), - mempoolv1.WithPostCheck(sm.TxPostCheck(state)), - ) - - reactor := mempoolv1.NewReactor( - config.Mempool, - mp, - ) - if config.Consensus.WaitForTxs() { - mp.EnableTxsAvailable() - } - - return mp, reactor - - case cfg.MempoolV0: - mp := mempoolv0.NewCListMempool( - config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state)), - ) - - mp.SetLogger(logger) - - reactor := mempoolv0.NewReactor( - config.Mempool, - mp, - ) - if config.Consensus.WaitForTxs() { - mp.EnableTxsAvailable() - } - - return mp, reactor - - default: - return nil, nil - } -} - -func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider, - stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger, -) (*evidence.Reactor, *evidence.Pool, error) { - evidenceDB, err := dbProvider(&DBContext{"evidence", config}) - if err != nil { - return nil, nil, err - } - evidenceLogger := logger.With("module", "evidence") - evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB, sm.StoreOptions{ - DiscardABCIResponses: config.Storage.DiscardABCIResponses, - }), blockStore) - if err != nil { - return nil, nil, err - } - evidenceReactor := evidence.NewReactor(evidencePool) - evidenceReactor.SetLogger(evidenceLogger) - return evidenceReactor, evidencePool, nil -} - -func createBlocksyncReactor(config *cfg.Config, - state sm.State, - blockExec *sm.BlockExecutor, - blockStore *store.BlockStore, - blockSync bool, - logger log.Logger, -) (bcReactor p2p.Reactor, err error) { - switch config.BlockSync.Version { - case "v0": - bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync) - case "v1", "v2": - return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version) - default: - return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version) - } - - bcReactor.SetLogger(logger.With("module", "blocksync")) - return bcReactor, nil -} - -func createConsensusReactor(config *cfg.Config, - state sm.State, - blockExec *sm.BlockExecutor, - blockStore sm.BlockStore, - mempool mempl.Mempool, - evidencePool *evidence.Pool, - privValidator types.PrivValidator, - csMetrics *cs.Metrics, - waitSync bool, - eventBus *types.EventBus, - consensusLogger log.Logger, -) (*cs.Reactor, *cs.State) { - consensusState := cs.NewState( - config.Consensus, - state.Copy(), - blockExec, - blockStore, - mempool, - evidencePool, - cs.StateMetrics(csMetrics), - ) - consensusState.SetLogger(consensusLogger) - if privValidator != nil { - consensusState.SetPrivValidator(privValidator) - } - consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) - consensusReactor.SetLogger(consensusLogger) - // services which will be publishing and/or subscribing for messages (events) - // consensusReactor will set it on consensusState and blockExecutor - consensusReactor.SetEventBus(eventBus) - return consensusReactor, consensusState -} - -func createTransport( - config *cfg.Config, - nodeInfo p2p.NodeInfo, - nodeKey *p2p.NodeKey, - proxyApp proxy.AppConns, -) ( - *p2p.MultiplexTransport, - []p2p.PeerFilterFunc, -) { - var ( - mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) - connFilters = []p2p.ConnFilterFunc{} - peerFilters = []p2p.PeerFilterFunc{} - ) - - if !config.P2P.AllowDuplicateIP { - connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) - } - - // Filter peers by addr or pubkey with an ABCI query. - // If the query return code is OK, add peer. - if config.FilterPeers { - connFilters = append( - connFilters, - // ABCI query for address filtering. - func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { - res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("error querying abci app: %v", res) - } - - return nil - }, - ) - - peerFilters = append( - peerFilters, - // ABCI query for ID filtering. - func(_ p2p.IPeerSet, p p2p.Peer) error { - res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ - Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), - }) - if err != nil { - return err - } - if res.IsErr() { - return fmt.Errorf("error querying abci app: %v", res) - } - - return nil - }, - ) - } - - p2p.MultiplexTransportConnFilters(connFilters...)(transport) - - // Limit the number of incoming connections. - max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - p2p.MultiplexTransportMaxIncomingConnections(max)(transport) - - return transport, peerFilters -} - -func createSwitch(config *cfg.Config, - transport p2p.Transport, - p2pMetrics *p2p.Metrics, - peerFilters []p2p.PeerFilterFunc, - mempoolReactor p2p.Reactor, - bcReactor p2p.Reactor, - stateSyncReactor *statesync.Reactor, - consensusReactor *cs.Reactor, - evidenceReactor *evidence.Reactor, - nodeInfo p2p.NodeInfo, - nodeKey *p2p.NodeKey, - p2pLogger log.Logger, -) *p2p.Switch { - sw := p2p.NewSwitch( - config.P2P, - transport, - p2p.WithMetrics(p2pMetrics), - p2p.SwitchPeerFilters(peerFilters...), - ) - sw.SetLogger(p2pLogger) - sw.AddReactor("MEMPOOL", mempoolReactor) - sw.AddReactor("BLOCKSYNC", bcReactor) - sw.AddReactor("CONSENSUS", consensusReactor) - sw.AddReactor("EVIDENCE", evidenceReactor) - sw.AddReactor("STATESYNC", stateSyncReactor) - - sw.SetNodeInfo(nodeInfo) - sw.SetNodeKey(nodeKey) - - p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) - return sw -} - -func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, - p2pLogger log.Logger, nodeKey *p2p.NodeKey, -) (pex.AddrBook, error) { - addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) - addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) - - // Add ourselves to addrbook to prevent dialing ourselves - if config.P2P.ExternalAddress != "" { - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress)) - if err != nil { - return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err) - } - addrBook.AddOurAddress(addr) - } - if config.P2P.ListenAddress != "" { - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress)) - if err != nil { - return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err) - } - addrBook.AddOurAddress(addr) - } - - sw.SetAddrBook(addrBook) - - return addrBook, nil -} - -func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, - sw *p2p.Switch, logger log.Logger, -) *pex.Reactor { - // TODO persistent peers ? so we can have their DNS addrs saved - pexReactor := pex.NewReactor(addrBook, - &pex.ReactorConfig{ - Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), - SeedMode: config.P2P.SeedMode, - // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 - // blocks assuming 10s blocks ~ 28 hours. - // TODO (melekes): make it dynamic based on the actual block latencies - // from the live network. - // https://github.com/tendermint/tendermint/issues/3523 - SeedDisconnectWaitPeriod: 28 * time.Hour, - PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, - }) - pexReactor.SetLogger(logger.With("module", "pex")) - sw.AddReactor("PEX", pexReactor) - return pexReactor -} - -// startStateSync starts an asynchronous state sync process, then switches to block sync mode. -func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.Reactor, - stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, blockSync bool, - stateStore sm.Store, blockStore *store.BlockStore, state sm.State, -) error { - ssR.Logger.Info("Starting state sync") - - if stateProvider == nil { - var err error - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - stateProvider, err = statesync.NewLightClientStateProvider( - ctx, - state.ChainID, state.Version, state.InitialHeight, - config.RPCServers, light.TrustOptions{ - Period: config.TrustPeriod, - Height: config.TrustHeight, - Hash: config.TrustHashBytes(), - }, ssR.Logger.With("module", "light")) - if err != nil { - return fmt.Errorf("failed to set up light client state provider: %w", err) - } - } - - go func() { - state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime) - if err != nil { - ssR.Logger.Error("State sync failed", "err", err) - return - } - err = stateStore.Bootstrap(state) - if err != nil { - ssR.Logger.Error("Failed to bootstrap node with new state", "err", err) - return - } - err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit) - if err != nil { - ssR.Logger.Error("Failed to store last seen commit", "err", err) - return - } - - if blockSync { - // FIXME Very ugly to have these metrics bleed through here. - conR.Metrics.StateSyncing.Set(0) - conR.Metrics.BlockSyncing.Set(1) - err = bcR.SwitchToBlockSync(state) - if err != nil { - ssR.Logger.Error("Failed to switch to block sync", "err", err) - return - } - } else { - conR.SwitchToConsensus(state, true) - } - }() - return nil -} - // NewNode returns a new, ready to go, Tendermint Node. func NewNode(config *cfg.Config, privValidator types.PrivValidator, @@ -798,7 +232,7 @@ func NewNode(config *cfg.Config, mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) // Make Evidence Reactor - evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger) + evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger) if err != nil { return nil, err } @@ -891,12 +325,8 @@ func NewNode(config *cfg.Config, pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } - if config.RPC.PprofListenAddress != "" { - go func() { - logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress) - logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil)) - }() - } + // Add private IDs to addrbook to block those peers being added + addrBook.AddPrivateIDs(splitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ")) node := &Node{ config: config, @@ -945,8 +375,15 @@ func (n *Node) OnStart() error { time.Sleep(genTime.Sub(now)) } - // Add private IDs to addrbook to block those peers being added - n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) + // run pprof server if it is enabled + if n.config.RPC.IsPprofEnabled() { + n.pprofSrv = n.startPprofServer() + } + + // begin prometheus metrics gathering if it is enabled + if n.config.Instrumentation.IsPrometheusEnabled() { + n.prometheusSrv = n.startPrometheusServer() + } // Start the RPC server before the P2P server // so we can eg. receive txs for the first block @@ -958,11 +395,6 @@ func (n *Node) OnStart() error { n.rpcListeners = listeners } - if n.config.Instrumentation.Prometheus && - n.config.Instrumentation.PrometheusListenAddr != "" { - n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) - } - // Start the transport. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress)) if err != nil { @@ -1047,6 +479,11 @@ func (n *Node) OnStop() { n.Logger.Error("Prometheus HTTP server Shutdown", "err", err) } } + if n.pprofSrv != nil { + if err := n.pprofSrv.Shutdown(context.Background()); err != nil { + n.Logger.Error("Pprof HTTP server Shutdown", "err", err) + } + } if n.blockStore != nil { if err := n.blockStore.Close(); err != nil { n.Logger.Error("problem closing blockstore", "err", err) @@ -1215,9 +652,9 @@ func (n *Node) startRPC() ([]net.Listener, error) { // startPrometheusServer starts a Prometheus HTTP server, listening for metrics // collectors on addr. -func (n *Node) startPrometheusServer(addr string) *http.Server { +func (n *Node) startPrometheusServer() *http.Server { srv := &http.Server{ - Addr: addr, + Addr: n.config.Instrumentation.PrometheusListenAddr, Handler: promhttp.InstrumentMetricHandler( prometheus.DefaultRegisterer, promhttp.HandlerFor( prometheus.DefaultGatherer, @@ -1235,6 +672,22 @@ func (n *Node) startPrometheusServer(addr string) *http.Server { return srv } +// starts a ppro +func (n *Node) startPprofServer() *http.Server { + srv := &http.Server{ + Addr: n.config.RPC.PprofListenAddress, + Handler: nil, + ReadHeaderTimeout: readHeaderTimeout, + } + go func() { + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + // Error starting or closing listener: + n.Logger.Error("pprof HTTP server ListenAndServe", "err", err) + } + }() + return srv +} + // Switch returns the Node's Switch. func (n *Node) Switch() *p2p.Switch { return n.sw @@ -1368,123 +821,3 @@ func makeNodeInfo( err := nodeInfo.Validate() return nodeInfo, err } - -//------------------------------------------------------------------------------ - -var genesisDocKey = []byte("genesisDoc") - -// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the -// database, or creates one using the given genesisDocProvider. On success this also -// returns the genesis doc loaded through the given provider. -func LoadStateFromDBOrGenesisDocProvider( - stateDB dbm.DB, - genesisDocProvider GenesisDocProvider, -) (sm.State, *types.GenesisDoc, error) { - // Get genesis doc - genDoc, err := loadGenesisDoc(stateDB) - if err != nil { - genDoc, err = genesisDocProvider() - if err != nil { - return sm.State{}, nil, err - } - - err = genDoc.ValidateAndComplete() - if err != nil { - return sm.State{}, nil, fmt.Errorf("error in genesis doc: %w", err) - } - // save genesis doc to prevent a certain class of user errors (e.g. when it - // was changed, accidentally or not). Also good for audit trail. - if err := saveGenesisDoc(stateDB, genDoc); err != nil { - return sm.State{}, nil, err - } - } - stateStore := sm.NewStore(stateDB, sm.StoreOptions{ - DiscardABCIResponses: false, - }) - state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) - if err != nil { - return sm.State{}, nil, err - } - return state, genDoc, nil -} - -// panics if failed to unmarshal bytes -func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) { - b, err := db.Get(genesisDocKey) - if err != nil { - panic(err) - } - if len(b) == 0 { - return nil, errors.New("genesis doc not found") - } - var genDoc *types.GenesisDoc - err = tmjson.Unmarshal(b, &genDoc) - if err != nil { - panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b)) - } - return genDoc, nil -} - -// panics if failed to marshal the given genesis document -func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error { - b, err := tmjson.Marshal(genDoc) - if err != nil { - return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err) - } - if err := db.SetSync(genesisDocKey, b); err != nil { - return err - } - - return nil -} - -func createAndStartPrivValidatorSocketClient( - listenAddr, - chainID string, - logger log.Logger, -) (types.PrivValidator, error) { - pve, err := privval.NewSignerListener(listenAddr, logger) - if err != nil { - return nil, fmt.Errorf("failed to start private validator: %w", err) - } - - pvsc, err := privval.NewSignerClient(pve, chainID) - if err != nil { - return nil, fmt.Errorf("failed to start private validator: %w", err) - } - - // try to get a pubkey from private validate first time - _, err = pvsc.GetPubKey() - if err != nil { - return nil, fmt.Errorf("can't get pubkey: %w", err) - } - - const ( - retries = 50 // 50 * 100ms = 5s total - timeout = 100 * time.Millisecond - ) - pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout) - - return pvscWithRetries, nil -} - -// splitAndTrimEmpty slices s into all subslices separated by sep and returns a -// slice of the string s with all leading and trailing Unicode code points -// contained in cutset removed. If sep is empty, SplitAndTrim splits after each -// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of -// -1. also filter out empty strings, only return non-empty strings. -func splitAndTrimEmpty(s, sep, cutset string) []string { - if s == "" { - return []string{} - } - - spl := strings.Split(s, sep) - nonEmptyStrings := make([]string, 0, len(spl)) - for i := 0; i < len(spl); i++ { - element := strings.Trim(spl[i], cutset) - if element != "" { - nonEmptyStrings = append(nonEmptyStrings, element) - } - } - return nonEmptyStrings -} diff --git a/node/node_test.go b/node/node_test.go index ee23892b1cd..4fdf63f3f4e 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "net/http" "os" "syscall" "testing" @@ -135,6 +136,29 @@ func TestNodeSetAppVersion(t *testing.T) { assert.Equal(t, n.nodeInfo.(p2p.DefaultNodeInfo).ProtocolVersion.App, appVersion) } +func TestPprofServer(t *testing.T) { + config := test.ResetTestRoot("node_pprof_test") + defer os.RemoveAll(config.RootDir) + config.RPC.PprofListenAddress = testFreeAddr(t) + + // should not work yet + _, err := http.Get("http://" + config.RPC.PprofListenAddress) //nolint: bodyclose + assert.Error(t, err) + + n, err := DefaultNewNode(config, log.TestingLogger()) + assert.NoError(t, err) + assert.NoError(t, n.Start()) + defer func() { + require.NoError(t, n.Stop()) + }() + assert.NotNil(t, n.pprofSrv) + + resp, err := http.Get("http://" + config.RPC.PprofListenAddress + "/debug/pprof") + assert.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, 200, resp.StatusCode) +} + func TestNodeSetPrivValTCP(t *testing.T) { addr := "tcp://" + testFreeAddr(t) diff --git a/node/setup.go b/node/setup.go new file mode 100644 index 00000000000..e9449558eaa --- /dev/null +++ b/node/setup.go @@ -0,0 +1,711 @@ +package node + +import ( + "bytes" + "context" + "errors" + "fmt" + "net" + "strings" + "time" + + dbm "github.com/tendermint/tm-db" + + abci "github.com/tendermint/tendermint/abci/types" + bc "github.com/tendermint/tendermint/blocksync" + cfg "github.com/tendermint/tendermint/config" + cs "github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/evidence" + + tmjson "github.com/tendermint/tendermint/libs/json" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/light" + mempl "github.com/tendermint/tendermint/mempool" + mempoolv0 "github.com/tendermint/tendermint/mempool/v0" + mempoolv1 "github.com/tendermint/tendermint/mempool/v1" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/pex" + "github.com/tendermint/tendermint/privval" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" + blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" + "github.com/tendermint/tendermint/state/indexer/sink/psql" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/state/txindex/kv" + "github.com/tendermint/tendermint/state/txindex/null" + "github.com/tendermint/tendermint/statesync" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/version" + + _ "github.com/lib/pq" // provide the psql db driver +) + +// DBContext specifies config information for loading a new DB. +type DBContext struct { + ID string + Config *cfg.Config +} + +// DBProvider takes a DBContext and returns an instantiated DB. +type DBProvider func(*DBContext) (dbm.DB, error) + +const readHeaderTimeout = 10 * time.Second + +// DefaultDBProvider returns a database using the DBBackend and DBDir +// specified in the ctx.Config. +func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { + dbType := dbm.BackendType(ctx.Config.DBBackend) + return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()) +} + +// GenesisDocProvider returns a GenesisDoc. +// It allows the GenesisDoc to be pulled from sources other than the +// filesystem, for instance from a distributed key-value store cluster. +type GenesisDocProvider func() (*types.GenesisDoc, error) + +// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads +// the GenesisDoc from the config.GenesisFile() on the filesystem. +func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { + return func() (*types.GenesisDoc, error) { + return types.GenesisDocFromFile(config.GenesisFile()) + } +} + +// Provider takes a config and a logger and returns a ready to go Node. +type Provider func(*cfg.Config, log.Logger) (*Node, error) + +// DefaultNewNode returns a Tendermint node with default settings for the +// PrivValidator, ClientCreator, GenesisDoc, and DBProvider. +// It implements NodeProvider. +func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { + nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) + if err != nil { + return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err) + } + + return NewNode(config, + privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()), + nodeKey, + proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), + DefaultGenesisDocProviderFunc(config), + DefaultDBProvider, + DefaultMetricsProvider(config.Instrumentation), + logger, + ) +} + +// MetricsProvider returns a consensus, p2p and mempool Metrics. +type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) + +// DefaultMetricsProvider returns Metrics build using Prometheus client library +// if Prometheus is enabled. Otherwise, it returns no-op Metrics. +func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { + return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) { + if config.Prometheus { + return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), + p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), + mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), + sm.PrometheusMetrics(config.Namespace, "chain_id", chainID), + proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID) + } + return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics() + } +} + +type blockSyncReactor interface { + SwitchToBlockSync(sm.State) error +} + +//------------------------------------------------------------------------------ + +func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { + var blockStoreDB dbm.DB + blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) + if err != nil { + return + } + blockStore = store.NewBlockStore(blockStoreDB) + + stateDB, err = dbProvider(&DBContext{"state", config}) + if err != nil { + return + } + + return +} + +func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) { + proxyApp := proxy.NewAppConns(clientCreator, metrics) + proxyApp.SetLogger(logger.With("module", "proxy")) + if err := proxyApp.Start(); err != nil { + return nil, fmt.Errorf("error starting proxy app connections: %v", err) + } + return proxyApp, nil +} + +func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { + eventBus := types.NewEventBus() + eventBus.SetLogger(logger.With("module", "events")) + if err := eventBus.Start(); err != nil { + return nil, err + } + return eventBus, nil +} + +func createAndStartIndexerService( + config *cfg.Config, + chainID string, + dbProvider DBProvider, + eventBus *types.EventBus, + logger log.Logger, +) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { + var ( + txIndexer txindex.TxIndexer + blockIndexer indexer.BlockIndexer + ) + + switch config.TxIndex.Indexer { + case "kv": + store, err := dbProvider(&DBContext{"tx_index", config}) + if err != nil { + return nil, nil, nil, err + } + + txIndexer = kv.NewTxIndex(store) + blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) + + case "psql": + if config.TxIndex.PsqlConn == "" { + return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`) + } + es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID) + if err != nil { + return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err) + } + txIndexer = es.TxIndexer() + blockIndexer = es.BlockIndexer() + + default: + txIndexer = &null.TxIndex{} + blockIndexer = &blockidxnull.BlockerIndexer{} + } + + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) + indexerService.SetLogger(logger.With("module", "txindex")) + + if err := indexerService.Start(); err != nil { + return nil, nil, nil, err + } + + return indexerService, txIndexer, blockIndexer, nil +} + +func doHandshake( + stateStore sm.Store, + state sm.State, + blockStore sm.BlockStore, + genDoc *types.GenesisDoc, + eventBus types.BlockEventPublisher, + proxyApp proxy.AppConns, + consensusLogger log.Logger, +) error { + handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) + handshaker.SetLogger(consensusLogger) + handshaker.SetEventBus(eventBus) + if err := handshaker.Handshake(proxyApp); err != nil { + return fmt.Errorf("error during handshake: %v", err) + } + return nil +} + +func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) { + // Log the version info. + logger.Info("Version info", + "tendermint_version", version.TMCoreSemVer, + "abci", version.ABCISemVer, + "block", version.BlockProtocol, + "p2p", version.P2PProtocol, + "commit_hash", version.TMGitCommitHash, + ) + + // If the state and software differ in block version, at least log it. + if state.Version.Consensus.Block != version.BlockProtocol { + logger.Info("Software and state have different block protocols", + "software", version.BlockProtocol, + "state", state.Version.Consensus.Block, + ) + } + + addr := pubKey.Address() + // Log whether this node is a validator or an observer + if state.Validators.HasAddress(addr) { + consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey) + } else { + consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) + } +} + +func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { + if state.Validators.Size() > 1 { + return false + } + addr, _ := state.Validators.GetByIndex(0) + return bytes.Equal(pubKey.Address(), addr) +} + +func createMempoolAndMempoolReactor( + config *cfg.Config, + proxyApp proxy.AppConns, + state sm.State, + memplMetrics *mempl.Metrics, + logger log.Logger, +) (mempl.Mempool, p2p.Reactor) { + switch config.Mempool.Version { + case cfg.MempoolV1: + mp := mempoolv1.NewTxMempool( + logger, + config.Mempool, + proxyApp.Mempool(), + state.LastBlockHeight, + mempoolv1.WithMetrics(memplMetrics), + mempoolv1.WithPreCheck(sm.TxPreCheck(state)), + mempoolv1.WithPostCheck(sm.TxPostCheck(state)), + ) + + reactor := mempoolv1.NewReactor( + config.Mempool, + mp, + ) + if config.Consensus.WaitForTxs() { + mp.EnableTxsAvailable() + } + + return mp, reactor + + case cfg.MempoolV0: + mp := mempoolv0.NewCListMempool( + config.Mempool, + proxyApp.Mempool(), + state.LastBlockHeight, + mempoolv0.WithMetrics(memplMetrics), + mempoolv0.WithPreCheck(sm.TxPreCheck(state)), + mempoolv0.WithPostCheck(sm.TxPostCheck(state)), + ) + + mp.SetLogger(logger) + + reactor := mempoolv0.NewReactor( + config.Mempool, + mp, + ) + if config.Consensus.WaitForTxs() { + mp.EnableTxsAvailable() + } + + return mp, reactor + + default: + return nil, nil + } +} + +func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider, + stateStore sm.Store, blockStore *store.BlockStore, logger log.Logger, +) (*evidence.Reactor, *evidence.Pool, error) { + evidenceDB, err := dbProvider(&DBContext{"evidence", config}) + if err != nil { + return nil, nil, err + } + evidenceLogger := logger.With("module", "evidence") + evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) + if err != nil { + return nil, nil, err + } + evidenceReactor := evidence.NewReactor(evidencePool) + evidenceReactor.SetLogger(evidenceLogger) + return evidenceReactor, evidencePool, nil +} + +func createBlocksyncReactor(config *cfg.Config, + state sm.State, + blockExec *sm.BlockExecutor, + blockStore *store.BlockStore, + blockSync bool, + logger log.Logger, +) (bcReactor p2p.Reactor, err error) { + switch config.BlockSync.Version { + case "v0": + bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync) + case "v1", "v2": + return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version) + default: + return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version) + } + + bcReactor.SetLogger(logger.With("module", "blocksync")) + return bcReactor, nil +} + +func createConsensusReactor(config *cfg.Config, + state sm.State, + blockExec *sm.BlockExecutor, + blockStore sm.BlockStore, + mempool mempl.Mempool, + evidencePool *evidence.Pool, + privValidator types.PrivValidator, + csMetrics *cs.Metrics, + waitSync bool, + eventBus *types.EventBus, + consensusLogger log.Logger, +) (*cs.Reactor, *cs.State) { + consensusState := cs.NewState( + config.Consensus, + state.Copy(), + blockExec, + blockStore, + mempool, + evidencePool, + cs.StateMetrics(csMetrics), + ) + consensusState.SetLogger(consensusLogger) + if privValidator != nil { + consensusState.SetPrivValidator(privValidator) + } + consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) + consensusReactor.SetLogger(consensusLogger) + // services which will be publishing and/or subscribing for messages (events) + // consensusReactor will set it on consensusState and blockExecutor + consensusReactor.SetEventBus(eventBus) + return consensusReactor, consensusState +} + +func createTransport( + config *cfg.Config, + nodeInfo p2p.NodeInfo, + nodeKey *p2p.NodeKey, + proxyApp proxy.AppConns, +) ( + *p2p.MultiplexTransport, + []p2p.PeerFilterFunc, +) { + var ( + mConnConfig = p2p.MConnConfig(config.P2P) + transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) + connFilters = []p2p.ConnFilterFunc{} + peerFilters = []p2p.PeerFilterFunc{} + ) + + if !config.P2P.AllowDuplicateIP { + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) + } + + // Filter peers by addr or pubkey with an ABCI query. + // If the query return code is OK, add peer. + if config.FilterPeers { + connFilters = append( + connFilters, + // ABCI query for address filtering. + func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + }, + ) + + peerFilters = append( + peerFilters, + // ABCI query for ID filtering. + func(_ p2p.IPeerSet, p p2p.Peer) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + }, + ) + } + + p2p.MultiplexTransportConnFilters(connFilters...)(transport) + + // Limit the number of incoming connections. + max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + p2p.MultiplexTransportMaxIncomingConnections(max)(transport) + + return transport, peerFilters +} + +func createSwitch(config *cfg.Config, + transport p2p.Transport, + p2pMetrics *p2p.Metrics, + peerFilters []p2p.PeerFilterFunc, + mempoolReactor p2p.Reactor, + bcReactor p2p.Reactor, + stateSyncReactor *statesync.Reactor, + consensusReactor *cs.Reactor, + evidenceReactor *evidence.Reactor, + nodeInfo p2p.NodeInfo, + nodeKey *p2p.NodeKey, + p2pLogger log.Logger, +) *p2p.Switch { + sw := p2p.NewSwitch( + config.P2P, + transport, + p2p.WithMetrics(p2pMetrics), + p2p.SwitchPeerFilters(peerFilters...), + ) + sw.SetLogger(p2pLogger) + sw.AddReactor("MEMPOOL", mempoolReactor) + sw.AddReactor("BLOCKSYNC", bcReactor) + sw.AddReactor("CONSENSUS", consensusReactor) + sw.AddReactor("EVIDENCE", evidenceReactor) + sw.AddReactor("STATESYNC", stateSyncReactor) + + sw.SetNodeInfo(nodeInfo) + sw.SetNodeKey(nodeKey) + + p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) + return sw +} + +func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, + p2pLogger log.Logger, nodeKey *p2p.NodeKey, +) (pex.AddrBook, error) { + addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) + addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) + + // Add ourselves to addrbook to prevent dialing ourselves + if config.P2P.ExternalAddress != "" { + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress)) + if err != nil { + return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err) + } + addrBook.AddOurAddress(addr) + } + if config.P2P.ListenAddress != "" { + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress)) + if err != nil { + return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err) + } + addrBook.AddOurAddress(addr) + } + + sw.SetAddrBook(addrBook) + + return addrBook, nil +} + +func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, + sw *p2p.Switch, logger log.Logger, +) *pex.Reactor { + // TODO persistent peers ? so we can have their DNS addrs saved + pexReactor := pex.NewReactor(addrBook, + &pex.ReactorConfig{ + Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), + SeedMode: config.P2P.SeedMode, + // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 + // blocks assuming 10s blocks ~ 28 hours. + // TODO (melekes): make it dynamic based on the actual block latencies + // from the live network. + // https://github.com/tendermint/tendermint/issues/3523 + SeedDisconnectWaitPeriod: 28 * time.Hour, + PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, + }) + pexReactor.SetLogger(logger.With("module", "pex")) + sw.AddReactor("PEX", pexReactor) + return pexReactor +} + +// startStateSync starts an asynchronous state sync process, then switches to block sync mode. +func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.Reactor, + stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, blockSync bool, + stateStore sm.Store, blockStore *store.BlockStore, state sm.State, +) error { + ssR.Logger.Info("Starting state sync") + + if stateProvider == nil { + var err error + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + stateProvider, err = statesync.NewLightClientStateProvider( + ctx, + state.ChainID, state.Version, state.InitialHeight, + config.RPCServers, light.TrustOptions{ + Period: config.TrustPeriod, + Height: config.TrustHeight, + Hash: config.TrustHashBytes(), + }, ssR.Logger.With("module", "light")) + if err != nil { + return fmt.Errorf("failed to set up light client state provider: %w", err) + } + } + + go func() { + state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime) + if err != nil { + ssR.Logger.Error("State sync failed", "err", err) + return + } + err = stateStore.Bootstrap(state) + if err != nil { + ssR.Logger.Error("Failed to bootstrap node with new state", "err", err) + return + } + err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit) + if err != nil { + ssR.Logger.Error("Failed to store last seen commit", "err", err) + return + } + + if blockSync { + // FIXME Very ugly to have these metrics bleed through here. + conR.Metrics.StateSyncing.Set(0) + conR.Metrics.BlockSyncing.Set(1) + err = bcR.SwitchToBlockSync(state) + if err != nil { + ssR.Logger.Error("Failed to switch to block sync", "err", err) + return + } + } else { + conR.SwitchToConsensus(state, true) + } + }() + return nil +} + +//------------------------------------------------------------------------------ + +var genesisDocKey = []byte("genesisDoc") + +// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the +// database, or creates one using the given genesisDocProvider. On success this also +// returns the genesis doc loaded through the given provider. +func LoadStateFromDBOrGenesisDocProvider( + stateDB dbm.DB, + genesisDocProvider GenesisDocProvider, +) (sm.State, *types.GenesisDoc, error) { + // Get genesis doc + genDoc, err := loadGenesisDoc(stateDB) + if err != nil { + genDoc, err = genesisDocProvider() + if err != nil { + return sm.State{}, nil, err + } + + err = genDoc.ValidateAndComplete() + if err != nil { + return sm.State{}, nil, fmt.Errorf("error in genesis doc: %w", err) + } + // save genesis doc to prevent a certain class of user errors (e.g. when it + // was changed, accidentally or not). Also good for audit trail. + if err := saveGenesisDoc(stateDB, genDoc); err != nil { + return sm.State{}, nil, err + } + } + stateStore := sm.NewStore(stateDB, sm.StoreOptions{ + DiscardABCIResponses: false, + }) + state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) + if err != nil { + return sm.State{}, nil, err + } + return state, genDoc, nil +} + +// panics if failed to unmarshal bytes +func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) { + b, err := db.Get(genesisDocKey) + if err != nil { + panic(err) + } + if len(b) == 0 { + return nil, errors.New("genesis doc not found") + } + var genDoc *types.GenesisDoc + err = tmjson.Unmarshal(b, &genDoc) + if err != nil { + panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b)) + } + return genDoc, nil +} + +// panics if failed to marshal the given genesis document +func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error { + b, err := tmjson.Marshal(genDoc) + if err != nil { + return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err) + } + if err := db.SetSync(genesisDocKey, b); err != nil { + return err + } + + return nil +} + +func createAndStartPrivValidatorSocketClient( + listenAddr, + chainID string, + logger log.Logger, +) (types.PrivValidator, error) { + pve, err := privval.NewSignerListener(listenAddr, logger) + if err != nil { + return nil, fmt.Errorf("failed to start private validator: %w", err) + } + + pvsc, err := privval.NewSignerClient(pve, chainID) + if err != nil { + return nil, fmt.Errorf("failed to start private validator: %w", err) + } + + // try to get a pubkey from private validate first time + _, err = pvsc.GetPubKey() + if err != nil { + return nil, fmt.Errorf("can't get pubkey: %w", err) + } + + const ( + retries = 50 // 50 * 100ms = 5s total + timeout = 100 * time.Millisecond + ) + pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout) + + return pvscWithRetries, nil +} + +// splitAndTrimEmpty slices s into all subslices separated by sep and returns a +// slice of the string s with all leading and trailing Unicode code points +// contained in cutset removed. If sep is empty, SplitAndTrim splits after each +// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of +// -1. also filter out empty strings, only return non-empty strings. +func splitAndTrimEmpty(s, sep, cutset string) []string { + if s == "" { + return []string{} + } + + spl := strings.Split(s, sep) + nonEmptyStrings := make([]string, 0, len(spl)) + for i := 0; i < len(spl); i++ { + element := strings.Trim(spl[i], cutset) + if element != "" { + nonEmptyStrings = append(nonEmptyStrings, element) + } + } + return nonEmptyStrings +}