diff --git a/cmd/elasticindexer/flags.go b/cmd/elasticindexer/flags.go index d7e84b70..084f3b3e 100644 --- a/cmd/elasticindexer/flags.go +++ b/cmd/elasticindexer/flags.go @@ -47,4 +47,9 @@ var ( Name: "disable-ansi-color", Usage: "Boolean option for disabling ANSI colors in the logging system.", } + // sovereign defines a flag that specifies if what run type components should use + sovereign = cli.BoolFlag{ + Name: "sovereign-config", + Usage: "If set to true, will use sovereign run type components", + } ) diff --git a/cmd/elasticindexer/main.go b/cmd/elasticindexer/main.go index 85246d13..4c3e9366 100644 --- a/cmd/elasticindexer/main.go +++ b/cmd/elasticindexer/main.go @@ -11,13 +11,14 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/core/closing" "github.com/multiversx/mx-chain-core-go/data/outport" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-logger-go/file" + "github.com/urfave/cli" + "github.com/multiversx/mx-chain-es-indexer-go/config" "github.com/multiversx/mx-chain-es-indexer-go/factory" "github.com/multiversx/mx-chain-es-indexer-go/metrics" "github.com/multiversx/mx-chain-es-indexer-go/process/wsindexer" - logger "github.com/multiversx/mx-chain-logger-go" - "github.com/multiversx/mx-chain-logger-go/file" - "github.com/urfave/cli" ) var ( @@ -63,6 +64,7 @@ func main() { logLevel, logSaveFile, disableAnsiColor, + sovereign, } app.Authors = []cli.Author{ { @@ -98,7 +100,7 @@ func startIndexer(ctx *cli.Context) error { } statusMetrics := metrics.NewStatusMetrics() - wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, statusMetrics, ctx.App.Version) + wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, statusMetrics, ctx.App.Version, ctx.GlobalBool(sovereign.Name)) if err != nil { return fmt.Errorf("%w while creating the indexer", err) } diff --git a/factory/wsIndexerFactory.go b/factory/wsIndexerFactory.go index 90beb5a9..5b41359c 100644 --- a/factory/wsIndexerFactory.go +++ b/factory/wsIndexerFactory.go @@ -7,23 +7,24 @@ import ( factoryHasher "github.com/multiversx/mx-chain-core-go/hashing/factory" "github.com/multiversx/mx-chain-core-go/marshal" factoryMarshaller "github.com/multiversx/mx-chain-core-go/marshal/factory" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-es-indexer-go/config" "github.com/multiversx/mx-chain-es-indexer-go/core" "github.com/multiversx/mx-chain-es-indexer-go/process/factory" "github.com/multiversx/mx-chain-es-indexer-go/process/wsindexer" - logger "github.com/multiversx/mx-chain-logger-go" ) var log = logger.GetOrCreate("elasticindexer") // CreateWsIndexer will create a new instance of wsindexer.WSClient -func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, statusMetrics core.StatusMetricsHandler, version string) (wsindexer.WSClient, error) { +func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, statusMetrics core.StatusMetricsHandler, version string, isSovereignType bool) (wsindexer.WSClient, error) { wsMarshaller, err := factoryMarshaller.NewMarshalizer(clusterCfg.Config.WebSocket.DataMarshallerType) if err != nil { return nil, err } - dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, statusMetrics, version) + dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, statusMetrics, version, isSovereignType) if err != nil { return nil, err } @@ -57,6 +58,7 @@ func createDataIndexer( wsMarshaller marshal.Marshalizer, statusMetrics core.StatusMetricsHandler, version string, + isSovereignConfig bool, ) (wsindexer.DataIndexer, error) { marshaller, err := factoryMarshaller.NewMarshalizer(cfg.Config.Marshaller.Type) if err != nil { @@ -76,6 +78,7 @@ func createDataIndexer( } return factory.NewIndexer(factory.ArgsIndexerFactory{ + SovereignConfig: isSovereignConfig, UseKibana: clusterCfg.Config.ElasticCluster.UseKibana, Denomination: cfg.Config.Economics.Denomination, BulkRequestMaxSize: clusterCfg.Config.ElasticCluster.BulkRequestMaxSizeInBytes, diff --git a/process/factory/indexerFactory.go b/process/factory/indexerFactory.go index 5da830f1..027dc651 100644 --- a/process/factory/indexerFactory.go +++ b/process/factory/indexerFactory.go @@ -12,14 +12,16 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" + logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-chain-es-indexer-go/client" "github.com/multiversx/mx-chain-es-indexer-go/client/logging" "github.com/multiversx/mx-chain-es-indexer-go/client/transport" indexerCore "github.com/multiversx/mx-chain-es-indexer-go/core" + "github.com/multiversx/mx-chain-es-indexer-go/factory/runType" "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" - logger "github.com/multiversx/mx-chain-logger-go" ) var log = logger.GetOrCreate("indexer/factory") @@ -30,6 +32,7 @@ type ArgsIndexerFactory struct { Enabled bool UseKibana bool ImportDB bool + SovereignConfig bool Denomination int BulkRequestMaxSize int Url string @@ -44,6 +47,7 @@ type ArgsIndexerFactory struct { AddressPubkeyConverter core.PubkeyConverter ValidatorPubkeyConverter core.PubkeyConverter StatusMetrics indexerCore.StatusMetricsHandler + RunTypeComponents runType.RunTypeComponentsHandler } // NewIndexer will create a new instance of Indexer @@ -53,6 +57,15 @@ func NewIndexer(args ArgsIndexerFactory) (dataindexer.Indexer, error) { return nil, err } + if args.SovereignConfig { + args.RunTypeComponents, err = createManagedRunTypeComponents(runType.NewSovereignRunTypeComponentsFactory()) + } else { + args.RunTypeComponents, err = createManagedRunTypeComponents(runType.NewRunTypeComponentsFactory()) + } + if err != nil { + return nil, err + } + elasticProcessor, err := createElasticProcessor(args) if err != nil { return nil, err @@ -72,6 +85,20 @@ func NewIndexer(args ArgsIndexerFactory) (dataindexer.Indexer, error) { return dataindexer.NewDataIndexer(arguments) } +func createManagedRunTypeComponents(factory runType.RunTypeComponentsCreator) (runType.RunTypeComponentsHandler, error) { + managedRunTypeComponents, err := runType.NewManagedRunTypeComponents(factory) + if err != nil { + return nil, err + } + + err = managedRunTypeComponents.Create() + if err != nil { + return nil, err + } + + return managedRunTypeComponents, nil +} + func retryBackOff(attempt int) time.Duration { d := time.Duration(math.Exp2(float64(attempt))) * time.Second log.Debug("elastic: retry backoff", "attempt", attempt, "sleep duration", d)