Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Prt v 3 1 11 near hf #1740

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestQoS(t *testing.T) {
currentLatency := time.Millisecond
expectedLatency := time.Millisecond
latestServicedBlock := expectedBH
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(1), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(1), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -292,7 +292,7 @@ func TestQoS(t *testing.T) {
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

latestServicedBlock = expectedBH + 1
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(2), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(2), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -302,7 +302,7 @@ func TestQoS(t *testing.T) {
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

singleConsumerSession.QoSInfo.TotalRelays++ // this is how we add a failure
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(3), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -313,7 +313,7 @@ func TestQoS(t *testing.T) {
require.Equal(t, sdk.OneDec(), singleConsumerSession.QoSInfo.LastQoSReport.Latency)

latestServicedBlock = expectedBH - 1 // is one block below threshold
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1, false)
require.Equal(t, uint64(4), singleConsumerSession.QoSInfo.AnsweredRelays)
require.Equal(t, uint64(5), singleConsumerSession.QoSInfo.TotalRelays)
require.Equal(t, int64(3), singleConsumerSession.QoSInfo.SyncScoreSum)
Expand All @@ -325,7 +325,7 @@ func TestQoS(t *testing.T) {
latestServicedBlock = expectedBH + 1
// add in a loop so availability goes above 95%
for i := 5; i < 100; i++ {
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1)
singleConsumerSession.CalculateQoS(currentLatency, expectedLatency*2, expectedBH-latestServicedBlock, numOfProviders, 1, false)
}
require.Equal(t, sdk.MustNewDecFromStr("0.8"), singleConsumerSession.QoSInfo.LastQoSReport.Availability) // because availability below 95% is 0
require.Equal(t, sdk.MustNewDecFromStr("0.989898989898989898"), singleConsumerSession.QoSInfo.LastQoSReport.Sync)
Expand Down
5 changes: 3 additions & 2 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
numOfProviders int,
providersCount uint64,
isHangingApi bool,
reduceAvailability bool,
) error {
// release locks, update CU, relaynum etc..
if err := consumerSession.VerifyLock(); err != nil {
Expand All @@ -1040,8 +1041,8 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.ConsecutiveErrors = []error{}
consumerSession.LatestBlock = latestServicedBlock // update latest serviced block
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount), reduceAvailability)
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock), reduceAvailability)
csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
return nil
}
Expand Down
22 changes: 11 additions & 11 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestHappyFlow(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -416,7 +416,7 @@ func runOnSessionDoneForConsumerSessionMap(t *testing.T, css ConsumerSessionsMap
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestHappyFlowVirtualEpoch(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, maxCuForVirtualEpoch*(virtualEpoch+1))
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, maxCuForVirtualEpoch*(virtualEpoch+1))
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestPairingReset(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
require.Equal(t, epoch, csm.currentEpoch)

if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, cs.CuSum, cuForFirstRequest)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
for j := numberOfAllowedSessionsPerConsumer / 2; j < numberOfAllowedSessionsPerConsumer; j++ {
cs := sessionList[j].cs
if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
require.Equal(t, sessionListData[j].cuSum+cuForFirstRequest, cs.CuSum)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand All @@ -676,7 +676,7 @@ func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *test
for _, cs := range css {
require.NotNil(t, cs)
time.Sleep(time.Duration((rand.Intn(500) + 1)) * time.Millisecond)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
ch <- p
}
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestPairingWithAddons(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func TestPairingWithExtensions(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1068,7 +1068,7 @@ func TestPairingWithStateful(t *testing.T) {
require.NoError(t, err)
require.Equal(t, allProviders, len(css))
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, false)
require.NoError(t, err)
}
usedProviders := NewUsedProviders(nil)
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ConsumerSessionsMap map[string]*SessionInfo
type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64, reduceAvailability bool)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
Expand Down
4 changes: 2 additions & 2 deletions protocol/lavasession/end_to_end_lavasession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) {
err = psm.OnSessionDone(sps, cs.Session.RelayNum-skippedRelays)
require.NoError(t, err)

err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func prepareSessionsWithFirstRelay(t *testing.T, cuForFirstRequest uint64) (*Con
require.NoError(t, err)

// Consumer Side:
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, false)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down
8 changes: 5 additions & 3 deletions protocol/lavasession/single_consumer_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (cs *SingleConsumerSession) getQosComputedResultOrZero() sdk.Dec {
return sdk.ZeroDec()
}

func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64) {
func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64, reduceAvailability bool) {
// Add current Session QoS
cs.QoSInfo.TotalRelays++ // increase total relays
cs.QoSInfo.AnsweredRelays++ // increase answered relays
cs.QoSInfo.TotalRelays++ // increase total relays
if !reduceAvailability { // incase we want to reduce availability to this provider due to some reason we skip answered.
cs.QoSInfo.AnsweredRelays++ // increase answered relays
}

if cs.QoSInfo.LastQoSReport == nil {
cs.QoSInfo.LastQoSReport = &pairingtypes.QualityOfServiceReport{}
Expand Down
10 changes: 5 additions & 5 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64) {
}

func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) {
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now())
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now(), true)
}

func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) {
po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now())
func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64, reduceAvailability bool) {
po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now(), reduceAvailability)
}

func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time) {
func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time, reduceAvailability bool) {
latestSync, timeSync := po.updateLatestSyncData(syncBlock, sampleTime)
providerData, _ := po.getProviderData(providerAddress)
halfTime := po.calculateHalfTime(providerAddress, sampleTime)
providerData = po.updateProbeEntryAvailability(providerData, success, RELAY_UPDATE_WEIGHT, halfTime, sampleTime)
providerData = po.updateProbeEntryAvailability(providerData, !reduceAvailability, RELAY_UPDATE_WEIGHT, halfTime, sampleTime)
if success {
if latency > 0 {
baseLatency := po.baseWorldLatency + common.BaseTimePerCU(cu)/2
Expand Down
Loading
Loading