diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 42392f19a5..8ce27f055b 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -65,16 +65,15 @@ const ( // helper struct to propagate flags deeper into the code in an organized manner type ConsumerCmdFlags struct { - HeadersFlag string // comma separated list of headers, or * for all, default simple cors specification headers - CredentialsFlag string // access-control-allow-credentials, defaults to "true" - OriginFlag string // comma separated list of origins, or * for all, default enabled completely - MethodsFlag string // whether to allow access control headers *, most proxies have their own access control so its not required - CDNCacheDuration string // how long to cache the preflight response defaults 24 hours (in seconds) "86400" - RelaysHealthEnableFlag bool // enables relay health check - RelaysHealthIntervalFlag time.Duration // interval for relay health check - DebugRelays bool // enables debug mode for relays - DisableConflictTransactions bool // disable conflict transactions - StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain. + HeadersFlag string // comma separated list of headers, or * for all, default simple cors specification headers + CredentialsFlag string // access-control-allow-credentials, defaults to "true" + OriginFlag string // comma separated list of origins, or * for all, default enabled completely + MethodsFlag string // whether to allow access control headers *, most proxies have their own access control so its not required + CDNCacheDuration string // how long to cache the preflight response defaults 24 hours (in seconds) "86400" + RelaysHealthEnableFlag bool // enables relay health check + RelaysHealthIntervalFlag time.Duration // interval for relay health check + DebugRelays bool // enables debug mode for relays + StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain. } // default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time. diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 445c9058dd..4baa6c00fd 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -203,7 +203,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt // spawn up ConsumerStateTracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) - consumerStateTracker, err := statetracker.NewConsumerStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, consumerMetricsManager, options.cmdFlags.DisableConflictTransactions) + consumerStateTracker, err := statetracker.NewConsumerStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, consumerMetricsManager) if err != nil { utils.LavaFormatFatal("failed to create a NewConsumerStateTracker", err) } @@ -649,16 +649,15 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 maxConcurrentProviders := viper.GetUint(common.MaximumConcurrentProvidersFlagName) consumerPropagatedFlags := common.ConsumerCmdFlags{ - HeadersFlag: viper.GetString(common.CorsHeadersFlag), - CredentialsFlag: viper.GetString(common.CorsCredentialsFlag), - OriginFlag: viper.GetString(common.CorsOriginFlag), - MethodsFlag: viper.GetString(common.CorsMethodsFlag), - CDNCacheDuration: viper.GetString(common.CDNCacheDurationFlag), - RelaysHealthEnableFlag: viper.GetBool(common.RelaysHealthEnableFlag), - RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag), - DebugRelays: viper.GetBool(DebugRelaysFlagName), - DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag), - StaticSpecPath: viper.GetString(common.UseStaticSpecFlag), + HeadersFlag: viper.GetString(common.CorsHeadersFlag), + CredentialsFlag: viper.GetString(common.CorsCredentialsFlag), + OriginFlag: viper.GetString(common.CorsOriginFlag), + MethodsFlag: viper.GetString(common.CorsMethodsFlag), + CDNCacheDuration: viper.GetString(common.CDNCacheDurationFlag), + RelaysHealthEnableFlag: viper.GetBool(common.RelaysHealthEnableFlag), + RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag), + DebugRelays: viper.GetBool(DebugRelaysFlagName), + StaticSpecPath: viper.GetString(common.UseStaticSpecFlag), } // validate user is does not provide multi chain setup when using the offline spec feature. @@ -754,7 +753,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().String(refererMarkerFlagName, "lava-referer-", "the string marker to identify referer") cmdRPCConsumer.Flags().String(reportsSendBEAddress, "", "address to send reports to") cmdRPCConsumer.Flags().BoolVar(&lavasession.DebugProbes, DebugProbesFlagName, false, "adding information to probes") - cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.") + cmdRPCConsumer.Flags().BoolVar(&statetracker.DisableDR, common.DisableConflictTransactionsFlag, statetracker.DisableDR, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.") cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") cmdRPCConsumer.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain") cmdRPCConsumer.Flags().IntVar(&relayCountOnNodeError, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 5204d4adce..8592da8408 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -27,6 +27,7 @@ import ( "github.com/lavanet/lava/v4/protocol/lavasession" "github.com/lavanet/lava/v4/protocol/metrics" "github.com/lavanet/lava/v4/protocol/performance" + "github.com/lavanet/lava/v4/protocol/statetracker" "github.com/lavanet/lava/v4/protocol/upgrade" "github.com/lavanet/lava/v4/utils" "github.com/lavanet/lava/v4/utils/protocopy" @@ -1254,6 +1255,9 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h } func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, protocolMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error { + if statetracker.DisableDR { + return nil + } processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(protocolMessage) // Wait another relayTimeout duration to maybe get additional relay results if relayProcessor.usedProviders.CurrentlyUsed() > 0 { diff --git a/protocol/statetracker/consumer_state_tracker.go b/protocol/statetracker/consumer_state_tracker.go index ffa21cd6a7..db91d8312e 100644 --- a/protocol/statetracker/consumer_state_tracker.go +++ b/protocol/statetracker/consumer_state_tracker.go @@ -18,6 +18,8 @@ import ( protocoltypes "github.com/lavanet/lava/v4/x/protocol/types" ) +var DisableDR = false + type ConsumerTxSenderInf interface { TxSenderConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict) error } @@ -29,10 +31,9 @@ type ConsumerStateTracker struct { ConsumerTxSenderInf *StateTracker ConsumerEmergencyTrackerInf - disableConflictTransactions bool } -func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager, disableConflictTransactions bool) (ret *ConsumerStateTracker, err error) { +func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager) (ret *ConsumerStateTracker, err error) { emergencyTracker, blockNotFoundCallback := NewEmergencyTracker(metrics) stateQuery := updaters.NewConsumerStateQuery(ctx, clientCtx) stateTrackerBase, err := NewStateTracker(ctx, txFactory, stateQuery.StateQuery, chainFetcher, blockNotFoundCallback) @@ -48,7 +49,6 @@ func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCt StateQuery: stateQuery, ConsumerTxSenderInf: txSender, ConsumerEmergencyTrackerInf: emergencyTracker, - disableConflictTransactions: disableConflictTransactions, } err = cst.RegisterForDowntimeParamsUpdates(ctx, emergencyTracker) @@ -105,7 +105,7 @@ func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx con } func (cst *ConsumerStateTracker) TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error { - if cst.disableConflictTransactions { + if DisableDR { utils.LavaFormatInfo("found Conflict, but transactions are disabled, returning") return nil }