Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT - DR flag now takes effecwt in sendDataReliabilityRelayIfApplicable #1839

Merged
merged 1 commit into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,15 @@ const (

// helper struct to propagate flags deeper into the code in an organized manner
type ConsumerCmdFlags struct {
HeadersFlag string // comma separated list of headers, or * for all, default simple cors specification headers
CredentialsFlag string // access-control-allow-credentials, defaults to "true"
OriginFlag string // comma separated list of origins, or * for all, default enabled completely
MethodsFlag string // whether to allow access control headers *, most proxies have their own access control so its not required
CDNCacheDuration string // how long to cache the preflight response defaults 24 hours (in seconds) "86400"
RelaysHealthEnableFlag bool // enables relay health check
RelaysHealthIntervalFlag time.Duration // interval for relay health check
DebugRelays bool // enables debug mode for relays
DisableConflictTransactions bool // disable conflict transactions
StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain.
HeadersFlag string // comma separated list of headers, or * for all, default simple cors specification headers
CredentialsFlag string // access-control-allow-credentials, defaults to "true"
OriginFlag string // comma separated list of origins, or * for all, default enabled completely
MethodsFlag string // whether to allow access control headers *, most proxies have their own access control so its not required
CDNCacheDuration string // how long to cache the preflight response defaults 24 hours (in seconds) "86400"
RelaysHealthEnableFlag bool // enables relay health check
RelaysHealthIntervalFlag time.Duration // interval for relay health check
DebugRelays bool // enables debug mode for relays
StaticSpecPath string // path to the spec file, works only when bootstrapping a single chain.
}

// default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time.
Expand Down
23 changes: 11 additions & 12 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt

// spawn up ConsumerStateTracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
consumerStateTracker, err := statetracker.NewConsumerStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, consumerMetricsManager, options.cmdFlags.DisableConflictTransactions)
consumerStateTracker, err := statetracker.NewConsumerStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, consumerMetricsManager)
if err != nil {
utils.LavaFormatFatal("failed to create a NewConsumerStateTracker", err)
}
Expand Down Expand Up @@ -649,16 +649,15 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77

maxConcurrentProviders := viper.GetUint(common.MaximumConcurrentProvidersFlagName)
consumerPropagatedFlags := common.ConsumerCmdFlags{
HeadersFlag: viper.GetString(common.CorsHeadersFlag),
CredentialsFlag: viper.GetString(common.CorsCredentialsFlag),
OriginFlag: viper.GetString(common.CorsOriginFlag),
MethodsFlag: viper.GetString(common.CorsMethodsFlag),
CDNCacheDuration: viper.GetString(common.CDNCacheDurationFlag),
RelaysHealthEnableFlag: viper.GetBool(common.RelaysHealthEnableFlag),
RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag),
DebugRelays: viper.GetBool(DebugRelaysFlagName),
DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag),
StaticSpecPath: viper.GetString(common.UseStaticSpecFlag),
HeadersFlag: viper.GetString(common.CorsHeadersFlag),
CredentialsFlag: viper.GetString(common.CorsCredentialsFlag),
OriginFlag: viper.GetString(common.CorsOriginFlag),
MethodsFlag: viper.GetString(common.CorsMethodsFlag),
CDNCacheDuration: viper.GetString(common.CDNCacheDurationFlag),
RelaysHealthEnableFlag: viper.GetBool(common.RelaysHealthEnableFlag),
RelaysHealthIntervalFlag: viper.GetDuration(common.RelayHealthIntervalFlag),
DebugRelays: viper.GetBool(DebugRelaysFlagName),
StaticSpecPath: viper.GetString(common.UseStaticSpecFlag),
}

// validate user is does not provide multi chain setup when using the offline spec feature.
Expand Down Expand Up @@ -754,7 +753,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().String(refererMarkerFlagName, "lava-referer-", "the string marker to identify referer")
cmdRPCConsumer.Flags().String(reportsSendBEAddress, "", "address to send reports to")
cmdRPCConsumer.Flags().BoolVar(&lavasession.DebugProbes, DebugProbesFlagName, false, "adding information to probes")
cmdRPCConsumer.Flags().Bool(common.DisableConflictTransactionsFlag, false, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.")
cmdRPCConsumer.Flags().BoolVar(&statetracker.DisableDR, common.DisableConflictTransactionsFlag, statetracker.DisableDR, "disabling conflict transactions, this flag should not be used as it harms the network's data reliability and therefore the service.")
cmdRPCConsumer.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks")
cmdRPCConsumer.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain")
cmdRPCConsumer.Flags().IntVar(&relayCountOnNodeError, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
Expand Down
4 changes: 4 additions & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/metrics"
"github.com/lavanet/lava/v4/protocol/performance"
"github.com/lavanet/lava/v4/protocol/statetracker"
"github.com/lavanet/lava/v4/protocol/upgrade"
"github.com/lavanet/lava/v4/utils"
"github.com/lavanet/lava/v4/utils/protocopy"
Expand Down Expand Up @@ -1254,6 +1255,9 @@ func (rpccs *RPCConsumerServer) getFirstSubscriptionReply(ctx context.Context, h
}

func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context.Context, protocolMessage chainlib.ProtocolMessage, dataReliabilityThreshold uint32, relayProcessor *RelayProcessor) error {
if statetracker.DisableDR {
return nil
}
processingTimeout, expectedRelayTimeout := rpccs.getProcessingTimeout(protocolMessage)
// Wait another relayTimeout duration to maybe get additional relay results
if relayProcessor.usedProviders.CurrentlyUsed() > 0 {
Expand Down
8 changes: 4 additions & 4 deletions protocol/statetracker/consumer_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
protocoltypes "github.com/lavanet/lava/v4/x/protocol/types"
)

var DisableDR = false

type ConsumerTxSenderInf interface {
TxSenderConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict) error
}
Expand All @@ -29,10 +31,9 @@ type ConsumerStateTracker struct {
ConsumerTxSenderInf
*StateTracker
ConsumerEmergencyTrackerInf
disableConflictTransactions bool
}

func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager, disableConflictTransactions bool) (ret *ConsumerStateTracker, err error) {
func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher, metrics *metrics.ConsumerMetricsManager) (ret *ConsumerStateTracker, err error) {
emergencyTracker, blockNotFoundCallback := NewEmergencyTracker(metrics)
stateQuery := updaters.NewConsumerStateQuery(ctx, clientCtx)
stateTrackerBase, err := NewStateTracker(ctx, txFactory, stateQuery.StateQuery, chainFetcher, blockNotFoundCallback)
Expand All @@ -48,7 +49,6 @@ func NewConsumerStateTracker(ctx context.Context, txFactory tx.Factory, clientCt
StateQuery: stateQuery,
ConsumerTxSenderInf: txSender,
ConsumerEmergencyTrackerInf: emergencyTracker,
disableConflictTransactions: disableConflictTransactions,
}

err = cst.RegisterForDowntimeParamsUpdates(ctx, emergencyTracker)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx con
}

func (cst *ConsumerStateTracker) TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, conflictHandler common.ConflictHandlerInterface) error {
if cst.disableConflictTransactions {
if DisableDR {
utils.LavaFormatInfo("found Conflict, but transactions are disabled, returning")
return nil
}
Expand Down
Loading