diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index d5124bbd6a..a320d222e3 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -324,7 +324,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS if !canSelect { return nil, utils.LavaFormatError("failed getting sessions from used Providers", nil, utils.LogAttr("usedProviders", usedProviders), utils.LogAttr("endpoint", csm.rpcEndpoint)) } - defer func() { usedProviders.AddUsed(consumerSessionMap) }() + defer func() { usedProviders.AddUsed(consumerSessionMap, errRet) }() initUnwantedProviders := usedProviders.GetUnwantedProvidersToSend() extensionNames := common.GetExtensionNames(extensions) diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 1b5c64438b..18a578844f 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -24,7 +24,7 @@ var AllowInsecureConnectionToProviders = false type UsedProvidersInf interface { RemoveUsed(providerAddress string, err error) TryLockSelection(context.Context) bool - AddUsed(ConsumerSessionsMap) + AddUsed(ConsumerSessionsMap, error) GetUnwantedProvidersToSend() map[string]struct{} } diff --git a/protocol/lavasession/used_providers.go b/protocol/lavasession/used_providers.go index 342e216ab4..21d0e2a898 100644 --- a/protocol/lavasession/used_providers.go +++ b/protocol/lavasession/used_providers.go @@ -100,17 +100,19 @@ func (up *UsedProviders) ClearUnwanted() { up.unwantedProviders = map[string]struct{}{} } -func (up *UsedProviders) AddUsed(sessions ConsumerSessionsMap) { +func (up *UsedProviders) AddUsed(sessions ConsumerSessionsMap, err error) { if up == nil { return } up.lock.Lock() defer up.lock.Unlock() // this is nil safe - up.sessionsLatestBatch = 0 - for provider := range sessions { // the key for ConsumerSessionsMap is the provider public address - up.providers[provider] = struct{}{} - up.sessionsLatestBatch++ + if len(sessions) > 0 && err == nil { + up.sessionsLatestBatch = 0 + for provider := range sessions { // the key for ConsumerSessionsMap is the provider public address + up.providers[provider] = struct{}{} + up.sessionsLatestBatch++ + } } up.selecting = false } diff --git a/protocol/lavasession/used_providers_test.go b/protocol/lavasession/used_providers_test.go index f2f2557b25..042394b4e5 100644 --- a/protocol/lavasession/used_providers_test.go +++ b/protocol/lavasession/used_providers_test.go @@ -23,7 +23,7 @@ func TestUsedProviders(t *testing.T) { unwanted := usedProviders.GetUnwantedProvidersToSend() require.Len(t, unwanted, 0) consumerSessionsMap := ConsumerSessionsMap{"test": &SessionInfo{}, "test2": &SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) canUseAgain = usedProviders.tryLockSelection() require.True(t, canUseAgain) unwanted = usedProviders.GetUnwantedProvidersToSend() @@ -32,7 +32,7 @@ func TestUsedProviders(t *testing.T) { canUseAgain = usedProviders.tryLockSelection() require.False(t, canUseAgain) consumerSessionsMap = ConsumerSessionsMap{"test3": &SessionInfo{}, "test4": &SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) unwanted = usedProviders.GetUnwantedProvidersToSend() require.Len(t, unwanted, 4) require.Equal(t, 4, usedProviders.CurrentlyUsed()) @@ -68,7 +68,7 @@ func TestUsedProvidersAsync(t *testing.T) { go func() { time.Sleep(time.Millisecond * 10) consumerSessionsMap := ConsumerSessionsMap{"test": &SessionInfo{}, "test2": &SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) }() ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index 37f74646da..5396286353 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -96,7 +96,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*5) @@ -138,7 +138,7 @@ func TestRelayProcessorTimeout(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) go func() { time.Sleep(time.Millisecond * 5) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) @@ -147,7 +147,7 @@ func TestRelayProcessorTimeout(t *testing.T) { require.NoError(t, ctx.Err()) require.True(t, canUse) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test3": &lavasession.SessionInfo{}, "lava@test4": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) }() go sendSuccessResp(relayProcessor, "lava@test", time.Millisecond*20) ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200) @@ -190,7 +190,7 @@ func TestRelayProcessorRetry(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) go sendSuccessResp(relayProcessor, "lava@test2", time.Millisecond*20) @@ -234,7 +234,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) go sendNodeError(relayProcessor, "lava@test2", time.Millisecond*20) @@ -278,7 +278,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava4@test": &lavasession.SessionInfo{}, "lava3@test": &lavasession.SessionInfo{}, "lava@test": &lavasession.SessionInfo{}, "lava2@test": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) go sendNodeError(relayProcessor, "lava2@test", time.Millisecond*20) go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25) @@ -323,7 +323,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) { require.Zero(t, usedProviders.CurrentlyUsed()) require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava4@test": &lavasession.SessionInfo{}, "lava3@test": &lavasession.SessionInfo{}, "lava@test": &lavasession.SessionInfo{}, "lava2@test": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) go sendNodeError(relayProcessor, "lava2@test", time.Millisecond*20) go sendNodeError(relayProcessor, "lava3@test", time.Millisecond*25) @@ -368,7 +368,7 @@ func TestRelayProcessorLatest(t *testing.T) { require.Zero(t, usedProviders.SessionsLatestBatch()) consumerSessionsMap := lavasession.ConsumerSessionsMap{"lava@test": &lavasession.SessionInfo{}, "lava@test2": &lavasession.SessionInfo{}} - usedProviders.AddUsed(consumerSessionsMap) + usedProviders.AddUsed(consumerSessionsMap, nil) go sendProtocolError(relayProcessor, "lava@test", time.Millisecond*5, fmt.Errorf("bad")) go sendSuccessResp(relayProcessor, "lava@test2", time.Millisecond*20)