Skip to content

Commit

Permalink
Merge branch 'PRT-improve-timout-handling' of github.com:lavanet/lava…
Browse files Browse the repository at this point in the history
… into PRT-improve-timout-handling
  • Loading branch information
omerlavanet committed Mar 11, 2024
2 parents 0e225a3 + 511d2be commit 888db6a
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 34 deletions.
2 changes: 1 addition & 1 deletion protocol/chainlib/chain_message_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package chainlib
import "github.com/lavanet/lava/protocol/common"

func ShouldSendToAllProviders(chainMessage ChainMessage) bool {
return chainMessage.GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALLPROVIDERS
return chainMessage.GetApi().Category.Stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS
}

func GetAddon(chainMessage ChainMessageForSend) string {
Expand Down
4 changes: 2 additions & 2 deletions protocol/common/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
)

const (
CONSISTENCY_SELECT_ALLPROVIDERS = 1
NOSTATE = 0
CONSISTENCY_SELECT_ALL_PROVIDERS = 1
NO_STATE = 0
)

func GetExtensionNames(extensionCollection []*spectypes.Extension) (extensions []string) {
Expand Down
2 changes: 1 addition & 1 deletion protocol/common/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type TimeoutInfo struct {

func GetTimeoutForProcessing(relayTimeout time.Duration, timeoutInfo TimeoutInfo) time.Duration {
ctxTimeout := DefaultTimeout
if timeoutInfo.Hanging || timeoutInfo.CU > 100 || timeoutInfo.Stateful == CONSISTENCY_SELECT_ALLPROVIDERS {
if timeoutInfo.Hanging || timeoutInfo.CU > 100 || timeoutInfo.Stateful == CONSISTENCY_SELECT_ALL_PROVIDERS {
ctxTimeout = DefaultTimeoutLong
}
if relayTimeout > ctxTimeout {
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis
}
}
var providers []string
if stateful == common.CONSISTENCY_SELECT_ALLPROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
providers = GetAllProviders(validAddresses, ignoredProvidersList)
} else {
providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation)
Expand Down
50 changes: 25 additions & 25 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestHappyFlow(t *testing.T) {
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand All @@ -161,7 +161,7 @@ func TestHappyFlowVirtualEpoch(t *testing.T) {
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1), NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, virtualEpoch) // get a session
css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1), NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, virtualEpoch) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand All @@ -185,7 +185,7 @@ func TestVirtualEpochWithFailure(t *testing.T) {
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)

_, err = csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1)+10, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, virtualEpoch) // get a session
_, err = csm.GetSessions(ctx, maxCuForVirtualEpoch*(virtualEpoch+1)+10, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, virtualEpoch) // get a session
require.Error(t, err)
}

Expand All @@ -195,8 +195,8 @@ func TestPairingReset(t *testing.T) {
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
csm.validAddresses = []string{} // set valid addresses to zero
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
csm.validAddresses = []string{} // set valid addresses to zero
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
require.Equal(t, len(csm.validAddresses), len(csm.pairingAddresses))

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestPairingResetWithFailures(t *testing.T) {
if len(csm.validAddresses) == 0 { // wait for all pairings to be blocked.
break
}
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand All @@ -234,7 +234,7 @@ func TestPairingResetWithFailures(t *testing.T) {
}
}
require.Equal(t, len(csm.validAddresses), 0)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
require.Equal(t, len(csm.validAddresses), len(csm.pairingAddresses))

Expand All @@ -259,7 +259,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
if len(csm.validAddresses) == 0 { // wait for all pairings to be blocked.
break
}
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session

for _, cs := range css {
err = csm.OnSessionFailure(cs.Session, nil)
Expand All @@ -271,7 +271,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
}
}
require.Equal(t, len(csm.validAddresses), 0)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
require.Equal(t, len(csm.validAddresses), len(csm.pairingAddresses))

Expand All @@ -283,7 +283,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
}
}

css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
sessionList := make([]session, numberOfAllowedSessionsPerConsumer)
sessionListData := make([]SessTestData, numberOfAllowedSessionsPerConsumer)
for i := 0; i < numberOfAllowedSessionsPerConsumer; i++ {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css { // get a session
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
}

for i := 0; i < numberOfAllowedSessionsPerConsumer; i++ {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css { // get a session
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
}

func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *testing.T, p int, ch chan int) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand All @@ -400,7 +400,7 @@ func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *test
}

func failedSession(ctx context.Context, csm *ConsumerSessionManager, t *testing.T, p int, ch chan int) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestSessionFailureAndGetReportedProviders(t *testing.T) {
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestSessionFailureEpochMisMatch(t *testing.T) {
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand All @@ -573,7 +573,7 @@ func TestAllProvidersEndpointsDisabled(t *testing.T) {
pairingList := createPairingList("", false)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
cs, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
cs, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.Nil(t, cs)
require.Error(t, err)
}
Expand Down Expand Up @@ -613,7 +613,7 @@ func TestGetSession(t *testing.T) {
pairingList := createPairingList("", true)
err := csm.UpdateAllProviders(firstEpochHeight, pairingList)
require.NoError(t, err)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0)
require.NoError(t, err)

for _, cs := range css {
Expand Down Expand Up @@ -659,7 +659,7 @@ func TestPairingWithAddons(t *testing.T) {
// block all providers
initialProvidersLen := len(csm.getValidAddresses(addon, nil))
for i := 0; i < initialProvidersLen; i++ {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session
require.NoError(t, err, i)
for _, cs := range css {
err = csm.OnSessionFailure(cs.Session, ReportAndBlockProviderError)
Expand All @@ -671,7 +671,7 @@ func TestPairingWithAddons(t *testing.T) {
if addon != "" {
require.NotEqual(t, csm.getValidAddresses(addon, nil), csm.getValidAddresses("", nil))
}
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
Expand Down Expand Up @@ -734,7 +734,7 @@ func TestPairingWithExtensions(t *testing.T) {
}
initialProvidersLen := len(csm.getValidAddresses(extensionOpt.addon, extensionOpt.extensions))
for i := 0; i < initialProvidersLen; i++ {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session
require.NoError(t, err, i)
for _, cs := range css {
err = csm.OnSessionFailure(cs.Session, ReportAndBlockProviderError)
Expand All @@ -746,7 +746,7 @@ func TestPairingWithExtensions(t *testing.T) {
if len(extensionOpt.extensions) > 0 || extensionOpt.addon != "" {
require.NotEqual(t, csm.getValidAddresses(extensionOpt.addon, extensionOpt.extensions), csm.getValidAddresses("", nil))
}
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
Expand All @@ -762,7 +762,7 @@ func TestNoPairingsError(t *testing.T) {
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)
time.Sleep(5 * time.Millisecond) // let probes finish
_, err = csm.getValidProviderAddresses(map[string]struct{}{}, 10, 100, "invalid", nil, common.NOSTATE)
_, err = csm.getValidProviderAddresses(map[string]struct{}{}, 10, 100, "invalid", nil, common.NO_STATE)
require.Error(t, err)
require.True(t, PairingListEmptyError.Is(err))
}
Expand All @@ -781,7 +781,7 @@ func TestPairingWithStateful(t *testing.T) {
providerAddresses := csm.getValidAddresses(addon, nil)
allProviders := len(providerAddresses)
require.Equal(t, 10, allProviders)
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALLPROVIDERS, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALL_PROVIDERS, 0) // get a session
require.NoError(t, err)
require.Equal(t, allProviders, len(css))
for _, cs := range css {
Expand All @@ -790,7 +790,7 @@ func TestPairingWithStateful(t *testing.T) {
}
usedProviders := NewUsedProviders(nil)
usedProviders.RemoveUsed(providerAddresses[0], nil)
css, err = csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALLPROVIDERS, 0) // get a session
css, err = csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALL_PROVIDERS, 0) // get a session
require.NoError(t, err)
require.Equal(t, allProviders-1, len(css))
})
Expand Down
6 changes: 3 additions & 3 deletions protocol/lavasession/end_to_end_lavasession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) {
successfulRelays++

for i := 0; i < len(consumerVirtualEpochs); i++ {
css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, consumerVirtualEpochs[i]) // get a session
css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, consumerVirtualEpochs[i]) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) {
func TestHappyFlowEmergencyInConsumer(t *testing.T) {
csm, psm, ctx := prepareSessionsWithFirstRelay(t, maxCuForVirtualEpoch)

css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, virtualEpoch) // get a session
css, err := csm.GetSessions(ctx, maxCuForVirtualEpoch, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, virtualEpoch) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand Down Expand Up @@ -157,7 +157,7 @@ func prepareSessionsWithFirstRelay(t *testing.T, cuForFirstRequest uint64) (*Con
err := csm.UpdateAllProviders(epoch1, cswpList) // update the providers.
require.NoError(t, err)
// get single consumer session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NOSTATE, 0) // get a session
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcconsumer/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, dappID string, consumerIp string) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
selection := Quorum // select the majority of node responses
if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALLPROVIDERS {
if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS {
selection = BestResult // select the majority of node successes
}
if requiredSuccesses <= 0 {
Expand Down

0 comments on commit 888db6a

Please sign in to comment.