diff --git a/protocol/chainlib/chain_message_queries.go b/protocol/chainlib/chain_message_queries.go index 7fce2df76f..a8e453cdf1 100644 --- a/protocol/chainlib/chain_message_queries.go +++ b/protocol/chainlib/chain_message_queries.go @@ -3,7 +3,7 @@ package chainlib import "github.com/lavanet/lava/protocol/common" func ShouldSendToAllProviders(chainMessage ChainMessage) bool { - return chainMessage.GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALLPROVIDERS + return chainMessage.GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS } func GetAddon(chainMessage ChainMessageForSend) string { diff --git a/protocol/common/collections.go b/protocol/common/collections.go index ad688973a8..7e75f5028d 100644 --- a/protocol/common/collections.go +++ b/protocol/common/collections.go @@ -5,8 +5,8 @@ import ( ) const ( - CONSISTENCY_SELECT_ALLPROVIDERS = 1 - NOSTATE = 0 + CONSISTENCY_SELECT_ALL_PROVIDERS = 1 + NO_STATE = 0 ) func GetExtensionNames(extensionCollection []*spectypes.Extension) (extensions []string) { diff --git a/protocol/common/timeout.go b/protocol/common/timeout.go index f456c1f95b..3b6e6d4708 100644 --- a/protocol/common/timeout.go +++ b/protocol/common/timeout.go @@ -72,7 +72,7 @@ type TimeoutInfo struct { func GetTimeoutForProcessing(relayTimeout time.Duration, timeoutInfo TimeoutInfo) time.Duration { ctxTimeout := DefaultTimeout - if timeoutInfo.Hanging || timeoutInfo.CU > 100 || timeoutInfo.Stateful == CONSISTENCY_SELECT_ALLPROVIDERS { + if timeoutInfo.Hanging || timeoutInfo.CU > 100 || timeoutInfo.Stateful == CONSISTENCY_SELECT_ALL_PROVIDERS { ctxTimeout = DefaultTimeoutLong } if relayTimeout > ctxTimeout { diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index a320d222e3..04e609e5f6 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -492,7 +492,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis } } var providers []string - if stateful == common.CONSISTENCY_SELECT_ALLPROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST { + if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST { providers = GetAllProviders(validAddresses, ignoredProvidersList) } else { providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation) diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index a43be66df7..965f693263 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -139,7 +139,7 @@ func TestHappyFlow(t *testing.T) { pairingList := createPairingList("", true) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -161,7 +161,7 @@ func TestHappyFlowVirtualEpoch(t *testing.T) { pairingList := createPairingList("", true) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1), NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, virtualEpoch) // get a session + css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1), NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, virtualEpoch) // get a session require.NoError(t, err) for _, cs := range css { @@ -185,7 +185,7 @@ func TestVirtualEpochWithFailure(t *testing.T) { err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - _, err = csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1)+10, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, virtualEpoch) // get a session + _, err = csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1)+10, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, virtualEpoch) // get a session require.Error(t, err) } @@ -195,8 +195,8 @@ func TestPairingReset(t *testing.T) { pairingList := createPairingList("", true) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - csm.validAddresses = []string{} // set valid addresses to zero - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + csm.validAddresses = []string{} // set valid addresses to zero + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) require.Equal(t, len(csm.validAddresses), len(csm.pairingAddresses)) @@ -225,7 +225,7 @@ func TestPairingResetWithFailures(t *testing.T) { if len(csm.validAddresses) == 0 { // wait for all pairings to be blocked. break } - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -234,7 +234,7 @@ func TestPairingResetWithFailures(t *testing.T) { } } require.Equal(t, len(csm.validAddresses), 0) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) require.Equal(t, len(csm.validAddresses), len(csm.pairingAddresses)) @@ -259,7 +259,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) { if len(csm.validAddresses) == 0 { // wait for all pairings to be blocked. break } - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session for _, cs := range css { err = csm.OnSessionFailure(cs.Session, nil) @@ -271,7 +271,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) { } } require.Equal(t, len(csm.validAddresses), 0) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) require.Equal(t, len(csm.validAddresses), len(csm.pairingAddresses)) @@ -283,7 +283,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) { } } - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -318,7 +318,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) { sessionList := make([]session, numberOfAllowedSessionsPerConsumer) sessionListData := make([]SessTestData, numberOfAllowedSessionsPerConsumer) for i := 0; i < numberOfAllowedSessionsPerConsumer; i++ { - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { // get a session @@ -354,7 +354,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) { } for i := 0; i < numberOfAllowedSessionsPerConsumer; i++ { - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { // get a session @@ -387,7 +387,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) { } func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *testing.T, p int, ch chan int) { - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -400,7 +400,7 @@ func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *test } func failedSession(ctx context.Context, csm *ConsumerSessionManager, t *testing.T, p int, ch chan int) { - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -518,7 +518,7 @@ func TestSessionFailureAndGetReportedProviders(t *testing.T) { pairingList := createPairingList("", true) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -552,7 +552,7 @@ func TestSessionFailureEpochMisMatch(t *testing.T) { pairingList := createPairingList("", true) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { @@ -573,7 +573,7 @@ func TestAllProvidersEndpointsDisabled(t *testing.T) { pairingList := createPairingList("", false) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) - cs, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + cs, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.Nil(t, cs) require.Error(t, err) } @@ -613,7 +613,7 @@ func TestGetSession(t *testing.T) { pairingList := createPairingList("", true) err := csm.UpdateAllProviders(firstEpochHeight, pairingList) require.NoError(t, err) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) require.NoError(t, err) for _, cs := range css { @@ -659,7 +659,7 @@ func TestPairingWithAddons(t *testing.T) { // block all providers initialProvidersLen := len(csm.getValidAddresses(addon, nil)) for i := 0; i < initialProvidersLen; i++ { - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session require.NoError(t, err, i) for _, cs := range css { err = csm.OnSessionFailure(cs.Session, ReportAndBlockProviderError) @@ -671,7 +671,7 @@ func TestPairingWithAddons(t *testing.T) { if addon != "" { require.NotEqual(t, csm.getValidAddresses(addon, nil), csm.getValidAddresses("", nil)) } - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false) @@ -734,7 +734,7 @@ func TestPairingWithExtensions(t *testing.T) { } initialProvidersLen := len(csm.getValidAddresses(extensionOpt.addon, extensionOpt.extensions)) for i := 0; i < initialProvidersLen; i++ { - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session require.NoError(t, err, i) for _, cs := range css { err = csm.OnSessionFailure(cs.Session, ReportAndBlockProviderError) @@ -746,7 +746,7 @@ func TestPairingWithExtensions(t *testing.T) { if len(extensionOpt.extensions) > 0 || extensionOpt.addon != "" { require.NotEqual(t, csm.getValidAddresses(extensionOpt.addon, extensionOpt.extensions), csm.getValidAddresses("", nil)) } - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false) @@ -762,7 +762,7 @@ func TestNoPairingsError(t *testing.T) { err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers. require.NoError(t, err) time.Sleep(5 * time.Millisecond) // let probes finish - _, err = csm.getValidProviderAddresses(map[string]struct{}{}, 10, 100, "invalid", nil, common.NOSTATE) + _, err = csm.getValidProviderAddresses(map[string]struct{}{}, 10, 100, "invalid", nil, common.NO_STATE) require.Error(t, err) require.True(t, PairingListEmptyError.Is(err)) } @@ -781,7 +781,7 @@ func TestPairingWithStateful(t *testing.T) { providerAddresses := csm.getValidAddresses(addon, nil) allProviders := len(providerAddresses) require.Equal(t, 10, allProviders) - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALLPROVIDERS, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALL_PROVIDERS, 0) // get a session require.NoError(t, err) require.Equal(t, allProviders, len(css)) for _, cs := range css { @@ -790,7 +790,7 @@ func TestPairingWithStateful(t *testing.T) { } usedProviders := NewUsedProviders(nil) usedProviders.RemoveUsed(providerAddresses[0], nil) - css, err = csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALLPROVIDERS, 0) // get a session + css, err = csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALL_PROVIDERS, 0) // get a session require.NoError(t, err) require.Equal(t, allProviders-1, len(css)) }) diff --git a/protocol/lavasession/end_to_end_lavasession_test.go b/protocol/lavasession/end_to_end_lavasession_test.go index 6cdd141aca..abc42fb087 100644 --- a/protocol/lavasession/end_to_end_lavasession_test.go +++ b/protocol/lavasession/end_to_end_lavasession_test.go @@ -28,7 +28,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) { successfulRelays++ for i := 0; i < len(consumerVirtualEpochs); i++ { - css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, consumerVirtualEpochs[i]) // get a session + css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, consumerVirtualEpochs[i]) // get a session require.NoError(t, err) for _, cs := range css { @@ -92,7 +92,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) { func TestHappyFlowEmergencyInConsumer(t *testing.T) { csm, psm, ctx := prepareSessionsWithFirstRelay(t, maxCuForVirtualEpoch) - css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, virtualEpoch) // get a session + css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, virtualEpoch) // get a session require.NoError(t, err) for _, cs := range css { @@ -157,7 +157,7 @@ func prepareSessionsWithFirstRelay(t *testing.T, cuForFirstRequest uint64) (*Con err := csm.UpdateAllProviders(epoch1, cswpList) // update the providers. require.NoError(t, err) // get single consumer session - css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session + css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session require.NoError(t, err) for _, cs := range css { diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index 7cf9682283..98b08393a0 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -30,7 +30,7 @@ const ( func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, dappID string, consumerIp string) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) selection := Quorum // select the majority of node responses - if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALLPROVIDERS { + if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS { selection = BestResult // select the majority of node successes } if requiredSuccesses <= 0 {