Skip to content

Commit

Permalink
fix addUsed with len(0) resetting the wait
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Mar 11, 2024
1 parent 8d0ff06 commit 0e225a3
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 18 deletions.
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
if !canSelect {
return nil, utils.LavaFormatError("failed getting sessions from used Providers", nil, utils.LogAttr("usedProviders", usedProviders), utils.LogAttr("endpoint", csm.rpcEndpoint))
}
defer func() { usedProviders.AddUsed(consumerSessionMap) }()
defer func() { usedProviders.AddUsed(consumerSessionMap, errRet) }()
initUnwantedProviders := usedProviders.GetUnwantedProvidersToSend()

extensionNames := common.GetExtensionNames(extensions)
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var AllowInsecureConnectionToProviders = false
type UsedProvidersInf interface {
RemoveUsed(providerAddress string, err error)
TryLockSelection(context.Context) bool
AddUsed(ConsumerSessionsMap)
AddUsed(ConsumerSessionsMap, error)
GetUnwantedProvidersToSend() map[string]struct{}
}

Expand Down
12 changes: 7 additions & 5 deletions protocol/lavasession/used_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,19 @@ func (up *UsedProviders) ClearUnwanted() {
up.unwantedProviders = map[string]struct{}{}
}

func (up *UsedProviders) AddUsed(sessions ConsumerSessionsMap) {
func (up *UsedProviders) AddUsed(sessions ConsumerSessionsMap, err error) {
if up == nil {
return
}
up.lock.Lock()
defer up.lock.Unlock()
// this is nil safe
up.sessionsLatestBatch = 0
for provider := range sessions { // the key for ConsumerSessionsMap is the provider public address
up.providers[provider] = struct{}{}
up.sessionsLatestBatch++
if len(sessions) > 0 && err == nil {
up.sessionsLatestBatch = 0
for provider := range sessions { // the key for ConsumerSessionsMap is the provider public address
up.providers[provider] = struct{}{}
up.sessionsLatestBatch++
}
}
up.selecting = false
}
Expand Down
6 changes: 3 additions & 3 deletions protocol/lavasession/used_providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestUsedProviders(t *testing.T) {
unwanted := usedProviders.GetUnwantedProvidersToSend()
require.Len(t, unwanted, 0)
consumerSessionsMap := ConsumerSessionsMap{"test": &SessionInfo{}, "test2": &SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
canUseAgain = usedProviders.tryLockSelection()
require.True(t, canUseAgain)
unwanted = usedProviders.GetUnwantedProvidersToSend()
Expand All @@ -32,7 +32,7 @@ func TestUsedProviders(t *testing.T) {
canUseAgain = usedProviders.tryLockSelection()
require.False(t, canUseAgain)
consumerSessionsMap = ConsumerSessionsMap{"test3": &SessionInfo{}, "test4": &SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
unwanted = usedProviders.GetUnwantedProvidersToSend()
require.Len(t, unwanted, 4)
require.Equal(t, 4, usedProviders.CurrentlyUsed())
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestUsedProvidersAsync(t *testing.T) {
go func() {
time.Sleep(time.Millisecond * 10)
consumerSessionsMap := ConsumerSessionsMap{"test": &SessionInfo{}, "test2": &SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
}()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
Expand Down
16 changes: 8 additions & 8 deletions protocol/rpcconsumer/relay_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) {
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*5)
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestRelayProcessorTimeout(t *testing.T) {
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
go func() {
time.Sleep(time.Millisecond * 5)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand All @@ -147,7 +147,7 @@ func TestRelayProcessorTimeout(t *testing.T) {
require.NoError(t, ctx.Err())
require.True(t, canUse)
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test3": &lavasession.SessionInfo{}, "lava@test4": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
}()
go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*20)
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestRelayProcessorRetry(t *testing.T) {
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)

go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad"))
go sendSuccessResp(relayProcessor, "lava@test2", time.Millisecond*20)
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) {
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)

go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad"))
go sendNodeError(relayProcessor, "lava@test2", time.Millisecond*20)
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) {
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava4@test": &lavasession.SessionInfo{}, "lava3@test": &lavasession.SessionInfo{}, "lava@test": &lavasession.SessionInfo{}, "lava2@test": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad"))
go sendNodeError(relayProcessor, "lava2@test", time.Millisecond*20)
go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25)
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) {
require.Zero(t, usedProviders.CurrentlyUsed())
require.Zero(t, usedProviders.SessionsLatestBatch())
consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava4@test": &lavasession.SessionInfo{}, "lava3@test": &lavasession.SessionInfo{}, "lava@test": &lavasession.SessionInfo{}, "lava2@test": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)
go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad"))
go sendNodeError(relayProcessor, "lava2@test", time.Millisecond*20)
go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25)
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestRelayProcessorLatest(t *testing.T) {
require.Zero(t, usedProviders.SessionsLatestBatch())

consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}}
usedProviders.AddUsed(consumerSessionsMap)
usedProviders.AddUsed(consumerSessionsMap, nil)

go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad"))
go sendSuccessResp(relayProcessor, "lava@test2", time.Millisecond*20)
Expand Down

0 comments on commit 0e225a3

Please sign in to comment.